AWS Big Data Blog

Best practices for resizing and automatic scaling in Amazon EMR

September 2024: This post was reviewed and updated for accuracy.

You can increase your savings by taking advantage of the dynamic scaling feature set available in Amazon EMR. The ability to scale the number of nodes in your cluster up and down on the fly is among the major features that make Amazon EMR elastic. You can take advantage of scaling in EMR by resizing your cluster down when you have little or no workload. You can also scale your cluster up to add processing power when the job gets too slow. This allows you to spend just enough to cover the cost of your job and little more.

Knowing the complex logic behind this feature can help you take advantage of it to save on cluster costs. In this post, we detail how EMR clusters resize, and we present some best practices for getting the maximum benefits and resulting cost savings for your own cluster through this feature.

EMR scaling is more complex than simply adding or removing nodes from the cluster. One common misconception is that scaling in Amazon EMR works exactly like Amazon EC2 scaling. With EC2 scaling, you can add/remove nodes almost instantly and without considering any dependent constraints, but EMR has more complexity to it, especially when scaling a cluster down. This is because important data or jobs could be running on your nodes which has to be rebalanced across the nodes in the cluster. This circumstance poses a significant challenge for core nodes to seamlessly scale their operations up or down without encountering any complications.

The superiority of Managed Scaling over Custom Auto Scaling

For most use cases, Managed Scaling approach offered by Amazon EMR is the preferred choice over Custom Auto Scaling, as it provides a more streamlined and efficient solution for the dynamic cluster resource management. Moreover, EMR managed scaling supports on instance fleets, and its scaling configuration is straightforward as the managed scaling algorithm takes care of efficient scaling operations with evaluating various metrics at frequent interval of 5 to 10 seconds. The scaling configurations applies to entire cluster if we have multiple instance groups/fleet whereas custom auto scaling limits to configure for each instance group.

EMR Managed scaling also keeps track of Spark Executors and the location of intermediate shuffle data which often called Spark shuffle awareness. During scaling down operation, managed scaling makes sure to decommission the under-utilized node that do not have actively used shuffle data, which bypasses the heavy re-computation of shuffle data and enhance the job performance. For more information, see Managed Scaling in details and best practices in managed scaling.

The Managed Scaling feature is specifically designed to cater to YARN-based applications like Spark, Hive, Flink, and others of their ilk. However, if you’re running applications that fall outside the YARN ecosystem, such as Presto or similar workloads that require dynamic scaling capabilities, then Custom Auto Scaling is the viable alternative in EMR. This approach provides you to customize various scaling policy configurations such as evaluation period, cool-down intervals and the precise number of nodes to be provisioned/deprovisioned. However, it’s important to note that this level of control comes at the expense of increased complexity, as you will need to configure the appropriate policies to align the scaling behavior with your specific requirements and the scaling effort & efficiency may not match to the managed scaling.

Custom auto scaling process

EMR clusters can use two types of nodes for Hadoop operations: core nodes and task nodes. Core nodes host persistent data by running the HDFS Datanode process and run Hadoop tasks through YARN’s resource manager with help of YARN’s node manager. Task nodes only run Hadoop tasks through YARN(node manager) and DO NOT store data in HDFS.

Scale-up

Upon the initiation of scale-up operation, the modify-instance-group API is invoked, and the EMR proceeds to create an EC2 instance. If the bootstrap action script is enabled within the cluster configuration, it is executed on the newly created instance. Subsequently, the configurations of the existing EC2 instances are replicated on the new instance.

Once the EC2 instance setup is completed, the Hadoop applications such as HDFS, YARN and other applications specified in the cluster configuration are installed on the new instance. In the event that the scale-up operation involves task nodes, the provisioning and deprovisioning processes are expedited, as data node configuration and rebalancing, are not applicable. Once the installation completed, the new instance comes under the management of instance controller daemon.

Scale-down

A scale-down operation is typically not as simple as just terminating EC2 instances, but careful coordination with primary instance and management of resources & applications such as HDFS and YARN sync ups are often required. To prevent data loss, Amazon EMR scaling ensures that your node has no running Apache Hadoop tasks or unique data that could be lost before removing your node. It is worth considering this decommissioning delay when resizing your EMR cluster. By understanding and accounting for how this process works, you can avoid issues that have plagued others, such as slow cluster resizes and inefficient automatic scaling policies.

