Monday, 2 September 2013

Basic tutorial: Map/Reduce example with R & Hadoop, including Amazon Elastic MapReduce (EMR)

This is my write-up of Anette Bergo's very useful session for Women in Data in August 2013, but reordered and with some extra notes and screenshots of my own.

Anette showed exactly how this sort of thing should be done - basic foundation, enough code to demo the key principles without over-complicating things, talk through the code, run it!

Any errors are mine alone; if you spot any, please let me know.

Resources

Pre-requisites

  • Download and install R - it's multi-platform so there are Linux, Mac and Windows versions
    • RStudio IDE helps provide a friendlier interface
  • (To clone Anette's example repo) Download and install Git
  • (For the EMR bit only) Sign up for an Amazon Web Services account.
    • If you have an Amazon account you can login with that, but you still need to sign up specifically for AWS.
  • (For EMR only, as it costs you money to run the demo) Sign up for Elastic MapReduce (circled in blue in the screenshot below), accessible via the AWS console at http://console.aws.amazon.com - you'll need to enter credit card details and possibly go through a phone verification and wait for their confirmation email before you can use EMR.

What's the R programming language?

R is a domain-specific language (DSL) for statistical/mathematical analysis.

Everything is a vector in R (just as in Git everything is a directed graph).

What's MapReduce?

MapReduce is a programming framework for parallel distributed processing of large data sets. (Originally devised by Google engineers - MapReduce paper.)

Effectively, Hadoop is the open source version of Google's MapReduce (it also includes an open source version of Google File System and increasingly other components).

Amazon Web Services' Elastic MapReduce lets you set up and tear down Hadoop clusters (master and slaves). The demo uses R but EMR will accept eg Python, shell scripts, Ruby. You can deploy with the Boto library and Python scripts.

MapReduce involves: Input reader - map function - partition function - compare function - reduce function - output writer.

A map is a set of key/value pairs. The compare function usually sorts based on key/map. The reduce function collapses the map to the results. The output writer moves data to a meaningful easily-accessible single location (preventing data loss if the cluster powers down).

The master (ie the framework) organises execution, controlling one or more mapper nodes and one or more reduce nodes. The framework reads input (data file), and passes chunks to the mappers. Each mapper creates a map of the input. The framework sorts the map based on keys. It allocates a number of reducers to each mapper (the number can be specified). Reduce is called once per unique key (producing 0 or more outputs). Output is written to persistent storage.
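As a rough illustration of the flow just described, here is a small in-memory sketch. It is written in Python rather than R purely for illustration, it is not how Hadoop is actually implemented, and every name in it (map_fn, reduce_fn, partition, run_job) is hypothetical. Using a CRC32 hash to pick a reducer is an assumption made for the sketch.

```python
import zlib
from collections import defaultdict


def map_fn(chunk):
    """Hypothetical mapper: emit a (word, 1) pair for every word in a chunk."""
    return [(word, 1) for word in chunk.split()]


def reduce_fn(key, values):
    """Hypothetical reducer: called once per unique key; sums the values."""
    return [(key, sum(values))]


def partition(key, n_reducers):
    """Pick a reducer for a key. CRC32 is an assumption, chosen so the result
    is stable between runs; a real framework has its own partitioner."""
    return zlib.crc32(key.encode("utf-8")) % n_reducers


def run_job(chunks, n_reducers=2):
    # Map: each chunk of input becomes a list of key/value pairs.
    pairs = [pair for chunk in chunks for pair in map_fn(chunk)]

    # Partition: every occurrence of a key lands on the same reducer.
    buckets = [defaultdict(list) for _ in range(n_reducers)]
    for key, value in pairs:
        buckets[partition(key, n_reducers)][key].append(value)

    # Sort and reduce: each reducer sees its keys in sorted order.
    outputs = []
    for bucket in buckets:
        part = []
        for key in sorted(bucket):
            part.extend(reduce_fn(key, bucket[key]))
        outputs.append(part)  # one output list per reducer
    return outputs


if __name__ == "__main__":
    for i, part in enumerate(run_job(["bla bla bla", "and so and"])):
        print(i, part)
```

Each reducer produces its own output list here, which mirrors why a real job leaves you with several output files rather than one.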

Usually a mapper is more complex than in the demo, eg it may filter what's to be analysed etc. For less than 10 GB of data, you might run analyses on your own computer, for 10-100 GB on your own servers, probably using MapReduce only for over 100 GB of data. It can process images, video files etc too - although the demo analyses words in a text file.

Canonical example of MapReduce: wordcount

Input - a series of different words eg: bla bla bla and so and.
Mapped - bla 1, bla 1, bla 1, and 1, so 1, and 1. (Ie maps 'bla' to value '1').
Reduced - and 2, bla 3, so 1.
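The three stages above can be reproduced in a few lines. This is a minimal Python sketch (the demo itself uses R) that works on a single string in memory, so it shows the idea only, not the distributed part.

```python
from itertools import groupby

text = "bla bla bla and so and"

# Map: pair every word with the value 1.
mapped = [(word, 1) for word in text.split()]

# Sort by key so that identical words sit next to each other.
mapped_sorted = sorted(mapped, key=lambda pair: pair[0])

# Reduce: sum the values for each unique key.
reduced = [
    (word, sum(value for _, value in group))
    for word, group in groupby(mapped_sorted, key=lambda pair: pair[0])
]

print(mapped)
print(reduced)  # [('and', 2), ('bla', 3), ('so', 1)]
```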

Note: this assumes all input info is important, but often only part is, eg to check how often names are mentioned in a series of articles you wouldn't map everything.
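A filtering mapper for the names case might look something like the sketch below. It is in Python for illustration only; the set of names and the function map_names are hypothetical, and the punctuation stripping is a simplification.

```python
NAMES = {"anette", "ada"}  # hypothetical names of interest


def map_names(line, names=NAMES):
    """Emit (name, 1) only for words that are in the set of interest."""
    pairs = []
    for word in line.split():
        # Drop surrounding punctuation and ignore case before matching.
        cleaned = word.strip(".,;:!?\"'()").lower()
        if cleaned in names:
            pairs.append((cleaned, 1))
    return pairs


if __name__ == "__main__":
    print(map_names("Anette met Ada, and Ada left."))
```

Everything that isn't a name of interest is simply never emitted, so the reducer has far less to sort and sum.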

The framework has readymade reducers for common map formats but you can write your own reducer.

Anette's example

Clone the demo repo at https://github.com/anettebgo/wid.demo.git (see bottom right hand side - there are buttons to clone in desktop or get the clone URL; the command is git clone <url>).

Ensure everything's executable as necessary.

The input file is data.txt, the mapper is mapper.R and the reducer is reducer.R.

A shell script run.sh will demo the map/reduce locally - it feeds data.txt to the mapper, sorts the mapper's output and feeds the sorted output into the reducer.

Going through the code (RStudio helps):

mapper.R - see the last function in the code: it reads input from stdin. hsLineReader reads chunks of up to 3 lines, doesn't skip anything (eg headers), then applies the emit function to each chunk read. The emit function (top of code) transforms the chunk (1-3 lines) into a uniform processable stream, turning chunks into words (strsplit). sapply applies an anonymous inner function to each word. (paste is used for string concatenation.) The results are sorted and go to the reducer.
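For comparison, here is a rough Python analogue of that mapper logic. It is not Anette's mapper.R and doesn't use the HadoopStreaming package; the function names (read_chunks, emit, main) are hypothetical, and the "word 1" output format with a space separator is an assumption based on the description of the reducer below.

```python
import io
import sys
from itertools import islice


def read_chunks(stream, chunk_size=3):
    """Yield lists of up to chunk_size lines, skipping nothing."""
    while True:
        chunk = list(islice(stream, chunk_size))
        if not chunk:
            return
        yield chunk


def emit(chunk, out=sys.stdout):
    """Split a chunk into words and write one 'word 1' line per word."""
    for line in chunk:
        for word in line.split():
            out.write(f"{word} 1\n")


def main(stream=sys.stdin, out=sys.stdout):
    for chunk in read_chunks(stream):
        emit(chunk, out)


if __name__ == "__main__":
    # Demo on an in-memory file; a real streaming mapper would call main()
    # with no arguments so that it reads from stdin.
    main(io.StringIO("bla bla bla\nand so and\n"))
```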

reducer.R - the final function reads from stdin and runs the reduce function on the input. This creates an array of names - a vector of columns. (The chunk size can be tweaked to make it more performant depending on the calculation to be run; the default separator is a tab, here it's been set to a space.) Then the process function is applied to it (written as a separate function for clarity, but it could be an anonymous inner function). This function takes each piece of the map and aggregates by word, using an anonymous inner function to produce sums.
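A rough Python analogue of the reducer logic follows. Again this is not reducer.R: it doesn't read in chunks the way the R version does, the names parse and reduce_stream are hypothetical, and it assumes each input line is a word and a count separated by a space, already sorted by word.

```python
import io
import sys
from itertools import groupby


def parse(line, sep=" "):
    """Split one mapper line into (word, count)."""
    word, count = line.rstrip("\n").rsplit(sep, 1)
    return word, int(count)


def reduce_stream(stream, out=sys.stdout, sep=" "):
    """Sum the counts for each word; assumes the input is sorted by word."""
    pairs = (parse(line, sep) for line in stream if line.strip())
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        out.write(f"{word}{sep}{sum(count for _, count in group)}\n")


if __name__ == "__main__":
    mapped = ["bla 1", "bla 1", "bla 1", "and 1", "so 1", "and 1"]
    # The mapper output has to be sorted before the reducer sees it.
    reduce_stream(io.StringIO("\n".join(sorted(mapped)) + "\n"))
```

Because the input is sorted, the reducer only ever needs to hold one word's running total at a time, which is why the sort step between mapper and reducer matters.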

Running locally

Run run.sh - this emulates what the framework does.

NB must install further packages, HadoopStreaming and getopt:

>R…
>install.packages("HadoopStreaming")
>install.packages("getopt")

(If that doesn't work, install them from the R_packages folder: R CMD INSTALL packagename.tar.gz).

Running on Amazon Web Services

NB this isn't part of Amazon's free tier, so running these demos will cost you - not very much, probably less than a quid?

Go to AWS console http://console.aws.amazon.com

Create a new S3 bucket (click S3 - towards the bottom left at the moment, under 'Storage and Content Delivery'; click Create bucket; give it a unique name. NB the name must be unique for all buckets on AWS, not just for you!).

image

Edit the bootstrapR.sh script at the line
BOOTSTRAP_BUCKET_NAME='<your-bucket-name>'
to replace it with your new bucket's name. (The code is self-explanatory, see the comments)

Open the bucket by clicking on it, right-click inside and upload the code from Anette's model repo. (You may need to rename the R_packages folder to just R, or change the folder name to R_packages in the script.)

All nodes in the cluster get the code applied to them.

Now in the AWS console go to Elastic MapReduce (under 'Compute and Networking') - best do this in a new browser window or it'll break your upload! Click to sign up for it, if you haven't already, including adding credit card information etc.

Using Amazon's standard example. In EMR, click create a new job flow (see screenshot below):

  • Job Flow Name - anything you like
  • Hadoop version - Amazon Distribution
  • AMI - latest
  • Create a job flow - Run a sample application, pick Word Count (Streaming), then
  • click Continue.

image

In the next screen (see below):

  • Input Location is prepopulated (a public bucket), leave it
  • Output location - change <yourbucket> to your own new bucket's name (case sensitive I think)
  • Mapper and Reducer - use theirs
  • click Continue.

image

In the next screen (screenshot below):

  • Instance Type - small
  • Instance Count - 2, and
  • Continue.

image

In the next screen (see below):

  • Amazon EC2 Key Pair - leave it as Proceed without key pair (you may get an error, if so see below)
  • Amazon VPC Subnet ID - no preference
  • Amazon S3 Log Path - here enter your own path to your bucket, eg s3n://yourbucketname/log (note: s3n is the URI scheme for Hadoop's native S3 filesystem)
  • Enable debugging - Yes, and
  • Continue.

image

Leave it as Proceed with no Bootstrap Actions, click Continue:

image

The next screen shows a summary of all the settings for your review, use Back to change any errors etc. When happy, click Create job flow to run it (and you'll get charged, so click Cancel if you'd rather not run it yet!).

image

It takes a few minutes to run. Click on the job name and click Debug to see the progress. There's a Refresh button to check if it's gone any further. Click on View jobs to see the jobs set up.

Error? If you get errors, click on your username at the top right hand side of the AWS Console, select Security Credentials, expand Access Keys and click Create New Set of Keys, then try again with Proceed without key pair. (It seems that creating a new set of keys then enables you to proceed without actually using the created keys!)

Using the uploaded demo files. This is similar. In EMR create a new job flow, but this time under 'Create a job flow' choose 'Run your own application', with job type 'Streaming'.

For the Input Location use s3n://<yourbucketname>/data.txt, for the Output Location similarly the path to your bucket folder (eg Rtest.output) - it will be created if not already in existence, and can be downloaded to your own location. For Mapper, use the uploaded mapper.R file in your bucket, for Reducer the reducer.R file. Instance type small etc.

Proceed without key pair (see above if there are errors). Bootstrap action - this time choose your own custom action, and enter the path to your bucket and the bootstrapR.sh file. Continue. Create. View. Run when you're happy! (NB again it costs you money.)

 

Further notes: in jobs, tasks can be viewed too - you can see eg 12 mappers and 3 reducers. Output files are created one per reducer, so you have to stitch them back together. 0 byte files are created where there was no output from the relevant chunk.
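Once the output folder has been downloaded, the stitching step could be done along these lines. This is a Python sketch under assumptions: the function stitch_outputs is hypothetical, the files are assumed to be in a local folder, and the part-* naming pattern is an assumption you may need to adjust to match what you see in your bucket.

```python
from pathlib import Path


def stitch_outputs(output_dir, combined_path, pattern="part-*"):
    """Concatenate downloaded output files into one, skipping empty ones.

    The 'part-*' pattern is an assumption about how the files are named.
    Returns the number of files that contributed content.
    """
    used = 0
    with open(combined_path, "w", encoding="utf-8") as combined:
        for part in sorted(Path(output_dir).glob(pattern)):
            if not part.is_file() or part.stat().st_size == 0:
                continue  # 0 byte file: that part produced no output
            text = part.read_text(encoding="utf-8")
            # Make sure each part ends with a newline before appending.
            combined.write(text if text.endswith("\n") else text + "\n")
            used += 1
    return used
```

For example, stitch_outputs("Rtest.output", "wordcount.txt") would write a single wordcount.txt from whatever part files are in a local Rtest.output folder (both names here are just examples).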