Adding a processed date to all records in the destination
A pipeline stage may need extra information to proceed with execution. In this example, we will explore adding a date to each record in the destination.
Instructions
Go to the Hub and deploy the "Field Adder Transform" plugin.
Create a new pipeline.
Add a GCSFile source with the project ID set to cloud-data-fusion-demos and the Path set to gs://campaign-tutorial/customers.csv.
Add a Wrangler transform with the following recipe:
parse-as-csv :body ',' false
drop body
rename body_1 Fname
rename body_2 a
rename Fname name
rename a address
rename body_3 city
rename body_4 state
rename body_5 country
In the Field Adder transform, set the Field Name to processed_date and set the Field Value to use a macro. The macro should use the logicalStartTime macro function, which returns the logical start time of a pipeline run. The start time can be formatted by passing optional arguments to the function. For example, if the pipeline is run at 2020-05-05T01:00:00, we can use the format yyyy-MM-dd'T'HH-mm-ss with an offset of 1d to return the start time minus one day, formatted as 2020-05-04T01-00-00.
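Concretely, the Field Value can be set to a macro expression along these lines. The format and offset here mirror the example above; both arguments to logicalStartTime are optional, so adjust or drop them to match the date format you want stored.

${logicalStartTime(yyyy-MM-dd'T'HH-mm-ss,1d)}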
Add a BigQuery sink with the following parameters:
The final pipeline should look like:
Finally, save, deploy, and run the pipeline. Note that you do not need to specify a value for the logicalStartTime runtime argument.
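As an optional check after the run completes, you can query the destination table to confirm that every record carries the new processed_date column. The sketch below uses the google-cloud-bigquery Python client; the dataset and table names are placeholders and should be replaced with whatever you configured in the BigQuery sink.

from google.cloud import bigquery

# Placeholders: substitute the dataset and table configured in the BigQuery sink.
DATASET = "campaign_tutorial"
TABLE = "customers"

client = bigquery.Client()  # uses Application Default Credentials

# Count how many records were written for each processed_date value.
query = f"""
SELECT processed_date, COUNT(*) AS record_count
FROM `{client.project}.{DATASET}.{TABLE}`
GROUP BY processed_date
"""

for row in client.query(query).result():
    print(row.processed_date, row.record_count)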
Created in 2020 by Google Inc.