Data Skew

Data skew is an important characteristic to consider when implementing joins. A skewed join happens when a significant percentage of input records in one dataset share the same key and therefore join to the same record in the second dataset. This is problematic because of the way the execution framework handles joins. At a high level, all records with matching keys are grouped into a partition, and these partitions are distributed across the nodes in the cluster to perform the join. In a skewed join, one or more of these partitions is significantly larger than the rest, so most of the workers in the cluster sit idle while a few workers process the large partitions. The result is poor performance, because the cluster is underutilized.
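
Before choosing one of the solutions below, it helps to confirm that a join key is actually skewed. A minimal SQL sketch, assuming a hypothetical input table named input_records with join key country:

```sql
-- Count records per join key; a handful of keys holding most of the
-- rows indicates a skewed join.
SELECT country, COUNT(*) AS record_count
FROM input_records
GROUP BY country
ORDER BY record_count DESC
LIMIT 10;
```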

Plugin version: 2.9.0

Solution 1: In-Memory Join (Spark only)

This feature was introduced in CDAP 6.1.3.

This option is only available if the Spark engine is used. MapReduce does not support In-Memory joins.

The first approach for improving the performance of skewed joins is an in-memory join, which helps when a large dataset is joined with a small one. In this approach, the small dataset is broadcast to the workers and loaded into each worker's memory. Once it is in memory, the join is performed by iterating through the elements of the large dataset. With this approach, data from the large dataset is never shuffled, and records with the same key can be joined in parallel across the cluster instead of being handled by a single worker, providing optimal performance. Only small datasets should be used for in-memory joins; make sure the total size of the broadcast data does not exceed 2 GB.
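
The plugin performs the broadcast itself, but the underlying technique can be illustrated directly in Spark SQL. A minimal sketch, assuming hypothetical tables large_input and small_input joined on a country key, with a code column on the small side:

```sql
-- Hint Spark to broadcast the small table to every worker, so the
-- large table is never shuffled and matching keys join in parallel.
SELECT /*+ BROADCAST(small_input) */ l.*, s.code
FROM large_input l
JOIN small_input s ON l.country = s.country;
```

If the broadcast side does not fit in worker memory, the job can fail, which is why the 2 GB limit above matters.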

Solution 2: Distribution

This feature was introduced in CDAP 6.2.2.

Distribution should be used when the smaller dataset cannot fit into memory. It solves the data skew problem by adding a composite key (salt) to both datasets and joining on a combination of the salt and the original join keys. The added salt spreads the data across more workers, increasing parallelism and therefore overall performance. The following example illustrates how salting works and how it is used for distribution.

Suppose the skewed side (Stage A) has data like:

| id | country |
| --- | --- |
| 0 | us |
| 1 | us |
| 2 | us |
| 3 | gb |

where most of the data has the same value for the country. The unskewed side (Stage B) has data like:

| country | code |
| --- | --- |
| us | 1 |
| gb | 44 |

The join key is country. With a Distribution Size of 2 and a Skewed Stage of ‘Stage A’, a new salt column is added to the skewed data, where the value is a random integer between 0 and Distribution Size - 1 (in this case, 0 or 1):

| id | country | salt |
| --- | --- | --- |
| 0 | us | 0 |
| 1 | us | 1 |
| 2 | us | 0 |
| 3 | gb | 1 |

The unskewed data is exploded, and each row becomes 2 rows, one for each salt value from 0 to Distribution Size - 1:

| country | code | salt |
| --- | --- | --- |
| us | 1 | 0 |
| us | 1 | 1 |
| gb | 44 | 0 |
| gb | 44 | 1 |

The salt column is added to the join key, and the join is performed as normal. Now, however, the skewed key can be processed across two workers, which increases performance.
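
The plugin applies the salting and exploding automatically when Distribution is enabled. As an illustration only, here is what the equivalent transformation looks like written by hand in Spark SQL for the Stage A and Stage B data above (the table names stage_a and stage_b are assumptions):

```sql
-- Salt the skewed side with a random integer in [0, Distribution Size),
-- here [0, 2).
WITH salted_a AS (
  SELECT id, country, CAST(FLOOR(RAND() * 2) AS INT) AS salt
  FROM stage_a
),
-- Explode the unskewed side: one copy of each row per salt value.
exploded_b AS (
  SELECT b.country, b.code, s.salt
  FROM stage_b b
  CROSS JOIN (SELECT 0 AS salt UNION ALL SELECT 1) s
)
-- Join on the original key plus the salt.
SELECT a.id, a.country, b.code
FROM salted_a a
JOIN exploded_b b
  ON a.country = b.country AND a.salt = b.salt;
```

Note the trade-off: a larger Distribution Size spreads a skewed key across more workers, but it also multiplies the unskewed side by that factor.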

Solution 3: Reorder inputs (BigQuery Transformation Pushdown only)

Skewed joins can cause a lot of shuffling and disk operations in BigQuery, which impacts query performance. BigQuery has some general recommendations for executing join operations on skewed data; see the BigQuery Data skew documentation.

If one side of a join operation is known to be heavily skewed, this feature can be used so the BigQuery SQL engine orders the stages appropriately when building the SQL join statements. This reduces the possibility of the join running into a resource exceeded error during execution.

To set the most skewed input for a join operation, use the Skewed Input Stage setting.
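
As an illustration, assuming the purchases input from the example below is marked as the Skewed Input Stage, the generated SQL might list that input first, in line with BigQuery's recommendation to order join inputs by decreasing size. This is a sketch of the idea, not the exact SQL the pushdown engine produces:

```sql
-- Sketch: the skewed input (purchases) is placed first in the JOIN
-- so the engine starts from the largest, most skewed side.
SELECT c.id AS customer_id, c.first_name AS name, p.item, p.price
FROM purchases AS p
JOIN customers AS c
  ON p.customer_id = c.id;
```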

Example

This example performs an inner join on records from the customers and purchases inputs on customer id. It selects the customer_id, name, item, and price fields as the output fields. This is equivalent to a SQL query like:

SELECT customers.id AS customer_id,
       customers.first_name AS name,
       purchases.item,
       purchases.price
FROM customers
INNER JOIN purchases ON customers.id = purchases.customer_id

For example, suppose the joiner receives input records from customers and purchases as below:

| id | first_name | last_name | street_address | city | state | zipcode | phone number |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | Douglas | Williams | 1, Vista Montana | San Jose | CA | 95134 | 408-777-3214 |
| 2 | David | Johnson | 3, Baypointe Parkway | Houston | TX | 78970 | 804-777-2341 |
| 3 | Hugh | Jackman | 5, Cool Way | Manhattan | NY | 67263 | 708-234-2168 |
| 4 | Walter | White | 3828, Piermont Dr | Orlando | FL | 73498 | 201-734-7315 |
| 5 | Frank | Underwood | 1609 Far St. | San Diego | CA | 29770 | 201-506-8756 |
| 6 | Serena | Woods | 123 Far St. | Las Vegas | NV | 45334 | 888-605-3479 |

| customer_id | item | price |
| --- | --- | --- |
| 1 | donut | 0.80 |
| 1 | coffee | 2.05 |
| 2 | donut | 1.50 |
| 2 | plate | 0.50 |
| 3 | tea | 1.99 |
| 5 | cookie | 0.50 |

The output records contain the result of the inner join on customer id:

| customer_id | name | item | price |
| --- | --- | --- | --- |
| 1 | Douglas | donut | 0.80 |
| 1 | Douglas | coffee | 2.05 |
| 2 | David | donut | 1.50 |
| 2 | David | plate | 0.50 |
| 3 | Hugh | tea | 1.99 |
| 5 | Frank | cookie | 0.50 |
