...
In the above Workflow, the datasets errorRecords, customerPurchases, and productPurchases are non-transient datasets. They can be defined inside the application as:
```java
public class WorkflowApplication extends AbstractApplication {
  ...
  // define non-transient datasets
  createDataset("errorRecords", KeyValueTable.class);
  createDataset("customerPurchases", KeyValueTable.class);
  createDataset("productPurchases", KeyValueTable.class);
  ...
}
```
Since purchaseRecords is a dataset local to the Workflow, it can be defined inside the Workflow's configure method as:
```java
public class PurchaseWorkflow extends AbstractWorkflow {
  @Override
  protected void configure() {
    setName("PurchaseWorkflow");
    ...
    // create the Workflow local datasets here
    createLocalDataset("purchaseRecords", KeyValueTable.class);
    ...
    addMapReduce("PurchaseEventParser");
    ...
  }
}
```
- When the application is deployed, the following datasets are created: errorRecords, customerPurchases, and productPurchases.
- If the MapReduce program PurchaseEventParser is run by itself, outside the Workflow, it would fail, since purchaseRecords is not defined in the application scope. It is the user's responsibility to make sure that the dataset exists in the correct scope (see the sketch below).
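For example (a hypothetical sketch, not part of the design above), a user who wants to run PurchaseEventParser standalone could additionally define purchaseRecords at the application scope:

```java
public class WorkflowApplication extends AbstractApplication {
  @Override
  public void configure() {
    // hypothetical: an application-scoped purchaseRecords dataset, so that
    // PurchaseEventParser can also be run standalone, outside the Workflow
    createDataset("purchaseRecords", KeyValueTable.class);
    addMapReduce(new PurchaseEventParser());
    addWorkflow(new PurchaseWorkflow());
  }
}
```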
The user can choose not to delete a local dataset after the Workflow run completes by specifying a runtime argument:
```
// To keep the Workflow local dataset purchaseRecords even after the Workflow run is completed,
// the following runtime argument can be specified:
dataset.purchaseRecords.keep.local=true

// In order to keep all the local datasets even after the Workflow run is completed,
// the following runtime argument can be specified:
dataset.*.keep.local=true
```
- Datasets configured as local to the Workflow can be stored in the Workflow specification. When a Workflow run starts, these datasets can be instantiated. The name of the local dataset would be datasetName.<workflow_run_id>.
A mapping of <datasetName, localDatasetName> would be passed to the dataset framework instance so that all calls to the dataset are routed to the appropriate dataset instance. A possible approach is:
```java
// Class to forward the calls to the delegate
class ForwardingDatasetFramework implements DatasetFramework {
  private final Map<String, String> datasetNameMapping;
  private final DatasetFramework delegate;

  public ForwardingDatasetFramework(DatasetFramework datasetFramework,
                                    Map<String, String> datasetNameMapping) {
    this.delegate = datasetFramework;
    this.datasetNameMapping = datasetNameMapping;
  }

  // All required calls would be intercepted here. For example:
  @Override
  public <T extends Dataset> T getDataset(Id.DatasetInstance datasetInstanceId,
                                          Map<String, String> arguments,
                                          @Nullable ClassLoader classLoader)
    throws DatasetManagementException, IOException {
    // replace the dataset name with its local (per-run) name if a mapping exists
    if (datasetNameMapping.containsKey(datasetInstanceId.getId())) {
      datasetInstanceId = Id.DatasetInstance.from(datasetInstanceId.getNamespaceId(),
                                                  datasetNameMapping.get(datasetInstanceId.getId()));
    }
    return delegate.getDataset(datasetInstanceId, arguments, classLoader);
  }
}

// ForwardingDatasetFramework can then be created in WorkflowDriver (in order to use it for
// custom actions and predicates), MapReduceProgramRunner, and SparkProgramRunner from the
// injected DatasetFramework instance.
```
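The following is a rough usage sketch of the above, assuming the WorkflowDriver has access to the list of local dataset names from the Workflow specification (the getLocalDatasets accessor and the variable names are assumptions for illustration):

```java
// Build the <datasetName, localDatasetName> mapping for this run and wrap the injected framework
Map<String, String> datasetNameMapping = new HashMap<>();
for (String datasetName : workflowSpec.getLocalDatasets()) {  // hypothetical accessor
  // local dataset instance is uniquely named per Workflow run, e.g. purchaseRecords.<workflow_run_id>
  datasetNameMapping.put(datasetName, datasetName + "." + workflowRunId.getId());
}
DatasetFramework wrappedFramework =
  new ForwardingDatasetFramework(datasetFramework, datasetNameMapping);
// wrappedFramework is then used by custom actions and predicates, and handed to the
// MapReduce and Spark program runners so that dataset lookups resolve to the local instances.
```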
MapReduceProgramRunner will create the transient dataset instances with unique names. For example, if the dataset name is purchaseRecords, its corresponding transient dataset could be created with the name purchaseRecords.<mapreduce_runid>. While creating the BasicMapReduceContext, MapReduceProgramRunner can pass this dataset name mapping (in this example <purchaseRecords, purchaseRecords.xyzzz>) to it. All calls to MapReduceContext.addOutput will be intercepted, so that if they are for transient datasets, the name of the dataset is replaced by the transient dataset name.
```java
class BasicMapReduceContext {
  // Map from the dataset's actual name to its internal unique transient name
  private final Map<String, String> transientDatasetNameMapping;

  public BasicMapReduceContext(Program program, RunId runId, Arguments runtimeArguments,
                               MapReduceSpecification spec, ...
                               /* MapReduce program can have multiple transient datasets as output */
                               Map<String /* dataset name */, String /* transient dataset name */>
                                 transientDatasetNameMapping) {
    ...
    // Store the transient dataset name mapping
    this.transientDatasetNameMapping = transientDatasetNameMapping;
    ...
  }

  @Override
  public void addOutput(String datasetName, Map<String, String> arguments) {
    // use the transient name if this dataset is local to the Workflow, otherwise keep the name as is
    String transientDatasetName = transientDatasetNameMapping.containsKey(datasetName)
      ? transientDatasetNameMapping.get(datasetName) : datasetName;
    // It is possible that the arguments here are scoped by the name of the dataset,
    // for example dataset.purchaseRecords.cache.seconds=30.
    // We will need to add the scoped parameters under the name of the new transient dataset,
    // as dataset.purchaseRecords.<mapreduce_runid>.cache.seconds=30.
    addOutput(transientDatasetName,
              new DatasetOutputFormatProvider(transientDatasetName, arguments,
                                              getDataset(transientDatasetName, arguments),
                                              MapReduceBatchWritableOutputFormat.class));
  }

  @Override
  public void addOutput(String outputName, OutputFormatProvider outputFormatProvider) {
    String transientDatasetName = transientDatasetNameMapping.containsKey(outputName)
      ? transientDatasetNameMapping.get(outputName) : outputName;
    this.outputFormatProviders.put(transientDatasetName, outputFormatProvider);
  }
}
```
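As a rough sketch (the variable names below are assumptions, not the actual MapReduceProgramRunner code), the runner could derive the transient names and pass the mapping to the context like this:

```java
// Derive a unique transient name per local dataset used by this MapReduce run
Map<String, String> transientDatasetNameMapping = new HashMap<>();
for (String datasetName : localDatasetNames) {  // hypothetical: local datasets used by this program
  // e.g. purchaseRecords -> purchaseRecords.<mapreduce_runid>
  transientDatasetNameMapping.put(datasetName, datasetName + "." + runId.getId());
}
// create the corresponding transient dataset instances, then pass the mapping to the
// BasicMapReduceContext constructor shown above
```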
- BasicMapReduceContext is created in MapperWrapper and ReducerWrapper as well, so we will need to pass the mapping there using the MapReduce job configuration.
A change similar to the one mentioned in (7) would also be required in MapperWrapper and ReducerWrapper, since we create the DatasetFramework instance there as well.
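One possible way to pass the mapping (the configuration key and the use of Gson are assumptions, not the existing implementation) is to serialize it into the Hadoop job configuration and read it back in the wrappers:

```java
// hypothetical configuration key
private static final String LOCAL_DATASET_MAPPING_KEY = "cdap.workflow.local.dataset.mapping";

// In MapReduceProgramRunner, before submitting the job:
job.getConfiguration().set(LOCAL_DATASET_MAPPING_KEY, new Gson().toJson(transientDatasetNameMapping));

// In MapperWrapper/ReducerWrapper, while re-creating the BasicMapReduceContext and the
// ForwardingDatasetFramework:
Map<String, String> transientDatasetNameMapping =
  new Gson().fromJson(taskContext.getConfiguration().get(LOCAL_DATASET_MAPPING_KEY),
                      new TypeToken<Map<String, String>>() { }.getType());
DatasetFramework wrappedFramework =
  new ForwardingDatasetFramework(datasetFramework, transientDatasetNameMapping);
```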
- Once the Workflow run completes, the corresponding transient datasets can be deleted by the WorkflowDriver, depending on the runtime argument dataset.*.keep.local as mentioned in (5). A sketch of this cleanup follows.
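A minimal sketch of that cleanup, assuming the WorkflowDriver keeps the <datasetName, localDatasetName> mapping and the runtime arguments as a Map (all variable names are assumptions):

```java
for (Map.Entry<String, String> entry : datasetNameMapping.entrySet()) {
  String datasetName = entry.getKey();
  String localDatasetName = entry.getValue();
  // keep the dataset if either the wildcard or the dataset-specific argument is set to true
  boolean keepLocal = Boolean.parseBoolean(runtimeArguments.get("dataset.*.keep.local"))
    || Boolean.parseBoolean(runtimeArguments.get("dataset." + datasetName + ".keep.local"));
  if (!keepLocal) {
    // delete the uniquely named dataset instance that was created for this Workflow run
    datasetFramework.deleteInstance(Id.DatasetInstance.from(namespaceId, localDatasetName));
  }
}
```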