Adding a processed date to all records in the destination

A pipeline stage sometimes needs extra information at runtime to do its work. In this example, we add a processed date to each record written to the destination.

Instructions

  1. Go to Hub and deploy the plugin “Field Adder Transform”.

  2. Create a new pipeline.

  3. Add a GCSFile source with project id=cloud-data-fusion-demos and Path = gs://campaign-tutorial/customers.csv.

  4. Add a Wrangler transform with the following recipe:

    parse-as-csv :body ',' false
    drop body
    rename body_1 Fname
    rename body_2 a
    rename Fname name
    rename a address
    rename body_3 city
    rename body_4 state
    rename body_5 country

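The recipe parses the comma-separated body into columns, drops the raw body, and renames the generated columns to name, address, city, state, and country. Conceptually, the parsing and renaming amount to the following Python sketch (the sample row is hypothetical; only the final column names come from the recipe):

```python
import csv
import io

# A hypothetical raw CSV "body" value, as read from customers.csv.
raw_body = "Douglas,1 Main St,Springfield,IL,USA\n"

# parse-as-csv splits the body into positional columns (body_1..body_5);
# the rename directives then map them to their final names.
columns = ["name", "address", "city", "state", "country"]
values = next(csv.reader(io.StringIO(raw_body)))
record = dict(zip(columns, values))
print(record["city"])  # Springfield
```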
  5. In the Field Adder transform, set the Field Name to processed_date and set the Field Value to a macro. The macro should use the logicalStartTime macro function, which returns the logical start time of a pipeline run. The start time can be formatted, and shifted by an offset, by passing optional arguments to the function. For example, if the pipeline runs at 2020-05-05T01:00:00, the format yyyy-MM-dd'T'HH-mm-ss with an offset of 1d returns the start time minus one day, 2020-05-04T01-00-00.


  6. Add a BigQuery sink and configure the dataset and table to write to.

     The final pipeline connects GCSFile to Wrangler to Field Adder Transform to BigQuery.

Finally, save, deploy, and run the pipeline. Note that you do not need to specify a value for the logicalStartTime runtime argument.
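The pipeline's net effect on each record can be sketched as a function that appends the same constant field to every record, which is what the Field Adder transform does (the record contents and helper name here are hypothetical):

```python
def add_field(records, name, value):
    # Append a constant field to every record, leaving the originals untouched.
    return [dict(record, **{name: value}) for record in records]

rows = [{"name": "Douglas", "city": "Springfield"}]
out = add_field(rows, "processed_date", "2020-05-04T01-00-00")
print(out[0]["processed_date"])  # 2020-05-04T01-00-00
```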


Created in 2020 by Google Inc.