...

  1. In the above Workflow, the datasets errorRecords, customerPurchases, and productPurchases are non-transient datasets. They can be defined inside the application as -

    Code Block
    languagejava
    public class WorkflowApplication extends AbstractApplication {
       ...
       // define non-transient datasets 
       createDataset("errorRecords", KeyValueTable.class);
       createDataset("customerPurchases", KeyValueTable.class);
       createDataset("productPurchases", KeyValueTable.class);
       ...
    }
    
    
  2. Since purchaseRecords is a dataset local to the Workflow, it can be defined inside the Workflow configurer method as -

    Code Block
    languagejava
    public class PurchaseWorkflow extends AbstractWorkflow {
       @Override
       protected void configure() {
          setName("PurchaseWorkflow");
          ...
          // create the Workflow local datasets here - 
          createLocalDataset("purchaseRecords", KeyValueTable.class);
          ...
          addMapReduce("PurchaseEventParser");
          ...
       }
    }
  3. When the application is deployed, the following datasets are created - purchaseRecords, errorRecords, customerPurchases, and productPurchases.

  4. If the MapReduce program PurchaseEventParser is run by itself, outside the Workflow, it will write to the non-transient dataset errorRecords. Writing to purchaseRecords, however, would fail, since purchaseRecords is not defined in the application scope. It is the user's responsibility to make sure that the dataset exists in the correct scope.

  5. The user can choose not to delete the local dataset even after the Workflow run is complete by specifying a runtime argument.

    Code Block
     // To keep the Workflow local purchaseRecords dataset even after the Workflow run is completed,
     // the following runtime argument can be specified -
     dataset.purchaseRecords.keep.local=true

    Is it possible to have two different MapReduce programs in the Workflow writing to the same local dataset, where the output of only one should be kept? In this case the runtime argument can be further scoped by the name of the MapReduce program -

    Code Block
    // Scope the parameter further by the name of the MapReduce program
     mapreduce.PurchaseEventParser.dataset.purchaseRecords.keep.local=true
     
     // In order to keep all the local datasets even after the Workflow run is completed, the following runtime argument can be specified -
    dataset.*.keep.local=true
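    The steps above can be sketched as follows. This is a hypothetical illustration, not CDAP's actual code; the class and method names are assumptions, and only the `dataset.purchaseRecords.keep.local` and wildcard `dataset.*.keep.local` argument keys come from the design above.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch: decide whether a Workflow local dataset should be
    // kept after the run completes, based on the runtime arguments.
    public class KeepLocalArguments {

       // Returns true if the runtime arguments ask to keep the given local dataset.
       static boolean shouldKeep(Map<String, String> runtimeArgs, String datasetName) {
          // The wildcard argument applies to all local datasets
          if ("true".equals(runtimeArgs.get("dataset.*.keep.local"))) {
             return true;
          }
          // Otherwise look for the argument scoped to this one dataset
          return "true".equals(runtimeArgs.get("dataset." + datasetName + ".keep.local"));
       }

       public static void main(String[] args) {
          Map<String, String> runtimeArgs = new HashMap<>();
          runtimeArgs.put("dataset.purchaseRecords.keep.local", "true");
          System.out.println(shouldKeep(runtimeArgs, "purchaseRecords")); // true
          System.out.println(shouldKeep(runtimeArgs, "errorRecords"));    // false
       }
    }
    ```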
  6. The WorkflowDriver will thus get multiple <MapReduceProgram, TransientDataset> pairs. The WorkflowDriver can then pass this information to the MapReduceProgramRunner.

  7. MapReduceProgramRunner will create the transient dataset instances with unique names. For example, if the dataset name is purchaseRecords, its corresponding transient dataset could be created with the name purchaseRecords.<mapreduce_runid>.
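    A minimal sketch of this naming scheme, with class and method names assumed for illustration (only the purchaseRecords.<mapreduce_runid> convention comes from the design above):

    ```java
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch: build the <actual name, transient name> mapping for a
    // MapReduce program's transient datasets, suffixing the run id so that each
    // run writes to a unique dataset instance.
    public class TransientNameMapping {

       static Map<String, String> build(List<String> transientDatasets, String mapReduceRunId) {
          Map<String, String> mapping = new LinkedHashMap<>();
          for (String name : transientDatasets) {
             // e.g. purchaseRecords -> purchaseRecords.<mapreduce_runid>
             mapping.put(name, name + "." + mapReduceRunId);
          }
          return mapping;
       }

       public static void main(String[] args) {
          // Matches the example from the text: purchaseRecords.xyzzz
          System.out.println(build(List.of("purchaseRecords"), "xyzzz"));
       }
    }
    ```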

  8. MapReduceProgramRunner, while creating the BasicMapReduceContext, can pass this dataset name mapping (for example, <purchaseRecords, purchaseRecords.xyzzz>) to it. All calls to MapReduceContext.addOutput will be intercepted, so that if they are for transient datasets, the name of the dataset is replaced by the transient dataset name.

    Code Block
    languagejava
    class BasicMapReduceContext {
       // Map from dataset's actual name to the internal unique transient name
       private final Map<String, String> transientDatasetNameMapping;
    public BasicMapReduceContext(Program program,
                                 RunId runId,
                                 Arguments runtimeArguments,
                                 MapReduceSpecification spec,
                                 ...
                                 /* MapReduce program can have multiple transient datasets as output*/
                                 Map<String/*dataset name*/, String/*transient dataset name*/> transientDatasetNameMapping) {
          ...
    	  // Store the transient dataset name mapping
          this.transientDatasetNameMapping = transientDatasetNameMapping;
          ...
       }
     
       @Override
       public void addOutput(String datasetName, Map<String, String> arguments) {
         // Fall back to the actual name when the dataset is not transient
         String transientDatasetName = transientDatasetNameMapping.getOrDefault(datasetName, datasetName);
         // It is possible that the arguments here are scoped by the name of the dataset.
         // For example dataset.purchaseRecords.cache.seconds=30.
         // We will need to add the scoped parameters under the name of the new transient dataset
         // as dataset.purchaseRecords.<mapreduce_runid>.cache.seconds=30
         addOutput(transientDatasetName,
                   new DatasetOutputFormatProvider(transientDatasetName, arguments,
                                                   getDataset(transientDatasetName, arguments),
                                                   MapReduceBatchWritableOutputFormat.class));
       }
     
       @Override
       public void addOutput(String outputName, OutputFormatProvider outputFormatProvider) {
          // Replace the name only when the output is a transient dataset
          String transientDatasetName = transientDatasetNameMapping.getOrDefault(outputName, outputName);
          this.outputFormatProviders.put(transientDatasetName, outputFormatProvider);
       }
    }
  9. BasicMapReduceContext is created in MapperWrapper and ReducerWrapper as well, so we will need to pass the mapping there using the MapReduce job configuration.
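    Since job configuration properties are plain strings, the mapping has to be flattened into one before it can travel to MapperWrapper and ReducerWrapper. The sketch below is an assumption for illustration (the class name and the ','/'=' encoding are not from the design); the real implementation could equally use JSON.

    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical sketch: encode the <dataset name, transient name> mapping as a
    // single string, suitable for storing in a MapReduce job configuration
    // property and decoding it back inside MapperWrapper/ReducerWrapper.
    public class MappingCodec {

       static String encode(Map<String, String> mapping) {
          StringBuilder sb = new StringBuilder();
          for (Map.Entry<String, String> e : mapping.entrySet()) {
             if (sb.length() > 0) {
                sb.append(',');
             }
             sb.append(e.getKey()).append('=').append(e.getValue());
          }
          return sb.toString();
       }

       static Map<String, String> decode(String encoded) {
          Map<String, String> mapping = new LinkedHashMap<>();
          if (encoded.isEmpty()) {
             return mapping;
          }
          for (String pair : encoded.split(",")) {
             String[] kv = pair.split("=", 2);
             mapping.put(kv[0], kv[1]);
          }
          return mapping;
       }

       public static void main(String[] args) {
          Map<String, String> mapping = new LinkedHashMap<>();
          mapping.put("purchaseRecords", "purchaseRecords.xyzzz");
          // On the job-submission side this string would be stored in the job
          // configuration; the wrappers would read and decode it.
          String encoded = encode(mapping);
          System.out.println(decode(encoded).equals(mapping)); // true
       }
    }
    ```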

  10. Once the Workflow run completes, the corresponding transient datasets can be deleted by the WorkflowDriver.
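    The cleanup step could look roughly like this. DatasetAdmin here is an assumed stand-in for the real dataset framework interface, and the class and method names are hypothetical:

    ```java
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch: after the Workflow run completes, WorkflowDriver drops
    // every uniquely named transient dataset recorded in the per-program mappings.
    public class TransientDatasetCleanup {

       // Assumed stand-in for the dataset framework's admin interface
       interface DatasetAdmin {
          void drop(String datasetName);
       }

       static List<String> dropAll(Map<String, Map<String, String>> mappingsByProgram,
                                   DatasetAdmin admin) {
          List<String> dropped = new ArrayList<>();
          for (Map<String, String> mapping : mappingsByProgram.values()) {
             // The mapping values are the unique transient names, e.g. purchaseRecords.xyzzz
             for (String transientName : mapping.values()) {
                admin.drop(transientName);
                dropped.add(transientName);
             }
          }
          return dropped;
       }

       public static void main(String[] args) {
          Map<String, Map<String, String>> mappings = new LinkedHashMap<>();
          mappings.put("PurchaseEventParser",
                       Map.of("purchaseRecords", "purchaseRecords.xyzzz"));
          List<String> dropped = dropAll(mappings, name -> { });
          System.out.println(dropped); // [purchaseRecords.xyzzz]
       }
    }
    ```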

...