Developing Pipelines

This section is intended for developers creating pipelines using command line tools.

There are three different methods for creating pipeline applications:

  1. Pipeline Studio. To create and manage pipelines through the Pipeline Studio, see the Data Pipeline User Guide.

  2. Command line tools (such as curl or the CDAP CLI).

  3. Lifecycle Microservices.

In order to create a pipeline application, a pipeline configuration (either as a file or in-memory) is required. It specifies the configuration of plugins to be used along with their properties.

Pipeline Configuration File Format

The configuration file format is JSON and, at the top level, contains the following required objects:

Object

Type

Description

Object

Type

Description

name

String

Name of the pipeline.

artifact

Object

Specifies the artifact used to create the pipeline.

config

Object

Configuration of the pipeline.

Example (in JSON format):

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 { "name": "MyPipeline", "artifact": { "name": "cdap-data-pipeline", "version": "6.3.0", "scope": "SYSTEM" }, "config": { . . . "connections": [ . . . ], "engine": "spark", "postActions": [ . . . ], "stages": [ . . . ], "schedule": "0 * * * *", }, }

Artifact Format

The format of an artifact object:

Key

Description

Key

Description

name

Name of the artifact. For example, database-plugins.

version

Allowable version(s) of the artifact.

scope

Scope of the artifact, one of either USER or SYSTEM.

Configuration Format

The format of the config object:

Key

Description

Key

Description

connections

List of connection objects.

engine

One of "mapreduce" or "spark". Used only in batch pipelines.

postActions

List of postAction (post-run action) objects; optional, can be an empty list.

instances

Positive integer of the number of worker instances. Defaults to 1 if not specified. Used only in real-time pipelines.

resources

A map defining the resources to be used for worker instance of pipeline, such as { "memoryMB": 2048, "virtualCores": 1 }

driverResources

A map defining the resources to be used for a database driver, such as username or password.

schedule

String in cron file format. Used only in batch pipelines.

stages

List of stage objects.

The format of the postAction object is identical to that of a stage object, as they are both a JSON representation of a plugin. However, only plugins of type postAction can be included in the list for postActions. Each postAction object must have a unique name.

The format of a connection object:

Key

Description

Key

Description

from

String name of stage connected from.

to

String name of stage connected to.

The format of stage and postAction objects:

Key

Description

Key

Description

name

String name. These must be unique to distinguish stages in the connections object and post-actions in the postactions list.

plugin

Plugin object.

errorDatasetName

Name of a dataset that any error messages will be written to. Used by validating transformation stages.

The format of a plugin object:

Key

Description

Key

Description

name

String name identifying the plugin.

type

String type of plugin.

artifact

Optional artifact object. If not included or defined, the highest version available is used.

properties

Map of properties, contents of which are determined by the particular plugin used.

Creating a Batch Pipeline

With a CDAP batch pipeline, a schedule property is required with a cron entry specifying the frequency of the batch job run, such as every day or every hour.

For example, this JSON (when in a file such as config.json) provides a configuration for a batch pipeline that runs every minute, reading data from a File source called myFileSource and writing to a File sink called myFileSink, without any transformations. When the run completes, a post-run action sends an email indicating that the run has completed:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 { "name": "ETLApp", "artifact": { "name": "cdap-data-pipeline", "version": "6.3.0", "scope": "SYSYEM" }, "config": { "schedule": "* * * * *", "engine": "spark", "postActions": [ { "name": "Email", "plugin": { "name": "Email", "type": "postaction", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "runCondition": "completion", "includeWorkflowToken": "false", "recipients": "users@example.com", "sender": "admin@example.com", "subject": "Post-action Status", "message": "Completed run." } } } ], "stages": [ { "name": "myFileSource", "plugin": { "name": "File", "type": "batchsource", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "name": "myFileSource", "duration": "1m", "format": "text", "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}" } } }, { "name": "myFileSink", "plugin": { "name": "File", "type": "batchsink", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "name": "myFileSink", "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"ts\",\"type\":\"long\"},{\"name\":\"body\",\"type\":\"string\"}]}", "format": "csv" } } } ], "connections": [ { "from": "myFileSource", "to": "myFileSink" } ] } }

