Note

Important: Apache Hadoop MapReduce is deprecated and is not supported in CDAP versions 6.7.0 and later. It will be removed in a future release.

A MapReduce program is used to process data in batch. MapReduce programs can be written as in a conventional Hadoop system. Additionally, CDAP datasets can be accessed from MapReduce as both input and output.
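
As a minimal sketch of declaring datasets as the input and output of a MapReduce program (assuming the CDAP 6.x AbstractMapReduce API; the dataset names purchases and history and the mapper and reducer classes are hypothetical):

Code Block
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.mapreduce.AbstractMapReduce;
import io.cdap.cdap.api.mapreduce.MapReduceContext;
import org.apache.hadoop.mapreduce.Job;

public class PurchaseHistoryBuilder extends AbstractMapReduce {

  @Override
  public void initialize() throws Exception {
    MapReduceContext context = getContext();
    Job job = context.getHadoopJob();
    job.setReducerClass(PerUserReducer.class);  // hypothetical reducer class

    // Read records from the "purchases" dataset and write results to the "history" dataset
    context.addInput(Input.ofDataset("purchases"), PurchaseMapper.class);  // hypothetical mapper class
    context.addOutput(Output.ofDataset("history"));
  }
}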

...

Both a CDAP Mapper and Reducer can directly read or write to a dataset, using one of the following options. (Note that the second and third options can also be used for a Partitioner or Comparator, if one is configured on the MapReduce job; see the sketch after the list below.)

  1. Inject the dataset into the mapper or reducer that uses it. This method is useful if the name of the dataset is constant (known at compile time). For example, to have access to a dataset named catalog:

    Code Block
    public static class CatalogJoinMapper extends Mapper<byte[], Purchase, ...> {
    
      @UseDataSet("catalog")
      private ProductCatalog catalog;
    
      @Override
      public void map(byte[] key, Purchase purchase, Context context)
          throws IOException, InterruptedException {
        // join with catalog by product ID
        Product product = catalog.read(purchase.getProductId());
        ...
      }
    }
  2. Acquire the dataset in the mapper's or reducer's initialize() method. Unlike the previous method, this does not require the dataset name to be constant; it only needs to be known by the time the task starts (for example, through configuration). Note that this requires the mapper or reducer class to implement the ProgramLifecycle interface, which includes the two methods initialize() and destroy():

    Code Block
    public static class CatalogJoinMapper extends Mapper<byte[], Purchase, ...>
      implements ProgramLifecycle<MapReduceTaskContext> {
    
      private ProductCatalog catalog;
    
      @Override
      public void initialize(MapReduceTaskContext mapReduceTaskContext) throws Exception {
        catalog = mapReduceTaskContext.getDataset(
            mapReduceTaskContext.getRuntimeArguments().get("catalog.table.name"));
      }
    
      @Override
      public void destroy() {
      }
    
      @Override
      public void map(byte[] key, Purchase purchase, Context context)
          throws IOException, InterruptedException {
        // join with catalog by product ID
        Product product = catalog.read(purchase.getProductId());
        ...
      }
    }
  3. Dynamically acquire the dataset in the mapper every time it is accessed. This is useful if the name of the dataset is not known at initialization time; for example, if it depends on the data passed to each map() call. In this case, you also implement the ProgramLifecycle interface, to save the MapReduceTaskContext for use in the map() method. For example:

    Code Block
    public static class CatalogJoinMapper extends Mapper<byte[], Purchase, ...>
      implements ProgramLifecycle<MapReduceTaskContext> {
    
      private MapReduceTaskContext taskContext;
    
      @Override
      public void initialize(MapReduceTaskContext mapReduceTaskContext) throws Exception {
        taskContext = mapReduceTaskContext;
      }
    
      @Override
      public void destroy() {
      }
    
      @Override
      public void map(byte[] key, Purchase purchase, Context context)
          throws IOException, InterruptedException {
        // join with catalog by product ID
        String catalogName = determineCatalogName(purchase.getProductCategory());
        ProductCatalog catalog = taskContext.getDataset(catalogName);
        Product product = catalog.read(purchase.getProductId());
        ...
      }
    
      private String determineCatalogName(String productCategory) {
        ...
      }
    }
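
The same patterns apply to a Partitioner or Comparator configured on the MapReduce job. The following is a minimal sketch of the second option applied to a partitioner; the class name, the getCategory() accessor, and the routing logic are illustrative assumptions rather than part of the example application:

Code Block
public static class CatalogAwarePartitioner extends Partitioner<byte[], Purchase>
  implements ProgramLifecycle<MapReduceTaskContext> {

  private ProductCatalog catalog;

  @Override
  public void initialize(MapReduceTaskContext mapReduceTaskContext) throws Exception {
    // Acquire the dataset once, when the task starts, as in the second option above
    catalog = mapReduceTaskContext.getDataset(
        mapReduceTaskContext.getRuntimeArguments().get("catalog.table.name"));
  }

  @Override
  public void destroy() {
  }

  @Override
  public int getPartition(byte[] key, Purchase purchase, int numPartitions) {
    // Route each record to a partition based on a product attribute looked up in the dataset
    // (getCategory() is a hypothetical accessor on the Product class)
    Product product = catalog.read(purchase.getProductId());
    return Math.abs(product.getCategory().hashCode()) % numPartitions;
  }
}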

See also the section on Using Datasets in Programs.

...