Workflows hooks CDAP-4075

Goals

JIRA: CDAP-4075: Error handling for Workflows.

Use Cases

  1. When a Workflow run finishes, the user may want to send an email about its success or failure.
  2. In the case of a Hydrator pipeline, once the run is finished, the user may wish to delete the data from an external source such as Oracle or Teradata.
  3. If the Workflow fails for some reason, the user may want to clean up the files/data written by the nodes in the Workflow.
  4. On failure of the Workflow, the user may wish to keep certain local datasets for further debugging.
  5. In a Workflow, the user can have a custom action at the start of the Workflow that writes to a dataset (which acts as a lock). The next node in the Workflow is a MapReduce program that fails for that run of the Workflow. The user would like to be able to clean up the state that the custom action wrote to the dataset.

User Stories

  1. As a developer of a Workflow action, I want the ability to clean up the data that was written by the Workflow action if the action fails.
  2. As a developer of a Workflow action, I want the ability to clean up the data that was written by the Workflow action if the Workflow fails.
  3. As a developer of a Workflow, I want the ability to send an email once the Workflow run finishes. In case of failure, I should be able to access the nodes that failed and the failure cause.
  4. As a developer of a Workflow, I want the ability to instruct the Workflow system not to delete certain local datasets, so that they can be used for triage.

Possible approach

  1. Ideally, cleanup should be done by the node in the Workflow that created the data, since that node knows what information needs to be cleaned up. MapReduce and Spark programs already have an onFinish method, which can be used to clean up any state on failure. A custom action should similarly have an onFinish method to perform cleanup when the custom action fails. A custom action already has a destroy method, but that method does not know whether the run succeeded or failed. We should deprecate it and introduce a new onFinish method, to be consistent with the other actions.

    public interface WorkflowAction extends Runnable {
      /**
       * This method is called after the {@link #run} method completes and it can be used for resource cleanup. 
       * Any exception thrown only gets logged but does not affect execution of the {@link Workflow}.
       */
      @Deprecated 
      void destroy();
     
      /**
       * This method is called after the execution of the action is done.
       * @param succeeded defines the result of action execution: true if the action succeeded, false otherwise
       * @throws Exception if there is any error during execution. This exception will only be logged as an error
       * without affecting the execution of the {@link Workflow}
       */
      void onFinish(boolean succeeded) throws Exception;
    }
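
The cleanup pattern described in item 1 could look roughly like the sketch below. It is not CDAP code: `WorkflowAction` is redeclared here with only the proposed hook, and `StagingAction`, `ActionRunner`, and the in-memory `Set` that stands in for a dataset are hypothetical names introduced for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the proposed interface, reduced to the new hook.
interface WorkflowAction extends Runnable {
  void onFinish(boolean succeeded) throws Exception;
}

// Hypothetical action: the Set stands in for a dataset the action writes to.
class StagingAction implements WorkflowAction {
  private final Set<String> dataset;
  private final Set<String> written = new HashSet<>();
  private final boolean failAfterWrite;

  StagingAction(Set<String> dataset, boolean failAfterWrite) {
    this.dataset = dataset;
    this.failAfterWrite = failAfterWrite;
  }

  @Override
  public void run() {
    dataset.add("staged-record");
    written.add("staged-record");
    if (failAfterWrite) {
      throw new IllegalStateException("simulated action failure");
    }
  }

  @Override
  public void onFinish(boolean succeeded) {
    // Only the action knows which keys it wrote, so it removes them on failure.
    if (!succeeded) {
      dataset.removeAll(written);
    }
    written.clear();
  }
}

// Hypothetical driver standing in for the Workflow runtime.
class ActionRunner {
  static boolean execute(WorkflowAction action) {
    boolean succeeded = false;
    try {
      action.run();
      succeeded = true;
    } catch (RuntimeException e) {
      // The action failed; onFinish is still invoked with succeeded == false.
    }
    try {
      action.onFinish(succeeded);
    } catch (Exception e) {
      // Per the Javadoc above, an onFinish error is only logged.
      System.err.println("onFinish failed: " + e.getMessage());
    }
    return succeeded;
  }
}
```

In this sketch the action tracks what it wrote during `run()`, so `onFinish(false)` can undo exactly that and nothing else.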


  2. The Workflow interface will have an onFinish method as well, which is called when the Workflow run finishes, whether it succeeds or fails.

    public interface Workflow {
       /**
        * Called when the Workflow run finishes either successfully or on failure.
        * @param context the context associated with the Workflow
        * @param state the state of the Workflow
        * @throws Exception if there is an error during this method. This will not affect the status of the Workflow.
        */
       void onFinish(WorkflowContext context, WorkflowState state) throws Exception;
    }

     

  3. The WorkflowState class contains the state of all nodes in the Workflow, along with a flag indicating whether the Workflow run succeeded.

    public final class WorkflowState {
       private final Map<String, WorkflowNodeState> nodeState;
       private boolean isSucceeded; 
    }
     
    public final class WorkflowNodeState {
       private final String nodeId;
       private final NodeStatus nodeStatus;
       private final RunId runId;
       // Cause if the node execution failed, null otherwise
       private final Throwable failureCause;
    }
     
    public enum NodeStatus {
       KILLED,
       FAILED,
       COMPLETED
    }
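
As an illustration of how an onFinish implementation might use this state to report failed nodes and their causes (user story 3), here is a minimal sketch. The classes are simplified copies of the ones above. The constructors, the getters, and the `FailureReport` helper are assumptions not present in the proposal, and the `runId` field is omitted to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

enum NodeStatus { KILLED, FAILED, COMPLETED }

// Simplified copy of the proposed class; accessors are assumed, runId is omitted.
final class WorkflowNodeState {
  private final String nodeId;
  private final NodeStatus nodeStatus;
  private final Throwable failureCause; // null unless the node failed

  WorkflowNodeState(String nodeId, NodeStatus nodeStatus, Throwable failureCause) {
    this.nodeId = nodeId;
    this.nodeStatus = nodeStatus;
    this.failureCause = failureCause;
  }

  String getNodeId() { return nodeId; }
  NodeStatus getNodeStatus() { return nodeStatus; }
  Throwable getFailureCause() { return failureCause; }
}

// Simplified copy of the proposed class; constructor and accessors are assumed.
final class WorkflowState {
  private final Map<String, WorkflowNodeState> nodeState;
  private final boolean succeeded;

  WorkflowState(Map<String, WorkflowNodeState> nodeState, boolean succeeded) {
    this.nodeState = Collections.unmodifiableMap(new LinkedHashMap<>(nodeState));
    this.succeeded = succeeded;
  }

  Map<String, WorkflowNodeState> getNodeStates() { return nodeState; }
  boolean isSucceeded() { return succeeded; }
}

// Hypothetical helper that builds the text of a notification email.
final class FailureReport {
  static String summarize(WorkflowState state) {
    if (state.isSucceeded()) {
      return "Workflow succeeded";
    }
    List<String> failed = new ArrayList<>();
    for (WorkflowNodeState node : state.getNodeStates().values()) {
      if (node.getNodeStatus() == NodeStatus.FAILED) {
        Throwable cause = node.getFailureCause();
        String reason = (cause == null) ? "unknown cause" : cause.getMessage();
        failed.add(node.getNodeId() + ": " + reason);
      }
    }
    return "Workflow failed; failed nodes: " + String.join(", ", failed);
  }
}
```

Sending the email itself is left out; the point is only that the node map and the failure cause give onFinish enough information to compose the message.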

  4. The onFinish method in the Workflow can also update preferences, such as the preferences for the local datasets.
    1. This can be done through the WorkflowToken, since the user can get the WorkflowToken through the WorkflowContext. However, since information in the WorkflowToken is stored at the node level, we would have to create an internal node for the onFinish method.
    2. Another approach is to have Map<String, String> properties in the WorkflowState instance, which the user can update in the onFinish method.
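
The second option could be sketched as follows, under stated assumptions: `WorkflowStateWithProperties` is a hypothetical reduction of WorkflowState to the success flag plus the proposed properties map, and the `dataset.keep.local.` key prefix is invented for this example, since the proposal does not define property names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reduction of WorkflowState: success flag plus the proposed properties.
final class WorkflowStateWithProperties {
  private final boolean succeeded;
  private final Map<String, String> properties = new HashMap<>();

  WorkflowStateWithProperties(boolean succeeded) {
    this.succeeded = succeeded;
  }

  boolean isSucceeded() { return succeeded; }
  Map<String, String> getProperties() { return properties; }
}

final class KeepDatasetsOnFailure {
  // Hypothetical property key prefix; the proposal does not define one.
  static final String KEEP_PREFIX = "dataset.keep.local.";

  // What the body of a Workflow's onFinish might do in this sketch.
  static void onFinish(WorkflowStateWithProperties state, String... datasetsToKeep) {
    if (state.isSucceeded()) {
      return; // nothing to triage, let the system delete local datasets as usual
    }
    for (String name : datasetsToKeep) {
      state.getProperties().put(KEEP_PREFIX + name, "true");
    }
  }
}
```

After onFinish returns, the Workflow system would read the properties back and skip deletion of the marked datasets; that part is not shown because the proposal leaves it open.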

  5. Similar to MapReduce and Spark, the onFinish method of the Workflow will run in a short transaction. Ideally, the user would have control over the kind of transaction that is started.

  6. The Workflow can also have a beforeStart method, which can be used for any setup or cleanup activity, so that the user does not have to add a custom action only for initialization. For now, the beforeStart method can run in a short transaction.

    public interface Workflow {
       /**
        * Called before the start of every run of the Workflow.
        * @param context the Workflow context
        * @throws Exception if there is any error executing the code. This will cause the Workflow to fail.
        */
       void beforeStart(WorkflowContext context) throws Exception;
    }
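
To make the failure semantics of beforeStart concrete, here is a small sketch of how a runtime might honor them. `WorkflowHooks` and `HookedRun` are hypothetical names, the context parameter is dropped, and nodes are modeled as plain `Runnable`s. The sketch assumes that no node is started when beforeStart throws. Whether onFinish is called in that case is not specified above, so it is left out.

```java
import java.util.List;

// Hypothetical reduction of the proposed hook; the context parameter is omitted.
interface WorkflowHooks {
  void beforeStart() throws Exception;
}

final class HookedRun {
  // Returns true if the run succeeded. Nodes are plain Runnables in this sketch.
  static boolean run(WorkflowHooks hooks, List<Runnable> nodes) {
    try {
      hooks.beforeStart();
    } catch (Exception e) {
      // A beforeStart failure fails the Workflow; assumed here: no node is started.
      return false;
    }
    for (Runnable node : nodes) {
      try {
        node.run();
      } catch (RuntimeException e) {
        return false; // a failing node fails the run
      }
    }
    return true;
  }
}
```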


Created in 2020 by Google Inc.