Pipeline Structure

Pipelines present a logical view of your business logic. A single pipeline will break down into one or more Spark jobs. Each job reads data, processes it, and then writes out the data. One common misconception is that each pipeline stage will get processed completely before moving on to the next one. In reality, multiple pipeline stages will get grouped together into jobs based on the structure of the pipeline. More specifically, shuffles and sinks determine how the pipeline will get broken down.

Branches

Pipeline stages are grouped into Spark jobs by starting from a sink and tracing backwards to a source or a shuffle stage. This means that a pipeline with multiple branches will result in multiple Spark jobs. For example, consider the following pipeline with two branches.

The first Spark job consists of the DataGenerator, Wrangler, and Trash stages. You can tell this is the case because the metrics for the first stages will update and complete before the next stages start.

In this example, the metrics for DataGenerator, Wrangler, and Trash grow to the expected output records while Wrangler2 and Trash2 stay at zero. The second Spark job consists of the DataGenerator, Wrangler2, and Trash2 stages. This second job is run after the first job completes.

Caching

In the branch example above, even though the DataGenerator source stage is part of two Spark jobs, the number of output records stays at 1 million instead of 2 million. This means the source is not processed twice. This is achieved using Spark caching. As the first job runs, it caches the output of the DataGenerator stage so that it can be used during the second job. This is important in most pipelines because it prevents multiple reads from the source. This is especially true if it is possible to read different data from the source at different times. For example, suppose a pipeline is reading from a database table that is constantly being updated. If both Spark jobs were to read from the table, they would both read different data because they would be launched at different times. Caching ensures that only a single read is done so that all branches see the same data. Additionally, it is desirable for certain sources like the BigQuery source, where there is a monetary cost associated with each read.

Note that Spark performs caching as part of a Spark job. In the example above, it is performed as part of the first job and then used in the second. There is not an initial job that reads the data and stops at the cache point. Cached data is kept for the lifetime of a single pipeline run. It is not persisted across runs.

Data does not always need to be explicitly cached. When a pipeline contains a data shuffle, Spark will keep the intermediate shuffle data around and read from that point on in other jobs when possible. For example, if the example pipeline is modified to include a Distinct plugin before branching, the pipeline does not need to cache the output of the Distinct plugin in order to prevent the source from being recomputed.

The first job would include the DataGenerator, Distinct, Wrangler, and Trash stages. 

The second would read the intermediate shuffle data used in the first job, execute the post-shuffle part of the Distinct, and execute the Wrangler2 and Trash2 stages.

In CDAP 6.1.3 and earlier, data is cached aggressively to try and prevent any type of reprocessing in the pipeline. Starting from CDAP 6.1.4, data is cached at fewer points, only to prevent sources from being read multiple times. This change was done because caching is often more expensive than re-computing a transformation. In the example above, this means 6.1.3 would cache the output of the Distinct plugin, while 6.1.4 would not, since Spark itself knows it can start from the intermediate shuffle data. However, it does mean that the metrics for the number of output records from the Distinct stage will be double the number of records processed on each branch, as the post-shuffle work is still computed twice.

Cache Levels

Spark supports several different persistence levels when caching data. You can set the cache level for a pipeline by setting spark.cdap.pipeline.caching.storage.level in the Engine config section of the pipeline.

 

The full list of possible cache levels can be found at https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence, but we will focus on MEMORY_ONLY, MEMORY_AND_DISK, and DISK_ONLY.  

The MEMORY_ONLY cache level is the fastest because it avoids I/O. However, this requires the most knowledge about your dataset, as you will need to set executor memory high enough to hold all of the data in memory. If Spark runs out of memory, it will kick some part of the data out of memory, which will result in Spark re-computing data. As discussed above, this is often undesirable as it can cause sources to be read multiple times, potentially resulting in different data. In practice, we have found that Spark will sometimes fail to kick out enough cached data and will instead fail with out of memory exceptions.

The MEMORY_AND_DISK level tells Spark to try to cache data in memory first. If it is running low on memory, it will spill cached data to disk and read it from there. In theory, this gives all the benefits of in-memory caching and will also avoid the re-computation problem. In practice, we have found that large Spark jobs will often run out of memory and fail when using this cache level.

