Pub/Sub Streaming Source Best Practices

In CDAP 6.3 and earlier, the CDAP Pub/Sub streaming source plugin used the Apache Bahir receiver to fetch messages from Google Cloud Pub/Sub. The Apache Bahir receiver doesn't support Spark’s built-in Backpressure (rate limiting) mechanism. When messages are written to the Pub/Sub topic at a high rate, they can overwhelm the Spark application, making it unstable and causing it to crash.

CDAP 6.4.0 introduced a new Pub/Sub streaming source (version 0.17.0) with support for Spark’s Backpressure mechanism. If you run realtime pipelines, it is strongly recommended that you upgrade to CDAP 6.4.0. Enabling Checkpointing along with Backpressure greatly improves pipeline reliability.

Backpressure allows the Apache Spark streaming engine to control the receiving rate based on the current batch scheduling delays and processing times, so that the system receives messages only as fast as it can process them.
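
As a rough illustration of how such a controller behaves, the following Python sketch loosely follows the PID-style rate update that Spark Streaming uses for backpressure. It is a simplification, not the plugin's or Spark's actual code: the function name `next_rate`, the gain values `kp` and `ki`, and the omission of the derivative term are assumptions made for illustration.

```python
def next_rate(latest_rate, num_records, processing_delay_ms,
              scheduling_delay_ms, batch_interval_ms,
              kp=1.0, ki=0.2, min_rate=100.0):
    """Return the next ingest rate (records/second) for one receiver.

    kp and ki are assumed gains; min_rate is the floor below which
    the rate is never reduced.
    """
    # Rate at which the last batch was actually processed.
    processing_rate = num_records / processing_delay_ms * 1000.0
    # Proportional term: how far ingestion ran ahead of processing.
    error = latest_rate - processing_rate
    # Integral-like term: backlog implied by the time the batch waited
    # in the scheduling queue.
    backlog_error = scheduling_delay_ms * processing_rate / batch_interval_ms
    return max(latest_rate - kp * error - ki * backlog_error, min_rate)


if __name__ == "__main__":
    # A 1-second batch of 1000 records that took 2 seconds to process.
    print(next_rate(1000.0, 1000, 2000.0, 0.0, 1000.0))
    # The same batch, but it also waited 1 second before being scheduled.
    print(next_rate(1000.0, 1000, 2000.0, 1000.0, 1000.0))
```

In this sketch, a batch that is processed more slowly than it was received lowers the next rate, and any scheduling delay lowers it further, but never below `min_rate`.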

To enable Checkpointing and Backpressure, follow these steps:

  1. In Pipeline Studio, click Configure > Pipeline Config.

  2. Set Checkpointing to On and enter the Checkpoint directory.

  3. Click Engine Config and set Backpressure to On.

Configuring the number of Receiver workers

To increase overall system throughput, you can increase the number of receiver workers that are executed in parallel by using the Number of Readers option in the Pub/Sub source.

Note: Each reader needs one executor in the cluster, so you must adjust the Number of Executors setting accordingly under Configure > Engine Config in Pipeline Studio.
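
The sizing rule in this note can be sketched as a small calculation. This is a hypothetical helper, not part of CDAP; the `processing_executors` parameter reflects the assumption that the pipeline needs at least one executor beyond those occupied by readers in order to process the received messages.

```python
def min_executors(num_readers, processing_executors=1):
    """Smallest Number of Executors value for a given Number of Readers.

    Each reader occupies one executor; processing_executors is an
    assumed count of extra executors left free for processing.
    """
    if num_readers < 1:
        raise ValueError("num_readers must be at least 1")
    if processing_executors < 1:
        raise ValueError("processing_executors must be at least 1")
    return num_readers + processing_executors


if __name__ == "__main__":
    for readers in (1, 2, 4):
        print(f"{readers} reader(s): at least {min_executors(readers)} executors")
```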

Configuring Write-Ahead Logs for at-least-once guarantees

To ensure no messages are lost in case of a worker failure, you can enable Spark’s Write-Ahead log functionality. When used in conjunction with Checkpointing, this ensures every message that is read from the Pub/Sub source is stored safely in case of a worker failure.

To enable Write-Ahead logs, follow these steps:

  1. In Pipeline Studio, click Configure > Engine Config.

  2. Click Show Custom config.

  3. Enter the following properties:

spark.streaming.receiver.writeAheadLog.enable = true
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite = true
spark.streaming.driver.writeAheadLog.closeFileAfterWrite = true
spark.streaming.receiver.writeAheadLog.rollingIntervalSecs = 0
spark.streaming.driver.writeAheadLog.rollingIntervalSecs = 0
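
To double-check an engine configuration before deploying a pipeline, the properties above can be compared against a configuration map. The sketch below is a hypothetical helper, not a CDAP API; it assumes the custom config is available as a plain dictionary of string keys and values.

```python
# Write-Ahead log properties and values as listed in this section.
REQUIRED_WAL_PROPERTIES = {
    "spark.streaming.receiver.writeAheadLog.enable": "true",
    "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite": "true",
    "spark.streaming.driver.writeAheadLog.closeFileAfterWrite": "true",
    "spark.streaming.receiver.writeAheadLog.rollingIntervalSecs": "0",
    "spark.streaming.driver.writeAheadLog.rollingIntervalSecs": "0",
}


def wal_property_problems(engine_config):
    """Return {property: (expected, actual)} for each Write-Ahead log
    property that is missing or set to a different value."""
    problems = {}
    for key, expected in REQUIRED_WAL_PROPERTIES.items():
        actual = engine_config.get(key)
        if actual != expected:
            problems[key] = (expected, actual)
    return problems


if __name__ == "__main__":
    # Example: only the first property has been entered so far.
    partial = {"spark.streaming.receiver.writeAheadLog.enable": "true"}
    for key, (expected, actual) in wal_property_problems(partial).items():
        print(f"{key}: expected {expected}, found {actual}")
```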

Note: Enabling Write-Ahead logs decreases overall reader throughput. You may elect to run additional receiver workers in order to maintain throughput.

Configuring Backpressure (Message Rates)

To ensure that the pipeline meets a minimum message throughput (messages/second), you can configure the minimum message rate for Spark’s backpressure mechanism by adding the following property to the Engine Config:

spark.streaming.backpressure.pid.minRate = <value> 

Note: This is a per-reader setting, which means each reader will try to maintain this throughput as the target performance level.

The default value for this setting is 100.
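
Because the setting applies per reader, the aggregate floor scales with the Number of Readers. The arithmetic can be sketched as follows; the helper names are hypothetical, and the calculation assumes every reader sustains at least its configured minimum rate.

```python
import math


def aggregate_min_rate(num_readers, min_rate_per_reader=100):
    """Pipeline-wide floor in messages/second for a per-reader minRate."""
    return num_readers * min_rate_per_reader


def per_reader_min_rate(target_total_rate, num_readers):
    """Per-reader minRate needed to reach a pipeline-wide floor,
    rounded up so the total is not undershot."""
    if num_readers < 1:
        raise ValueError("num_readers must be at least 1")
    return math.ceil(target_total_rate / num_readers)


if __name__ == "__main__":
    # Four readers at the default minRate of 100.
    print(aggregate_min_rate(4))
    # Per-reader minRate for a 1000 messages/second floor with 4 readers.
    print(per_reader_min_rate(1000, 4))
```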

 

Created in 2020 by Google Inc.