AWS Big Data Blog

Monitor data quality in your data lake using PyDeequ and AWS Glue

August 2024: This post was reviewed and updated with examples against a new dataset. Additionally, changed the architecture to use AWS Glue Studio Notebooks and added information on the appropriate Deequ/PyDeequ versions.


In our previous post, we introduced PyDeequ, an open-source Python wrapper over Deequ, which enables you to write unit tests on your data to ensure data quality. The use case we ran through was on static, historical data, but most datasets are dynamic, so how can you quantify how your data is changing and detect anomalous changes over time?

At Amazon, we’ve leveraged PyDeequ on AWS Glue to address this problem. AWS Glue is a serverless data integration service that allows you to easily prepare and combine your data for analytics, machine learning (ML), and application development. AWS Glue enables data engineers to build extract, transform, and load (ETL) workflows with ease. By using PyDeequ with AWS Glue, you can create a metrics repository on your data and check for anomalous changes over time inside your ETL workflows. In this post, we share this design pattern with you.

Use cases of PyDeequ on AWS Glue include:

  • Identifying and counting mismatched schema items and then immediately correcting them
  • Reviewing your incoming data with standard or custom, predefined analytics before storing it for big data validation
  • Tracking changes in data distribution by using a data quality metric file
  • Immediately identifying and creating useful constraints based on data distribution

The post describes the implementation process and provides a step-by-step tutorial of tracking changes in data quality. It walks you through an example of transforming a large dataset to identify the seasonality of the trends over time. Next, you create, sort, and load a metrics repository using PyDeequ, which allows you to persist your analysis over time. Finally, you create an alert that notifies you when a data point is outside the forecasted range.

Since you’re reading this post, you may also be interested in the following:

Where are the anomalies?

It can be difficult to immediately find anomalies within your incoming data stream over time. PyDeequ makes it easier to identify changes in data distribution by creating a metrics repository. The repository allows you to store and load a variety of anomaly checks to compare current and past metric values. For this post, you learn about the Holt Winters anomaly detection strategy, one of the various anomaly detection strategies that PyDeequ provides. The Holt Winters model forecasts future datasets based on a repeated periodical pattern (seasonality), a trend (slope), and the average between two corresponding time points.

You can apply the Holt Winters method in many different use cases, such as the following:

  • Business problem – Identifying a shift in the demand of a product
  • Data pattern – Input data deviates from trend and seasonality
  • Business analysis – Detecting changes in profits over time

To demonstrate this anomaly detection strategy, we have generated a synthetic reviews dataset with total votes in the jewelry subset from 2013 to 2015. A graph of this data shows a correlation between years 2013 and 2014, however, the data looks different in 2015. The following graph illustrates January 2015 as divergent from the previous years, with an increase in the number of total votes relative to the previous years.

How can we detect similar events to these in new data?

With PyDeequ, you can easily identify anomalies without any visuals. January 2015 is outside the calculated forecast range; therefore, PyDeequ flags the data point as anomalous. This post demonstrates using PyDeequ’s anomaly detection to get email notifications for anomalous events, which look like the following screenshot.

Solution architecture

With Amazon Athena and an AWS Glue crawler, you can create an AWS Glue Data Catalog entry to access the Amazon Simple Storage Service (Amazon S3) data source. This allows the data to be easily queried for usage downstream. You can use an AWS Glue Notebook to interact with the dataset and run the checks. We configure our AWS Glue ETL jobs to use PyDeequ to store results in Amazon S3, and use Amazon Simple Notification Service (Amazon SNS) to notify administrators of any anomalies identified in the data.

The following diagram illustrates this architecture.

Solution overview

To implement this solution, you complete the following high-level steps:

  1. Create an SNS topic.
  2. Upload PyDeequ and Deequ to Amazon S3.
  3. Create an AWS Identity and Access Management (IAM) role for AWS Glue.
  4. Crawl, query, and create your dataset.
  5. Transform the dataset into a table.
  6. Create an AWS Glue Notebook.
  7. Create a new AWS Glue session.
  8. Extract the table.
  9. Transform the table.
  10. Use PyDeequ to detect anomalous data points.

Create an SNS topic

