Creating a reusable pipeline with the GCS Argument Setter

This topic shows how to build a reusable pipeline that reads data from Cloud Storage, performs data quality checks, and writes to Cloud Storage.

Reusable pipelines have a regular pipeline structure, but use macros to add reusable variables to plugin configurations so that you can specify the variable substitutions at runtime. 
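To make the idea of macro substitution concrete, the following is a minimal Python sketch of how a ${name} placeholder in a plugin configuration might be replaced by a runtime value. It is an illustration only, not the code that CDAP itself runs; the function name resolve_macros and the dictionary layout are assumptions made for this example.

```python
import re

# Matches ${name} placeholders such as ${input.path}.
MACRO_PATTERN = re.compile(r"\$\{([A-Za-z0-9_.\-]+)\}")


def resolve_macros(config, arguments):
    """Return a copy of config with every ${name} replaced by arguments[name].

    Hypothetical helper for illustration; not part of CDAP.
    """
    def substitute(match):
        name = match.group(1)
        if name not in arguments:
            raise KeyError(f"no runtime value supplied for macro '{name}'")
        return arguments[name]

    return {field: MACRO_PATTERN.sub(substitute, value)
            for field, value in config.items()}


# The plugin configuration stays the same in every run ...
source_config = {"path": "${input.path}", "format": "text"}
# ... while the runtime arguments can differ from run to run.
run_arguments = {"input.path": "gs://reusable-pipeline-tutorial/user-emails.txt"}

resolved = resolve_macros(source_config, run_arguments)
print(resolved["path"])
```

The same configuration can be resolved against a different set of arguments in the next run, which is what makes the pipeline reusable.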

In the Plugin Configuration section, fields that can be made reusable using macros have a blue M icon on the far right.

The GCS Argument Setter reads a JSON file from Cloud Storage (GCS) and uses it to set the arguments for macros in the pipeline. It also lets you set the Service Account information for GCS, which provides secure access to your source data. It is most commonly used when the structure of a pipeline is static and its configuration needs to be managed outside the pipeline.

Objectives

  • Use the GCS Argument Setter plugin to allow the pipeline to read different input in every run.

  • Use the GCS Argument Setter plugin to allow the pipeline to perform different quality checks in every run.

  • Use the GCS Argument Setter to keep your data secure when you run a pipeline.

  • Write the output data of each run to Cloud Storage.

Costs

This tutorial uses Cloud Storage, which is a billable component of Google Cloud.

Use the pricing calculator to generate a cost estimate based on your projected usage.

Before you begin

  1. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.

    Note: If you don't plan to keep the resources that you create in this procedure, create a project instead of selecting an existing project. After you finish these steps, you can delete the project, removing all resources associated with the project.

    Go to the project selector page

  2. Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.

  3. Enable the Cloud Storage APIs.
    Enable the APIs

  4. Download the sample file, args.json, and upload it to a GCS bucket. This is the JSON file that you’ll call from the GCS Argument Setter. Its contents are listed in the Add the argument setter section.
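If you prefer to create args.json yourself, the following Python sketch produces a file with the same two arguments that this tutorial lists later for the GCS Argument Setter. The helper names build_args and write_args_file are assumptions made for this example.

```python
import json


def build_args():
    """Return the argument document used in this tutorial.

    The argument names must match the macros used in the pipeline:
    ${input.path} in the GCS source and ${directives} in Wrangler.
    """
    return {
        "arguments": [
            {"name": "input.path",
             "value": "gs://reusable-pipeline-tutorial/user-emails.txt"},
            {"name": "directives",
             "value": "send-to-error !dq:isEmail(body)"},
        ]
    }


def write_args_file(path):
    """Write the argument document to a local file (hypothetical helper)."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(build_args(), handle, indent=2)
        handle.write("\n")


# Serialized form, shown here so the result can be inspected before writing.
args_text = json.dumps(build_args(), indent=2)
print(args_text)
```

Calling write_args_file("args.json") creates the local file, which you can then upload to your bucket, for example through the Cloud Console.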

Read from Cloud Storage

  1. In the Pipeline Studio, add a GCS batch source to the canvas.

  2. Click the Properties button.

  3. In the Reference name field, enter a name.

  4. In the Project ID field, enter your Google Cloud Project ID.

  5. In the Path field, enter ${input.path}. This macro controls what the Cloud Storage input path will be in the different pipeline runs.

  6. (Optional) In the Output Schema panel, remove the offset field from the output schema by clicking the trash icon in the offset field row.
    Recommended: The offset field isn’t required. For text sources, removing it keeps only the required fields in the pipeline, which can help streamline troubleshooting and lineage analysis.

  7. In the Format field, select text.
    This is the format of the source file that the GCS Argument Setter supplies through input.path. The Format of the GCS source must match the format of that source file.

  8. For Service Account, select JSON.

  9. In the Service Account JSON field, paste the entire contents of the Service Account JSON.

  10. Click Validate to make sure you don't have any errors.

  11. Click the X button to exit the Properties dialog box.

Transform your data

  1. Add a Wrangler transformation to the pipeline.

  2. On the Wrangler transformation, click the Properties button.

  3. In Input field name, enter body.

  4. In the Recipe field, enter ${directives}. This macro controls what the transform logic will be in the different pipeline runs.

  5. Click Validate to make sure you don't have any errors.

  6. Click the X button to exit the Properties dialog box.

Write to Cloud Storage

  1. Add a GCS sink to the pipeline.

  2. On the GCS sink, click the Properties button.

  3. In the Reference name field, enter a name.

  4. In the Path field, enter the path of a Cloud Storage bucket in your project, where your pipeline can write the output files. If you don't have a Cloud Storage bucket, create one.

  5. For Service Account, select JSON.

  6. In the Service Account JSON field, paste the entire contents of the Service Account JSON.

  7. In the Format field, select json.
    Note: You can select any output format, but for this tutorial, we’ll write to a JSON file.

  8. Click Validate to make sure you don't have any errors.

  9. Click the X button to exit the Properties dialog box.

Add the argument setter

  1. From the Plugin Palette, under Conditions and Actions, select the GCS Argument Setter plugin.

  2. In the Pipeline Studio canvas, connect the GCS Argument Setter to the GCS batch source.

  3. On the GCS Argument Setter plugin, click the Properties button.

  4. In the Project ID field, enter your Google Cloud Project ID.

  5. In the Path field, enter the path to the args.json file you uploaded to your bucket.
    The file contains the following content:

    {
      "arguments": [
        {
          "name": "input.path",
          "value": "gs://reusable-pipeline-tutorial/user-emails.txt"
        },
        {
          "name": "directives",
          "value": "send-to-error !dq:isEmail(body)"
        }
      ]
    }

    The first of the two arguments is the value for input.path. The path gs://reusable-pipeline-tutorial/user-emails.txt is a publicly accessible object in Cloud Storage that contains the following test data:

    alice@example.com
    bob@example.com
    craig@invalid@example.com

    The second argument is the value for directives. The value send-to-error !dq:isEmail(body) configures Wrangler to filter out any line that is not a valid email address. For example, craig@invalid@example.com is filtered out.

  6. In the Service Account Type field, select JSON.

  7. In the Service Account JSON field, paste the entire contents of the Service Account JSON.
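The effect of the directive send-to-error !dq:isEmail(body) on the test data can be approximated with a short Python sketch. This is an illustration under stated assumptions: the regular expression below is a deliberately simple stand-in, not the validation logic that Wrangler actually uses, and the function name split_valid_and_errors is hypothetical.

```python
import re

# Simplified stand-in for an email check (an assumption, not Wrangler's rule):
# exactly one "@", no whitespace, and at least one dot in the domain part.
SIMPLE_EMAIL = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def split_valid_and_errors(lines):
    """Separate lines that pass the email check from lines that fail it.

    Mirrors the intent of `send-to-error !dq:isEmail(body)`: a line that
    is not a valid email address is routed to the error list.
    """
    valid, errors = [], []
    for line in lines:
        if SIMPLE_EMAIL.match(line):
            valid.append(line)
        else:
            errors.append(line)
    return valid, errors


# The three lines in gs://reusable-pipeline-tutorial/user-emails.txt.
test_data = [
    "alice@example.com",
    "bob@example.com",
    "craig@invalid@example.com",
]

valid, errors = split_valid_and_errors(test_data)
print(valid)   # ['alice@example.com', 'bob@example.com']
print(errors)  # ['craig@invalid@example.com']
```

Only the lines in the first list would continue on to the GCS sink.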

Deploy and run your pipeline

  1. In the top bar of the Pipeline Studio page, click Name your pipeline. Name your pipeline and click Save.

  2. Click Deploy.

  3. Next to Run, click the drop-down arrow to open the Runtime Arguments and view the input.path and directives macro (runtime) arguments. Leave the value fields blank to notify CDAP that the GCS Argument Setter plugin in the pipeline will set the values of these arguments at runtime.

  4. Click Run.
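Because the runtime argument values are left blank, every macro in the pipeline must receive its value from args.json. The following Python sketch shows one way to check this before you run the pipeline. It is not a CDAP feature; the function names load_arguments and missing_macros are assumptions made for this example, and the JSON text is the args.json content from this tutorial.

```python
import json
import re

# Matches ${name} placeholders and captures the name.
MACRO_PATTERN = re.compile(r"\$\{([^}]+)\}")


def load_arguments(args_json_text):
    """Convert the {"arguments": [{"name": ..., "value": ...}]} layout to a dict."""
    document = json.loads(args_json_text)
    return {entry["name"]: entry["value"] for entry in document["arguments"]}


def missing_macros(plugin_fields, arguments):
    """Return the macro names used in plugin fields that have no argument value."""
    used = set()
    for value in plugin_fields:
        used.update(MACRO_PATTERN.findall(value))
    return sorted(used - set(arguments))


args_json_text = """
{
  "arguments": [
    {"name": "input.path", "value": "gs://reusable-pipeline-tutorial/user-emails.txt"},
    {"name": "directives", "value": "send-to-error !dq:isEmail(body)"}
  ]
}
"""

arguments = load_arguments(args_json_text)

# Field values entered in this tutorial: the GCS source Path and the Wrangler Recipe.
pipeline_fields = ["${input.path}", "${directives}"]

print(missing_macros(pipeline_fields, arguments))  # []
```

An empty list means that each macro has a matching argument. A non-empty list names the macros that would be left without a value at runtime.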

Cleaning up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources. Cleaning up also ensures that the resources don't take up quota. The following section describes how to delete the project.

Delete the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

Caution: Deleting a project has the following effects:

  • Everything in the project is deleted. If you used an existing project for this tutorial, when you delete it, you also delete any other work you've done in the project.

  • Custom project IDs are lost. When you created this project, you might have created a custom project ID that you want to use in the future. To preserve the URLs that use the project ID, such as an appspot.com URL, delete selected resources inside the project instead of deleting the whole project.

To delete the project:

  1. In the Cloud Console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.

  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Related Topics

Google Cloud Storage Argument Setter Action

Reusable Pipelines Best Practices

 

Created in 2020 by Google Inc.