When an EMR cluster is scaled down, two different decommission processes are triggered on the nodes that will be terminated. The first process is the decommissioning of Hadoop YARN, which is the Hadoop resource manager. Hadoop tasks that are submitted to Amazon EMR generally run through YARN, so EMR must ensure that any running YARN tasks are complete before removing the node. If for some reason the YARN task is stuck, there is a configurable timeout to ensure that the decommissioning still finishes. When this timeout happens, the YARN task is terminated and is instead rescheduled to a different node so that the task can finish. However, the overall job execution time may increase due to this delay in decommissioning.

The second decommission process is that of the Hadoop Distributed File System or HDFS. HDFS stores data in blocks that are spread through the EMR cluster on all core nodes that are running HDFS. When an HDFS node is decommissioning, it must replicate those data blocks to other HDFS nodes so that they are not lost when the node is terminated.

Strategies for effective resizing

The following are some issues to consider when resizing your clusters using custom auto scaling feature. Generally, scale up operation is typically faster especially task nodes compared to scale down the core nodes.

When scaling down task nodes on a running cluster, expect a short delay for any running Hadoop task on the cluster to decommission. This allows you to get the best usage of your task node by not losing task progress through interruption. However, if your job allows for this interruption, you can adjust the one-hour default timeout on the resize by adjusting the yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs property (later in EMR 5.14) in yarn-site.xml. When this process times out, your task node is shut down regardless of any running tasks. This process is usually relatively quick, which makes it fast to scale down task nodes.

When you’re scaling down core nodes, Amazon EMR must also wait for HDFS to decommission to protect your data. HDFS can take a relatively long time to decommission. This is because HDFS block replication is throttled by design through configurations located in hdfs-site.xml. This in turn means that HDFS decommissioning is throttled. This protects your cluster from a spiked workload if a node goes down, but it slows down decommissioning. When scaling down a large number of core nodes, consider adjusting these configurations beforehand so that you can scale down more quickly.

For example, consider this exercise with HDFS and resizing speed.

The HDFS configurations, located in hdfs-site.xml, have some of the most significant impact on throttling block replication:

  • datanode.balance.bandwidthPerSec: Bandwidth for each node’s replication. This is a maximum amount of bandwidth used by each data node. So, consider increasing gradually as this will be used when replication is going on.
  • namenode.replication.max-streams: Max streams running for block replication
  • namenode.replication.max-streams-hard-limit: Hard limit on max streams
  • datanode.balance.max.concurrent.moves: Number of threads used by the block balancer for pending moves
  • namenode.replication.work.multiplier.per.iteration: Used to determine the number of blocks to begin transfers immediately during each replication interval. The actual number is obtained by multiplying this multiplier value with total number of live data nodes in the cluster.
  • dfs.datanode.max.xcievers: Defines maximum number of threads to use for transferring data in and out of data node. Consider increasing it when dealing larger HDFS blocks in write-heavy applications.
  • dfs.datanode.handler.count: Used to define the number of threads for the data node. If the data is not read very often, we do not need higher number of parallel threads, which may pressure the IO traffic requests. Default is 64.
  • dfs.replication: The actual number of replications can be specified when the file is created. Default is chosen based on the number of core nodes in EMR. Adjust this value depends on your requirement.

(Beware when modifying: Changing these configurations improperly, especially on a cluster with high load, can seriously degrade cluster performance.)

Cluster resizing speed exercise

Modifying these configurations can speed up the decommissioning time significantly. Try the following exercise to see this difference for yourself. I have created two tests as first one with latest EMR version(EMR 7.1.0) and second test with EMR 5.x stable version which is 5.36.2 to find the difference in timings. The only additional step that need to be included in EMR 5.36.2 test is “step 5” which will configure hdfs-site.xml with modified properties.

  1. Create an EMR cluster with the following hardware configuration in both EMR 7.1.0 and EMR 5.36.2.
    • Master: 1 node – m3.xlarge
    • Core: 6 nodes – m3.xlarge
    • EC2 key pair: your accessible ec2 key pair to connect primary instance if SSM connectivity is not available
  1. Connect to the master node of your cluster using SSH (Secure Shell)/Session Manager method once the cluster is in waiting state.

