Analysis of Top-N DynamoDB Objects using Amazon Athena and Amazon QuickSight

No SQL

If you run an operation that continuously generates a large amount of data, you may want to know what kind of data is being inserted by your application. The ability to analyze data intake quickly can be very valuable for business units, such as operations and marketing. For many operations, it’s important to see what is driving the business at any particular moment. For retail companies, for example, understanding which products are currently popular can aid in planning for future growth. Similarly, for PR companies, understanding the impact of an advertising campaign can help them market their products more effectively.

This post covers an architecture that helps you analyze your streaming data. You’ll build a solution using Amazon DynamoDB Streams, AWS Lambda, Amazon Kinesis Firehose, and Amazon Athena to analyze data intake at a frequency that you choose. And because this is a serverless architecture, you can use all of the services here without the need to provision or manage servers.

The data source

You’ll collect a random sampling of tweets via Twitter’s API and store a variety of attributes in your DynamoDB table, such as: Twitter handle, tweet ID, hashtags, location, and Time-To-Live (TTL) value.

In DynamoDB, the primary key is used as an input to an internal hash function. The output from this function determines the partition in which the data will be stored. When using a combination of primary key and sort key as a DynamoDB schema, you need to make sure that no single partition key contains many more objects than the other partition keys because this can cause partition level throttling. For the demonstration in this blog, the Twitter handle will be the primary key and the tweet ID will be the sort key. This allows you to group and sort tweets from each user.

To help you get started, I have written a script that pulls a live Twitter stream that you can use to generate your data. All you need to do is provide your own Twitter Apps credentials, and it should generate the data immediately. Alternatively, I have also provided a script that you can use to generate random Tweets with little effort.

You can find both scripts in the Github repository:

https://github.com/awslabs/aws-blog-dynamodb-analysis

There are some modules that you may need to install to run these scripts. You can find them in Python’s module repository:

  • boto3: https://aws.amazon.com/sdk-for-python/
  • twitter: https://pypi.python.org/pypi/twitter
  • names: https://pypi.python.org/pypi/names/
  • loremipsum: https://pypi.python.org/pypi/loremipsum

To get your own Twitter credentials, go to https://www.twitter.com/ and sign up for a free account, if you don’t already have one. After your account is set up, go to https://apps.twitter.com/. On the main landing page, choose the Create New App button. After the application is created, go to Keys and Access Tokens to get your credentials to use the Twitter API. You’ll need to generate Customer Tokens/Secret and Access Token/Secret. All four keys will be used to authenticate your request.

Architecture overview

Before we begin, let’s take a look at the overall flow of information will look like, from data ingestion into DynamoDB to visualization of results in Amazon QuickSight.

As illustrated in the architecture diagram above, any changes made to the items in DynamoDB will be captured and processed using DynamoDB Streams. Next, a Lambda function will be invoked by a trigger that is configured to respond to events in DynamoDB Streams. The Lambda function processes the data prior to pushing to Amazon Kinesis Firehose, which will output to Amazon S3. Finally, you use Amazon Athena to analyze the streaming data landing in Amazon S3. The result can be explored and visualized in Amazon QuickSight for your company’s business analytics.

You’ll need to implement your custom Lambda function to help transform the raw <key, value> data stored in DynamoDB to a JSON format for Athena to digest, but I can help you with a sample code that you are free to modify.

Implementation

In the following sections, I’ll walk through how you can set up the architecture discussed earlier.

Create your DynamoDB table

First, let’s create a DynamoDB table and enable DynamoDB Streams. This will enable data to be copied out of this table. From the console, use the user_id as the partition key and tweet_id as the sort key:

After the table is ready, you can enable DynamoDB Streams. This process operates asynchronously, so there is no performance impact on the table when you enable this feature. The easiest way to manage DynamoDB Streams is also through the DynamoDB console.

In the Overview tab of your newly created table, click Manage Stream. In the window, choose the information that will be written to the stream whenever data in the table is added or modified. In this example, you can choose either New image or New and old images.

For more details on this process, check out our documentation:

http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html

Configure Kinesis Firehose

Before creating the Lambda function, you need to configure Kinesis Firehose delivery stream so that it’s ready to accept data from Lambda. Open the Firehose console and choose Create Firehose Delivery Stream. From here, choose S3 as the destination and use the following to information to configure the resource. Note the Delivery stream name because you will use it in the next step.

For more details on this process, check out our documentation:

http://docs.aws.amazon.com/firehose/latest/dev/basic-create.html#console-to-s3

Create your Lambda function

Now that Kinesis Firehose is ready to accept data, you can create your Lambda function.

From the AWS Lambda console, choose the Create a Lambda function button and use the Blank Function. Enter a name and description, and choose Python 2.7 as the Runtime. Note your Lambda function name because you’ll need it in the next step.

In the Lambda function code field, you can paste the script that I have written for this purpose. All this function needs is the name of your Firehose stream name set as an environment variable.

import boto3
import json
import os

# Initiate Firehose client
firehose_client = boto3.client('firehose')

def lambda_handler(event, context):
    records = []
    batch   = []
    try :
        for record in event['Records']:
            tweet = {}
            t_stats = '{ "table_name":"%s", "user_id":"%s", "tweet_id":"%s", "approx_post_time":"%d" }\n' \
                      % ( record['eventSourceARN'].split('/')[1], \
                          record['dynamodb']['Keys']['user_id']['S'], \
                          record['dynamodb']['Keys']['tweet_id']['N'], \
                          int(record['dynamodb']['ApproximateCreationDateTime']) )
            tweet["Data"] = t_stats
            records.append(tweet)
        batch.append(records)
        res = firehose_client.put_record_batch(
            DeliveryStreamName = os.environ['firehose_stream_name'],
            Records = batch[0]
        )
        return 'Successfully processed {} records.'.format(len(event['Records']))
    except Exception :
        pass

