Abusing Amazon’s Elastic MapReduce Hadoop service… easily, from R

I built my first Hadoop cluster this week and ran my first two test MapReduce jobs. It took about 15 minutes, 2 lines of R, and cost 55 cents. And you can too with JD Long’s (very, very experimental) ‘segue’ package.

But first, you may be wondering why I use the word “abusing” in this post’s title. Well, the Apache Hadoop project, and Google’s MapReduce processing system which inspired it, is all about Big Data. Its raison d’être is the distributed processing of large data sets. Huge data sets, actually. Huge like all the web logs from Yahoo! and Facebook huge. Its HDFS file system is designed for streaming reads of large, unchanging data files; its default block size is 64MB, in case that resonates with your inner geek. HDFS expects its files to be so big that it even makes replication decisions based on its knowledge of your network topology.

I use the term “abuse” because, well, we’re just not going to use any of that Big Data stuff. Instead, we’re going to take advantage of Hadoop’s core machinery to parcel out some embarrassingly parallel, computationally-intensive work, collect the results, and send them back to us. And to keep everything in the cloud and capex-free, we’ll do it all on a cluster of Amazon EC2 instances marshalled and managed by Amazon’s Elastic MapReduce service.

Could the same thing be done with MPI, PVM, SNOW, or any number of other parallel processing frameworks? Certainly. But with only a couple of lines of R? Probably not.

Start the cluster

> library(segue)
Loading required package: rJava
Loading required package: caTools
Loading required package: bitops
Segue did not find your AWS credentials. Please run the setCredentials() function.

> setCredentials('YOUR_ACCESS_KEY_ID', 'YOUR_SECRET_ACCESS_KEY')

> myCluster <- createCluster(numInstances=5)
STARTING - 2011-01-04 15:07:53
STARTING - 2011-01-04 15:08:24
STARTING - 2011-01-04 15:08:54
STARTING - 2011-01-04 15:09:25
STARTING - 2011-01-04 15:09:56
STARTING - 2011-01-04 15:10:27
STARTING - 2011-01-04 15:10:58
BOOTSTRAPPING - 2011-01-04 15:11:28
BOOTSTRAPPING - 2011-01-04 15:11:59
BOOTSTRAPPING - 2011-01-04 15:12:30
BOOTSTRAPPING - 2011-01-04 15:13:01
BOOTSTRAPPING - 2011-01-04 15:13:32
BOOTSTRAPPING - 2011-01-04 15:14:03
BOOTSTRAPPING - 2011-01-04 15:14:34
BOOTSTRAPPING - 2011-01-04 15:15:04
WAITING - 2011-01-04 15:15:35
Your Amazon EMR Hadoop Cluster is ready for action.
Remember to terminate your cluster with stopCluster().
Amazon is billing you!

The createCluster() function provisions the specified number of nodes from EC2, establishes a security group so they can communicate, boots them, and, in its bootstrap phase, upgrades the version of R on each node and loads some helper functions. You can also distribute your own code and (small) data files to each node during the bootstrap phase. In any case, after a few minutes, the cluster is WAITING and the taxi meter is running… so now what?

Try it out

Let’s make sure everything is working as expected by running the example from JD’s December announcement of his project on the R-sig-hpc mailing list:

> # first, let's generate a 10-element list of 999 random numbers + 1 NA:

myList <- NULL
set.seed(1)
for (i in 1:10){
   a <- c(rnorm(999), NA) 
   myList[[i]] <- a
   }

> # since this is a toy test case, we can run it locally to compare:
> outputLocal  <- lapply(myList, mean, na.rm=TRUE)

> # now run it on the cluster
> outputEmr   <- emrlapply(myCluster, myList, mean,  na.rm=TRUE)
RUNNING - 2011-01-04 15:16:57
RUNNING - 2011-01-04 15:17:27
RUNNING - 2011-01-04 15:17:58
WAITING - 2011-01-04 15:18:29

> all.equal(outputEmr, outputLocal)
[1] TRUE

The key is the emrlapply() function. It works just like lapply(), but automagically spreads its work across the specified cluster. It just doesn’t get any cooler—or simpler—than that.
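
If the "works just like lapply()" part is new to you, here is a minimal local sketch of the calling convention that the example above relies on: anything you pass after the function name is forwarded to every call. Nothing here touches the cluster, and the names toyList, withForwarding, withWrapper and withoutNaRm are made up for illustration.

```r
# Toy list: three short vectors, each with one missing value
toyList <- list(c(1, 2, NA), c(10, 20, NA), c(100, 200, NA))

# Extra arguments after the function are forwarded to every call,
# so each element is processed as mean(x, na.rm = TRUE)
withForwarding <- lapply(toyList, mean, na.rm = TRUE)

# The same thing spelled out with an explicit wrapper function
withWrapper <- lapply(toyList, function(x) mean(x, na.rm = TRUE))

# Without na.rm, the NA in each vector poisons its mean
withoutNaRm <- lapply(toyList, mean)

print(identical(withForwarding, withWrapper))
print(unlist(withoutNaRm))
```

In the cluster run above, the na.rm argument follows the function in the emrlapply() call in exactly the same way.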

Estimate pi stochastically

I first stumbled across JD’s R+MapReduce work in this video of his presentation to the Chicago area Hadoop User Group. As a demonstration, he estimates the value of pi stochastically, by throwing dots at random at a circle inscribed within a unit square. On average, the proportion of dots falling inside the circle should equal the ratio of the circle’s area to that of the square. And if you remember anything from what passed as math education in your younger years, you may recall that pi is somehow involved: the ratio works out to pi/4, which is why the code multiplies by 4. Fortunately for us, JD has posted his code on GitHub so we can put down our #2 pencils and cut-and-paste instead:

> estimatePi <- function(seed){
   set.seed(seed)
   numDraws <- 1e6

   r <- .5 #radius... in case the unit circle is too boring
   x <- runif(numDraws, min=-r, max=r)
   y <- runif(numDraws, min=-r, max=r)
   inCircle <- ifelse( (x^2 + y^2)^.5 < r , 1, 0)

   return(sum(inCircle) / length(inCircle) * 4)
 }

> seedList <- as.list(1:1e3)

> myEstimates <- emrlapply( myCluster, seedList, estimatePi )
RUNNING - 2011-01-04 15:22:28
RUNNING - 2011-01-04 15:22:59
RUNNING - 2011-01-04 15:23:30
RUNNING - 2011-01-04 15:24:01
RUNNING - 2011-01-04 15:24:32
RUNNING - 2011-01-04 15:25:02
RUNNING - 2011-01-04 15:25:34
RUNNING - 2011-01-04 15:26:04
RUNNING - 2011-01-04 15:26:39
RUNNING - 2011-01-04 15:27:10
RUNNING - 2011-01-04 15:27:41
RUNNING - 2011-01-04 15:28:11
RUNNING - 2011-01-04 15:28:42
RUNNING - 2011-01-04 15:29:13
RUNNING - 2011-01-04 15:29:44
RUNNING - 2011-01-04 15:30:14
RUNNING - 2011-01-04 15:30:45
RUNNING - 2011-01-04 15:31:16
RUNNING - 2011-01-04 15:31:47
WAITING - 2011-01-04 15:32:18

> stopCluster(myCluster)
> head(myEstimates)
[[1]]
[1] 3.142512

[[2]]
[1] 3.140052

[[3]]
[1] 3.138796

[[4]]
[1] 3.145028

[[5]]
[1] 3.14204

[[6]]
[1] 3.142136

> # Reduce() is R's Reduce() -- look it up! -- and not related to the cluster:
> myPi <- Reduce(sum, myEstimates) / length(myEstimates)

> format(myPi, digits=10)
[1] "3.141586544"

> format(pi, digits=10)
[1] "3.141592654"

So, a thousand simulations of a million throws each take about 10 minutes on a 5-node cluster and get us five decimal places (after rounding). Not bad.
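
How much of that accuracy is luck? Here is a back-of-the-envelope check, not part of JD's code. It assumes every throw is independent (different seeds do not strictly guarantee that) and lands inside the circle with probability pi/4, so the estimate is 4 times a binomial proportion. The variable names are mine, and the estimate is copied from the output above.

```r
# Probability that a single throw lands inside the inscribed circle
p <- pi / 4

# 1,000 seeds times 1,000,000 throws each, as in the run above
totalDraws <- 1e3 * 1e6

# Standard error of 4 * (proportion inside), assuming independent throws
stdErr <- 4 * sqrt(p * (1 - p) / totalDraws)

# Error actually observed, using the estimate printed above
observedErr <- abs(3.141586544 - pi)

cat(sprintf("standard error: %.2e\n", stdErr))
cat(sprintf("observed error: %.2e\n", observedErr))
```

Under those assumptions the standard error comes out at roughly 5e-5, while the observed error is closer to 6e-6. Four solid decimal places is the honest expectation, and the fifth was a bit of good fortune.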

How does this example relate to MapReduce?

First of all, I am not a MapReduce expert, but here’s what I understand based on JD’s talk and my skimming of Hadoop: The Definitive Guide (highly recommended and each purchase goes towards my beer^H^H^H^Helastic computing budget):

  1. Instead of a terabyte or so of log files, we feed Hadoop a list of the numbers 1-1000. It dutifully doles each one to a “mapper” process running our estimatePi() function.
  2. Each invocation of our function uses this input as the seed for its random number generator. (It sure would be embarrassing to have all 1,000 simulations generate exactly the same results!)
  3. The output of the mappers is collected by Hadoop and normally sent on for reducing, but segue’s reduce step just concatenates all of the results so they can be sent back to our local instance as an R list.
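
The three steps above can be imitated locally in a few lines. This is only a sketch of the shape of the computation, not of how segue or Hadoop implement it: smallEstimatePi() is a scaled-down stand-in I made up (10,000 throws instead of a million), lapply() plays the mappers, and a concatenating Reduce() plays the do-nothing reducer.

```r
# Hypothetical scaled-down stand-in for estimatePi(): same idea, fewer throws
smallEstimatePi <- function(seed, numDraws = 1e4) {
  set.seed(seed)                              # step 2: the input is the seed
  x <- runif(numDraws, min = -0.5, max = 0.5)
  y <- runif(numDraws, min = -0.5, max = 0.5)
  4 * mean(x^2 + y^2 < 0.25)                  # inside a circle of radius 0.5
}

# Step 1: the "big data" is just a list of seeds
seeds <- as.list(1:20)

# "Map": one independent call per input element
mapped <- lapply(seeds, smallEstimatePi)

# "Reduce": append each result to a growing list, nothing more
collected <- Reduce(function(acc, x) c(acc, list(x)), mapped, list())

# Same seed, same answer; different seeds, different answers
print(identical(smallEstimatePi(1), smallEstimatePi(1)))
print(length(unique(unlist(collected))) > 1)
print(mean(unlist(collected)))
```

Because each call depends only on its own seed, the calls can run in any order on any node, which is what makes the job embarrassingly parallel.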

All communication between Hadoop and the R code on the cluster is performed using Hadoop Streaming, which allows map and reduce functions to be written in nearly any language which knows the difference between stdin and stdout.
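
To make the stdin/stdout contract concrete, here is a minimal sketch of what a streaming-style mapper could look like in R. It is not segue's actual mapper: mapLine() and runMapper() are hypothetical names, and squaring the input is a placeholder for real work. The one real convention it follows is that Hadoop Streaming treats each output line as a key and a value separated by a tab.

```r
# Hypothetical mapper logic: turn one input line into "key<TAB>value"
mapLine <- function(line) {
  n <- as.integer(line)
  paste(n, n^2, sep = "\t")
}

# Read every line from `input` and write one mapped line per record to `output`
runMapper <- function(input, output) {
  for (line in readLines(input)) {
    writeLines(mapLine(line), output)
  }
}

# Local dry run: a text connection stands in for stdin,
# and capture.output() collects what would go to stdout
input <- textConnection(c("1", "2", "3"))
captured <- capture.output(runMapper(input, stdout()))
close(input)
print(captured)

# In a real streaming script the last step would instead be something like:
#   runMapper(file("stdin"), stdout())
```

Keeping the per-line logic in its own function means it can be tested on your laptop before any cluster is involved.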

Conclusion and alternatives

If you do your modeling in R and are looking for an easy way to spread around some CPU-intensive work, segue may be right up your alley. But if you’re looking to use Hadoop the right way—The Big Data Way—segue’s not for you. Instead, check out Saptarshi Guha’s RHIPE, the R and Hadoop Integrated Processing Environment.

If you’re just looking to run R on an EC2 node, you can start with this old post by Robert Grossman.

If you’re in Facebook’s data infrastructure engineering team, or are otherwise hooked on Hive, I bet you could use the RJDBC package and the HiveDriver JDBC driver, but I understand that most people just pass CSV files back and forth. The more things change….

But if you think all of this is unnatural and makes you want to take a shower, perhaps I can direct you to CRAN’s High-Performance and Parallel Computing with R task view for more traditional parallel processing options.
