...
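// Nested inside the application's MapReduce class. Purchase and PurchaseHistory are
// the example application's own types; the CDAP package names assume CDAP 3.x/4.x.
import java.io.IOException;

import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.metrics.Metrics;
import com.google.gson.Gson;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public static class PurchaseMapper extends Mapper<byte[], Purchase, Text, Text> {

  // Gson instances are thread-safe and reusable, so create one per class.
  private static final Gson GSON = new Gson();

  // Injected by CDAP at runtime.
  private Metrics mapMetrics;

  @Override
  public void map(byte[] key, Purchase purchase, Context context)
    throws IOException, InterruptedException {
    String user = purchase.getCustomer();
    if (purchase.getPrice() > 100000) {
      mapMetrics.count("purchases.large", 1);
    }
    // Key by customer so the reducer receives all of a customer's purchases together.
    context.write(new Text(user), new Text(GSON.toJson(purchase)));
  }
}

public static class PerUserReducer extends Reducer<Text, Text, String, PurchaseHistory> {

  private static final Gson GSON = new Gson();

  // The dataset and the metrics field are injected by CDAP.
  @UseDataSet("frequentCustomers")
  private KeyValueTable frequentCustomers;
  private Metrics reduceMetrics;

  @Override
  public void reduce(Text customer, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
    PurchaseHistory purchases = new PurchaseHistory(customer.toString());
    int numPurchases = 0;
    for (Text val : values) {
      purchases.add(GSON.fromJson(val.toString(), Purchase.class));
      numPurchases++;
    }
    if (numPurchases == 1) {
      reduceMetrics.count("customers.rare", 1);
    } else if (numPurchases > 10) {
      reduceMetrics.count("customers.frequent", 1);
      // Store the purchase count of each frequent customer in the dataset.
      frequentCustomers.write(customer.toString(), String.valueOf(numPurchases));
    }
    context.write(customer.toString(), purchases);
  }
}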
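To make the data flow of the example above concrete without a Hadoop or CDAP cluster, here is a minimal stdlib-only Java simulation of what the mapper and reducer compute. It is not CDAP code. `PurchaseFlowSketch` and `SimplePurchase` are hypothetical names, and in-memory maps stand in for `Metrics` and the `frequentCustomers` KeyValueTable. Grouping purchases by customer in a `TreeMap` stands in for the MapReduce shuffle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only simulation of the PurchaseMapper / PerUserReducer logic.
// All names here are hypothetical; this is not CDAP or Hadoop code.
class PurchaseFlowSketch {

    // Hypothetical stand-in for the example's Purchase class.
    static final class SimplePurchase {
        final String customer;
        final long price;

        SimplePurchase(String customer, long price) {
            this.customer = customer;
            this.price = price;
        }
    }

    // Stands in for the Metrics counters.
    final Map<String, Integer> metrics = new TreeMap<>();
    // Stands in for the frequentCustomers KeyValueTable.
    final Map<String, String> frequentCustomers = new TreeMap<>();

    private void count(String name, int delta) {
        metrics.merge(name, delta, Integer::sum);
    }

    // Map phase plus a simulated shuffle: count large purchases, group by customer.
    private Map<String, List<SimplePurchase>> mapAndGroup(List<SimplePurchase> input) {
        Map<String, List<SimplePurchase>> grouped = new TreeMap<>();
        for (SimplePurchase p : input) {
            if (p.price > 100000) {
                count("purchases.large", 1);
            }
            grouped.computeIfAbsent(p.customer, k -> new ArrayList<>()).add(p);
        }
        return grouped;
    }

    // Reduce phase: the same thresholds as PerUserReducer.
    private void reduce(String customer, List<SimplePurchase> purchases) {
        int numPurchases = purchases.size();
        if (numPurchases == 1) {
            count("customers.rare", 1);
        } else if (numPurchases > 10) {
            count("customers.frequent", 1);
            frequentCustomers.put(customer, String.valueOf(numPurchases));
        }
    }

    void run(List<SimplePurchase> input) {
        mapAndGroup(input).forEach(this::reduce);
    }

    public static void main(String[] args) {
        List<SimplePurchase> input = new ArrayList<>();
        input.add(new SimplePurchase("alice", 250000)); // one large purchase
        for (int i = 0; i < 11; i++) {
            input.add(new SimplePurchase("bob", 20));   // eleven small purchases
        }
        input.add(new SimplePurchase("carol", 15));
        input.add(new SimplePurchase("carol", 30));

        PurchaseFlowSketch flow = new PurchaseFlowSketch();
        flow.run(input);
        System.out.println(flow.metrics);           // {customers.frequent=1, customers.rare=1, purchases.large=1}
        System.out.println(flow.frequentCustomers); // {bob=11}
    }
}
```

In this run, alice (one purchase) counts as rare and bob (eleven purchases) counts as frequent. Carol (two purchases) counts as neither, because the reducer only records customers with exactly one purchase or with more than ten.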
MapReduce and Datasets
Both a CDAP Mapper and Reducer can read from or write to a dataset directly, using one of the following options. (Note that the second and third options can be used for a Partitioner or Comparator, if one is configured on the MapReduce job.)
...
Note that the multiple-output write method, MapReduceTaskContext.write(String, KEY key, VALUE value), can only be used if the MapReduce program has multiple outputs. Similarly, the single-output write method, MapReduceTaskContext.write(KEY key, VALUE value), can only be used if the MapReduce program has a single output.
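The rule above can be illustrated with a small hypothetical model. This sketch is not CDAP's `MapReduceTaskContext`, and the class name `OutputRoutingSketch` and its `written` accessor are assumptions made for illustration. It only shows when each write form is valid: the two-argument form requires exactly one output, and the named three-argument form requires more than one.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the single- vs multiple-output write rule.
// NOT CDAP's MapReduceTaskContext; names are illustrative only.
class OutputRoutingSketch<K, V> {
    private final Map<String, List<Map.Entry<K, V>>> outputs = new LinkedHashMap<>();

    OutputRoutingSketch(String... outputNames) {
        if (outputNames.length == 0) {
            throw new IllegalArgumentException("at least one output is required");
        }
        for (String name : outputNames) {
            outputs.put(name, new ArrayList<>());
        }
    }

    // Single-output form: valid only when exactly one output is configured.
    void write(K key, V value) {
        if (outputs.size() != 1) {
            throw new IllegalStateException(
                "write(key, value) needs a single output; use write(outputName, key, value)");
        }
        outputs.values().iterator().next().add(new AbstractMap.SimpleImmutableEntry<>(key, value));
    }

    // Multiple-output form: valid only when more than one output is configured.
    void write(String outputName, K key, V value) {
        if (outputs.size() < 2) {
            throw new IllegalStateException(
                "write(outputName, key, value) needs multiple outputs; use write(key, value)");
        }
        List<Map.Entry<K, V>> target = outputs.get(outputName);
        if (target == null) {
            throw new IllegalArgumentException("unknown output: " + outputName);
        }
        target.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
    }

    // Records written so far to the named output.
    List<Map.Entry<K, V>> written(String outputName) {
        return outputs.get(outputName);
    }
}
```

An instance created with one output name accepts `write(key, value)` and rejects the named form with an `IllegalStateException`. An instance created with two output names behaves the other way around.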
MapReduce and Transactions
When you run a MapReduce program that interacts with datasets, the system creates a long-running transaction. As with the transaction of a service handler, there are some rules to follow:
...
See Transaction System for additional information.
MapReduce and Resources
Both the YARN container size and the number of virtual cores used in a MapReduce job can be specified as part of its configuration. They can also be set at runtime through runtime arguments.
When called with two arguments, the Resources API sets both the memory (in megabytes) and the number of virtual cores.
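The following sketch models configured resources that runtime arguments can override. It assumes that runtime arguments take precedence over configured values. `ResourceSpec`, its default of one virtual core, and the runtime-argument keys `mapper.memoryMB` and `mapper.vcores` are all hypothetical. They are not CDAP's `Resources` class or its actual runtime-argument names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of task container resources: memory (MB) plus virtual cores.
// Not CDAP's Resources class; the default core count and the runtime-argument
// keys below are assumptions for illustration.
final class ResourceSpec {
    static final int DEFAULT_VIRTUAL_CORES = 1; // assumed default

    final int memoryMB;
    final int virtualCores;

    // One argument: memory only; cores fall back to the assumed default.
    ResourceSpec(int memoryMB) {
        this(memoryMB, DEFAULT_VIRTUAL_CORES);
    }

    // Two arguments: memory in megabytes and number of virtual cores.
    ResourceSpec(int memoryMB, int virtualCores) {
        if (memoryMB <= 0 || virtualCores <= 0) {
            throw new IllegalArgumentException("memory and cores must be positive");
        }
        this.memoryMB = memoryMB;
        this.virtualCores = virtualCores;
    }

    // Runtime arguments, when present, replace the configured values.
    // Keys such as "mapper.memoryMB" and "mapper.vcores" are hypothetical.
    ResourceSpec withRuntimeOverrides(String prefix, Map<String, String> runtimeArgs) {
        String memory = runtimeArgs.get(prefix + ".memoryMB");
        String cores = runtimeArgs.get(prefix + ".vcores");
        return new ResourceSpec(
            memory == null ? memoryMB : Integer.parseInt(memory),
            cores == null ? virtualCores : Integer.parseInt(cores));
    }

    @Override
    public String toString() {
        return memoryMB + " MB, " + virtualCores + " vcore(s)";
    }

    public static void main(String[] args) {
        ResourceSpec configured = new ResourceSpec(2048, 2); // set in the configuration
        Map<String, String> runtimeArgs = new HashMap<>();
        runtimeArgs.put("mapper.memoryMB", "4096");           // override memory only
        System.out.println(configured.withRuntimeOverrides("mapper", runtimeArgs)); // 4096 MB, 2 vcore(s)
    }
}
```

Returning a new object instead of mutating the configured one keeps the design-time specification intact. Each run can therefore apply its own arguments.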
MapReduce Program Examples
For an example, see the how-to guide Batch Data Processing with CDAP, which demonstrates the use of MapReduce.