Note |
---|
Important: Apache Hadoop MapReduce is not supported in CDAP versions 6.7.0 and later. It will be removed in a future release. |
A MapReduce program is used to process data in batch. MapReduce can be written as in a conventional Hadoop system. Additionally, CDAP datasets can be accessed from MapReduce as both input and output.
...
or specify addWorkflow()
in your application and specify your MapReduce in the workflow definition:
Code Block |
---|
public void configure() {
  ...
  // Run a MapReduce on the acquired data using a workflow
  addWorkflow(new PurchaseHistoryWorkflow());
}
...
The configure method is similar to the one found in applications. It defines the name and description of the MapReduce program. You can also specify resources (memory and virtual cores) used by the mappers and reducers.
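As a sketch, a configure() method along these lines sets the name, description, and task resources (this assumes the CDAP AbstractMapReduce base class and its protected setter methods; the program name and resource values are illustrative):

```java
// Sketch only: assumes CDAP's AbstractMapReduce base class and Resources class;
// "PurchaseHistoryBuilder" and the resource values are illustrative.
public class PurchaseHistoryBuilder extends AbstractMapReduce {
  @Override
  public void configure() {
    setName("PurchaseHistoryBuilder");
    setDescription("Builds a purchase history for each customer");
    setMapperResources(new Resources(1024));   // memory in MB for each mapper
    setReducerResources(new Resources(2048));  // memory in MB for each reducer
  }
}
```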
The initialize()
method is invoked at runtime, before the MapReduce is executed. Through the getContext()
method you can obtain an instance of the MapReduceContext
. It allows you to specify datasets to be used as input or output. It also provides you access to the actual Hadoop job configuration, as though you were running the MapReduce directly on Hadoop. For example, you can specify the input and output datasets, the mapper and reducer classes as well as the intermediate data format:
...
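One possible shape for such an initialize() method (a sketch assuming the CDAP MapReduceContext and the standard Hadoop Job API; the dataset and class names are illustrative):

```java
@Override
public void initialize() throws Exception {
  MapReduceContext context = getContext();
  Job job = context.getHadoopJob();
  job.setMapperClass(PurchaseMapper.class);
  job.setReducerClass(PerUserReducer.class);
  // Intermediate (map output) data format:
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  context.addInput(Input.ofDataset("purchases"));
  context.addOutput(Output.ofDataset("history"));
}
```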
Code Block |
---|
public static class PurchaseMapper extends Mapper<byte[], Purchase, Text, Text> {
  private Metrics mapMetrics;

  @Override
  public void map(byte[] key, Purchase purchase, Context context)
      throws IOException, InterruptedException {
    String user = purchase.getCustomer();
    if (purchase.getPrice() > 100000) {
      mapMetrics.count("purchases.large", 1);
    }
    context.write(new Text(user), new Text(new Gson().toJson(purchase)));
  }
}

public static class PerUserReducer extends Reducer<Text, Text, String, PurchaseHistory> {
  @UseDataSet("frequentCustomers")
  private KeyValueTable frequentCustomers;
  private Metrics reduceMetrics;

  public void reduce(Text customer, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    PurchaseHistory purchases = new PurchaseHistory(customer.toString());
    int numPurchases = 0;
    for (Text val : values) {
      purchases.add(new Gson().fromJson(val.toString(), Purchase.class));
      numPurchases++;
    }
    if (numPurchases == 1) {
      reduceMetrics.count("customers.rare", 1);
    } else if (numPurchases > 10) {
      reduceMetrics.count("customers.frequent", 1);
      frequentCustomers.write(customer.toString(), String.valueOf(numPurchases));
    }
    context.write(customer.toString(), purchases);
  }
}
MapReduce and Datasets
Both a CDAP Mapper
and Reducer
can directly read or write to a dataset, using one of the following options. (Note that the second and third options can be used for a Partitioner
or Comparator
, if configured on the MapReduce job.)
Inject the dataset into the mapper or reducer that uses it. This method is useful if the name of the dataset is constant or known at compile time. For example, to have access to a dataset named catalog:
Code Block

public static class CatalogJoinMapper extends Mapper<byte[], Purchase, ...> {
  @UseDataSet("catalog")
  private ProductCatalog catalog;

  @Override
  public void map(byte[] key, Purchase purchase, Context context)
      throws IOException, InterruptedException {
    // join with catalog by product ID
    Product product = catalog.read(purchase.getProductId());
    ...
  }
}
Acquire the dataset in the mapper's or reducer's
initialize()
method. As opposed to the previous method, this does not require the dataset name to be constant; it only needs to be known at the time the task starts (for example, through configuration). Note that this requires that the mapper or reducer class implements the ProgramLifecycle
interface, which includes the two methods initialize()
and destroy()
:

Code Block

public static class CatalogJoinMapper extends Mapper<byte[], Purchase, ...>
    implements ProgramLifecycle<MapReduceTaskContext> {
  private ProductCatalog catalog;

  @Override
  public void initialize(MapReduceTaskContext mapReduceTaskContext) throws Exception {
    catalog = mapReduceTaskContext.getDataset(
        mapReduceTaskContext.getRuntimeArguments().get("catalog.table.name"));
  }

  @Override
  public void destroy() {
  }

  @Override
  public void map(byte[] key, Purchase purchase, Context context)
      throws IOException, InterruptedException {
    // join with catalog by product ID
    Product product = catalog.read(purchase.getProductId());
    ...
  }
}
Dynamically acquire the dataset in the mapper every time it is accessed. This is useful if the name of the dataset is not known at initialization time; for example, if it depends on the data passed to each
map()
call. In this case, you also implement the ProgramLifecycle
interface, to save the MapReduceTaskContext
for use in the map()
method. For example:

Code Block

public static class CatalogJoinMapper extends Mapper<byte[], Purchase, ...>
    implements ProgramLifecycle<MapReduceTaskContext> {
  private MapReduceTaskContext taskContext;

  @Override
  public void initialize(MapReduceTaskContext mapReduceTaskContext) throws Exception {
    taskContext = mapReduceTaskContext;
  }

  @Override
  public void destroy() {
  }

  @Override
  public void map(byte[] key, Purchase purchase, Context context)
      throws IOException, InterruptedException {
    // join with catalog by product ID
    String catalogName = determineCatalogName(purchase.getProductCategory());
    ProductCatalog catalog = taskContext.getDataset(catalogName);
    Product product = catalog.read(purchase.getProductId());
    ...
  }

  private String determineCatalogName(String productCategory) {
    ...
  }
}
See also the section on Using Datasets in Programs.
A MapReduce program can interact with a dataset by using it as an input or an output. The dataset needs to implement specific interfaces to support this, as described in the following sections.
A Dataset as the Input Source of a MapReduce Program
When you run a MapReduce program, you can configure it to read its input from a dataset. The source dataset must implement the BatchReadable
interface, which requires two methods:
...
Code Block |
---|
@UseDataSet("myTable")
KeyValueTable kvTable;
...

@Override
public void initialize() throws Exception {
  MapReduceContext context = getContext();
  ...
  context.addInput(Input.ofDataset("myTable", kvTable.getSplits(16, startKey, stopKey)));
}
A Dataset as the Output Destination of a MapReduce Program
Just as you have the option to read input from a dataset, you have the option to write to a dataset as the output destination of a MapReduce program if that dataset implements the BatchWritable
interface:
...
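The contract can be sketched as follows; this is a simplified stand-in for the real CDAP interface, shown with a toy in-memory dataset rather than an actual CDAP dataset:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the BatchWritable contract; the real interface lives in
// the CDAP API and is implemented by system datasets such as KeyValueTable.
interface BatchWritable<KEY, VALUE> {
  // Called once for each record emitted to this dataset by the job.
  void write(KEY key, VALUE value);
}

// Toy dataset used only for illustration.
class InMemoryHistoryDataset implements BatchWritable<String, String> {
  final List<String> rows = new ArrayList<>();

  @Override
  public void write(String customer, String history) {
    rows.add(customer + "=" + history);
  }
}

public class BatchWritableDemo {
  public static void main(String[] args) {
    InMemoryHistoryDataset ds = new InMemoryHistoryDataset();
    ds.write("alice", "3 purchases");
    System.out.println(ds.rows.get(0)); // prints "alice=3 purchases"
  }
}
```

The KEY and VALUE type parameters here play the same role as the Reducer's output key and value types, as described below.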
The write()
method is used to redirect all writes performed by a Reducer to the dataset. Again, the KEY
and VALUE
type parameters must match the output key and value type parameters of the Reducer.
Multiple Output Destinations of a MapReduce Program
To write to multiple output datasets from a MapReduce program, begin by adding the datasets as outputs:
...
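For example, an initialize() method that registers two outputs might look like this sketch (assuming the CDAP Output.ofDataset factory; the dataset names are illustrative):

```java
@Override
public void initialize() throws Exception {
  MapReduceContext context = getContext();
  // Each output is later addressed from the task by its dataset name.
  context.addOutput(Output.ofDataset("histories"));
  context.addOutput(Output.ofDataset("frequentCustomers"));
}
```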
Note that the multiple output write method, MapReduceTaskContext.write(String, KEY key, VALUE value)
, can only be used if there are multiple outputs. Similarly, the single output write method, MapReduceTaskContext.write(KEY key, VALUE value)
, can only be used if there is a single output to the MapReduce program.
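Put together, a reducer writing to two registered outputs might look like this sketch (it assumes the task implements ProgramLifecycle<MapReduceTaskContext> and saved the context in initialize(), as in the earlier examples; the output names are illustrative):

```java
// Select the destination dataset by name for each record:
taskContext.write("histories", customer.toString(), purchases);
taskContext.write("frequentCustomers", customer.toString(), String.valueOf(numPurchases));
```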
MapReduce and Transactions
When you run a MapReduce that interacts with datasets, the system creates a long-running transaction. Similar to the transaction of a service handler, here are some rules to follow:
...
It's important to note that the MapReduce framework will reattempt a task (Mapper or Reducer) if it fails. If the task is writing to a dataset, the reattempt of the task will most likely repeat the writes that were already performed in the failed attempt. Therefore, it is highly advisable that all writes performed by MapReduce programs be idempotent.
If your MapReduce job performs speculative execution, a similar repetition of writes can occur.
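The distinction can be illustrated with plain Java and a map standing in for a dataset (this is not the CDAP API, only a sketch of the idempotency argument):

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotencyDemo {
  // Toy stand-in for a key-value dataset; not the CDAP KeyValueTable API.
  static final Map<String, Long> table = new HashMap<>();

  // Idempotent: writing an absolute value twice leaves the same final state,
  // so a reattempted task does no harm.
  static void writeTotal(String key, long total) {
    table.put(key, total);
  }

  // Not idempotent: a reattempted task would apply the delta a second time.
  static void increment(String key, long delta) {
    table.merge(key, delta, Long::sum);
  }

  public static void main(String[] args) {
    writeTotal("alice", 42);
    writeTotal("alice", 42);   // simulated task retry: state unchanged
    increment("bob", 42);
    increment("bob", 42);      // simulated task retry: value doubled
    System.out.println(table.get("alice") + " " + table.get("bob")); // prints "42 84"
  }
}
```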
See Transaction System for additional information.
MapReduce and Resources
Both the YARN container size and the number of virtual cores used in a MapReduce job can be specified as part of the configuration. They can also be set at runtime through the use of runtime arguments.
The Resources API, if called with two arguments, sets both the memory used in megabytes and the number of virtual cores used.
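For example (a sketch; the values are illustrative, and the setters assumed here are those of the CDAP MapReduce configurer):

```java
// Two-argument form: memory in megabytes, then virtual cores.
setMapperResources(new Resources(2048, 2));
setReducerResources(new Resources(4096, 2));
```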
MapReduce Program Examples
...
.