Abusing Amazon’s Elastic MapReduce Hadoop service… easily, from R

I built my first Hadoop cluster this week and ran my first two test MapReduce jobs. It took about 15 minutes and two lines of R, and it cost 55 cents. And you can too with JD Long’s (very, very experimental) ‘segue’ package.

But first, you may be wondering why I use the word “abusing” in this post’s title. Well, the Apache Hadoop project, like Google’s MapReduce processing system which inspired it, is all about Big Data. Its raison d’être is the distributed processing of large data sets. Huge data sets, actually. Huge like all the web logs from Yahoo! and Facebook huge. Its HDFS file system is designed for streaming reads of large, unchanging data files; its default block size is 64MB, in case that resonates with your inner geek. HDFS expects its files to be so big that it even makes replication decisions based on its knowledge of your network topology.

I use the term “abuse” because, well, we’re just not going to use any of that Big Data stuff. Instead, we’re going to take advantage of Hadoop’s core machinery to parcel out some embarrassingly parallel, computationally intensive work, collect the results, and send them back to us. And to keep everything in the cloud and capex-free, we’ll do it all on a cluster of Amazon EC2 instances marshalled and managed by Amazon’s Elastic MapReduce service.

Could the same thing be done with MPI, PVM, SNOW, or any number of other parallel processing frameworks? Certainly. But with only a couple of lines of R? Probably not.

Start the cluster

> library(segue)
Loading required package: rJava
Loading required package: caTools
Loading required package: bitops
Segue did not find your AWS credentials. Please run the setCredentials() function.

> setCredentials('YOUR_ACCESS_KEY_ID', 'YOUR_SECRET_ACCESS_KEY')

> myCluster <- createCluster(numInstances=5)
STARTING - 2011-01-04 15:07:53
STARTING - 2011-01-04 15:08:24
STARTING - 2011-01-04 15:08:54
STARTING - 2011-01-04 15:09:25
STARTING - 2011-01-04 15:09:56
STARTING - 2011-01-04 15:10:27
STARTING - 2011-01-04 15:10:58
BOOTSTRAPPING - 2011-01-04 15:11:28
BOOTSTRAPPING - 2011-01-04 15:11:59
BOOTSTRAPPING - 2011-01-04 15:12:30
BOOTSTRAPPING - 2011-01-04 15:13:01
BOOTSTRAPPING - 2011-01-04 15:13:32
BOOTSTRAPPING - 2011-01-04 15:14:03
BOOTSTRAPPING - 2011-01-04 15:14:34
BOOTSTRAPPING - 2011-01-04 15:15:04
WAITING - 2011-01-04 15:15:35
Your Amazon EMR Hadoop Cluster is ready for action.
Remember to terminate your cluster with stopCluster().
Amazon is billing you!

The createCluster() function provisions the specified number of nodes from EC2, establishes a security group so they can communicate, boots them, and, in its bootstrap phase, upgrades the version of R on each node and loads some helper functions. You can also distribute your own code and (small) data files to each node during the bootstrap phase.
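For instance, a launch customized that way might look like this untested sketch. The argument names cranPackages and filesOnNodes are from my recollection of segue’s documentation (verify against ?createCluster before relying on them), and myHelpers.R is a made-up file name:

> # untested sketch -- check the argument names against ?createCluster
> myCluster <- createCluster(numInstances=5,
      cranPackages=c("MASS"),          # CRAN packages to install on each node
      filesOnNodes=c("myHelpers.R"))   # (small) local files to copy to each node

In any case, after a few minutes, the cluster is WAITING and the taxi meter is running… so now what?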

Try it out

Let’s make sure everything is working as expected by running the example from JD’s December announcement of his project on the R-sig-hpc mailing list:

> # first, let's generate a 10-element list; each element is a vector
> # of 999 random numbers plus 1 NA:
> myList <- list()
> set.seed(1)
> for (i in 1:10) {
     myList[[i]] <- c(rnorm(999), NA)
  }

> # since this is a toy test case, we can run it locally to compare:
> outputLocal  <- lapply(myList, mean, na.rm=T)

> # now run it on the cluster
> outputEmr   <- emrlapply(myCluster, myList, mean,  na.rm=T)
RUNNING - 2011-01-04 15:16:57
RUNNING - 2011-01-04 15:17:27
RUNNING - 2011-01-04 15:17:58
WAITING - 2011-01-04 15:18:29

> all.equal(outputEmr, outputLocal)
[1] TRUE

The key is the emrlapply() function. It works just like lapply(), but automagically spreads its work across the specified cluster. It just doesn’t get any cooler—or simpler—than that.

Estimate pi stochastically

I first stumbled across JD’s R+MapReduce work in this video of his presentation to the Chicago area Hadoop User Group. As a demonstration, he estimates the value of pi stochastically, by throwing dots at random at a circle inscribed within a unit square. On average, the proportion of dots falling inside the circle should approach the ratio of the circle’s area to the square’s. And if you remember anything from what passed as math education in your younger years, you may recall that pi is somehow involved: that ratio works out to π/4, which is why the code below multiplies by 4. Fortunately for us, JD has posted his code on github so we can put down our #2 pencils and cut-and-paste instead:

> estimatePi <- function(seed){
     set.seed(seed)
     numDraws <- 1e6

     r <- .5 # radius... in case the unit circle is too boring
     x <- runif(numDraws, min=-r, max=r)
     y <- runif(numDraws, min=-r, max=r)
     inCircle <- ifelse( (x^2 + y^2)^.5 < r, 1, 0)

     return(sum(inCircle) / length(inCircle) * 4)
   }

> seedList <- as.list(1:1e3)

> myEstimates <- emrlapply( myCluster, seedList, estimatePi )
RUNNING - 2011-01-04 15:22:28
RUNNING - 2011-01-04 15:22:59
RUNNING - 2011-01-04 15:23:30
RUNNING - 2011-01-04 15:24:01
RUNNING - 2011-01-04 15:24:32
RUNNING - 2011-01-04 15:25:02
RUNNING - 2011-01-04 15:25:34
RUNNING - 2011-01-04 15:26:04
RUNNING - 2011-01-04 15:26:39
RUNNING - 2011-01-04 15:27:10
RUNNING - 2011-01-04 15:27:41
RUNNING - 2011-01-04 15:28:11
RUNNING - 2011-01-04 15:28:42
RUNNING - 2011-01-04 15:29:13
RUNNING - 2011-01-04 15:29:44
RUNNING - 2011-01-04 15:30:14
RUNNING - 2011-01-04 15:30:45
RUNNING - 2011-01-04 15:31:16
RUNNING - 2011-01-04 15:31:47
WAITING - 2011-01-04 15:32:18

> stopCluster(myCluster)
> head(myEstimates)
[[1]]
[1] 3.142512

[[2]]
[1] 3.140052

[[3]]
[1] 3.138796

[[4]]
[1] 3.145028

[[5]]
[1] 3.14204

[[6]]
[1] 3.142136

> # Reduce() is R's Reduce() -- look it up! -- and not related to the cluster:
> myPi <- Reduce(sum, myEstimates) / length(myEstimates)

> format(myPi, digits=10)
[1] "3.141586544"

> format(pi, digits=10)
[1] "3.141592654"

So, a thousand simulations of a million throws each takes about 10 minutes on a 5-node cluster and matches pi to about five decimal places. Not bad.
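That accuracy is about what Monte Carlo theory predicts, incidentally. A quick back-of-the-envelope check (my own aside, not part of the cluster run):

> # each throw scores 4 * Bernoulli(pi/4), so its variance is
> # 16 * (pi/4) * (1 - pi/4) = pi * (4 - pi), about 2.70; the standard
> # error over our 1e9 total throws is therefore:
> sqrt(pi * (4 - pi) / 1e9)   # about 5.2e-05

Our observed error of roughly 6.1e-06 sits comfortably inside one standard error.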

How does this example relate to MapReduce?

First of all, I am not a MapReduce expert, but here’s what I understand based on JD’s talk and my skimming of Hadoop: The Definitive Guide (highly recommended, and each purchase goes towards my beer^H^H^H^Helastic computing budget):

  1. Instead of a terabyte or so of log files, we feed Hadoop a list of the numbers 1-1000. It dutifully doles each one out to a “mapper” process running our estimatePi() function.
  2. Each invocation of our function uses this input as the seed for its random number generator. (It sure would be embarrassing to have all 1,000 simulations generate exactly the same results!)
  3. The output of the mappers is collected by Hadoop and normally sent on for reducing, but segue’s reduce step just concatenates all of the results so they can be sent back to our local instance as an R list. (A conceptual sketch follows this list.)
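In lapply() terms, the whole round trip is conceptually just this (my illustration, not segue’s actual internals):

> # the same map step, run locally (slowly!), one seed per mapper task:
> mappedLocally <- lapply(seedList, estimatePi)
> # segue's reduce step adds nothing but concatenation, which is why
> # emrlapply() hands back an ordinary R list we can compare directly:
> all.equal(mappedLocally, myEstimates)   # should be TRUE, as in the earlier test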

All communication between Hadoop and the R code on the cluster is performed using Hadoop Streaming, which allows map and reduce functions to be written in nearly any language that knows the difference between stdin and stdout.
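To make that concrete, here is roughly what a Streaming mapper looks like: an executable script that reads records from stdin and writes results to stdout. This is only my illustration of the Streaming contract, not segue’s actual generated mapper, and it assumes estimatePi() has already been defined on the node (say, during the bootstrap phase):

#!/usr/bin/env Rscript
# illustrative Streaming mapper (not segue's actual code): Hadoop pipes
# input records to stdin, one per line, and collects whatever hits stdout
con <- file("stdin", open="r")
while (length(line <- readLines(con, n=1)) > 0) {
  seed <- as.integer(line)      # here, each input record is one RNG seed
  cat(estimatePi(seed), "\n")   # the printed estimate becomes the mapper's output
}
close(con)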

Conclusion and alternatives

If you do your modeling in R and are looking for an easy way to spread around some CPU-intensive work, segue may be right up your alley. But if you’re looking to use Hadoop the right way—The Big Data Way—segue’s not for you. Instead, check out Saptarshi Guha’s RHIPE, the R and Hadoop Integrated Processing Environment.

If you’re just looking to run R on an EC2 node, you can start with this old post by Robert Grossman.

If you’re in Facebook’s data infrastructure engineering team, or are otherwise hooked on Hive, I bet you could use the RJDBC package and the HiveDriver JDBC driver, but I understand that most people just pass CSV files back and forth. The more things change….
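For the curious, that connection might look something like the sketch below. I haven’t tested it: the driver class and URL follow Hive’s documented JDBC defaults, while the jar path and table name are placeholders:

> # untested sketch: querying Hive from R via JDBC
> library(RJDBC)
> # classPath must point at your local copy of the Hive JDBC driver jar(s)
> drv <- JDBC("org.apache.hadoop.hive.jdbc.HiveDriver",
      classPath="/path/to/hive/lib/hive-jdbc.jar")
> conn <- dbConnect(drv, "jdbc:hive://localhost:10000/default")
> dbGetQuery(conn, "SELECT COUNT(*) FROM web_logs")   # web_logs is made up
> dbDisconnect(conn)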

But if you think all of this is unnatural and makes you want to take a shower, perhaps I can direct you to CRAN’s High-Performance and Parallel Computing with R task view for more traditional parallel processing options.

14 Responses to “Abusing Amazon’s Elastic MapReduce Hadoop service… easily, from R”

  1. Shivani Rao Says:

    Hello,

    Have a question. I was unable to install the segue package with R’s install.packages() command. Is there any other way to acquire it?

    • Jeffrey Breen Says:

      IIRC, JD had taken the download package offline while he cleaned up some code. It’s back as a source download at http://code.google.com/p/segue/downloads/list.

      You’ll need to download it and install as a source package from R — and I think it’s still limited to Linux and Mac OS, but I haven’t tried Windows.

      From the shell, it can be installed like this:

      $ R CMD INSTALL segue_0.02.tar.gz

  2. Cloud based Econometrics and Statistics Software : Core Economics Says:

    […] aggregated back elegantly. Here is a good example of using R with MapReduce by Stephen Barr, and another by Jeffrey Breen. I will be looking more into using more of this in my […]

  3. Ernesto Says:

    Jeffrey:
    Fantastic post. I’m looking forward to replicating your examples and working on my own.

    I have a small problem though. I’m a Windows creature, and I’m using Ubuntu for the first time. When I try to install the library as you instructed…

    #R CMD install /home/ubuntu/segue_0.02.tar.gz

    … I get:

    install: missing destination file operand after `/home/ubuntu/segue_0.02.tar.gz’

    I tried to add the destination directory. Then it works but it only copies the tar file to the lib directory.

    I tried to unzip the files contained in the tar file to the directory that contains the lib directory. It successfully creates the segue directory, but when I invoke segue from R I get: invalid package.

    Boy I miss windows 🙂 and its “install from local zip files”

    Thanks in advance for any help.

    • Jeffrey Breen Says:

      Hi Ernesto:

      Thanks for the kind words. It’s the hard work of generous community members like JD that makes it possible for the rest of us to do amazingly cool things very easily.

      I am a little confused by that error — it looks as though it’s coming from Ubuntu’s install command, not from R.

      If you had mis-typed the package name, I would expect R to complain with something like this:

      R CMD INSTALL /this/file/is/not/here/ohwell.tar.gz
      Warning: invalid package ‘/this/file/is/not/here/ohwell.tar.gz’
      Error: ERROR: no packages specified
      

      Come to think of it, `R CMD` is case-sensitive, so a lowercase `install` falls through to Ubuntu’s own install utility. Try the uppercase form, `R CMD INSTALL`.

      I haven’t checked out the Linux GUI options recently, but I know RStudio should work — and it has a menu option to install from local source packages.

      Good luck!
      Jeffrey

      • Jeffrey Breen Says:

        Oops — looks like I spoke too soon. RStudio’s menu option doesn’t seem to handle source packages (yet…).

        Try this instead from R:

        > install.packages("~/Downloads/segue_0.02.tar.gz", repos=NULL, type="source")
        

        (substituting your file location for ~/Downloads/)

        Hopefully you will see something like this:

        * installing *source* package ‘segue’ ...
        ** R
        ** inst
        ** preparing package for lazy loading
        Loading required package: bitops
        ** help
        *** installing help indices
        ** building package indices ...
        ** testing if installed package can be loaded
        
        * DONE (segue)
        

        Hope that works!
        Jeffrey

  4. dataminingincae Says:

    Jeffrey:
    Thank you very much for taking the time to answer a rookie’s question. I used your suggestion and it is now working as expected.
    Yeah!

    I found another post which your readers might find useful. It’s http://www.econsteve.com/r/barr-parallelPresoFeb2011.pdf.

    Thanks again,

    Ernesto

  5. Segue: Easy cloud computing in R, now with custom packages – Carl Boettiger Says:

    […] of computers on the Amazon cloud. For a basic introduction to the package see Jeff Breen’s post.Quick notes on updating using mercurial: Since I’ve already pulled the code using hg clone […]

  6. Cloud based Econometrics and Statistics Software « kwanghui.com Says:

    […] aggregated back elegantly. Here is a good example of using R with MapReduce by Stephen Barr, and another by Jeffrey Breen. I will be looking more into using more of this in my […]

  7. Arun Says:

    I got this error when i tried initializing a cluster

    myclus<-createCluster(numInstances=5)
    Error in .jcall("RJavaTools", "Ljava/lang/Object;", "invokeMethod", cl, :
    com.amazonaws.AmazonClientException: Can't turn bucket name into a URI: Illegal character in authority at index 8: https://c:\users\arun\appdata\local\temp\rtmp8a6hgi7pp0pikbwz-segue.s3.amazonaws.com

    I'm pretty sure about the access keys.

    Any help is appreciated.

    AK

  8. Arun Says:

    Works like magic… as usual, I missed the “BIG” message in Segue – NOT FOR WINDOWS.

    Jeff, could you help me with bootstrapping some packages on the nodes? For instance, I’m trying to use MSBVAR on clusters for forecasting and am not sure how R gets updated on the nodes. So, will any package I have in R get replicated on each of the child nodes?

    Thanks,
    Ak

  9. Paolo Says:

    Hi, one question: I accidentally closed R before stopping the cluster. What happens in terms of billing? On the Amazon console I couldn’t find any running task; can I consider the task closed? Thanks…

  10. Vedant Jain Says:

    Hi Jeff,

    Thanks so much for this post!!

    However, I am having problems starting the cluster itself. The cluster starts but then automatically shuts down in the R console, even though the instances are running just fine in the AWS EC2 console.

    If I ignore the shutdown and type the following code:
    myList <- NULL
    set.seed(1)
    for (i in 1:10) {
      a <- c(rnorm(999), NA)
      myList[[i]] <- a
    }
    outputEmr <- emrlapply(myCluster, myList, mean, na.rm=T)

    I get this error message:
    Error in .jcall("RJavaTools", "Ljava/lang/Object;", "invokeMethod", cl, : Status Code: 404, AWS Service: Amazon S3, AWS Request ID: 796AA0C8C6AC6E3D, AWS Error Code: NoSuchBucket, AWS Error Message: The specified bucket does not exist, S3 Extended Request ID: vAUKj+CeV7En1bkqL7EymtcMcPHrXjyg47ucTTCyI1lFia1vKQpKhirZ48SoE74Z

    Any help would be appreciated!

    –Vedant

