...

The PartitionedFileSet dataset relieves applications of having to understand file name conventions. Instead, it associates a partition key with a path. Because different paths cannot have the same partition key, applications can address the file(s) at a path uniquely through its partition key, or more broadly through conditions over the partition keys: for example, the months of February through June of a particular year, or the month of November in any year. By inheriting the attributes of FileSets, such as format and schema, PartitionedFileSets are a powerful abstraction over data that is organized into files.
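
As a rough illustration of addressing partitions by key rather than by file name, here is a minimal, self-contained Java sketch. It does not use the CDAP API: the PartitionIndex class, its two-field (year, month) key, and the year=.../month=... path layout are all hypothetical, and plain Predicates stand in for conditions over partition keys. It selects the two example ranges from the paragraph above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model, NOT the CDAP API: each (year, month) key maps to exactly one path.
class PartitionIndex {

  // Hypothetical partition key with two integer fields.
  static final class Key {
    final int year;
    final int month;

    Key(int year, int month) {
      this.year = year;
      this.month = month;
    }

    // Hypothetical path layout derived deterministically from the key.
    @Override
    public String toString() {
      return "year=" + year + "/month=" + month;
    }
  }

  // Insertion-ordered map from path to key; adding a key whose path already exists is rejected.
  private final Map<String, Key> byPath = new LinkedHashMap<>();

  void add(Key key) {
    String path = key.toString();
    if (byPath.containsKey(path)) {
      throw new IllegalArgumentException("partition already exists: " + path);
    }
    byPath.put(path, key);
  }

  // Returns the paths of all partitions whose key satisfies the condition.
  List<String> select(Predicate<Key> condition) {
    List<String> paths = new ArrayList<>();
    for (Map.Entry<String, Key> e : byPath.entrySet()) {
      if (condition.test(e.getValue())) {
        paths.add(e.getKey());
      }
    }
    return paths;
  }

  public static void main(String[] args) {
    PartitionIndex index = new PartitionIndex();
    for (int year = 2014; year <= 2015; year++) {
      for (int month = 1; month <= 12; month++) {
        index.add(new Key(year, month));
      }
    }
    // February through June of a particular year
    System.out.println(index.select(k -> k.year == 2015 && k.month >= 2 && k.month <= 6));
    // November of any year
    System.out.println(index.select(k -> k.month == 11));
  }
}
```

Keying the lookup by a path derived from the key is what makes "address by key" and "address by path" interchangeable in this model.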

Creating a PartitionedFileSet

To create and use a PartitionedFileSet in an application, you create it as part of the application configuration, similar to FileSets. However, the partitioning has to be given as an additional property:

...
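
The partitioning declares which fields a partition key has and the type of each field. The following sketch is a hypothetical, self-contained model and not CDAP's own partitioning class: it keeps the fields in declaration order, rejects keys that do not match, and derives a relative path from a valid key. The field names league and season are borrowed from the SportResults runtime arguments shown later on this page; the FieldType enum and the path layout are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical model of a partitioning, NOT the CDAP API.
class SimplePartitioning {

  enum FieldType { STRING, INT }

  // Declaration order matters: it determines the order of the path segments.
  private final Map<String, FieldType> fields = new LinkedHashMap<>();

  SimplePartitioning addField(String name, FieldType type) {
    fields.put(name, type);
    return this;
  }

  // Checks that the key has exactly the declared fields, each with a value of the declared type.
  void validate(Map<String, Object> key) {
    if (!key.keySet().equals(fields.keySet())) {
      throw new IllegalArgumentException("expected fields " + fields.keySet() + " but got " + key.keySet());
    }
    for (Map.Entry<String, FieldType> f : fields.entrySet()) {
      Object value = key.get(f.getKey());
      boolean ok = f.getValue() == FieldType.STRING ? value instanceof String : value instanceof Integer;
      if (!ok) {
        throw new IllegalArgumentException("field " + f.getKey() + " must be " + f.getValue());
      }
    }
  }

  // Derives a relative path such as "nfl/1985" from a valid key (assumed layout).
  String toPath(Map<String, Object> key) {
    validate(key);
    StringJoiner path = new StringJoiner("/");
    for (String name : fields.keySet()) {
      path.add(String.valueOf(key.get(name)));
    }
    return path.toString();
  }

  public static void main(String[] args) {
    SimplePartitioning partitioning = new SimplePartitioning()
        .addField("league", FieldType.STRING)
        .addField("season", FieldType.INT);
    Map<String, Object> key = new LinkedHashMap<>();
    key.put("league", "nfl");
    key.put("season", 1985);
    System.out.println(partitioning.toPath(key)); // nfl/1985
  }
}
```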

In order to make the PartitionedFileSet explorable, additional properties are needed, as described in “Exploring PartitionedFileSets”.

Reading and Writing PartitionedFileSets

You can interact with the files in a PartitionedFileSet directly through the Location abstraction of the file system. This is similar to a FileSet, but instead of a relative path, you specify a partition key to obtain a Partition; you can then get a Location from that Partition.

...

Code Block
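PartitionKey key = ...
PartitionOutput output = dataset.getPartitionOutput(key);
try {
  Location location = output.getLocation().append("part");
  // try-with-resources closes the stream even if writing fails
  try (OutputStream outputStream = location.getOutputStream()) {
    ...
  }
} catch (IOException e) {
  ...
}
// register the partition in the partition table; this requires a transaction
output.addPartition();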
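
The ordering in the example above matters: the files are written first, and the partition appears in partition listings only once addPartition() records it in the partition table. The sketch below models that two-step flow with the local file system (java.nio.file) and an in-memory map standing in for the partition metadata table; the TwoStepWriter class and its methods are hypothetical and not part of CDAP.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of "write the files, then add the partition", NOT the CDAP API.
class TwoStepWriter {

  private final Path baseDir;
  // Stands in for the partition metadata table: partition key -> directory holding its files.
  private final Map<String, Path> partitionTable = new TreeMap<>();

  TwoStepWriter(Path baseDir) {
    this.baseDir = baseDir;
  }

  // Step 1 (like getPartitionOutput): create a directory for the key; nothing is registered yet.
  Path getOutputLocation(String key) throws IOException {
    return Files.createDirectories(baseDir.resolve(key));
  }

  // Step 2 (like addPartition): record the partition so that it appears in listings.
  void addPartition(String key) {
    Path dir = baseDir.resolve(key);
    if (!Files.isDirectory(dir)) {
      throw new IllegalStateException("no output location for " + key);
    }
    if (partitionTable.putIfAbsent(key, dir) != null) {
      throw new IllegalStateException("partition already exists: " + key);
    }
  }

  Set<String> listPartitions() {
    return new TreeSet<>(partitionTable.keySet());
  }

  public static void main(String[] args) throws IOException {
    TwoStepWriter writer = new TwoStepWriter(Files.createTempDirectory("pfs"));
    String key = "nfl/1985";
    Path file = writer.getOutputLocation(key).resolve("part");
    // try-with-resources closes the stream even if the write fails
    try (OutputStream out = Files.newOutputStream(file)) {
      out.write("example record\n".getBytes(StandardCharsets.UTF_8));
    }
    System.out.println("before addPartition: " + writer.listPartitions()); // []
    writer.addPartition(key);
    System.out.println("after addPartition: " + writer.listPartitions()); // [nfl/1985]
  }
}
```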

Using PartitionedFileSets in MapReduce

A partitioned file set can be accessed in MapReduce in a similar fashion to a FileSet. The difference is that instead of input and output paths, you specify a partition filter for the input and a partition key for the output. For example, the MapReduce program of the SportResults example reads as input all partitions for the league given in its runtime arguments, and writes as output a partition with that league as the only key:

...

Code Block
{
  "dataset.results.input.partition.filter.league.value": "nfl",
  "dataset.results.input.partition.filter.season.lower": "1980",
  "dataset.results.input.partition.filter.season.upper": "1990",
  "dataset.totals.output.partition.key.league": "nfl"
}
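
To show how the runtime arguments above decompose, here is a hypothetical, self-contained sketch; it is not CDAP's own argument parser. It reads the dataset.<name>.input.partition.filter.<field>.value, .lower, and .upper keys for one dataset, builds one condition per field, and tests partition keys against them. It assumes, for illustration only, that a range is half-open (lower bound inclusive, upper bound exclusive) and that values compare numerically when both sides parse as integers; check the CDAP PartitionFilter documentation for the actual bound semantics.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, NOT CDAP's parser: builds per-field conditions from runtime arguments.
class FilterFromArgs {

  // One condition per field: either an exact value, or a range whose bounds may each be absent.
  static final class Condition {
    String value;
    String lower;
    String upper;

    boolean matches(String actual) {
      if (value != null) {
        return compare(actual, value) == 0;
      }
      // Assumed half-open range: lower inclusive, upper exclusive.
      return (lower == null || compare(actual, lower) >= 0)
          && (upper == null || compare(actual, upper) < 0);
    }
  }

  // Compares numerically when both sides are integers, otherwise lexicographically.
  static int compare(String a, String b) {
    try {
      return Long.compare(Long.parseLong(a), Long.parseLong(b));
    } catch (NumberFormatException e) {
      return a.compareTo(b);
    }
  }

  static Map<String, Condition> parse(Map<String, String> args, String dataset) {
    String prefix = "dataset." + dataset + ".input.partition.filter.";
    Map<String, Condition> conditions = new LinkedHashMap<>();
    for (Map.Entry<String, String> arg : args.entrySet()) {
      if (!arg.getKey().startsWith(prefix)) {
        continue; // e.g. output keys or other datasets
      }
      String rest = arg.getKey().substring(prefix.length()); // e.g. "season.lower"
      int dot = rest.lastIndexOf('.');
      if (dot < 0) {
        throw new IllegalArgumentException("malformed filter argument: " + arg.getKey());
      }
      Condition c = conditions.computeIfAbsent(rest.substring(0, dot), f -> new Condition());
      switch (rest.substring(dot + 1)) {
        case "value": c.value = arg.getValue(); break;
        case "lower": c.lower = arg.getValue(); break;
        case "upper": c.upper = arg.getValue(); break;
        default: throw new IllegalArgumentException("unknown condition: " + arg.getKey());
      }
    }
    return conditions;
  }

  // A partition matches if every conditioned field is present and satisfies its condition.
  static boolean matches(Map<String, Condition> conditions, Map<String, String> partitionKey) {
    for (Map.Entry<String, Condition> c : conditions.entrySet()) {
      String actual = partitionKey.get(c.getKey());
      if (actual == null || !c.getValue().matches(actual)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, String> runtimeArgs = new HashMap<>();
    runtimeArgs.put("dataset.results.input.partition.filter.league.value", "nfl");
    runtimeArgs.put("dataset.results.input.partition.filter.season.lower", "1980");
    runtimeArgs.put("dataset.results.input.partition.filter.season.upper", "1990");
    runtimeArgs.put("dataset.totals.output.partition.key.league", "nfl");

    Map<String, Condition> filter = parse(runtimeArgs, "results");
    Map<String, String> key = new HashMap<>();
    key.put("league", "nfl");
    key.put("season", "1985");
    System.out.println(matches(filter, key)); // true
  }
}
```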

Dynamic Partitioning of MapReduce Output

A MapReduce job can write to multiple partitions of a PartitionedFileSet using the DynamicPartitioner class. To do so, define a class that extends DynamicPartitioner. The core method to override is getPartitionKey; it maps a record's key and value to a PartitionKey, which defines the Partition the record should be written to:

...

Likewise, CREATE_OR_OVERWRITE has the effect of overwriting the contents of any previously existing partition.
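
The role of getPartitionKey can be illustrated without the CDAP runtime. In this hypothetical sketch, a KeyExtractor interface plays the part of a dynamic partitioner: it maps each record's key and value to a partition key (a plain String here, where CDAP uses a PartitionKey), and the driver groups records by that key, as if writing each group to its own partition. The record layout (a CSV line beginning with league and season) is an assumption made for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a dynamic partitioner, NOT the CDAP DynamicPartitioner class.
interface KeyExtractor<K, V> {
  String getPartitionKey(K key, V value);
}

class DynamicRouting {

  // Assumed record layout: the value is "league,season,...", and the partition key is "league/season".
  static final KeyExtractor<Long, String> BY_LEAGUE_AND_SEASON = (offset, line) -> {
    String[] parts = line.split(",");
    return parts[0] + "/" + parts[1];
  };

  // Groups records by the partition key the extractor assigns; each group would become one partition.
  static <K, V> Map<String, List<V>> route(List<Map.Entry<K, V>> records, KeyExtractor<K, V> extractor) {
    Map<String, List<V>> partitions = new TreeMap<>();
    for (Map.Entry<K, V> record : records) {
      String partitionKey = extractor.getPartitionKey(record.getKey(), record.getValue());
      partitions.computeIfAbsent(partitionKey, k -> new ArrayList<>()).add(record.getValue());
    }
    return partitions;
  }

  public static void main(String[] args) {
    List<Map.Entry<Long, String>> records = new ArrayList<>();
    records.add(new SimpleEntry<>(0L, "nfl,1985,team1,team2"));
    records.add(new SimpleEntry<>(1L, "nfl,1986,team3,team4"));
    records.add(new SimpleEntry<>(2L, "nfl,1985,team5,team6"));
    route(records, BY_LEAGUE_AND_SEASON)
        .forEach((partition, rows) -> System.out.println(partition + " -> " + rows.size() + " record(s)"));
  }
}
```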

Incrementally Processing PartitionedFileSets

Processing using MapReduce

One way to process a partitioned file set is with a repeatedly running MapReduce program that, in each run, reads all partitions that have been added since its previous run. This requires the MapReduce program to persist, between runs, which partitions it has already consumed. An easy way to do this is to use PartitionBatchInput, an experimental feature introduced in CDAP 3.3.0. Your MapReduce program is responsible for providing an implementation of DatasetStatePersistor to persist and then read back its state. In this example, the state is persisted to a row in a KeyValueTable using the convenience class KVTableStatePersistor; however, other types of datasets can also be used. In the initialize method of the MapReduce program, specify the partitioned file set to be used as input as well as the DatasetStatePersistor to be used:

...

Code Block
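@Override
public void destroy() {
  // the run counts as successful only if the program completed normally
  boolean succeeded = getContext().getState().getStatus() == ProgramStatus.COMPLETED;
  // commit the consumed partitions on success, so that the next run does not read them again
  partitionCommitter.onFinish(succeeded);
}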
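
The commit-on-success pattern in destroy() can be modeled in plain Java. In this hypothetical sketch, which is not the PartitionBatchInput implementation, each partition's position in a list serves as an assumed creation index, a StateStore interface stands in for a DatasetStatePersistor, and an in-memory map stands in for the row in a KeyValueTable. A run reads the partitions added since the last committed index and advances the persisted state only if the run succeeded.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of incremental batch consumption, NOT the CDAP implementation.
class IncrementalBatch {

  // Stands in for a state persistor: reads and writes the consumer's state.
  interface StateStore {
    Long readState();
    void persistState(long state);
  }

  // Stands in for a single row in a key-value table.
  static final class InMemoryStore implements StateStore {
    private final Map<String, Long> table = new HashMap<>();
    private final String row;

    InMemoryStore(String row) {
      this.row = row;
    }

    public Long readState() {
      return table.get(row);
    }

    public void persistState(long state) {
      table.put(row, state);
    }
  }

  private final List<String> partitions; // list position = assumed creation order
  private final StateStore store;
  private int pendingEnd = -1; // exclusive end index of the current run's input

  IncrementalBatch(List<String> partitions, StateStore store) {
    this.partitions = partitions;
    this.store = store;
  }

  // Like initialize(): select every partition added since the last committed run.
  List<String> begin() {
    Long committed = store.readState();
    int start = committed == null ? 0 : committed.intValue();
    pendingEnd = partitions.size();
    return new ArrayList<>(partitions.subList(start, pendingEnd));
  }

  // Like destroy(): advance the persisted state only if the run succeeded.
  void onFinish(boolean succeeded) {
    if (succeeded) {
      store.persistState(pendingEnd);
    }
    pendingEnd = -1;
  }

  public static void main(String[] args) {
    List<String> partitions = new ArrayList<>();
    IncrementalBatch job = new IncrementalBatch(partitions, new InMemoryStore("state.raw"));
    partitions.add("p1");
    partitions.add("p2");
    System.out.println(job.begin()); // [p1, p2]
    job.onFinish(false);             // failed run: state unchanged
    System.out.println(job.begin()); // [p1, p2] again
    job.onFinish(true);
    partitions.add("p3");
    System.out.println(job.begin()); // [p3]
    job.onFinish(true);
  }
}
```

In this model a failed run leaves the state untouched, so its partitions are simply read again by the next run; the cost of that safety is repeated work after a failure.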

Processing using Other Programs

Partitions of a partitioned file set can also be incrementally processed from other program types using the generic PartitionConsumer APIs. ConcurrentPartitionConsumer is the implementation of these APIs that can be used from multiple instances of a program. To use it, provide the instance of the partitioned file set you want to consume from, along with a StatePersistor, which is responsible for persisting the consumer's state:

...

Code Block
// return only as many partitions as fit within 500 MB of data
partitions = consumer.consumePartitions(new SizeLimitingAcceptor(500));
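
The acceptor passed to consumePartitions decides, partition by partition, whether to take it. The sketch below is a hypothetical, self-contained model of a size-limiting acceptor; it is not the SizeLimitingAcceptor used above and does not reproduce its interface, and the Decision enum, the per-partition sizes, and the megabyte units are assumptions. It accepts partitions in order until the next one would push the running total over the limit, then stops.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a size-limiting acceptor, NOT a CDAP class.
class SizeLimitSketch {

  enum Decision { ACCEPT, STOP }

  static final class SizeLimiter {
    private final long limitMb;
    private long acceptedMb = 0;
    private int acceptedCount = 0;

    SizeLimiter(long limitMb) {
      this.limitMb = limitMb;
    }

    // Accept while the running total stays within the limit. The first partition is always
    // accepted, so that a single oversized partition cannot block consumption forever.
    Decision accept(long partitionSizeMb) {
      if (acceptedCount > 0 && acceptedMb + partitionSizeMb > limitMb) {
        return Decision.STOP;
      }
      acceptedMb += partitionSizeMb;
      acceptedCount++;
      return Decision.ACCEPT;
    }
  }

  // Returns the sizes of the partitions taken for this batch, in order.
  static List<Long> consume(List<Long> availableSizesMb, long limitMb) {
    SizeLimiter limiter = new SizeLimiter(limitMb);
    List<Long> taken = new ArrayList<>();
    for (long size : availableSizesMb) {
      if (limiter.accept(size) == Decision.STOP) {
        break;
      }
      taken.add(size);
    }
    return taken;
  }

  public static void main(String[] args) {
    System.out.println(consume(Arrays.asList(200L, 250L, 100L, 50L), 500)); // [200, 250]
    System.out.println(consume(Arrays.asList(800L, 10L), 500));             // [800]
  }
}
```

Stopping at the first partition that does not fit keeps partitions in order; an acceptor that skipped it and kept looking for smaller ones would pack batches more tightly but could reorder processing.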

Exploring PartitionedFileSets

A partitioned file set can be explored with ad-hoc queries if you enable it at creation time:

...

You need to specify the SerDe, the input format, the output format, and, as table properties, any additional properties these may need. This is an experimental feature and has only been tested for Avro; see FileSet Exploration for more details.

PartitionedFileSets and Transactions

A PartitionedFileSet is a hybrid of a non-transactional FileSet and a transactional Table that stores the partition metadata. As a consequence, operations that need access to the partition table (such as adding a partition or listing partitions) can only be performed in the context of a transaction, while operations that only require access to the FileSet (such as getPartitionOutput() or getEmbeddedFileSet()) can be performed without a transaction.
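
The split between transactional and non-transactional operations can be pictured as a guard: calls that touch the partition table check for an active transaction, while calls that only touch the file set do not. This is a hypothetical plain-Java model with an invented transact helper and boolean flag; it does not model isolation or rollback, and it is not how CDAP enforces transactions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of which operations need a transaction, NOT the CDAP API.
class TxBoundaries {

  private boolean inTransaction = false;
  private final List<String> partitionTable = new ArrayList<>();

  // Runs the body with the "in transaction" flag set (no isolation or rollback is modeled).
  void transact(Runnable body) {
    inTransaction = true;
    try {
      body.run();
    } finally {
      inTransaction = false;
    }
  }

  private void requireTransaction(String operation) {
    if (!inTransaction) {
      throw new IllegalStateException(operation + " requires a transaction");
    }
  }

  // File-set-only operation (like getPartitionOutput): allowed anywhere.
  String getPartitionOutputPath(String key) {
    return "files/" + key;
  }

  // Partition-table operations: only inside a transaction.
  void addPartition(String key) {
    requireTransaction("addPartition");
    partitionTable.add(key);
  }

  List<String> listPartitions() {
    requireTransaction("listPartitions");
    return new ArrayList<>(partitionTable);
  }

  public static void main(String[] args) {
    TxBoundaries pfs = new TxBoundaries();
    String path = pfs.getPartitionOutputPath("nfl/1985"); // no transaction needed
    try {
      pfs.addPartition("nfl/1985");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // addPartition requires a transaction
    }
    pfs.transact(() -> pfs.addPartition("nfl/1985"));
    pfs.transact(() -> System.out.println(pfs.listPartitions() + " at " + path));
  }
}
```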

...