The DISK_ONLY level tells Spark to cache data directly to disk instead of trying to keep anything in memory. This is desirable when you know the cached data is much larger than what can fit in memory and will need to be written to disk. We have also found this to be the most reliable cache level, as it will never cause out of memory exceptions.

In CDAP 6.1.3 and earlier, the default cache level is MEMORY_AND_DISK. In CDAP 6.1.4 and later, the default cache level is DISK_ONLY. This change was made because Spark pipelines can run out of memory and fail when using the MEMORY_AND_DISK level. 

Manual Caching

Pipelines will automatically determine which points in the pipeline need to be cached. In some more advanced use cases, it can be desirable to turn this off and manually set cache points. For example, you may be using a transform that makes an API call to an external service, with cost associated with each API call. You may want to manually cache the output of that stage in order to prevent the stage from being reprocessed and making additional API calls. This can be done by disabling auto caching and manually placing the Data Cacher plugin at points in the pipeline. The Data Cacher plugin can be found in the Analytics section in CDAP 6.1.4 and later.

Automatic caching can be disabled by setting the spark.cdap.pipeline.autocache.enable property to false in the Engine config.

Stage Consolidation

In certain circumstances, it is possible to consolidate several logical branches of a pipeline into a single physical branch. This reduces the number of Spark jobs and caches that are needed, resulting in better execution times. In our experiments, this has led to up to a 4x improvement in pipeline execution times. 

Consider a simple pipeline with two branches.

Without stage consolidation, this pipeline is executed using two Spark jobs, with data from the source cached to prevent multiple source reads. When stage consolidation is enabled, the Wrangler, Wrangler2, Trash, and Trash2 stages are consolidated into a single physical block, which allows the pipeline to be executed with just a single job. In effect, it is like there is only one sink in the pipeline instead of two.

Stage consolidation can only be done if there are no shuffles being performed on the branch. Most plugins in the Analytics section (Joiner, Group By, Distinct, Deduplicate) require a shuffle. If the pipeline is modified to contain a Distinct plugin after Wrangler2, the branches cannot be consolidated.

Stage consolidation is a performance boost in almost all situations. However, it does require a little more executor memory if a lot of branches are being consolidated. We have conducted experiments with 2 branch, 4 branch, 8 branch, and 16 branch pipelines. Each branch processes 100 million records, with each record around 1kb in size.  Without stage consolidation, the pipeline can be executed with 1gb executors. With stage consolidation, the 4 and 8 branch pipelines needed 2gb executors, and the 16 branch pipeline needed 3gb executors. 

Stage consolidation was added as a beta feature in CDAP 6.1.4. It can be turned on by setting the spark.cdap.pipeline.consolidate.stages runtime argument to true. Like any runtime argument, this can be set at the pipeline, namespace, or instance level.

Parallel Jobs

By default, Spark jobs are executed serially. Starting from CDAP 6.1.0, jobs can be executed in parallel by setting the pipeline.spark.parallel.sinks.enabled runtime argument to true. Like any runtime argument, this can be set at the pipeline, namespace, or instance level. Parallel execution is conceptually similar to breaking a pipeline apart into multiple pipelines. You will need a larger cluster to take full advantage of the parallelism, and you will need to be ok with source data being read multiple times.

Parallel sinks are enabled in the example pipeline below. The pipeline is configured to read 1 million records at the source.

Metrics showing for both branches at the same time instead of one branch completing entirely before the next one starts.

Note that the source has output more than the 1 million records it is configured to output. Remember that Spark caches data during a job and not as a separate job. This means that caching cannot be relied upon to prevent multiple reads from a source. In most situations, it is not worth caching anything in a pipeline with parallel execution enabled. As such, parallel execution should only be used if multiple reads from the source is not a problem.

Even though both Spark jobs are launched at the same time, they may not begin at exactly the same time. Also, if your cluster does not have enough resources to run the entirety of the jobs at the same time, one job may receive more resources than the other, causing them to complete in different amounts of time. In this example, since there are two Spark jobs, your cluster will need to be twice as big as the cluster needed if parallel execution was off. Similarly, a pipeline with N jobs will need a cluster N times as large to take full advantage of the parallelism.

Filters

In most situations, any filtering of data or dropping of fields should occur as early in the pipeline as possible. This is especially true when your pipeline contains shuffles, as it will reduce the amount of I/O needed when performing the shuffle. I/O is often the most time consuming part of a pipeline, so anything that reduces it will speed things up.