Complete the following steps to create your SNS topic:

  1. On the Amazon SNS console, choose Topics.
  2. Choose Create topic.
  3. For Type, choose Standard.
  4. For Name, enter jewelry_hw.
  5. For Display name, enter Holt Winters Anomaly Example.
  6. Keep the other options as default and choose Create Topic.
  7. On the details page for the topic you just created, under Subscription, choose Create subscription.
  8. For Protocol, choose Email.
  9. For Endpoint, enter the email address to which you want to receive the notification.
  10. Keep the other options as default and choose Create subscription. An email will be sent to the entered endpoint.
  11. Open the email message and choose Confirm subscription.
  12. Take a note of the ARN for your SNS topic as you will need it when we begin working on the data in the AWS Glue notebook.

Prepare dependencies: upload PyDeequ and Deequ to Amazon S3

In this step, you will create an S3 bucket to store Glue job dependencies, Athena query results and PyDeequ metrics. We will upload PyDeequ source and Deequ JAR file to the S3 bucket as these will be needed by the AWS Glue Notebook.

It is critical to download the version of deequ JAR that corresponds to the versions of Spark PyDeequ that you are using. Please see the blog post “Test data quality at scale with PyDeequ” for detailed explanation on how to identify the correct version of deequ.

Due to the ongoing issue with the dependency on the breeze package in deequ, the Holt Winters Anomaly detection works as expected in AWS Glue 2.0, pydeequ release/1_1_1,  Spark 2.4 and deequ-1.1.0_spark-2.4-scala-2.11.jar (only).

The deequ jar can be downloaded from the Apache Maven repository.

  1. From the Amazon S3 console, create a new bucket. We reference it as <__YOUR_BUCKET__> throughout this post.
  2. Create a folder called dependencies from within this bucket.
  3. Download the the deequ jar that corresponds to your version of Spark and pydeequ.
  4. Clone the PyDeequ repository from GitHub. Below we show how to clone the latest version on *nix operating system. Utilize -b <release> git option to clone a specific release. Create a .zip file for PyDeequ by compressing the folder that contains the __init__.py file.
  5. Upload the deequ JAR and PyDeequ zip to your dependencies folder.

If you’re on a *nix operating system or have the AWS Command Line Interface (AWS CLI) configured, you can use the following code:

$ wget https://repo1.maven.org/maven2/com/amazon/deequ/deequ/<version>/deequ-<version>.jar
$ git clone https://github.com/awslabs/python-deequ.git
$ cd python-deequ && zip -r ../pydeequ.zip pydeequ && cd ../
$ aws s3 cp deequ-<version>.jar s3://<__YOUR_BUCKET__>/dependencies/
$ aws s3 cp pydeequ.zip s3://<__YOUR_BUCKET__>/dependencies/

Create an IAM role for AWS Glue

AWS Glue is a serverless data integration service that makes it easy for analytics users to discover, prepare, move, and integrate data from multiple sources such as Amazon Athena and Amazon S3. AWS Glue job notebooks are based on Jupyter notebooks. These notebooks are serverless and require minimal setup in AWS Glue so you can get started quickly. In this section we will create a role and attach required policies needed to work with AWS Glue notebooks and other services in our architecture. To read more about roles and policies needed to work with AWS Glue notebooks follow the documentation link here.

Create a custom policy

First, we create a policy to allow access to S3 buckets and to send a message to an Amazon SNS topic.

  1. On the IAM console choose Policies and choose Create Policy.
  2. Switch from Visual to JSON and paste the policy below, but be sure to replace the placeholder values your resource values:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:Put*",
                    "s3:Get*",
                    "s3:Create*",
                    "s3:Delete*"
                ],
                "Resource": [
                    "arn:aws:s3:::<__YOUR_BUCKET__>/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": "sns:Publish",
                "Resource": "arn:aws:sns:*:*:jewelry_hw"
            }
        ]
    }
  3. Choose Next.
  4. Under the Policy details section provide policy name, for example, pydeequ-hw-sns-s3.
  5. Choose Create Policy.

Create a role

You now create an IAM role and attach the required policies.

  1. On the IAM console, choose Roles.
  2. Choose Create a role.
  3. For Trusted entity, choose AWS Service.
  4. For Use case, choose Glue.
  5. Choose Next.
  6. Add the AWS managed policy to the role: AWSGlueConsoleFullAccess.
  7. Add the policy you created in the previous section: pydeequ-hw-sns-s3.
  8. Under the Role details section provide policy name. The name should start with AWSGlueServiceRole-. For example, you can name it AWSGlueServiceRole-glue-notebook.
  9. Choose Create Policy.

Crawl, query, and create the dataset

