AWS Database Blog

Perform near real time analytics using Amazon Redshift on data stored in Amazon DocumentDB

In this post, we learn how to stream data from Amazon DocumentDB (with MongoDB compatibility) to Amazon Redshift, unlocking near-real-time analytics and insights. We cover using Amazon DocumentDB change streams and Amazon Redshift streaming ingestion, along with AWS Lambda and Amazon Kinesis Data Streams. We also provide an AWS CloudFormation template for easy deployment of these services.

Amazon DocumentDB (with MongoDB compatibility), is a fully managed native JSON document database that makes it straightforward and cost-effective to operate critical document workloads at virtually any scale without managing infrastructure. Amazon DocumentDB also has a change streams feature that provides a time-ordered sequence of change events that occur within your cluster’s collections. You can read events from a change stream to implement many different use cases, including analytics with Amazon Redshift.

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. It also offers a streaming ingestion feature that natively integrates with Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK) to pull data directly to Amazon Redshift without staging data in Amazon Simple Storage Service (Amazon S3).

Solution overview:

Let’s consider a finance company that uses Amazon DocumentDB as an operational store to maintain customer profiles. Recognizing the importance of data analytics, they now aim to gain valuable insights on customer behavior, product performance, and revenue analysis. To achieve this, they plan to implement near-real-time replication from Amazon DocumentDB to Amazon Redshift, which offers advanced querying capabilities for intricate data transformations. This near-real-time data streaming from Amazon DocumentDB to Amazon Redshift will enable them to perform data analysis on the latest information, empowering them to make data-driven decisions in a timely manner.

The following diagram shows the overall architecture of the solution.

The architecture consists of the following stages:

  1. An Amazon DocumentDB cluster has change streams enabled for the specific collection that will be used for streaming data to Amazon Redshift.
  2. A trigger allows the Lambda function to be invoked whenever a new document is added to a collection with change streams enabled.
  3. The Lambda function writes the incoming document to a Kinesis data stream.
  4. With streaming ingestion, data is streamed from the data stream to Amazon Redshift, enabling near-real-time analysis using views.

This architecture enables the continuous streaming of data from Amazon DocumentDB to Amazon Redshift, providing a seamless flow of data for near-real-time analysis and insights.

Deploy the CloudFormation stack

Use the following CloudFormation template to deploy the stack that creates most of the resources that are required to implement and test the solution in your account.

The CloudFormation deploys the following resources:

  • An AWS Cloud9 environment named DocDBStreaming, using an Amazon Elastic Compute Cloud (Amazon EC2) instance of type t2.micro and Amazon Linux 2 as the default OS.
  • An Amazon DocumentDB instance-based cluster named transactcluster of version 5.0
  • An Amazon Redshift Serverless namespace
  • Secrets in AWS Secrets Manager with the user name and password of the Amazon DocumentDB and Amazon Redshift clusters
  • A Kinesis data stream named docdbstream with capacity mode set as On-Demand
  • A Lambda function called ${AWS::StackName}-DocumentDBLambdaESM-${AWS::Region}, which is used to stream the data to the Kinesis data stream
  • A AWS Identity and Access Management (IAM) role ${AWS::StackName}-DocumentDBLambdaESMRole-${AWS::Region}, That gives Lambda function permissions to access the DocumentDB cluster.

When the deployment of the Stack is complete, you can proceed to configuring and testing the solution.

Connect to the Amazon DocumentDB cluster using AWS Cloud9

Prior to connecting to the Amazon DocumentDB cluster from AWS Cloud9, we install all the required libraries. Complete the following steps:

  1. Launch the AWS Cloud9 environment DocDBStreaming from the AWS Cloud9 console.
  2. Run the following commands to install the required packages and the Amazon DocumentDB Certificate Authority (CA) certificate key:
# Setting up mongo 4.0 repo
echo -e "[mongodb-org-4.0] \nname=MongoDB Repository\nbaseurl=https://repo.mongodb.org/yum/amazon/2013.03/mongodb-org/4.0/x86_64/\ngpgcheck=1 \nenabled=1 \ngpgkey=https://www.mongodb.org/static/pgp/server-4.0.asc" | sudo tee /etc/yum.repos.d/mongodb-org-4.0.repo

