ETL Processing Using AWS Data Pipeline and Amazon Elastic MapReduce

Data Moving

This blog post shows you how to build an ETL workflow that uses AWS Data Pipeline to schedule an Amazon Elastic MapReduce (Amazon EMR) cluster to clean and process web server logs stored in an Amazon Simple Storage Service (Amazon S3) bucket. AWS Data Pipeline is an ETL service that you can use to automate the movement and transformation of data. It launches an Amazon EMR cluster for each scheduled interval, submits jobs as steps to the cluster, and terminates the cluster after tasks have completed.

In this post, you’ll create the following ETL workflow:

To create the workflow, we’ll use the Pig and Hive examples discussed in the blog post “Ensuring Consistency when Using Amazon S3 and Amazon EMR.” This ETL workflow pushes webserver logs to an Amazon S3 bucket, cleans and filters the data using Pig scripts, and then generates analytical reports from this data using Hive scripts. AWS Data Pipeline allows you to run this workflow for a schedule in the future and lets you backfill data by scheduling a pipeline to run from a start date in the past.

You can create your workflow using the AWS Management console or use the AWS command line interface or API to automate the process of creating and managing pipelines. To learn more about the CLI and SDK see the following links:

  • AWS Command Line Interface (CLI) — Provides commands for a broad set of AWS products, and is supported on Windows, Mac, and Linux/Unix.
  • AWS Software Development Kits (SDK) — Lets you to build applications that create and manage pipelines using language-specific APIs.
  • Web Service API — AWS provides a low-level interface that you can use to call the web service directly using JSON.

We will use the AWS CLI to create and activate our ETL pipeline. You can install and setup CLI using these guidelines.

Six-step Workflow

Step 1: Check if log files are available in the Amazon S3 bucket.

Step 2: Create an Amazon EMR cluster with EMRFS on it.

Step 3: Run emrfs sync to update metadata with contents of  the Amazon S3 bucket.

Step 4: Submit a Pig job on Amazon EMR cluster as step.

Step 5: Reuse the same Amazon EMR cluster to launch a Hive job for generating reports.

Step 6: Clean EMRFS metadata that is older than two days (emrfs delete).

Let’s look at how each step in our workflow can be represented as an AWS Data Pipeline object and how it translates to JSON.

Step 1: Determine Whether Log Files are Available in an Amazon S3 Bucket

The first step is to check whether files are available in the s3://emr-worksop/data/input/{YYYY-MM-DD} location. This is passed as a variable myInputData. A variable can be referenced in other places by #{variable_name}.

In our example, we use the scheduledStartTime variable provided by AWS Data Pipeline and do a date minus operation to get the data of yesterday’s logs.

#{format(minusDays(@scheduledStartTime,1),’YYYY-MM-dd’)} references to a day before the scheduled date of workflow.

This is done using the Precondition feature of AWS Data Pipeline.

{
"id": "myPreconditionId",
	"name": "DefaultPrecondition1",
	"maximumRetries": "10",
	"role": "DataPipelineDefaultRole",
	"s3Prefix": "#{myInputData}/#{format(minusDays(@scheduledStartTime,1),'YYYY-MM-dd')}",
	"type": "S3PrefixNotEmpty"
}

Step 2: Creating an Amazon EMR Cluster with EMRFS

The data pipeline JSON code below launches  a three-node Amazon EMR cluster with EMRFS with one master and two core nodes. terminateAfter terminates the cluster after one hour.

{
	 "terminateAfter": "1 Hour",
	 "id": "myEmrClusterId",
	"amiVersion": "3.2.1",
	"schedule": {
		"ref": "mySchedule"
		},
	"keyPair": "#{myKeyPair}",
	"masterInstanceType": "m3.xlarge",
	"bootstrapAction": "s3://elasticmapreduce/bootstrap-actions/configure-hadoop,-e,fs.s3.consistent=true,-e, fs.s3.consistent.metadata.tableName=EmrFSMetadata, -e, fs.s3.consistent.retryCount=5, -e, fs.s3.consistent.retryPeriodSeconds=10, -e, fs.s3.enableServerSideEncryption=true",
	"coreInstanceType": "m3.xlarge",
	"enableDebugging": "true",
	"name": "DefaultEmrCluster1",
	"coreInstanceCount": "2",
	"type": "EmrCluster"
		}

Step 3: Run emrfs sync to Update Metadata with the Contents of the Amazon S3 Bucket

For this, we use the preStepCommand feature. Amazon EMR’s documentation provides more information about this feature.

“preStepCommand”: “/home/hadoop/bin/emrfs sync #{myInputData}”,

