...
Inject the dataset into the mapper or reducer that uses it. This method is useful if the name of the dataset is constant or known at compile time. For example, to have access to a dataset named catalog:
Code Block
public static class CatalogJoinMapper extends Mapper<byte[], Purchase, ...> {
  @UseDataSet("catalog")
  private ProductCatalog catalog;

  @Override
  public void map(byte[] key, Purchase purchase, Context context)
      throws IOException, InterruptedException {
    // join with catalog by product ID
    Product product = catalog.read(purchase.getProductId());
    ...
  }
}
Acquire the dataset in the mapper's or reducer's initialize() method. Unlike the previous method, this does not require the dataset name to be constant; it only needs to be known when the task starts (for example, through configuration). Note that this requires the mapper or reducer class to implement the ProgramLifecycle interface, which includes the two methods initialize() and destroy():
Code Block
public static class CatalogJoinMapper extends Mapper<byte[], Purchase, ...>
    implements ProgramLifecycle<MapReduceTaskContext> {
  private ProductCatalog catalog;

  @Override
  public void initialize(MapReduceTaskContext mapReduceTaskContext) throws Exception {
    // look up the dataset name in the runtime arguments when the task starts
    catalog = mapReduceTaskContext.getDataset(
        mapReduceTaskContext.getRuntimeArguments().get("catalog.table.name"));
  }

  @Override
  public void destroy() { }

  @Override
  public void map(byte[] key, Purchase purchase, Context context)
      throws IOException, InterruptedException {
    // join with catalog by product ID
    Product product = catalog.read(purchase.getProductId());
    ...
  }
}
Dynamically acquire the dataset in the mapper every time it is accessed. This is useful if the name of the dataset is not known at initialization time; for example, if it depends on the data passed to each map() call. In this case, you also implement the ProgramLifecycle interface, so that you can save the MapReduceTaskContext for use in the map() method. For example:
Code Block
public static class CatalogJoinMapper extends Mapper<byte[], Purchase, ...>
    implements ProgramLifecycle<MapReduceTaskContext> {
  private MapReduceTaskContext taskContext;

  @Override
  public void initialize(MapReduceTaskContext mapReduceTaskContext) throws Exception {
    // keep the task context so that map() can look up datasets by name
    taskContext = mapReduceTaskContext;
  }

  @Override
  public void destroy() { }

  @Override
  public void map(byte[] key, Purchase purchase, Context context)
      throws IOException, InterruptedException {
    // join with catalog by product ID
    String catalogName = determineCatalogName(purchase.getProductCategory());
    ProductCatalog catalog = taskContext.getDataset(catalogName);
    Product product = catalog.read(purchase.getProductId());
    ...
  }

  private String determineCatalogName(String productCategory) {
    ...
  }
}
See also the section on Using Datasets in Programs.
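The second and third approaches above both depend on how the dataset name is resolved. The following is a minimal sketch of that step only, written in plain Java with no CDAP dependency. The class CatalogNames, its two methods, the fallback name "catalog", and the "catalog_<category>" naming scheme are all hypothetical and chosen purely for illustration; only the runtime-argument key "catalog.table.name" comes from the example above. The sketch assumes the runtime arguments are available as a Map<String, String>.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the CDAP API. */
final class CatalogNames {
  // Runtime-argument key used in the initialize() example above.
  static final String NAME_KEY = "catalog.table.name";

  private CatalogNames() { }

  /**
   * Second approach: resolve the name once, when the task starts.
   * Fails fast if the argument is missing or blank, instead of
   * passing null on to a dataset lookup.
   */
  static String fromRuntimeArguments(Map<String, String> runtimeArgs) {
    String name = runtimeArgs.get(NAME_KEY);
    if (name == null || name.trim().isEmpty()) {
      throw new IllegalArgumentException("Missing runtime argument: " + NAME_KEY);
    }
    return name.trim();
  }

  /**
   * Third approach: derive the name from each record's category.
   * The "catalog_<category>" scheme is an assumption made for this sketch.
   */
  static String forCategory(String productCategory) {
    if (productCategory == null || productCategory.trim().isEmpty()) {
      return "catalog"; // assumed fallback dataset name
    }
    // Lower-case the category and collapse runs of other characters into "_".
    String suffix = productCategory.trim()
        .toLowerCase(Locale.ROOT)
        .replaceAll("[^a-z0-9]+", "_");
    return "catalog_" + suffix;
  }
}
```

In a mapper, fromRuntimeArguments would be called from initialize() and forCategory from map(), with the result passed to getDataset() as in the examples above. Validating the name before the lookup keeps a configuration mistake from surfacing later as a less obvious failure.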
A MapReduce program can interact with a dataset by using it as an input or an output. The dataset needs to implement specific interfaces to support this, as described in the following sections.
...