This PartitionConsumer would be a simple consumer (without persistence) that calls PartitionedFileSet's consumePartitions method with a PartitionConsumerState and keeps track of the returned PartitionConsumerState to pass in on the next call. With this approach, users of the PartitionConsumer (for instance, a periodically scheduled workflow or a thread in a scheduler) manage persistence of that state themselves.
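A minimal sketch of such an in-memory consumer follows. It is not the CDAP implementation: State, Result, FakePartitionedFileSet and SimplePartitionConsumer are hypothetical stand-ins, and the consumer state is simplified to an index into a list of partition names.

```java
import java.util.ArrayList;
import java.util.List;

final class SimplePartitionConsumerSketch {

  // Hypothetical stand-in for PartitionConsumerState: an index into the partition list.
  static final class State {
    static final State FROM_BEGINNING = new State(0);
    final int nextIndex;
    State(int nextIndex) { this.nextIndex = nextIndex; }
  }

  // Hypothetical stand-in for PartitionConsumerResult.
  static final class Result {
    final State state;
    final List<String> partitions;
    Result(State state, List<String> partitions) {
      this.state = state;
      this.partitions = partitions;
    }
  }

  // Hypothetical stand-in for PartitionedFileSet; partitions are kept in creation order.
  static final class FakePartitionedFileSet {
    private final List<String> partitions = new ArrayList<>();

    void addPartition(String name) { partitions.add(name); }

    // Returns every partition added since the given state, plus the state to use next time.
    Result consumePartitions(State from) {
      List<String> fresh = new ArrayList<>(partitions.subList(from.nextIndex, partitions.size()));
      return new Result(new State(partitions.size()), fresh);
    }
  }

  // The consumer itself: remembers the last returned state in memory only.
  static final class SimplePartitionConsumer {
    private final FakePartitionedFileSet dataset;
    private State state;

    SimplePartitionConsumer(FakePartitionedFileSet dataset, State initialState) {
      this.dataset = dataset;
      this.state = initialState;
    }

    List<String> consume() {
      Result result = dataset.consumePartitions(state);
      state = result.state;
      return result.partitions;
    }

    // Callers persist this value themselves if it must survive a restart.
    State getState() { return state; }
  }

  public static void main(String[] args) {
    FakePartitionedFileSet lines = new FakePartitionedFileSet();
    lines.addPartition("p1");
    lines.addPartition("p2");
    SimplePartitionConsumer consumer = new SimplePartitionConsumer(lines, State.FROM_BEGINNING);
    System.out.println(consumer.consume()); // [p1, p2]
    lines.addPartition("p3");
    System.out.println(consumer.consume()); // [p3]
  }
}
```

Because the state lives only in the consumer object, a caller that needs to resume after a restart would read getState() after each call and store it wherever suits it.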

This becomes complicated if runs of the same consumer overlap. For instance, if a workflow runs every minute, a second run can start before the first has committed. The first run may not yet have saved its partition state, so the second run would consume partitions based on the same PartitionConsumerState that the first run started with, and would therefore process the same partitions again.
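The overlap problem can be illustrated with a small simulation. This is a sketch under simplifying assumptions: Store and Run are hypothetical, the persisted state is reduced to a list index, and the two phases of a run mirror reading the state at the start and writing it back only on completion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OverlappingRunsSketch {

  // Hypothetical stand-in for the dataset plus the externally persisted consumer state.
  static final class Store {
    final List<String> partitions = new ArrayList<>(Arrays.asList("p1", "p2"));
    int persistedState = 0; // index of the first unconsumed partition
  }

  // One run, split into two phases: read state and pick partitions, then save state.
  static final class Run {
    private final Store store;
    private int finalState;

    Run(Store store) { this.store = store; }

    List<String> beforeSubmit() {
      int initialState = store.persistedState;
      finalState = store.partitions.size();
      return new ArrayList<>(store.partitions.subList(initialState, finalState));
    }

    void onFinish() { store.persistedState = finalState; }
  }

  // Returns the partitions processed by the first and the second run.
  static List<List<String>> simulate(boolean overlapping) {
    Store store = new Store();
    Run first = new Run(store);
    Run second = new Run(store);
    List<String> a;
    List<String> b;
    if (overlapping) {
      a = first.beforeSubmit();
      b = second.beforeSubmit(); // starts before the first run has saved its state
      first.onFinish();
      second.onFinish();
    } else {
      a = first.beforeSubmit();
      first.onFinish();
      b = second.beforeSubmit();
      second.onFinish();
    }
    return Arrays.asList(a, b);
  }

  public static void main(String[] args) {
    System.out.println(simulate(false)); // [[p1, p2], []]
    System.out.println(simulate(true));  // [[p1, p2], [p1, p2]]
  }
}
```

In the overlapping case both runs read state 0, so both are handed p1 and p2; in the sequential case the second run sees the saved state and gets nothing new.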


Example Usage:
The initial implementation will support a non-concurrent, periodically running MapReduce job:

@Override
public void beforeSubmit(MapReduceContext context) throws Exception {
  ...
  // get the current state of the PartitionConsumer (user is currently responsible for storing state between runs)
  byte[] state = keyValueTable.read(STATE_KEY);
  PartitionConsumerState initialPartitionConsumerState;
  if (state == null) {
    initialPartitionConsumerState = PartitionConsumerState.FROM_BEGINNING;
  } else {
    initialPartitionConsumerState = PartitionConsumerState.fromBytes(state);
  }

  PartitionedFileSet results = context.getDataset("lines");
  PartitionConsumerResult partitionConsumerResult = results.consumePartitions(initialPartitionConsumerState);
  // keep track of the final state to store in onFinish
  finalPartitionConsumerState = partitionConsumerResult.getPartitionConsumerState();
 
  // set the dataset as the input, with the specified partitions to process
  Map<String, String> inputArgs = Maps.newHashMap();
  PartitionedFileSetArguments.addPartitions(inputArgs, partitionConsumerResult.getPartitionIterator());
  PartitionedFileSet input = context.getDataset("lines", inputArgs);
  context.setInput("lines", input);
  ...
}

@Override
public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
  if (succeeded) {
    ...
    keyValueTable.write(STATE_KEY, finalPartitionConsumerState.toBytes());
  }
  super.onFinish(succeeded, context);
} 




Related JIRAs:

CDAP-2746 (Cask Community Issue Tracker)
CDAP-2747 (Cask Community Issue Tracker)