You will use Amazon Athena to query the data stored in an Amazon S3 bucket. Since Athena needs to access table metadata to run queries, you first need to create a metastore or a data catalog. AWS Glue data catalog natively integrates with Athena. You can create a data catalog by either creating a data crawler from Athena Query console under Tables and views or from AWS Glue Console. Here we will show you how to create a crawler in AWS Glue Console. With either option the newly created databases and tables will become available for query in Athena.

  1. On the AWS Glue console, choose Databases under Data Catalog and choose Add database.
  2. Provide database name jewelry_hw under Name and choose Create database.
  3. Choose the newly created database jewelry_hw and under Tables and select Add tables using crawler.
  4. On the page Set crawler properties provide crawler name and choose Next.

  5. Under Choose data sources and classifiers choose Not yet under Is your data already mapped to Glue tables? 
  6. Choose Add a data source.
  7. In the pop-up window update the location of S3 data to In a different account
  8. Under S3 path enter s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Jewelry/
  9. Under Subsequent crawler runs choose Crawl all subfolders.
  10. Select Add an S3 data source.
  11. Choose Next.
  12. Under Configure security settings choose Create new IAM role. Provide the following name: AWSGlueServiceRole-jewelry-hw.
  13. Choose Next.
  14. Under Set output and scheduling choose jewerly_hw as the Target database and choose On demand under Crawler schedule.
  15. Choose Next.
  16. Under Review and create review the crawler properties, data sources, created role, output and scheduling and choose Create Crawler.
  17. On the next page, choose Run crawler.

The crawler will begin its run which will take a couple of minutes.

You are now ready to navigate to Athena console to query and explore the dataset.

Create a table with the sum of all total product review votes by month in Athena

We create a new table and apply Holt Winters anomaly detection algorithm in PyDeequ to the sum of the total product review votes by year and month.

If this is your first time working with Athena you will need to define the query result location in S3 for your queries.

  1. In Athena Query editor, under the Settings tab, choose Manage.
  2. Provide s3://<__YOUR_BUCKET__>/athena-queries/ as the location and choose Save.
  3. You might also be prompted to provide an S3 location when you first open Athena Query Editor. This step needs to be done only once.
  4. Return to the Editor and under Database select jewelry_hw. You will see the table product_category_jewelry.

Let’s create a second table from this dataset. This table includes three columns, which contain month, year, the sum of total product review votes and will subset the review only for the US. Enter the following query in the query editor:

CREATE TABLE jewelry_hw.jewelry_dataset
WITH (
format='PARQUET'
) AS
SELECT total_votes, review_year,
Date_FORMAT(review_date,
'%Y-%c-01') AS review_date
FROM product_category_jewelry
WHERE marketplace = 'US'
ORDER BY review_date DESC
  1. Choose Run.
  2. Under Tables and views in Athena Query Editor you will now see a new table called jewelry_dataset in jewelry_hw database.

Create an AWS Glue Notebook session

To create your AWS Glue Notebook, complete the following steps:

  1. In the AWS Glue console under ETL jobs, choose Notebooks.
  2. On the page AWS Glue Studio choose Notebook.
  3. Select Engine: Spark (Python), Options: Start fresh.
  4. Under IAM role choose the role you created earlier: AWSGlueServiceRole-glue-notebook.
  5. Choose Create notebook.

Wait a couple minutes for the resources to be provisioned. The provisioned AWS Glue Studio Notebook will be populated with the sample code and markdown instructions to help you get started. Feel free to delete all cells.

  1. Using notebook commands (“magics”) provide the path to the zipped PyDeequ library in S3:
    %extra_py_files s3:// <__YOUR_BUCKET__>/dependencies/pydeequ.zip
  2. Provide the path to the deequ JAR:
    %extra_jars s3:// <__YOUR_BUCKET__>/dependencies/deequ-<version>.jar
  3. Set up and start your interactive session. Specify your glue version using %glue_version magic (see all supported AWS Glue versions here. However, due to the ongoing dependency issue, only AWS Glue 2.0 works at this time):
    %idle_timeout 2880
    %glue_version <version> 
    %worker_type G.1X
    %number_of_workers 5
    
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
      
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)

Wait for the session to start. You should see the following message Session <session_id> has been created, the kernel will become idle and ready to accept a new command.

  1. Configure variables to store your S3 bucket, ARN for the SNS topic and the region in the next cell
    s3_bucket = <__YOUR_BUCKET__>
    topic_arn = "arn:aws:sns: <__YOUR_REGION__>:<__YOUR_ACCOUNT__>:jewelry_hw"
    region = <__YOUR_REGION__>

