...
- JIRA: CDAP-3969: CDAP should offer a temporary location to store results between jobs of a workflow.
Case A) Consider the above sample workflow from the CDAP Workflow guide. The goal is to process the raw purchase events from the purchaseEvents stream and find the purchases made by each customer and the purchases made of each product. When the workflow runs, PurchaseEventParser reads the raw events from the purchaseEvents stream and writes purchase objects to the purchaseRecords dataset. This dataset is later used as input by the PurchaseCounterByCustomer and PurchaseCounterByProduct MapReduce programs to create the customerPurchases and productPurchases datasets respectively. Note that when the workflow completes, the user is only interested in the final datasets created by the Workflow run: customerPurchases and productPurchases. The purchaseRecords dataset created by the PurchaseEventParser MapReduce program is local to the Workflow and is no longer required once the Workflow run completes.
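For illustration, a minimal sketch of how Case A might be expressed if the Workflow could declare purchaseRecords as local in its specification. The createLocalDataset call is the proposed/assumed API (it does not exist yet and its signature is an assumption); the program structure follows the example above.

Code Block (java):

import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.workflow.AbstractWorkflow;

public class PurchaseWorkflow extends AbstractWorkflow {

  @Override
  protected void configure() {
    setName("PurchaseWorkflow");
    setDescription("Parses purchase events and aggregates them per customer and per product");

    // Proposed/assumed API: declare purchaseRecords as local to this Workflow so the
    // system creates it at the start of each run and deletes it when the run completes.
    createLocalDataset("purchaseRecords", KeyValueTable.class);

    // Programs from the example above; purchaseRecords is only an intermediate dataset.
    addMapReduce("PurchaseEventParser");
    addMapReduce("PurchaseCounterByCustomer");
    addMapReduce("PurchaseCounterByProduct");
  }
}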
Case B)
A MapReduce program in CDAP can write to multiple output datasets. Consider that the above Workflow is modified so that PurchaseEventParser also writes to an errorRecords dataset along with the purchaseRecords dataset. The errorRecords dataset contains the raw events from the purchaseEvents stream for which parsing failed. In this case, errorRecords may not be local, since the user may want to perform some analysis on it using another CDAP application to find out which sources are frequently emitting bad data.
Andreas: I don't understand the significance of case B. All this says is that some output datasets are not local. But we already know that? [Sagar: This is simply to note down the use case where not all of the outputs of the MapReduce program can be local.]
Case C)
If, for some reason, the PurchaseEventParser MapReduce program is not generating the expected amount of data, the user may want to keep the purchaseRecords dataset even after the Workflow run completes, so that it can be debugged further.
Case D)
The Workflow DAG in the UI shows which nodes will be executed by the Workflow and in what order. The user can click on any link between nodes and mark it as local so that it is kept after the Workflow run; this causes the output of the source node of that link to be marked as local. The user can click the link again to mark the output as non-local. Solving this use case has a few challenges, though: a MapReduce program can write to multiple output datasets, and which ones are written is decided dynamically during the Workflow run, so how would we know beforehand which dataset should be marked as local? Also, the user may decide to keep only one of the output datasets of a MapReduce program as local. And how would this work when the link is between a custom action and a MapReduce program, or between a custom action and a predicate?
Andreas: I think this is the wrong approach. Due to "dynamic datasets", the UI cannot know what datasets are read/written by an action. Also, an edge in the workflow represents a transition in control flow, not data flow: it does not represent a dataset. The next action in the workflow may actually read a dataset completely unrelated to the previous action. Didn't we agree that local datasets are declared at the scope of the workflow (that is, as part of the workflow spec), and all the user can do at runtime is specify whether they should be preserved for triage purposes at the end of the run? I believe we also said that there can be an onError or onFinish action that gets called when the workflow terminates, and in the code of that action the developer can instruct the workflow driver to keep some of the local datasets for triage? [Sagar: onError and onFinish need to be designed further as part of CDAP-4075. One possible way is to have access to the WorkflowToken in these methods, so that the user can put values in the token that indicate whether the dataset should be deleted or not.]
- JIRA: CDAP-4075: Error handling for Workflows.
Case A) When the Workflow fails for some reason, the user may want to notify appropriate parties via email, possibly with the cause of the failure and the node at which the Workflow failed.
Case B) When the Workflow fails at a particular node, the user may want to clean up the datasets and files created by the previous nodes in the Workflow.
Case C) In the case of a Hydrator pipeline, the user may want to delete the data from the source when the Workflow run is complete.
User Stories
- As a developer of the Workflow, I want the ability to specify that a particular dataset used in the Workflow is local to it, so that the Workflow system creates it at the start of every run and cleans it up after the run completes. (CDAP-3969)
- As a developer of the Workflow, I should be able to specify whether the local datasets created by a Workflow run should be deleted after the run finishes. This way I can do some debugging on them if the Workflow run fails. (CDAP-3969)
Terence: If it is retained, how does the user gain access to and interact with those local datasets? As per the definition, local datasets "should be hidden from the normal list dataset calls and only visible from the Workflow run level UI page for exploring/debugging purpose". [Sagar: When the dataset is created by the Workflow driver, it can add a special property, say .dataset.workflow.internal, to the dataset. While listing datasets, we can filter out the datasets that have this property, so that the list dataset call does not show them. On the Workflow run level UI page, we can explicitly call "/data/datasets/localdatasetname" so that it can be explored further, given that the dataset itself is explorable. Also, a local dataset can be accessed from other applications in the same namespace, since the name of the local dataset would simply be datasetName.<workflow_run_id>; a sketch of this naming scheme follows this list.]
- I want the ability to delete the local datasets generated for a particular Workflow run. (CDAP-3969)
- I should be able to specify whether to keep the local datasets even after the Workflow run finishes. (CDAP-3969)
Andreas: This seems identical to 2. [Sagar: Oh yes. Striking it.]
- As a developer of the Workflow, I want the ability to specify functionality (such as sending an email) that will get executed when the Workflow finishes successfully. (CDAP-4075)
- As a developer of the Workflow, I want the ability to specify functionality (such as sending an email) that will get executed when the Workflow fails at any point. I want access to the cause of the failure and the node at which the Workflow failed. (CDAP-4075)
- As a developer of the Workflow, if the Workflow fails, I want the ability to instruct the Workflow system not to delete the local datasets, for triage purposes. [Sagar: This is a good point. Will add the design for it.]
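To make the hiding/naming scheme from the discussion above concrete, here is a small self-contained sketch. The property key, the "<datasetName>.<workflow_run_id>" format, and the helper names are illustrative assumptions from the discussion, not an existing API.

Code Block (java):

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the per-run naming and tagging scheme discussed above.
public final class LocalDatasetNaming {

  // Compose the per-run instance name, e.g. "purchaseRecords.<workflow_run_id>".
  static String localInstanceName(String datasetName, String workflowRunId) {
    return datasetName + "." + workflowRunId;
  }

  // Add the marker property that the list-datasets call would filter on,
  // so that local datasets are hidden from normal dataset listings.
  static Map<String, String> withLocalMarker(Map<String, String> properties) {
    Map<String, String> tagged = new HashMap<>(properties);
    tagged.put(".dataset.workflow.internal", "true");
    return tagged;
  }
}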
...
- For every Workflow run, we will need to add a new tab listing the local datasets.
- These datasets, if marked explorable, should be explorable from the UI.
- We need the ability to delete the local datasets from the UI.
Approach for CDAP-4075
Workflow will have an onFinish method which will be called at the end of every Workflow run. This is similar to MapReduce programs.
Code Block (java):

/**
 * Called when the Workflow run finishes, either successfully or on failure.
 * @param context context of the Workflow
 * @param cause non-null value indicates that there is a failure in the Workflow run
 */
void onFinish(WorkflowContext context, @Nullable WorkflowFailure cause);
The WorkflowFailure class encapsulates the failure cause and the id of the node which failed:
Code Block (java):

public final class WorkflowFailure {
  // id of the node which failed the execution
  private final String nodeId;

  // failure reason
  private final Throwable failureReason;
}
- WorkflowFailure will be set by the WorkflowDriver when there is an exception while executing a node in the Workflow.
- In the current implementation of the WorkflowDriver, when there is a failure in a fork node, we interrupt the nodes executing on the other branches. WorkflowFailure will still correspond to the first failure that occurred in the Workflow.
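For illustration, a minimal sketch of a Workflow implementing the proposed onFinish hook to cover Case A of CDAP-4075 (notification on failure) and the triage user story above. This assumes the proposed onFinish signature, getNodeId()/getFailureReason() accessors on WorkflowFailure, the token key, and the sendEmail helper; all of these are assumptions for the sake of the example.

Code Block (java):

import co.cask.cdap.api.workflow.AbstractWorkflow;
import co.cask.cdap.api.workflow.WorkflowContext;
import javax.annotation.Nullable;

public class PurchaseWorkflow extends AbstractWorkflow {

  @Override
  protected void configure() {
    // Abridged; same Workflow as in the earlier sketch.
    setName("PurchaseWorkflow");
    addMapReduce("PurchaseEventParser");
  }

  @Override
  public void onFinish(WorkflowContext context, @Nullable WorkflowFailure cause) {
    if (cause == null) {
      // Run succeeded; nothing to clean up or report.
      return;
    }

    // Hypothetical token key: instruct the driver to keep local datasets for triage.
    context.getToken().put("keep.local.datasets", "true");

    // Case A of CDAP-4075: notify with the failing node and the failure cause.
    // getNodeId()/getFailureReason() are assumed accessors on WorkflowFailure.
    sendEmail("ops@example.com",
              "Workflow failed at node " + cause.getNodeId(),
              cause.getFailureReason().getMessage());
  }

  // Placeholder for an actual notification mechanism; not part of any CDAP API.
  private void sendEmail(String to, String subject, String body) {
    // ...
  }
}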