Developing Pipelines
This section is intended for developers creating pipelines using command line tools.
There are three different methods for creating pipeline applications:

- Pipeline Studio. To create and manage pipelines through the Pipeline Studio, see the Data Pipeline User Guide.
- Command line tools (such as curl or the CDAP CLI).
- Lifecycle Microservices.
To create a pipeline application, a pipeline configuration (either as a file or in memory) is required. It specifies the plugins to be used, along with their properties.
Pipeline Configuration File Format
The configuration file format is JSON and, at the top level, contains the following required objects:
Object | Type | Description |
---|---|---|
name | String | Name of the pipeline. |
artifact | Object | Specifies the artifact used to create the pipeline. |
config | Object | Configuration of the pipeline. |
Example (in JSON format):
{
"name": "MyPipeline",
"artifact": {
"name": "cdap-data-pipeline",
"version": "6.3.0",
"scope": "SYSTEM"
},
"config": {
. . .
"connections": [ . . . ],
"engine": "spark",
"postActions": [ . . . ],
"stages": [ . . . ],
"schedule": "0 * * * *",
},
}
Artifact Format
The format of an artifact object:
Key | Description |
---|---|
name | Name of the artifact. For example, cdap-data-pipeline. |
version | Allowable version(s) of the artifact. |
scope | Scope of the artifact, one of either USER or SYSTEM. |
Configuration Format
The format of the config object:
Key | Description |
---|---|
connections | List of connection objects defining how the stages are connected. |
engine | One of "mapreduce" or "spark". Used only in batch pipelines. |
postActions | List of postAction objects to run once the pipeline run finishes. |
instances | Positive integer of the number of worker instances. Defaults to 1 if not specified. Used only in real-time pipelines. |
resources | A map defining the resources to be used for a worker instance of the pipeline, such as memoryMB. |
driverResources | A map defining the resources to be used for a database driver, such as a username or password. |
schedule | String in cron format specifying the schedule for pipeline runs. Used only in batch pipelines. |
stages | List of stage objects defining the stages of the pipeline. |
The format of the postAction object is identical to that of a stage object, as they are both a JSON representation of a plugin. However, only plugins of type postaction can be included in the list for postActions. Each postAction object must have a unique name.
The format of a connection object:
Key | Description |
---|---|
from | String name of the stage the connection is from. |
to | String name of the stage the connection is to. |
The format of stage and postAction objects:
Key | Description |
---|---|
name | String name. These must be unique to distinguish stages in the connections. |
plugin | Plugin object. |
errorDatasetName | Name of a dataset that any error messages will be written to. Used by validating transformation stages. |
The format of a plugin object:
Key | Description |
---|---|
name | String name identifying the plugin. |
type | String type of the plugin. |
artifact | Optional artifact object. If not included or defined, the highest version available is used. |
properties | Map of properties, the contents of which are determined by the particular plugin used. |
Creating a Batch Pipeline
With a CDAP batch pipeline, a schedule property is required, with a cron entry specifying the frequency of the batch job run, such as every day or every hour.
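As an illustration (the exact time is arbitrary), a cron entry of 0 0 * * * runs the pipeline once a day at midnight:

    "schedule": "0 0 * * *"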
For example, this JSON (when in a file such as config.json) provides a configuration for a batch pipeline that runs every minute, reading data from a File source called myFileSource and writing to a File sink called myFileSink, without any transformations. When the run completes, a post-run action sends an email indicating that the run has completed:
{
"name": "ETLApp",
"artifact": {
"name": "cdap-data-pipeline",
"version": "6.3.0",
"scope": "SYSYEM"
},
"config": {
"schedule": "* * * * *",
"engine": "spark",
"postActions": [
{
"name": "Email",
"plugin": {
"name": "Email",
"type": "postaction",
"artifact": {
"name": "core-plugins",
"version": "2.5.0",
"scope": "SYSTEM"
},
"properties": {
"runCondition": "completion",
"includeWorkflowToken": "false",
"recipients": "users@example.com",
"sender": "admin@example.com",
"subject": "Post-action Status",
"message": "Completed run."
}
}
}
],
"stages": [
{
"name": "myFileSource",
"plugin": {
"name": "File",
"type": "batchsource",
"artifact": {
"name": "core-plugins",
"version": "2.5.0",
"scope": "SYSTEM"
},
"properties": {
"name": "myFileSource",
"duration": "1m",
"format": "text",
"schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}"
}
}
},
{
"name": "myFileSink",
"plugin": {
"name": "File",
"type": "batchsink",
"artifact": {
"name": "core-plugins",
"version": "2.5.0",
"scope": "SYSTEM"
},
"properties": {
"name": "myFileSink",
"schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"ts\",\"type\":\"long\"},{\"name\":\"body\",\"type\":\"string\"}]}",
"format": "csv"
}
}
}
],
"connections": [
{
"from": "myFileSource",
"to": "myFileSink"
}
]
}
}
This pipeline launches a Spark program that runs every minute, reads data from the source myFileSource, and writes to the sink myFileSink. The File sink writes records in CSV format, using the schema specified in its properties.
A pipeline configuration (the config object) consists of stages, connections, and other properties. The stages consist of a single source, zero or more transforms, and one or more sinks. Each of these stages is identified by a unique name and the plugin to be used.
A plugin object is specified by a plugin name, type, and properties map, and can optionally specify the artifact. If the artifact is not specified, the pipeline will choose the artifact with the highest version.
The connections field in the configuration defines the connections between the stages, using the unique names of the stages. The pipeline defined by these connections must be a directed acyclic graph (or DAG).
To create this pipeline, called ETLApp, you can use either the Lifecycle Microservices or the CDAP CLI.
Using the Lifecycle Microservices:
Linux
$ curl -w"\n" -X PUT localhost:11015/v3/namespaces/default/apps/ETLApp \
    -H "Content-Type: application/json" -d @config.json
Deploy Complete
Windows
> curl -X PUT localhost:11015/v3/namespaces/default/apps/ETLApp ^
    -H "Content-Type: application/json" -d @config.json
Deploy Complete
Using the CDAP CLI:
Linux
$ cdap cli create app ETLApp cdap-data-pipeline 6.3.0 system <path-to-config.json>
Successfully created application
Windows
> cdap cli create app ETLApp cdap-data-pipeline 6.3.0 system <path-to-config.json>
Successfully created application
where, in both cases, config.json is the file that contains the pipeline configuration shown above.
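Once deployed, the pipeline can be started using the program lifecycle Microservices. This is a sketch only: it assumes the batch pipeline exposes a workflow named DataPipelineWorkflow, the name typically used by pipelines built from the cdap-data-pipeline artifact; verify the program name in your CDAP instance before using it:

    $ curl -w"\n" -X POST localhost:11015/v3/namespaces/default/apps/ETLApp/workflows/DataPipelineWorkflow/start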
Creating a Real-Time Pipeline
With a CDAP real-time pipeline, a batchInterval property is required, specifying the frequency at which the sources will create new micro batches of data. The batchInterval must be a number followed by a time unit, with 's' for seconds, 'm' for minutes, and 'h' for hours. For example, a value of '10s' will be interpreted as ten seconds. This means that every 10 seconds, a new micro batch of data will be generated.
To create a real-time pipeline that reads from a source such as Twitter and writes to a CDAP Table Dataset sink after performing a projection transformation, you can use a configuration such as:
{
"artifact": {
"name": "cdap-data-streams",
"version": "6.3.0",
"scope": "SYSTEM"
},
"name": "twitterToStream",
"config": {
"batchInterval": "10s",
"stages": [
{
"name": "twitterSource",
"plugin": {
"name": "Twitter",
"type": "streamingsource",
"artifact": {
"name": "spark-plugins",
"version": "2.5.0",
"scope": "SYSTEM"
},
"properties": {
"referenceName": "xxx",
"AccessToken": "xxx",
"AccessTokenSecret": "xxx",
"ConsumerKey": "xxx",
"ConsumerSecret": "xxx"
}
}
},
{
"name": "dropProjector",
"plugin": {
"name": "Projection",
"type": "transform",
"artifact": {
"name": "core-plugins",
"version": "2.5.0",
"scope": "SYSTEM"
},
"properties": {
"drop": "lang,time,favCount,source,geoLat,geoLong,isRetweet",
"rename": "message:tweet,rtCount:retCount"
}
}
},
{
"name": "tableSink",
"plugin": {
"name": "Table",
"type": "batchsink",
"artifact": {
"name": "core-plugins",
"version": "2.5.0",
"scope": "SYSTEM"
},
"properties": {
"name": "tableSink",
"schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"tweet\",\"type\":\"string\"},{\"name\":\"retCount\",\"type\":\"int\"}]}",
"schema.row.field": "id"
}
}
}
],
"connections": [
{
"from": "twitterSource",
"to": "dropProjector"
},
{
"from": "dropProjector",
"to": "tableSink"
}
]
}
}
In the example above, a Projection Transform (a type of transform) is used to drop and rename selected columns in the incoming data. The tableSink in the final step requires a schema property that it uses to set the columns and the row key field when writing the data to a CDAP Table Dataset sink.
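As with the batch pipeline, this configuration can be deployed using the Lifecycle Microservices. A sketch, assuming the configuration above has been saved in a file named realtime-config.json (the file name is illustrative):

    $ curl -w"\n" -X PUT localhost:11015/v3/namespaces/default/apps/twitterToStream \
        -H "Content-Type: application/json" -d @realtime-config.json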
Non-linear Executions in Pipelines
CDAP pipelines support directed acyclic graphs (DAGs), which allow for the non-linear execution of pipeline stages.
Branches in a Pipeline
In this example, a pipeline reads from the source purchaseStats. It writes the source events to the table replicaTable, while at the same time it writes just the userIds to the usersTable when a user's purchase price is greater than 1000. This filtering logic is applied by using an included script in the step spendingUsersScript.
Pipeline connections can be configured to branch from a stage, with the output of the stage sent to two or more configured stages. In the above example, the output record from purchaseStats will be sent to both the replicaTable and spendingUsersScript stages.
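A sketch of the connections list for this branch, using only the stage names from the example (the stage definitions themselves are omitted):

    "connections": [
        { "from": "purchaseStats", "to": "replicaTable" },
        { "from": "purchaseStats", "to": "spendingUsersScript" },
        { "from": "spendingUsersScript", "to": "usersTable" }
    ]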
Merging Stages in Pipeline
Branched transform stages can merge together at a transform or a sink stage.
A merge does not join or modify records in any way. It simply means that multiple stages can write to the same stage. The only requirement is that all stages must output records of the same schema to the merging stage. Note that the order of records sent from the branched stages to the merging stage is not defined.
In this next example, purchaseStats has purchase data with the fields userid, item, count, and price. The source stage purchaseStats branches, and records are sent to both of the transforms userRewards and itemRewards.
The userRewards transform script looks up valued customers in the table hvCustomers to check if userid is a valued customer, and assigns higher rewards if so. After calculating the rewards, this transform sends an output record in the format userid(string), rewards(double).
The itemRewards transform script awards higher rewards for bulk purchases and sends output records in the same format, userid(string), rewards(double).
The rewards records are merged at the sink rewardsSink; note that the incoming schemas from the transforms userRewards and itemRewards are the same, and that the order of received records will vary.
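A sketch of the connections list for this merge, again using only the stage names from the example:

    "connections": [
        { "from": "purchaseStats", "to": "userRewards" },
        { "from": "purchaseStats", "to": "itemRewards" },
        { "from": "userRewards", "to": "rewardsSink" },
        { "from": "itemRewards", "to": "rewardsSink" }
    ]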
Sample Pipeline Configurations
Database Source and Sink
Sample configuration for using a Database Source and a Database Sink:
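A minimal sketch of such a configuration is shown below. It assumes the Database batch source and sink plugins; the plugin name (Database) and property names (connectionString, jdbcPluginName, importQuery, tableName, user, password) are assumptions and should be checked against the database plugins installed in your CDAP instance. The plugin artifact objects are omitted, so the highest available versions are used:

    {
        "name": "dbToDb",
        "artifact": {
            "name": "cdap-data-pipeline",
            "version": "6.3.0",
            "scope": "SYSTEM"
        },
        "config": {
            "schedule": "0 * * * *",
            "engine": "spark",
            "stages": [
                {
                    "name": "databaseSource",
                    "plugin": {
                        "name": "Database",
                        "type": "batchsource",
                        "properties": {
                            "referenceName": "databaseSource",
                            "connectionString": "jdbc:mysql://localhost:3306/mydb",
                            "jdbcPluginName": "mysql",
                            "importQuery": "SELECT * FROM purchases",
                            "user": "xxx",
                            "password": "xxx"
                        }
                    }
                },
                {
                    "name": "databaseSink",
                    "plugin": {
                        "name": "Database",
                        "type": "batchsink",
                        "properties": {
                            "referenceName": "databaseSink",
                            "connectionString": "jdbc:mysql://localhost:3306/mydb",
                            "jdbcPluginName": "mysql",
                            "tableName": "purchases_copy",
                            "user": "xxx",
                            "password": "xxx"
                        }
                    }
                }
            ],
            "connections": [
                { "from": "databaseSource", "to": "databaseSink" }
            ]
        }
    }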
Kafka Source
A Kafka cluster needs to be available, and certain minimum properties must be specified when creating the source:
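A sketch of a Kafka streaming source stage for a real-time (cdap-data-streams) pipeline is shown below. The plugin name (Kafka) and property names (brokers, topic, format) are assumptions and should be checked against the Kafka plugin version installed in your instance; the broker address and topic are placeholders. The plugin artifact is omitted, so the highest available version is used:

    {
        "name": "kafkaSource",
        "plugin": {
            "name": "Kafka",
            "type": "streamingsource",
            "properties": {
                "referenceName": "kafkaSource",
                "brokers": "kafka-broker.example.com:9092",
                "topic": "purchases",
                "format": "text",
                "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}"
            }
        }
    }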
Prebuilt JARs
In a case where you'd like to use prebuilt third-party JARs (such as a JDBC driver) as a plugin, please refer to the section on Deploying Third-Party Jars.