Extract the table

You must read the data table jewelry_dataset and turn it into to a DataFrame so that it can be used with PyDeequ. Next, use the dropDuplicates method to remove any potential duplicates within the dataset. See the following code:

jewelry_dyf = glueContext.create_dynamic_frame.from_catalog(database="jewelry_db", table_name="jewelry_dataset")
jewelry_df = jewelry_dyf.toDF()
jewelry_df.dropDuplicates()

The following screenshot shows your output.

Transform the table

You can further simplify the jewelry_df table by using the date_format method to change the column to only show the month and year of total_votes. Afterwards, you can filter jewelry_df2 by year to contain only the two columns needed. See the following code:

import pyspark.sql.functions as f

jewelry_df2 = jewelry_df.withColumn('review_date', f.date_format('review_date', 'yyyy/M'))\
.orderBy('review_date', ascending = False)

df_2013 = jewelry_df2.filter("year ='2013'").select("review_date","total_votes")
df_2014 = jewelry_df2.filter("year ='2014'").select("review_date","total_votes")
df_2015 = jewelry_df2.filter("year ='2015'").select("review_date","total_votes")

You can use df_2013.show(10) to see an iteration of what your data table looks like before iterating through PyDeequ. The following screenshot shows our output.

Use PyDeequ to detect anomalous data points

First, specify your version of Spark (use Spark 2.4 for now, due to the dependency issue):

import os
os.environ["SPARK_VERSION"] = '<your Spark version>'

For this post, we demonstrate how you can detect anomalous data points with the FileSystemMetricsRepository class. A metrics repository is stored in JSON format to be used as a data quality report over time in Amazon S3, HDFS, or in memory. The variable s3_write_path is where you want your JSON file to be stored within Amazon S3. See the following code:

s3_write_path = f"s3://{s3_bucket}/holt_winters_tutorial.json"
import pydeequ
from pydeequ.repository import *
metricsRepository = FileSystemMetricsRepository(session,s3_write_path)

Now load the 2013–2014 dataset into metrics.

If your dataset is collected monthly, and follows an annual seasonal trend, use the MetricInterval.Monthly and SeriesSeasonality.Yearly metrics. This selection requires you to collect at least 25 data points. The initial 24 data points are monthly values from 2013–2014, which we use to create the Holt Winters model. The values in 2015 are the forecasted points, which can identify an anomalous value.

As shown in the following code, create a for loop that iterates through df_2013. Use month to create a date to later help us query values from df_2013. The filter method allows you to create a df data frame that contains the total_votes values by month (for this post, the first iteration is a table of values from January 2013). Repeat the process for the data from 2014.

Next, each set of metrics that were computed needs be indexed by a ResultKey, which contains a timestamp and supports arbitrary tags in the form of key-value pairs.

Finally, create a VerificationSuite. You can make PyDeequ write and store metrics in Amazon S3 by adding the useRepository and saveOrAppendResult method. Then add Holt Winters with a Sum analyzer to calculate monthly total_votes. See the following code:

from pydeequ.verification import *
from pydeequ.anomaly_detection import *
from pydeequ.analyzers import *

for month in range(1,13):
    date = "\'2013/"+str(month)+"\'"
    df = df_2013.filter("review_date =" + date)
    
    key_tags = {'tag':  date}
    result_key_2013 = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

    jewelry_result = VerificationSuite(spark).onData(df)\
        .useRepository(metricsRepository) \
        .saveOrAppendResult(result_key_2013) \
        .addAnomalyCheck(HoltWinters(MetricInterval.Monthly, SeriesSeasonality.Yearly), Sum('total_votes'))\
        .run()

for month in range(1,13):
    date = "\'2014" +'/'+str(month)+"\'"
    df = df_2014.filter("review_date =" + date)
    key_tags = {'tag':  date}
    result_key_2014 = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

    jewelry_result = VerificationSuite(spark).onData(df)\
        .useRepository(metricsRepository) \
        .saveOrAppendResult(result_key_2014) \
        .addAnomalyCheck(HoltWinters(MetricInterval.Monthly, SeriesSeasonality.Yearly), Sum('total_votes'))\
        .run()

You can also load the metrics to a Data Frame to review:

