Note

Important: MapReduce is deprecated and will be removed in CDAP 7.0.0.

A MapReduce program is used to process data in batch. MapReduce programs can be written exactly as in a conventional Hadoop system. Additionally, CDAP datasets can be accessed by a MapReduce program as both input and output.

...

Code Block
public static class PurchaseMapper
    extends Mapper<byte[], Purchase, Text, Text> {

  // Injected by CDAP; lets the mapper emit custom metrics
  private Metrics mapMetrics;

  @Override
  public void map(byte[] key, Purchase purchase, Context context)
    throws IOException, InterruptedException {
    String user = purchase.getCustomer();
    // Count purchases over 100,000 as large purchases
    if (purchase.getPrice() > 100000) {
      mapMetrics.count("purchases.large", 1);
    }
    // Emit the purchase, keyed by customer, serialized as JSON
    context.write(new Text(user), new Text(new Gson().toJson(purchase)));
  }
}

public static class PerUserReducer
    extends Reducer<Text, Text, String, PurchaseHistory> {

  // Injected by CDAP; the reducer can read and write this dataset directly
  @UseDataSet("frequentCustomers")
  private KeyValueTable frequentCustomers;
  private Metrics reduceMetrics;

  @Override
  public void reduce(Text customer, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
    // Aggregate all of a customer's purchases into a single history
    PurchaseHistory purchases = new PurchaseHistory(customer.toString());
    int numPurchases = 0;
    for (Text val : values) {
      purchases.add(new Gson().fromJson(val.toString(), Purchase.class));
      numPurchases++;
    }
    if (numPurchases == 1) {
      reduceMetrics.count("customers.rare", 1);
    } else if (numPurchases > 10) {
      reduceMetrics.count("customers.frequent", 1);
      // Record frequent customers in the dataset
      frequentCustomers.write(customer.toString(), String.valueOf(numPurchases));
    }
    context.write(customer.toString(), purchases);
  }
}
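
To see how such a Mapper and Reducer fit into a complete program, here is a minimal sketch of the surrounding MapReduce class; the program and dataset names used (PurchaseHistoryBuilder, purchases, history) are assumptions for illustration:

Code Block
public class PurchaseHistoryBuilder extends AbstractMapReduce {

  @Override
  protected void configure() {
    setName("PurchaseHistoryBuilder");
    setDescription("Builds a purchase history for each customer");
  }

  @Override
  public void initialize() throws Exception {
    MapReduceContext context = getContext();
    // Standard Hadoop job configuration
    Job job = context.getHadoopJob();
    job.setMapperClass(PurchaseMapper.class);
    job.setReducerClass(PerUserReducer.class);
    // CDAP datasets serve as the job's input and output
    // (dataset names here are assumptions)
    context.addInput(Input.ofDataset("purchases"));
    context.addOutput(Output.ofDataset("history"));
  }
}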

MapReduce and Datasets

Both a CDAP Mapper and Reducer can directly read from or write to a dataset, using one of the following options. (Note that the second and third options can also be used for a Partitioner or Comparator, if one is configured on the MapReduce job.)

...

A MapReduce program can interact with a dataset by using it as an input or an output. The dataset needs to implement specific interfaces to support this, as described in the following sections.

A Dataset as the Input Source of a MapReduce Program

When you run a MapReduce program, you can configure it to read its input from a dataset. The source dataset must implement the BatchReadable interface, which requires two methods:

...

Code Block
@UseDataSet("myTable")
KeyValueTable kvTable;
...
@Override
public void initialize() throws Exception {
  MapReduceContext context = getContext();
  ...
  context.addInput(Input.ofDataset("myTable", kvTable.getSplits(16, startKey, stopKey)));
}

A Dataset as the Output Destination of a MapReduce Program

Just as you can read input from a dataset, you can use a dataset as the output destination of a MapReduce program, provided that the dataset implements the BatchWritable interface:

...

The write() method is used to redirect all writes performed by a Reducer to the dataset. Again, the KEY and VALUE type parameters must match the output key and value type parameters of the Reducer.
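
For example, the dataset can be set as the job's output in initialize(). This is a minimal sketch reusing the myTable dataset from the input example above:

Code Block
@Override
public void initialize() throws Exception {
  MapReduceContext context = getContext();
  // Direct all of the job's output to the dataset; the Reducer's output key
  // and value types must match the dataset's BatchWritable type parameters
  context.addOutput(Output.ofDataset("myTable"));
}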

Multiple Output Destinations of a MapReduce Program

To write to multiple output datasets from a MapReduce program, begin by adding the datasets as outputs:

...

Note that the multiple output write method, MapReduceTaskContext.write(String, KEY key, VALUE value), can only be used if there are multiple outputs. Similarly, the single output write method, MapReduceTaskContext.write(KEY key, VALUE value), can only be used if there is a single output to the MapReduce program.
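
A task gains access to the MapReduceTaskContext by implementing ProgramLifecycle<MapReduceTaskContext>. Below is a minimal sketch of a Reducer that routes records between two outputs; the class and dataset names (PurchaseRouterReducer, largePurchases, smallPurchases) and the routing condition are assumptions for illustration:

Code Block
public static class PurchaseRouterReducer extends Reducer<Text, Text, byte[], byte[]>
    implements ProgramLifecycle<MapReduceTaskContext<byte[], byte[]>> {

  private MapReduceTaskContext<byte[], byte[]> taskContext;

  @Override
  public void initialize(MapReduceTaskContext<byte[], byte[]> context) throws Exception {
    // CDAP calls this at task startup, providing the task context
    this.taskContext = context;
  }

  @Override
  public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
    for (Text value : values) {
      // Choose the destination dataset by name for each record
      // (routing condition here is illustrative)
      String output = value.getLength() > 100 ? "largePurchases" : "smallPurchases";
      taskContext.write(output, key.copyBytes(), value.copyBytes());
    }
  }

  @Override
  public void destroy() {
    // nothing to clean up
  }
}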

MapReduce and Transactions

When you run a MapReduce program that interacts with datasets, the system creates a long-running transaction. As with the transaction of a service handler, there are some rules to follow:

...

See Transaction System for additional information.

MapReduce and Resources

Both the YARN container size and the number of virtual cores used in a MapReduce job can be specified as part of the program's configuration. They can also be overridden at runtime through runtime arguments.

...
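
As a sketch of the configuration approach, the mapper and reducer resources can be set in the program's configure() method; the specific values below are illustrative assumptions:

Code Block
@Override
protected void configure() {
  setName("PurchaseHistoryBuilder");
  // Request 1 GB of memory and 2 virtual cores for each mapper container,
  // and 2 GB of memory with 1 core for each reducer container
  // (values here are illustrative)
  setMapperResources(new Resources(1024, 2));
  setReducerResources(new Resources(2048, 1));
}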