Workflows are used to automate the execution of a series of MapReduce or Spark programs, or custom actions. Workflows allow both sequential and parallel execution of programs.
...
Workflows can be controlled by the CDAP CLI and the Lifecycle Microservices. The status of a workflow can be retrieved, workflows started or stopped, and individual runs of a workflow suspended or resumed.
...
```java
public void configure() {
  ...
  addMapReduce(new MyMapReduce());
  addMapReduce(new AnotherMapReduce());
  addSpark(new MySpark());
  addWorkflow(new MyWorkflow());
  schedule(
    buildSchedule("FiveHourSchedule", ProgramType.WORKFLOW, "MyWorkflow")
      .setDescription("Schedule running every 5 hours")
      .triggerByTime("0 */5 * * *"));
  ...
}
```
You'll then extend the AbstractWorkflow class and implement the configure() method. Inside configure(), you can add multiple MapReduce or Spark programs, or custom actions, to the workflow. The programs will be executed in the order they are specified in the configure() method:
```java
public static class MyWorkflow extends AbstractWorkflow {

  @Override
  public void initialize(WorkflowContext context) throws Exception {
    // Invoked before the Workflow run starts
    super.initialize(context);
  }

  @Override
  public void configure() {
    setName("MyWorkflow");
    setDescription("MyWorkflow description");
    addMapReduce("MyMapReduce");
    addSpark("MySpark");
    addMapReduce("AnotherMapReduce");
    addAction(new MyAction());
  }

  @Override
  public void destroy() {
    // Invoked after the execution of the Workflow

    // Determine the status of the Workflow
    boolean isWorkflowSuccessful = getContext().getState().getStatus() == ProgramStatus.COMPLETED;

    // Get the state of all nodes that were executed as a part of this Workflow run
    Map<String, WorkflowNodeState> nodeStates = getContext().getNodeStates();
  }
}
```
In this example, MyWorkflow will be executed every 5 hours. During each execution of the workflow, the MyMapReduce, MySpark, and AnotherMapReduce programs and the MyAction custom action will be executed in order.
...
```java
public class WorkflowWithLocalDatasets extends AbstractWorkflow {

  @Override
  protected void configure() {
    ...
    createLocalDataset("WordCount", KeyValueTable.class);
    createLocalDataset("CSVData", FileSet.class, FileSetProperties.builder()
      .setInputFormat(TextInputFormat.class)
      .setOutputFormat(TextOutputFormat.class)
      .build());
    ...
  }
}
```
WordCount and CSVData are configured as local datasets for the workflow. For every workflow run, these datasets will be created and named WordCount.<unique_id> and CSVData.<unique_id>. Once the run is complete, they will be deleted by the workflow system.
...
When an action is started by a workflow, the action always receives a non-null instance of the WorkflowToken. However, when a MapReduce or Spark program is started directly (outside of a workflow), the WorkflowToken received from the context of the program is null. Since custom actions are always started by a workflow, they will always receive a non-null WorkflowToken instance.
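For example, a MapReduce program that may also be run standalone can guard its token access with a null check. This is a minimal sketch; the initialize() override shown assumes CDAP's AbstractMapReduce-style lifecycle and an illustrative token key, so adapt it to your CDAP version:

```java
// Sketch: inside a MapReduce program's initialize() method.
@Override
public void initialize() throws Exception {
  WorkflowToken workflowToken = getContext().getWorkflowToken();
  if (workflowToken != null) {
    // Started by a workflow: the token is safe to use
    workflowToken.put("mr.started", "true");
  }
  // Started directly: workflowToken is null, so token access is skipped
}
```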
Scope
Two scopes, System and User, are provided for workflow token keys. CDAP adds keys (such as MapReduce counters) under the System scope. Keys added by user programs are stored under the User scope.
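A sketch of scoped lookups follows. The method and enum names follow the CDAP WorkflowToken Java API, but the system key shown is a hypothetical placeholder; verify actual key names for your CDAP version:

```java
// Sketch: reading workflow token values by scope.
// The token comes from a workflow-aware context, e.g. in a custom action.
WorkflowToken token = getContext().getWorkflowToken();

// get() without a scope reads from the User scope
Value userValue = token.get("custom.key");

// System-scope values are written by CDAP itself (e.g. MapReduce counters);
// "some.system.key" is a hypothetical placeholder
Value systemValue = token.get("some.system.key", WorkflowToken.Scope.SYSTEM);
```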
...
In the case of a MapReduce program, the program's Mapper and Reducer classes need to implement ProgramLifecycle<MapReduceTaskContext>. After doing so, they can access the workflow token in either the initialize or destroy methods. To access it in the map or reduce methods, you need to cache a reference to the workflow token object as a class member in the initialize() method, because the context object passed to those methods is a Hadoop class that is unaware of CDAP and its workflow tokens.
Note: The test of workflowToken != null is only required because this Reducer could be used outside of a workflow. When run from within a workflow, the token is guaranteed to be non-null.
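The caching pattern might look like the following sketch. The class and token key are illustrative; ProgramLifecycle<MapReduceTaskContext> and WorkflowToken are CDAP Java API types, but verify the exact signatures against your CDAP version:

```java
public static class TokenWritingReducer
    extends Reducer<Text, IntWritable, Text, IntWritable>
    implements ProgramLifecycle<MapReduceTaskContext> {

  private WorkflowToken workflowToken;

  @Override
  public void initialize(MapReduceTaskContext context) throws Exception {
    // Cache the token here; the Hadoop Context passed to reduce()
    // is unaware of CDAP and cannot provide it.
    workflowToken = context.getWorkflowToken();
  }

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // ... aggregation logic elided ...
    if (workflowToken != null) {
      // Non-null only when this MapReduce runs inside a workflow
      workflowToken.put("reducer.key", key.toString());
    }
  }

  @Override
  public void destroy() {
    // Invoked after the task finishes
  }
}
```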
The WorkflowToken Java API includes methods for getting values for different keys, scopes, and nodes. The same key can be added to the workflow by different nodes, and there are methods to return a map of those key-value pairs. Convenience methods allow the putting and getting of non-string values through the use of the class Value.
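A few of those methods, sketched together. The names follow the CDAP WorkflowToken Java API, while the key and node name are illustrative; verify against your CDAP version:

```java
WorkflowToken token = getContext().getWorkflowToken();

// Convenience put of a non-string value via Value
token.put("records.processed", Value.of(42L));

// Most recent value for the key, converted back from Value
long latest = token.get("records.processed").getAsLong();

// Value as written by one particular node of the workflow
Value fromNode = token.get("records.processed", "MyMapReduce");

// Every value added under this key, along with the node that added it
List<NodeValue> allValues = token.getAll("records.processed");
```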
...