For more information, see Connect to the Master Node Using SSH in the Amazon EMR documentation.

  1. Load data into HDFS by using the following two jobs:
    • The below command will generate the dummy HDFS data.
      $ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar teragen 1000000000 /user/hadoop/data1/
    • The below S3-dist-cp command will copy the small files from AWS public S3 bucket to your HDFS location. This simulation is to involve additional overhead with small file issue.
      $ s3-dist-cp --src s3://aws-bigdata-blog/artifacts/ClusterResize/smallfiles25k/ --dest  hdfs:///user/hadoop/data2/

If you are running this test from a different region than us-east-1, then please follow below S3-dist-cp which includes the s3 endpoint flag.

$ s3-dist-cp --src s3://aws-bigdata-blog/artifacts/ClusterResize/smallfiles25k/ --dest hdfs:///user/hadoop/data2/ --s3Endpoint https://s3.us-east-1.amazonaws.com

Once the above two jobs completed, you can see two directories in the HDFS location.

[hadoop@ip-172-31-39-181 ~]$ hdfs dfs -ls /user/hadoop/
Found 2 items
drwxr-xr-x   - hadoop hdfsadmingroup          0 2024-05-31 11:16 /user/hadoop/data1
drwxr-xr-x   - hadoop hdfsadmingroup          0 2024-05-31 11:57 /user/hadoop/data2

We can also see how much HDFS disk utilized with dfsadmin report command,

[hadoop@ip-172-31-39-181 ~]$ hdfs dfsadmin -report
Configured Capacity: 452533469184 (421.45 GB)
Present Capacity: 443491758080 (413.03 GB)
DFS Remaining: 241504776192 (224.92 GB)
DFS Used: 201986981888 (188.12 GB)
DFS Used%: 45.54%
Replicated Blocks:
	Under replicated blocks: 0
	Blocks with corrupt replicas: 0
	Missing blocks: 0
	Missing blocks (with replication factor 1): 0
	Low redundancy blocks with highest priority to recover: 0
	Pending deletion blocks: 0
Erasure Coded Block Groups: 
	Low redundancy block groups: 0
	Block groups with corrupt internal blocks: 0
	Missing block groups: 0
	Low redundancy blocks with highest priority to recover: 0
	Pending deletion blocks: 0

-------------------------------------------------
  1. Resize your EMR clusters from six to five core nodes, and look at the EMR events tab to see how long the resize took. In my case, it took 4 minutes to scale down the node in the core node group in EMR 7.1.0.

Same scale down operation in EMR 5.36.2, took 11 minutes to complete.

  1. Edit your hdfs-site.xml configs ONLY in the attempt of EMR 5.36.2 test.
    $ sudo vim /etc/hadoop/conf/hdfs-site.xml

Then include/edit in the following configuration setup in the hdfs-site properties.

Disclaimer: These values are relatively high for example purposes and should not necessarily be used in production. Be sure to test config values for production clusters under load before modifying them.

<property>
  <name>dfs.datanode.balance.bandwidthPerSec</name>
  <value>100m</value>
</property>

<property>
  <name>dfs.namenode.replication.max-streams</name>
  <value>100</value>
</property>

<property>
  <name>dfs.namenode.replication.max-streams-hard-limit</name>
  <value>200</value>
</property>

<property>
  <name>dfs.datanode.balance.max.concurrent.moves</name>
  <value>500</value>
</property>

<property>
  <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
  <value>30</value>
</property>

After completing the above config changes, restart the HDFS-name-node daemon to take the effect.

$ sudo systemctl restart hadoop-hdfs-namenode.service
$ sudo systemctl status hadoop-hdfs-namenode.service
  1. Now, resize the EMR 5.36.2 cluster from five to four nodes and observe the time duration for the scale down operation in EMR console Events tab. In my case, it took 6 mins to complete.

Basically,  I observed the resizing time lower from 11+ minutes (without config changes) down to about 6 minutes (with modified hdfs-site configs) in EMR 5.36.2. However, the latest version EMR 7.1.0 took only 4 mins to complete since the above HDFS configurations are already enabled except datanode.balance related properties. This exercise demonstrates how much HDFS is throttled under default configurations on specific scenario. Although removing these throttles is dangerous and performance using them should be tested first, they can significantly speed up decommissioning time and therefore resizing.

