Running a Data Pipeline (Developers)

Data pipelines can be started, stopped, and controlled using:

  • Pipeline Studio

  • CDAP CLI

  • Lifecycle Microservices
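
For example, a batch pipeline could be started from the CDAP CLI or through the Lifecycle Microservices. (This is a sketch: myPipeline is a hypothetical pipeline name, and DataPipelineWorkflow is the workflow run by batch pipelines.)

cdap > start workflow myPipeline.DataPipelineWorkflow

$ curl -w"\n" -X POST "http://localhost:11015/v3/namespaces/default/apps/myPipeline/workflows/DataPipelineWorkflow/start"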

Running a Data Pipeline in the Pipeline Studio

In the Pipeline Studio, you can start and stop pipelines.

For a batch pipeline, you can start or stop ("suspend") its schedule. Once its schedule is started, the pipeline will run at the next scheduled time. (Changing the schedule requires creating a new pipeline with a new schedule.)

For a real-time pipeline, you simply start or stop the pipeline.

You can set the arguments used to start the pipeline (both as preferences and runtime arguments) and view details of a pipeline, such as its configuration, settings, stages, schemas, and logs.

Runtime Arguments

You might want to create a pipeline that has several configuration settings that are not known at pipeline creation time, but that are set at the start of each pipeline run.

For instance, you might want a pipeline that reads from a database (a source) and writes to a file (a sink). The name of the database source and the name of the file sink might change from run to run, so you need to specify those values as input before starting a run.

You might want to create a pipeline with a particular action at the start of the run. Based on some logic, the action could provide the name of the database to use as a source and the name of the file to write to as a sink. The next stage in the pipeline could then use this information to read from and write to the appropriate source and sink.

To support this, CDAP provides macros that are evaluated and substituted at runtime. Macros use a simple syntax, support recursive (nested) expansion, and can be defined both in the pipeline configuration and in the runtime arguments.
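
For example (a sketch; the property and argument names are hypothetical), a file sink's path property could be set in the pipeline configuration to:

/data/${output.dir}/${output.file}

with the runtime argument output.dir set to reports and output.file set to ${run.name}.csv. Because macro expansion is recursive, the ${run.name} macro inside the value of output.file is in turn expanded from another runtime argument.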

Runtime arguments are resolved by sourcing them from application preferences, runtime arguments, and Argument Setter plugins. Arguments set by an Argument Setter plugin take the highest precedence, followed by runtime arguments, and then application preferences.

These arguments and preferences can be set with the Pipeline Studio, the CDAP CLI, the Lifecycle Microservices, or the Preferences Microservices.
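
For example (a sketch, reusing the hypothetical myPipeline and the argument names from above), a preference could be set through the Preferences Microservices, and a runtime argument passed in the body of a start call to the Lifecycle Microservices:

$ curl -w"\n" -X PUT "http://localhost:11015/v3/namespaces/default/apps/myPipeline/preferences" -H "Content-Type: application/json" -d '{ "output.dir": "reports" }'

$ curl -w"\n" -X POST "http://localhost:11015/v3/namespaces/default/apps/myPipeline/workflows/DataPipelineWorkflow/start" -d '{ "output.file": "demo.csv" }'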

For more information, see Setting Preferences and Using Macros to Create a Dynamic Data Pipeline.

Re-running a Data Pipeline

When a data pipeline is re-run, any previously set runtime arguments are discarded and must be set again, if required. Any previously set application preferences are retained and reused.

Notifications for Batch Data Pipelines

When a batch data pipeline run completes, notifications can be sent using one or more post-run actions. These are set in the pipeline configuration.

These post-run plugins support sending emails, running database queries, and making a general HTTP callback. You can also download additional post-run plugins from the Hub.

Logs

As pipelines run, they create entries in the CDAP logs.

Logs for a pipeline can be obtained using the same tools as for any other CDAP application and program, and are described in Logging and Monitoring and the Logging Microservices.
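
For example (a sketch, again using the hypothetical batch pipeline myPipeline and its DataPipelineWorkflow workflow; start and stop are Unix timestamps in seconds), the logs for a time range could be fetched with:

$ curl -w"\n" "http://localhost:11015/v3/namespaces/default/apps/myPipeline/workflows/DataPipelineWorkflow/logs?start=1468884000&stop=1468884600"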

Script transform steps can write to the logs, as described in the script transformations section of the plugin development documentation.

Metrics

As pipelines run, they create both system and user metrics.

System metrics for a pipeline can be obtained using the same tools as for any other CDAP application and program, and are described in Metrics and the Metrics Microservices.

Script transform steps can create metrics, as described in the script transformations section of the plugin development documentation.

For instance, if you have a real-time pipeline named "demoPipeline" with three stages (File, JavaScript, and Table), you can discover the available metrics using a curl command, such as (reformatted for display):

Linux

$ curl -w"\n" -X POST "http://localhost:11015/v3/metrics/search?target=metric&tag=namespace:default&tag=app:demoPipeline"

["system.app.log.debug","system.app.log.info","system.app.log.warn","system.dataset. store.bytes","system.dataset.store.ops","system.dataset.store.reads","system.dataset. store.writes","system.metrics.emitted.count","user.File.records.out","user. JavaScript.record.count","user.JavaScript.records.in","user.JavaScript.records.out"," user.Table.records.in","user.Table.records.out","user.metrics.emitted.count"]

Windows

> curl -X POST "http://localhost:11015/v3/metrics/search?target=metric&tag=namespace:default&tag=app:demoPipeline"

["system.app.log.debug","system.app.log.info","system.app.log.warn","system.dataset. store.bytes","system.dataset.store.ops","system.dataset.store.reads","system.dataset. store.writes","system.metrics.emitted.count","user.File.records.out","user. JavaScript.record.count","user.JavaScript.records.in","user.JavaScript.records.out"," user.Table.records.in","user.Table.records.out","user.metrics.emitted.count"]

In this case, the user metric "user.JavaScript.record.count" was incremented in the JavaScript stage using:

context.getMetrics().count('record.count', 1);

The value of the metric can be retrieved with:

Linux

$ curl -w"\n" -X POST "localhost:11015/v3/metrics/query?tag=namespace:default&tag=app:etlRealtime6&metric=user.JavaScript.record.count&aggregate=true" {"startTime":0,"endTime":1468884338,"series":[{"metricName":"user.JavaScript.record. count","grouping":{},"data":[{"time":0,"value":170}]}],"resolution":"2147483647s"}

Windows

> curl -X POST "localhost:11015/v3/metrics/query?tag=namespace:default&tag=app:etlRealtime6&metric=user.JavaScript.record.count&aggregate=true" {"startTime":0,"endTime":1468884338,"series":[{"metricName":"user.JavaScript.record. count","grouping":{},"data":[{"time":0,"value":170}]}],"resolution":"2147483647s"}

Using the CDAP CLI, you can retrieve the value with:

cdap > get metric value user.JavaScript.record.count 'app=demoPipeline'

Start time: 0
End time: 1468884640
Series: user.JavaScript.record.count

+===================+
| timestamp | value |
+===================+
| 0         | 170   |
+===================+
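
Beyond aggregates, the Metrics Microservices also support time-range queries. For example (a sketch; start and end here use relative times, and the resolution parameter selects the granularity of the returned data points):

$ curl -w"\n" -X POST "localhost:11015/v3/metrics/query?tag=namespace:default&tag=app:demoPipeline&metric=user.JavaScript.record.count&start=now-10m&end=now&resolution=1m"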

Error Record Handling

To validate records and handle any resulting errors, certain transform stages are available that check whether a record matches specified criteria. Records that fail the criteria can be discarded, and appropriate messages written to a configurable error dataset.

These transform plugins support error record handling:

  • JavaScript

  • Python Evaluator

  • XML Parser

Configuring Resources

Resources for pipelines can be configured in the same manner as for any other CDAP workflow application.

See Scaling Instances and Resource Guarantees for CDAP Programs in YARN.
