Joins are often the most expensive part of a pipeline. Like everything else in a pipeline, joins are executed in parallel. The first step of a join is to shuffle data so that every record with the same join key is sent to the same executor. Once all the data is shuffled, it is actually joined, and output to the rest of the pipeline.
For example, suppose an 'Purchases' dataset is being joined to an 'Items' dataset. Each purchase record contains an item name and number purchased. Each item record contains the item name and the price of that item. A join is performed on the item name in order to calculate the total price of each purchase. When the data is joined, data is shuffled across the cluster such that records with the same id end up on the same executor.
When the join keys are fairly evenly distributed, joins perform well because they can be executed in parallel.
Like any shuffle, data skew will negatively impact performance. In the small example above, eggs are purchased much more frequently than chicken or milk, which means the executor joining egg purchases does more work than the other executors. If you notice that a join is skewed, there are two ways to improve performance.
Automatic splitting up of skewed partitions
Starting CDAP 6.4.0, with adaptive query execution, really heavy skews will be handled automatically. As soon as a join produces some partitions much bigger than others, they are split into smaller ones. To confirm you have adaptive query execution enabled, see the Autotuning section above.
In-memory joins were added in version 6.1.3. An in-memory join can be performed if one side of the join is small enough to fit in memory. In this situation, the small dataset is loaded into memory, and then broadcast to every executor. This means the large dataset is not shuffled at all, removing the uneven partitions generated when shuffling on the join key.
In the previous example, the items dataset is first loaded into memory of the Spark driver. It is then broadcast out to each executor. Executors can now join the data without shuffling any of the purchase dataset.
This approach requires you to give enough memory to both the Spark driver and executors to allow them to store the broadcast dataset in memory. By default, Spark reserves slightly less than 30% of its memory for storing this type of data. When using in-memory joins, multiply the size of the dataset by 4 and set that as the executor and driver memory. For example, if the items dataset was 1 GB in size, we would need to set the executor and driver memory to at least 4 GB. Datasets larger than 8 GB cannot be loaded into memory.
When both sides of the join are too large to fit in memory, a different technique can be used to break up each join key into multiple keys in order to increase the level of parallelism. This technique can be applied to inner joins and left outer joins. It cannot be used for full outer joins. Key distribution was added in version 6.2.2.
In this approach, the skewed side is salted with a new integer column with a random number from 1 to N. The unskewed side is exploded, with each existing row generating N new rows. A new column is added to the exploded side, populated with each number from 1 to N. A normal join is then performed, except the new column is added as part of the join key. In this way, all the data that used to go to a single partition is now spread out to up to N different partitions.
In the example above, the distribution factor N is set to 3. The original datasets are shown on the left. The salted and exploded versions of the dataset are shown in the middle. The shuffled data is shown on the right, with 3 different executors joining egg purchases instead of just one.
Greater parallelism is achieved by increasing the distribution factor. However, this comes with the cost of exploding one side of the join, resulting in more data shuffled across the cluster. Because of this, the benefit of increasing the distribution factor diminishes as it increases. In most situations, you should keep it to 20 or lower.