The handler should be set to lambda_function.lambda_handler and you can use the existing lambda_dynamodb_streams role that’s been created by default.

Enable DynamoDB trigger and start collecting data

Everything is ready to go. Open your table using the DynamoDB console and go to the Triggers tab. Select the Create trigger drop down list and choose Existing Lambda function. In the pop-up window, select the function that you just created, and choose the Create button.

At this point, you can start collecting data with the Python script that I’ve provided. The first one will create a script that will pull public Twitter data and the other will generate fake tweets using Lorem Ipsum text.

Configure Amazon Athena to read the data

Next, you will configure Amazon Athena so that it can read the data Kinesis Firehose outputs to Amazon S3 and allow you to analyze the data as needed. You can connect to Athena directly from the Athena console, and you can establish a connection using JDBC or the Athena API. In this example, I’m going to demonstrate what this looks like on the Athena console.

First, create a new database and a new table. You can do this by running the following two queries. The first query creates a new database:

CREATE DATABASE IF NOT EXISTS ddbtablestats

And the second query creates a new table:

CREATE EXTERNAL TABLE IF NOT EXISTS ddbtablestats.twitterfeed (
    `table_name` string,
    `user_id` string,
    `tweet_id` bigint,
    `approx_post_time` timestamp 
) PARTITIONED BY (
    year string,
    month string,
    day string,
    hour string 
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ('serialization.format' = '1')
LOCATION 's3://myBucket/dynamodb/streams/transactions/'

Note that this table is created using partitions. Partitioning separates your data into logical parts based on certain criteria, such as date, location, language, etc. This allows Athena to selectively pull your data without needing to process the entire data set. This effectively minimizes the query execution time, and it also allows you to have greater control over the data that you want to query.

After the query has completed, you should be able to see the table in the left side pane of the Athena dashboard.

After the database and table have been created, execute the ALTER TABLE query to populate the partitions in your table. Replace the date with the current date when the script was executed.

ALTER TABLE ddbtablestats.TwitterFeed ADD IF NOT EXISTS
PARTITION (year='2017',month='05',day='17',hour='01') location 's3://myBucket/dynamodb/streams/transactions/2017/05/17/01/'

Using the Athena console, you’ll need to manually populate each partition for each additional partition that you’d like to analyze, however you can programmatically automate this process by using the JDBC driver or any AWS SDK of your choice.

For more information on partitioning in Athena, check out our documentation:

http://docs.aws.amazon.com/athena/latest/ug/partitions.html

Querying the data in Amazon Athena

This is it! Let’s run this query to see the top 10 most active Twitter users in the last 24 hours. You can do this from the Athena console:

SELECT user_id, COUNT(DISTINCT tweet_id) tweets FROM ddbTableStats.TwitterFeed
WHERE year='2017' AND month='05' AND day='17'
GROUP BY user_id
ORDER BY tweets DESC
LIMIT 10

The result should look similar to the following:

 

Linking Athena to Amazon QuickSight

Finally, to make this data available to a larger audience, let’s visualize this data in Amazon QuickSight. Amazon QuickSight provides native connectivity to AWS data sources such as Amazon Redshift, Amazon RDS, and Amazon Athena. Amazon QuickSight can also connect to on-premises databases, Excel, or CSV files, and it can connect to cloud data sources such as Salesforce.com. For this solution, we will connect Amazon QuickSight to the Athena table we just created.

Amazon QuickSight has a free tier that provides 1 user and 1GB of SPICE (Superfast Parallel In-memory Calculated Engine) capacity free. So you can sign up and use QuickSight free of charge.

When you are signing up for Amazon QuickSight, ensure that you grant permissions for QuickSight to connect to Athena and the S3 bucket where the data is stored.

After you’ve signed up, navigate to the new analysis button, and choose new data set, and then select the Athena data source option. Create a new name for your data source and proceed to the next prompt. At this point, you should see the Athena table you created earlier.

 

Choose the option to import the data to SPICE for a quicker analysis. SPICE is an in-memory optimized calculation engine that is designed for quick data visualization through parallel processing. SPICE also enables you to refresh your data sets at a regular interval or on-demand as you want.

 

In the dialog box, confirm this data set creation, and you’ll arrive on the landing page where you can start building your graph. The X-axis will represent the user_id and the Value will be used to represent the SUM total of the tweets from each user.

The Amazon QuickSight report looks like this:

 

Through this visualization, I can easily see that there are 3 users that tweeted over 20 times that day and that the majority of the users have fewer than 10 tweets that day. I can also set up a scheduled refresh of my SPICE dataset so that I have a dashboard that is regularly updated with the latest data.

Closing thoughts

Here are the benefits that you can gain from using this architecture:

  1. You can optimize the design of your DynamoDB schema that follows AWS best practice recommendations.
  1. You can run analysis and data intelligence in order to understand the current customer demands for your business.
  1. You can store incremental backup for future auditing.

The flexibility of our AWS services invites you to create and design the ideal workflow for your production at any scale, and, as always, if you ever need some guidance, don’t hesitate to reach out to us.I  hope this has been helpful to you! Please leave any questions and comments below.