Workflows are used to automate the execution of a series of MapReduce, Spark, or custom actions. They allow for both sequential and parallel execution of programs.

...

Workflows can be controlled by the CDAP CLI and the Lifecycle Microservices: the status of a workflow can be retrieved, workflows can be started or stopped, and individual runs of a workflow can be suspended or resumed.

...

Code Block
public void configure() {
  ...
  addMapReduce(new MyMapReduce());
  addMapReduce(new AnotherMapReduce());
  addSpark(new MySpark());
  addWorkflow(new MyWorkflow());
  schedule(
    buildSchedule("FiveHourSchedule", ProgramType.WORKFLOW, "MyWorkflow")
      .setDescription("Schedule running every 5 hours")
      .triggerByTime("0 */5 * * *"));
  ...
}

You'll then extend the AbstractWorkflow class and implement the configure() method. Inside configure(), you can add multiple MapReduce programs, Spark programs, or custom actions to the workflow. The programs will be executed in the order they are specified in the configure() method:

Code Block
public static class MyWorkflow extends AbstractWorkflow {

  @Override
  public void initialize(WorkflowContext context) throws Exception {
    // Invoked before the Workflow run starts
    super.initialize(context);
  }

  @Override
  public void configure() {
    setName("MyWorkflow");
    setDescription("MyWorkflow description");
    addMapReduce("MyMapReduce");
    addSpark("MySpark");
    addMapReduce("AnotherMapReduce");
    addAction(new MyAction());
  }

  @Override
  public void destroy() {
    // Invoked after the execution of the Workflow
    // Determine the status of the Workflow
    boolean isWorkflowSuccessful = getContext().getState().getStatus() == ProgramStatus.COMPLETED;

    // Get the state of all nodes that were executed as a part of this Workflow run.
    Map<String, WorkflowNodeState> nodeStates = getContext().getNodeStates();
  }
}

In this example, MyWorkflow will be executed every 5 hours. During each execution of the workflow, the MyMapReduce, MySpark, and AnotherMapReduce programs and the MyAction custom action will be executed in order.

...

Code Block
public class WorkflowWithLocalDatasets extends AbstractWorkflow {
  @Override
  protected void configure() {
    ...
    createLocalDataset("WordCount", KeyValueTable.class);
    createLocalDataset("CSVData", FileSet.class,FileSetProperties.builder()
    .setInputFormat(TextInputFormat.class)
    .setOutputFormat(TextOutputFormat.class).build());
    ...
  }
}

WordCount and CSVData are configured as local datasets for the workflow. For every workflow run, these datasets will be created and named WordCount.<unique_id> and CSVData.<unique_id>. Once the run is complete, they will be deleted by the workflow system.

...

When an action is started by a workflow, the action always receives a non-null instance of the WorkflowToken. However, when a MapReduce or Spark program is started directly (outside of a workflow), the WorkflowToken received from the context of the program is null. Since custom actions are always started by a workflow, they will always receive a non-null WorkflowToken instance.
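
For example, a custom action can use the token without a null test. A minimal sketch (MyAction matches the earlier example; the token key and value are illustrative):

Code Block
public static class MyAction extends AbstractCustomAction {

  @Override
  public void run() throws Exception {
    // Custom actions only ever run inside a workflow, so the token is never null
    WorkflowToken token = getContext().getWorkflowToken();

    // Record a value that downstream nodes of the workflow can read
    token.put("action.ran.at", String.valueOf(System.currentTimeMillis()));
  }
}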

Scope

Two scopes, System and User, are provided for workflow keys. CDAP adds keys (such as MapReduce counters) under the System scope. Keys added by user programs are stored under the User scope.

...

In the case of a MapReduce program, the program's Mapper and Reducer classes need to implement ProgramLifecycle<MapReduceTaskContext>. After doing so, they can access the workflow token in either the initialize or destroy methods. To access it in the map or reduce methods, you would need to cache a reference to the workflow token object as a class member in the initialize() method. This is because the context object passed to those methods is a Hadoop class that is unaware of CDAP and its workflow tokens.
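
A minimal sketch of such a Reducer, assuming an earlier node in the workflow has put an illustrative min.count key into the token (the class name, output types, and token key are hypothetical):

Code Block
public static class MyReducer extends Reducer<Text, IntWritable, Text, LongWritable>
  implements ProgramLifecycle<MapReduceTaskContext<Text, LongWritable>> {

  private WorkflowToken workflowToken;

  @Override
  public void initialize(MapReduceTaskContext<Text, LongWritable> context) throws Exception {
    // Cache the token as a class member; the Hadoop Context passed to
    // reduce() knows nothing about CDAP or its workflow tokens
    workflowToken = context.getWorkflowToken();
  }

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
    long sum = 0L;
    for (IntWritable value : values) {
      sum += value.get();
    }

    if (workflowToken != null) {
      // Skip keys below a threshold that an earlier node recorded in the token
      Value minCount = workflowToken.get("min.count");
      if (minCount != null && sum < minCount.getAsLong()) {
        return;
      }
    }
    context.write(key, new LongWritable(sum));
  }

  @Override
  public void destroy() {
    // Invoked after the task has finished
  }
}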

Note: The test of workflowToken != null is only required because this Reducer could be used outside of a workflow. When run from within a workflow, the token is guaranteed to be non-null.

The WorkflowToken Java API includes methods for getting values for different keys, scopes, and nodes. The same key can be added to the workflow token by different nodes, and there are methods to return the key-value pairs from each of them. Convenience methods allow putting and getting non-string values through the Value class.
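
A minimal sketch of these methods, assuming it runs where the token can be read and updated, such as inside a custom action (the key names, the node name, and the System-scope counter key are all illustrative):

Code Block
WorkflowToken token = getContext().getWorkflowToken();

// Put and get a non-string value through the Value class
token.put("record.count", Value.of(42L));
long recordCount = token.get("record.count").getAsLong();  // User scope by default

// Read a key that CDAP added under the System scope (illustrative key)
Value mapInputRecords = token.get("mr.counters.TaskCounter.MAP_INPUT_RECORDS",
                                  WorkflowToken.Scope.SYSTEM);

// The same key may have been added by several nodes; retrieve all of them
List<NodeValue> allCounts = token.getAll("record.count");

// Or read the value written by one specific node
Value sparkCount = token.get("record.count", "MySpark");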

...