Monitor Your Application for Processing DynamoDB Streams


DynamoDB Streams can handle requests at scale, but you risk losing stream records if your processing application falls behind, because DynamoDB Streams records are no longer available after 24 hours. For example, if you use a stream to maintain multi-Region read replicas of your DynamoDB table, a lagging application can leave those replicas missing data.

In this post, I suggest ways to monitor the Amazon Kinesis Client Library (KCL) application you use to process DynamoDB Streams, so that you can quickly track and resolve issues or failures and avoid losing data. Dashboards, metrics, and application logs all play a part. This post may be most relevant to Java applications running on Amazon EC2 instances.

Before you read further, please read my previous posts about designing KCL applications for DynamoDB Streams using a single worker or multiple workers.

Dashboards

You can make a dashboard from important CloudWatch metrics to get a complete overview of the state of your application. A sample dashboard for an application processing a 256-shard DynamoDB stream with two c4.large EC2 workers is shown below:

[Figure: sample CloudWatch dashboard, FailoverTimeMillis = 60000]

The Metrics section below describes some of these metrics in more detail, but here is what the dashboard quickly reveals:

  • Throughput vs. processing
    The number of records processed by KCL is equal to the number of writes on the base table, which means the application is keeping up with the incoming traffic.
  • ReturnedItemCount
    The total number of leases that KCL is processing is constant and equal to the total number of shards (256).
  • CurrentLeases
    The initial distribution of leases assigned to workers is uneven, but the workers converge toward an equal share of the work over time.
  • Leases table
    The write and read consumption on the KCL leases table is within the provisioned limits.
  • CPU utilization and ThreadCount
    The worker with more leases assigned to it is doing more work as expected.
  • Memory utilization and heap memory usage
    The memory footprint of KCL is fairly high, mainly because records are processed in memory. Consider tuning your JVM maximum heap size (for example, with -Xmx) to avoid out-of-memory errors.

Metrics

KCL provides many useful metrics for monitoring your application, but the following few are crucial for tracking its state.

RecordsProcessed

This metric is reported per shard by default. For a large number of shards, you can turn off shard-level metrics and get an aggregated value instead. Monitor this metric to ensure that it matches the write throughput on your base table (if you are processing all stream records).
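As a rough illustration of that comparison, the following Java sketch sums per-shard RecordsProcessed datapoints for one period and compares the total with the number of base-table writes for the same period. The class name ThroughputCheck, the tolerance parameter, and however you obtain the write count are assumptions for illustration; fetching the datapoints from CloudWatch is not shown.

```java
import java.util.List;

/**
 * Hypothetical helper: checks whether the summed per-shard RecordsProcessed
 * values for a period roughly match the number of writes on the base table.
 */
final class ThroughputCheck {
    private ThroughputCheck() {}

    /** Sums per-shard RecordsProcessed datapoints for one period. */
    static long totalRecordsProcessed(List<Long> perShardCounts) {
        long total = 0;
        for (long count : perShardCounts) {
            total += count;
        }
        return total;
    }

    /**
     * Returns true when processed records are within `tolerance` (a fraction,
     * e.g. 0.05 for 5%) of the base-table writes for the same period.
     */
    static boolean isProcessingAtPar(long recordsProcessed, long baseTableWrites, double tolerance) {
        if (baseTableWrites == 0) {
            return recordsProcessed == 0; // no writes: nothing should have been processed
        }
        double ratio = (double) recordsProcessed / baseTableWrites;
        return Math.abs(1.0 - ratio) <= tolerance;
    }
}
```

A small tolerance absorbs the fact that the two numbers are collected at slightly different moments; a persistent gap is the signal worth investigating.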

ReturnedItemCount

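This DynamoDB metric reports the number of items returned by Scan operations on the KCL leases table. Because KCL periodically scans the entire leases table, the value equals the total number of leases. It is the most straightforward indicator of your application falling behind, and you can use it to determine whether you need an extra worker to speed up processing.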

For example, if the KCL worker parameter CleanupLeasesUponShardCompletion is set to true (the default), the leases table should ideally contain only as many leases as there are partitions in your table, unless perhaps you are reading the stream from TRIM_HORIZON.

When shards roll over periodically, this number is briefly elevated, but if your application is keeping up with the throughput, it drops back quickly. Having twice as many leases as partitions for a sustained period (say, 5 minutes) indicates that your application is falling behind and that adding an extra worker might help.
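The "twice as many leases for a sustained period" rule of thumb can be expressed as a small check. This Java sketch assumes you already collect lease-count samples (for example, ReturnedItemCount at one-minute granularity, so five samples approximate five minutes); LeaseBacklogCheck and its parameters are hypothetical names. It requires every sample in the window to be at or over the limit, so a short rollover spike does not trigger it.

```java
/**
 * Hypothetical helper: flags a sustained lease backlog using the heuristic
 * "leases >= 2 x active shards for every sample in the window".
 */
final class LeaseBacklogCheck {
    private LeaseBacklogCheck() {}

    /**
     * @param leaseCounts  recent lease-count samples, oldest first
     * @param activeShards number of active shards at application start
     * @param windowSize   how many of the most recent samples must all be over the limit
     */
    static boolean isFallingBehind(long[] leaseCounts, long activeShards, int windowSize) {
        if (windowSize <= 0 || leaseCounts.length < windowSize) {
            return false; // not enough data to call the backlog "sustained"
        }
        long threshold = 2 * activeShards;
        for (int i = leaseCounts.length - windowSize; i < leaseCounts.length; i++) {
            if (leaseCounts[i] < threshold) {
                return false; // the count dropped back, e.g. after a shard rollover
            }
        }
        return true;
    }
}
```

For the 256-shard stream on the sample dashboard, the limit is 512 leases; a series that ends with five samples of 520 or more would be flagged.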

You can define an Auto Scaling policy to add an extra worker as follows:

aws autoscaling put-scaling-policy --region <region> --policy-name AddInstance --auto-scaling-group-name <ASG name> --adjustment-type ChangeInCapacity --scaling-adjustment 1

You can use the PolicyARN value returned by the previous command to create a CloudWatch alarm as follows, where n is the number of active shards when the application starts:

aws cloudwatch put-metric-alarm --region <region> --alarm-name NumberOfLeasesExceededLimit --metric-name ReturnedItemCount --namespace AWS/DynamoDB  --statistic Average --period 300 --threshold <2*n> --comparison-operator GreaterThanOrEqualToThreshold --dimensions Name=TableName,Value=<Leases Table Name> Name=Operation,Value=Scan  --evaluation-periods 1 --alarm-actions <Policy ARN above>

The NumberOfLeasesExceededLimit alarm fires when the average ReturnedItemCount over a 5-minute period reaches or exceeds the specified threshold, and it adds another instance to your Auto Scaling group.

CurrentLeases

This per-worker metric reports the number of leases (shards) assigned to a worker. In a single-worker scenario, the value increases initially as the worker picks up leases and then remains constant over time, although you might see brief spikes when shards roll over. In a multi-worker scenario, depending on your worker configuration (MaxLeasesForWorker, MaxLeasesToStealAtOneTime), the workers converge toward an equal distribution of leases over time.
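To judge whether workers have converged, it helps to know roughly what an even share looks like. The following Java sketch computes a ceiling-divided share of leases per worker, capped by a MaxLeasesForWorker-style limit, and checks CurrentLeases values against it. It is an approximation, not KCL's own lease-balancing code; LeaseBalance and the slack parameter are hypothetical.

```java
/**
 * Hypothetical helper: approximates the number of leases each worker should
 * settle at when leases are balanced evenly, optionally capped by a
 * MaxLeasesForWorker-style limit. An approximation, not KCL's implementation.
 */
final class LeaseBalance {
    private LeaseBalance() {}

    static int expectedLeasesPerWorker(int totalLeases, int workers, int maxLeasesForWorker) {
        if (workers <= 0) {
            throw new IllegalArgumentException("workers must be positive");
        }
        // Ceiling division: 256 leases over 3 workers -> at most 86 per worker.
        int evenShare = (totalLeases + workers - 1) / workers;
        return Math.min(evenShare, maxLeasesForWorker);
    }

    /** True when every worker's CurrentLeases is within `slack` of the expected share. */
    static boolean hasConverged(int[] currentLeases, int expected, int slack) {
        for (int leases : currentLeases) {
            if (Math.abs(leases - expected) > slack) {
                return false;
            }
        }
        return true;
    }
}
```

With two workers and 256 shards, as on the sample dashboard, each worker would be expected to settle at around 128 leases.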

Consumed capacity and throttling metrics

The consumed write capacity on the KCL leases table remains fairly constant over time when your application is keeping up. The consumed read capacity shows periodic spikes caused by scans of the table; how often they occur depends on worker configuration such as FailoverTimeMillis. A linear increase in consumed read capacity, a sharp drop in consumed write capacity, or excessive throttling are potential indicators that your application is falling behind.
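Periodic scan spikes and a steady climb can look similar on a graph. One way to separate them, sketched here in Java, is to fit a least-squares line to recent consumed-read-capacity samples and alert only on a positive slope above some limit. The class name, the sampling interval, and the maxSlope threshold are assumptions you would tune for your own table.

```java
/**
 * Hypothetical helper: estimates the trend in consumed-read-capacity samples
 * with an ordinary least-squares slope, so a steady linear increase can be
 * told apart from periodic scan spikes.
 */
final class CapacityTrend {
    private CapacityTrend() {}

    /** Least-squares slope of samples[i] against i (units per sample interval). */
    static double slope(double[] samples) {
        int n = samples.length;
        if (n < 2) {
            return 0.0; // a single point has no trend
        }
        double meanX = (n - 1) / 2.0;
        double meanY = 0.0;
        for (double y : samples) {
            meanY += y;
        }
        meanY /= n;
        double num = 0.0;
        double den = 0.0;
        for (int i = 0; i < n; i++) {
            double dx = i - meanX;
            num += dx * (samples[i] - meanY);
            den += dx * dx;
        }
        return num / den;
    }

    /** Flags a sustained increase steeper than `maxSlope` per interval. */
    static boolean isRisingSteadily(double[] samples, double maxSlope) {
        return slope(samples) > maxSlope;
    }
}
```

Choose a window that spans several scan cycles: a window that cuts a spike pattern in half can bias the slope, while one covering whole cycles leaves the spikes roughly cancelling out.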

Resource utilization metrics

You can monitor CPU utilization on your worker instances using the standard EC2 metrics. Memory utilization is not collected by default, but you can turn it on; for more information, see Monitoring Memory and Disk Metrics for Amazon EC2 Linux Instances. You can also define Auto Scaling policies based on the CPU and memory usage metrics.

If you are running a Java application, it may be useful to monitor JMX metrics such as the number of threads and the heap memory usage. There are some open source packages that let you collect and publish JMX metrics to CloudWatch.
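For the JVM itself, the standard java.lang.management API exposes the same thread and heap figures that JMX tools read. This minimal Java sketch reads them through the platform MXBeans; publishing the values to CloudWatch (through one of those open source packages or your own code) is left out, and the JvmStats class name is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

/**
 * Reads the JVM's live thread count and heap usage through the standard
 * platform MXBeans. Publishing the values to CloudWatch is not shown.
 */
final class JvmStats {
    private JvmStats() {}

    /** Number of live threads, daemon and non-daemon. */
    static int threadCount() {
        return ManagementFactory.getThreadMXBean().getThreadCount();
    }

    /** Heap memory currently in use, in bytes. */
    static long heapUsedBytes() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
    }

    /** Heap used as a percentage of max heap, or -1 if no maximum is defined. */
    static double heapUsedPercent() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long max = heap.getMax(); // -1 when the maximum is undefined
        if (max <= 0) {
            return -1.0;
        }
        return 100.0 * heap.getUsed() / max;
    }
}
```

Sampling these values on a schedule (for example, once a minute) gives you the ThreadCount and heap graphs shown on the sample dashboard.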

Application logs

To make issues easier to track and resolve, it might help to separate KCL logs from your application logs. The following log4j configuration writes KCL logs to their own file, which rolls over when it reaches 10 MB and keeps up to 100 backup files.

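# Send KCL (com.amazonaws.services.kinesis) logs to their own appender
log4j.logger.com.amazonaws.services.kinesis=INFO, KCLLOGS
# Keep KCL messages out of the root logger's appenders (your application logs)
log4j.additivity.com.amazonaws.services.kinesis=false

# Size-based rolling file: roll over at 10 MB, keep up to 100 backups
log4j.appender.KCLLOGS=org.apache.log4j.RollingFileAppender
log4j.appender.KCLLOGS.File=/home/ec2-user/kcl.log
log4j.appender.KCLLOGS.MaxFileSize=10MB
log4j.appender.KCLLOGS.MaxBackupIndex=100
log4j.appender.KCLLOGS.ImmediateFlush=true
log4j.appender.KCLLOGS.threshold=INFO
log4j.appender.KCLLOGS.layout=org.apache.log4j.EnhancedPatternLayout
log4j.appender.KCLLOGS.layout.ConversionPattern=%d{ISO8601} %-5p %40C - %m%n%throwable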

You can push all your logs to CloudWatch Logs using the CloudWatch Logs agent. You can also create metric filters on those logs to track exceptions, errors, or other events specific to your application.

Below are some of the KCL log messages and their implications:

“Skipping over the following data records”

This error appears when your record processor throws an exception while processing a batch of records. That batch will likely get skipped, resulting in loss of data. It is essential to have the appropriate retry logic in your record processor implementation to avoid this outcome.
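A minimal form of that retry logic, sketched here in Java, wraps one processing attempt in a loop with exponential backoff so that a transient failure is handled inside the record processor rather than escaping as an exception. It assumes a batch can be retried as a unit and that processing the same records more than once is safe. BatchRetry and its parameters are hypothetical; what to do once retries are exhausted (for example, recording the failed records somewhere durable) is an application decision not shown here.

```java
import java.util.function.Supplier;

/**
 * Hypothetical helper: retries a batch-processing attempt with exponential
 * backoff instead of letting the failure escape the record processor.
 */
final class BatchRetry {
    private BatchRetry() {}

    /**
     * Runs `attempt` up to `maxAttempts` times, sleeping baseDelayMillis * 2^(n-1)
     * after failed attempt n. Returns true on success, false if every attempt
     * failed or the thread was interrupted while waiting.
     */
    static boolean runWithRetries(Supplier<Boolean> attempt, int maxAttempts, long baseDelayMillis) {
        for (int n = 1; n <= maxAttempts; n++) {
            try {
                if (Boolean.TRUE.equals(attempt.get())) {
                    return true;
                }
            } catch (RuntimeException e) {
                // Treat an exception like a failed attempt; log it in real code.
            }
            if (n < maxAttempts) {
                try {
                    // Exponential backoff: base, 2 x base, 4 x base, ...
                    Thread.sleep(baseDelayMillis * (1L << (n - 1)));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and stop
                    return false;
                }
            }
        }
        return false;
    }
}
```

Inside processRecords you would call this with a lambda that processes the batch and returns true on success, then checkpoint only after it succeeds.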

“Can’t update checkpoint – instance doesn’t hold the lease for this shard”

This exception is thrown when KCL can't checkpoint records because the worker no longer holds the lease for the shard. This typically happens when the record processor is shut down with reason ZOMBIE because the worker was unable to renew its leases.

KCL gracefully resumes processing and checkpointing with another worker, but it is essential to understand why the worker was unable to renew its leases because it may indicate other issues in the system. For example, the KCL leases table may be under-provisioned or the worker may not have enough resources to handle the number of shards assigned to it.

Log the shutdown reason in your record processor's shutdown method (KCL passes the reason to shutdown, not to processRecords), so that you can easily search for and configure metrics on ZOMBIE messages in your application logs.
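The following Java sketch shows one way to produce a fixed-format shutdown message that is easy to match with a CloudWatch Logs metric filter (for example, on reason=ZOMBIE). To stay self-contained, it uses java.util.logging and a local ShutdownReason enum assumed to mirror the values KCL 1.x defines (ZOMBIE, TERMINATE, REQUESTED); in a real record processor you would log through your log4j logger and use the reason KCL passes to shutdown. ShutdownLogging and the message format are hypothetical.

```java
import java.util.logging.Logger;

/**
 * Sketch of logging the shutdown reason in a searchable, fixed format.
 * ShutdownReason is a local stand-in assumed to mirror KCL 1.x's enum values.
 */
final class ShutdownLogging {
    enum ShutdownReason { ZOMBIE, TERMINATE, REQUESTED }

    private static final Logger LOG = Logger.getLogger(ShutdownLogging.class.getName());

    private ShutdownLogging() {}

    /** Builds a one-line message that a metric filter can match on "reason=ZOMBIE". */
    static String shutdownMessage(String shardId, ShutdownReason reason) {
        return "RecordProcessor shutdown shardId=" + shardId + " reason=" + reason.name();
    }

    /** Logs ZOMBIE shutdowns at WARNING (lost lease) and other reasons at INFO. */
    static void logShutdown(String shardId, ShutdownReason reason) {
        String message = shutdownMessage(shardId, reason);
        if (reason == ShutdownReason.ZOMBIE) {
            LOG.warning(message);
        } else {
            LOG.info(message);
        }
    }
}
```

Keeping the shard ID and reason as key=value pairs on a single line makes both the metric filter and ad hoc searches straightforward.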

“Parent shardId <shardId> is not closed. This can happen due to a race condition between describeStream and a reshard operation”

This error can appear when KCL attempts to delete the leases of completed shards from the leases table. Because DescribeStream results for DynamoDB Streams are eventually consistent, the metadata might still show a closed shard as open, or show it as closed without its child shard information yet.

It is usually safe to ignore this error because the lease should eventually be deleted. In the rare event that you keep getting this error for the same shardId, investigate why the shard consumer was shut down prematurely.
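To tell a one-off occurrence from a shard that keeps reappearing, you can count the message per shardId. This Java sketch does that over a list of log lines; it assumes the message text matches the form quoted above, and ParentShardErrorScan and minOccurrences are hypothetical names. In practice you might run the same logic over log events exported from CloudWatch Logs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * Hypothetical log scan: counts "Parent shardId ... is not closed" messages per
 * shardId so that a shard which keeps reappearing stands out from one-off noise.
 */
final class ParentShardErrorScan {
    // Captures the shard ID after "Parent shardId "; assumes IDs contain no spaces.
    private static final Pattern PARENT_NOT_CLOSED =
            Pattern.compile("Parent shardId (\\S+) is not closed");

    private ParentShardErrorScan() {}

    /** Number of matching messages for each shardId. */
    static Map<String, Integer> countByShard(List<String> logLines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : logLines) {
            Matcher m = PARENT_NOT_CLOSED.matcher(line);
            if (m.find()) {
                counts.merge(m.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    /** Shard IDs seen at least `minOccurrences` times, sorted. */
    static List<String> persistentShards(List<String> logLines, int minOccurrences) {
        return countByShard(logLines).entrySet().stream()
                .filter(e -> e.getValue() >= minOccurrences)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
}
```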

Conclusion

Using the information in this post, you can start building your own CloudWatch dashboards and alarms for your DynamoDB Streams processing application and be confident that you’ve taken concrete steps to avoid losing stream records.