Note |
---|
Important: Apache Hadoop MapReduce is deprecated and not supported in CDAP versions 6.7.0 and later. It will be removed in a future release. |
A MapReduce program is used to process data in batch. MapReduce programs can be written as in a conventional Hadoop system. Additionally, CDAP datasets can be accessed from MapReduce programs as both input and output.
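For context, here is a minimal sketch of how a MapReduce program can declare datasets as its input and output. The dataset names (`purchases`, `history`) and the reducer class are illustrative, the mapper is the `CatalogJoinMapper` shown below, and the sketch assumes the CDAP 6.x `io.cdap.cdap` API (`AbstractMapReduce`, `Input.ofDataset`, `Output.ofDataset`):

```java
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.mapreduce.AbstractMapReduce;
import io.cdap.cdap.api.mapreduce.MapReduceContext;
import org.apache.hadoop.mapreduce.Job;

public class PurchaseHistoryBuilder extends AbstractMapReduce {

  @Override
  public void configure() {
    setName("PurchaseHistoryBuilder");
    setDescription("Joins purchases against a product catalog");
  }

  @Override
  public void initialize() throws Exception {
    MapReduceContext context = getContext();

    // Configure the underlying Hadoop job as usual
    Job job = context.getHadoopJob();
    job.setMapperClass(CatalogJoinMapper.class);
    job.setReducerClass(PerUserReducer.class); // hypothetical reducer

    // CDAP datasets serve as both the input and the output of the job
    context.addInput(Input.ofDataset("purchases"));
    context.addOutput(Output.ofDataset("history"));
  }
}
```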
...
Both a CDAP Mapper and Reducer can directly read or write to a dataset, using one of the following options. (Note that the second and third options can also be used for a Partitioner or Comparator, if configured on the MapReduce job.)
Inject the dataset into the mapper or reducer that uses it. This method is useful if the name of the dataset is constant or known at compile time. For example, to have access to a dataset named catalog:
```java
public static class CatalogJoinMapper extends Mapper<byte[], Purchase, ...> {

  @UseDataSet("catalog")
  private ProductCatalog catalog;

  @Override
  public void map(byte[] key, Purchase purchase, Context context)
      throws IOException, InterruptedException {
    // join with catalog by product ID
    Product product = catalog.read(purchase.getProductId());
    ...
  }
}
```
Acquire the dataset in the mapper's or reducer's initialize() method. As opposed to the previous method, this does not require the dataset name to be constant; it only needs to be known at the time the task starts (for example, through configuration). Note that this requires that the mapper or reducer class implements the ProgramLifecycle interface, which includes the two methods initialize() and destroy():
```java
public static class CatalogJoinMapper extends Mapper<byte[], Purchase, ...>
    implements ProgramLifecycle<MapReduceTaskContext> {

  private ProductCatalog catalog;

  @Override
  public void initialize(MapReduceTaskContext mapReduceTaskContext) throws Exception {
    catalog = mapReduceTaskContext.getDataset(
        mapReduceTaskContext.getRuntimeArguments().get("catalog.table.name"));
  }

  @Override
  public void destroy() {
  }

  @Override
  public void map(byte[] key, Purchase purchase, Context context)
      throws IOException, InterruptedException {
    // join with catalog by product ID
    Product product = catalog.read(purchase.getProductId());
    ...
  }
}
```
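When the dataset name comes in through runtime arguments, as in the example above, it has to be supplied when the program is started. As a hedged sketch, this is how that might look with CDAP's test framework; the application class PurchaseApp and the program name CatalogJoin are hypothetical, and the sketch assumes the CDAP 6.x io.cdap.cdap.test APIs (TestBase, ApplicationManager, MapReduceManager):

```java
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import io.cdap.cdap.proto.ProgramRunStatus;
import io.cdap.cdap.test.ApplicationManager;
import io.cdap.cdap.test.MapReduceManager;
import io.cdap.cdap.test.TestBase;
import org.junit.Test;

public class CatalogJoinTest extends TestBase {

  @Test
  public void testCatalogJoin() throws Exception {
    // Deploy the (hypothetical) application that contains the MapReduce program
    ApplicationManager appManager = deployApplication(PurchaseApp.class);

    // Start the MapReduce, passing the dataset name as the runtime argument
    // that the mapper's initialize() method reads via getRuntimeArguments()
    MapReduceManager mrManager = appManager.getMapReduceManager("CatalogJoin");
    mrManager.start(Collections.singletonMap("catalog.table.name", "catalog"));
    mrManager.waitForRun(ProgramRunStatus.COMPLETED, 5, TimeUnit.MINUTES);
  }
}
```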
Dynamically acquire the dataset in the mapper every time it is accessed. This is useful if the name of the dataset is not known at initialization time; for example, if it depends on the data passed to each map() call. In this case, you also implement the ProgramLifecycle interface, to save the MapReduceTaskContext for use in the map() method. For example:

```java
public static class CatalogJoinMapper extends Mapper<byte[], Purchase, ...>
    implements ProgramLifecycle<MapReduceTaskContext> {

  private MapReduceTaskContext taskContext;

  @Override
  public void initialize(MapReduceTaskContext mapReduceTaskContext) throws Exception {
    taskContext = mapReduceTaskContext;
  }

  @Override
  public void destroy() {
  }

  @Override
  public void map(byte[] key, Purchase purchase, Context context)
      throws IOException, InterruptedException {
    // join with catalog by product ID
    String catalogName = determineCatalogName(purchase.getProductCategory());
    ProductCatalog catalog = taskContext.getDataset(catalogName);
    Product product = catalog.read(purchase.getProductId());
    ...
  }

  private String determineCatalogName(String productCategory) {
    ...
  }
}
```
See also the section on Using Datasets in Programs.
...