The following are some additional tips for resizing clusters:

  • Shrink resizing timeouts. You can configure EMR nodes in two ways: instance groups or instance fleets. For more information, see Create a Cluster with Instance Fleets or Uniform Instance Groups. EMR has implemented shrink resize timeouts when nodes are configured in instance fleets. This timeout prevents an instance fleet from attempting to resize forever if something goes wrong during the resize. The minimum provisioning timeout is 5 and maximum timeout is 7 days, so keep it in mind when you are resizing an instance fleet down.

If an instance fleet shrink request takes longer than one day, it finishes and pauses at however many instances are currently running. On the other hand, instance groups have no default shrink resize timeout. However, both types have the one-hour YARN timeout described earlier in the yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs property in yarn-site.xml.

  • Watch out for high frequency HDFS writes when resizing core nodes. If HDFS is receiving a lot of writes, it will modify a large number of blocks that require replication. This replication can interfere with the block replication from any decommissioning core nodes and significantly slow down the resizing process. So, before scale-down the core node, please consider avoiding heavy write operations in HDFS.
  • Constant core nodes with extensive scaling of task nodes. It is recommended to maintain a constant ratio of core nodes in the cluster. Core nodes should be dedicated to storing HDFS data, while task nodes can be scaled up or down as needed. Scaling task nodes is a faster process for provisioning and decommissioning resources, as it does configure only YARN as no overhead associated like core node scaling.
  • Avoid over disk utilization. It also advised to keep the Yarn/HDFS nodes healthy. Though this might not directly relate with scaling, it is recommended to monitor the disk utilization of EC2 instances and scale the additional EBS volumes manually or dynamically with bootstrap action script to avoid the core/task nodes to get blacklisted which will exclude the node to schedule new tasks.

Setting up policies for automatic scaling

Although manual scaling is useful, a majority of the time cluster resizes are executed dynamically through Amazon EMR managed scaling(cost optimized) and Amazon EMR automatic scaling for non-YARN based applications such as Presto and so on. Generally, the details of the automatic scaling policy must be tailored to the specific Hadoop job, so I won’t go into detail with specific use-case. Instead, I provide some general guidelines for setting up your cluster’s auto scaling policies.

The following are some considerations when setting up your auto scaling policy.

Metrics for scaling

Choose the right metrics for your node types to trigger scaling. For example, scaling core nodes solely on the YARNMemoryAvailablePercentage metric doesn’t make sense. This is because you would be increasing/decreasing HDFS total size when really you only need more processing power. Scaling task nodes on HDFSUtilization also doesn’t make sense because you would want more HDFS storage space that does not come with task nodes. A common automatic scaling metric for core nodes is HDFSUtilization. Common auto scaling metrics for task nodes include ContainerPendingRatio and YarnMemoryAvailablePercentage.

You use the example template in scale out and scale-in operations and customize your own templates based on various metrics available in scaling policy.

It’s important to note that the evaluation period is set to a minimum of one five-minute interval and the cooldown period between scaling activities measured in seconds, will be determined by the individual scaling policy based on the specific requirements.

Note: Keep in mind that Amazon EMR currently requires HDFS, so you must have at least one core node in your cluster. Core nodes can also provide CPU and memory resources. But if you don’t need to scale HDFS, and you just need more CPU or memory resources for your job, we recommend that you use task nodes for that purpose.

Scaling core nodes

As described earlier, one of the two EMR node types in your cluster is the core node. Core nodes run HDFS, so they have a longer decommissioning delay. This means that they are slow to scale and should not be aggressively scaled. Only adding and removing a few core nodes at a time will help you avoid scaling issues. Unless you need the HDFS storage, scaling task nodes is usually a better option. If you find that you have to scale large numbers of core nodes, consider changing hdfs-site.xml configurations to allow faster decommission time and faster scale down. As mentioned, it is recommended to keep to the core nodes constant and scale the task nodes as required.

Scaling task nodes

Task nodes don’t run HDFS, which makes them perfect for aggressively scaling with a dynamic job. When your Hadoop task has spikes of work between periods of downtime, this is the node type that you want to use.