This pipeline launches a Spark program that runs every minute, reads data from the source myFileSource and writes to a sink myFileSink. A File Sink needs a row key field to be specified and can use the timestamp of a source event for that.

A pipeline configuration (the config object) consists of stages, connections, and other properties. The stages consist of a single source, zero (or more) transforms, and one (or more) sink(s). Each of these stages is identified by a unique name and the plugin to be used.

plugin object is specified by a plugin-name, type, a properties map and can (optionally) specify the artifact. If the artifact is not specified, the pipeline will choose the artifact with the highest version.

The connections field in the configuration defines the connections between the stages, using the unique names of the stages. The pipeline defined by these connections must be a directed acyclic graph (or DAG).

To create this pipeline, called ETLApp, you can use either Microservices or the CDAP CLI.

  • Using the Lifecycle Microservices:

    • Linux
      $ curl -w"\n" -X PUT localhost:11015/v3/namespaces/default/apps/ETLApp \ -H "Content-Type: application/json" -d @config.json
      Deploy Complete

    • Windows
      > curl -X PUT localhost:11015/v3/namespaces/default/apps/ETLApp ^ -H "Content-Type: application/json" -d @config.json
      Deploy Complete

  • Using the CDAP CLI:

    • Linux
      $ cdap cli create app ETLApp cdap-data-pipeline 6.3.0 system <path-to-config.json>
      Successfully created application

    • Windows
      > cdap cli create app ETLApp cdap-data-pipeline 6.3.0 system <path-to-config.json> Successfully created application

where, in both cases, config.json is the file that contains the pipeline configuration shown above.

Creating a Real-Time Pipeline

With a CDAP real-time pipeline, a batchInterval property is required specifying the frequency at which the sources will create new micro batches of data. The batchInterval must be a number followed by a time unit, with 's' for seconds, 'm' for minutes, and 'h' for hours. For example, a value of '10s' will be interpreted as ten seconds. This means that every 10 seconds, a new micro batch of data will be generated.

To create a real-time pipeline that reads from a source such as Twitter and writes to a CDAP Table Dataset sink after performing a projection transformation, you can use a configuration such as:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 { "artifact": { "name": "cdap-data-streams", "version": "6.3.0", "scope": "SYSTEM" }, "name": "twitterToStream", "config": { "batchInterval": "10s", "stages": [ { "name": "twitterSource", "plugin": { "name": "Twitter", "type": "streamingsource", "artifact": { "name": "spark-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "referenceName": "xxx", "AccessToken": "xxx", "AccessTokenSecret": "xxx", "ConsumerKey": "xxx", "ConsumerSecret": "xxx" } } }, { "name": "dropProjector", "plugin": { "name": "Projection", "type": "transform", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "drop": "lang,time,favCount,source,geoLat,geoLong,isRetweet", "rename": "message:tweet,rtCount:retCount" } } }, { "name": "tableSink", "plugin": { "name": "Table", "type": "batchsink", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "name": "tableSink", "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"tweet\",\"type\":\"string\"},{\"name\":\"retCount\",\"type\":\"int\"}]}", "schema.row.field": "id" } } } ], "connections": [ { "from": "twitterSource", "to": "dropProjector" }, { "from": "dropProjector", "to": "tableSink" } ] } }

In the example code above, we will use a Projection Transform (a type of transform) to drop and rename selected columns in the incoming data. A tableSink in the final step requires a schema property that it will use to set the columns and row field for writing the data to a CDAP Table Dataset sink.

Non-linear Executions in Pipelines

CDAP pipelines supports directed acyclic graphs in pipelines, which allows for the non-linear execution of pipeline stages.

Branches in a Pipeline

