...

  1. Inject the dataset into the mapper or reducer that uses it. This method is useful if the name of the dataset is constant or known at compile time. For example, to have access to a dataset named catalog:

    public static class CatalogJoinMapper extends Mapper<byte[], Purchase, ...> {
    
      @UseDataSet("catalog")
      private ProductCatalog catalog;
    
      @Override
      public void map(byte[] key, Purchase purchase, Context context)
          throws IOException, InterruptedException {
        // join with catalog by product ID
        Product product = catalog.read(purchase.getProductId());
        ...
      }
    }
  2. Acquire the dataset in the mapper's or reducer's initialize() method. Unlike the previous method, this does not require the dataset name to be constant. The name only needs to be known when the task starts (for example, through configuration). This requires the mapper or reducer class to implement the ProgramLifecycle interface, which declares two methods, initialize() and destroy():

    public static class CatalogJoinMapper extends Mapper<byte[], Purchase, ...>
      implements ProgramLifecycle<MapReduceTaskContext> {
    
      private ProductCatalog catalog;
    
      @Override
      public void initialize(MapReduceTaskContext mapReduceTaskContext) throws Exception {
        // resolve the dataset name from a runtime argument when the task starts
        catalog = mapReduceTaskContext.getDataset(
            mapReduceTaskContext.getRuntimeArguments().get("catalog.table.name"));
      }
    
      @Override
      public void destroy() {
        // nothing to release
      }
    
      @Override
      public void map(byte[] key, Purchase purchase, Context context)
          throws IOException, InterruptedException {
        // join with catalog by product ID
        Product product = catalog.read(purchase.getProductId());
        ...
      }
    }
  3. Dynamically acquire the dataset in the mapper every time it is accessed. This is useful if the name of the dataset is not known at initialization time, for example, if it depends on the data passed to each map() call. In this case, the mapper also implements the ProgramLifecycle interface, but only to save the MapReduceTaskContext in initialize() so that map() can use it later. For example:

    public static class CatalogJoinMapper extends Mapper<byte[], Purchase, ...>
      implements ProgramLifecycle<MapReduceTaskContext> {
    
      private MapReduceTaskContext taskContext;
    
      @Override
      public void initialize(MapReduceTaskContext mapReduceTaskContext) throws Exception {
        taskContext = mapReduceTaskContext;
      }
    
      @Override
      public void destroy() {
      }
    
      @Override
      public void map(byte[] key, Purchase purchase, Context context)
          throws IOException, InterruptedException {
        // join with catalog by product ID
        String catalogName = determineCatalogName(purchase.getProductCategory());
        ProductCatalog catalog = taskContext.getDataset(catalogName);
        Product product = catalog.read(purchase.getProductId());
        ...
      }
    
      private String determineCatalogName(String productCategory) {
        ...
      }
    }

See also the section on Using Datasets in Programs.
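The three approaches above differ mainly in *when* the dataset name is resolved: at compile time, once at task start, or for every record. The following self-contained Java sketch models only that timing difference. It does not use the CDAP API. `Catalog`, `SketchTaskContext`, and the three mapper classes are hypothetical stand-ins. The "runner" code in `main` imitates what the framework would do, and the `<category>Catalog` naming rule in the third mapper is an assumption made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a catalog dataset; not a CDAP type. */
interface Catalog {
  String read(String productId);
}

/** Hypothetical stand-in for a task context that looks up datasets by name. */
class SketchTaskContext {
  private final Map<String, Catalog> datasets = new HashMap<>();
  private final Map<String, String> runtimeArgs = new HashMap<>();

  void addDataset(String name, Catalog catalog) { datasets.put(name, catalog); }

  void setRuntimeArgument(String key, String value) { runtimeArgs.put(key, value); }

  String getRuntimeArgument(String key) { return runtimeArgs.get(key); }

  Catalog getDataset(String name) {
    Catalog catalog = datasets.get(name);
    if (catalog == null) {
      throw new IllegalArgumentException("No dataset named " + name);
    }
    return catalog;
  }
}

public class DatasetAccessSketch {

  /** Approach 1: constant name; the runner assigns the field before map() runs. */
  static class InjectedMapper {
    static final String DATASET_NAME = "catalog";
    Catalog catalog; // plays the role of the @UseDataSet("catalog") field

    String map(String productId) {
      return catalog.read(productId);
    }
  }

  /** Approach 2: name read from a runtime argument, once, in initialize(). */
  static class InitializeMapper {
    private Catalog catalog;

    void initialize(SketchTaskContext context) {
      catalog = context.getDataset(context.getRuntimeArgument("catalog.table.name"));
    }

    String map(String productId) {
      return catalog.read(productId);
    }
  }

  /** Approach 3: context saved in initialize(); name resolved per record. */
  static class DynamicMapper {
    private SketchTaskContext context;

    void initialize(SketchTaskContext context) {
      this.context = context;
    }

    String map(String productCategory, String productId) {
      // Hypothetical naming rule: one catalog dataset per product category.
      String catalogName = productCategory + "Catalog";
      return context.getDataset(catalogName).read(productId);
    }
  }

  public static void main(String[] args) {
    SketchTaskContext context = new SketchTaskContext();
    context.addDataset("catalog", id -> "catalog:" + id);
    context.addDataset("booksCatalog", id -> "books:" + id);
    context.setRuntimeArgument("catalog.table.name", "booksCatalog");

    InjectedMapper injected = new InjectedMapper();
    injected.catalog = context.getDataset(InjectedMapper.DATASET_NAME);
    System.out.println(injected.map("p1"));

    InitializeMapper initialized = new InitializeMapper();
    initialized.initialize(context);
    System.out.println(initialized.map("p2"));

    DynamicMapper dynamic = new DynamicMapper();
    dynamic.initialize(context);
    System.out.println(dynamic.map("books", "p3"));
  }
}
```

Running `main` prints `catalog:p1`, `books:p2`, and `books:p3`. In this sketch, the later the name is resolved, the more flexible the mapper becomes. The cost is that the third mapper performs a lookup on every `map()` call. It also reports a missing dataset only when a record actually refers to it, rather than when the task starts.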

A MapReduce program can interact with a dataset by using it as an input or an output. The dataset needs to implement specific interfaces to support this, as described in the following sections.

...