...
The PartitionedFileSet dataset relieves applications from having to understand file name conventions. Instead, it associates a partition key with a path. Because two different paths cannot have the same partition key, applications can address the file(s) at a path uniquely through its partition key, or more broadly through conditions over the partition keys; for example, the months of February through June of a particular year, or the month of November in any year. By inheriting the attributes of FileSets, such as format and schema, PartitionedFileSets are a powerful abstraction over data that is organized into files.
Creating a PartitionedFileSet
To create and use a PartitionedFileSet in an application, you add it as part of the application configuration, similar to a FileSet. However, the partitioning must be given as an additional property:
...
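For example, partitioning by a string field and an integer field could be configured roughly as follows. This is a sketch only: the dataset name "results", the field names, and the text formats are illustrative, and the exact builder methods should be verified against your CDAP version.

```java
// In the application's configure() method: create a PartitionedFileSet named
// "results", partitioned by a string field "league" and an int field "season".
createDataset("results", PartitionedFileSet.class, PartitionedFileSetProperties.builder()
  // Properties for partitioning: the fields that make up the partition key
  .setPartitioning(Partitioning.builder()
    .addStringField("league")
    .addIntField("season")
    .build())
  // Properties of the embedded FileSet: the input and output formats to use
  .setInputFormat(TextInputFormat.class)
  .setOutputFormat(TextOutputFormat.class)
  .build());
```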
In order to make the PartitionedFileSet explorable, additional properties are needed, as described in “Exploring PartitionedFileSets”.
Reading and Writing PartitionedFileSets
You can interact with the files in a PartitionedFileSet directly through the Location abstraction of the file system. This is similar to a FileSet, but instead of a relative path, you specify a partition key to obtain a Partition; you can then get a Location from that Partition.
...
```java
PartitionKey key = ...
PartitionOutput output = dataset.getPartitionOutput(key);
try {
  Location location = output.getLocation().append("part");
  OutputStream outputStream = location.getOutputStream();
  ...
} catch (IOException e) {
  ...
}
output.addPartition();
```
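Reading follows the same pattern: look up the Partition for a key and read from its Location. A rough sketch (the lookup of the partition requires a transaction, as explained under "PartitionedFileSets and Transactions" below; the file name "part" is illustrative):

```java
PartitionKey key = ...
Partition partition = dataset.getPartition(key);
if (partition != null) {
  try {
    Location location = partition.getLocation().append("part");
    InputStream inputStream = location.getInputStream();
    ...
  } catch (IOException e) {
    ...
  }
}
```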
Using PartitionedFileSets in MapReduce
A partitioned file set can be accessed in MapReduce in a similar fashion to a FileSet. The difference is that instead of input and output paths, you specify a partition filter for the input and a partition key for the output. For example, the MapReduce program of the SportResults example reads as input all partitions for the league given in its runtime arguments, and writes as output a partition with that league as the only key:
...
```json
{
  "dataset.results.input.partition.filter.league.value": "nfl",
  "dataset.results.input.partition.filter.season.lower": "1980",
  "dataset.results.input.partition.filter.season.upper": "1990",
  "dataset.totals.output.partition.key.league": "nfl"
}
```
Dynamic Partitioning of MapReduce Output
A MapReduce job can write to multiple partitions of a PartitionedFileSet using the DynamicPartitioner class. To do so, define a class that implements DynamicPartitioner. The core method to override is the getPartitionKey method; it maps a record's key and value to a PartitionKey, which defines which Partition the record should be written to:
...
Likewise, CREATE_OR_OVERWRITE has the effect of overwriting the contents of any previously existing partition.
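To make the shape of such a partitioner concrete, here is a rough sketch; the record format, the field names, and the generic parameters (which must match the key/value types written by the job) are illustrative assumptions:

```java
// Sketch: derive the partition key from each output record of the MapReduce job
public static final class LeaguePartitioner extends DynamicPartitioner<NullWritable, Text> {
  @Override
  public PartitionKey getPartitionKey(NullWritable key, Text value) {
    // Assume each record is a CSV line whose first two fields are league and season
    String[] fields = value.toString().split(",");
    return PartitionKey.builder()
      .addStringField("league", fields[0])
      .addIntField("season", Integer.parseInt(fields[1]))
      .build();
  }
}
```

The partitioner class is then registered on the output dataset's arguments (for example, through PartitionedFileSetArguments) so that each record is routed to the partition returned by getPartitionKey.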
Incrementally Processing PartitionedFileSets
Processing using MapReduce
One way to process a partitioned file set is with a repeatedly-running MapReduce program that, in each run, reads all partitions that have been added since its previous run. This requires that the MapReduce program persist, between runs, which partitions have already been consumed. An easy way to do this is to use the PartitionBatchInput, an experimental feature introduced in CDAP 3.3.0. Your MapReduce program is responsible for providing an implementation of DatasetStatePersistor to persist and then read back its state. In this example, the state is persisted to a row in a KeyValueTable, using the convenience class KVTableStatePersistor; however, other types of Datasets can also be used. In the initialize method of the MapReduce, specify the partitioned file set to be used as input as well as the DatasetStatePersistor to be used:
...
```java
@Override
public void destroy() {
  boolean succeeded = getContext().getState().getStatus() == ProgramStatus.COMPLETED;
  partitionCommitter.onFinish(succeeded);
}
```
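For orientation, the initialize method that produces the partitionCommitter used above might look roughly like the following sketch. The dataset name "lines" and the state table and row key are illustrative assumptions, and the exact signatures of PartitionBatchInput.setInput and KVTableStatePersistor should be checked against your CDAP version.

```java
@Override
public void initialize() throws Exception {
  // Register the partitioned file set "lines" as input, processing only partitions
  // added since the last successful run; consuming state is kept in a KeyValueTable.
  partitionCommitter = PartitionBatchInput.setInput(
    getContext(), "lines", new KVTableStatePersistor("consumingState", "state.key"));
}
```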
Processing using Other Programs
Partitions of a partitioned file set can also be incrementally processed from other program types using the generic PartitionConsumer APIs. The implementation of these APIs that can be used from multiple instances of a program is ConcurrentPartitionConsumer. To use it, provide the instance of the partitioned file set you want to consume from, along with a StatePersistor, which is responsible for managing persistence of the consumer's state:
...
```java
// return only enough partitions to process up to 500MB of data
partitions = consumer.consumePartitions(new SizeLimitingAcceptor(500));
```
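A complete consume-process-commit cycle might then look roughly like the following sketch; how the dataset instance, the statePersistor, and the surrounding transaction are obtained depends on the program type and is only indicated schematically:

```java
// Sketch: incrementally consume partitions, process them, and report the outcome
PartitionedFileSet results = context.getDataset("results");
PartitionConsumer consumer = new ConcurrentPartitionConsumer(results, statePersistor);

List<PartitionDetail> partitions = consumer.consumePartitions().getPartitions();
try {
  for (PartitionDetail partition : partitions) {
    // process the files under partition.getLocation() ...
  }
  consumer.onFinish(partitions, true);   // mark the partitions as processed
} catch (Throwable t) {
  consumer.onFinish(partitions, false);  // make the partitions available for retry
  throw t;
}
```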
Exploring PartitionedFileSets
A partitioned file set can be explored with ad-hoc queries if you enable it at creation time:
...
You need to specify the SerDe, the input format, the output format, and, as table properties, any additional properties that any of these may need. This is an experimental feature and is only tested for Avro; see FileSet Exploration for more details.
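For Avro, for example, these properties might be set at creation time roughly as follows. This is a sketch: SCHEMA_STRING stands for your Avro schema literal, and the builder methods should be verified against your CDAP version.

```java
createDataset("results", PartitionedFileSet.class, PartitionedFileSetProperties.builder()
  .setPartitioning(Partitioning.builder()
    .addStringField("league")
    .addIntField("season")
    .build())
  // Enable ad-hoc queries and tell Hive how to read and write the files
  .setEnableExploreOnCreate(true)
  .setSerDe("org.apache.hadoop.hive.serde2.avro.AvroSerDe")
  .setExploreInputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat")
  .setExploreOutputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat")
  // Additional table property required by the Avro SerDe
  .setTableProperty("avro.schema.literal", SCHEMA_STRING)
  .build());
```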
PartitionedFileSets and Transactions
A PartitionedFileSet is a hybrid of a non-transactional FileSet and a transactional Table that stores the partition metadata. As a consequence, operations that need access to the partition table (such as adding a partition or listing partitions) can only be performed in the context of a transaction, while operations that only require access to the FileSet (such as getPartitionOutput() or getEmbeddedFileSet()) can be performed without a transaction.
...
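As an illustration, a program that does not run inside an implicit transaction (such as a Worker) could wrap the metadata operation in an explicit transaction roughly like this. This is a sketch: it assumes a Worker whose context supports explicit transactions and a dataset named "results" with the partitioning used earlier.

```java
getContext().execute(new TxRunnable() {
  @Override
  public void run(DatasetContext datasetContext) throws Exception {
    PartitionedFileSet results = datasetContext.getDataset("results");
    PartitionKey key = PartitionKey.builder()
      .addStringField("league", "nfl")
      .addIntField("season", 1995)
      .build();
    PartitionOutput output = results.getPartitionOutput(key);
    // getPartitionOutput and writing files under output.getLocation() would also
    // work without a transaction; addPartition() writes partition metadata to the
    // table and therefore requires one.
    output.addPartition();
  }
});
```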