In this example, a pipeline reads from the source purchaseStats. It writes the source events to the table replicaTable, while at the same time it writes just the userIds to the usersTable when a user's purchase price is greater than 1000. This filtering logic is applied by using an included script in the step spendingUsersScript:

 

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 { "name": "branchedPipeline", "artifact": { "name": "cdap-data-pipeline", "version": "6.3.0", "scope": "SYSTEM" }, "config": { "schedule": "* * * * *", "engine": "spark", "postActions": [], "stages": [ { "name": "purchaseStats", "plugin": { "name": "File", "type": "batchsource", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "name": "testSource", "duration": "1d", "format": "csv", "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"userId\",\"type\":\"string\"},{\"name\":\"purchaseItem\",\"type\":\"string\"},{\"name\":\"purchasePrice\",\"type\":\"long\"}]}" } } }, { "name": "replicaTable", "plugin": { "name": "Table", "type": "batchsink", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "name": "replicaTable", "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"userId\",\"type\":\"string\"},{\"name\":\"purchaseItem\",\"type\":\"string\"},{\"name\":\"purchasePrice\",\"type\":\"long\"}]}", "schema.row.field": "userId" } } }, { "name": "usersTable", "plugin": { "name": "Table", "type": "batchsink", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "name": "targetCustomers", "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"userId\",\"type\":\"string\"}]}", "schema.row.field": "userId" } } }, { "name": "spendingUsersScript", "plugin": { "name": "JavaScript", "type": "transform", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "script": "function transform(input, emitter, context) {\n if (input.purchasePrice > 1000) {\n emitter.emit(input);\n } \n}" } } } ], "connections": [ { "from": "purchaseStats", "to": "replicaTable" }, { "from": "purchaseStats", "to": "spendingUsersScript" }, { "from": "spendingUsersScript", "to": "usersTable" } ] } }

Pipeline connections can be configured to branch from a stage, with the output of the stage sent to two or more configured stages. In the above example, the output record from purchaseStats will be sent to both replicaTable and spendingUsersScript stages.

Merging Stages in Pipeline

Branched transform stages can merge together at a transform or a sink stage.

A merge does not join, or modify records in any way. It simply means that multiple stages can write to the same stage. The only requirement is that all stages must output records of the same schema to the merging stage. Note that the order of records sent from the branched stages to the merging stage will not be defined.

In this next example, purchaseStats has purchase data with fields useriditemcount, and price. The source events stage purchaseStats branches, and records are sent to both of the transforms userRewards and itemRewards.

The userRewards transform script looks up valued customers in the table hvCustomers, to check if userid is a valued customer and assigns higher rewards if they are. After calculating the rewards, this transform sends an output record in the format userid(string), rewards(double).

The itemRewards transform script awards higher rewards for bulk purchases and sends output records in the same format, userid(string), rewards(double).

The rewards records are merged at the sink rewardsSink; note that the incoming schema from the transforms userRewards and itemRewards are the same, and that the order of received records will vary.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 { "name": "mergedPipeline", "artifact": { "name": "cdap-data-pipeline", "version": "6.3.0", "scope": "SYSTEM" }, "config": { "schedule": "* * * * *", "engine": "spark", "postActions": [], "stages": [ { "name": "purchaseStats", "plugin": { "name": "File", "type": "batchsource", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "format": "csv", "schema": "{ \"type\":\"record\", \"name\":"\etlSchemaBody\", \"fields\":[ {\"name\":\"userid\",\"type\":\"string\"}, {\"name\":\"item\",\"type\":\"string\"}, {\"name\":\"count\",\"type\":\"int\"}, {\"name\":\"price\",\"type\":\"long\"} ] }", "name": "purchases", "duration": "1d" } } }, { "name": "userRewards", "plugin": { "name": "Script", "type": "transform", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "script": "function transform(input, context) { var rewards = 5; if (context.getLookup('hvCustomers').lookup(input.userid) !== null) { context.getLogger().info("user " + input.userid + " is a valued customer"); rewards = 100; } else { context.getLogger().info("user " + input.userid + " is not a valued customer"); } return {'userid': input.userid, 'rewards': rewards}; }", "schema": "{ \"type\":\"record\", \"name\":\"etlSchemaBody\", \"fields\":[ {\"name\":\"userid\",\"type\":\"string\"}, {\"name\":\"rewards\",\"type\":\"double\"} ] }", "lookup": "{"tables":{"hvCustomers":{"type":"DATASET","datasetProperties":{}}}}" } } }, { "name": "itemRewards", "plugin": { "name": "Script", "type": "transform", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "script": "function transform(input, context) { var rewards = 5; if (input.count > 20) { rewards = 50; } return {'userid':input.userid, 'rewards':rewards}; }", "schema": "{ \"type\":\"record\", \"name\":\"etlSchemaBody\", \"fields\":[ {\"name\":\"userid\",\"type\":\"string\"}, {\"name\":\"rewards\",\"type\":\"double\"} ] }" } } }, { "name": "rewardsSink", "plugin": { "name": "TPFSAvro", "type": "batchsink", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "schema": "{ \"type\":\"record\", \"name\":\"etlSchemaBody\", \"fields\":[ {\"name\":\"userid\",\"type\":\"string\"}, {\"name\":\"rewards\",\"type\":\"double\"} ] }" } } } ], "connections": [ { "from": "purchaseStats", "to": "userRewards" }, { "from": "userRewards", "to": "rewardsSink" }, { "from": "purchaseStats", "to": "itemRewards" }, { "from": "itemRewards", "to": "rewardsSink" } ], } }

