
Goals

  1. JIRA: CDAP-3969: CDAP should offer a temporary location to store results between jobs of a workflow.
  2. JIRA: CDAP-4075: Error handling for Workflows.

Checklist

  • User stories documented (Sagar)
  • User stories reviewed (Nitin)
  • Design documented (Sagar)
  • Design reviewed (Albert/Terence/Andreas)
  • Feature merged (Sagar)
  • Examples and guides (Sagar)
  • Integration tests (Sagar) 
  • Documentation for feature (Sagar)
  • Blog post

Use Cases

  1. JIRA: CDAP-3969: CDAP should offer a temporary location to store results between jobs of a workflow.

    Case A)

    Consider the sample workflow from the CDAP Workflow guide (a minimal sketch of its structure appears after this list). The goal is to process the raw purchase events from the purchaseEvents stream and find the purchases made by each customer as well as the purchases made for each product. When the workflow runs, PurchaseEventParser reads the raw events from the purchaseEvents stream and writes purchase objects to the purchaseRecords dataset. This dataset is later used as input by the PurchaseCounterByCustomer and PurchaseCounterByProduct MapReduce programs to create the customerPurchases and productPurchases datasets respectively. Note that when the workflow completes, the user is only interested in the final datasets created by the Workflow run: customerPurchases and productPurchases. The purchaseRecords dataset created by the MapReduce program PurchaseEventParser is temporary and no longer required once the workflow run has completed.

    Case B)
    A MapReduce program in CDAP can write to multiple datasets. Consider that the above Workflow is modified so that PurchaseEventParser also writes to an errorRecords dataset along with purchaseRecords. The errorRecords dataset contains the raw events from the purchaseEvents stream for which parsing failed. In this case, errorRecords may not be temporary, since the user may want to analyze it to find out which sources frequently emit bad data.


  2. JIRA: CDAP-4075: Error handling for Workflows.
    Case A) When the Workflow fails for some reason, the user may want to notify the appropriate parties via email, possibly with the cause of the failure and the node at which the Workflow failed.
    Case B) When the Workflow fails at a particular node, the user may want to clean up the datasets and files created by the previous nodes in the Workflow.
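
For reference, below is a minimal sketch of the workflow structure described in Case A of use case 1, assuming CDAP's AbstractWorkflow API with fork(); the application and class names are illustrative, not taken from the guide verbatim.

    // Illustrative sketch of the workflow referenced above (names are assumptions based on the
    // CDAP Workflow guide): parse raw events, then count purchases per customer and per product in parallel.
    public class PurchaseWorkflow extends AbstractWorkflow {
       @Override
       public void configure() {
          setName("PurchaseWorkflow");
          setDescription("Parses purchase events and counts purchases per customer and per product");
          // Writes purchaseRecords (and, in Case B, errorRecords)
          addMapReduce("PurchaseEventParser");
          // Both counters read purchaseRecords and write customerPurchases / productPurchases
          fork()
            .addMapReduce("PurchaseCounterByCustomer")
          .also()
            .addMapReduce("PurchaseCounterByProduct")
          .join();
       }
    }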

User Stories

  1. As a developer of a Workflow, I want the ability to specify that the output of a particular program (MapReduce/Spark) in the Workflow is temporary, so that the Workflow system can clean it up. (CDAP-3969)
  2. As a developer of a MapReduce or Spark program, I should be able to run it on its own as well as inside a Workflow with its output specified as temporary. When the MapReduce program is run on its own, the output should not be cleaned up. (CDAP-3969)
  3. A MapReduce program can write to multiple datasets. As a developer of a Workflow, I want the ability to selectively specify some of the output datasets of a MapReduce program as transient. (CDAP-3969)
  4. As a developer of a Workflow, I want the ability to specify functionality that will be executed when the Workflow finishes successfully. (CDAP-4075)
  5. As a developer of a Workflow, I want the ability to specify functionality that will be executed when the Workflow fails at any point in time. I want access to the cause of the failure and the node at which the Workflow failed. (CDAP-4075) One possible shape of such hooks is sketched after this list.
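
To make stories 4 and 5 concrete, the sketch below shows one possible shape for such hooks. It is purely illustrative; the interface and method names are assumptions, not an existing or proposed CDAP API.

    // Purely illustrative sketch of the kind of hooks user stories 4 and 5 ask for;
    // interface and method names are hypothetical.
    public interface WorkflowCompletionHandler {
       // Invoked when the Workflow finishes successfully (story 4)
       void onSuccess(WorkflowContext context);

       // Invoked when the Workflow fails; exposes the failure cause and the failed node (story 5)
       void onFailure(WorkflowContext context, String failedNodeName, Throwable failureCause);
    }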

Approach for CDAP-3969

Consider again the Workflow mentioned in the use case above.

  1. Application code will not be changed.

    public class WorkflowApplication extends AbstractApplication {
       ...
       createDataset("purchaseRecords", KeyValueTable.class);
       createDataset("errorRecords", KeyValueTable.class);
       createDataset("customerPurchases", KeyValueTable.class);
       createDataset("productPurchases", KeyValueTable.class);
       ...
    }
    
    
  2. When the application is deployed, the following datasets are created: purchaseRecords, errorRecords, customerPurchases, and productPurchases.

  3. If the MapReduce program PurchaseEventParser is run by itself, outside the Workflow, it writes to the non-transient datasets purchaseRecords and errorRecords.

  4. In order to specify an output as transient inside a Workflow, appropriate configuration parameters can be passed to the Workflow when it is started:

    // The PurchaseEventParser MapReduce program writes to two output datasets - purchaseRecords and errorRecords.
    // To specify that purchaseRecords is a transient dataset, use the following scoped parameter:
     
    dataset.purchaseRecords.transient=true
  5. What if two different MapReduce programs in the Workflow write to the same dataset, but the output of only one of them is transient? In this case the dataset can be further scoped by the MapReduce program name:

    // Scope the parameter further by the name of the MapReduce program
    mapreduce.PurchaseEventParser.dataset.purchaseRecords.transient=true
  6. The WorkflowDriver will thus get multiple <MapReduceProgram, TransientDataset> pairs. The WorkflowDriver can then pass this information to the MapReduceProgramRunner.
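    A minimal sketch of how the WorkflowDriver might derive this mapping from the scoped runtime arguments is shown below; the helper names, and the choice to apply an unscoped dataset.<name>.transient argument to every program, are illustrative assumptions.

    // Hypothetical sketch of a WorkflowDriver helper that turns scoped runtime arguments such as
    //   dataset.purchaseRecords.transient=true
    //   mapreduce.PurchaseEventParser.dataset.purchaseRecords.transient=true
    // into a map of <MapReduce program name -> transient dataset names>. Names are illustrative only.
    private Map<String, Set<String>> getTransientDatasets(Map<String, String> runtimeArgs,
                                                          Set<String> mapReducePrograms) {
       Map<String, Set<String>> transientDatasets = new HashMap<String, Set<String>>();
       for (Map.Entry<String, String> entry : runtimeArgs.entrySet()) {
          String key = entry.getKey();
          if (!key.endsWith(".transient") || !Boolean.parseBoolean(entry.getValue())) {
             continue;
          }
          String scoped = key.substring(0, key.length() - ".transient".length());
          if (scoped.startsWith("dataset.")) {
             // Unscoped form: applies to every MapReduce program in the Workflow
             String dataset = scoped.substring("dataset.".length());
             for (String program : mapReducePrograms) {
                addTransientDataset(transientDatasets, program, dataset);
             }
          } else if (scoped.startsWith("mapreduce.")) {
             // Scoped form: mapreduce.<program>.dataset.<name>.transient=true
             String rest = scoped.substring("mapreduce.".length());
             int idx = rest.indexOf(".dataset.");
             if (idx > 0) {
                addTransientDataset(transientDatasets, rest.substring(0, idx),
                                    rest.substring(idx + ".dataset.".length()));
             }
          }
       }
       return transientDatasets;
    }

    private void addTransientDataset(Map<String, Set<String>> transientDatasets, String program, String dataset) {
       Set<String> datasets = transientDatasets.get(program);
       if (datasets == null) {
          datasets = new HashSet<String>();
          transientDatasets.put(program, datasets);
       }
       datasets.add(dataset);
    }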

  7. The MapReduceProgramRunner will create the transient dataset instances with unique names. For example, if the dataset name is purchaseRecords, the corresponding transient dataset could be created with the name purchaseRecords.<mapreduce_runid>.
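    A short sketch of this naming step is shown below; mapReduceRunId and createTransientDatasetInstance(...) are illustrative placeholders, since the actual instance creation would go through CDAP's dataset framework using the type and properties of the original dataset.

    // Hypothetical sketch: build a unique name for the transient copy of an output dataset
    String transientDatasetName = datasetName + "." + mapReduceRunId.getId();
    // Create the transient instance with the same type and properties as the original dataset
    // (createTransientDatasetInstance is a placeholder for the actual dataset framework call)
    createTransientDatasetInstance(datasetName, transientDatasetName);
    transientDatasetNameMapping.put(datasetName, transientDatasetName);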

  8. While creating the BasicMapReduceContext, the MapReduceProgramRunner can pass this dataset name mapping to it (in this example, <purchaseRecords, purchaseRecords.xyzzz>). All calls to MapReduceContext.addOutput will be intercepted, so that if they are for transient datasets, the name of the dataset is replaced by the transient dataset name.

    class BasicMapReduceContext {
       // Map from a dataset's actual name to its internal unique transient name
       private final Map<String, String> transientDatasetNameMapping;

       public BasicMapReduceContext(Program program,
                                    RunId runId,
                                    Arguments runtimeArguments,
                                    MapReduceSpecification spec,
                                    ...
                                    /* A MapReduce program can have multiple transient datasets as output */
                                    Map<String/*dataset name*/, String/*transient dataset name*/> transientDatasetNameMapping) {
          ...
          // Store the transient dataset name mapping
          this.transientDatasetNameMapping = transientDatasetNameMapping;
          ...
       }

       @Override
       public void addOutput(String datasetName, Map<String, String> arguments) {
          // Substitute the transient name if this dataset was marked transient; otherwise keep the original name
          String outputDatasetName = transientDatasetNameMapping.containsKey(datasetName)
                                       ? transientDatasetNameMapping.get(datasetName) : datasetName;
          // It is possible that the arguments here are scoped by the name of the dataset,
          // for example dataset.purchaseRecords.cache.seconds=30.
          // Such scoped parameters need to be re-added under the name of the new transient dataset,
          // for example dataset.purchaseRecords.<mapreduce_runid>.cache.seconds=30.
          addOutput(outputDatasetName,
                    new DatasetOutputFormatProvider(outputDatasetName, arguments,
                                                    getDataset(outputDatasetName, arguments),
                                                    MapReduceBatchWritableOutputFormat.class));
       }

       @Override
       public void addOutput(String outputName, OutputFormatProvider outputFormatProvider) {
          // Same substitution for outputs registered through an OutputFormatProvider
          String name = transientDatasetNameMapping.containsKey(outputName)
                          ? transientDatasetNameMapping.get(outputName) : outputName;
          this.outputFormatProviders.put(name, outputFormatProvider);
       }
    }
  9. The BasicMapReduceContext is also created in the MapperWrapper and ReducerWrapper, so we will need to pass the mapping there using the MapReduce job configuration.
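    A minimal sketch of passing the mapping through the job configuration, assuming a Gson-serialized map stored under a hypothetical configuration key:

    // Hypothetical sketch: ship the mapping to the Mapper/Reducer side via the Hadoop job configuration.
    // The configuration key name is an assumption for illustration.
    private static final String TRANSIENT_DATASET_MAPPING = "cdap.workflow.transient.dataset.mapping";
    private static final Gson GSON = new Gson();

    // In MapReduceProgramRunner, before the job is submitted
    void setTransientDatasetMapping(Configuration hConf, Map<String, String> mapping) {
       hConf.set(TRANSIENT_DATASET_MAPPING, GSON.toJson(mapping));
    }

    // In MapperWrapper / ReducerWrapper, when re-creating the BasicMapReduceContext
    Map<String, String> getTransientDatasetMapping(Configuration hConf) {
       String serialized = hConf.get(TRANSIENT_DATASET_MAPPING);
       if (serialized == null) {
          return Collections.emptyMap();
       }
       return GSON.fromJson(serialized, new TypeToken<Map<String, String>>() { }.getType());
    }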

  10. Once the Workflow run completes, the corresponding transient datasets can be deleted by the WorkflowDriver.
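    A sketch of this cleanup step is shown below; deleteDatasetInstance(...) and LOG are placeholders for the actual dataset-admin call and logger used by the WorkflowDriver.

    // Hypothetical sketch of the cleanup performed by WorkflowDriver once the Workflow run completes
    void cleanupTransientDatasets(Collection<String> transientDatasetNames) {
       for (String transientName : transientDatasetNames) {
          try {
             // e.g. purchaseRecords.<mapreduce_runid>
             deleteDatasetInstance(transientName);
          } catch (Exception e) {
             // A cleanup failure should not fail the Workflow run itself; log and continue
             LOG.warn("Failed to delete transient dataset {}", transientName, e);
          }
       }
    }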
