...

In CDAP 6.1.3 and earlier, data is cached aggressively to try to prevent any type of reprocessing in the pipeline. Starting from CDAP 6.1.4, data is cached at fewer points, only to prevent sources from being read multiple times. This change was made because caching is often more expensive than re-computing a transformation. In the example above, this means 6.1.3 would cache the output of the Distinct plugin, while 6.1.4 would not, since Spark itself knows it can start from the intermediate shuffle data. However, it does mean that the output record metric for that stage will be double the number of records processed on each branch, because the post-shuffle work is still computed once per branch.
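As a rough illustration in plain Spark code (this is not the pipeline planner itself, and the paths and column names are hypothetical), the sketch below shows the kind of cache point 6.1.4 still inserts: with two sinks there are two Spark jobs, and without the persist on the source each job would scan the input files again, potentially seeing different data.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    val spark = SparkSession.builder().appName("cache-point-sketch").getOrCreate()

    // Hypothetical source, cached so the two jobs below read the files only once.
    val source = spark.read.json("hdfs:///data/events").persist(StorageLevel.DISK_ONLY)

    // Two branches, each ending in its own sink and therefore its own Spark job.
    source.filter("type = 'click'").write.parquet("hdfs:///out/clicks")
    source.filter("type = 'view'").write.parquet("hdfs:///out/views")

When a shuffle such as Distinct sits between the source and the branches, Spark can restart from the intermediate shuffle data, which is why 6.1.4 no longer adds a cache at that point.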

Cache Levels

Spark supports several different persistence levels when caching data. You can set the cache level for a pipeline by setting spark.cdap.pipeline.caching.storage.level in the Engine config section of the pipeline.

...

The full list of possible cache levels can be found at https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence, but we will focus on MEMORY_ONLY, MEMORY_AND_DISK, and DISK_ONLY.  

The MEMORY_ONLY cache level is the fastest because it avoids I/O. However, it requires the most knowledge about your dataset, as you will need to set executor memory high enough to hold all of the data in memory. If Spark runs out of memory, it will evict part of the cached data, which forces Spark to re-compute that data. As discussed above, this is often undesirable, as it can cause sources to be read multiple times, potentially resulting in different data. In practice, we have found that Spark will sometimes fail to evict enough cached data and will instead fail with out of memory exceptions.

The MEMORY_AND_DISK level tells Spark to try to cache data in memory first. If it is running low on memory, it will spill cached data to disk and read it from there. In theory, this gives all the benefits of in-memory caching and will also avoid the re-computation problem. In practice, we have found that large Spark jobs will often run out of memory and fail when using this cache level.

The DISK_ONLY level tells Spark to cache data directly to disk instead of trying to keep anything in memory. This is desirable when you know the cached data is much larger than what can fit in memory and will need to be written to disk. We have also found this to be the most reliable cache level, as it will never cause out of memory exceptions.
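In Spark terms, these level names map to StorageLevel constants, which is roughly what the pipeline property above selects (a minimal sketch; the DataFrame here is just a stand-in for any dataset worth caching):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    val spark = SparkSession.builder().appName("storage-level-sketch").getOrCreate()
    val df = spark.range(0, 1000).toDF("id")   // stand-in for a dataset worth caching

    // Pick exactly one level per dataset; re-persisting at a different level throws an error.
    df.persist(StorageLevel.DISK_ONLY)         // cached partitions always go to local disk
    // StorageLevel.MEMORY_ONLY      - fastest, but partitions that do not fit are dropped and re-computed
    // StorageLevel.MEMORY_AND_DISK  - keeps what fits in memory, spills the rest to disk

    // The level names are the same strings Spark itself parses:
    val level = StorageLevel.fromString("MEMORY_AND_DISK")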

In CDAP 6.1.3 and earlier, the default cache level is MEMORY_AND_DISK. In CDAP 6.1.4 and later, the default cache level is DISK_ONLY. This change was made because Spark pipelines can run out of memory and fail when using the MEMORY_AND_DISK level. 

Manual Caching

Pipelines will automatically determine which points in the pipeline need to be cached. In some more advanced use cases, it can be desirable to turn this off and manually set cache points. For example, you may be using a transform that makes an API call to an external service, where each call has a cost. You may want to manually cache the output of that stage to prevent it from being reprocessed and making additional API calls. This can be done by disabling auto caching and manually placing the Data Cacher plugin at points in the pipeline. The Data Cacher plugin can be found in the Analytics section in CDAP 6.1.4 and later.
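As a sketch of the idea in plain Spark code (the Data Cacher plugin is the pipeline equivalent; callPaidApi and the paths below are hypothetical), a manual cache point right after the expensive stage keeps the downstream sinks from triggering the API calls a second time:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, udf}
    import org.apache.spark.storage.StorageLevel

    val spark = SparkSession.builder().appName("manual-cache-sketch").getOrCreate()
    val input = spark.read.parquet("hdfs:///data/customers")   // hypothetical source

    // Hypothetical per-record call to a paid external service.
    val callPaidApi = udf((id: String) => s"enriched-$id")

    // Manual cache point: the enrichment runs while the first job populates the cache,
    // and the second sink reads the cached rows instead of calling the API again.
    val enriched = input
      .withColumn("enrichment", callPaidApi(col("id")))
      .persist(StorageLevel.DISK_ONLY)

    enriched.write.parquet("hdfs:///out/warehouse")
    enriched.write.json("hdfs:///out/audit")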

...

Automatic caching can be disabled by setting the spark.cdap.pipeline.autocache.enable property to false in the Engine config.

...

Stage Consolidation

In certain circumstances, it is possible to consolidate several logical branches of a pipeline into a single physical branch. This reduces the number of Spark jobs and caches that are needed, resulting in better execution times. In our experiments, this has led to up to a 4x improvement in pipeline execution times. 

Consider a simple pipeline with two branches.

...

Without stage consolidation, this pipeline is executed using two Spark jobs, with data from the source cached to prevent multiple source reads. When stage consolidation is enabled, the Wrangler, Wrangler2, Trash, and Trash2 stages are consolidated into a single physical block, which allows the pipeline to be executed with just a single job. In effect, it is like there is only one sink in the pipeline instead of two.

Stage consolidation can only be done if there are no shuffles being performed on the branch. Most plugins in the Analytics section (Joiner, Group By, Distinct, Deduplicate) require a shuffle. If the pipeline is modified to contain a Distinct plugin after Wrangler2, the branches cannot be consolidated.
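One way to picture consolidation in plain Spark terms (only an analogy for the planner's behavior, not CDAP's implementation; wrangle1, wrangle2, and the paths are hypothetical): both branches are applied in a single narrow pass and each output record is tagged with the sink it belongs to, so one job scans the source once and no cache is needed.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("consolidation-sketch").getOrCreate()
    import spark.implicits._

    val source = spark.read.textFile("hdfs:///data/lines")   // hypothetical source

    // Hypothetical per-record branch transforms (the Wrangler stages in the example).
    def wrangle1(line: String): String = line.toUpperCase
    def wrangle2(line: String): String = line.trim

    // Both branches run inside the same pass over each record; the tag records
    // which "sink" the output belongs to.
    val consolidated = source
      .flatMap(line => Seq(("sink1", wrangle1(line)), ("sink2", wrangle2(line))))
      .toDF("sink", "value")

    // A single job writes both outputs, so the source is scanned only once.
    consolidated.write.partitionBy("sink").parquet("hdfs:///out")

If one branch needed a distinct() or another shuffle before its sink, its records could no longer be produced in that single narrow pass, which is the Spark-level reason a shuffle on a branch prevents consolidation.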

Stage consolidation is a performance boost in almost all situations. However, it does require a little more executor memory when many branches are being consolidated. We have conducted experiments with 2-branch, 4-branch, 8-branch, and 16-branch pipelines. Each branch processes 100 million records, with each record around 1 KB in size. Without stage consolidation, these pipelines can be executed with 1 GB executors. With stage consolidation, the 4- and 8-branch pipelines needed 2 GB executors, and the 16-branch pipeline needed 3 GB executors.

Stage consolidation was added as a beta feature in CDAP 6.1.4. It can be turned on by setting the spark.cdap.pipeline.consolidate.stages runtime argument to true. Like any runtime argument, this can be set at the pipeline, namespace, or instance level.

Parallel Jobs

By default, Spark jobs are executed serially. Starting from CDAP 6.1.0, jobs can be executed in parallel by setting the pipeline.spark.parallel.sinks.enabled runtime argument to true. Like any runtime argument, this can be set at the pipeline, namespace, or instance level. Parallel execution is conceptually similar to breaking a pipeline apart into multiple pipelines. You will need a larger cluster to take full advantage of the parallelism, and you will need to be OK with the source data being read multiple times.
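At the Spark level, this roughly corresponds to submitting the per-sink jobs from separate threads against the same SparkContext (a sketch, not CDAP's scheduler code; the paths and column names are hypothetical):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("parallel-sinks-sketch").getOrCreate()
    val source = spark.read.json("hdfs:///data/events")   // hypothetical source, not cached

    // Jobs submitted from different threads run concurrently and compete for the
    // same executors; each job reads the source for itself.
    val branchA = Future { source.filter("type = 'click'").write.parquet("hdfs:///out/clicks") }
    val branchB = Future { source.filter("type = 'view'").write.parquet("hdfs:///out/views") }
    Await.result(Future.sequence(Seq(branchA, branchB)), Duration.Inf)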

Parallel sinks are enabled in the example pipeline below. The pipeline is configured to read 1 million records at the source.

...

Metrics appear for both branches at the same time, instead of one branch completing entirely before the next one starts.

Note that the source has output more than the 1 million records it is configured to output. Remember that Spark populates a cache during a job, not as a separate job, so when jobs run in parallel neither can count on the other having already cached the source. This means that caching cannot be relied upon to prevent multiple reads from a source. In most situations, it is not worth caching anything in a pipeline with parallel execution enabled. As such, parallel execution should only be used when reading the source multiple times is not a problem.

Also note that one branch has processed twice as many records as the other. Even though both Spark jobs are launched at the same time, they may not begin processing data at the same time. Also, if your cluster does not have enough resources to run both jobs in their entirety at the same time, one job may receive more resources than the other, causing them to complete in different amounts of time. In this example, since there are two Spark jobs, your cluster will need to be twice as big as the cluster needed if parallel execution were off. Similarly, a pipeline with N jobs will need a cluster N times as large to take full advantage of the parallelism.

Filters

In most situations, any filtering of data or dropping of fields should occur as early in the pipeline as possible. This is especially true when your pipeline contains shuffles, as it will reduce the amount of I/O needed when performing the shuffle. I/O is often the most time consuming part of a pipeline, so anything that reduces it will speed things up.
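For example (a plain Spark sketch with hypothetical columns and paths), pushing the filter and the column pruning in front of a group-by means the shuffle writes and reads far less data:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.sum

    val spark = SparkSession.builder().appName("filter-early-sketch").getOrCreate()
    val events = spark.read.parquet("hdfs:///data/events")   // hypothetical source

    // Rows and columns dropped here are never written or read during the shuffle.
    val totals = events
      .filter("eventDate >= '2020-01-01'")    // drop unneeded rows first
      .select("userId", "amount")             // drop unneeded fields first
      .groupBy("userId")
      .agg(sum("amount").as("total"))         // the shuffle now carries only two columns

    totals.write.parquet("hdfs:///out/totals")

The same principle applies to the pipeline plugins: a transform that filters records or drops fields should sit before a Joiner, Group By, Distinct, or Deduplicate stage rather than after it.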