Consolidating multiple pipeline branches into single operations (Spark)

Starting in CDF 6.1.4, you can set spark.cdap.pipeline.consolidate.stages to true to consolidate multiple pipeline branches into single operations in pipelines that use Spark as the execution engine. One part of the new approach is to consolidate multiple sinks together so that a single Spark job runs instead of multiple jobs, which means the source is read once instead of once for each branch. This can improve pipeline performance because it avoids reprocessing the same data.

If you set spark.cdap.pipeline.consolidate.stages to true, sinks are consolidated if they share a common parent and there aren’t any analytics plugins (joins, aggregations, distinct, and so on) before the sink. For example, the following pipeline has a single source with two branches that write to different sinks, and it doesn’t include any analytics plugins. Therefore, when you run this pipeline, one Spark job reads from the source, executes the transformation logic in each branch, and writes to each sink.

In previous releases, when you ran a pipeline that had multiple branches and didn’t have analytics plugins before the sinks, a separate Spark job was launched for each sink. Each job read from all of the required sources, executed all of the transforms on its branch, and finally wrote to its sink. As a result, when there were multiple sinks, the same source data was reprocessed once per branch.
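To make the difference concrete, here is a rough PySpark analogy of reading the source once per run instead of once per branch. This is not CDAP’s actual implementation, which consolidates the sinks so that a single Spark job writes all of them; the paths, column names, and transforms below are placeholders.

```python
# Rough analogy only: approximate the effect of consolidation in plain PySpark.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("consolidation-analogy").getOrCreate()

# Without consolidation: each sink effectively triggers its own pass over the
# source, so the work before the branch point is repeated for every branch.
for sink_path, transform in [
    ("/tmp/sink_a", lambda df: df.filter("amount > 0")),
    ("/tmp/sink_b", lambda df: df.select("id", "amount")),
]:
    source = spark.read.parquet("/tmp/source")          # re-read per branch
    transform(source).write.mode("overwrite").parquet(sink_path)

# With consolidation (roughly analogous to persisting the shared branch point):
# the source is materialized once and every branch reuses it.
source = spark.read.parquet("/tmp/source").persist()
source.filter("amount > 0").write.mode("overwrite").parquet("/tmp/sink_a")
source.select("id", "amount").write.mode("overwrite").parquet("/tmp/sink_b")
source.unpersist()
```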

How does it work?

Let’s look at the following example where stages that start with 'S' are sources, stages that start with 'K' are sinks, stages that start with 'T' are transforms, and stages that start with 'A' are aggregators.

Example Pipeline

There is only one multi-output stage (S1). Each branch is identified and marked as a candidate or as uncombinable. Red square boxes denote uncombinable branches while rounded yellow boxes denote candidate branches.

The branch with the red square box is uncombinable because it has an analytics plugin (an aggregator) before the sink. None of the candidate branches overlap with any other branch, so the candidate branches are combined and executed as a single Spark job, while the uncombinable branch runs separately.
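The classification step can be sketched in a few lines of Python. This is a hypothetical illustration of the idea described above, not the actual CDAP planner code; the DAG is a made-up pipeline that follows the same naming convention (S* source, T* transform, A* aggregator, K* sink).

```python
# Hypothetical sketch: classify each branch below a multi-output stage as a
# consolidation candidate or as uncombinable (contains an analytics stage).

# Made-up pipeline DAG; S1 is the only multi-output stage.
DAG = {
    "S1": ["T1", "T2", "A1"],
    "T1": ["K1"],
    "T2": ["K2"],
    "A1": ["K3"],
    "K1": [], "K2": [], "K3": [],
}

def branches(dag, start):
    """Enumerate every path from the multi-output stage down to a sink."""
    paths = []
    def walk(stage, path):
        children = dag[stage]
        if not children:                     # a sink terminates the branch
            paths.append(path)
            return
        for child in children:
            walk(child, path + [child])
    for child in dag[start]:
        walk(child, [child])
    return paths

def classify(branch):
    """A branch is a candidate only if it contains no analytics (aggregator) stage."""
    return "uncombinable" if any(s.startswith("A") for s in branch) else "candidate"

for branch in branches(DAG, "S1"):
    print(branch, "->", classify(branch))
# ['T1', 'K1'] -> candidate
# ['T2', 'K2'] -> candidate
# ['A1', 'K3'] -> uncombinable
```

Because the two candidate branches do not overlap, they would be combined and run as a single Spark job, while the branch through A1 stays separate.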

Setting preferences and runtime arguments

You can turn this feature on by setting a preference or runtime argument for spark.cdap.pipeline.consolidate.stages to true. Set the preference at the system level to enable this feature for all pipelines. Set it at the pipeline level if you only want it to apply to specific pipelines. We recommend trying this setting on a few pipelines before setting the preference at the system level.
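For example, assuming the standard CDAP Preferences REST endpoints, the preference could be set with a short Python script like the one below; the host, namespace, and pipeline name are placeholders for your own environment.

```python
# Sketch: set spark.cdap.pipeline.consolidate.stages through the CDAP
# Preferences REST API. Endpoint, namespace, and app name are assumptions.
import requests

CDAP = "http://localhost:11015/v3"          # placeholder CDAP router endpoint
PREF = {"spark.cdap.pipeline.consolidate.stages": "true"}

# Pipeline level: applies only to the named pipeline (a good first step).
requests.put(f"{CDAP}/namespaces/default/apps/myPipeline/preferences", json=PREF)

# System level: applies to every pipeline on the instance.
requests.put(f"{CDAP}/preferences", json=PREF)
```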

FAQs

Should I only use this feature at the pipeline level rather than the system or namespace level?

We recommend trying it out on a few pipelines first before setting it at the system or namespace level.

What is the impact on the cluster?

None, unless you need to increase the executor memory. If the executor memory is increased, the cluster memory also needs to increase.

When does this feature adversely impact performance?

The feature doesn't make pipelines run slower, but it does increase memory requirements when many branches are combined. In our tests, we ran with 100 GB of data per branch, with 1 KB records. Combining 4 branches doubled the memory needed (from 1 GB to 2 GB), and combining 16 branches tripled it (from 1 GB to 3 GB).
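If you do need more executor memory after combining branches, one option is to pass it as a runtime argument for a single run together with the consolidation flag. The sketch below assumes the standard program-start endpoint for batch pipelines (workflow name DataPipelineWorkflow) and the task.executor.system.resources.memory argument; verify both names against your CDAP or Data Fusion version.

```python
# Sketch: start one run with consolidation enabled and a larger executor.
# Endpoint, workflow name, and memory argument are assumptions to verify.
import requests

CDAP = "http://localhost:11015/v3"                     # placeholder endpoint
runtime_args = {
    "spark.cdap.pipeline.consolidate.stages": "true",
    "task.executor.system.resources.memory": "2048",   # MB; size from your own tests
}
requests.post(
    f"{CDAP}/namespaces/default/apps/myPipeline/workflows/DataPipelineWorkflow/start",
    json=runtime_args,
)
```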



 
