
Goals

  1. JIRA: CDAP-3969: CDAP should offer a temporary location to store results between jobs of a workflow.
  2. JIRA: CDAP-4075: Error handling for Workflows.

Checklist

  • User stories documented (Sagar)
  • User stories reviewed (Nitin)
  • Design documented (Sagar)
  • Design reviewed (Albert/Terence/Andreas)
  • Feature merged (Sagar)
  • Examples and guides (Sagar)
  • Integration tests (Sagar) 
  • Documentation for feature (Sagar)
  • Blog post

Terminology

In this document, the term local dataset refers to a dataset that is defined inside the scope of a Workflow. The terms temporary and transient refer to the same thing; which term to use is still to be finalized.

Local dataset Definition

Local datasets are datasets that are configured inside a Workflow. The Workflow driver creates them for each run and deletes them once the run finishes. For debugging purposes, the user can choose to keep them by providing the appropriate runtime arguments. Local datasets should be hidden from the normal list dataset calls and should be visible only from the Workflow run level UI page, for exploring and debugging.

Use Cases

    1. JIRA: CDAP-3969: CDAP should offer a temporary location to store results between jobs of a workflow.

      Case A)

      Consider the above sample workflow from the CDAP Workflow guide. The goal is to process the raw purchase events from the purchaseEvents stream and find the purchases made by each customer and the purchases made for each product. When the workflow runs, PurchaseEventParser reads the raw events from the purchaseEvents stream and writes the purchase objects to the purchaseRecords dataset. This dataset is later used as input by the PurchaseCounterByCustomer and PurchaseCounterByProduct MapReduce programs, which create the customerPurchases and productPurchases datasets respectively. When the workflow completes, the user is only interested in the final datasets created by the run: customerPurchases and productPurchases. The purchaseRecords dataset created by PurchaseEventParser is local to the Workflow and is no longer required once the run completes.

      Case B)
      A MapReduce program in CDAP can write to multiple datasets. Suppose the above Workflow is modified so that PurchaseEventParser also writes to an errorRecords dataset in addition to purchaseRecords. The errorRecords dataset contains the raw events from the purchaseEvents stream for which parsing failed. In this case, errorRecords should not be local, since the user may want to analyze it from another CDAP application to find the sources that frequently emit bad data.

      Case C)
      If, for some reason, the MapReduce program PurchaseEventParser does not generate the expected amount of data, the user may want to keep the purchaseRecords dataset even after the Workflow run completes, so that it can be debugged further.

       

    2. JIRA: CDAP-4075: Error handling for Workflows.
      Case A) When the Workflow fails, the user may want to notify the appropriate parties via email, possibly including the cause of the failure and the node at which the Workflow failed.
      Case B) When the Workflow fails at a particular node, the user may want to clean up the datasets and files created by the previous nodes in the Workflow.

User Stories

  1. As a developer of the Workflow, I want the ability to specify that a particular dataset used in the Workflow is local to a particular run, so that the Workflow system can clean it up after the run completes. (CDAP-3969)
  2. As a developer of the Workflow, I should be able to specify whether the local datasets created by a Workflow run are deleted after the run finishes, so that I can debug them when a run fails. (CDAP-3969)
  3. I want the ability to delete the local datasets generated for a particular Workflow run. (CDAP-3969)
  4. I should be able to specify whether to keep a local dataset even after the Workflow run has finished. (CDAP-3969)
  5. As a developer of the Workflow, I want the ability to specify functionality (such as sending an email) that is executed when the Workflow finishes successfully. (CDAP-4075)
  6. As a developer of the Workflow, I want the ability to specify functionality (such as sending an email) that is executed when the Workflow fails at any point. I want access to the cause of the failure and the node at which the Workflow failed. (CDAP-4075)

Approach for CDAP-3969

Consider again the Workflow mentioned in the use case above.

  1. In the above Workflow, the datasets errorRecords, customerPurchases, and productPurchases are non-local datasets. They can be defined inside the application as follows:

    public class WorkflowApplication extends AbstractApplication {
       @Override
       public void configure() {
          ...
          // Define the non-local datasets at the application level
          createDataset("errorRecords", KeyValueTable.class);
          createDataset("customerPurchases", KeyValueTable.class);
          createDataset("productPurchases", KeyValueTable.class);
          ...
       }
    }
    
    
  2. Since purchaseRecords is a dataset local to the Workflow, it can be defined inside the Workflow's configure method as follows:

    public class PurchaseWorkflow extends AbstractWorkflow {
       @Override
       protected void configure() {
          setName("PurchaseWorkflow");
          ...
          // Create the Workflow local datasets here
          createLocalDataset("purchaseRecords", KeyValueTable.class);
          ...
          addMapReduce("PurchaseEventParser");
          ...
       }
    }
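  3. When the application is deployed, the following datasets are created: errorRecords, customerPurchases, and productPurchases. The local dataset purchaseRecords is not created at deployment time; it is created for each Workflow run.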

  4. If the MapReduce program PurchaseEventParser is run by itself, outside the Workflow, it will fail, since purchaseRecords is not defined in the application scope. It is the user's responsibility to make sure that the dataset exists in the correct scope.

  5. The user can choose not to delete a local dataset after the Workflow run completes by specifying a runtime argument.

    // To keep the Workflow local purchaseRecords dataset after the Workflow run completes, specify the following runtime argument:
    dataset.purchaseRecords.keep.local=true
     
    // To keep all local datasets after the Workflow run completes, specify the following runtime argument:
    dataset.*.keep.local=true
  6. Datasets configured as local to the Workflow can be stored in the Workflow specification. When a Workflow run starts, these datasets can be instantiated. The name of each local dataset instance would be datasetName.<workflow_run_id>.

  7. The mapping <datasetName, localDatasetName> would be passed to the dataset framework instance, so that all calls for a local dataset are routed to the appropriate dataset instance. One possible approach is:

    // Class to forward the calls to the delegate
    class ForwardingDatasetFramework implements DatasetFramework {
       private final Map<String, String> datasetNameMapping;
       private final DatasetFramework delegate; 
       public ForwardingDatasetFramework(DatasetFramework datasetFramework, Map<String, String> datasetNameMapping) {
          this.delegate = datasetFramework;
          this.datasetNameMapping = datasetNameMapping;
       }
     
       // All required calls would be intercepted here. For example:
       @Override
       public <T extends Dataset> T getDataset(
         Id.DatasetInstance datasetInstanceId, Map<String, String> arguments,
         @Nullable ClassLoader classLoader) throws DatasetManagementException, IOException {
         
         // Route calls for a local dataset to its run-specific instance
         String localName = datasetNameMapping.get(datasetInstanceId.getId());
         if (localName != null) {
            datasetInstanceId = Id.DatasetInstance.from(datasetInstanceId.getNamespaceId(), localName);
         }
         return delegate.getDataset(datasetInstanceId, arguments, classLoader);
       }
    }
     
    // A ForwardingDatasetFramework can then be created from the injected DatasetFramework instance in WorkflowDriver (so that custom actions and predicates can use it), MapReduceProgramRunner, and SparkProgramRunner.
  8. A change similar to the one described in (7) would be required in MapperWrapper and ReducerWrapper, since an instance of the DatasetFramework is created there as well.

  9. Once the Workflow run completes, the WorkflowDriver can delete the corresponding local datasets, depending on the keep.local runtime arguments described in (5).
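
Steps 5, 6, and 9 can be illustrated with a minimal sketch. It assumes the datasetName.<workflow_run_id> naming convention and the keep.local runtime arguments proposed above; the class and method names (LocalDatasetSketch, localNames, shouldKeep) are hypothetical and are not part of the CDAP API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative helpers only; none of these names exist in CDAP. */
final class LocalDatasetSketch {

  // Builds the <datasetName, localDatasetName> mapping for one Workflow run,
  // using the datasetName.<workflow_run_id> convention from step 6.
  static Map<String, String> localNames(List<String> localDatasets, String runId) {
    Map<String, String> mapping = new LinkedHashMap<>();
    for (String name : localDatasets) {
      mapping.put(name, name + "." + runId);
    }
    return mapping;
  }

  // Returns true if the runtime arguments ask to keep the given local dataset,
  // either by name or through the dataset.*.keep.local wildcard from step 5.
  static boolean shouldKeep(String datasetName, Map<String, String> runtimeArgs) {
    return Boolean.parseBoolean(runtimeArgs.get("dataset." + datasetName + ".keep.local"))
        || Boolean.parseBoolean(runtimeArgs.get("dataset.*.keep.local"));
  }

  public static void main(String[] args) {
    // "run-1" is a placeholder run id used only for this example.
    Map<String, String> mapping = localNames(List.of("purchaseRecords"), "run-1");
    Map<String, String> runtimeArgs = Map.of("dataset.purchaseRecords.keep.local", "true");
    for (Map.Entry<String, String> entry : mapping.entrySet()) {
      String action = shouldKeep(entry.getKey(), runtimeArgs) ? "keep" : "delete";
      System.out.println(action + " " + entry.getValue());
    }
  }
}
```

In the actual design, the mapping would be handed to the forwarding dataset framework from step 7, and the keep or delete decision would be made by the WorkflowDriver at the end of the run.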

Lineage for the Local Datasets (TBD)


REST API Changes

  1. Datasets local to the Workflow should not be listed by the list dataset API. This only hides the local datasets from the user; it is still possible for a user who knows the name of a local dataset to access it and use it inside an application.

  2. An API is needed to list the local datasets of a Workflow run, if they are still available.

    GET <base-url>/namespaces/{namespace}/apps/{app-id}/workflows/{workflow-id}/runs/{run-id}/localdatasets
  3. We will need a way to delete the local datasets created for a particular Workflow run if the user set dataset.*.keep.local for that run.

    DELETE <base-url>/namespaces/{namespace}/apps/{app-id}/workflows/{workflow-id}/runs/{run-id}/localdatasets
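
As a rough sketch of how a client might address the two proposed endpoints, the following builds the list and delete requests with the standard java.net.http API (Java 11+) without sending them. The base URL, the class name LocalDatasetRequests, and the sample identifiers are placeholders chosen for illustration; only the path layout comes from the proposal above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Illustrative only; builds requests for the proposed localdatasets endpoints. */
final class LocalDatasetRequests {

  // Placeholder base URL; replace with the actual <base-url> of the CDAP instance.
  static final String BASE_URL = "http://localhost:10000/v3";

  // Builds the localdatasets URI for one Workflow run.
  static URI localDatasetsUri(String namespace, String app, String workflow, String runId) {
    return URI.create(String.format(
        "%s/namespaces/%s/apps/%s/workflows/%s/runs/%s/localdatasets",
        BASE_URL, namespace, app, workflow, runId));
  }

  // GET: list the local datasets that are still available for the run.
  static HttpRequest listRequest(URI uri) {
    return HttpRequest.newBuilder(uri).GET().build();
  }

  // DELETE: remove the local datasets that were kept for the run.
  static HttpRequest deleteRequest(URI uri) {
    return HttpRequest.newBuilder(uri).DELETE().build();
  }

  public static void main(String[] args) {
    // Sample identifiers are hypothetical.
    URI uri = localDatasetsUri("default", "WorkflowApplication", "PurchaseWorkflow", "run-1");
    System.out.println(listRequest(uri).method() + " " + uri);
    System.out.println(deleteRequest(uri).method() + " " + uri);
  }
}
```

Sending the requests would additionally require an HttpClient and whatever authentication the CDAP instance is configured with, which is outside the scope of this sketch.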

UI Changes

  1. For every Workflow run, we will need to add a new tab that lists the local datasets.
  2. If these datasets are marked explorable, they should be explorable from the UI.
  3. We need the ability to delete the local datasets from the UI.

 

 