analysisResult_metRep = metricsRepository.load() \
                            .before(ResultKey.current_milli_time()) \
                            .getSuccessMetricsAsDataFrame()

analysisResult_metRep.show()

Great! You have created the trend for the Holt Winters algorithm. Now it’s time to detect any anomalies within 2015.

Create another Holt Winters anomaly check similar to the 2013–2014 dataset. For each month, check for an anomaly using jewelry_result.status. If it’s not a success, that means an anomaly has been detected. Collect the constraint_message to see the error value. Use publish to create an SNS notification. Include the topicArn created in Amazon SNS, a Message, subject, and MessageAttribute. If an anomaly has been detected, break out of the loop. See the following code:

# Use AWS SNS 
import boto3 
import json

# Topic for AWS SNS 
snsClient = boto3.client('sns', region_name = region)

for month in range(1,13):
    date = "\'2015/"+str(month)+"\'"
    df = df_2015.filter("review_date =" + date)
    
    key_tags = {'tag':  date}
    result_key_2015 = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

    jewelry_result = VerificationSuite(spark).onData(df)\
        .useRepository(metricsRepository) \
        .saveOrAppendResult(result_key_2015) \
        .addAnomalyCheck(HoltWinters(MetricInterval.Monthly, SeriesSeasonality.Yearly), Sum('total_votes'))\
        .run()
    
    df = VerificationResult.checkResultsAsDataFrame(spark, jewelry_result)
    
    if (jewelry_result.status != "Success"):
        print("Anomaly for total_votes has been detected")
        print(date)
        message = df.select("constraint_message").collect()
        response = snsClient.publish(TopicArn = topic_arn,
                             Message = "anomaly detected in data frame: \n" + json.dumps(message),
                             Subject = "Anomaly Detected in the jewelry dataset:"+ date,
                             MessageAttributes = {"TransactionType":
                                            {"DataType": "String.Array", "StringValue": "Anomaly Detected in Glue"}})
        break

After completing this tutorial, you should receive an email notification stating an anomaly has been detected for January 2015. This coincides with our hypothesis that PyDeequ will flag the same anomaly from the graph!

More on using AWS Glue and PyDeequ

This post shows how you can start exploring anomaly detection with PyDeequ. This simple tutorial is just the beginning of what you can do with AWS Glue. To add to this tutorial, you can create a time-based schedule for jobs and crawlers to run every time a dataset is appended.

Alternatively, you can use the different modules provided by PyDeequ and its tutorials, or the use case examples provided at the beginning of this post to further understand the dataset.

Resource cleanup

Clean up the resources created in this post when you’re finished:

Conclusion

This post demonstrates the basics of detecting anomalies using PyDeequ and AWS Glue. Anomaly detection relies on the metrics repository file. This repository can easily be stored within Amazon S3, HDFS, or in memory as a JSON object for future test usage and AWS Glue ETL jobs. In addition to AWS Glue, PyDeequ can function within Amazon EMR and SageMaker in order to best handle the needs of your data pipeline.

This approach allows you to improve the quality and your own knowledge of your dataset. You can also apply this tool to a variety of business scenarios. The contents of this tutorial are for demonstration purposes and not production workloads. Be sure to follow security best practices for handling data at rest and in transit when you adapt PyDeequ into your workflows.


About the Authors

Vitalina Komashko is a Data Scientist with AWS Professional Services. She holds a PhD in Pharmacology and Toxicology but transitioned to data science from experimental work because she wanted to own data generation and the interpretation of the results. Earlier in her career, she worked with biotech and pharma companies. At AWS, she enjoys solving problems for customers from a variety of industries and learning about their unique challenges.

Veronika Megler, PhD, is a Principal Applied Scientist for Amazon.com Consumer Packaging. Previously she was the Principal Data Scientist for AWS Professional Services. She enjoys adapting innovative big data, AI, and ML technologies to help solve new problems, and to solve old problems more efficiently and effectively. Her work has lately been focused on economic impacts of ML models and exploring causality from observational data.

Joan Aoanan is a ProServe Consultant at AWS. With her B.S. Mathematics-Computer Science degree from Gonzaga University, she is interested in integrating her interests in math and science with technology.

Calvin Wang is a Data Scientist at AWS AI/ML. He holds a B.S. in Computer Science from UC Santa Barbara and loves using machine learning to build cool stuff.


Audit History

Last reviewed and updated in August 2024 by Vitalina Komashko | Data Scientist