AWS Big Data Blog
Optimize Spark-Streaming to Efficiently Process Amazon Kinesis Streams
Rahul Bhartia is a Solutions Architect with AWS
Martin Schade, a Solutions Architect with AWS, also contributed to this post.
Do you use real-time analytics on AWS to quickly extract value from large volumes of data streams? For example, have you built a recommendation engine on clickstream data to personalize content suggestions in real time using Amazon Kinesis Streams and Apache Spark? These frameworks make it easy to implement real-time analytics, but understanding how they work together helps you optimize performance. In this post, I explain some ways to tune Spark Streaming for the best performance and the right semantics.
Amazon Kinesis integration with Apache Spark is via Spark Streaming. Spark Streaming is an extension of the core Spark framework that enables scalable, high-throughput, fault-tolerant stream processing of data streams such as Amazon Kinesis Streams. Spark Streaming provides a high-level abstraction called a Discretized Stream or DStream, which represents a continuous sequence of RDDs. Spark Streaming uses the Amazon Kinesis Client Library (KCL) to consume data from an Amazon Kinesis stream. The KCL takes care of many of the complex tasks associated with distributed computing, such as load balancing, failure recovery, and check-pointing. For more information, see the Spark Streaming + Kinesis Integration guide.
Spark Streaming receivers and KCL workers
Think of Spark Streaming as two main components:
- Fetching data from the streaming sources into DStreams
- Processing data in these DStreams as batches
Every input DStream is associated with a receiver, and in this case also with a KCL worker. The best way to understand this is to refer to the method createStream defined in the KinesisUtils Scala class.
Every call to KinesisUtils.createStream instantiates a Spark Streaming receiver and a KCL worker process on a Spark executor. The first time a KCL worker is created, it connects to the Amazon Kinesis stream and instantiates a record processor for every shard that it manages. For every subsequent call, a new KCL worker is created and the record processors are re-balanced among all available KCL workers. The KCL workers pull data from the shards, and routes them to the receiver, which in turns stores them into the associated DStream.
To understand how these extensions interface with the core Spark framework, we will need an environment that illustrates some of these concepts.
Executors, cores, and tasks
To create the environment, use an Amazon Kinesis stream named “KinesisStream” with 10 shards capable of ingesting 10 MBps or 10,000 TPS. For writing records into KinesisStream, use the AdvancedKPLClickEventsToKinesis class described in the Implementing Efficient and Reliable Producers with the Amazon Kinesis Producer Library blog post. Each event here is a line from the web access log, which averaged 140 KB in size. You can see from the screenshot below that this producer is capable of streaming over 10,000 records per second into KinesisStream by taking advantage of the PutRecords API operation.
To process the data using Spark Streaming, create an Amazon EMR cluster in the same AWS region using three m3.xlarge EC2 instances (one core and two workers). On the EMR cluster, the following default values are defined in the Spark configuration file (spark-defaults.conf):
…
spark.executor.instances 2
spark.executor.cores 4
…
This essentially means that each Spark application has two executors (an executor here is one container on YARN) with four cores each, and can run a maximum of eight tasks concurrently.
Note: By default, YARN with EMR is configured to use DefaultCapacityScheduler, which doesn’t take vCores as a factor into scheduling containers; To use vCores as an additional factor, you can switch the scheduler to use DominantResourceCalculator.
In the context of your stream and cluster, this is how Spark Streaming might look like after the first invocation of KinesisUtils.createStream.
Calling KinesisUtils.createStream a second time rebalances the record processors between the two instances of KCL workers, each running on a separate executor as shown below.
Throughput
In the code snippet below from the Spark Streaming example, you can see that it’s creating as many DStreams (in turn, KCL workers) as there are shards.
// Create the Amazon Kinesis DStreams val kinesisStreams = (0 until numStreams).map { i => KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2) }
If you run a simple record count application using the logic above, you’ll see that the streaming application starts but doesn’t produce anything till you terminate it.
To see why, look at the details of the test environment. The Spark default configuration with Amazon EMR could run eight concurrent tasks; you have created 10 receiver tasks, as the stream has 10 shards. Therefore, only eight receivers are active, with the others tasks being scheduled but never executed. A quick look at the application UI confirms this.
As you can observe in the screenshot, there are only eight active receivers out of 10 created, and processing time is undefined, as there are no slots left for other tasks to run. Re-run the line count application but this time use the number of executors (2) for the number of receivers instead of shards.
Now both your receivers are active with no scheduling delay and an average processing time of 111 milliseconds. In fact, running the same application with a single executor and a single receiver came out with the lowest delay of 93 ms, as displayed below. Spark Streaming was able to comfortably manage all KCL workers plus a count task on a single executor with four vCores.
For the processing time, the best way to find this is to put your application to the test, as the processing which you can do in the given batch interval is a function of your application logic, and how many stages/tasks Spark would create.
As demonstrated above, at minimum you need to ensure that every executor has enough vCores left to process the data received in the defined batch interval. When the application is running, you can use the Spark Streaming interface (or look for “Total delay” in the Spark driver log4j logs) to verify whether the system is able to keep up with the data rate, by using the total delay metrics. A system could be considered optimized if the total delay is comparable to the batch interval for the streaming context; otherwise, if the total delay is great than the batch interval, then your application will not be able to keep up with the incoming velocity.
Another parameter that you can consider is block interval, determined by the configuration parameter spark.streaming.blockInterval. The number of tasks per receiver per batch is approximately batch interval / block interval. For example, a block interval of 1 sec creates five tasks for a 5-second batch interval. If the number of tasks is too high, this leads to a scheduling delay. If it is too low, it will be inefficient because not all available cores will be used to process the data.
Note: Using the KCL, the interval at which the receiver reads data from a stream is by default set at 1 second (or 1000 millisecond). This lets you have multiple consumer applications processing a stream concurrently without hitting the Amazon Kinesis stream limits of five GetRecords calls per shards per second. There are currently no parameters exposed by the Spark Streaming framework to change this.
Dynamic resource allocation
A Spark cluster can also be configured to make use of “dynamic resource allocation”, which allows for a variable number of executors to be allocated to an application over time. A good use case is workloads with variable throughput. Using dynamic resource allocation, the number of executors and therefore the number of resources allocated to process a stream can be modified and matched to the actual workload.
For more information, see the Submitting User Applications with spark-submit blog post.
Note: Dynamic resource allocation currently only works with processing and not with receivers. While the KCL automatically balances workers among the receivers to keep up with changes for a given shard, you need to shut down your application and re-start it if you need to create more receivers.
Failure recovery
Now that you know more about the core integration of the KCL with Spark Streaming, I’ll move the focus from performance to reliability.
Checkpoint using the KCL
When you create or invoke the KinesisUtils.createStream method, in addition to the details about the stream—such as the name and AWS region—there are three other parameters:
- Initial position – Location where your application starts reading in the stream: from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or the latest record (InitialPostitionInStream.LATEST).
- Application name – Name of the DynamoDB table where information about checkpoints is stored.
- Checkpoint interval – Interval at which the KCL workers save their position in the stream. In other words, at every duration defined by checkpoint interval, the last sequence number read from all the shards are written to a DynamoDB table. You can view the DynamoDB table by navigating to the DynamoDB console and looking at the items in the table, you will see records for every shard and its associated information.
These parameters determine the failure recovery and checkpointing semantics as supported by the KCL.
Note: Sequence numbers are used as the checkpoints for every shard, and they are assigned after you write to the stream; for the same partition key, they generally increase over time.
How does a KCL application use this information for recovery from failure? Simply by looking up a DynamoDB table with the name provided by the parameter. If the DynamoDB table exists, then the KCL application starts reading from the sequence numbers stored in the DynamoDB table. For more information, see Kinesis Checkpointing.
Here’s an example of how this works. Stream records into an Amazon Kinesis stream with monotonically increasing numbers, so it becomes easy to demonstrate recovery. Your Spark streaming application is calling print on the DStream to print out the first 10 records.
Start your application and randomly terminate it using control-c, after it prints a batch:
Now, start the application again using spark-shell, and within a few seconds, you see it start printing the contents of your stream again.
You can see how the second run of the application re-printed the exact same records from the last set printed just before terminating the previous run. This recovery is implemented using the DynamoDB table, and helps recover from failure of KCL-based processors.
The KCL (and Spark Streaming, by extension) creates the DynamoDB table with a provisioned throughput of 10 reads per second and 10 writes per second. If you’re processing many shards or checkpointing often, you might need more throughput. At a minimum, you need throughput as defined by the ratio number of shards/KCL checkpoint interval in seconds. For more information, see Provisioned Throughput in Amazon DynamoDB and Working with Tables in the Amazon DynamoDB Developer Guide.
Checkpoints using Spark Streaming
What if failure happens after KCL checkpoints, for instance, if a driver fails? On recovery, you lose these records because the application starts processing from the record after the sequence numbers currently stored in DynamoDB.
To help recover from such failures, Spark Streaming also supports a checkpoint-based mechanism. This implementation does two things:
- Metadata checkpoints – Stores the state of the Spark driver and metadata about unallocated blocks.
- Processing checkpoints – Stores the state of DStream as it is processed through the chain, including beginning intermediate and ending states.
Note: Prior to Spark 1.5, you could also use write ahead logs (WAL) which also saved the received data to fault-tolerant storage. As of Spark 1.5, this has been replaced by checkpoints, which can use the source to recover instead of storing data.
For more information, see Checkpointing.
Here’s another example. This time, keep the batch processing around 2 seconds with both the KCL checkpoint and block interval at 1 second. Also, enable the Spark-based checkpoint using the following outline:
/* Create a streaming context */ def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(sc, Seconds(2) ) ssc.checkpoint(checkpointDirectory) val kinesisStreams = KinesisUtils.createStream(ssc, appName, streamName, endpointUrl,awsRegion,InitialPositionInStream.LATEST,Seconds(1),StorageLevel.MEMORY_ONLY) /* Do the processing */ ... ssc } /* First, recover the context; otherwise, create a new context */ val ssc = StreamingContext.getOrCreate(checkpointDirectory,functionToCreateContext _ ) ssc.start() ssc.awaitTermination()
After your application starts, fail the driver by killing the process associated with spark-shell on the master node.
Now re-start your application without checkpoint recovery enabled. Before you start your application, you need to copy the content of the DynamoDB table, so that you can use it to re-run the application with recovery enabled.
As you noticed above, your application didn’t print the record 166 and directly jumped to the record 167. Here, as you are checkpointing in KCL twice per batch interval, you have a 50% probability of missing one block’s worth of data for every failure.
Now re-run the application, but this time use the checkpoint for recovery. Before you start your application, you need to restore the content of the DynamoDB table to ensure your application uses the same state.
This time you didn’t miss any records and your record-set of 163-165 gets printed again, with the same time. There is also another batch, with records 166-167, that was recovered from the checkpoint, after which your normal process starts from record 167 using the KCL checkpoint.
You might have noticed that both the KCL and Spark checkpoints help to recover from failures, which could lead to records being replayed again when using Spark Streaming with Amazon Kinesis, implying at-least-once semantics. To learm more about the failure scenarios with Spark Streaming, see Fault-tolerance Semantics. To avoid potential side effects, ensure that the downstream processing has idempotent or transactional semantics.
Summary
Below are guidelines for processing Amazon Kinesis streams using Spark Streaming:
- Ensure that the number of Amazon Kinesis receivers created are a multiple of executors so that they are load balanced evenly across all the executors.
- Ensure that the total processing time is less than the batch interval.
- Use the number of executors and number of cores per executor parameters to optimize parallelism and use the available resources efficiently.
- Be aware that Spark Streaming uses the default of 1 sec with KCL to read data from Amazon Kinesis.
- For reliable at-least-once semantics, enable the Spark-based checkpoints and ensure that your processing is idempotent (recommended) or you have transactional semantics around your processing.
- Ensure that you’re using Spark version 1.6 or later with the EMRFS consistent view option, when using Amazon S3 as the storage for Spark checkpoints.
- Ensure that there is only one instance of the application running with Spark Streaming, and that multiple applications are not using the same DynamoDB table (via the KCL).
I hope that this post helps you identify potential bottlenecks when using Amazon Kinesis with Spark Streaming, and helps you apply optimizations to leverage your computing resources effectively. Want to do more? Launch an EMR cluster and use Amazon Kinesis out of the box to get started.
Happy streaming!
If you have questions or suggestions, please leave a comment below.
—————————-
Related
Querying Amazon Kinesis Streams Directly with SQL and Spark Streaming
Looking to learn more about Big Data or Streaming Data? Check out our Big Data and Streaming data educational pages.