AWS Database Blog

Build aggregations for Amazon DynamoDB tables using Amazon DynamoDB Streams

Amazon DynamoDB is a key-value database that delivers single-digit-millisecond performance at any scale. In transactional applications, developers who prioritize key-value operations often place significant importance on retrieving results with minimal latency. This holds true even when dealing with composite primary keys that consist of a partition key and a sort key. The additional functionality offered by relational databases, such as complex queries, ad hoc queries, joins, and aggregations, is often considered overhead in these scenarios. The white paper Amazon DynamoDB – a Fast and Scalable NoSQL Database Service Designed for Internet Scale Applications dives deep into the architecture decisions made while designing Amazon DynamoDB.

Regardless of the access patterns that your application might have, you can still perform aggregations while working with Amazon DynamoDB. For instance, you can export the data to Amazon Simple Storage Service (Amazon S3) and use tools such as Amazon Athena to perform aggregations. Alternatively, you can load the data into a data warehousing solution such as Amazon Redshift and run the aggregations there. Another option is to scan the table to calculate counts, maximums, minimums, and sums. However, depending on the frequency of your queries, this approach can be costly, because each scan reads every item in the table and the cost grows with the number and size of the items. As a result, developers often employ different techniques to achieve aggregations based on specific use cases and requirements.
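To make the cost of repeated scans concrete, here is a rough back-of-the-envelope sketch in Python. It assumes DynamoDB's read sizing of one read unit per 4 KB for a strongly consistent read and half that for an eventually consistent read, and it ignores per-request rounding. The function name and the sample table size are illustrative and not part of the solution.

```python
import math

READ_UNIT_BYTES = 4 * 1024  # one read unit covers up to 4 KB of data read


def estimate_scan_read_units(item_count, avg_item_bytes, strongly_consistent=False):
    """Roughly estimate the read units consumed by one full-table scan."""
    total_bytes = item_count * avg_item_bytes
    units = math.ceil(total_bytes / READ_UNIT_BYTES)
    # Eventually consistent reads (the default for Scan) cost half as much.
    return units if strongly_consistent else units / 2


# Hypothetical table: 1,000,000 items of about 1 KB each.
print(estimate_scan_read_units(1_000_000, 1024))
```

Every scan pays this amount again, so an aggregation that is refreshed often can cost far more than one that is maintained incrementally as items change.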

In this post, we discuss how to perform aggregations on a DynamoDB table using Amazon DynamoDB Streams and AWS Lambda. The content includes a reference architecture, a step-by-step guide on enabling DynamoDB Streams for a table, sample code for implementing the solution within a scenario, and an accompanying AWS CloudFormation template for easy deployment and testing.

Solution overview

Imagine a retail company called AnyCompany that specializes in the sale of books and stationery items. The business stakeholders at AnyCompany need to track the real-time sales of specific items. Take the example of a newly released book. With immediate access to the number of units sold, they can accurately monitor inventory levels and promptly respond to the demand for the book. This insight lets them act early to keep sufficient stock on hand to meet customer demand.

The following figure shows the overview architecture of the solution.

The workflow includes the following steps:

  1. A new item is created or inserted via Amazon API Gateway into the DynamoDB table.
  2. A new record representing the mutation of the item is written to the DynamoDB stream.
  3. Lambda polls the configured DynamoDB stream four times per second; if the filter criteria are met, the function runs.
  4. The Lambda function performs the required aggregation logic and inserts the data into the target DynamoDB table (which could also be the same table).

Based on the scenario that we have discussed earlier and for simplicity, we use the following two tables:

  • Order_by_item – This table captures the orders and order details. In the diagram, it is the first table on the left. The following image shows a snippet of the table attributes from NoSQL Workbench.
  • item_count_by_date – This table stores the aggregated data: the number of items ordered on a given day. In the diagram, it is the table on the right.

We implement the solution with the following steps:

  1. Create the source DynamoDB table and enable DynamoDB Streams.
  2. Create the target DynamoDB table.
  3. Set up AWS Identity and Access Management (IAM) permissions for the Lambda function.
  4. Deploy the Lambda function.
  5. Create a trigger for the source table.
  6. Configure the Lambda function to add environment variables and filter criteria.
  7. Insert a new item into the source table to trigger the Lambda function and verify the aggregated data on the target table.

If you are using the CloudFormation template, you can go directly to Step 7 (Insert a new record into the source table to trigger the Lambda function and verify the aggregated data on the target table) and test the solution.

Prerequisites

To complete this walkthrough, you must have the following prerequisites:

Create the source DynamoDB table and enable streams

In this step, we create a DynamoDB table to store the order ID, order date, item number, quantity, and price per item. The partition key for this table is orderid, and the sort key is order_date. You can create the table using the DynamoDB console, AWS Command Line Interface (AWS CLI), or any AWS SDK. Capture the TableArn from the output; we use it later to create the IAM policy.

For Linux or Mac:

aws dynamodb create-table \
--table-name Order_by_item \
--attribute-definitions \
AttributeName=orderid,AttributeType=S \
AttributeName=order_date,AttributeType=S \
--key-schema \
AttributeName=orderid,KeyType=HASH \
AttributeName=order_date,KeyType=RANGE \
--billing-mode PAY_PER_REQUEST \
--stream-specification \
StreamEnabled=true,StreamViewType=NEW_IMAGE

For Windows:

aws dynamodb create-table ^
--table-name Order_by_item ^
--attribute-definitions ^
AttributeName=orderid,AttributeType=S ^
AttributeName=order_date,AttributeType=S ^
--key-schema ^
AttributeName=orderid,KeyType=HASH ^
AttributeName=order_date,KeyType=RANGE ^
--billing-mode PAY_PER_REQUEST ^
--stream-specification ^
StreamEnabled=true,StreamViewType=NEW_IMAGE
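
Because the table can also be created with an AWS SDK, the following is a minimal sketch of the same request in Python with boto3. The helper names build_source_table_params and create_source_table are illustrative. The second function needs boto3 and valid AWS credentials, so it is defined here but not called.

```python
def build_source_table_params(table_name="Order_by_item"):
    """Return keyword arguments mirroring the create-table CLI command."""
    return {
        "TableName": table_name,
        "AttributeDefinitions": [
            {"AttributeName": "orderid", "AttributeType": "S"},
            {"AttributeName": "order_date", "AttributeType": "S"},
        ],
        "KeySchema": [
            {"AttributeName": "orderid", "KeyType": "HASH"},      # partition key
            {"AttributeName": "order_date", "KeyType": "RANGE"},  # sort key
        ],
        "BillingMode": "PAY_PER_REQUEST",
        "StreamSpecification": {
            "StreamEnabled": True,
            "StreamViewType": "NEW_IMAGE",
        },
    }


def create_source_table():
    """Create the table and return its ARN (requires boto3 and credentials)."""
    import boto3  # imported lazily so the builder above works without the SDK

    client = boto3.client("dynamodb")
    response = client.create_table(**build_source_table_params())
    return response["TableDescription"]["TableArn"]


print(build_source_table_params()["StreamSpecification"])
```

Keeping the parameters in a plain dictionary makes it easy to review the key schema and stream settings before any call is made to AWS.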

Create the target DynamoDB table

In this step, we create the target DynamoDB table, which stores the count of each item sold on the current day and on each previous day since the solution was deployed. Capture the TableArn from the output; we use it later to create the IAM policy.

For Linux or Mac:

aws dynamodb create-table \
--table-name item_count_by_date \
--attribute-definitions \
AttributeName=item_number,AttributeType=S \
AttributeName=order_date,AttributeType=S \
--billing-mode PAY_PER_REQUEST \
--key-schema \
AttributeName=item_number,KeyType=HASH \
AttributeName=order_date,KeyType=RANGE

For Windows:

aws dynamodb create-table ^
--table-name item_count_by_date ^
--attribute-definitions ^
AttributeName=item_number,AttributeType=S ^
AttributeName=order_date,AttributeType=S ^
--billing-mode PAY_PER_REQUEST ^
--key-schema ^
AttributeName=item_number,KeyType=HASH ^
AttributeName=order_date,KeyType=RANGE

Set up IAM permission for the Lambda function

In this step, we set up the permissions that the Lambda function requires to process the records in the stream and load them into the target table.

First, create an IAM role and attach the policies AWSLambdaDynamoDBExecutionRole and AWSLambdaBasicExecutionRole to the new role:

aws iam create-role --role-name ddb_lambda_role --assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"lambda.amazonaws.com"},"Action":"sts:AssumeRole"}]}'

aws iam attach-role-policy --role-name ddb_lambda_role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

aws iam attach-role-policy --role-name ddb_lambda_role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole

To limit access to only read and write actions, you can create a custom policy that only grants permissions to the dynamodb:PutItem, dynamodb:GetItem, dynamodb:UpdateItem, dynamodb:DeleteItem, and dynamodb:Query actions. This will provide more fine-grained control over access to the table.

  1. Create a file named policy.json and copy the following JSON into it. Replace <Source_table_arn> and <Target_table_arn> with the TableArn values that you saved when creating the tables.
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "DynamoDBReadWriteAccess",
          "Effect": "Allow",
          "Action": [ 
                "dynamodb:PutItem",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:DeleteItem",
                "dynamodb:Query",
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:ListStreams"
          ],
          "Resource": [
            "<Source_table_arn>",
            "<Target_table_arn>"
          ]
        }
      ]
    }
  2. Create a policy named MyDynamoDBPolicy using the following command. Save the policy ARN from the output, because it is required to attach the policy to the ddb_lambda_role role:
    aws iam create-policy --policy-name MyDynamoDBPolicy --policy-document file://policy.json
  3. Attach the policy MyDynamoDBPolicy to the role ddb_lambda_role using the following command:
    aws iam attach-role-policy --role-name ddb_lambda_role --policy-arn arn:aws:iam::<account_id>:policy/MyDynamoDBPolicy
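
If you prefer to generate policy.json rather than edit it by hand, the steps above can be sketched in Python as follows. The function names are illustrative, and the ARNs in the usage line are placeholders with a made-up account ID, not values from this walkthrough.

```python
import json

POLICY_ACTIONS = [
    "dynamodb:PutItem",
    "dynamodb:GetItem",
    "dynamodb:UpdateItem",
    "dynamodb:DeleteItem",
    "dynamodb:Query",
    "dynamodb:DescribeStream",
    "dynamodb:GetRecords",
    "dynamodb:GetShardIterator",
    "dynamodb:ListStreams",
]


def build_policy(source_table_arn, target_table_arn):
    """Return the policy document from step 1 as a Python dict."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "DynamoDBReadWriteAccess",
                "Effect": "Allow",
                "Action": POLICY_ACTIONS,
                "Resource": [source_table_arn, target_table_arn],
            }
        ],
    }


def write_policy_file(path, source_table_arn, target_table_arn):
    """Write the policy document to a file for use with file://policy.json."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(build_policy(source_table_arn, target_table_arn), handle, indent=2)


# Placeholder ARNs for illustration only.
policy = build_policy(
    "arn:aws:dynamodb:us-east-1:111122223333:table/Order_by_item",
    "arn:aws:dynamodb:us-east-1:111122223333:table/item_count_by_date",
)
print(json.dumps(policy, indent=2))
```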

Deploy the Lambda function to perform the aggregation and load it to a target table

The Lambda function AggregateFunctions adds up the quantity of each item across the orders that are received. The target table has item_number as the partition key and order_date as the sort key. To deploy the Lambda function, complete the following steps:

  1. Open the Functions page of AWS Lambda console.
  2. Choose Create function. Enter the function name AggregateFunctions, choose the Python 3.10 runtime, and choose the ddb_lambda_role execution role.

  3. Copy and paste the following code into lambda_function.py, save it, and deploy the function.
    import json
    import os
    
    import boto3
    
    dynamodb = boto3.resource("dynamodb")
    orders_table = dynamodb.Table(os.environ["source_table_name"])
    orders_by_date_table = dynamodb.Table(os.environ["target_table_name"])
    
    
    def lambda_handler(event, context):
        for record in event["Records"]:
            new_item = record["dynamodb"]["NewImage"]
    
            # Extract the order date and item number from the new item
            order_date = new_item.get("order_date", {}).get("S")
            item_number = new_item.get("item_number", {}).get("S")
            quantity = int(new_item.get("quantity", {}).get("N", 0))
    
            # Ensure that the required attributes exist
            if order_date and item_number and quantity:
                # Keep only the date part of the timestamp; it is the sort key
                # of the target table (item_number is the partition key)
                date_partition_key = order_date.split("T")[0]
                # update orders_by_date table
                orders_by_date_table.update_item(
                    Key={"order_date": date_partition_key, "item_number": item_number},
                    UpdateExpression="ADD #q :val",
                    ExpressionAttributeNames={"#q": "quantity"},
                    ExpressionAttributeValues={":val": quantity},
                )
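
To check the aggregation logic without deploying anything, the following sketch mimics the handler with an in-memory dictionary in place of the target table. It is a local approximation only: the helper names and the sample timestamps are made up for illustration, and the dictionary addition stands in for the ADD update expression.

```python
from collections import defaultdict


def aggregate_records(records):
    """Sum quantities per (item_number, date), as the ADD update would."""
    totals = defaultdict(int)
    for record in records:
        new_item = record.get("dynamodb", {}).get("NewImage", {})
        order_date = new_item.get("order_date", {}).get("S")
        item_number = new_item.get("item_number", {}).get("S")
        quantity = int(new_item.get("quantity", {}).get("N", 0))
        if order_date and item_number and quantity:
            # Keep only the date part of the ISO 8601 timestamp.
            totals[(item_number, order_date.split("T")[0])] += quantity
    return dict(totals)


def make_record(orderid, order_date, item_number, quantity):
    """Build a stream record in DynamoDB JSON, shaped like an INSERT event."""
    return {
        "eventName": "INSERT",
        "dynamodb": {
            "NewImage": {
                "orderid": {"S": orderid},
                "order_date": {"S": order_date},
                "item_number": {"S": item_number},
                "quantity": {"N": str(quantity)},
            }
        },
    }


sample_event = {
    "Records": [
        make_record("178526", "2023-06-15T10:00:00Z", "item123", 10),
        make_record("178527", "2023-06-15T11:30:00Z", "item123", 30),
        make_record("178528", "2023-06-15T12:45:00Z", "item312", 10),
    ]
}
print(aggregate_records(sample_event["Records"]))
```

Both orders for item123 fall on the same date, so they collapse into a single total of 40, which is what the target table holds after the real function runs.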

Create a trigger for the source table

To create a trigger for the source table, complete the following steps:

  1. On the DynamoDB console, navigate to the source table Order_by_item.
  2. On the Exports and streams tab, in the Trigger section, choose Create trigger.
  3. For Lambda function, enter the function that you deployed earlier.
  4. For Batch size, enter 10, because we test the solution with fewer than 10 records.
  5. Select Turn on trigger.
  6. Choose Create trigger.
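
The console steps above correspond to creating a Lambda event source mapping. As a hedged sketch, the same trigger could be created with boto3 as shown below. The starting position LATEST and the stream ARN in the usage line are assumptions made for illustration. The create_trigger function needs boto3 and AWS credentials, so it is defined but not called.

```python
def build_trigger_params(stream_arn, function_name="AggregateFunctions", batch_size=10):
    """Return keyword arguments for the CreateEventSourceMapping call."""
    return {
        "EventSourceArn": stream_arn,
        "FunctionName": function_name,
        "BatchSize": batch_size,
        "StartingPosition": "LATEST",  # assumption: read only new stream records
        "Enabled": True,
    }


def create_trigger(stream_arn):
    """Create the mapping and return its UUID (requires boto3 and credentials)."""
    import boto3  # imported lazily so the builder above works without the SDK

    client = boto3.client("lambda")
    response = client.create_event_source_mapping(**build_trigger_params(stream_arn))
    return response["UUID"]


# Placeholder stream ARN for illustration only.
example_arn = (
    "arn:aws:dynamodb:us-east-1:111122223333:table/Order_by_item"
    "/stream/2023-06-15T00:00:00.000"
)
print(build_trigger_params(example_arn))
```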

Configure Lambda function to add environment variables and filter criteria

Now we create two environment variables, source_table_name and target_table_name, using the following steps:

  1. Open the Functions page of AWS Lambda console.
  2. Select the AggregateFunctions function.
  3. Choose the Configuration tab, select Environment variables, and choose Edit.
    1. Choose Add environment variable, and enter source_table_name for Key and Order_by_item for Value.
    2. Choose Add environment variable again, enter target_table_name for Key and item_count_by_date for Value, and choose Save.

Add filter criteria so that the Lambda function is invoked only for INSERT and MODIFY event types and ignores the other event types in the DynamoDB stream.

  1. Locate the Triggers tab under Configuration.
  2. Select the existing trigger for Order_by_item and choose Edit.

  3. Expand Additional settings, and in the Filter criteria section add {"eventName": ["INSERT", "MODIFY"]} and choose Save.
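
To illustrate what this filter does, the following sketch shows the FilterCriteria structure that the Lambda API accepts for an event source mapping, together with a deliberately simplified matcher. The matcher is an assumption made for illustration: it compares only top-level string fields and is not Lambda's actual filtering engine.

```python
import json

FILTER_PATTERN = {"eventName": ["INSERT", "MODIFY"]}


def build_filter_criteria(pattern=FILTER_PATTERN):
    """Wrap the pattern in the FilterCriteria shape used by the Lambda API."""
    return {"Filters": [{"Pattern": json.dumps(pattern)}]}


def matches(record, pattern=FILTER_PATTERN):
    """Simplified check: every pattern key must equal one of its allowed values."""
    return all(record.get(key) in allowed for key, allowed in pattern.items())


for event_name in ("INSERT", "MODIFY", "REMOVE"):
    print(event_name, matches({"eventName": event_name}))
```

With this pattern, REMOVE records never reach the function, so deletions in the source table do not trigger an invocation.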

Insert a new record into the source table to trigger the Lambda function

  1. Insert test items into the source DynamoDB table so that you can verify the aggregated data for each item in the target table.
    For Linux or Mac:

    aws dynamodb put-item --table-name Order_by_item \
     --item '{"orderid": {"S": "178526"}, "order_date": {"S": "'"$(date -u +"%Y-%m-%dT%H:%M:%SZ")"'"}, "item_number": {"S": "item123"}, "quantity": {"N": "10"}}'
     
     aws dynamodb put-item --table-name Order_by_item \
     --item '{"orderid": {"S": "178527"}, "order_date": {"S": "'"$(date -u +"%Y-%m-%dT%H:%M:%SZ")"'"}, "item_number": {"S": "item123"}, "quantity": {"N": "30"}}'
    
     aws dynamodb put-item --table-name Order_by_item \
     --item '{"orderid": {"S": "178528"}, "order_date": {"S": "'"$(date -u +"%Y-%m-%dT%H:%M:%SZ")"'"}, "item_number": {"S": "item312"}, "quantity": {"N": "10"}}'
     
     aws dynamodb put-item --table-name Order_by_item \
     --item '{"orderid": {"S": "178529"}, "order_date": {"S": "'"$(date -u +"%Y-%m-%dT%H:%M:%SZ")"'"}, "item_number": {"S": "item312"}, "quantity": {"N": "30"}}'

    For Windows:

    aws dynamodb put-item --table-name Order_by_item ^
     --item '{"orderid": {"S": "178526"}, "order_date": {"S": "'"$(date -u +"%Y-%m-%dT%H:%M:%SZ")"'"}, "item_number": {"S": "item123"}, "quantity": {"N": "10"}}'
     
     aws dynamodb put-item --table-name Order_by_item ^
     --item '{"orderid": {"S": "178527"}, "order_date": {"S": "'"$(date -u +"%Y-%m-%dT%H:%M:%SZ")"'"}, "item_number": {"S": "item123"}, "quantity": {"N": "30"}}'
    
     aws dynamodb put-item --table-name Order_by_item ^
     --item '{"orderid": {"S": "178528"}, "order_date": {"S": "'"$(date -u +"%Y-%m-%dT%H:%M:%SZ")"'"}, "item_number": {"S": "item312"}, "quantity": {"N": "10"}}'
     
     aws dynamodb put-item --table-name Order_by_item ^
     --item '{"orderid": {"S": "178529"}, "order_date": {"S": "'"$(date -u +"%Y-%m-%dT%H:%M:%SZ")"'"}, "item_number": {"S": "item312"}, "quantity": {"N": "30"}}'

    The following image shows what the table data looks like in NoSQL Workbench after the insert statements.

  2. Verify that the target table has the aggregated data by using the AWS CLI:
    For Linux or Mac:

    aws dynamodb query --table-name item_count_by_date \
        --key-condition-expression "item_number = :pkValue" \
        --expression-attribute-values '{":pkValue": {"S": "item123"}}' \
        --projection-expression "item_number, quantity" 
        
    For Windows:

     aws dynamodb query --table-name item_count_by_date ^
        --key-condition-expression "item_number = :pkValue" ^
        --expression-attribute-values '{":pkValue": {"S": "item123"}}' ^
        --projection-expression "item_number, quantity" 

    Response:

    {
        "Items": [
            {
                "quantity": {
                    "N": "40"
                },
                "item_number": {
                    "S": "item123"
                }
            }
        ],
        "Count": 1,
        "ScannedCount": 1,
        "ConsumedCapacity": null
    }
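
The response uses DynamoDB JSON, where each value is wrapped in a type descriptor such as "S" or "N". As a small sketch of how an application might read it, the following Python snippet sums the quantity per item across all returned items. The function name is illustrative, and the sample dictionary copies the response shown above.

```python
from collections import defaultdict


def quantity_by_item(response):
    """Sum the quantity attribute per item_number across all returned items."""
    totals = defaultdict(int)
    for item in response.get("Items", []):
        totals[item["item_number"]["S"]] += int(item["quantity"]["N"])
    return dict(totals)


sample_response = {
    "Items": [
        {"quantity": {"N": "40"}, "item_number": {"S": "item123"}},
    ],
    "Count": 1,
    "ScannedCount": 1,
    "ConsumedCapacity": None,
}
print(quantity_by_item(sample_response))
```

Because order_date is the sort key, a query on item_number alone returns one item per day. Summing across the items gives the total for the item over all days.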

The following image shows the item_count_by_date table, which contains the aggregated data, in NoSQL Workbench.

Clean Up

To avoid incurring future charges, delete the resources after you finish testing the solution:

  1. If you used the CloudFormation template, follow the steps mentioned in this link to delete the resources.
  2. Delete the Lambda function AggregateFunctions by running the following command:
    aws lambda delete-function --function-name AggregateFunctions
  3. Delete the DynamoDB tables Order_by_item and item_count_by_date by running the following commands:
    aws dynamodb delete-table --table-name Order_by_item

    aws dynamodb delete-table --table-name item_count_by_date

Conclusion

In this post, we showed how to build aggregations on data stored in DynamoDB tables using DynamoDB Streams and Lambda. DynamoDB Streams provides an easy integration point to retrieve changed data without needing to query your table. Lambda provides a serverless, event-driven mechanism to update counts and aggregates for more efficient retrieval later. Together, these tools offer a simple and effective way to implement real-time aggregations with DynamoDB.

Test the procedure outlined in this post by deploying the sample code, and share your feedback in the comments section. Use this CloudFormation template to deploy the solution for testing.

We have adapted the concepts from this post into a deployable solution, now available as Guidance for Processing Real-Time Data Using Amazon DynamoDB in the AWS Solutions Library. To get started, review the architecture diagrams and the corresponding AWS Well-Architected framework, then deploy the sample code to implement the Guidance into your workloads.


About the author

Rajesh Kantamani is a Senior Database Specialist SA. He specializes in assisting customers with designing, migrating, and optimizing database solutions on Amazon Web Services, ensuring scalability, security, and performance. In his spare time, he loves spending time outdoors with family and friends.