AWS Big Data Blog
Introducing the Cloud Shuffle Storage Plugin for Apache Spark
AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. In AWS Glue, you can use Apache Spark, an open-source, distributed processing system for your data integration tasks and big data workloads.
Apache Spark utilizes in-memory caching and optimized query execution for fast analytic queries against your datasets, which are split into multiple Spark partitions on different nodes so that you can process a large amount of data in parallel. In Apache Spark, shuffling happens when data needs to be redistributed across the cluster. During a shuffle, data is written to local disk and transferred across the network. The shuffle operation is often constrained by the available local disk capacity, or by data skew, which can cause straggling executors. Spark often throws a `No space left on device` or `MetadataFetchFailedException` error when there isn’t enough disk space left on the executor, and there is no recovery from that state. Such Spark jobs typically can’t succeed without adding compute and attached storage, where the added compute is often idle and results in additional cost.
In 2021, we launched Amazon S3 shuffle for AWS Glue 2.0 with Spark 2.4. This feature disaggregated Spark compute and shuffle storage by utilizing Amazon Simple Storage Service (Amazon S3) to store Spark shuffle files. Using Amazon S3 for Spark shuffle storage enabled you to run data-intensive workloads more reliably. After the launch, we continued investing in this area, and collected customer feedback.
Today, we’re pleased to release Cloud Shuffle Storage Plugin for Apache Spark. It supports the latest Apache Spark 3.x distribution so you can take advantage of the plugin in AWS Glue or any other Spark environments. It’s now also natively available to use in AWS Glue Spark jobs on AWS Glue 3.0 and the latest AWS Glue version 4.0 without requiring any extra setup or bringing external libraries. Like the Amazon S3 shuffle for AWS Glue 2.0, the Cloud Shuffle Storage Plugin helps you solve constrained disk space errors during shuffle in serverless Spark environments.
We’re also excited to announce the release of software binaries for the Cloud Shuffle Storage Plugin for Apache Spark under the Apache 2.0 license. You can download the binaries and run them on any Spark environment. The new plugin is open-cloud, comes with out-of-the-box support for Amazon S3, and can be easily configured to use other forms of cloud storage such as Google Cloud Storage and Microsoft Azure Blob Storage.
Understanding a shuffle operation in Apache Spark
In Apache Spark, there are two types of transformations:
- Narrow transformation – This includes `map`, `filter`, `union`, and `mapPartition`, where each input partition contributes to only one output partition.
- Wide transformation – This includes `join`, `groupByKey`, `reduceByKey`, and `repartition`, where each input partition contributes to many output partitions. Spark SQL queries including `JOIN`, `ORDER BY`, and `GROUP BY` require wide transformations. A minimal sketch contrasting the two follows this list.
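To make the distinction concrete, here is a minimal PySpark sketch (not from the original post) that runs one narrow and one wide transformation; only the latter triggers a shuffle:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("shuffle-demo").getOrCreate()
rdd = spark.sparkContext.parallelize(range(1_000_000), numSlices=8)

# Narrow transformation: each input partition produces exactly one output
# partition, so no data crosses the network.
squared = rdd.map(lambda x: x * x)

# Wide transformation: rows with the same key must be co-located, so Spark
# redistributes (shuffles) data across the cluster.
counts = squared.map(lambda x: (x % 10, 1)).reduceByKey(lambda a, b: a + b)

print(counts.collect())  # the shuffle for reduceByKey shows up in the Spark UI
```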
A wide transformation triggers a shuffle, which occurs whenever data is reorganized into new partitions with each key assigned to one of them. During a shuffle phase, all Spark map tasks write shuffle data to a local disk that is then transferred across the network and fetched by Spark reduce tasks. The volume of data shuffled is visible in the Spark UI. When shuffle writes take up more space than the locally available disk capacity, the job fails with a `No space left on device` error.
To illustrate one of the typical scenarios, let’s use the query q80.sql from the standard TPC-DS 3 TB dataset as an example. This query attempts to calculate the total sales, returns, and eventual profit realized during a specific time frame. It involves multiple wide transformations (shuffles) caused by `left outer join` and `group by`.
Let’s run the following query as an AWS Glue 3.0 job with 10 G.1X workers, where a total of 640 GB of local disk space is available.
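The full q80.sql text ships with the TPC-DS toolkit and is omitted here; the abbreviated sketch below is illustrative only (it is not the exact query), but it shows the `left outer join` plus `group by` shape that drives the shuffles, using standard TPC-DS table and column names:

```sql
-- Illustrative abbreviation of the q80 pattern: sales left outer joined to
-- returns, filtered to a date window, then aggregated. Not the full query.
SELECT s.s_store_id,
       SUM(ss.ss_ext_sales_price)                          AS sales,
       SUM(COALESCE(sr.sr_return_amt, 0))                  AS returns,
       SUM(ss.ss_net_profit - COALESCE(sr.sr_net_loss, 0)) AS profit
FROM store_sales ss
LEFT OUTER JOIN store_returns sr
  ON ss.ss_item_sk = sr.sr_item_sk
 AND ss.ss_ticket_number = sr.sr_ticket_number
JOIN date_dim d ON ss.ss_sold_date_sk = d.d_date_sk
JOIN store s    ON ss.ss_store_sk = s.s_store_sk
WHERE d.d_date BETWEEN DATE '2000-08-23'
                   AND DATE '2000-08-23' + INTERVAL 30 DAYS
GROUP BY s.s_store_id;
```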
The following screenshot shows the Executor tab in the Spark UI.
The following screenshot shows the status of Spark jobs included in the AWS Glue job run.
In the failed Spark job (job ID=7), we can see the failed Spark stage in the Spark UI.
There was 167.8 GiB of shuffle write during the stage, and 14 tasks failed due to the error `java.io.IOException: No space left on device` because the host 172.34.97.212 ran out of local disk.
Cloud Shuffle Storage for Apache Spark
Cloud Shuffle Storage for Apache Spark allows you to store Spark shuffle files on Amazon S3 or other cloud storage services. This gives complete elasticity to Spark jobs, thereby allowing you to run your most data-intensive workloads reliably. The following figure illustrates how Spark map tasks write the shuffle files to the Cloud Shuffle Storage. Reducer tasks consider the shuffle blocks as remote blocks and read them from the same shuffle storage.
This architecture enables your serverless Spark jobs to use Amazon S3 without the overhead of running, operating, and maintaining additional storage or compute nodes.
The following AWS Glue job parameters enable and tune Spark to use S3 buckets for storing shuffle data. You can also enable at-rest encryption when writing shuffle data to Amazon S3 by using security configuration settings.
| Key | Value | Explanation |
|---|---|---|
| `--write-shuffle-files-to-s3` | `TRUE` | This is the main flag, which tells Spark to use S3 buckets for writing and reading shuffle data. |
| `--conf` | `spark.shuffle.storage.path=s3://<shuffle-bucket>` | This is optional, and specifies the S3 bucket where the plugin writes the shuffle files. By default, the plugin uses `--TempDir/shuffle-data`. |
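As a hedged illustration of wiring these parameters into a job definition, the following boto3 sketch creates a Glue 3.0 job with the flag and the optional shuffle path set; the job name, role, script location, and bucket names are placeholders, not values from the original post:

```python
import boto3

glue = boto3.client("glue")

# Hypothetical job definition: only the two DefaultArguments entries are the
# shuffle-related parameters from the table above; everything else is a placeholder.
glue.create_job(
    Name="tpcds-q80-with-cloud-shuffle",
    Role="arn:aws:iam::123456789012:role/MyGlueJobRole",
    GlueVersion="3.0",
    WorkerType="G.1X",
    NumberOfWorkers=10,
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://my-script-bucket/q80_job.py",
        "PythonVersion": "3",
    },
    DefaultArguments={
        "--write-shuffle-files-to-s3": "TRUE",
        "--conf": "spark.shuffle.storage.path=s3://my-shuffle-bucket/shuffle-data",
    },
)
```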
The shuffle files are written to that location, creating files such as the following:

`s3://<shuffle-storage-path>/<Spark application ID>/[0-9]/<Shuffle ID>/shuffle_<Shuffle ID>_<Mapper ID>_0.data`
With the Cloud Shuffle Storage plugin enabled and using the same AWS Glue job setup, the TPC-DS query now succeeded without any job or stage failures.
Software binaries for the Cloud Shuffle Storage Plugin
You can now also download and use the plugin in your own Spark environments and with other cloud storage services. The plugin binaries are available for use under the Apache 2.0 license.
Bundle the plugin with your Spark applications
You can bundle the plugin with your Spark applications by adding it as a dependency in your Maven pom.xml as you develop your Spark applications, as shown in the following code. For more details on the plugin and Spark versions, refer to Plugin versions.
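The exact Maven coordinates depend on your Spark version and are listed on the Plugin versions page; the snippet below is a sketch with an assumed repository, groupId, and artifactId, and a placeholder version (replace them with the published values):

```xml
<!-- Assumed repository and coordinates; confirm against the Plugin versions page. -->
<repositories>
  <repository>
    <id>aws-glue-etl-artifacts</id>
    <url>https://aws-glue-etl-artifacts.s3.amazonaws.com/release/</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>chopper-plugin</artifactId>
    <version><!-- version from Plugin versions --></version>
  </dependency>
</dependencies>
```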
You can alternatively download the binaries from AWS Glue Maven artifacts directly and include them in your Spark application as follows:
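For example, a download might look like the following; the URL pattern and JAR name are assumptions derived from the Maven coordinates above, so substitute the actual artifact path and version:

```bash
# Placeholder URL: derive the real path from the published Maven coordinates.
curl -O https://aws-glue-etl-artifacts.s3.amazonaws.com/release/com/amazonaws/chopper-plugin/<version>/chopper-plugin-<version>.jar
```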
Submit the Spark application by including the JAR files on the classpath and specifying the two Spark configs for the plugin:
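A minimal sketch of such a submission, assuming the JAR downloaded above and a placeholder shuffle bucket (the two `--conf` values match the parameter table that follows):

```bash
spark-submit \
  --jars chopper-plugin-<version>.jar \
  --conf spark.shuffle.sort.io.plugin.class=com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin \
  --conf spark.shuffle.storage.path=s3://<shuffle-bucket>/shuffle-data \
  your_spark_app.py
```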
The following Spark parameters enable and configure Spark to use an external storage URI such as Amazon S3 for storing shuffle files; the URI protocol determines which storage system to use.
| Key | Value | Explanation |
|---|---|---|
| `spark.shuffle.storage.path` | `s3://<shuffle-storage-path>` | It specifies a URI where the shuffle files are stored, which must be a valid Hadoop FileSystem and be configured as needed. |
| `spark.shuffle.sort.io.plugin.class` | `com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin` | The entry class in the plugin. |
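You can also set the same two parameters programmatically instead of on the command line; here is a minimal PySpark sketch (the bucket name is a placeholder):

```python
from pyspark.sql import SparkSession

# Sketch: the same two plugin settings applied via the builder API.
spark = (
    SparkSession.builder
    .appName("cloud-shuffle-example")
    .config("spark.shuffle.sort.io.plugin.class",
            "com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin")
    .config("spark.shuffle.storage.path", "s3://my-shuffle-bucket/shuffle-data")
    .getOrCreate()
)
```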
Other cloud storage integration
This plugin comes with out-of-the-box support for Amazon S3 and can also be configured to use other forms of cloud storage such as Google Cloud Storage and Microsoft Azure Blob Storage. To enable other Hadoop FileSystem compatible cloud storage services, you can simply add a storage URI for the corresponding service scheme, such as `gs://` for Google Cloud Storage instead of `s3://` for Amazon S3, add the FileSystem JAR files for the service, and set the appropriate authentication configurations.
For more information about how to integrate the plugin with Google Cloud Storage and Microsoft Azure Blob Storage, refer to Using AWS Glue Cloud Shuffle Plugin for Apache Spark with Other Cloud Storage Services.
Best practices and considerations
Note the following considerations:
- This feature replaces local shuffle storage with Amazon S3. You can use it to address common failures with price/performance benefits for your serverless analytics jobs and pipelines. We recommend enabling this feature when you want to ensure reliable runs of your data-intensive workloads that create a large amount of shuffle data, or when you’re getting a `No space left on device` error. You can also use this plugin if your job encounters fetch failures (`org.apache.spark.shuffle.MetadataFetchFailedException`) or if your data is skewed.
- We recommend setting S3 bucket lifecycle policies on the shuffle bucket (`spark.shuffle.storage.path`) in order to clean up old shuffle data automatically; see the sketch after this list.
- The shuffle data on Amazon S3 is encrypted by default. You can also encrypt the data with your own AWS Key Management Service (AWS KMS) keys.
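A minimal boto3 sketch of such a lifecycle policy, assuming a dedicated shuffle bucket and prefix (both placeholders) and a 7-day expiration chosen to exceed the longest expected job runtime:

```python
import boto3

s3 = boto3.client("s3")

# Expire shuffle objects automatically so stale shuffle data does not accumulate.
# Bucket, prefix, and the 7-day window are placeholders; tune them to your jobs.
s3.put_bucket_lifecycle_configuration(
    Bucket="my-shuffle-bucket",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-old-shuffle-data",
                "Status": "Enabled",
                "Filter": {"Prefix": "shuffle-data/"},
                "Expiration": {"Days": 7},
            }
        ]
    },
)
```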
Conclusion
This post introduced the new Cloud Shuffle Storage Plugin for Apache Spark and described its benefits to independently scale storage in your Spark jobs without adding additional workers. With this plugin, you can expect jobs processing terabytes of data to run much more reliably.
The plugin is available in AWS Glue 3.0 and 4.0 Spark jobs in all AWS Glue supported Regions. We’re also releasing the plugin’s software binaries under the Apache 2.0 license. You can use the plugin in AWS Glue or other Spark environments. We look forward to hearing your feedback.
About the Authors
Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts that help customers build data lakes on the cloud.
Rajendra Gujja is a Senior Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and anything and everything to do with data.
Chuhan Liu is a Software Development Engineer on the AWS Glue team.
Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.
Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems that enable customers to integrate data and connect to a variety of sources, efficiently manage data lakes on Amazon S3, and optimize Apache Spark for fault tolerance with ETL workloads.