Sample Pipeline Configurations

Database Source and Sink

Sample configuration for using a Database Source and a Database Sink:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 { "artifact": { "name": "cdap-data-pipeline", "version": "6.3.0", "scope": "SYSTEM" }, "config": { "schedule": "* * * * *", "stages": [ { "name": "databaseSource", "plugin": { "name": "Database", "type": "batchsource", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "importQuery": "select id,name,age from my_table", "countQuery": "select count(id) from my_table", "connectionString": "jdbc:mysql://localhost:3306/test", "tableName": "src_table", "user": "my_user", "password": "my_password", "jdbcPluginName": "jdbc_plugin_name_defined_in_jdbc_plugin_json_config", "jdbcPluginType": "jdbc_plugin_type_defined_in_jdbc_plugin_json_config" } } }, { "name": "databaseSink", "plugin": { "name": "Database", "type": "batchsink", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "columns": "id,name,age", "connectionString": "jdbc:mysql://localhost:3306/test", "tableName": "dest_table", "user": "my_user", "password": "my_password", "jdbcPluginName": "jdbc_plugin_name_defined_in_jdbc_plugin_json_config", "jdbcPluginType": "jdbc_plugin_type_defined_in_jdbc_plugin_json_config" } } } ], "connections": [ { "from": "databaseSource", "to": "databaseSink" } ] } }

Kafka Source

A Kafka cluster needs to be available, and certain minimum properties specified when creating the source:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 { "name": "KafkaPipeline", "artifact": { "name": "cdap-data-streams", "version": "6.3.0", "scope": "SYSTEM" }, "config": { "batchInterval": "1s", "connections": [ { "from": "kafkaSource", "to": "tableSink" } ], "stages": [ { "name": "kafkaSource", "plugin": { "name": "Kafka", "type": "streamingsource", "artifact": { "name": "kafka-plugins", "version": "2.5.0", "scope": "USER" }, "properties": { "referenceName": "purchasesTopic", "brokers": "localhost:9092", "topics": "purchases", "format": "csv", "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"customer\",\"type\":\"string\"},{\"name\":\"item\",\"type\":\"string\"}]}" } } }, { "name": "tableSink", "plugin": { "name": "Table", "type": "batchsink", "artifact": { "name": "core-plugins", "version": "2.5.0", "scope": "SYSTEM" }, "properties": { "name": "myTable", "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"customer\",\"type\":\"string\"},{\"name\":\"item\",\"type\":\"string\"}]}" "schema.row.field": "id" } } } ] } }

Prebuilt JARs

In a case where you'd like to use prebuilt third-party JARs (such as a JDBC driver) as a plugin, please refer to the section on Deploying Third-Party Jars.