However, migrations can have two common issues:
- Service outage due to downtime, especially when customer service must be seamlessly available 24/7/365
- Different key design between RDBMS and DynamoDB
This post introduces two methods of seamlessly migrating data from MySQL to DynamoDB, minimizing downtime and converting the MySQL key design into one more suitable for NoSQL.
I’ve included sample code that uses the following AWS services:
- AWS Database Migration Service (AWS DMS) can migrate your data to and from most widely used commercial and open-source databases. It supports homogeneous and heterogeneous migrations between different database platforms.
- Amazon EMR is a managed Hadoop framework that helps you process vast amounts of data quickly. Build EMR clusters easily with preconfigured software stacks that include Hive and other business software.
- Amazon Kinesis can continuously capture and retain a vast amount of data, such as transactions, IT logs, or clickstreams, for up to 7 days.
- AWS Lambda helps you run your code without provisioning or managing servers. Your code can be automatically triggered by other AWS services such as Amazon Kinesis Streams.
Here are the two options I describe in this post:
- Use AWS DMS
AWS DMS supports migration to a DynamoDB table as a target. You can use object mapping to restructure original data to the desired structure of the data in DynamoDB during migration.
- Use EMR, Amazon Kinesis, and Lambda with custom scripts
Consider this method when more complex conversion processes and flexibility are required. Fine-grained user control is needed for grouping MySQL records into fewer DynamoDB items, determining attribute names dynamically, adding business logic programmatically during migration, supporting more data types, or adding parallel control for one big table.
After the initial load/bulk-puts are finished, and the most recent real-time data is caught up by the CDC (change data capture) process, you can change the application endpoint to DynamoDB.
The method of capturing changed data in option 2 is covered in the AWS Database post Streaming Changes in a Database with Amazon Kinesis. All code in this post, including test code, is available in the big-data-blog GitHub repo.
The following diagram shows the overall architecture of both options.
Option 1: Use AWS DMS
This section discusses how to connect to MySQL, read the source data, and then format the data for consumption by the target DynamoDB database using DMS.
Create the replication instance and source and target endpoints
Create a replication instance that has sufficient storage and processing power to perform the migration job, as mentioned in the AWS Database Migration Service Best Practices whitepaper. For example, if your migration involves a large number of tables, or if you intend to run multiple concurrent replication tasks, consider using one of the larger instances. The service consumes a fair amount of memory and CPU.
Connect to MySQL as a user that can retrieve data from the database and that has the SUPER and REPLICATION CLIENT privileges. For CDC, enable the binary log and set the binlog_format parameter to ROW in the MySQL configuration. For more information about how to use DMS, see Getting Started in the AWS Database Migration Service User Guide.
Before you begin to work with a DynamoDB database as a target for DMS, make sure that you create an IAM role for DMS to assume, and grant access to the DynamoDB target tables. Two endpoints must be created to connect the source and target. The following screenshot shows sample endpoints.
The following screenshot shows the details for one of the endpoints, source-mysql.
Create a task with an object mapping rule
In this example, assume that the MySQL table has a composite primary key (customerid + orderid + productid). You are going to restructure the key to the desired structure of the data in DynamoDB, using an object mapping rule.
In this case, the DynamoDB table has a hash key that combines the customerid and orderid columns, and the sort key is the productid column. In an actual migration, however, you should choose the partition key based on your data ingestion and access patterns. High-cardinality attributes are usually a good choice. For more information about how to choose the right DynamoDB partition key, see the Choosing the Right DynamoDB Partition Key AWS Database blog post.
DMS automatically creates a corresponding attribute on the target DynamoDB table for the quantity column from the source table because rule-action is set to map-record-to-record and the column is not listed in the exclude-columns attribute list. For more information about map-record-to-record and map-record-to-document, see Using an Amazon DynamoDB Database as a Target for AWS Database Migration Service.
Migration starts immediately after the task is created, unless you clear the Start task on create option. I recommend enabling logging to make sure that you are informed about what is going on with the migration task in the background.
The following screenshot shows the task creation page.
You can use the console to specify the individual database tables to migrate and the schema to use for the migration, including transformations. On the Guided tab, use the Where section to specify the schema, table, and action (include or exclude). Use the Filter section to specify the column name in a table and the conditions to apply.
Table mappings also can be created in JSON format. On the JSON tab, check Enable JSON editing.
Here’s an example of an object mapping rule that determines where the source data is located in the target. If you copy the code, replace the values of the following attributes. For more examples, see Using an Amazon DynamoDB Database as a Target for AWS Database Migration Service.
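As a minimal sketch of such a rule for the key design described above, the following Python snippet builds the table mapping as a dictionary and prints it as JSON that could be pasted into the JSON tab. The schema name, table names, and the customer_orderid attribute name are assumptions for illustration, as is the "|" separator between customerid and orderid; check the property names against the DMS documentation before using them.

```python
import json

# Hypothetical names: replace them with your own schema and tables.
SOURCE_SCHEMA = "mydatabase"
SOURCE_TABLE = "purchase"
TARGET_TABLE = "purchase"

locator = {"schema-name": SOURCE_SCHEMA, "table-name": SOURCE_TABLE}

table_mappings = {
    "rules": [
        {
            # Selection rule: choose the source table to migrate.
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": locator,
            "rule-action": "include",
        },
        {
            # Object mapping rule: restructure the composite key.
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "2",
            "rule-action": "map-record-to-record",
            "object-locator": locator,
            "target-table-name": TARGET_TABLE,
            "mapping-parameters": {
                "partition-key-name": "customer_orderid",
                "sort-key-name": "productid",
                # quantity is not excluded, so it is copied as its own attribute.
                "exclude-columns": ["customerid", "orderid"],
                "attribute-mappings": [
                    {
                        "target-attribute-name": "customer_orderid",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${customerid}|${orderid}",
                    },
                    {
                        "target-attribute-name": "productid",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${productid}",
                    },
                ],
            },
        },
    ]
}

print(json.dumps(table_mappings, indent=2))
```

Building the rule in code is only a convenience here; it makes it easy to generate one mapping per table when many tables share the same key pattern.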
Start the migration task
If the target table specified in the target-table-name property does not exist in DynamoDB, DMS creates the table according to data type conversion rules for source and target data types. There are many metrics to monitor the progress of migration. For more information, see Monitoring AWS Database Migration Service Tasks.
The following screenshot shows example events and errors recorded by CloudWatch Logs.
DMS replication instances that you used for the migration should be deleted once all migration processes are completed. Any CloudWatch logs data older than the retention period is automatically deleted.
Option 2: Use EMR, Amazon Kinesis, and Lambda
This section discusses an alternative option using EMR, Amazon Kinesis, and Lambda to provide more flexibility and precise control. If you have a MySQL replica in your environment, it would be better to dump data from the replica.
Change the key design
When you decide to change your database from RDBMS to NoSQL, you need to find a more suitable key design for NoSQL, for performance as well as cost-effectiveness.
Similar to option #1, assume that the MySQL source has a composite primary key (customerid + orderid + productid). However, for this option, group the MySQL records into fewer DynamoDB items by customerid (hash key) and orderid (sort key). Also, remove the last column (productid) of the composite key by converting the values of the productid column in MySQL into attribute names in DynamoDB, and setting each attribute value to the corresponding quantity.
This conversion method reduces the number of items. You can retrieve the same amount of information with fewer read capacity units, resulting in cost savings and better performance. For more information about how to calculate read/write capacity units, see Provisioned Throughput.
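The conversion can be illustrated with a small, self-contained Python sketch. The row values are made up, and group_rows is a hypothetical helper, not part of the post's scripts; it only shows how several MySQL rows for one order collapse into a single item.

```python
from collections import defaultdict


def group_rows(rows):
    """Group (customerid, orderid, productid, quantity) rows into one item per order."""
    items = defaultdict(dict)
    for customerid, orderid, productid, quantity in rows:
        item = items[(customerid, orderid)]
        item["customerid"] = customerid  # hash key
        item["orderid"] = orderid        # sort key
        item[productid] = quantity       # productid becomes the attribute name
    return list(items.values())


# Three MySQL rows (made-up values) become two DynamoDB items.
rows = [
    ("customer1", "order1", "productA", 2),
    ("customer1", "order1", "productB", 1),
    ("customer2", "order7", "productA", 5),
]
items = group_rows(rows)
for item in items:
    print(item)
```

This sketch assumes that no productid value collides with the key attribute names customerid and orderid.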
Option 2 has two paths for migration, performed at the same time:
- Batch-puts: Export MySQL data, upload it to Amazon S3, and import into DynamoDB.
- Real-time puts: Capture changed data in MySQL, send the insert/update/delete transaction to Amazon Kinesis Streams, and trigger the Lambda function to put data into DynamoDB.
To maintain data consistency and integrity, start capturing and feeding data to Amazon Kinesis Streams before the batch-puts process begins. The Lambda function should stand by, and Streams should retain the captured data in the stream until the batch-puts process on EMR finishes. Here’s the order:
- Start real-time puts to Amazon Kinesis Streams.
- As soon as real-time puts commence, start batch-puts.
- After batch-puts finishes, trigger the Lambda function to execute put_item from Amazon Kinesis Streams to DynamoDB.
- Change the application endpoints from MySQL to DynamoDB.
Step 1: Capture changing data and put into Amazon Kinesis Streams
First, create an Amazon Kinesis stream to retain transaction data from MySQL. Set the Data retention period value based on your estimate for the batch-puts migration process. For data integrity, the retention period should be long enough to hold all transactions until the batch-puts migration finishes. However, you do not necessarily need to select the maximum retention period. It depends on the amount of data to migrate.
In the MySQL configuration, set binlog_format to ROW to capture transactions by using the BinLogStreamReader module. The log_bin parameter must be set as well to enable the binlog. For more information, see the Streaming Changes in a Database with Amazon Kinesis AWS Database blog post.
The following sample code is a Python example that captures transactions and sends them to Amazon Kinesis Streams.
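A minimal sketch of such a capture script is shown below. It assumes the third-party python-mysql-replication package (which provides BinLogStreamReader) and boto3 are installed; the connection settings, stream name, and server_id value are placeholders supplied by the caller. The helper names build_event and stream_changes are hypothetical.

```python
import json


def build_event(schema, table, event_type, row):
    """Serialize one binlog row change as the JSON payload sent to the stream."""
    event = {"schema": schema, "table": table, "type": event_type, "row": row}
    # default=str turns values such as datetime or Decimal into strings.
    return json.dumps(event, default=str)


def stream_changes(mysql_settings, stream_name, server_id=100):
    """Read row events from the MySQL binlog and put them into a Kinesis stream."""
    # Third-party imports are local so that build_event works without them.
    import boto3
    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.row_event import (
        DeleteRowsEvent,
        UpdateRowsEvent,
        WriteRowsEvent,
    )

    kinesis = boto3.client("kinesis")
    reader = BinLogStreamReader(
        connection_settings=mysql_settings,  # e.g. host, port, user, passwd
        server_id=server_id,                 # must be unique among replicas
        blocking=True,                       # keep waiting for new events
        resume_stream=True,
        only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
    )
    try:
        for binlog_event in reader:
            for row in binlog_event.rows:
                payload = build_event(
                    binlog_event.schema,
                    binlog_event.table,
                    type(binlog_event).__name__,
                    row,
                )
                # A constant partition key sends every record to one shard,
                # which keeps the changes in order (an assumption of this sketch).
                kinesis.put_record(
                    StreamName=stream_name,
                    Data=payload.encode("utf-8"),
                    PartitionKey="default",
                )
    finally:
        reader.close()


print(build_event("test", "purchase", "WriteRowsEvent",
                  {"values": {"customerid": "c1", "orderid": "o1",
                              "productid": "pA", "quantity": 2}}))
```

Only build_event runs in the demonstration line; stream_changes needs a reachable MySQL server with the binlog enabled and an existing stream.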
The following code is sample JSON data generated by the Python script. The type attribute defines the transaction recorded by that JSON record:
- WriteRowsEvent = INSERT
- UpdateRowsEvent = UPDATE
- DeleteRowsEvent = DELETE
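For illustration, the following sketch parses three records of this kind and maps each type value to its SQL operation. The records are made-up examples, not output copied from the script; their row layout (values for inserts and deletes, before_values and after_values for updates) is an assumption based on how the python-mysql-replication package represents row events.

```python
import json

SQL_OPERATION = {
    "WriteRowsEvent": "INSERT",
    "UpdateRowsEvent": "UPDATE",
    "DeleteRowsEvent": "DELETE",
}

# Made-up example records, one per transaction type.
sample_records = [
    '{"schema": "test", "table": "purchase", "type": "WriteRowsEvent", '
    '"row": {"values": {"customerid": "c1", "orderid": "o1", "productid": "pA", "quantity": 2}}}',
    '{"schema": "test", "table": "purchase", "type": "UpdateRowsEvent", '
    '"row": {"before_values": {"customerid": "c1", "orderid": "o1", "productid": "pA", "quantity": 2}, '
    '"after_values": {"customerid": "c1", "orderid": "o1", "productid": "pA", "quantity": 3}}}',
    '{"schema": "test", "table": "purchase", "type": "DeleteRowsEvent", '
    '"row": {"values": {"customerid": "c1", "orderid": "o1", "productid": "pA", "quantity": 3}}}',
]

operations = []
for raw in sample_records:
    record = json.loads(raw)
    operations.append(SQL_OPERATION[record["type"]])
    print(operations[-1], record["schema"], record["table"], sorted(record["row"]))
```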
Step 2: Dump data from MySQL to DynamoDB
The easiest way is to use DMS, which recently added Amazon S3 as a migration target. For an S3 target, both full load and CDC data are written in CSV format. However, CDC is not a good fit because UPDATE and DELETE statements are not supported. For more information, see Using Amazon S3 as a Target for AWS Database Migration Service.
Another way to upload data to Amazon S3 is to use the INTO OUTFILE SQL clause and the aws s3 sync CLI command in parallel with your own script. The degree of parallelism depends on your server capacity and local network bandwidth. You might find a third-party tool useful, such as pt-archiver (part of the Percona Toolkit; see the appendix for details).
I recommend the aws s3 sync command for this use case. This command works internally with the S3 multipart upload feature. Pattern matching can exclude or include particular files. In addition, if the sync process crashes in the middle of processing, you do not need to upload the same files again. The sync command compares the size and modified time of files between local and S3 versions, and synchronizes only local files whose size and modified time are different from those in S3. For more information, see the sync command in the S3 section of the AWS CLI Command Reference.
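One way to organize such a script is sketched below in Python. It only builds the SQL statements and the CLI arguments; running them (for example with a MySQL client and subprocess, several ranges at a time) is left out. The table name, key boundaries, directory, bucket, and prefix are hypothetical values.

```python
def export_statements(table, key_column, boundaries, out_dir):
    """Build one SELECT ... INTO OUTFILE statement per key range.

    Each range can be exported by a separate worker. The boundaries are
    trusted values chosen by the operator, not user input, and the last
    boundary must be greater than the largest key in the table.
    """
    statements = []
    for i, (low, high) in enumerate(zip(boundaries, boundaries[1:])):
        statements.append(
            f"SELECT * FROM {table} "
            f"WHERE {key_column} >= '{low}' AND {key_column} < '{high}' "
            f"INTO OUTFILE '{out_dir}/{table}_{i:04d}.csv' "
            "FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'"
        )
    return statements


def sync_command(local_dir, bucket, prefix):
    """Build the aws s3 sync argument list that uploads only CSV files."""
    return [
        "aws", "s3", "sync", local_dir, f"s3://{bucket}/{prefix}",
        "--exclude", "*", "--include", "*.csv",
    ]


for statement in export_statements("purchase", "customerid", ["a", "m", "z"], "/tmp/export"):
    print(statement)
print(" ".join(sync_command("/tmp/export", "my-bucket", "purchase")))
```

Because sync skips files that already match in S3, the same command can be rerun after a failure or while exports are still producing new files.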
After all data is uploaded to S3, put it into DynamoDB. There are two ways to do this:
- Use Hive with an external table
- Write MapReduce code
Hive with an external table
Create a Hive external table against the data on S3 and insert it into another external table against the DynamoDB table, using the org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler storage handler. To improve productivity and scalability, consider using Brickhouse, which is a collection of UDFs for Hive.
The following sample code assumes that the Hive table for DynamoDB is created with the products column, which is of type ARRAY<STRING>. The productid and quantity columns are aggregated, grouping by customerid and orderid, and inserted into the products column with the CollectUDAF function provided by Brickhouse.
Unfortunately, the MAP, LIST, BOOLEAN, and NULL data types are not supported by the DynamoDBStorageHandler class, so the ARRAY<STRING> data type has been chosen. The products column of ARRAY<STRING> data type in Hive is matched to the StringSet type attribute in DynamoDB. The sample code mainly shows how Brickhouse works, and is intended only for those who want to aggregate multiple records into one StringSet type attribute in DynamoDB.
Python MapReduce with Hadoop Streaming
A mapper task reads each record from the input data on S3, and maps input key-value pairs to intermediate key-value pairs. It divides source data from S3 into two parts (key part and value part) delimited by a TAB character (“\t”). Mapper data is sorted in order by their intermediate key (customerid and orderid) and sent to the reducer. Records are put into DynamoDB in the reducer step.
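The mapper step might look like the following sketch. It assumes that each CSV line holds customerid, orderid, productid, and quantity in that order, with no embedded commas; map_line and run_mapper are hypothetical names.

```python
import io
import sys


def map_line(line, delimiter=","):
    """Turn one CSV record into 'customerid,orderid<TAB>productid,quantity'."""
    cols = line.rstrip("\n").split(delimiter)
    customerid, orderid, productid, quantity = cols[:4]
    # Hadoop Streaming treats the text before the first TAB as the key.
    return f"{customerid},{orderid}\t{productid},{quantity}"


def run_mapper(stdin, stdout):
    """Map every non-empty input line and write the result."""
    for line in stdin:
        if line.strip():
            stdout.write(map_line(line) + "\n")


# Demonstration with in-memory data. In a real mapper script, call
# run_mapper(sys.stdin, sys.stdout) instead.
demo_out = io.StringIO()
run_mapper(io.StringIO("c1,o1,pA,2\nc1,o1,pB,1\n"), demo_out)
print(demo_out.getvalue(), end="")
```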
Generally, the reduce task receives the output produced after map processing (which is key/list-of-values pairs) and then performs an operation on the list of values against each key.
In this case, the reducer is written in Python and is based on STDIN/STDOUT/hadoop streaming. The enumeration data type is not available. The reducer receives data sorted and ordered by the intermediate key set in the mapper, customerid and orderid (cols,cols) in this case, and stores all attributes for the specific key in the item_data dictionary. The attributes in the item_data dictionary are put, or flushed, into DynamoDB every time a new intermediate key comes from sys.stdin.
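The flush-on-key-change logic can be sketched as follows. The input format (key and value separated by a TAB, each holding two comma-separated fields) follows the description above; reduce_stream and dynamodb_writer are hypothetical names, and the writer is passed in as a function so that the grouping logic can be tried without a DynamoDB table.

```python
import sys


def reduce_stream(lines, put_item):
    """Group sorted mapper output by key and call put_item once per key."""
    current_key = None
    item_data = {}
    flushed = 0
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        key, value = line.split("\t", 1)
        if key != current_key:
            if current_key is not None:
                put_item(item_data)  # a new key arrived: flush the previous item
                flushed += 1
            customerid, orderid = key.split(",", 1)
            item_data = {"customerid": customerid, "orderid": orderid}
            current_key = key
        productid, quantity = value.split(",", 1)
        item_data[productid] = int(quantity)
    if current_key is not None:
        put_item(item_data)  # flush the last item
        flushed += 1
    return flushed


def dynamodb_writer(table_name):
    """Return a put_item function backed by a DynamoDB table (needs boto3)."""
    import boto3  # third-party; imported here so the demo runs without it
    table = boto3.resource("dynamodb").Table(table_name)
    return lambda item: table.put_item(Item=item)


# Demonstration with in-memory data. In a real reducer script, call
# reduce_stream(sys.stdin, dynamodb_writer("your-table")) instead.
collected = []
count = reduce_stream(["c1,o1\tpA,2\n", "c1,o1\tpB,1\n", "c2,o7\tpA,5\n"], collected.append)
print(count, collected)
```

A production reducer would also wrap the put in error handling and log the key values of failed items.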
To run the MapReduce job, connect to the EMR master node and run a Hadoop streaming job. The hadoop-streaming.jar file location or name could be different, depending on your EMR version. Exception messages that occur while reducers run are stored in the directory assigned by the -output option. Hash key and range key values are also logged to identify which data causes exceptions or errors.
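As a rough sketch, the job submission could be assembled like this. The jar path, script names, and S3 locations are assumptions; the jar path in particular should be checked on your cluster. The scripts are assumed to be executable and to start with a Python shebang line.

```python
import shlex


def streaming_command(input_uri, output_uri,
                      jar="/usr/lib/hadoop-mapreduce/hadoop-streaming.jar"):
    """Build the argument list for a Hadoop Streaming job.

    The generic -files option ships the scripts to the cluster nodes and
    must come before the streaming-specific options.
    """
    return [
        "hadoop", "jar", jar,
        "-files", "mapper.py,reducer.py",
        "-mapper", "mapper.py",
        "-reducer", "reducer.py",
        "-input", input_uri,
        "-output", output_uri,
    ]


command = streaming_command("s3://my-bucket/purchase/", "s3://my-bucket/streaming-output/")
print(shlex.join(command))
```

The printed line can be pasted into a shell on the master node, or the list can be passed to subprocess.run.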
In my migration experiment using the above scripts, with self-generated test data, I found the following results, including database size and the time taken to complete the migration.
| Category | Item | Value |
|---|---|---|
| EMR cluster | | master: 1 x m3.xlarge; core: 2 x m4.4xlarge |
| DynamoDB | | 2000 write capacity units |
| Data | Number of records | 1,000,000,000 |
| | Database file size (.ib) | 100.6 GB |
| | CSV files size | 37 GB |
| Performance (time) | Export to CSV | 6 min 10 sec |
| | Upload to S3 (sync) | 3 min 30 sec |
| | Import to DynamoDB | depends on write capacity units |
The following screenshot shows the performance results by write capacity.
Note that performance results can vary depending on the server capacity, network bandwidth, degree of parallelism, conversion logic, programming language, and other conditions. The MapReduce job for data import consumes all provisioned write capacity units, so the more you increase the size of the EMR cluster and the write capacity units of the DynamoDB table, the less time the import takes to complete. Java-based MapReduce code would offer more flexibility in functionality and in how you use the MapReduce framework.
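To get a rough feel for the relationship between write capacity and import time, a back-of-the-envelope estimate can help. The sketch below relies on the fact that one write capacity unit covers one write per second for an item of up to 1 KB, and assumes that the job keeps the table's write capacity fully used. The item count and size in the example are hypothetical, and real jobs are slower because of throttling, retries, and uneven key distribution.

```python
import math


def estimate_import_seconds(item_count, item_size_kb, write_capacity_units):
    """Ideal lower bound for the import time, in seconds."""
    # Each write consumes one WCU per started KB of item size.
    wcu_per_item = math.ceil(item_size_kb)
    return item_count * wcu_per_item / write_capacity_units


# Hypothetical example: 1,000,000 items of 0.5 KB at 2,000 WCU.
seconds = estimate_import_seconds(1_000_000, 0.5, 2000)
print(f"{seconds:.0f} seconds")
```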
Step 3: AWS Lambda function updates DynamoDB by reading data from Amazon Kinesis
In the Lambda console, choose Create a Lambda function and the kinesis-process-record-python blueprint. Next, in the Configure triggers page, select the stream that you just created.
The Lambda function must have an IAM role with permissions to read from Amazon Kinesis and put items into DynamoDB.
The Lambda function can recognize the transaction type of the record by looking up the type attribute. The transaction type determines the method for conversion and update.
For example, when a JSON record is passed to the function, the function looks up the type attribute. It also checks whether an existing item in the DynamoDB table has the same key as the incoming record. If so, the existing item must be retrieved and saved in a dictionary variable (item, in this case). Apply the new update information to the item dictionary before it is put back into the DynamoDB table. This prevents the existing item from being overwritten by the incoming record.
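The read-modify-write logic described above might be sketched as follows. The table name purchase, the record layout (values, before_values, after_values), and the optional table parameter are assumptions for illustration; the sketch also assumes that updates never change customerid or orderid.

```python
import base64
import json

KEY_ATTRIBUTES = ("customerid", "orderid")


def apply_change(item, record):
    """Apply one change record to an item; return None if no products remain."""
    item = dict(item)  # work on a copy of the existing item
    row = record["row"]
    event_type = record["type"]
    if event_type == "WriteRowsEvent":       # INSERT
        item[row["values"]["productid"]] = row["values"]["quantity"]
    elif event_type == "UpdateRowsEvent":    # UPDATE
        item.pop(row["before_values"]["productid"], None)
        item[row["after_values"]["productid"]] = row["after_values"]["quantity"]
    elif event_type == "DeleteRowsEvent":    # DELETE
        item.pop(row["values"]["productid"], None)
    else:
        raise ValueError(f"unexpected event type: {event_type}")
    has_products = any(name not in KEY_ATTRIBUTES for name in item)
    return item if has_products else None


def lambda_handler(event, context, table=None):
    """Apply each Kinesis record to the matching DynamoDB item."""
    if table is None:
        import boto3  # available in the Lambda Python runtime
        table = boto3.resource("dynamodb").Table("purchase")  # hypothetical name
    for kinesis_record in event["Records"]:
        # Kinesis delivers the payload base64-encoded.
        record = json.loads(base64.b64decode(kinesis_record["kinesis"]["data"]))
        row = record["row"]
        values = row.get("values") or row["after_values"]
        key = {name: values[name] for name in KEY_ATTRIBUTES}
        # Start from the existing item so other products are not overwritten.
        existing = table.get_item(Key=key).get("Item", dict(key))
        updated = apply_change(existing, record)
        if updated is None:
            table.delete_item(Key=key)
        else:
            table.put_item(Item=updated)
```

Passing a table object explicitly makes it possible to exercise the handler locally with a stand-in object; when Lambda invokes the function, only event and context are supplied.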
Step 4: Switch the application endpoint to DynamoDB
Application code needs to be refactored when you change from MySQL to DynamoDB. The following simple Java code snippets focus on the connection and query part because it is difficult to cover all cases for all applications. For more information, see Programming with DynamoDB and the AWS SDKs.
Query to MySQL
The following sample code shows a common way to connect to MySQL and retrieve data.
Query to DynamoDB
To retrieve items from DynamoDB, follow these steps:
- Create an instance of the DynamoDB class
- Create an instance of the Table class
- Add the withHashKey and withRangeKeyCondition methods to an instance of the QuerySpec class
- Execute the query method with the querySpec instance previously created. Items are retrieved in JSON format, so use the getJSON method to look up a specific attribute in an item.
In this post, I introduced two options for seamlessly migrating data from MySQL to DynamoDB and minimizing downtime during the migration. Option #1 used DMS, and option #2 combined EMR, Amazon Kinesis, and Lambda. I also showed you how to convert the key design in accordance with database characteristics to improve read/write performance and reduce costs. Each option has advantages and disadvantages, so the best option depends on your business requirements.
The sample code in this post is not enough for a complete, efficient, and reliable data migration code base to be reused across many different environments. Use it to get started, but design for other variables in your actual migration.
I hope this post helps you plan and implement your migration and minimizes service outages. If you have questions or suggestions, please leave a comment below.