**Note:** MapReduce is deprecated and will be removed in CDAP 7.0.0.
A MapReduce program is used to process data in batch. It can be written exactly as for a conventional Hadoop system; in addition, CDAP datasets can be accessed by a MapReduce program as both input and output.
...
```java
public static class PurchaseMapper extends Mapper<byte[], Purchase, Text, Text> {
  private Metrics mapMetrics;

  @Override
  public void map(byte[] key, Purchase purchase, Context context)
      throws IOException, InterruptedException {
    String user = purchase.getCustomer();
    if (purchase.getPrice() > 100000) {
      mapMetrics.count("purchases.large", 1);
    }
    context.write(new Text(user), new Text(new Gson().toJson(purchase)));
  }
}

public static class PerUserReducer extends Reducer<Text, Text, String, PurchaseHistory> {
  @UseDataSet("frequentCustomers")
  private KeyValueTable frequentCustomers;
  private Metrics reduceMetrics;

  public void reduce(Text customer, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    PurchaseHistory purchases = new PurchaseHistory(customer.toString());
    int numPurchases = 0;
    for (Text val : values) {
      purchases.add(new Gson().fromJson(val.toString(), Purchase.class));
      numPurchases++;
    }
    if (numPurchases == 1) {
      reduceMetrics.count("customers.rare", 1);
    } else if (numPurchases > 10) {
      reduceMetrics.count("customers.frequent", 1);
      frequentCustomers.write(customer.toString(), String.valueOf(numPurchases));
    }
    context.write(customer.toString(), purchases);
  }
}
```
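For context, these Mapper and Reducer classes are wired into the program when it starts. A minimal sketch of the driver, assuming input and output datasets named "purchases" and "history" (the dataset names and the surrounding class are illustrative, not part of the example above):

```java
public class PurchaseHistoryBuilder extends AbstractMapReduce {

  @Override
  public void initialize() throws Exception {
    MapReduceContext context = getContext();

    // Set the Mapper and Reducer classes on the underlying Hadoop job
    Job job = context.getHadoopJob();
    job.setMapperClass(PurchaseMapper.class);
    job.setReducerClass(PerUserReducer.class);

    // Use CDAP datasets as the job's input and output (names are assumed)
    context.addInput(Input.ofDataset("purchases"));
    context.addOutput(Output.ofDataset("history"));
  }
}
```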
MapReduce and Datasets
Both a CDAP Mapper and Reducer can directly read or write to a dataset, using one of the following options. (Note that the second and third options can also be used for a Partitioner or Comparator, if configured on the MapReduce job.)
...
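For example, in addition to the @UseDataSet injection shown in the reducer above, a task can obtain a dataset dynamically by implementing ProgramLifecycle and requesting it from the task context. A sketch, with an illustrative dataset name:

```java
public static class LookupMapper extends Mapper<byte[], Purchase, Text, Text>
    implements ProgramLifecycle<MapReduceTaskContext<Text, Text>> {

  private KeyValueTable lookupTable;

  @Override
  public void initialize(MapReduceTaskContext<Text, Text> context) throws Exception {
    // Acquire the dataset at task startup instead of through @UseDataSet injection
    lookupTable = context.getDataset("lookupTable");
  }

  @Override
  public void destroy() {
    // nothing to clean up
  }
}
```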
A MapReduce program can interact with a dataset by using it as an input or an output. The dataset needs to implement specific interfaces to support this, as described in the following sections.
A Dataset as the Input Source of a MapReduce Program
When you run a MapReduce program, you can configure it to read its input from a dataset. The source dataset must implement the BatchReadable interface, which requires two methods:
...
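In outline, the contract is roughly the following (a simplified sketch; see the CDAP API for the exact signatures):

```java
public interface BatchReadable<KEY, VALUE> {
  // Partition the data into splits that tasks can process in parallel
  List<Split> getSplits();

  // Create a reader for the records of a single split
  SplitReader<KEY, VALUE> createSplitReader(Split split);
}
```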
```java
@UseDataSet("myTable")
KeyValueTable kvTable;
...

@Override
public void initialize() throws Exception {
  MapReduceContext context = getContext();
  ...
  context.addInput(Input.ofDataset("myTable", kvTable.getSplits(16, startKey, stopKey)));
}
```
A Dataset as the Output Destination of a MapReduce Program
Just as you have the option to read input from a dataset, you have the option to write to a dataset as the output destination of a MapReduce program if that dataset implements the BatchWritable interface:
...
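In outline (a simplified sketch; see the CDAP API for the exact signature):

```java
public interface BatchWritable<KEY, VALUE> {
  // Write a single key/value pair to the dataset
  void write(KEY key, VALUE value);
}
```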
The write() method is used to redirect all writes performed by a Reducer to the dataset. Again, the KEY and VALUE type parameters must match the output key and value type parameters of the Reducer.
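As with input, the output dataset is registered when the program initializes. A minimal sketch, assuming a dataset named "history":

```java
@Override
public void initialize() throws Exception {
  MapReduceContext context = getContext();
  // Direct the job's output to the "history" dataset (name is illustrative)
  context.addOutput(Output.ofDataset("history"));
}
```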
Multiple Output Destinations of a MapReduce Program
To write to multiple output datasets from a MapReduce program, begin by adding the datasets as outputs:
...
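Then, in the Mapper or Reducer, select the destination for each record through MapReduceTaskContext. A minimal sketch, assuming outputs named "frequentCustomers" and "purchaseHistory" were added in initialize(), and a taskContext field obtained by implementing ProgramLifecycle<MapReduceTaskContext> (the names are illustrative):

```java
// Route each record to one of the named outputs registered in initialize()
taskContext.write("frequentCustomers", customer.toString(), String.valueOf(numPurchases));
taskContext.write("purchaseHistory", customer.toString(), purchases);
```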
Note that the multiple-output write method, MapReduceTaskContext.write(String, KEY key, VALUE value), can only be used if there are multiple outputs. Similarly, the single-output write method, MapReduceTaskContext.write(KEY key, VALUE value), can only be used if there is a single output to the MapReduce program.
MapReduce and Transactions
When you run a MapReduce that interacts with datasets, the system creates a long-running transaction. Similar to the transaction of a service handler, here are some rules to follow:
...
See Transaction System for additional information.
MapReduce and Resources
Both the YARN container size and the number of virtual cores used in a MapReduce job can be specified as part of the configuration. They can also be set at runtime through the use of runtime arguments.
...
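For example, resources can be requested when the program is configured; a minimal sketch with illustrative values:

```java
@Override
public void configure() {
  setName("PurchaseHistoryBuilder");
  // Request 1 GB of memory and 2 virtual cores for each mapper, and
  // 2 GB of memory and 1 virtual core for each reducer (values are illustrative)
  setMapperResources(new Resources(1024, 2));
  setReducerResources(new Resources(2048, 1));
}
```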