Step 4: Launch a Pig Job on the Amazon EMR Cluster

The Pig job is submitted to the Amazon EMR cluster as a Step. The input to this job are log files present in an Amazon S3 bucket referenced by variable myInputData. After processing the log file, the output is written to another Amazon S3 location referenced by the variable myTransformedData. A sub-folder is created under this bucket for each scheduled date.

The Pig script is referenced by variable myPigScript.

"step": [
	"s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,s3://us-east-1.elasticmapreduce/libs/pig/pig-script,--base-path,s3://us-east-1.elasticmapreduce/libs/pig/,--pig-versions,latest,--run-pig-script,--args,-f,#{myPigScript},-p,INPUT=#{myInputData}/#{format(minusDays(@scheduledStartTime,1),'YYYY-MM-dd')},-p,OUTPUT=#{myTransformedData}/#{format(minusDays(@scheduledStartTime,1),'YYYY-MM-dd')}"
]

Step 5: Reuse the Amazon EMR Cluster to Launch a Hive job for Generating Reports

After the Pig job has cleaned the data, we kick off the Hive job to generate reports on same Amazon EMR cluster. Let’s take a closer look at this Hive script (download the script to check it out). The script creates two tables: one from the cleaned up log reports and one from the subset of data about Google and Bing search terms (both were outputs from the Pig step).

The Hive job is submitted to the Amazon EMR cluster as a step using the command below. The myHiveScript variable is defined as a expression, the value of which is substituted AWS Data Pipeline at run time. The AWS Data Pipeline documentation provides details about the expressions.

"step": "s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,s3://us-east-1.elasticmapreduce/libs/hive/hive-script,--base-path,s3://us-east-1.elasticmapreduce/libs/hive/,--hive-versions,latest,--run-hive-script,--args,-f,#{myHiveScript},-d,INPUT=#{myTransformedData}/#{format(minusDays(@scheduledStartTime,1),'YYYY-MM-dd')},-d,OUTPUT=#{myOutputData},-d,DATE=#{format(minusDays(@scheduledStartTime,1),'YYYY-MM-dd')}"

Step 6: Clean EMRFS Metadata Older than Two Days (emrfs delete)

We do this post processing after both the jobs have completed on cluster. Amazon EMR documentation provides more information about EMRFS delete. For this we use the postStepCommand feature of AWS Data Pipeline which runs the command after successfully completing the step.

“postStepCommand”: “/home/hadoop/bin/emrfs delete –time 2 -u days”

Defining a Schedule for the Workflow

The Schedule object of the pipeline can be used to define the schedule on which the pipeline should execute. You can define a schedule by specifying a startDateTime, an endDataTime and the interval at which the pipeline should run, such as once a day.

The endDateTime field is optional. If the endDateTime is not specified, the pipeline will run until it is shut down. AWS Data Pipeline documentation provides details about pipeline scheduling.

For our ETL example we define a schedule to run the pipeline for a day in the past.

{
      "id": "ScheduleId_E8jhD",
      "startDateTime": "2014-09-22T00:00:00",
      "name": "MyEMRJobSchedule",
      "type": "Schedule",
      "period": "1 Day",
      "endDateTime": "2014-09-23T00:00:00"
    }

Now that we’ve defined the required objects in our JSON pipeline definition, we can create and activate our pipeline.

Creating and Activating the Pipeline

You can download the sample myWorkflow.json used in this example and create a pipeline using the AWS Data Pipeline CLI. This pipeline will run once for a day in the past. Before you create the pipeline, replace <<MY_BUCKET>> and <<MY_KEYPAIR>> values with your Amazon S3 bucket and keypair.

Creating the pipeline is simple using create-pipeline.  The output of this command will be pipeline id.

$ aws datapipeline create-pipeline --name myETLWorkflow --unique-id myETLWorkflow
 {
     "pipelineId": "df-XXXXXXXXXXX "
 }

Next we put the pipeline definition. Any errors/warnings in your JSON appear on the screen. Creating the pipeline does not schedule it for execution. You must activate the pipeline before it will run on the schedule.

$ aws datapipeline put-pipeline-definition --pipeline-id df-XXXXXXXXXXX --pipeline-definition file://myWorkflow.json

$ aws datapipeline activate-pipeline --pipeline-id df- XXXXXXXXXXX

Conclusion

Congratulations! You have successfully launched your first ETL pipeline to do data processing using Amazon EMR. You can continue evolving your workflow to include other AWS services like Amazon Redshift, Amazon RDS for MySQL, and Amazon DynamoDB.