Using Transformation Pushdown

You can enable Transformation Pushdown to leverage the advantages of both ETL and ELT processing in your pipelines.

When you use Transformation Pushdown, CDAP executes certain Join operations and transformations in BigQuery (instead of Apache Spark). All other stages in a pipeline are executed using Spark. For pipelines that perform multiple complex joins and supported transformations, BigQuery can execute these join operations faster than Spark.

Note: When you preview data, CDAP uses Spark as the execution engine, including processing joins and transformations.

Transformations that support pushdown

CDAP supports Transformation Pushdown for pipelines that include:

  1. Joiner Transformations

  2. Group By Aggregation (with certain aggregations)

  3. Deduplicate Aggregation (when using certain filter conditions)

Joiner Transformation (6.5.0+)

Joiner transformations are supported for both Basic (on-keys) and Advanced Join operations. Joins must have exactly 2 input stages for the execution to take place in BigQuery.

Note that join stages configured to load one or more inputs into memory are not pushed down by default, unless:

  1. One or both inputs to this Join stage are already pushed down.

  2. The stage is configured as a stage that should be executed in the SQL Engine using the Stages to force pushdown property. (The Stages to force pushdown property was introduced in CDAP 6.7.0.)
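For illustration only, a basic on-keys join of two input stages is roughly equivalent to a standard join over the temporary tables that CDAP stages in BigQuery. The project, dataset, table, and column names below are hypothetical, and the statement is a simplified sketch rather than the exact SQL that CDAP generates (see Viewing logs for a real example):

-- Simplified sketch of a basic on-keys join between two staged inputs.
SELECT
  `users_stage`.id AS `id`,
  `users_stage`.name AS `name`,
  `orders_stage`.order_id AS `order_id`,
  `orders_stage`.total AS `total`
FROM `your_project.your_dataset.users_stage` AS `users_stage`
INNER JOIN `your_project.your_dataset.orders_stage` AS `orders_stage`
  ON `users_stage`.id = `orders_stage`.user_id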

BigQuery Sink (6.7.0+)

When a BigQuery Sink follows a stage that has been already executed in BigQuery, the operation to write records into BigQuery can be performed directly within BigQuery.

To ensure that the BigQuery Sink can take advantage of the performance improvements provided by this feature, the following conditions must be met:

  1. The Service Account configured for BigQuery Transformation Pushdown has permissions to create and update tables in the dataset used by the BigQuery Sink.

  2. The Datasets used for BigQuery Transformation Pushdown and BigQuery Sink must be stored in the same location.

  3. The operation is one of the following:

    1. Insert (the Truncate Table option is not supported)

    2. Update

    3. Upsert

Note: If the conditions above are not met, the sink operation uses the existing workflow and does not execute a direct copy.
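As an illustration, a direct Upsert in BigQuery can be thought of as a MERGE from the staged result table into the sink table on the configured key. The dataset, table, and column names below are hypothetical, and the statement is a simplified sketch rather than the exact SQL the plugin generates:

-- Hypothetical sketch of an Upsert performed directly within BigQuery.
MERGE `your_project.sink_dataset.users` AS target
USING `your_project.pushdown_dataset.staged_result` AS source
ON target.id = source.id
WHEN MATCHED THEN
  UPDATE SET name = source.name, email = source.email
WHEN NOT MATCHED THEN
  INSERT (id, name, email) VALUES (source.id, source.name, source.email)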

Group By Aggregation (6.7.0+)

Group By Aggregations can be executed in BigQuery when the following aggregation operations are in use:

  • Avg

  • Collect List: Null values are removed from the output array.

  • Collect Set: Null values are removed from the output array.

  • Concat

  • Concat Distinct

  • Count

  • Count Distinct

  • Count Nulls

  • Logical And

  • Logical Or

  • Max

  • Min

  • Standard Deviation

  • Sum

  • Sum of Squares

  • Corrected Sum of Squares

  • Variance

  • Shortest String

  • Longest String

Note: Conditional aggregations, which allow the use of a filter expression before aggregating records, are not supported. If a conditional aggregation is used, the group by operation will execute in Spark.
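For example, a Group By stage that computes an average, a count, and a Collect List could be expressed in BigQuery roughly as follows. The table and field names are hypothetical and this is a simplified sketch, not the exact SQL generated during pushdown; note that IGNORE NULLS mirrors the removal of null values from the Collect List output:

-- Hypothetical sketch of a pushed-down Group By aggregation.
SELECT
  profession,
  AVG(score) AS avg_score,                 -- Avg
  COUNT(*) AS user_count,                  -- Count
  ARRAY_AGG(city IGNORE NULLS) AS cities   -- Collect List (null values removed)
FROM `your_project.your_dataset.staged_users`
GROUP BY profession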

Group By aggregations are executed in BigQuery in the following scenarios:

  1. The Group By aggregation stage follows a stage that has already been pushed down.

  2. The Group By aggregation stage is configured as a stage that should be executed in BigQuery using the Stages to force pushdown option.

Deduplicate Aggregation (6.7.0+)

Deduplicate aggregations can be executed in BigQuery when the following Filter Operations are configured:

  1. No filter operation specified

  2. ANY (tries to find a non-null value for the desired field)

  3. MIN (minimum value found for the specified field)

  4. MAX (maximum value found for the specified field)

The following operations are not supported:

  1. FIRST

  2. LAST
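For illustration, a Deduplicate stage with the MAX filter operation keeps, for each set of duplicate keys, the record with the maximum value of the specified field. A simplified sketch in BigQuery, using hypothetical table and field names rather than the exact SQL generated during pushdown, might look like this:

-- Hypothetical sketch of deduplication on `email`, keeping the record
-- with the maximum `last_login` value (MAX filter operation).
SELECT * EXCEPT(row_num)
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY email ORDER BY last_login DESC) AS row_num
  FROM `your_project.your_dataset.staged_users`
)
WHERE row_num = 1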

Deduplicate stages are executed in BigQuery in the following scenarios:

  1. The Deduplicate stage follows a stage that has already been pushed down.

  2. The Deduplicate stage is configured as a stage that should be executed in BigQuery using the Stages to force pushdown option.

BigQuery Source Pushdown (6.8.0+)

BigQuery Source Pushdown is available in CDAP versions 6.8.0 and later.

When a BigQuery Source is followed by stages that are compatible with BigQuery pushdown, the pipeline can execute all compatible stages within BigQuery.

Internally, CDAP copies the records necessary to execute the pipeline within BigQuery.

When using source pushdown, the table partitioning and clustering properties are preserved, which lets you use these properties to optimize further operations like joins.

Additional requirements

To use BigQuery Source Pushdown, the following requirements must be in place:

  • The service account configured for BigQuery Transformation Pushdown must have permissions to read tables in the BigQuery Source's dataset.

  • The Datasets used in the BigQuery Source and the dataset configured for Transformation Pushdown must be stored in the same location.

Wrangler Filter Pushdown (6.9.0+)

Wrangler Filter Pushdown is available in CDAP versions 6.9.0 and later. It lets you push down Precondition operations in the Wrangler plugin to BigQuery.

Wrangler Filter Pushdown is only supported with the SQL mode for Preconditions, also newly added in 6.9.0. In this mode, the plugin accepts a precondition expression in ANSI-standard SQL.

When SQL mode is used for preconditions, Directives and User Defined Directives are disabled for the Wrangler transformation plugin, because they are not supported together with preconditions in SQL mode.

SQL mode for preconditions is currently unsupported for Wrangler plugins with multiple inputs.
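For example, a precondition in SQL mode is written as an ANSI-standard SQL filter expression over the input fields. The field names below are hypothetical, and which records are kept or dropped follows the Wrangler plugin's Precondition semantics:

-- Hypothetical SQL-mode precondition expression.
total_amount > 100 AND email IS NOT NULL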

Wrangler filters are executed in BigQuery in the following cases:

  • The stage follows a stage that has already been pushed down.

  • The stage is configured to be executed in BigQuery using the Stages to force pushdown property.

Window Aggregations Pushdown (6.9.0+)

Transformation Pushdown is available for Window aggregations in CDAP version 6.9.0 and later. The following Window aggregation functions are supported for BigQuery Transformation Pushdown:

  • Rank

  • Dense Rank

  • Percent Rank

  • N tile

  • Row Number

  • Median

  • Continuous Percentile

  • Lead

  • Lag

  • First

  • Last

  • Cumulative distribution

  • Accumulate
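As an illustration, a Window aggregation that computes Rank and Lag over a partition could be expressed in BigQuery roughly as shown below. The table, partition, and order fields are hypothetical, and this is a simplified sketch rather than the exact pushed-down SQL:

-- Hypothetical sketch of a pushed-down Window aggregation.
SELECT
  *,
  RANK() OVER (PARTITION BY profession ORDER BY score DESC) AS score_rank,    -- Rank
  LAG(score) OVER (PARTITION BY profession ORDER BY score DESC) AS prev_score -- Lag
FROM `your_project.your_dataset.staged_users`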

Window aggregations are executed in BigQuery in the following cases:

  • The stage follows a stage that has already been pushed down.

  • The stage is configured to be executed in BigQuery using the Stages to force pushdown property.

How Transformation Pushdown works

When you run a pipeline with Transformation Pushdown enabled, CDAP will push down compatible stages into BigQuery for execution. The remaining stages in a pipeline will continue to execute in Spark.

When executing transformations:

  1. CDAP loads the input datasets into BigQuery by writing records into Cloud Storage and then executing a BigQuery load job. In CDAP 6.8.0 and later, records from a BigQuery source are copied directly into the BigQuery execution dataset.

  2. Join operations and supported transformations are then executed as BigQuery jobs using SQL statements.

  3. After the supported join operations and transformations pushed down to BigQuery are executed, records can be exported from BigQuery for further processing in Spark, if needed. However, if a BigQuery sink follows a stage that was executed in BigQuery, records are written directly to the destination BigQuery Sink table. For more information, see the BigQuery Sink section.

The following diagram shows when BigQuery executes transformations rather than Spark.

When to use Transformation Pushdown

Executing transformation operations in BigQuery involves the following steps:

  1. Write records into BigQuery for the supported stages. In CDAP 6.8.0 and later, this step is skipped when the source is a BigQuery source followed by supported stages, because records are copied directly within BigQuery.

  2. Execute the supported stages in BigQuery.

  3. Read records from BigQuery after the supported stages are executed, unless they are followed only by a BigQuery sink.

Depending on the size of your datasets, there might be considerable network overhead, which can have a negative impact on overall pipeline execution time when Transformation Pushdown is enabled.

Due to this network overhead, Transformation Pushdown is better suited for pipelines where multiple supported operations are executed in sequence (with no other stages between them) and the performance gains from executing the transformations in BigQuery, relative to Spark, outweigh the latency of moving data into, and possibly out of, BigQuery.

Best practices

Adjust cluster and executor sizes

If your pipeline structure consists mostly of simple transformations and joins, your pipeline can benefit from additional parallelism when writing and reading records to and from BigQuery.

The ideal outcome of optimizing your pipeline’s resource utilization is that your pipeline will take full advantage of the provisioned Dataproc Cluster (by fully utilizing the available CPU and Memory for your instance), while also benefiting from the execution speed of BigQuery for large join operations.

A relatively straightforward way to achieve substantial parallelism in your pipelines is to take advantage of Autoscaling Dataproc clusters.

However, additional performance benefits can be achieved by tweaking your resource configurations to optimize resource utilization in the stages of your pipeline where records are pushed or pulled from BigQuery during your pipeline execution.

Recommended: Experiment with increasing the number of CPU cores for your executor resources (up to the number of CPU cores available on your worker nodes). This allows your executors to maximize CPU utilization during the serialization and deserialization steps required to write data into BigQuery and to read it back when further processing is needed. For tips on cluster sizing, see Cluster sizing.

Another benefit of pushing down Join operations into BigQuery is that your pipelines may now be able to run on smaller clusters. If your pipelines are structured in such a way that Joins are the most resource intensive operation in your pipeline, you can experiment with smaller cluster sizes (as the heavy join operations are now performed in BigQuery), allowing you to potentially reduce your overall compute costs.

Faster data retrieval using the BigQuery Storage Read API

BigQuery Transformation Pushdown can execute large join and transformation operations in BigQuery in a short amount of time. However, to continue processing these records in the pipeline, it might be necessary to read these records from BigQuery into Spark.

In CDAP 6.7.0 and later, you can use the BigQuery Storage Read API to retrieve records after join or aggregation operations are executed. Using the BigQuery Storage Read API improves latency and can result in faster read operations, reducing the overall pipeline execution time.

The BigQuery Storage Read API benefits from the ability to read records in parallel, so it is useful to adjust executor sizes to maximize parallelism. If resource-intensive operations are pushed down to BigQuery, it can be useful to reduce the memory allocation for the executors to ensure a high level of parallelism when running the pipeline. For more information, see Adjust cluster and executor sizes.

The BigQuery Storage Read API is disabled by default, and it can be enabled in execution environments where Scala 2.12 is installed (including Dataproc 2.0 and Dataproc 1.5). 

Consider the dataset size

Consider the dataset sizes involved in join operations. For join operations that generate a substantial number of output records (for example, something that resembles a cross join operation), the resulting dataset size could potentially be orders of magnitude larger than the input dataset. It’s necessary to take into account the overhead of pulling these records back into Spark when additional Spark processing for these records is needed (such as a Transformation or a Sink) in the context of the overall pipeline performance.

Skewed Joins

Join operations where the data is heavily skewed may cause the BigQuery operation to exceed BigQuery’s resource utilization limits, which will cause the join operation to fail. You can prevent this behavior by specifying the most skewed input using the Skewed Input Stage option in the Joiner plugin.

Using this option allows the platform to arrange join inputs in a way that reduces the risk of the BigQuery statement exceeding BigQuery’s resource utilization limits.

Enabling Transformation Pushdown

You can enable Transformation Pushdown for deployed pipelines.

To enable Transformation Pushdown, follow these steps:

  1. From the Pipeline List page, double-click the deployed pipeline you want to configure for Transformation Pushdown.

  2. From the Deployed Pipeline page, click Configure.

  3. Click Enable Transformation Pushdown.


    The configuration properties for Transformation Pushdown appear. BigQuery Dataset is the only required property and is used to store temporary tables. All other properties are optional.

Configuration properties

  • Enable Transformation Pushdown (macro enabled: No; introduced in 6.5.0). Select to enable Transformation Pushdown to BigQuery.

  • BigQuery Dataset (macro enabled: Yes; introduced in 6.5.0). Required. The BigQuery dataset that is used to stage temporary tables used for BigQuery execution. A dataset is contained within a specific project. Datasets are top-level containers that are used to organize and control access to tables and views.

  • Use connection (macro enabled: No; introduced in 6.7.0). Optional. Whether to use an existing connection.

  • Connection (macro enabled: Yes; introduced in 6.7.0). Optional. Name of the connection to use. Project and service account information will be provided by the connection. You can also use the macro function ${conn(connection_name)}.

  • Dataset Project ID (macro enabled: Yes; introduced in 6.5.0). Optional. The project the dataset belongs to. This is only required if the dataset is not in the same project that the BigQuery job will run in. If no value is given, it will default to the configured Project ID.

  • Project ID (macro enabled: Yes; introduced in 6.5.0). Optional. Google Cloud Project ID, which uniquely identifies a project. It can be found on the Dashboard in the Google Cloud Platform Console. Default is auto-detect.

  • Service Account Type (macro enabled: Yes; introduced in 6.5.0). Optional. Select one of the following options:

    • File Path. File path where the service account is located.

    • JSON. JSON content of the service account.

    Default is JSON.

  • Service Account File Path (macro enabled: Yes; introduced in 6.5.0). Optional. Path on the local file system of the service account key used for authorization. Can be set to 'auto-detect' when running on a Dataproc cluster. When running on other clusters, the file must be present on every node in the cluster. Default is auto-detect.

  • Service Account JSON (macro enabled: Yes; introduced in 6.5.0). Optional. Content of the service account.

  • Temporary Bucket Name (macro enabled: Yes; introduced in 6.5.0). Optional. Google Cloud Storage bucket to store temporary data in. Cloud Storage data is deleted after it has been loaded into BigQuery. If it is not provided, a unique bucket is created and then deleted after the pipeline run finishes; in that case, the service account must have permission to create buckets in the configured project.

  • Location (macro enabled: Yes; introduced in 6.5.0). Optional. The location where the BigQuery dataset is created. This value is ignored if the dataset or temporary bucket already exists. Default is the US multi-region.

  • Encryption Key Name (macro enabled: Yes; introduced in 6.5.1). Optional. The GCP customer-managed encryption key (CMEK) used to encrypt data written to any bucket, dataset, or table created by the plugin. If the bucket, dataset, or table already exists, this is ignored. For more information, see Customer-managed encryption keys.

  • Retain BigQuery Tables after Completion (macro enabled: Yes; introduced in 6.5.0). Optional. Select this option to retain all BigQuery temporary tables created during the pipeline run. This can be used for debugging and validation purposes. Default is No.

  • Temporary Table TTL (in Hours) (macro enabled: Yes; introduced in 6.5.0). Optional. Table TTL for BigQuery temporary tables, in number of hours. This is used as a failsafe in case the pipeline is canceled abruptly and the cleanup process is interrupted (for example, if the execution cluster is shut down abruptly). Setting this value to 0 disables the table TTL. Default is 72 (3 days).

  • Job Priority (macro enabled: Yes; introduced in 6.5.0). Optional. Priority used to execute BigQuery jobs. Select one of the following options:

    • Batch. A batch job is queued and started as soon as idle resources are available, usually within a few minutes. If the job isn't started within three hours, its priority is switched to interactive.

    • Interactive. An interactive job is executed as soon as possible and counts towards the concurrent rate limit and daily rate limit.

    Default is Batch.

  • Stages to force pushdown (macro enabled: Yes; introduced in 6.7.0). Optional. Supported stages that should always be executed in BigQuery.

  • Stages to skip pushdown (macro enabled: Yes; introduced in 6.7.0). Optional. Stages that should never be pushed down to BigQuery, even when supported.

  • Use BigQuery Storage Read API (macro enabled: Yes; introduced in 6.7.0). Optional. Select this option to use the BigQuery Storage Read API when extracting records from BigQuery during pipeline execution. This option can increase the performance of the BigQuery Transformation Pushdown execution. The usage of this API incurs additional costs, and it requires Scala 2.12 to be installed in the execution environment. Default is No.

Viewing logs

The pipeline runtime logs include messages that show the SQL queries that are run in BigQuery. You can see which stages in the pipeline get pushed into BigQuery.

As the pipeline execution begins, you will see log entries detailing which steps of your pipeline can and will be executed in BigQuery:

INFO  [Driver:i.c.p.g.b.s.BigQuerySQLEngine@190] - Validating join for stage 'Users' can be executed on BigQuery: true

DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@131] - Starting push for dataset 'UserProfile'

DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@131] - Starting push for dataset 'UserDetails'

DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@292] - Starting join for dataset 'Users'

INFO  [Driver:i.c.p.g.b.s.BigQuerySQLEngine@190] - Validating join for stage 'UserPurchases' can be executed on BigQuery: true

DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@131] - Starting push for dataset 'Purchases'

DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@292] - Starting join for dataset 'UserPurchases'

INFO  [Driver:i.c.p.g.b.s.BigQuerySQLEngine@190] - Validating join for stage 'MostPopularNames' can be executed on BigQuery: true

DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@131] - Starting push for dataset 'FirstNameCounts'

DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@292] - Starting join for dataset 'MostPopularNames'

DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@193] - Starting pull for dataset 'MostPopularNames'

You will also see the table names that will be assigned for each of the datasets involved in the pushdown execution:

INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@145] - Executing Push operation for dataset Purchases stored in table 2cb2c4b29b2041d9aab9f69ca849494f_8e28f19f7b7947c0a5594194f2893dc5

INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@145] - Executing Push operation for dataset UserDetails stored in table 2cb2c4b29b2041d9aab9f69ca849494f_0a610477d9a241f8b2703e22073bdabe

INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@145] - Executing Push operation for dataset FirstNameCounts stored in table 2cb2c4b29b2041d9aab9f69ca849494f_61968b5609364c7fb133c5fdff08399f

INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@145] - Executing Push operation for dataset UserProfile stored in table 2cb2c4b29b2041d9aab9f69ca849494f_6c39fa171e87476aa7e2695c70f07c32

As the execution continues, you will see the completion of push stages, and eventually the execution of join operations:

DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@133] - Completed push for dataset 'UserProfile'

[...]

DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@133] - Completed push for dataset 'UserDetails'

[...]

DEBUG [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@235] - Executing join operation for dataset Users 

[...]

INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQueryJoinDataset@118] - Creating table `2cb2c4b29b2041d9aab9f69ca849494f_82808c054ace4ed2b70699f46380ce51` using job: 5e9c4af3a68543cda5b4116ac068a956 with SQL statement: SELECT `UserDetails`.id AS `id` , `UserDetails`.first_name AS `first_name` , `UserDetails`.last_name AS `last_name` , `UserDetails`.email AS `email` , `UserProfile`.phone AS `phone` , `UserProfile`.profession AS `profession` , `UserProfile`.age AS `age` , `UserProfile`.address AS `address` , `UserProfile`.score AS `score` FROM `your_project.your_dataset.2cb2c4b29b2041d9aab9f69ca849494f_6c39fa171e87476aa7e2695c70f07c32` AS `UserProfile` LEFT JOIN `your_project.your_dataset.2cb2c4b29b2041d9aab9f69ca849494f_0a610477d9a241f8b2703e22073bdabe` AS `UserDetails` ON `UserProfile`.id = `UserDetails`.id

[...]

INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQueryJoinDataset@151] - Created BigQuery table `2cb2c4b29b2041d

INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@245] - Executed join operation for dataset Users

After all stages are completed, you will see a message indicating that the Pull operation has completed. This indicates that the BigQuery export process has been triggered and records will start being read into the pipeline once this export job begins.

DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@196] - Completed pull for dataset 'MostPopularNames'

If the pipeline execution encounters any errors, they will be shown in the logs. 

To get additional details about the execution of the BigQuery jobs (such as resource utilization, execution time, and error causes), you can view the BigQuery job data using the Job ID found in the pipeline logs.
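For example, you can look up a job with the BigQuery INFORMATION_SCHEMA jobs views (or with the bq command-line tool, for example bq show -j JOB_ID). The region qualifier below is an assumption; use the location of your pushdown dataset. The Job ID shown is the one from the sample log above:

-- Look up resource utilization and errors for a pushdown job by its Job ID.
SELECT
  job_id,
  state,
  total_slot_ms,
  total_bytes_processed,
  error_result
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE job_id = '5e9c4af3a68543cda5b4116ac068a956'
  AND creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)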

 