You can set up task nodes with a very aggressive auto scaling policy, and they can be scaled up or down easily. If you don’t need HDFS space, you can use more task nodes in your cluster with one minimum core node.

Using Spot Instances

You can leverage Spot Instance types in EMR Automatic scaling for workloads that can retrigger from the checkpoints with handling the spot interruptions. The tendency of Spot Instances to disappear and reappear makes them perfect for task nodes. Because these task nodes are already used to scale in and out aggressively, Spot Instances can have very little disadvantage in terms of spot interruptions. However, for time-sensitive Hadoop tasks, On-Demand Instances might be prioritized for the guaranteed availability.

Scale-in vs. scale-out policies for core nodes

Don’t fall into the trap of making your scale-in policy the exact opposite of your scale-out policy, especially for core nodes. Many times, scaling in results in additional delays for decommissioning. Take this into account and allow your scale-in policy to be more forgiving than your scale-out policy. This means longer cooldowns and higher metric requirements to trigger resizes.

You can think of scale-out policies as easily triggered with a low cooldown and small node increments. Scale-in policies should be hard to trigger, with larger cooldowns and node increments. Please analyze the different metrics provided by EMR custom scaling feature to determine most suitable ones for defining the individual scaling policy.

Minimum nodes for core node auto scaling

One last thing to consider when scaling core nodes is the yarn.node-labels.enabled and yarn.node-labels.am.default-node-label-expression property located in yarn-site.xml. In Amazon EMR 5.19 and later 5.x versions, yarn.node-labels.enabled is set to true and  yarn.node-labels.am.default-node-label-expression is set to CORE by default, which means that the application master always runs on core nodes and not task nodes. This is to help and prevent application failure in a scenario where Spot Instances are used for the task nodes.

However, in Amazon EMR 6.x and later versions, these properties are disabled by default. This means that when setting a minimum number of core nodes in EMR 5.x versions, you should choose a number that is greater than or at least equal to the number of simultaneous application masters that you plan to have running on your cluster. If you want the application master to also run on task nodes, you turn off this property to that include both CORE and TASK.

On the other hand, if you want the application master to run specifically only on core nodes in EMR 6.x version later, you need to explicitly enable them. As a best practice, do not configure the yarn.node-labels.enabled property to include TASK if Spot Instances are used for task nodes.

Aggregating data using S3DistCp

Before wrapping up this post, I have one last piece of information to share about cluster resizing. When resizing core nodes, you might notice that HDFS decommissioning takes a very long time. Often this is the result of storing many small files in your cluster’s HDFS. Having many small files within HDFS (files smaller than the HDFS block size of 128 MB) adds lots of metadata overhead and can cause slowdowns in both decommissioning and Hadoop tasks.

Keeping your small files to a minimum by aggregating your data can help your cluster and jobs run more smoothly. For information about how to aggregate files, see the post Seven Tips for Using S3DistCp on Amazon EMR to Move Data Efficiently Between HDFS and Amazon S3.

Summary

In this post, you read about how Amazon EMR resizing logic works to protect your data and Hadoop tasks. We detailed the efficient scaling techniques offered by EMR, including cost optimized EMR Managed Scaling. I also highlighted some additional considerations for EMR resizing in Custom Automatic Scaling. Incorporating these practices can enable you maximize cost savings by ensuring that only the necessary cluster resources utilized.

Do you have follow-up queries, please leave a comment. We would love to hear your thoughts, suggestions and look forward your valuable feedback.


Additional Reading

If you found this post useful, be sure to check out Seven Tips for Using S3DistCp on Amazon EMR to Move Data Efficiently Between HDFS and Amazon S3 and Dynamically Scale Applications on Amazon EMR with Auto Scaling.


About the Authors

Brandon Scheller is a software development engineer for Amazon EMR. His passion lies in developing and advancing the applications of the Hadoop ecosystem and working with the open source community. He enjoys mountaineering in the Cascades with his free time.

Yokesh N K is a Subject Matter Expert for Amazon EMR at AWS, focused on Apache Spark, Apache Hive and analytical services like AWS Glue and AWS Redshift and so on, where he provides architectural support to various customers for running complex big data workloads over AWS platform. In idle hours, he spends time answering the technical questions in AWS:Repost and writing community articles.


Audit History

Last reviewed and updated in September 2024 by Yokesh N K | SME