Configuring the Joiner

Plugin version: 2.11.0

Configure the Joiner to join data from two or more inputs.

Joiner properties

| Property | Macro Enabled? | Release Introduced | Description |
| --- | --- | --- | --- |
| Input Schema | No |  | Derived from the output schema of the previous stages. Cannot be changed. |
| Fields | Yes |  | Required. List of fields from each input to include in the output. Output field names must be unique. If the same field name exists in more than one input, each field must be aliased (renamed) to a unique output name. |
| Join Type | Yes |  | Optional. Type of join to perform. A join between two required inputs is an inner join. A join between a required input and an optional input is a left outer join. A join between two optional inputs is an outer join. A join of more than two inputs is logically equivalent to performing inner joins over all the required inputs, followed by left outer joins on the optional inputs. |
| Join Condition Type | Yes | 6.4.0 / 2.6.0 | Optional. Type of join condition to use: 'Basic' or 'Advanced'. Advanced join conditions cannot be used in streaming pipelines or with the MapReduce engine, and can only be used when joining two inputs. |
| Join Condition | Yes |  | Required. When the condition type is 'Basic', the condition specifies the list of keys to join on, and the join is performed on equality of those keys. When the condition type is 'Advanced', the condition can be any SQL expression supported by the engine. Note that advanced join conditions can be many times more expensive than basic joins on equality. Advanced outer joins must load one of the inputs into memory. Advanced inner joins do not need to load an input into memory, but without an in-memory input the engine is forced to calculate a very expensive Cartesian product. |
| Get Schema | No |  | Optional. If you drop fields or add aliases to field names, Get Schema refreshes the output schema. Also used to validate the join condition. |
| Inputs to Load in Memory | Yes | 6.2.1 | Optional. Spark only. Hint to the underlying execution engine that the specified input data should be loaded into memory to perform an in-memory join. The hint is ignored by the MapReduce engine and passed on to the Spark engine. An in-memory join performs well when one side of the join is small (for example, under 1 GB). This is most commonly used when a large input is joined to a small input, and it leads to much better performance in such scenarios. Be sure to set Spark executor and driver memory large enough to load all of these datasets into memory; a general rule of thumb is five times the dataset size. For more information, see Inputs to Load into Memory below. |
| Join on Null Keys | Yes | 6.2.1 | Optional. For more information, see Join on Null Keys below. |
| Number of Partitions | Yes | 6.2.1 | Optional. Number of partitions to use when grouping fields. This value is ignored if an input is loaded into memory or if an advanced join is performed. When an input is loaded into memory, the number of partitions equals the number of partitions used for the input that is not loaded into memory. If no value is given, or if an inner advanced join is performed, the number of partitions is determined by the value of spark.sql.shuffle.partitions in the engine config, which defaults to 200. For more information, see Number of Partitions below. |
| Skewed Input Stage | Yes | 6.2.2 | Optional. Name of the skewed input stage: the stage that contains many rows that join to the same row in the non-skewed stage. For example, if stage A has 1,000,000 rows that join to the same row in stage B, stage A is the skewed input stage. For more information about data skew, see Data Skew. |
| Distribution Enabled | Yes | 6.2.2 | Optional. Enabling distribution increases the level of parallelism when joining skewed data. A skewed join happens when a significant percentage of input records have the same key. Distribution is possible when (1) there are exactly two input stages, (2) broadcast is not enabled for either stage, and (3) the skewed input fields are marked as required. Distribution requires two parameters: the Distribution Size and the Skewed Input Stage, both described in this table. For more information about distribution and data skew, see Data Skew. |
| Distribution Size | Yes | 6.2.2 | Optional. Controls the size of the composite key (salt) that will be generated for distribution. For optimal results, the Number of Partitions property should be greater than or equal to this number. A larger value leads to more parallelism, but it also grows the size of the non-skewed dataset by this factor. For an illustration of the salting technique, see the sketch after this table. |
| Input with Larger Data Skew | Yes | 6.7.0 / 2.9.0 | Optional. Defines the most skewed stage for a join operation. You can select the most skewed input stage when executing join operations in BigQuery ELT Transformation Pushdown. This might prevent a RESOURCE EXCEEDED error when executing join operations with skewed datasets. |
| Output Schema | Yes |  | Required. The output schema should not be manually edited. If you make changes to the input fields, such as adding an alias to a duplicate field name, always use the Get Schema button to refresh the output schema. The output schema has the same order of fields as the input schemas. Field names in the output schema must be unique. |
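For background, distribution works by salting the join key. The following is a minimal PySpark sketch of the technique, assuming a hypothetical distribution size of 10 and illustrative DataFrame and column names; it shows the general idea, not the plugin's internal implementation.

```python
# Minimal salting sketch. All names are illustrative; this demonstrates
# the general technique, not the plugin's internals.
from pyspark.sql import functions as F

DISTRIBUTION_SIZE = 10  # corresponds to the Distribution Size property

# Skewed side: tag each row with a random salt in [0, DISTRIBUTION_SIZE),
# so rows sharing a hot key spread across many partitions.
salted_skewed = skewed_df.withColumn(
    "salt", (F.rand() * DISTRIBUTION_SIZE).cast("int"))

# Non-skewed side: replicate each row once per salt value. This grows the
# non-skewed dataset by a factor of DISTRIBUTION_SIZE.
salt_values = F.explode(
    F.array([F.lit(i) for i in range(DISTRIBUTION_SIZE)])).alias("salt")
replicated = non_skewed_df.select("*", salt_values)

# Joining on the key plus the salt preserves the join result while
# splitting each hot key into DISTRIBUTION_SIZE smaller groups.
joined = salted_skewed.join(replicated, ["join_key", "salt"], "inner").drop("salt")
```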

Input Aliases

When using advanced join conditions, you can specify input aliases to make the SQL expression more readable. For example, if the join inputs are named 'User Signups 2020' and 'Signup Locations', they can be aliased to simpler names like 'users' and 'locations'. This lets you write a simpler condition like 'users.loc = locations.id or users.loc_name = locations.name'.
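To make the evaluation concrete, here is a minimal sketch of how the same aliased condition could be expressed directly in Spark SQL. The DataFrames users_df and locations_df and the SparkSession spark are hypothetical stand-ins; the plugin performs the equivalent work for you.

```python
# Sketch: the aliased advanced condition as a raw Spark SQL join.
# users_df, locations_df, and spark are hypothetical.
users_df.createOrReplaceTempView("users")
locations_df.createOrReplaceTempView("locations")

joined = spark.sql("""
    SELECT *
    FROM users
    JOIN locations
      ON users.loc = locations.id OR users.loc_name = locations.name
""")
```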

Inputs to load into memory (Spark only)

You can configure the Joiner to perform an in-memory join (also known as a broadcast join in Spark). An in-memory join improves performance when a large dataset is being joined to a small dataset. If one side of the join is small (on the order of megabytes, not gigabytes), loading the smaller dataset into memory can greatly speed up the join operation.

To configure the Joiner to load one dataset into memory:

  1. Scroll to the Advanced section of the Joiner properties page.

  2. From the Inputs to Load into Memory drop-down box, select the smaller dataset to load into memory.

If you configure the Joiner to load the smaller dataset into memory, the entire dataset must be able to fit in both the driver and executor memory. As a guideline, you should set both driver and executor memory to at least 5 times the total size of all datasets that will be loaded into memory. For example, if the dataset is 1 GB, set executor and driver memory to 5 GB.
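In Spark terms, this produces a broadcast join. A minimal sketch of the equivalent hand-written Spark code, with hypothetical DataFrame names and paths:

```python
# Sketch of the broadcast (in-memory) join that Spark performs when the
# small side is loaded into memory. Names and paths are hypothetical.
from pyspark.sql.functions import broadcast

purchases = spark.read.parquet("/data/purchases")  # large input
customers = spark.read.parquet("/data/customers")  # small input

# broadcast() hints Spark to copy the small dataset to every executor so
# the large input is joined locally, without shuffling the large side.
joined = purchases.join(broadcast(customers), "customer_name", "left")
```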

If the execution engine is Spark and you don’t configure the Joiner to load inputs into memory, the join operation will first sort each data partition on the join key and then write each record to disk (also known as a sort merge join). This operation is very reliable, but slows the join operation. If the datasets are highly skewed, the join can take a long time to run.

In previous versions, all join operations performed a shuffle hash join, which stores all records with the same key in memory.

Join on Null Keys

Highly skewed data can impact Joiner performance. A dataset is highly skewed when a very common key accounts for a large share of the records. For example, a dataset has 1 million rows and 500,000 of them have the same key value. Often, highly skewed datasets have a large number of null keys.

Pipeline Studio joins rows together if both of their key values are null, which can impact performance if a dataset is highly skewed. You can choose whether rows with null key values are joined. By default, Join on Null Keys is set to True. Traditional relational database systems do not join on null key values. If your dataset is highly skewed due to a large number of null keys, set Join on Null Keys to False to improve performance. Even if your datasets are not highly skewed, in most cases you'll want to set Join on Null Keys to False to ensure optimal performance.

Example

Suppose you want to join records from the Purchase and Customer source datasets, with the join key set to customer_name in both inputs.

The Purchase input includes the following records:

| purchase_id | customer_name | item |
| --- | --- | --- |
| 1 | alice | donut |
| 2 |  | coffee |
| 3 | bob | water |

The Customer input includes the following records:

| customer_id | customer_name |
| --- | --- |
| 1 | alice |
| 2 |  |
| 3 | bob |

If you select Left join and Purchase as the required input and set Join on Null Keys to TRUE, the null rows are joined:

| purchase_id | customer_name | item | customer_id | name |
| --- | --- | --- | --- | --- |
| 1 | alice | donut | 1 | alice |
| 2 |  | coffee | 2 |  |
| 3 | bob | water | 3 | bob |

If you select Left join and Purchase as the required input and set Join on Null Keys to FALSE, the null customer name on the left is not joined to the null customer name on the right:

| purchase_id | customer_name | item | customer_id | name |
| --- | --- | --- | --- | --- |
| 1 | alice | donut | 1 | alice |
| 2 |  | coffee |  |  |
| 3 | bob | water | 3 | bob |
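For reference, here is a minimal PySpark sketch that reproduces both results above. The data comes from the example tables; using eqNullSafe versus standard equality to model the Join on Null Keys setting illustrates the underlying semantics, not the plugin's internals.

```python
# Sketch reproducing the example with the data from the tables above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("null-keys-example").getOrCreate()

purchases = spark.createDataFrame(
    [(1, "alice", "donut"), (2, None, "coffee"), (3, "bob", "water")],
    ["purchase_id", "customer_name", "item"])
customers = spark.createDataFrame(
    [(1, "alice"), (2, None), (3, "bob")],
    ["customer_id", "name"])  # customer_name aliased to "name" in the output

# Join on Null Keys = True: null-safe equality treats null == null as a match.
true_result = purchases.join(
    customers, purchases["customer_name"].eqNullSafe(customers["name"]), "left")

# Join on Null Keys = False: standard equality never matches null keys.
false_result = purchases.join(
    customers, purchases["customer_name"] == customers["name"], "left")
```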

Note: In previous releases, if you had a highly skewed dataset due to a large number of null keys, you could use Wrangler to remove null keys before the join to improve Joiner performance. For more information, see https://cdap.atlassian.net/wiki/spaces/DOCS/pages/382042959.

Number of partitions

By default, the number of partitions is not set in the Joiner, Deduplicate, and Group By transformations. This allows the execution framework to determine the number of partitions. If you set the number of partitions in the Joiner, ensure that the number of partitions is less than the number of executors.

For larger Dataproc clusters, it is a good idea to set Number of Partitions to the number of cores available in your cluster (worker CPUs * number of worker nodes).

Large datasets with a large number of unique keys require more partitions, and a higher partition count lets the join run with more parallelism.

For example, suppose one input has a million keys and the other has 2 million keys, and you specify 100 partitions. The execution framework then creates 100 partitions and, depending on the number of unique keys, distributes the records from both datasets across them.

If you have hot-spotting or skewed datasets, adding partitions doesn't improve performance. For example, if you have a million keys in the required input but all 2 million keys in the non-required input are the same, the extra partitions won't help the join, because all 2 million of those records go to one executor. For skewed datasets, you might want to increase memory resources for executors to at least 8 GB instead.
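In Spark terms, this property corresponds to the shuffle partition count. A minimal sketch, assuming a hypothetical cluster with 10 workers of 16 cores each and the hypothetical purchases and customers DataFrames:

```python
# Sketch: sizing join parallelism to match a hypothetical cluster with
# 10 worker nodes x 16 cores = 160 cores.
spark.conf.set("spark.sql.shuffle.partitions", "160")

# Each of the 160 partitions holds a disjoint subset of join keys, so up
# to 160 tasks can process the join in parallel. With heavy skew, the hot
# key still lands in a single partition, so more partitions won't help.
joined = purchases.join(customers, "customer_name", "inner")
```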

Validating the Joiner properties

After you configure the properties, it’s a best practice to validate the plugin. 

Validation detects errors in the join configuration, such as invalid join keys and field name collisions in the output schema. Validating the plugin is especially helpful if you change any of the input fields.
