...

  1. In the Workflow above, the datasets errorRecords, customerPurchases, and productPurchases are non-transient datasets. They can be defined in the application as follows:

    ```java
    public class WorkflowApplication extends AbstractApplication {
       @Override
       public void configure() {
          ...
          // define the non-transient (application-scoped) datasets
          createDataset("errorRecords", KeyValueTable.class);
          createDataset("customerPurchases", KeyValueTable.class);
          createDataset("productPurchases", KeyValueTable.class);
          ...
       }
    }
    ```

  2. Since purchaseRecords is local to the Workflow, it can be defined in the Workflow's configure() method as follows:

    ```java
    public class PurchaseWorkflow extends AbstractWorkflow {
       @Override
       protected void configure() {
          setName("PurchaseWorkflow");
          ...
          // create the Workflow-local datasets here
          createLocalDataset("purchaseRecords", KeyValueTable.class);
          ...
          addMapReduce("PurchaseEventParser");
          ...
       }
    }
    ```

  3. When the application is deployed, the following datasets are created: errorRecords, customerPurchases, and productPurchases. The Workflow-local dataset purchaseRecords is not created at deployment time.

  4. If the MapReduce program PurchaseEventParser is run by itself, outside the Workflow, it would fail, since purchaseRecords is not defined in the application scope. It is the user's responsibility to make sure that the dataset exists in the correct scope.

  5. The user can choose not to delete a local dataset after the Workflow run completes by specifying a runtime argument:

    ```
    // To keep the Workflow-local purchaseRecords dataset after the Workflow run completes, specify:
    dataset.purchaseRecords.keep.local=true

    // To keep all local datasets after the Workflow run completes, specify:
    dataset.*.keep.local=true
    ```

  6. Datasets configured as local to the Workflow can be stored in the Workflow specification. When a Workflow run starts, these datasets can be instantiated. Each local dataset instance would be named datasetName.<workflow_run_id>.

  7. The <datasetName, localDatasetName> mapping would be passed to the dataset framework instance so that all calls to a local dataset are routed to the appropriate per-run instance. One possible approach:

    ```java
    // Forwards calls to the delegate, rewriting Workflow-local dataset names to their per-run instances
    class ForwardingDatasetFramework implements DatasetFramework {
       // <datasetName, localDatasetName>, e.g. purchaseRecords -> purchaseRecords.<workflow_run_id>
       private final Map<String, String> datasetNameMapping;
       private final DatasetFramework delegate;

       public ForwardingDatasetFramework(DatasetFramework datasetFramework, Map<String, String> datasetNameMapping) {
          this.delegate = datasetFramework;
          this.datasetNameMapping = datasetNameMapping;
       }

       // All required calls would be intercepted here, for example:
       @Override
       public <T extends Dataset> T getDataset(
         Id.DatasetInstance datasetInstanceId, Map<String, String> arguments,
         @Nullable ClassLoader classLoader) throws DatasetManagementException, IOException {

         String localName = datasetNameMapping.get(datasetInstanceId.getId());
         if (localName != null) {
            // route the call to the Workflow-local instance in the same namespace
            datasetInstanceId = Id.DatasetInstance.from(datasetInstanceId.getNamespaceId(), localName);
         }
         return delegate.getDataset(datasetInstanceId, arguments, classLoader);
       }

       // ... the remaining DatasetFramework methods would delegate in the same way
    }

    // ForwardingDatasetFramework can then be created from the injected DatasetFramework instance in
    // WorkflowDriver (so that custom actions and predicates use it), MapReduceProgramRunner, and SparkProgramRunner.
    ```

  8. A change similar to the one in (7) would be required in MapperWrapper and ReducerWrapper, since a DatasetFramework instance is created there as well.

  9. Once the Workflow run completes, WorkflowDriver can delete the corresponding local datasets, unless the dataset.<name>.keep.local or dataset.*.keep.local runtime argument described in (5) is set.
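The name resolution described in (3), (4), (6), and (7) can be modelled in plain Java to show which instance a dataset call would reach. This is a hypothetical sketch, not CDAP code: `LocalDatasetResolver`, `forWorkflowRun`, and `standalone` are illustrative names, and the `IllegalStateException` stands in for whatever error the dataset framework would actually raise when an instance does not exist.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of per-run dataset name resolution (not part of CDAP).
final class LocalDatasetResolver {
  private final Set<String> appScopedDatasets;    // created at deployment time, per (3)
  private final Map<String, String> localMapping; // <datasetName, localDatasetName>, per (7)

  private LocalDatasetResolver(Set<String> appScopedDatasets, Map<String, String> localMapping) {
    this.appScopedDatasets = appScopedDatasets;
    this.localMapping = localMapping;
  }

  // Local instance name, following the datasetName.<workflow_run_id> scheme in (6)
  static String localName(String datasetName, String runId) {
    return datasetName + "." + runId;
  }

  // Program running inside a Workflow run: every dataset declared local in the
  // Workflow specification is mapped to its per-run instance.
  static LocalDatasetResolver forWorkflowRun(Set<String> appScoped, List<String> localDatasets, String runId) {
    Map<String, String> mapping = new HashMap<>();
    for (String name : localDatasets) {
      mapping.put(name, localName(name, runId));
    }
    return new LocalDatasetResolver(appScoped, mapping);
  }

  // Program started on its own: no local mapping is available, per (4).
  static LocalDatasetResolver standalone(Set<String> appScoped) {
    return new LocalDatasetResolver(appScoped, Map.of());
  }

  // Returns the dataset instance name that a call for datasetName would be routed to.
  String resolve(String datasetName) {
    String local = localMapping.get(datasetName);
    if (local != null) {
      return local;
    }
    if (appScopedDatasets.contains(datasetName)) {
      return datasetName;
    }
    throw new IllegalStateException("Dataset '" + datasetName + "' does not exist in application scope");
  }
}
```

Inside run `run-1`, `resolve("purchaseRecords")` yields `purchaseRecords.run-1`, while `errorRecords` passes through unchanged. The same call from a standalone PurchaseEventParser run fails because purchaseRecords was never created in application scope.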
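The end-of-run cleanup in (9), driven by the runtime arguments in (5), reduces to a small decision per local dataset. The following is a hypothetical sketch: the class and method names are illustrative. One further assumption, not stated in the design, is that a per-dataset `dataset.<name>.keep.local` value overrides the `dataset.*.keep.local` wildcard.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the keep/delete decision WorkflowDriver could make at the end of a run.
final class LocalDatasetCleanup {
  private LocalDatasetCleanup() {}

  // True if the runtime arguments ask to keep this local dataset after the run.
  // Assumption: the per-dataset key, when present, takes precedence over the wildcard key.
  static boolean keep(String datasetName, Map<String, String> runtimeArgs) {
    String specific = runtimeArgs.get("dataset." + datasetName + ".keep.local");
    if (specific != null) {
      return Boolean.parseBoolean(specific);
    }
    return Boolean.parseBoolean(runtimeArgs.getOrDefault("dataset.*.keep.local", "false"));
  }

  // Declared local dataset names whose per-run instances would be deleted when the run completes.
  static List<String> toDelete(List<String> localDatasets, Map<String, String> runtimeArgs) {
    List<String> result = new ArrayList<>();
    for (String name : localDatasets) {
      if (!keep(name, runtimeArgs)) {
        result.add(name);
      }
    }
    return result;
  }
}
```

With no runtime arguments, every local dataset is deleted. With `dataset.purchaseRecords.keep.local=true`, only purchaseRecords survives the run. Any dataset kept this way would later need the per-run DELETE API described under REST API Changes.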

REST API Changes

 

...

...

  1. Datasets local to the Workflow should not be returned by the list datasets API.

  2. A new API to list the local datasets of a Workflow run, if they still exist:

    ```
    GET <base-url>/namespaces/{namespace}/apps/{app-id}/workflows/{workflow-id}/runs/{run-id}/localdatasets
    ```

  3. We will need a way to delete the local datasets created for a particular Workflow run if the user set dataset.*.keep.local for that run:

    ```
    DELETE <base-url>/namespaces/{namespace}/apps/{app-id}/workflows/{workflow-id}/runs/{run-id}/localdatasets
    ```
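The filtering implied by (1) and (2) can be sketched as follows: the existing list API hides local datasets, and the new endpoint returns only one run's local datasets. This is a hypothetical model. In particular, `DatasetEntry` and its `ownerRunId` field are assumptions, because the design does not say how a local dataset records the run that created it.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical dataset metadata: ownerRunId is the Workflow run that created a local
// dataset, or null for a non-transient dataset. This field is an assumption.
final class DatasetEntry {
  final String name;
  final String ownerRunId;

  DatasetEntry(String name, String ownerRunId) {
    this.name = name;
    this.ownerRunId = ownerRunId;
  }
}

final class DatasetListing {
  private DatasetListing() {}

  // Existing list datasets API: Workflow-local datasets are excluded, per (1).
  static List<String> listDatasets(List<DatasetEntry> all) {
    return all.stream()
        .filter(e -> e.ownerRunId == null)
        .map(e -> e.name)
        .collect(Collectors.toList());
  }

  // Proposed .../runs/{run-id}/localdatasets: only the local datasets of that run, per (2).
  static List<String> listLocalDatasets(List<DatasetEntry> all, String runId) {
    return all.stream()
        .filter(e -> runId.equals(e.ownerRunId))
        .map(e -> e.name)
        .collect(Collectors.toList());
  }
}
```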
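A client would call the two proposed endpoints with the same path and different HTTP methods. The following sketch builds, but does not send, those requests with the JDK's `java.net.http` client (Java 11+). `LocalDatasetRequests` is a hypothetical helper, and the base URL in the test is only a placeholder for `<base-url>`. The path segments are assumed to be URL-safe and are not encoded.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds requests for the proposed local dataset endpoints.
final class LocalDatasetRequests {
  private LocalDatasetRequests() {}

  // <base-url>/namespaces/{namespace}/apps/{app-id}/workflows/{workflow-id}/runs/{run-id}/localdatasets
  // Assumes the ids contain no characters that need URL encoding.
  static URI localDatasetsUri(String baseUrl, String namespace, String appId, String workflowId, String runId) {
    return URI.create(baseUrl + "/namespaces/" + namespace + "/apps/" + appId
        + "/workflows/" + workflowId + "/runs/" + runId + "/localdatasets");
  }

  // GET: list the local datasets of one Workflow run.
  static HttpRequest list(String baseUrl, String namespace, String appId, String workflowId, String runId) {
    return HttpRequest.newBuilder(localDatasetsUri(baseUrl, namespace, appId, workflowId, runId)).GET().build();
  }

  // DELETE: remove local datasets that were kept via dataset.*.keep.local.
  static HttpRequest delete(String baseUrl, String namespace, String appId, String workflowId, String runId) {
    return HttpRequest.newBuilder(localDatasetsUri(baseUrl, namespace, appId, workflowId, runId)).DELETE().build();
  }
}
```

Sending either request is then a matter of `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. The response format of the list call is not specified in this design.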

UI Changes

  1. For every Workflow run, we will need to add a new tab that lists its local datasets.
  2. If these datasets are marked explorable, they should be explorable from the UI.