...

CDAP pipelines support directed acyclic graphs, which allow for the non-linear execution of pipeline stages.

...

Branches in a Pipeline

In this example, a pipeline reads from the source purchaseStats. It writes the source events to the table replicaTable and, at the same time, writes just the userIds to usersTable when a user's purchase price is greater than 1000. This filtering logic is applied by the script included in the stage spendingUsersScript:

...

{
  "name": "branchedPipeline",
  "artifact": {
    "name": "cdap-data-pipeline",
    "version": "6.3.0",
    "scope": "SYSTEM"
  },
  "config": {
    "schedule": "* * * * *",
    "engine": "spark",
    "postActions": [],
    "stages": [
      {
        "name": "purchaseStats",
        "plugin": {
          "name": "File",
          "type": "batchsource",
          "artifact": {
            "name": "core-plugins",
            "version": "2.5.0",
            "scope": "SYSTEM"
          },
          "properties": {
            "name": "testSource",
            "duration": "1d",
            "format": "csv",
            "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"userId\",\"type\":\"string\"},{\"name\":\"purchaseItem\",\"type\":\"string\"},{\"name\":\"purchasePrice\",\"type\":\"long\"}]}"
          }
        }
      },
      {
        "name": "replicaTable",
        "plugin": {
          "name": "Table",
          "type": "batchsink",
          "artifact": {
            "name": "core-plugins",
            "version": "2.5.0",
            "scope": "SYSTEM"
          },
          "properties": {
            "name": "replicaTable",
            "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"userId\",\"type\":\"string\"},{\"name\":\"purchaseItem\",\"type\":\"string\"},{\"name\":\"purchasePrice\",\"type\":\"long\"}]}",
            "schema.row.field": "userId"
          }
        }
      },
      {
        "name": "usersTable",
        "plugin": {
          "name": "Table",
          "type": "batchsink",
          "artifact": {
            "name": "core-plugins",
            "version": "2.5.0",
            "scope": "SYSTEM"
          },
          "properties": {
            "name": "targetCustomers",
            "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"userId\",\"type\":\"string\"}]}",
            "schema.row.field": "userId"
          }
        }
      },
      {
        "name": "spendingUsersScript",
        "plugin": {
          "name": "JavaScript",
          "type": "transform",
          "artifact": {
            "name": "core-plugins",
            "version": "2.5.0",
            "scope": "SYSTEM"
          },
          "properties": {
            "script": "function transform(input, emitter, context) {\n  if (input.purchasePrice > 1000) {\n  emitter.emit(input);\n  }  \n}"
          }
        }
      }
    ],
    "connections": [
      {
        "from": "purchaseStats",
        "to": "replicaTable"
      },
      {
        "from": "purchaseStats",
        "to": "spendingUsersScript"
      },
      {
        "from": "spendingUsersScript",
        "to": "usersTable"
      }
    ]
  }
}

Pipeline connections can be configured to branch from a stage, with the output of the stage sent to two or more configured stages. In the above example, the output records from purchaseStats are sent to both the replicaTable and spendingUsersScript stages.
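The branching behavior can be illustrated outside of CDAP with a small simulation. This is only a sketch: the transform function is the script from the spendingUsersScript stage, while makeSink, the sample records, and the way the sink keeps only the userId field are hypothetical stand-ins for what the pipeline engine and the Table sinks would do.

```javascript
// Filter script copied from the spendingUsersScript stage above.
function transform(input, emitter, context) {
  if (input.purchasePrice > 1000) {
    emitter.emit(input);
  }
}

// Hypothetical stand-in for a sink: it only collects the records it receives.
function makeSink() {
  const records = [];
  return { records, write: (record) => records.push(record) };
}

// Hypothetical sample records that match the purchaseStats schema.
const purchases = [
  { userId: "alice", purchaseItem: "laptop", purchasePrice: 1500 },
  { userId: "bob", purchaseItem: "mouse", purchasePrice: 25 },
];

const replicaTable = makeSink();
const usersTable = makeSink();

for (const record of purchases) {
  // Branch 1: purchaseStats -> replicaTable receives every record.
  replicaTable.write(record);

  // Branch 2: purchaseStats -> spendingUsersScript -> usersTable.
  // Assumption: the usersTable schema only has userId, so only that field is kept.
  const emitter = { emit: (r) => usersTable.write({ userId: r.userId }) };
  transform(record, emitter, {});
}

console.log("replicaTable:", replicaTable.records);
console.log("usersTable:", usersTable.records);
```

Every source record reaches both branches; only the second branch filters, so replicaTable holds all purchases while usersTable holds only the users whose purchase price exceeds 1000.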

Merging Stages in a Pipeline

Branched transform stages can merge together at a transform or a sink stage.

A merge does not join or modify records in any way. It simply means that multiple stages can write to the same stage. The only requirement is that all stages must output records of the same schema to the merging stage. Note that the order in which records from the branched stages arrive at the merging stage is not defined.
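The same-schema requirement can be checked before a pipeline is deployed. The following is a minimal sketch, not CDAP's own validation: findMergeStages, sameSchema, checkMerges, the stage names, and the outputSchemas lookup are all hypothetical, and schemas are assumed to use simple string types as in the example above.

```javascript
// Group the "from" stages of each connection by their "to" stage and
// keep only the stages that have more than one incoming connection.
function findMergeStages(connections) {
  const inputs = new Map();
  for (const { from, to } of connections) {
    if (!inputs.has(to)) inputs.set(to, []);
    inputs.get(to).push(from);
  }
  return new Map([...inputs].filter(([, froms]) => froms.length > 1));
}

// Two schemas match when they list the same field names and types in the same order.
// Assumption: every field type is a plain string such as "string" or "long".
function sameSchema(a, b) {
  return (
    a.fields.length === b.fields.length &&
    a.fields.every(
      (f, i) => f.name === b.fields[i].name && f.type === b.fields[i].type
    )
  );
}

// Return a description of every merge whose input stages disagree on schema.
function checkMerges(connections, outputSchemas) {
  const problems = [];
  for (const [stage, froms] of findMergeStages(connections)) {
    const first = outputSchemas[froms[0]];
    for (const from of froms.slice(1)) {
      if (!sameSchema(first, outputSchemas[from])) {
        problems.push(`${froms[0]} and ${from} send different schemas to ${stage}`);
      }
    }
  }
  return problems;
}

// Hypothetical pipeline: one source branches into two transforms that merge at a sink.
const connections = [
  { from: "source", to: "transformA" },
  { from: "source", to: "transformB" },
  { from: "transformA", to: "mergedSink" },
  { from: "transformB", to: "mergedSink" },
];
const userIdSchema = { fields: [{ name: "userId", type: "string" }] };
const outputSchemas = { transformA: userIdSchema, transformB: userIdSchema };

console.log(checkMerges(connections, outputSchemas));
```

Because a merge only forwards records, the check compares output schemas of the incoming stages and says nothing about record order, which stays undefined.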

In this next example, purchaseStats has purchase data with fields userid, item, count, and price. The stage purchaseStats branches, and records are sent to both of the transforms userRewards and itemRewards.

...