# Installing packages
sudo yum -y update
sudo yum -y install mongodb-org-shell

# Downloading the SSL file and the loader
wget https://truststore.pki.rds.amazonaws.com/global/global-bundle.pem

Now you can connect to the Amazon DocumentDB cluster.

  1. Navigate to Secrets Manager Console, to access the credentials for your DocumentDB cluster. Look for the secret named secretDocDBAdminUser-<randomly generated string>.
  2. Navigate to Amazon DocumentDB console, choose Clusters in the navigation pane.
  3. Choose the cluster transactcluster.
  4. In the Connectivity & Security section, Copy the Mongo shell command, add the username and password from the Secrets Manager as mentioned in Step 1.
# Connect to the cluster
mongo --ssl --host <cluster endpoint>:27017 --sslCAFile global-bundle.pem  --username <user_name> --password <insertYourPassword> 

Create the database and collection, and enable change streams

Create the database findata and the collection financialData to capture the transactions and customer profiles, then enable the streams for that collection. Use the following commands:

# Create database findata
use findata

# Create Collection financialData
db.createCollection("financialData")

# Enable change streams for collection
db.adminCommand({modifyChangeStreams: 1,
database: "findata",
collection: "financialData",
enable: true});

# Insert a record into the new Collection
db.financialData.insertOne({"customer_id": "116", "customer_name": "Nina Quinn", "customer_email": "ninaq@example.com", "customer_address": "123 Aspen St", "transaction_amount": 850.00, "transaction_timestamp": "2023-01-16 13:00:00", "transaction_type": "Purchase", "payment_method": "Credit Card", "transaction_status": "Completed"});

Note that change streams have the capability to be enabled at different scopes, including the cluster, database, and collection levels. However, for our purposes, we are initiating them solely at the collection level.

Configure a trigger for the Lambda function

Create a trigger for the Lambda function to be invoked whenever there is a change in the financialData collection:

  1. On the Lambda console, navigate to the function ${AWS::StackName}-DocumentDBLambdaESM-${AWS::Region}.
  2. Choose the Configuration tab, then choose Triggers in the left pane.
  3. Choose Add trigger.
  4. Under Trigger configuration, for the source, select DocumentDB.
  5. Create the event source mapping with the following configuration:
    • For DocumentDB cluster, choose the cluster transactcluster.
    • For Database name, enter findata.
    • For Collection name, you can leave this blank or enter financialData as the collection name.
    • For Batch size, enter 100.
    • For Starting position, choose Latest.
    • For Authentication, choose BASIC_AUTH.
    • For Secrets Manager key, enter the DocDBSecret that you created using the CloudFormation template.
    • For Full document configuration, choose UpdateLookup.

      It’s important to recognize that stream polling during the creation and updates of event source mappings exhibits eventual consistency. This means there can be a delay in starting or restarting the polling of events, which could take several minutes. Consequently, setting the stream’s starting position to LATEST might lead to missed events during these periods. To prevent this, it’s recommended to use TRIM_HORIZON or AT_TIMESTAMP as the starting position to ensure all events are captured. For more information on how to configure change streams, see Using change streams with Amazon DocumentDB.
  6. Choose Add.Wait for the event source mapping to be created (which can take a few minutes) and the state to change to Enabled.
    Let’s insert a few records into Amazon DocumentDB through AWS Cloud9 to test the solution and confirm that we see the data on the Amazon Redshift side.
  7. Insert the data into Amazon DocumentDB:
db.financialData.insertOne({"customer_id": "117", "customer_name": "Oscar Reed", "customer_email": "oscarr@example.com", "customer_address": "456 Birch St", "transaction_amount": 900.00, "transaction_timestamp": "2023-01-17 14:00:00", "transaction_type": "Refund", "payment_method": "Debit Card", "transaction_status": "Failed"});
db.financialData.insertOne({"customer_id": "118", "customer_name": "Paula Stone", "customer_email": "paulas@example.com", "customer_address": "789 Cedar St", "transaction_amount": 950.00, "transaction_timestamp": "2023-01-18 15:00:00", "transaction_type": "Purchase", "payment_method": "PayPal", "transaction_status": "Pending"});
db.financialData.insertOne({"customer_id": "119", "customer_name": "Quincy Tate", "customer_email": "quincyt@example.com", "customer_address": "321 Douglas St", "transaction_amount": 1000.00, "transaction_timestamp": "2023-01-19 16:00:00", "transaction_type": "Purchase", "payment_method": "Credit Card", "transaction_status": "Completed"});
db.financialData.insertOne({"customer_id": "120", "customer_name": "Rachel Underwood", "customer_email": "rachelu@example.com", "customer_address": "654 Elm St", "transaction_amount": 1050.00, "transaction_timestamp": "2023-01-20 17:00:00", "transaction_type": "Refund", "payment_method": "Debit Card", "transaction_status": "Completed"});
db.financialData.insertOne({"customer_id": "121", "customer_name": "Steve Victor", "customer_email": "stevev@example.com", "customer_address": "987 Fir St", "transaction_amount": 1100.00, "transaction_timestamp": "2023-01-21 18:00:00", "transaction_type": "Purchase", "payment_method": "Credit Card", "transaction_status": "Pending"});
db.financialData.insertOne({"customer_id": "122", "customer_name": "Tina Walker", "customer_email": "tinaw@example.com", "customer_address": "123 Grove St", "transaction_amount": 1150.00, "transaction_timestamp": "2023-01-22 19:00:00", "transaction_type": "Purchase", "payment_method": "PayPal", "transaction_status": "Failed"});
db.financialData.insertOne({"customer_id": "123", "customer_name": "Uma Xander", "customer_email": "umax@example.com", "customer_address": "456 Holly St", "transaction_amount": 1200.00, "transaction_timestamp": "2023-01-23 20:00:00", "transaction_type": "Refund", "payment_method": "Credit Card", "transaction_status": "Completed"});
db.financialData.insertOne({"customer_id": "124", "customer_name": "Victor Young", "customer_email": "victory@example.com", "customer_address": "789 Ivy St", "transaction_amount": 1250.00, "transaction_timestamp": "2023-01-24 21:00:00", "transaction_type": "Purchase", "payment_method": "Debit Card", "transaction_status": "Pending"});
db.financialData.insertOne({"customer_id": "125", "customer_name": "Wendy Zane", "customer_email": "wendyz@example.com", "customer_address": "321 Juniper St", "transaction_amount": 1300.00, "transaction_timestamp": "2023-01-25 22:00:00", "transaction_type": "Purchase", "payment_method": "Credit Card", "transaction_status": "Completed"});

Note that when change streams are enabled at the database or cluster level, appropriate scaling of Lambda functions in response to write throughput is necessary. This scaling can result in increased latency in the availability of data on Amazon Redshift after it has been persisted in Amazon DocumentDB. Choosing the right batch size is crucial for the efficiency and performance of the Lambda function. Moreover, understanding Lambda concurrency, which refers to the number of instances of your Lambda function that can run simultaneously, is vital. For an in-depth exploration of Lambda scaling and throughput, refer to Understanding AWS Lambda scaling and throughput.

In some cases, interruptions in Lambda can lead to the occurrence of duplicate events. To address these rare but important scenarios, it’s crucial to properly configure the Lambda event source mapping and develop robust error handling mechanisms. For comprehensive guidance on making your Lambda function idempotent, which is a key strategy in managing such cases, refer to How do I make my Lambda function idempotent. Additionally, for guidance on configuring Lambda functions with Amazon DocumentDB, see Using Lambda with Amazon DocumentDB.

Having accomplished the setup to capture changes in Amazon DocumentDB and stream them to Kinesis Data Streams, we have successfully configured the initial phase. Now we can set up Amazon Redshift streaming ingestion. This step involves reading the data from Kinesis Data Streams and loading it into Redshift tables.

Set up Amazon Redshift streaming ingestion

