Workflows
Workflows automate the execution of a series of MapReduce programs, Spark programs, or custom actions. They allow for both sequential and parallel execution of programs.
The workflow system allows specifying, executing, scheduling, and monitoring complex series of jobs and tasks in CDAP. The system can manage thousands of workflows and maintain millions of historic workflow logs.
Overview
A Workflow is given a sequence of programs that follow each other, with an optional schedule to run the workflow periodically. Upon successful execution of a program, the control is transferred to the next program in the sequence until the last program in the sequence is executed. Upon failure, the execution is stopped at the failed program and no subsequent programs in the sequence are executed.
The control flow of a workflow can be described as a directed, acyclic graph (DAG) of actions. To be more precise, we require that it be a series-parallel graph. This is a graph with a single start node and a single finish node; in between, the graph can be either a simple series of nodes or a more complicated parallel workflow.
Workflows can be controlled by the CDAP CLI and the Lifecycle Microservices. The status of a workflow can be retrieved, workflows started or stopped, and individual runs of a workflow suspended or resumed.
A workflow can have one or more schedules that trigger it. These schedules are in a disabled state when the application is first deployed. Each schedule needs to be enabled, changing its status to scheduled, before the workflow will be executed according to that schedule.
Executing MapReduce or Spark Programs
To execute MapReduce or Spark programs in a workflow, you will need to add them to your application along with the workflow. You can (optionally) add a schedule to a workflow using the schedule Java API. To add a schedule to an application extended from AbstractApplication, use the schedule method:
  public void configure() {
    ...
    addMapReduce(new MyMapReduce());
    addMapReduce(new AnotherMapReduce());
    addSpark(new MySpark());
    addWorkflow(new MyWorkflow());
    schedule(
      buildSchedule("FiveHourSchedule", ProgramType.WORKFLOW, "MyWorkflow")
        .setDescription("Schedule running every 5 hours")
        .triggerByTime("0 */5 * * *"));
    ...
  }
You'll then extend the AbstractWorkflow class and implement the configure() method. Inside configure(), you can add multiple MapReduce programs, Spark programs, or custom actions to the workflow. The programs will be executed in the order they are specified in the configure() method:
  public static class MyWorkflow extends AbstractWorkflow {

    @Override
    public void initialize(WorkflowContext context) throws Exception {
      // Invoked before the Workflow run starts
      super.initialize(context);
    }

    @Override
    public void configure() {
      setName("MyWorkflow");
      setDescription("MyWorkflow description");
      addMapReduce("MyMapReduce");
      addSpark("MySpark");
      addMapReduce("AnotherMapReduce");
      addAction(new MyAction());
    }

    @Override
    public void destroy() {
      // Invoked after the execution of the Workflow

      // Determine the status of the Workflow
      boolean isWorkflowSuccessful = getContext().getState().getStatus() == ProgramStatus.COMPLETED;

      // Get the state of all nodes that were executed as a part of this Workflow run.
      Map<String, WorkflowNodeState> nodeStates = getContext().getNodeStates();
    }
  }
In this example, the MyWorkflow will be executed every 5 hours. During each execution of the workflow, the MyMapReduce, MySpark, and AnotherMapReduce programs and the MyAction custom action will be executed in order.
In addition to the configure() method, extending from AbstractWorkflow allows you to implement these methods:
initialize()
destroy()
The initialize() method is invoked at runtime before the start of the workflow run. Any error that occurs in this method causes the workflow to fail.
The destroy() method is invoked after the workflow run completes, either successfully or with a failure. The WorkflowContext can be used to determine the status of the workflow. This method also has access, through the WorkflowContext, to the state of all nodes that were executed as part of the workflow run. An error occurring in this method does not affect the status of the workflow.
Workflow Custom Action
In addition to MapReduce and Spark programs, a workflow can also execute custom actions. Custom actions are implemented in Java and can perform tasks such as sending an email. Within a custom action, you have control over transactions (a transaction is started only when datasets are accessed), so long-running tasks that do not require transactions can be executed in a custom action.
To define a custom action, you will need to extend the AbstractCustomAction class and implement its run() method:
  public static class MyAction extends AbstractCustomAction {

    @Override
    public void run() {
      // Your code goes here
      ...

      // Datasets can be accessed in this method using a transaction such as:
      getContext().execute(new TxRunnable() {
        @Override
        public void run(DatasetContext context) throws Exception {
          Table myDs = context.getDataset("MyDataset");
          // Perform dataset operations such as reading and writing
          myDs.write(...);
        }
      });
    }
  }
The custom action can then be added to a workflow using the addAction() method, as shown in the previous example.
Assigning Unique Names
It's important to assign unique names to each component of the workflow, especially when you use multiple instances of the same program in the same workflow.
These unique names can be set when the workflow is first configured, passed to the instance of the program, and then used when the program performs its own configuration.
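A sketch of this pattern (the class names, constructor parameter, and program names below are illustrative, not part of the CDAP API): a program accepts its name in a constructor and applies it in its own configure() method.

  public static class MyMapReduce extends AbstractMapReduce {
    private final String name;

    // The name is chosen by the application when the instance is created
    public MyMapReduce(String name) {
      this.name = name;
    }

    @Override
    public void configure() {
      // Use the passed-in name so that each instance is uniquely identified
      setName(name);
      setDescription("MapReduce program named " + name);
    }
  }

  // In the application configuration, each instance gets a unique name:
  addMapReduce(new MyMapReduce("PurchaseEventParser"));
  addMapReduce(new MyMapReduce("PurchaseEventAggregator"));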
Local Datasets in Workflow
Local datasets are datasets that are created for each workflow run and deleted once the workflow run finishes. Consider a workflow that runs multiple actions, where the output of one action is given as an input to the next action, but only the output of the final action is of interest. In this case, the datasets created by the intermediate stages can be defined as local datasets when configuring the workflow. This allows the workflow system to manage such temporary storage for you.
The local datasets can be configured in the workflow as:
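A minimal sketch of such a configuration in the workflow's configure() method, assuming the createLocalDataset method is used; the dataset types and program names (KeyValueTable, CSVParser, WordCounter, PublishResults) are illustrative:

  @Override
  public void configure() {
    setName("WordCountWorkflow");
    setDescription("Workflow that uses local datasets for intermediate data");

    // Declare the intermediate datasets as local to each workflow run
    createLocalDataset("WordCount", KeyValueTable.class);
    createLocalDataset("CSVData", KeyValueTable.class);

    addMapReduce("CSVParser");
    addMapReduce("WordCounter");
    addAction(new PublishResults());
  }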
WordCount and CSVData are configured as local datasets for the workflow. For every workflow run, these datasets will be created and named WordCount.<unique_id> and CSVData.<unique_id>. Once the run is complete, they will be deleted by the workflow system.
Local datasets can be retained after the workflow run is complete by setting the runtime argument dataset.<dataset_name>.keep.local to true. For example, to keep the WordCount dataset even after the workflow run is complete, set the runtime argument dataset.WordCount.keep.local to true. To keep all local datasets, set the runtime argument dataset.*.keep.local to true.
Concurrent Workflows
By default, workflows run concurrently: multiple runs of the same workflow can execute simultaneously. For scheduled workflows, however, the number of concurrent runs can be controlled by setting a maximum number of runs.
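For example, a minimal sketch of limiting a scheduled workflow to a single concurrent run, assuming your CDAP version's ScheduleBuilder supports the withConcurrency constraint:

  schedule(
    buildSchedule("FiveHourSchedule", ProgramType.WORKFLOW, "MyWorkflow")
      .setDescription("Schedule running every 5 hours, at most one run at a time")
      // Skip launching a new run while a run of MyWorkflow is still active
      .withConcurrency(1)
      .triggerByTime("0 */5 * * *"));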
Workflow Tokens
In addition to passing the control flow from one node to the next, a workflow token is passed, available to each of the programs in the workflow. This allows programs to:
pass custom data (such as a counter, a status, or an error code) from one program in the workflow to subsequent programs;
query and set the data in the token;
fetch the data from the token which was set by a specific node; and
alter the job configuration based on a key in the token; for example, set a different mapper/reducer class or a different input/output dataset for a Spark or MapReduce program.
The API is intended to allow appropriate action to be taken in response to the token, including logging and modifying the conditional execution of the workflow based on the token.
Once a run is completed, you can query the tokens from past workflow runs for analyses that determine which node was executed more frequently and when. You can retrieve the token values that were added by a specific node in the workflow to debug the flow of execution.
When an action is started by a workflow, the action always receives a non-null instance of the WorkflowToken. However, when a MapReduce or Spark program is started directly (outside of a workflow), the WorkflowToken received from the context of the program is null. Since custom actions are always started by a workflow, they will always receive a non-null WorkflowToken instance.
Scope
Two scopes, System and User, are provided for workflow keys. CDAP adds keys (such as MapReduce counters) under the System scope. Keys added by user programs are stored under the User scope.
Putting and Getting Token Values
When a value is put into a token, it is stored under a specific key. Both keys and their corresponding values must be non-null. The token stores additional information about the context in which the key is being set, such as the unique name of the workflow node. To put a value into a token, first obtain access to the token from the workflow context, and then set a value for a specific key.
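As a sketch, putting and reading a value from within a custom action might look like the following; the key name email.status is illustrative, and it assumes the custom action context exposes the token through getWorkflowToken():

  @Override
  public void run() throws Exception {
    WorkflowToken token = getContext().getWorkflowToken();

    // Store a value under a key; it is recorded in the User scope,
    // along with the name of the node that set it
    token.put("email.status", "sent");

    // Read the most recent value for the key (null if the key was never set)
    Value status = token.get("email.status");
    if (status != null) {
      // ... act on status.toString()
    }
  }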
In the case of a MapReduce program, the program's Mapper and Reducer classes need to implement ProgramLifecycle<MapReduceTaskContext>. After doing so, they can access the workflow token in either the initialize or destroy methods. To access it in the map or reduce methods, you need to cache a reference to the workflow token object as a class member in the initialize() method. This is because the context object passed to those methods is a Hadoop class that is unaware of CDAP and its workflow tokens.
Note: The test of workflowToken != null is only required because this Reducer could be used outside of a workflow. When run from within a workflow, the token is guaranteed to be non-null.
The WorkflowToken Java API includes methods for getting values for different keys, scopes, and nodes. The same key can be added to the workflow token by different nodes, and there are methods to return a map of those key-value pairs. Convenience methods allow the putting and getting of non-string values through the use of the class Value.
MapReduce Counters and Workflow Tokens
When a workflow executes a MapReduce program, MapReduce counters generated by the program are added to the workflow token under the system scope. Counters can be defined either by the MapReduce framework or applications.
The counters defined by the MapReduce framework can be retrieved from the workflow token by using a key composed of the counter group name, followed by a ".", followed by the name of the counter. For example, to access the number of input records to the map method of the PurchaseHistoryBuilder MapReduce program:
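A sketch of that lookup, made from a node that runs after PurchaseHistoryBuilder (for example, in the workflow's destroy() method); it assumes a get(key, nodeName, scope) overload and a getAsLong() accessor on Value:

  WorkflowToken token = getContext().getToken();
  // MapReduce counters are stored as "<group name>.<counter name>" in the System scope
  Value mapInputRecords = token.get("org.apache.hadoop.mapreduce.TaskCounter.MAP_INPUT_RECORDS",
                                    "PurchaseHistoryBuilder",
                                    WorkflowToken.Scope.SYSTEM);
  long inputRecords = mapInputRecords == null ? 0L : mapInputRecords.getAsLong();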
Applications can define counters using an enum such as:
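For example, a sketch of an application-defined counter enum, incremented from a Mapper using the standard Hadoop counter API (the enum and its members are illustrative):

  // Application-defined counters
  public enum PurchaseCounters {
    PURCHASES,
    INVALID_RECORDS
  }

  // Inside a Mapper's map() method, using the standard Hadoop context:
  context.getCounter(PurchaseCounters.PURCHASES).increment(1);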
These counters can be retrieved from the workflow token by using a key composed of the class name of the enum as the counter group name, followed by a ".", followed by the enum member as the name of the counter:
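Continuing the sketch, and assuming the enum above is declared as the top-level class com.example.PurchaseCounters:

  Value purchases = token.get("com.example.PurchaseCounters.PURCHASES",
                              "PurchaseHistoryBuilder",
                              WorkflowToken.Scope.SYSTEM);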
Applications can also define counters by explicitly providing a counter group name and counter name:
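For example (the group and counter names are illustrative), from within a Mapper or Reducer using the standard Hadoop context:

  context.getCounter("Purchases", "ValidRecords").increment(1);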
These counters can be retrieved from the workflow token by using a key composed of the explicitly-provided counter group name, followed by a ".", followed by the explicitly-provided name of the counter:
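Continuing the same sketch:

  Value validRecords = token.get("Purchases.ValidRecords",
                                 "PurchaseHistoryBuilder",
                                 WorkflowToken.Scope.SYSTEM);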
Spark Accumulators and Workflow Tokens
Spark Accumulators can be accessed through the SparkContext, and used with workflow tokens. This allows the values in the accumulators to be accessed through workflow tokens.
Persisting the WorkflowToken
The WorkflowToken is persisted after each action in the workflow has completed.
Examples
In this code sample, we show how to update the WorkflowToken in a MapReduce program:
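A sketch of such an update, made in the initialize() method of a MapReduce program (the class name and key are illustrative); the null check is needed only because the program could also be started outside of a workflow:

  public class PurchaseHistoryBuilder extends AbstractMapReduce {

    @Override
    public void initialize() throws Exception {
      WorkflowToken workflowToken = getContext().getWorkflowToken();
      if (workflowToken != null) {
        // Record when this MapReduce program started, for later nodes to read
        workflowToken.put("mr.start.time", String.valueOf(System.currentTimeMillis()));
      }
      // ... the rest of the job setup (mapper, reducer, input, output) goes here
    }
  }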
A token can only be updated in:
the initialize and destroy methods of a MapReduce program;
the driver of a Spark program;
a custom action; and
the predicates of condition nodes.
You will get an exception if you try to update the workflow token in:
the map or reduce methods of a MapReduce program; or
executors in Spark programs.
You can always read the workflow token in any of the above situations.
Parallelizing Workflow Execution
The control flow of a workflow can be described as a directed, acyclic graph (DAG) of actions. To be more precise, we require that it be a series-parallel graph. This is a graph with a single start node and a single finish node. In between, execution can fork into concurrent branches, but the graph may not have cycles. Every action can be a batch job or a custom action (implemented in Java; for example, making a RESTful call to an external system).
For example, a simple control flow could be computing user and product profiles from purchase events. After the start, a batch job could start that joins the events with the product catalog. After that, execution could continue with a fork, and with two batch jobs running in parallel: one computing product profiles; while the other computes user profiles. When they are both done, execution is joined and continues with a custom action to upload the computed profiles to a serving system, after which the control flow terminates:
Forks and Joins
To create such a workflow, you provide a series of forks and joins in your workflow specification, following these rules:
Where your control flow initially splits, you place a fork method.
Every time your control flow splits, you add additional fork methods.
At every point where you have either a program or an action, you add an addMapReduce, addSpark, or addAction method.
To show each fork, use an also method to separate the different branches of the control flow.
Where your control flow reconnects, you add a join method to indicate this.
The control flow always concludes with a join method.
The application shown above could be coded (assuming the other classes referred to exist) as:
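A sketch of that workflow's configure() method; JoinWithCatalogMR and BuildProductProfileMR appear elsewhere in this section, while BuildUserProfileMR and UploadProfilesAction are illustrative names:

  @Override
  public void configure() {
    setName("ProfileBuilderWorkflow");
    setDescription("Computes user and product profiles from purchase events");

    // Join the purchase events with the product catalog
    addMapReduce("JoinWithCatalogMR");

    // Fork into two parallel branches, one per profile type
    fork()
      .addMapReduce("BuildProductProfileMR")
    .also()
      .addMapReduce("BuildUserProfileMR")
    .join();

    // After both branches complete, upload the computed profiles
    addAction(new UploadProfilesAction());
  }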
Provided that the control flow has no cycles and does not join branches that originate from different forks, flows of varying complexity can be created using these rules and methods.
More complicated structures can be created using fork. To add another MapReduce program that runs in parallel to the entire process described above, you could use code such as:
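A sketch of such a structure, wrapping the previous flow in an outer fork whose second branch runs a single additional MapReduce program (AnotherMR is an illustrative name):

  @Override
  public void configure() {
    setName("ProfileBuilderWorkflow");
    setDescription("Profile building with an additional parallel MapReduce");

    // Outer fork: the whole profile-building sequence runs in parallel with AnotherMR
    fork()
      .addMapReduce("JoinWithCatalogMR")
      // Inner fork: the two profile builders run in parallel
      .fork()
        .addMapReduce("BuildProductProfileMR")
      .also()
        .addMapReduce("BuildUserProfileMR")
      .join()
      .addAction(new UploadProfilesAction())
    .also()
      .addMapReduce("AnotherMR")
    .join();
  }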
The diagram for this code would be:
Conditional Node
You can provide a conditional node in your structure that allows for branching based on a boolean predicate.
Taking our first example and modifying it, you could use code such as:
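A sketch of the modified configure() method, using a condition node whose predicate chooses between the two profile-building branches (MyPredicate is shown below):

  @Override
  public void configure() {
    setName("ProfileBuilderWorkflow");
    setDescription("Builds either product or user profiles, depending on a predicate");

    addMapReduce("JoinWithCatalogMR");

    // Branch based on the predicate's evaluation at runtime
    condition(new MyPredicate())
      .addMapReduce("BuildProductProfileMR")
    .otherwise()
      .addMapReduce("BuildUserProfileMR")
    .end();

    addAction(new UploadProfilesAction());
  }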
where MyPredicate is a public class which implements the Predicate interface as:
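A sketch of such a predicate, reading MapReduce counters that the JoinWithCatalogMR node added to the System scope of the token (the counter keys are illustrative):

  public static class MyPredicate implements Predicate<WorkflowContext> {

    @Override
    public boolean apply(WorkflowContext context) {
      WorkflowToken token = context.getToken();
      Value products = token.get("profiles.products", "JoinWithCatalogMR",
                                 WorkflowToken.Scope.SYSTEM);
      Value users = token.get("profiles.users", "JoinWithCatalogMR",
                              WorkflowToken.Scope.SYSTEM);
      // Follow the product-profile branch when there are more product entries than user entries
      return products != null && users != null && products.getAsLong() > users.getAsLong();
    }
  }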
The mapper of the JoinWithCatalogMR MapReduce program can have code that governs which condition to follow. Note that as the context passed is a standard Hadoop context, the WorkflowContext is not available:
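A sketch of that mapper logic, using the standard Hadoop counter API; the isProductRecord helper is illustrative:

  @Override
  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
    // ... join the purchase event with the product catalog ...

    // These counters end up in the System scope of the workflow token under the
    // keys "profiles.products" and "profiles.users", where MyPredicate reads them
    if (isProductRecord(value)) {   // isProductRecord is an illustrative helper
      context.getCounter("profiles", "products").increment(1);
    } else {
      context.getCounter("profiles", "users").increment(1);
    }

    context.write(key, value);
  }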
In this case, if the predicate finds that the number of product profile entries is greater than the number of user profile entries, the logic will follow the path of BuildProductProfileMR; otherwise, the other path will be taken. The diagram for this code would be:
Workflow Token with Forks and Joins
For workflows that involve forks and joins, a single instance of the workflow token is shared by all branches of the fork. Updates to this singleton are made thread-safe through synchronized updates, guaranteeing that the value you obtain from reading the token is the last value written at runtime. This is a time-based guarantee.
Examples
This code sample shows how to obtain values from the token from within a custom action, and from within a workflow with a predicate, fork and joins:
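A sketch of a custom action that runs after the join and reads values written by specific nodes on the two branches; the key profile.count and the node names are illustrative:

  public static class UploadProfilesAction extends AbstractCustomAction {

    @Override
    public void run() throws Exception {
      WorkflowToken token = getContext().getWorkflowToken();

      // Values written under the same key by the two branches of the fork
      Value productCount = token.get("profile.count", "BuildProductProfileMR");
      Value userCount = token.get("profile.count", "BuildUserProfileMR");

      // ... upload the computed profiles, using the counts for validation
    }
  }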