Cluster Sizing

Master

Master nodes use resources proportional to the number of pipelines and additional applications running on the cluster. If you are running pipelines on ephemeral clusters, we suggest using 2 cpus and 8gb of memory for the master nodes. Larger master nodes are rarely needed in this case, because an ephemeral cluster does not run concurrent workloads and stores very little data.

If you are using persistent clusters, you may need larger master nodes to keep up with the workload. However, most users will not need to go above 4 cpus and 15gb of memory for the master nodes. We suggest monitoring memory and CPU usage on the node to determine whether larger master nodes are needed.

Workers

CPU and Memory

We recommend sizing your worker nodes with at least 2 cpus and 8gb of memory. You will need larger workers if your pipelines are configured to use larger amounts of memory. For example, a 4 cpu, 15gb worker node will have 4 cpus and 12gb of memory available for running YARN containers. If your pipeline is configured to run 1 cpu, 8gb executors, YARN will be unable to run more than one container per worker node. Each worker node will then have an extra 3 cpus and 4gb of memory that are wasted because nothing can be scheduled on them. To maximize resource utilization on your cluster, you will want the YARN memory and CPUs to be exact multiples of the amount needed per Spark executor. You can check how much memory each worker has reserved for YARN by looking at the yarn.nodemanager.resource.memory-mb property in YARN.
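
As a rough illustration of that packing math, the sketch below computes how many executors fit on a single worker and what is left over. It is not part of any CDAP or Dataproc tooling, it ignores the Spark memory overhead discussed further down, and the numbers simply mirror the example above.

```python
# Rough sketch: how many executors fit on one worker, and what is wasted.
# Numbers are illustrative, taken from the 4 cpu / 15gb worker example above.

def executors_per_worker(worker_cpus, worker_yarn_mem_gb, exec_cpus, exec_mem_gb):
    """Return (executors, wasted_cpus, wasted_mem_gb) for one worker node."""
    fits = min(worker_cpus // exec_cpus, int(worker_yarn_mem_gb // exec_mem_gb))
    wasted_cpus = worker_cpus - fits * exec_cpus
    wasted_mem = worker_yarn_mem_gb - fits * exec_mem_gb
    return fits, wasted_cpus, wasted_mem

# A 4 cpu / 15gb worker exposes roughly 4 cpus and 12gb to YARN.
print(executors_per_worker(4, 12, 1, 8))  # (1, 3, 4): 3 cpus and 4gb wasted
print(executors_per_worker(4, 12, 1, 3))  # (4, 0, 0): resources fully used
```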

If you are using Dataproc, the memory available for YARN containers will be roughly 75% of the VM memory. The minimum YARN container size is also adjusted depending on the size of the worker VMs. Some common worker sizes and their corresponding YARN settings are given in the table below.

Worker CPU | Worker memory (gb) | YARN node memory (gb) | YARN min allocation memory (mb)
---------- | ------------------ | --------------------- | -------------------------------
1          | 4                  | 3                     | 256
2          | 8                  | 6                     | 512
4          | 16                 | 12                    | 1024
8          | 32                 | 24                    | 1024
16         | 64                 | 51                    | 1024

Keep in mind that Spark will request more memory than the executor memory set for the pipeline, and that YARN will round the requested amount up. For example, suppose you have set your executor memory to 2048mb and have not given a value for spark.yarn.executor.memoryOverhead, which means the default of 384mb is used. That means Spark will request 2048mb + 384mb for each executor, and YARN will round that up to an exact multiple of the YARN minimum allocation. When running on an 8gb worker node, the YARN minimum allocation is 512mb, so the request gets rounded up to 2.5gb. This means each worker can run two containers, using all of the available CPUs but leaving 1gb of YARN memory (6gb - 2.5gb - 2.5gb) unused. In that case the worker node could be sized a little smaller, or the executors could be given a little more memory. When running on a 16gb worker node, the YARN minimum allocation is 1024mb, so 2048mb + 384mb gets rounded up to 3gb. This means each worker node is able to run four containers, with all CPUs and YARN memory in use.
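
The rounding behavior can be expressed as a small calculation. The sketch below assumes Spark's documented default overhead of max(384mb, 10% of executor memory); if spark.yarn.executor.memoryOverhead is set for the pipeline, use that value instead.

```python
import math

# Sketch of the memory math described above: YARN rounds (executor memory +
# Spark overhead) up to a multiple of the minimum allocation. The default
# overhead formula is an assumption based on Spark's documented default.

def yarn_container_mb(executor_mb, yarn_min_alloc_mb, overhead_mb=None):
    if overhead_mb is None:
        overhead_mb = max(384, int(0.10 * executor_mb))
    requested = executor_mb + overhead_mb
    return math.ceil(requested / yarn_min_alloc_mb) * yarn_min_alloc_mb

print(yarn_container_mb(2048, 512))   # 2560mb (2.5gb) on an 8gb worker
print(yarn_container_mb(2048, 1024))  # 3072mb (3gb) on a 16gb worker
```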

This can be rather confusing to put together, so below is a table of recommended worker sizes given some common executor sizes.

Executor CPU | Executor Memory (mb) | Worker CPU | Worker Memory (gb)
------------ | -------------------- | ---------- | ------------------
1            | 2048                 | 4          | 15
1            | 3072                 | 4          | 21
1            | 4096                 | 4          | 26
2            | 8192                 | 4          | 26

For example, a 26gb worker node translates to 20gb of memory usable for running YARN containers. With executor memory set to 4gb, roughly 1gb is added as overhead, which means 5gb YARN containers for each executor. This means the worker can run four containers with no resources left over. You can also multiply the size of the workers. For example, if executor memory is set to 4096mb, a worker with 8 cpus and 52gb of memory would also work well.
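
To work backwards from an executor size to a worker size, you can combine the container math with the rough 75% rule mentioned earlier. The sketch below is an approximation, not an official formula, and the VM sizes it produces should be matched against the machine types actually available.

```python
import math

# Sketch: derive an approximate worker size from an executor size, assuming
# Dataproc reserves roughly 25% of VM memory for non-YARN use.

def worker_size_for(exec_cpus, exec_mem_mb, executors_per_worker, yarn_min_alloc_mb):
    overhead_mb = max(384, int(0.10 * exec_mem_mb))
    container_mb = math.ceil((exec_mem_mb + overhead_mb) / yarn_min_alloc_mb) * yarn_min_alloc_mb
    yarn_mem_gb = container_mb * executors_per_worker / 1024
    vm_mem_gb = yarn_mem_gb / 0.75  # undo the rough 75% rule
    return exec_cpus * executors_per_worker, round(vm_mem_gb, 1)

# 1 cpu / 4096mb executors, four per worker:
print(worker_size_for(1, 4096, 4, yarn_min_alloc_mb=1024))  # (4, 26.7) -- close to the 4 cpu / 26gb row above
```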

GCE VMs restrict how much memory the VM can have based on the number of cores. For example, a VM with 4 cores must have at least 7.25gb of memory and at most 26gb of memory. This means an executor set to use 1 cpu and 8gb of memory will end up using 2 cpus and 26gb of memory on the VM. If executors are instead configured to use 2 cpus and 8gb of memory, all of the CPUs will be utilized. 

Disk

Disk is important for some pipelines but not all of them. If your pipeline does not contain any shuffles, disk will only be used when Spark runs out of memory and needs to spill data to disk. For these types of pipelines, disk size and type are generally not going to make a big impact on your performance.

If your pipeline is shuffling a lot of data, disk performance will make a difference. If you are using Dataproc, it is recommended that you use disk sizes of at least 1tb, as disk performance scales up with disk size. For information about disk performance, see https://cloud.google.com/compute/docs/disks/performance. To give an idea of the performance difference, we executed a pipeline joining 500gb of data on clusters with different disk sizes, and therefore different disk speeds.

Disk Size (gb) | Total Time | Source + Shuffle Write Time | Shuffle Read + Join Time
-------------- | ---------- | --------------------------- | ------------------------
256            | 119 min    | 78 min                      | 37 min
512            | 63 min     | 45 min                      | 16 min
1000           | 52 min     | 42 min                      | 8 min
2048           | 45 min     | 35 min                      | 8 min

The 256gb disks take around twice as much time as the 512gb disks. The 1000gb disks also perform significantly better than the 512gb disks, but the gains start to level off after that. This is because the disks are fast enough to no longer be the major bottleneck for the join.

For heavy shuffle pipelines, you may also see a performance boost from using local SSDs instead of HDDs, though you will have to decide whether the extra cost is worth it. To give an idea of the performance difference, we executed a pipeline joining 500gb of data on clusters with and without local SSDs. Each cluster had 10 workers, using n1-standard-4 machines (4 cpus, 15gb).

Local SSDs      | Total Time | Source + Shuffle Write Time | Shuffle Read + Join Time
--------------- | ---------- | --------------------------- | ------------------------
0               | 52 min     | 42 min                      | 8 min
4 (one per cpu) | 35 min     | 26 min                      | 7 min

Number of Workers

In order to minimize execution time, you will want to ensure that your cluster is large enough to run as much of the pipeline as possible in parallel. For example, if your pipeline source reads data using 100 splits, you will want to make sure the cluster is large enough to run 100 executors at once.

The easiest way to tell if your cluster is undersized is by looking at the YARN pending memory over time. If you are using Dataproc, a graph can be found on the cluster detail page.

If pending memory is high for long periods of time, you can increase the number of workers to add that much extra capacity to your cluster. For example, if the graph shows around 28gb of pending memory, the cluster should be increased by around 28gb of YARN memory to ensure that the maximum level of parallelism is achieved.
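
As a rough sketch of that calculation (the 28gb figure and the per-worker YARN memory are illustrative values, not numbers read from any API), you can estimate how many workers to add like this:

```python
import math

# Rough sketch: estimate how many workers to add from peak YARN pending memory.

def extra_workers_needed(pending_mem_gb, yarn_mem_per_worker_gb):
    return math.ceil(pending_mem_gb / yarn_mem_per_worker_gb)

# ~28gb of pending memory, workers that each expose 12gb to YARN
# (e.g. the 4 cpu / 15gb workers from the earlier example):
print(extra_workers_needed(28, 12))  # 3 additional workers
```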

A common question is whether it is better to scale up the size of each worker or to add more worker machines. For example, is it better to have 50 nodes with 16gb of memory each, or 100 nodes with 8gb of memory each? The answer is that it normally does not matter much. The following table shows execution times for a pipeline that performs an evenly distributed 500gb join on a few different clusters. The overall cluster CPU, memory, and disk are the same; the only difference is the number of workers.

Worker Nodes | Worker CPU | Worker Memory | Worker Disk | Total Time | Source + Shuffle Write Time | Shuffle Read + Join Time
------------ | ---------- | ------------- | ----------- | ---------- | --------------------------- | ------------------------
5            | 8          | 30gb          | 4000gb      | 42 min     | 33 min                      | 8 min
10           | 4          | 15gb          | 2048gb      | 45 min     | 35 min                      | 8 min
20           | 2          | 7.5gb         | 1000gb      | 44 min     | 34 min                      | 7.5 min

Each cluster has a total of 40 cpus, 150gb of memory, and 20tb of disk. Each cluster executed the pipeline in roughly the same amount of time. Cost is roughly the same as well, since pricing is generally linear in the number of cores and the amount of memory and disk in use. For more information, see https://cloud.google.com/compute/vm-instance-pricing.

Autoscale

If you are running on Dataproc clusters, you can enable Dataproc autoscaling to automatically size your cluster depending on the workload. See https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/autoscaling for more information about Dataproc autoscaling.

Autoscaling involves dividing a cluster into two types of worker nodes: primary and secondary. Primary nodes run both HDFS (storage) and YARN (computation). Secondary nodes only run YARN (computation). Primary nodes should not be allowed to autoscale because removing them causes problems with HDFS. Only secondary nodes should be allowed to scale.

Scaling Down

It is simplest to use an autoscaling policy that allows the cluster to scale up, but never to scale down. Scaling down in the middle of a job will often cause Spark tasks to fail, resulting in pipeline delays or outright failure. A policy that never scales down matches well with the ephemeral nature of the Dataproc provisioner, which creates a cluster for each pipeline run and tears it down after the run. When possible, it is advised to use this type of policy.
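
As a sketch of what such a policy might look like, the snippet below creates a scale-up-only policy with the google-cloud-dataproc Python client. The project, region, policy id, and instance counts are placeholders, and the field values should be checked against the autoscaling documentation linked above before use.

```python
# Sketch of a scale-up-only autoscaling policy. Project, region, policy id,
# and instance counts are placeholders; consult the Dataproc autoscaling
# documentation for the authoritative schema and limits.
from google.cloud import dataproc_v1
from google.protobuf import duration_pb2

region = "us-central1"
client = dataproc_v1.AutoscalingPolicyServiceClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

policy = dataproc_v1.AutoscalingPolicy(
    id="scale-up-only",
    basic_algorithm=dataproc_v1.BasicAutoscalingAlgorithm(
        cooldown_period=duration_pb2.Duration(seconds=120),
        yarn_config=dataproc_v1.BasicYarnAutoscalingConfig(
            scale_up_factor=1.0,
            scale_down_factor=0.0,  # never remove workers
            graceful_decommission_timeout=duration_pb2.Duration(seconds=0),
        ),
    ),
    # Keep primary workers fixed; only secondary workers scale.
    worker_config=dataproc_v1.InstanceGroupAutoscalingPolicyConfig(
        min_instances=2, max_instances=2
    ),
    secondary_worker_config=dataproc_v1.InstanceGroupAutoscalingPolicyConfig(
        min_instances=0, max_instances=100
    ),
)

client.create_autoscaling_policy(
    parent=f"projects/my-project/regions/{region}", policy=policy
)
```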

If you are using a static cluster, you may want an autoscaling policy that does scale down to reduce costs during periods of low activity. Scale down policies are more complicated to configure, as you will want to set the cooldown duration to a large enough value to avoid scaling down in the middle of a pipeline run. Alternatively, you can use Dataproc's Enhanced Flexibility Mode to make scaling down a safer operation.

Enhanced Flexibility Mode (EFM)

EFM allows you to specify that only primary worker nodes store intermediate shuffle data. Because secondary workers are no longer responsible for shuffle data, removing them from the cluster does not cause Spark jobs to run into delays or errors. And because primary workers are never scaled down, this makes cluster scale down more stable and efficient. If you are running pipelines with shuffles on a static cluster, it is recommended that you use EFM.

See https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/flex for more information on EFM.
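
As a minimal sketch, EFM's primary-worker shuffle for Spark is enabled through a cluster property at cluster creation time. The fragment below shows it as part of a software config; the property name is taken from the EFM documentation above, and everything else is a placeholder.

```python
# Sketch: cluster property that turns on EFM's primary-worker shuffle for Spark,
# expressed as the software_config portion of a Dataproc cluster definition.
efm_software_config = {
    "properties": {
        "dataproc:efm.spark.shuffle": "primary-worker",
    }
}
```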