Setting up streaming ingestion in Amazon Redshift is a two-step process. You first need to create an external schema to map to Kinesis Data Streams, then you create a materialized view to pull data from the stream. The materialized view must be incrementally maintainable. Complete the following steps to set up streaming ingestion:

  1. On the Amazon Redshift console, choose Query editor v2 in the navigation pane.
  2. In the query editor, choose the workgroup Serverless:work-group-streamingblog in the left pane and toggle to use database user name and password.
  3. Navigate to secrets manager and use the username and password for secret named secretRedshiftAdminUser-<randomly generated string>.
  4. Run the following command to create an external schema. This will map the data from Kinesis Data Streams to a Redshift object:
CREATE EXTERNAL SCHEMA findataanalysis FROM KINESIS
IAM_ROLE default;

5. Run the following command to create the materialized view. The materialized view is set to auto refresh and will be refreshed as data keeps arriving in the stream.

CREATE MATERIALIZED VIEW findata_extract sortkey(1) AUTO REFRESH YES AS
SELECT
refresh_time,
approximate_arrival_timestamp,
partition_key,
shard_id,
sequence_number,
json_parse(kinesis_data) as payload
FROM findataanalysis.docdbstream
WHERE CAN_JSON_PARSE(kinesis_data);

6. Now that we have created the materialized view, let’s examine the data that’s been loaded. Note that the first refresh of the materialized view can take a while, so you will need to wait about a minute before checking the materialized view.

SELECT payload from findata_extract;

We can confirm in the output that we are streaming the data from the Amazon DocumentDB collection financialdata to the Redshift materialized view findata_extract.

Create a view on the materialized view to perform analysis

To enable near-real-time analysis on the financialData collection, let’s create a view on top of the materialized view findata_extract. This approach allows us to seamlessly integrate data from multiple sources without the need to wait for extract, load, and transform (ELT) operations. See the following code:

ALTER USER awsuser set enable_case_sensitive_identifier =true;
set enable_case_sensitive_identifier =true;
CREATE
OR REPLACE VIEW "public"."findata_extract_view" AS
SELECT
    CAST(
        "findata_extract"."payload"."customer_id" AS VARCHAR
    ) AS "customer_id",
    CAST(
        "findata_extract"."payload"."customer_email" AS VARCHAR
    ) AS "customer_email",
    CAST(
        "findata_extract"."payload"."customer_address" AS VARCHAR
    ) AS "customer_address",
    CAST(
        "findata_extract"."payload"."transaction_amount" AS VARCHAR
    ) AS "transaction_amount",
    CAST(
        "findata_extract"."payload"."transaction_timestamp" AS VARCHAR
    ) AS "transaction_timestamp",
    CAST(
        "findata_extract"."payload"."transaction_type" AS VARCHAR
    ) AS "transaction_type",
    CAST(
        "findata_extract"."payload"."payment_method" AS VARCHAR
    ) AS "payment_method",
    CAST(
        "findata_extract"."payload"."transaction_status" AS VARCHAR
    ) AS "transaction_status"
FROM
    "public"."findata_extract" AS "findata_extract";

To validate the transformation of raw data into columnar data, we can query the near-real-time view. The following query enables us to examine the structured data resulting from the transformation process:

Select * from "public"."findata_extract_view"

You can continue to add items in Amazon DocumentDB and view the data refreshed in Amazon Redshift in near-real time.

Clean up

To prevent incurring additional charges and to effectively remove resources, delete the resources you created with the CloudFormation template. For instructions, refer to Deleting a stack on the AWS CloudFormation console.

Conclusion

In this post, explored a near-real-time data streaming solution from Amazon DocumentDB to Amazon Redshift. We used the capabilities of Amazon DocumentDB change streams, Lambda, and Kinesis Data Streams to create an efficient and responsive data pipeline. This approach simplifies data integration and synchronization, providing valuable insights for data-driven decision-making and analytics in real-time scenarios.

Test the procedure that is outlined in this post by deploying the sample code provided and share your feedback in the comments section.


About the authors

Rajesh Kantamani is a Senior Database Specialist SA. He specializes in assisting customers with designing, migrating, and optimizing database solutions on Amazon Web Services, ensuring scalability, security, and performance. In his spare time, he loves spending time outdoors with family and friends.

Veerendra Nayak is a Principal Database Solution Architect based in the Bay Area, California. He works with customers to share best practices on database migrations, resiliency, and integrating operational data with Analytics and AI services.