Streaming Data Pipelines Best Practices

This guide provides suggestions on how to configure streaming pipelines for better throughput.

Pipeline Properties

Streaming pipelines behave much like batch pipelines that run once every batch interval. The guide (link) outlines how to fine-tune batch data pipelines to achieve greater parallelism, which in turn improves throughput. Much of that guide also applies to streaming pipelines. Properties specific to streaming pipelines are described below.

Executors

Use the guide (link) for recommended values for executor CPU and memory, which can be set using the Resources tab. The number of executors can also be set. If the source and sink systems can handle it, throughput scales linearly with the number of executors. In our experiments, scaling the executors for a pipeline (DataStreamGenerator -> Wrangler -> Kafka) increased throughput up to a point, after which throughput decreased because the Kafka Producer sink couldn’t handle the load.

Dynamic Allocation

Dynamic allocation is a Spark feature that allows it to dynamically remove and create executors depending on the workload. While this normally works well for batch pipelines, it can often result in a large performance hit for streaming pipelines. From experience, we have found that Spark will often use too few executors, and might distribute work to them in an uneven fashion. 

For example, instead of using 9 executors to process 9 Kafka partitions in parallel (one partition per executor), it might decide to use 6 executors, with some processing two partitions and some processing one. A scenario like this effectively halves the throughput of the pipeline, as the executors responsible for two partitions will take twice as long to complete.
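
The arithmetic behind this example can be sketched in a few lines of Python. This is only an illustrative model, not how Spark schedules work: it assumes every partition takes the same time to process, that each executor handles its partitions one after another, and that partitions are spread as evenly as possible. The function names are hypothetical.

```python
import math


def partitions_on_busiest_executor(partitions: int, executors: int) -> int:
    """Partitions handled by the most loaded executor under an even-as-possible split."""
    if partitions < 1 or executors < 1:
        raise ValueError("partitions and executors must be positive")
    return math.ceil(partitions / executors)


def relative_throughput(partitions: int, executors: int) -> float:
    """Throughput relative to running one executor per partition (assumed model)."""
    # A batch finishes only when the busiest executor finishes, so batch time
    # grows with the number of partitions that executor processes in turn.
    return 1 / partitions_on_busiest_executor(partitions, executors)


if __name__ == "__main__":
    print(relative_throughput(9, 9))  # 1.0
    print(relative_throughput(9, 6))  # 0.5
```

Under these assumptions, any executor count from 5 to 8 gives the same result for 9 partitions, because at least one executor still has to process two partitions.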

Spark might also decide to tear down and spin up new executors for each batch interval, adding to the overhead of each interval, further harming throughput, especially in pipelines that use a short interval.  For example, it can take around 30 seconds to create a new executor. If the interval is 60 seconds, that overhead alone cuts the throughput in half. 

For this reason, it is recommended to turn off dynamic allocation and manually set the number of executors when you know what the parallelism should be. For example, pipelines that use the Kafka source will want to set the number of executors to be equal to the number of topic partitions that are being read.  To disable dynamic allocation, set the property spark.dynamicAllocation.enabled to false in Engine config, and set spark.executor.instances to the desired number.
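
As a rough illustration, the two properties named above can be generated from the partition count. The helper below is a hypothetical sketch; only the two Spark property names come from this guide, and how the resulting key/value pairs are entered into Engine config is left to the reader.

```python
from typing import Dict


def static_executor_config(num_partitions: int) -> Dict[str, str]:
    """Build the Engine config entries for a fixed executor count.

    Assumes one executor per Kafka topic partition, as suggested above.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    return {
        # Turn off dynamic allocation so Spark keeps a fixed set of executors.
        "spark.dynamicAllocation.enabled": "false",
        # One executor per partition being read.
        "spark.executor.instances": str(num_partitions),
    }


if __name__ == "__main__":
    for key, value in static_executor_config(9).items():
        print(f"{key}={value}")
```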

Dataproc Cluster

Use the guide (link) to create a Dataproc cluster of the correct size for your pipeline. Create the Dataproc cluster in the same region as the system the pipeline reads from or writes to. For example, if your Kafka cluster is in the us-east1 region, and your pipeline writes data into a Kafka Producer sink, then create the Dataproc cluster in the us-east1 region. In our experiments, a pipeline with a Kafka Producer sink achieved about 24 times higher throughput when the Dataproc cluster ran in the same region as the Kafka cluster than when it ran in a different region.

Batch interval 

The batch interval should be long enough that the various fixed overheads do not take up a significant share of it. There is overhead associated with starting and stopping the pipeline internally in each interval. For example, when using dynamic allocation, containers are spun up and torn down in each interval. As another example, both the GCS and BigQuery sinks write data to GCS, where closing each file takes roughly 3 seconds, regardless of how much data is in the file. When using those sinks, a 10 second interval would be too short, as at least 30% of the time would be spent closing the file. A 1 minute interval would be more appropriate.
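
The share of an interval lost to fixed overhead is simply the overhead divided by the interval length. The short sketch below works through the numbers from this guide; the function name is hypothetical, and the model assumes the overhead is paid once per interval and does not overlap with useful work.

```python
def overhead_fraction(overhead_seconds: float, interval_seconds: float) -> float:
    """Fraction of a batch interval consumed by a fixed per-interval overhead."""
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")
    if overhead_seconds < 0:
        raise ValueError("overhead_seconds must not be negative")
    # The overhead cannot consume more than the whole interval.
    return min(overhead_seconds / interval_seconds, 1.0)


if __name__ == "__main__":
    # Closing a GCS file (about 3 seconds) in a 10 second and a 60 second interval.
    print(f"{overhead_fraction(3, 10):.0%}")   # 30%
    print(f"{overhead_fraction(3, 60):.0%}")   # 5%
    # Creating a new executor (about 30 seconds) in a 60 second interval.
    print(f"{overhead_fraction(30, 60):.0%}")  # 50%
```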

Kafka setup

There are several ways to optimize your Kafka setup, and some great articles discuss the process (link, link). We experimented with some important Kafka properties that might help improve performance. The details of those experiments are below.

Sizing the Kafka cluster

Brokers. Kafka throughput scales with the number of brokers. Add more brokers and a corresponding number of partitions to increase the throughput. We tested this by increasing the number of brokers and partitions together: throughput increased by about 10% when going from 3 to 5 brokers, and by 30% when going from 5 to 7. The number of partitions in each case was kept at 3x the number of brokers.

Partitions. The number of partitions defines the amount of parallelism in the brokers. So, in general, the greater the number of partitions, the greater the throughput. However, having too many partitions can have a negative effect. In our experiments, we determined that the number of partitions should be 3x the number of brokers. Having more partitions than that slowed down the pipeline significantly.
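
The sizing rule from these experiments is easy to express in code. The helper below is a hypothetical sketch that applies the 3x ratio observed here; the ratio is a finding for this particular setup and may differ for other workloads.

```python
def partitions_for_brokers(brokers: int, partitions_per_broker: int = 3) -> int:
    """Topic partition count using the partitions-per-broker ratio from this guide."""
    if brokers < 1 or partitions_per_broker < 1:
        raise ValueError("brokers and partitions_per_broker must be positive")
    return brokers * partitions_per_broker


if __name__ == "__main__":
    for brokers in (3, 5, 7):
        print(brokers, partitions_for_brokers(brokers))
```

For the broker counts tested above (3, 5 and 7), this gives 9, 15 and 21 partitions.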

Another thing to consider is that you can only have as many consumers reading from the topic as there are partitions. If you anticipate more consumers in the future, add more topic partitions. In our experiments, increasing the number of partitions increased throughput up to a point, after which throughput started decreasing. For our experiment, we used a real-time pipeline (DataStreamGenerator -> Wrangler -> Kafka) with a Kafka cluster having 3 brokers. As can be seen from the chart below, throughput increased until we hit 9 partitions and then started decreasing.

Configure the Kafka Producer sink (Ref.)

Once the Kafka cluster size has been finalized, the following properties can be set on the Kafka producer in CDAP to fine-tune performance.

async. In previous versions of CDAP, the Kafka Producer sink used a synchronous send function that blocks the call until the record is sent to the topic and an acknowledgement is received. It now uses an asynchronous send function by default, which returns immediately after storing the record in the buffer. This has improved the performance of the Kafka Producer sink by 2.5 times. In our experiments, for a pipeline with DataStreamGenerator -> Wrangler -> Kafka and a Kafka server setup with 3 brokers and 9 topic partitions, the pipeline was able to write 236 million records/hour as compared to 82 million records/hour previously. Please note that this behavior is now the default and no property needs to be set.

linger.ms. Kafka follows a batching strategy to batch messages going to the same partition. The producer waits up to the amount of time configured via linger.ms for more records to arrive before sending a batch. The recommended value for this property is 10-100ms. In our experiments, the best throughput was achieved at 100ms, so we set the default value of linger.ms to 100ms for Kafka producers in CDAP.

batch.size. Kafka follows a batching strategy to batch messages going to the same partition. The property batch.size is the amount of memory, in bytes, used to hold this batch. Once the batch reaches the configured size, the Kafka producer sends the messages to the broker. The recommended value for this property is 100K to 200K bytes. The default value is 16,384 bytes. Please note that increasing this value might increase latency, because the producer might wait longer to collect data.

compression.type. Setting this property to one of the supported compression types lz4/snappy/zstd/gzip will compress the bytes before being sent by the producer. This might help increase the throughput since fewer bytes are now sent from the producer to the broker. In our experiments, setting this property did not help because the overhead of applying the compression algorithm was more than the time saved due to sending fewer bytes over the network.

acks. By default, the Kafka producer waits for the leader broker to send an acknowledgment back before sending the next message. The leader broker in turn waits for all the other brokers to send an acknowledgment before sending its acknowledgment to the producer. This reduces throughput if the producer waits too long for acknowledgments. To save some time here, set the property value to 1. This means that the leader broker will not wait for acknowledgments from other brokers before replying to the producer. The downside is that if the leader fails immediately after replying to the producer, and before the other brokers have had a chance to replicate the record, the record will be lost.
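
To make the properties above concrete, the sketch below collects them into a set of key=value pairs. The property names are standard Kafka producer settings; the helper names and the chosen batch.size of 131,072 bytes (128 KiB, inside the 100K to 200K range) are illustrative assumptions, and how the pairs are supplied to the Kafka Producer sink is not specified here.

```python
from typing import Dict, Optional

# Compression types listed in this guide.
SUPPORTED_COMPRESSION = {"lz4", "snappy", "zstd", "gzip"}


def producer_tuning_properties(
    linger_ms: int = 100,
    batch_size_bytes: int = 128 * 1024,
    acks: str = "1",
    compression_type: Optional[str] = None,
) -> Dict[str, str]:
    """Collect the producer properties discussed above (values are assumptions)."""
    props = {
        "linger.ms": str(linger_ms),
        "batch.size": str(batch_size_bytes),
        "acks": acks,
    }
    # Compression is optional: it did not help in the experiments described above.
    if compression_type is not None:
        if compression_type not in SUPPORTED_COMPRESSION:
            raise ValueError(f"unsupported compression type: {compression_type}")
        props["compression.type"] = compression_type
    return props


def to_properties_text(props: Dict[str, str]) -> str:
    """Render properties as sorted key=value lines."""
    return "\n".join(f"{key}={value}" for key, value in sorted(props.items()))


if __name__ == "__main__":
    print(to_properties_text(producer_tuning_properties()))
```

Treat these values as a starting point for your own experiments. Note that acks=1 trades durability for throughput, as described above.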

Configure Kafka Source (Ref.)

The Kafka source also has various properties that can be experimented with to find the best possible values for your setup. One such property is mentioned below.

Max Rate Per Partition. This property sets the maximum rate at which records are read from each partition. The default value is 1000. If left unchanged, the number of records that are read from each partition and processed will be limited. To get the maximum throughput, change it to 0.
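
To see how much the default can limit a pipeline, the sketch below estimates the largest number of records a single batch can contain. It assumes the rate is expressed in records per second per partition, which this guide does not state explicitly, and that 0 means no limit. The function name is hypothetical.

```python
from typing import Optional


def max_records_per_batch(
    max_rate_per_partition: int,
    partitions: int,
    batch_interval_seconds: int,
) -> Optional[int]:
    """Upper bound on records read in one batch, or None when there is no limit.

    Assumes the rate is in records per second per partition.
    """
    if max_rate_per_partition < 0 or partitions < 1 or batch_interval_seconds < 1:
        raise ValueError("invalid arguments")
    if max_rate_per_partition == 0:
        return None  # 0 disables the limit
    return max_rate_per_partition * partitions * batch_interval_seconds


if __name__ == "__main__":
    # Default rate, 9 partitions, 60 second batch interval.
    print(max_records_per_batch(1000, 9, 60))  # 540000
    print(max_records_per_batch(0, 9, 60))     # None
```

Under these assumptions, the default of 1000 with 9 partitions caps the source at 32.4 million records per hour, regardless of how many executors are available.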

Created in 2020 by Google Inc.