AWS Big Data Blog

Introducing generative AI upgrades for Apache Spark in AWS Glue (preview)

Organizations run millions of Apache Spark applications each month on AWS, moving, processing, and preparing data for analytics and machine learning. As these applications age, keeping them secure and efficient becomes increasingly challenging. Data practitioners need to upgrade to the latest Spark releases to benefit from performance improvements, new features, bug fixes, and security enhancements. However, these upgrades are often complex, costly, and time-consuming.

Today, we are excited to announce the preview of generative AI upgrades for Spark, a new capability that enables data practitioners to quickly upgrade and modernize their Spark applications running on AWS. Starting with Spark jobs in AWS Glue, this feature allows you to upgrade from an older AWS Glue version to AWS Glue version 4.0. This new capability reduces the time data engineers spend on modernizing their Spark applications, allowing them to focus on building new data pipelines and getting valuable analytics faster.

Understanding the Spark upgrade challenge

The traditional process of upgrading Spark applications requires significant manual effort and expertise. Data practitioners must carefully review incremental Spark release notes to understand the intricacies and nuances of breaking changes, some of which may be undocumented. They then need to modify their Spark scripts and configurations, updating features, connectors, and library dependencies as needed.

Testing these upgrades involves running the application and addressing issues as they arise. Each test run may reveal new problems, resulting in multiple iterations of changes. After the upgraded application runs successfully, practitioners must validate the new output against the expected results in production. This process often turns into year-long projects that cost millions of dollars and consume tens of thousands of engineering hours.

How generative AI upgrades for Spark works

The Spark upgrades feature uses AI to automate both the identification and validation of required changes to your AWS Glue Spark applications. Let’s explore how these capabilities work together to simplify your upgrade process.

AI-driven upgrade plan generation

When you initiate an upgrade, the service analyzes your application using AI to identify necessary changes across both PySpark code and Spark configurations. During preview, Spark Upgrades supports upgrading from Glue 2.0 (Spark 2.4.3, Python 3.7) to Glue 4.0 (Spark 3.3.0, Python 3.10), automatically handling changes that would typically require extensive manual review of public Spark, Python and Glue version migration guides, followed by development, testing, and verification. Spark Upgrades addresses four key areas of changes:

  • Spark SQL API methods and functions
  • Spark DataFrame API methods and operations
  • Python language updates (including module deprecations and syntax changes)
  • Spark SQL and Core configuration settings

The complexity of these upgrades becomes evident when you consider that migrating from Spark 2.4.3 to Spark 3.3.0 involves over a hundred version-specific changes. Several factors contribute to the challenges of performing manual upgrades:

  • Spark's highly expressive language, with its mix of imperative and declarative programming styles, makes it easy to develop Spark applications. However, the same flexibility makes it harder to identify impacted code during upgrades.
  • Lazy execution of transformations in a distributed Spark application improves performance, but it makes runtime verification of application upgrades challenging because errors surface only when an action runs.
  • Changes to the default values of Spark configurations, or the introduction of new configurations across versions, can affect application behavior in different ways, making it difficult to identify issues during upgrades.
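The lazy-execution point can be illustrated with a plain-Python analogy. This is not Spark code: a generator pipeline defers work in roughly the way Spark defers transformations, so a faulty step fails only when its results are consumed. The function name and sample rows are hypothetical.

```python
def parse_prices(rows):
    """Lazily convert each raw price string to float (nothing runs yet)."""
    for row in rows:
        yield float(row)  # a bad value raises only when this row is consumed


raw_rows = ["5.0", "7.5", "not-a-number"]

# Building the pipeline succeeds, much like defining Spark transformations.
pipeline = parse_prices(raw_rows)

consumed = []
error = None
try:
    # Consuming the pipeline plays the role of a Spark action such as show().
    for price in pipeline:
        consumed.append(price)
except ValueError as exc:
    error = exc

print(consumed)               # rows processed before the failure
print(type(error).__name__)   # the error appears only at consumption time
```

In the same way, an upgraded Spark script can be defined without complaint and still fail later, which is why validation needs to actually run the job.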

For example, starting in Spark 3.2, the Spark SQL TRANSFORM operator no longer supports aliases in its inputs. In Spark 3.1 and earlier, you could write a script transform such as SELECT TRANSFORM(a AS c1, b AS c2) USING 'cat' FROM TBL.

# Original code (Glue 2.0)
query = """
SELECT TRANSFORM(item as product_name, price as product_price, number as product_number)
   USING 'cat'
FROM goods
WHERE goods.price > 5
"""
spark.sql(query)

# Updated code (Glue 4.0)
query = """
SELECT TRANSFORM(item, price, number)
   USING 'cat' AS (product_name, product_price, product_number)
FROM goods
WHERE goods.price > 5
"""
spark.sql(query)

In Spark 3.1, loading and saving timestamps before 1900-01-01 00:00:00Z as INT96 in Parquet files causes errors. In Spark 3.0, this wouldn’t fail but could result in timestamp shifts due to calendar rebasing. To restore the old behavior in Spark 3.1, set the Spark SQL configurations spark.sql.legacy.parquet.int96RebaseModeInRead and spark.sql.legacy.parquet.int96RebaseModeInWrite to LEGACY.

# Original code (Glue 2.0)
from datetime import datetime
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType

# TimestampType columns require datetime objects rather than strings
data = [(1, datetime(1899, 12, 31, 23, 59, 59)), (2, datetime(1900, 1, 1, 0, 0, 0))]
schema = StructType([ StructField("id", IntegerType(), True), StructField("timestamp", TimestampType(), True) ])
df = spark.createDataFrame(data, schema=schema)
df.write.mode("overwrite").parquet("path/to/parquet_file")

# Updated code (Glue 4.0)
# Imports are the same as in the original code
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY")
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "LEGACY")

data = [(1, datetime(1899, 12, 31, 23, 59, 59)), (2, datetime(1900, 1, 1, 0, 0, 0))]
schema = StructType([ StructField("id", IntegerType(), True), StructField("timestamp", TimestampType(), True) ])
df = spark.createDataFrame(data, schema=schema)
df.write.mode("overwrite").parquet("path/to/parquet_file")

Automated validation in your environment

After identifying the necessary changes, Spark Upgrades validates the upgraded application by running it as an AWS Glue job in your AWS account. The service iterates through multiple validation runs, up to 10, reviewing any errors encountered in each iteration and refining the upgrade plan until it achieves a successful run. You can run a Spark Upgrade Analysis in your development account using mock datasets supplied through Glue job parameters used for validation runs.
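The iterate-and-refine behavior described above can be sketched in a few lines of Python. This is only an illustration of the control flow under stated assumptions, not the service's implementation: run_job and apply_fix are hypothetical callables standing in for "run the script as a Glue job" and "revise the plan based on the error", and the toy stubs below simply look for known-bad snippets.

```python
from typing import Callable, List, Optional, Tuple

MAX_ATTEMPTS = 10  # the post states that validation uses up to 10 runs


def run_upgrade_analysis(
    script: str,
    run_job: Callable[[str], Optional[str]],
    apply_fix: Callable[[str, str], str],
) -> Tuple[bool, str, List[str]]:
    """Run, inspect the error, refine, and repeat until success or the limit.

    run_job returns None on success or an error message on failure.
    apply_fix returns a revised script for a given error message.
    """
    errors_seen: List[str] = []
    for _ in range(MAX_ATTEMPTS):
        error = run_job(script)
        if error is None:
            return True, script, errors_seen
        errors_seen.append(error)
        script = apply_fix(script, error)
    return False, script, errors_seen


# Hypothetical rewrite rules used by the toy stubs below.
FIXES = {
    "from collections import Sequence": "from collections.abc import Sequence",
    '"180.8"': "180",
}


def fake_run_job(script: str) -> Optional[str]:
    """Pretend job run: fails while a known-bad snippet is still present."""
    for bad in FIXES:
        if bad in script:
            return f"error near: {bad}"
    return None


def fake_apply_fix(script: str, error: str) -> str:
    """Pretend planner: rewrite the snippet named in the error message."""
    for bad, good in FIXES.items():
        if bad in error:
            return script.replace(bad, good)
    return script


ok, final_script, errors = run_upgrade_analysis(
    'from collections import Sequence\nDATE_ADD(d, "180.8")',
    fake_run_job,
    fake_apply_fix,
)
print(ok, len(errors))
print(final_script)
```

Each failed run contributes one error and one refinement, which mirrors how each attempt in an analysis shows a different failure reason.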

After Spark Upgrades has successfully validated the changes, it presents an upgrade plan for you to review. You can then accept and apply the changes to your job in the development account, before replicating them to your job in the production account. The Spark Upgrade plan includes the following:

  • An upgrade summary with an explanation of code updates made during the process
  • The final script that you can use in place of your current script
  • Logs from validation runs showing how issues were identified and resolved

You can review all aspects of the upgrade, including intermediate validation attempts and any error resolutions, before deciding to apply the changes to your production job. This approach ensures you have full visibility into and control over the upgrade process while benefiting from AI-driven automation.

Get started with generative AI Spark upgrades

Let’s walk through the process of upgrading an AWS Glue 2.0 job to AWS Glue 4.0. Complete the following steps:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Select your AWS Glue 2.0 job, and choose Run upgrade analysis with AI.
  3. For Result path, enter s3://aws-glue-assets-<account-id>-<region>/scripts/upgraded/ (provide your own account ID and AWS Region).
  4. Choose Run.
  5. On the Upgrade analysis tab, wait for the analysis to be completed.

    While an analysis is running, you can view the intermediate job analysis attempts (up to 10) for validation under the Runs tab. Additionally, the Upgraded summary in S3 documents the upgrades made by the Spark Upgrade service so far, refining the upgrade plan with each attempt. Each attempt will display a different failure reason, which the service tries to address in the subsequent attempt through code or configuration updates.
    After a successful analysis, the upgraded script and a summary of changes will be uploaded to Amazon Simple Storage Service (Amazon S3).
  6. Review the changes to make sure they meet your requirements, then choose Apply upgraded script.

Your job has now been successfully upgraded to AWS Glue version 4.0. You can check the Script tab to verify the updated script and the Job details tab to review the modified configuration.

Understanding the upgrade process through an example

We now show a production Glue 2.0 job that we would like to upgrade to Glue 4.0 using the Spark Upgrade feature. This Glue 2.0 job reads a dataset, updated daily in an S3 bucket under different partitions, containing new book reviews from an online marketplace and runs SparkSQL to gather insights into the user votes for the book reviews.

Original code (Glue 2.0) – before upgrade

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
from collections import Sequence
from pyspark.sql.types import DecimalType
from pyspark.sql.functions import lit, to_timestamp, col

def is_data_type_sequence(coming_dict):
    return True if isinstance(coming_dict, Sequence) else False

def dataframe_to_dict_list(df):
    return [row.asDict() for row in df.collect()]

books_input_path = (
    "s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Books/"
)
view_name = "books_temp_view"
static_date = "2010-01-01"
books_source_df = (
    spark.read.option("header", "true")
    .option("recursiveFileLookup", "true")
    .option("path", books_input_path)
    .parquet(books_input_path)
)
books_source_df.createOrReplaceTempView(view_name)
books_with_new_review_dates_df = spark.sql(
    f"""
        SELECT 
        {view_name}.*,
            DATE_ADD(to_date(review_date), "180.8") AS next_review_date,
            CASE 
                WHEN DATE_ADD(to_date(review_date), "365") < to_date('{static_date}') THEN 'Yes' 
                ELSE 'No' 
            END AS Actionable
        FROM {view_name}
    """
)
books_with_new_review_dates_df.createOrReplaceTempView(view_name)
aggregate_books_by_marketplace_df = spark.sql(
    f"SELECT marketplace, count({view_name}.*) as total_count, avg(star_rating) as average_star_ratings, avg(helpful_votes) as average_helpful_votes, avg(total_votes) as average_total_votes  FROM {view_name} group by marketplace"
)
aggregate_books_by_marketplace_df.show()
data = dataframe_to_dict_list(aggregate_books_by_marketplace_df)
if is_data_type_sequence(data):
    print("data is valid")
else:
    raise ValueError("Data is invalid")

aggregated_target_books_df = aggregate_books_by_marketplace_df.withColumn(
    "average_total_votes_decimal", col("average_total_votes").cast(DecimalType(3, -2))
)
aggregated_target_books_df.show()

New code (Glue 4.0) – after upgrade

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from collections.abc import Sequence
from pyspark.sql.types import DecimalType
from pyspark.sql.functions import lit, to_timestamp, col

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.legacy.allowStarWithSingleTableIdentifierInCount", "true")
spark.conf.set("spark.sql.legacy.allowNegativeScaleOfDecimal", "true")
job = Job(glueContext)

def is_data_type_sequence(coming_dict):
    return True if isinstance(coming_dict, Sequence) else False

def dataframe_to_dict_list(df):
    return [row.asDict() for row in df.collect()]

books_input_path = (
    "s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Books/"
)
view_name = "books_temp_view"
static_date = "2010-01-01"
books_source_df = (
    spark.read.option("header", "true")
    .option("recursiveFileLookup", "true")
    .load(books_input_path)
)
books_source_df.createOrReplaceTempView(view_name)
books_with_new_review_dates_df = spark.sql(
    f"""
        SELECT 
        {view_name}.*,
            DATE_ADD(to_date(review_date), 180) AS next_review_date,
            CASE 
                WHEN DATE_ADD(to_date(review_date), 365) < to_date('{static_date}') THEN 'Yes' 
                ELSE 'No' 
            END AS Actionable
        FROM {view_name}
    """
)
books_with_new_review_dates_df.createOrReplaceTempView(view_name)
aggregate_books_by_marketplace_df = spark.sql(
    f"SELECT marketplace, count({view_name}.*) as total_count, avg(star_rating) as average_star_ratings, avg(helpful_votes) as average_helpful_votes, avg(total_votes) as average_total_votes  FROM {view_name} group by marketplace"
)
aggregate_books_by_marketplace_df.show()
data = dataframe_to_dict_list(aggregate_books_by_marketplace_df)
if is_data_type_sequence(data):
    print("data is valid")
else:
    raise ValueError("Data is invalid")

aggregated_target_books_df = aggregate_books_by_marketplace_df.withColumn(
    "average_total_votes_decimal", col("average_total_votes").cast(DecimalType(3, -2))
)
aggregated_target_books_df.show()

Upgrade summary

In Spark 3.2, spark.sql.adaptive.enabled is enabled by default. To restore the behavior before Spark 3.2, you can set spark.sql.adaptive.enabled to false.

No suitable migration rule was found in the provided context for this specific error. The change was made based on the error message, which indicated that Sequence could not be imported from collections module. In Python 3.10, Sequence has been moved to the collections.abc module.

In Spark 3.1, path option cannot coexist when the following methods are called with path parameter(s): DataFrameReader.load(), DataFrameWriter.save(), DataStreamReader.load(), or DataStreamWriter.start(). In addition, paths option cannot coexist for DataFrameReader.load(). For example, spark.read.format("csv").option("path", "/tmp").load("/tmp2") or spark.read.option("path", "/tmp").csv("/tmp2") will throw org.apache.spark.sql.AnalysisException. In Spark version 3.0 and below, path option is overwritten if one path parameter is passed to above methods; path option is added to the overall paths if multiple path parameters are passed to DataFrameReader.load(). To restore the behavior before Spark 3.1, you can set spark.sql.legacy.pathOptionBehavior.enabled to true.

In Spark 3.0, the `date_add` and `date_sub` functions accept only int, smallint, tinyint as the 2nd argument; fractional and non-literal strings are not valid anymore, for example: `date_add(cast('1964-05-23' as date), '12.34')` causes `AnalysisException`. Note that string literals are still allowed, but Spark will throw `AnalysisException` if the string content is not a valid integer. In Spark version 2.4 and below, if the 2nd argument is a fractional or string value, it is coerced to an int value, and the result is a date value of `1964-06-04`.

In Spark 3.2, the usage of count(tblName.*) is blocked to avoid producing ambiguous results, because count(*) and count(tblName.*) produce different output if there are any null values. To restore the behavior before Spark 3.2, you can set spark.sql.legacy.allowStarWithSingleTableIdentifierInCount to true.

In Spark 3.0, negative scale of decimal is not allowed by default, for example, data type of literal like 1E10BD is DecimalType(11, 0). In Spark version 2.4 and below, it was DecimalType(2, -9). To restore the behavior before Spark 3.0, you can set spark.sql.legacy.allowNegativeScaleOfDecimal to true.
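The last summary entry concerns negative decimal scale, which can be unintuitive. As a rough analogy in plain Python (using the standard decimal module, not Spark's DecimalType), a scale of -2 means the value keeps no digits below the hundreds place. The sample value is hypothetical.

```python
from decimal import Decimal

# A negative scale drops digits to the LEFT of the decimal point.
# Quantizing to 1E2 keeps hundreds and above, similar in spirit to scale -2.
value = Decimal("12345.67")
rounded = value.quantize(Decimal("1E2"))

print(rounded)                       # the value rounded to the nearest hundred
print(int(rounded))                  # 12300
print(rounded.as_tuple().exponent)   # 2, which corresponds to a scale of -2
```

Here three significant digits remain, comparable to a precision of 3 with a scale of -2.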

Comparing the updated Glue 4.0 (Spark 3.3.0) script with the Glue 2.0 (Spark 2.4.3) script, together with the resulting upgrade summary, shows that a total of six different code and configuration updates were applied across the six attempts of the Spark Upgrade Analysis.

  • Attempt #1 added a Spark SQL configuration (spark.sql.adaptive.enabled) to restore the previous application behavior, because adaptive query execution is enabled by default starting in Spark 3.2. You can inspect this configuration change and enable or disable it according to your preference.
  • Attempt #2 resolved a Python language change between Python 3.7 and 3.10: Sequence can no longer be imported from the collections module and must be imported from collections.abc, the module that holds the abstract base classes.
  • Attempt #3 resolved an error caused by a DataFrame API behavior change starting in Spark 3.1, where the path option cannot coexist with a path argument passed to DataFrameReader methods.
  • Attempt #4 resolved an error caused by a change in the Spark SQL function API signature for DATE_ADD, which accepts only integers as the second argument starting in Spark 3.0.
  • Attempt #5 resolved an error caused by a behavior change in the Spark SQL function API for count(tblName.*) starting in Spark 3.2. The behavior was restored by setting the Spark SQL configuration spark.sql.legacy.allowStarWithSingleTableIdentifierInCount.
  • Attempt #6 successfully completed the analysis and ran the new script on Glue 4.0 without any new errors. This final attempt resolved an error caused by the prohibited use of a negative scale in cast(DecimalType(3, -2)) in the Spark DataFrame API starting in Spark 3.0. The issue was addressed by enabling the Spark SQL configuration spark.sql.legacy.allowNegativeScaleOfDecimal.
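The import change from attempt #2 can be checked without Spark. The following minimal sketch reuses the helper name from the job script. The sample values are hypothetical, and the import shown works on both Python 3.7 and Python 3.10.

```python
from collections.abc import Sequence


def is_data_type_sequence(value):
    """Return True for list-like containers, mirroring the helper in the job script."""
    return isinstance(value, Sequence)


print(is_data_type_sequence([{"marketplace": "US"}]))  # True: a list of row dicts
print(is_data_type_sequence({"marketplace": "US"}))    # False: a dict is not a Sequence
print(is_data_type_sequence("US"))                     # True: str is also a Sequence
```

Because str also counts as a Sequence, a check like this accepts plain strings as well as lists, which may or may not be what a given job intends.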

Important considerations for preview

As you begin using automated Spark upgrades during the preview period, there are several important aspects to consider for optimal usage of the service:

  • Service scope and limitations – The preview release focuses on PySpark code upgrades from AWS Glue version 2.0 to version 4.0. At the time of writing, the service handles PySpark code that doesn’t rely on additional library dependencies. You can run automated upgrades for up to 10 jobs concurrently in an AWS account, allowing you to efficiently modernize multiple jobs while maintaining system stability.
  • Optimizing costs during the upgrade process – Because the service uses generative AI to validate the upgrade plan through multiple iterations, with each iteration running as an AWS Glue job in your account, it’s essential to optimize the validation job run configurations for cost-efficiency. To achieve this, we recommend specifying a run configuration when starting an upgrade analysis as follows:
    • Using non-production developer accounts and selecting sample mock datasets that represent your production data but are smaller in size for validation with Spark Upgrades.
    • Using right-sized compute resources, such as G.1X workers, and selecting an appropriate number of workers for processing your sample data.
    • Enabling Glue auto scaling when applicable to automatically adjust resources based on workload.

    For example, if your production job processes terabytes of data with 20 G.2X workers, you might configure the upgrade job to process a few gigabytes of representative data with 2 G.2X workers and auto scaling enabled for validation.

  • Preview best practices – During the preview period, we strongly recommend starting your upgrade journey with non-production jobs. This approach allows you to familiarize yourself with the upgrade workflow, and understand how the service handles different types of Spark code patterns.
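The cost guidance above can be made concrete with some back-of-the-envelope arithmetic. The sketch below is a rough upper bound under stated assumptions, not a pricing tool. It assumes a G.1X worker maps to 1 DPU and a G.2X worker to 2 DPUs. The price per DPU-hour (0.44) and the 30-minute run duration are placeholder values that you should replace with figures for your Region and workload. Auto scaling and billing minimums are ignored.

```python
DPU_PER_WORKER = {"G.1X": 1, "G.2X": 2}  # DPUs per worker type


def estimate_validation_cost(worker_type, num_workers, minutes_per_run,
                             attempts=10, price_per_dpu_hour=0.44):
    """Rough upper bound: every attempt runs the full duration on all workers.

    price_per_dpu_hour is an assumed placeholder; check current AWS Glue pricing.
    """
    dpu_hours = (DPU_PER_WORKER[worker_type] * num_workers
                 * (minutes_per_run / 60) * attempts)
    return round(dpu_hours * price_per_dpu_hour, 2)


# Production-sized configuration versus the smaller validation configuration.
production_sized = estimate_validation_cost("G.2X", 20, minutes_per_run=30)
right_sized = estimate_validation_cost("G.2X", 2, minutes_per_run=30)
print(production_sized, right_sized)
```

Under these assumptions, cutting the worker count from 20 to 2 reduces the worst-case validation cost by a factor of ten, before accounting for the shorter runs that a smaller dataset would allow.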

Your experience and feedback are crucial in helping us enhance and improve this feature. We encourage you to share your insights, suggestions, and any challenges you encounter through AWS Support or your account team. This feedback will help us improve the service and add capabilities that matter most to you during preview.

Conclusion

This post demonstrates how automated Spark upgrades can assist with migrating your Spark applications in AWS Glue. It simplifies the migration process by using generative AI to automatically identify the necessary script changes across different Spark versions.

To learn more about this feature in AWS Glue, see Generative AI upgrades for Apache Spark in AWS Glue.

A special thanks to everyone who contributed to the launch of generative AI upgrades for Apache Spark in AWS Glue: Shuai Zhang, Mukul Prasad, Liyuan Lin, Rishabh Nair, Raghavendhar Thiruvoipadi Vidyasagar, Tina Shao, Chris Kha, Neha Poonia, Xiaoxi Liu, Japson Jeyasekaran, Suthan Phillips, Raja Jaya Chandra Mannem, Yu-Ting Su, Neil Jonkers, Boyko Radulov, Sujatha Rudra, Mohammad Sabeel, Mingmei Yang, Matt Su, Daniel Greenberg, Charlie Sim, McCall Petier, Adam Rohrscheib, Andrew King, Ranu Shah, Aleksei Ivanov, Bernie Wang, Karthik Seshadri, Sriram Ramarathnam, Asterios Katsifodimos, Brody Bowman, Sunny Konoplev, Bijay Bisht, Saroj Yadav, Carlos Orozco, Nitin Bahadur, Kinshuk Pahare, Santosh Chandrachood, and William Vambenepe.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Keerthi Chadalavada is a Senior Software Development Engineer at AWS Glue, focusing on combining generative AI and data integration technologies to design and build comprehensive solutions for customers’ data and analytics needs.

Shubham Mehta is a Senior Product Manager at AWS Analytics. He leads generative AI feature development across services such as AWS Glue, Amazon EMR, and Amazon MWAA, using AI/ML to simplify and enhance the experience of data practitioners building data applications on AWS.

Pradeep Patel is a Software Development Manager on the AWS Glue team. He is passionate about helping customers solve their problems by using the power of the AWS Cloud to deliver highly scalable and robust solutions. In his spare time, he loves to hike and play with web applications.

Chuhan Liu is a Software Engineer at AWS Glue. He is passionate about building scalable distributed systems for big data processing, analytics, and management. He is also keen on using generative AI technologies to provide brand-new experience to customers. In his spare time, he likes sports and enjoys playing tennis.

Vaibhav Naik is a software engineer at AWS Glue, passionate about building robust, scalable solutions to tackle complex customer problems. With a keen interest in generative AI, he likes to explore innovative ways to develop enterprise-level solutions that harness the power of cutting-edge AI technologies.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue and Amazon EMR team. His team focuses on building distributed systems to enable customers with simple-to-use interfaces and AI-driven capabilities to efficiently transform petabytes of data across data lakes on Amazon S3, and databases and data warehouses on the cloud.