Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 17 Next »

Data pipelines can be started, stopped, and controlled using:

  • Pipelines Studio

  • CDAP CLI

  • Lifecycle Microservices

Running a Data Pipeline in the Pipeline Studio

In the Pipelines Studio, you can start and stop pipelines.

For a batch pipeline, you can start or stop ("suspend") its schedule. It will then begin at the next scheduled time. (To change the schedule requires creating a new pipeline with a new schedule.)

For a real-time pipeline, you simply start or stop the pipeline.

You can set the arguments used to start the pipeline (both as preferences and runtime arguments) and view details of a pipeline, such as its configuration, settings, stages, schemas, and logs.

Runtime Arguments

You might want to create a pipeline that has several configuration settings that are not known at pipeline creation time, but that are set at the start of the each pipeline run.

For instance, you might want a pipeline that reads from a database (a source) and writes to a table (a sink). The name of the database source and name of the table sink might change from run to run and you need to specify those values as input before starting a run.

You might want to create a pipeline with a particular action at the start of the run. The action could, based on some logic, provide the name of the database to use as a source and the name of the table to write as a sink. The next stage in the pipeline might use this information to read and write from appropriate sources and sinks.

To do this, CDAP supports the use of macros that will, at runtime, be evaluated and substituted for. The macros support recursive (nested) expansion and use a simple syntax. These macros are defined in the pipeline configuration and in the runtime arguments.

Runtime arguments are resolved by sourcing them from the application preferences, the runtime arguments, and the workflow token. Precedence is with the workflow token having the highest precedence.

These arguments and preferences can be set with the Pipeline Studio, the CDAP CLI, or the Program Lifecycle and Preferences Microservices.

For more information, see Setting Preferences and Using Macros to Create a Dynamic Data Pipeline.

Re-running a Data Pipeline

When a data pipeline is re-run, any previously-set runtime arguments will have been discarded and will need to be set again, if required. Any previously-set application preferences will be retained and re-used.

Notifications for Batch Data Pipelines

When a batch data pipeline run is completed, notifications can be sent by using one or more post-run actions. These are set in the pipeline configuration.

These post-run plugins allow for emails, database queries, and a general HTTP callback action. You can also download additional post-run plugins from the Hub.

Logs

As pipelines run, they create entries in the CDAP logs.

Logs for a pipeline can be obtained using the same tools as any other CDAP application and program, and are described in Logging and Monitoring and the Logging Microservices.

Script transform steps can write to logs, as described in the section in developing plugins on script transformations.

Metrics

As pipelines run, they create both system and user metrics.

System metrics for a pipeline can be obtained using the same tools as any other CDAP application and program, and are described in Metrics and Metrics Microservices.

Script transform steps can create metrics, as described in the section in developing plugins on script transformations.

For instance, if you have a real-time pipeline named "demoPipeline" with three stages (FileJavaScript, and Table), you can discover the available metrics using a curl command, such as (reformatted for display):

Linux

$ curl -w"\n" -X POST "http://localhost:11015/v3/metrics/search?target=metric&tag=namespace:default&tag=app:demoPipeline"

["system.app.log.debug","system.app.log.info","system.app.log.warn","system.dataset. store.bytes","system.dataset.store.ops","system.dataset.store.reads","system.dataset. store.writes","system.metrics.emitted.count","user.File.records.out","user. JavaScript.record.count","user.JavaScript.records.in","user.JavaScript.records.out"," user.Table.records.in","user.Table.records.out","user.metrics.emitted.count"]

Windows

> curl -X POST "http://localhost:11015/v3/metrics/search?target=metric&tag=namespace:default&tag=app:demoPipeline"

["system.app.log.debug","system.app.log.info","system.app.log.warn","system.dataset. store.bytes","system.dataset.store.ops","system.dataset.store.reads","system.dataset. store.writes","system.metrics.emitted.count","user.File.records.out","user. JavaScript.record.count","user.JavaScript.records.in","user.JavaScript.records.out"," user.Table.records.in","user.Table.records.out","user.metrics.emitted.count"]

In this case, the user metric "user.JavaScript.record.count" was incremented in the JavaScript stage using:

context.getMetrics().count('record.count', 1);

The value of the metric can be retrieved with:

Linux

$ curl -w"\n" -X POST "localhost:11015/v3/metrics/query?tag=namespace:default&tag=app:etlRealtime6&metric=user.JavaScript.record.count&aggregate=true" {"startTime":0,"endTime":1468884338,"series":[{"metricName":"user.JavaScript.record. count","grouping":{},"data":[{"time":0,"value":170}]}],"resolution":"2147483647s"}

Windows

> curl -X POST "localhost:11015/v3/metrics/query?tag=namespace:default&tag=app:etlRealtime6&metric=user.JavaScript.record.count&aggregate=true" {"startTime":0,"endTime":1468884338,"series":[{"metricName":"user.JavaScript.record. count","grouping":{},"data":[{"time":0,"value":170}]}],"resolution":"2147483647s"}

Using the CDAP CLI, you can retrieve the value with:

cdap > get metric value user.JavaScript.record.count 'app=demoPipeline'

Start time: 0
End time: 1468884640 Series: user.JavaScript.record.count

+===================+
| timestamp | value |
+===================+
| 0 | 170 |
+===================+

Error Record Handling

To handle the problem of validating records and handling any subsequent errors, certain transform stages are available that can check that a record matches specified criteria. Records that fail the criteria can be discarded, and appropriate messages written to a configurable error dataset.

These transform plugins support error record handling:

  • JavaScript

  • Python Evaluator

  • XML Parser

Configuring Resources

Resources for pipelines can be configured as any other CDAP workflow application.

See Scaling Instances and Resource Guarantees for CDAP Programs in YARN.

Upgrading a Pipeline

If you want to upgrade 4.2.x pipelines, you can use the ETL upgrade tool packaged with the distributed version of CDAP. You would want to run this tool to upgrade applications that were created with earlier versions of the artifacts, which you want to open in the 6.2.0 version of the Pipeline Studio.

The tool will connect to an instance of CDAP, look for any applications that use 4.2.x versions of the system artifacts, and then update the application to use the 6.2.0 version of those artifacts.

CDAP must be running when you run the command:

$ java -cp /opt/cdap/master/libexec/cdap-etl-tools-6.2.0.jar io.cdap.cdap.etl.tool.UpgradeTool -u http://<host>:<port> -e /tmp/failedUpgrades upgrade

The first argument is the host and port for the CDAP router. The second argument is a directory to write the configurations of any pipelines that could not be upgraded. A pipeline may fail to upgrade if the new version of a plugin used in the pipeline is not backwards compatible. For example, this may happen if the plugin added a new required property.

You can also upgrade just the ETL applications within a specific namespace:

$ java -cp /opt/cdap/master/libexec/cdap-etl-tools-6.2.0.jar io.cdap.cdap.etl.tool.UpgradeTool -u http://<host>:<port> -n <namespace-id> upgrade

You can also upgrade just one ETL application:

$ java -cp /opt/cdap/master/libexec/cdap-etl-tools-6.2.0.jar io.cdap.cdap.etl.tool.UpgradeTool -u http://<host>:<port> -n <namespace-id> -p <app-name> upgrade

If you have authentication turned on, you also need to store an access token in a file and pass the file to the tool:

$ java -cp /opt/cdap/master/libexec/cdap-etl-tools-6.2.0.jar io.cdap.cdap.etl.tool.UpgradeTool -u http://<host>:<port> -a <tokenfile> upgrade

For instance, if you have obtained an access token (as shown in the example in the security documentation <testing-security>) such as:

{"access_token":"AghjZGFwAI7e8p65Uo7OpfG5UrD87psGQE0u0sFDoqxtacdRR5GxEb6bkTypP7mXdqvqqnLmfxOS",
  "token_type":"Bearer","expires_in":86400}

The access token itself (AghjZGFwAI7e8p65Uo7OpfG5UrD87psGQE0u0sFDoqxtacdRR5GxEb6bkTypP7mXdqvqqnLmfxOS) would be placed in a file and then the file's path would be used in the above command.

  • No labels