Multiple Inputs of MapReduce

Goals

  1. JIRA: CDAP-3980: Multiple input datasets for MapReduce.

Checklist

  • User stories documented (Ali)
  • User stories reviewed (Nitin)
  • Design documented (Ali)
  • Design reviewed (Albert/Terence/Andreas)
  • Feature merged (Ali)
  • Examples and guides (Ali)
  • Integration tests (Ali) 
  • Documentation for feature (Ali)
  • Blog post

Use Cases

  1. A developer wants to compute an aggregation (for instance, word count) across data that is stored in multiple datasets.
  2. Joining in MapReduce:
    A developer wants to load data from a 'customers' dataset, which holds each customer's details, and from a 'transactions' dataset, which holds less information about the customer but more about each transaction. The developer should be able to join the data of these two datasets (see Use Case #2 on Cask Hydrator++).
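
To make the join use case concrete, here is a minimal, framework-free sketch of a reduce-side join in plain Java. It is not CDAP or Hadoop code: the class `ReduceSideJoinSketch`, its methods, and the sample records are all hypothetical, and the "shuffle" is simulated with an in-memory map. It assumes that each mapper tags its output with the input it came from, so that the single reducer can tell customer details apart from transactions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ReduceSideJoinSketch {
  /** A mapper output value tagged with the name of the input it came from. */
  static final class Tagged {
    final String source;
    final String value;

    Tagged(String source, String value) {
      this.source = source;
      this.value = value;
    }
  }

  /** Map phase plus simulated shuffle: both inputs emit (customerId, tagged value). */
  static Map<String, List<Tagged>> mapAndShuffle(List<String[]> customers,
                                                 List<String[]> transactions) {
    Map<String, List<Tagged>> grouped = new TreeMap<>();
    for (String[] c : customers) {
      grouped.computeIfAbsent(c[0], k -> new ArrayList<>()).add(new Tagged("customers", c[1]));
    }
    for (String[] t : transactions) {
      grouped.computeIfAbsent(t[0], k -> new ArrayList<>()).add(new Tagged("transactions", t[1]));
    }
    return grouped;
  }

  /** Reduce phase: pair each customer's details with each of that customer's transactions. */
  static List<String> reduce(Map<String, List<Tagged>> grouped) {
    List<String> joined = new ArrayList<>();
    for (Map.Entry<String, List<Tagged>> entry : grouped.entrySet()) {
      String details = null;
      List<String> transactions = new ArrayList<>();
      for (Tagged tagged : entry.getValue()) {
        if ("customers".equals(tagged.source)) {
          details = tagged.value;
        } else {
          transactions.add(tagged.value);
        }
      }
      if (details == null) {
        continue; // inner join: skip transactions with no matching customer
      }
      for (String txn : transactions) {
        joined.add(entry.getKey() + "," + details + "," + txn);
      }
    }
    return joined;
  }

  /** Runs the join on a small hypothetical data set. */
  static List<String> demo() {
    List<String[]> customers = Arrays.asList(
        new String[] {"c1", "Alice"}, new String[] {"c2", "Bob"});
    List<String[]> transactions = Arrays.asList(
        new String[] {"c1", "book"}, new String[] {"c1", "pen"}, new String[] {"c3", "lamp"});
    return reduce(mapAndShuffle(customers, transactions));
  }

  public static void main(String[] args) {
    demo().forEach(System.out::println);
  }
}
```

The sketch performs an inner join, so a customer without transactions and a transaction without a customer both produce no output. Both "mappers" emit the same value type (`Tagged`), which mirrors the restriction that all Mappers share one output type for a single Reducer class.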

User Stories

  1. A developer should be able to set multiple datasets as input to one MapReduce job.
    1. The datasets have the same type.
    2. The datasets have different types (this will require different Mapper classes). Note that the restriction here is that each of the Mappers must have the same output type (single Reducer class).
  2. A developer should be able to read from different partitions of a PartitionedFileSet (multiple time ranges of a TimePartitionedFileSet).
  3. A developer should be able to know which input they are processing data from, in their Mapper/Reducer.
  4. A developer should be able to use Cask Hydrator to set up multiple sources in their pipeline.
  5. A developer should be able to use Cask Hydrator to perform a join across two branches in their pipeline (See User Story #7 on Cask Hydrator++).


API Changes

 

There are several 'setInput' methods on MapReduceContext. These will be deprecated and replaced by the ones in the code snippet below. The setInput methods in AbstractMapReduce will also be deprecated.
Since the different inputs may have differing formats, a different Mapper class may be required for each input, and so our APIs will need to depend on Hadoop modules.

Note that our APIs are optimized for Datasets, not files: with multiple inputs, the user cannot use the methods of FileInputFormat on the job.

A new Input class will be used to define the input to the MapReduce job (currently, there are 7 'setInput' methods available on MapReduceContext):

 

// Input class will encapsulate one of the following three things:
//   1. Dataset - name, args, splits
//   2. Stream - name, startTime, endTime, decoderType, bodyFormatSpec
//   3. InputFormatProvider - name, inputFormatProvider class

// StreamBatchReadable will be deprecated.
// The Input class will expose the following APIs to the user:

/**
 * Defines input to a program, such as MapReduce.
 */
public abstract class Input {
  /**
   * Returns an Input defined by a dataset.
   *
   * @param datasetName the name of the input dataset
   */
  public static Input ofDataset(String datasetName);

  /**
   * Returns an Input defined by a dataset.
   *
   * @param datasetName the name of the input dataset
   * @param arguments the arguments to use when instantiating the dataset
   */
  public static Input ofDataset(String datasetName, Map<String, String> arguments);

  /**
   * Returns an Input defined by a dataset.
   *
   * @param datasetName the name of the input dataset
   * @param splits the data selection splits
   */
  public static Input ofDataset(String datasetName, Iterable<Split> splits);

  /**
   * Returns an Input defined by a dataset.
   *
   * @param datasetName the name of the input dataset
   * @param arguments the arguments to use when instantiating the dataset
   * @param splits the data selection splits
   */
  public static Input ofDataset(String datasetName, Map<String, String> arguments, Iterable<Split> splits);

  /**
   * Returns an Input defined by an InputFormatProvider.
   *
   * @param inputName the name of the input
   * @param inputFormatProvider the provider of the InputFormat to use for this input
   */
  public static Input of(String inputName, InputFormatProvider inputFormatProvider);

  /**
   * Returns an Input defined by a stream.
   *
   * @param streamBatchReadable specifies the stream to be used as input
   */
  private static Input ofStream(StreamBatchReadable streamBatchReadable);

  /**
   * Returns an Input defined by the stream with the given name, covering its entire time range.
   *
   * @param streamName Name of the stream.
   */
  public static Input ofStream(String streamName);

  /**
   * Returns an Input defined by a stream with the given properties.
   *
   * @param streamName Name of the stream.
   * @param startTime Start timestamp in milliseconds.
   * @param endTime End timestamp in milliseconds.
   */
  public static Input ofStream(String streamName, long startTime, long endTime);

  /**
   * Returns an Input defined by a stream with the given properties.
   *
   * @param streamName Name of the stream
   * @param startTime Start timestamp in milliseconds (inclusive) of stream events provided to the job
   * @param endTime End timestamp in milliseconds (exclusive) of stream events provided to the job
   * @param decoderType The {@link StreamEventDecoder} class for decoding {@link StreamEvent}
   */
  public static Input ofStream(String streamName, long startTime,
                               long endTime, Class<? extends StreamEventDecoder> decoderType);

  /**
   * Returns an Input defined by a stream with the given properties.
   *
   * @param streamName Name of the stream
   * @param startTime Start timestamp in milliseconds (inclusive) of stream events provided to the job
   * @param endTime End timestamp in milliseconds (exclusive) of stream events provided to the job
   * @param bodyFormatSpec The {@link FormatSpecification} for decoding the body of each {@link StreamEvent}
   */
  @Beta
  public static Input ofStream(String streamName, long startTime,
                               long endTime, FormatSpecification bodyFormatSpec);
}

 

// The new APIs on MapReduceContext will simply be:

/**
 * Updates the input configuration of this MapReduce job to use the specified {@link Input}.
 * @param input the input to be used
 */
void addInput(Input input);

/**
 * Updates the input configuration of this MapReduce job to use the specified {@link Input}.
 * @param input the input to be used
 * @param mapperCls the mapper class to be used for the input
 */
void addInput(Input input, Class<?> mapperCls);
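
As a sketch of how the proposed addInput methods might be called, the example below registers two inputs, each with its own mapper class. The `Input` and `SketchContext` types are minimal local stand-ins written only for this example (they are not the CDAP classes, and `SketchContext` models only the two addInput methods above); the dataset names and the `CustomerMapper` and `TransactionMapper` classes are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MultipleInputsSketch {
  /** Registers two hypothetical datasets, each with its own mapper class. */
  static SketchContext configure() {
    SketchContext context = new SketchContext();
    context.addInput(Input.ofDataset("customers"), CustomerMapper.class);
    context.addInput(Input.ofDataset("transactions"), TransactionMapper.class);
    return context;
  }

  public static void main(String[] args) {
    configure().getInputs().forEach(
        (name, mapper) -> System.out.println(name + " -> " + mapper.getSimpleName()));
  }
}

/** Stand-in for the proposed Input class; only the dataset name is modelled. */
final class Input {
  private final String name;

  private Input(String name) {
    this.name = name;
  }

  static Input ofDataset(String datasetName) {
    return new Input(datasetName);
  }

  String getName() {
    return name;
  }
}

/** Stand-in for the part of MapReduceContext that holds the proposed methods. */
final class SketchContext {
  // input name -> mapper class; null means "use the job's default mapper" (an assumption)
  private final Map<String, Class<?>> inputs = new LinkedHashMap<>();

  void addInput(Input input) {
    addInput(input, null);
  }

  void addInput(Input input, Class<?> mapperCls) {
    inputs.put(input.getName(), mapperCls);
  }

  Map<String, Class<?>> getInputs() {
    return inputs;
  }
}

/** Hypothetical mapper for the 'customers' dataset. */
class CustomerMapper { }

/** Hypothetical mapper for the 'transactions' dataset. */
class TransactionMapper { }
```

Keying the registered inputs by name is one possible design choice; it would also give the runtime a natural value to expose to a Mapper that needs to know which input it is reading.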

Approach for CDAP-3980 (wip)

There already exists a MultipleInputs class in Hadoop, which supports MapReduce jobs that have multiple input paths with a different InputFormat and Mapper for each path.
Two downsides to this are:

  1. If a user uses this functionality, their mapper class can no longer implement ProgramLifecycle<MapReduceTaskContext> and expect its initialize/destroy methods to be called.
  2. Datasets cannot be used as input with this implementation.

 

Summary of approach:

  1. The existing APIs exposed in the Hadoop MultipleInputs class are:

    public static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass);
    public static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass, Class<? extends Mapper> mapperClass);
    
  2. Similar to Hadoop MultipleInputs, we will have a DelegatingInputFormat that delegates the record-reading and getSplits to other input formats. Ours will also support Datasets, whereas the existing one in Hadoop libraries only supports paths.
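
The delegation idea in the summary above can be sketched as follows. This is a simplified illustration, not the planned implementation: `SimpleInputFormat` and `TaggedSplit` are hypothetical stand-ins for Hadoop's InputFormat and InputSplit, splits are plain strings, and mapper classes are represented by name only. The point it shows is that getSplits is forwarded to every registered input format, and each returned split is tagged with its input name and mapper, so that a task can later pick the right reader and Mapper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DelegatingInputSketch {
  /** Simplified stand-in for an input format: it only lists its splits. */
  interface SimpleInputFormat {
    List<String> getSplits();
  }

  /** A delegate's split, tagged with its input name and the mapper to run on it. */
  static final class TaggedSplit {
    final String inputName;
    final String mapperName;
    final String split;

    TaggedSplit(String inputName, String mapperName, String split) {
      this.inputName = inputName;
      this.mapperName = mapperName;
      this.split = split;
    }
  }

  private final Map<String, SimpleInputFormat> formats = new LinkedHashMap<>();
  private final Map<String, String> mappers = new LinkedHashMap<>();

  void addInput(String inputName, SimpleInputFormat format, String mapperName) {
    formats.put(inputName, format);
    mappers.put(inputName, mapperName);
  }

  /** Delegates getSplits to every registered format and tags each returned split. */
  List<TaggedSplit> getSplits() {
    List<TaggedSplit> tagged = new ArrayList<>();
    for (Map.Entry<String, SimpleInputFormat> entry : formats.entrySet()) {
      String name = entry.getKey();
      for (String split : entry.getValue().getSplits()) {
        tagged.add(new TaggedSplit(name, mappers.get(name), split));
      }
    }
    return tagged;
  }

  /** Builds a delegating format over two hypothetical inputs. */
  static DelegatingInputSketch example() {
    DelegatingInputSketch format = new DelegatingInputSketch();
    format.addInput("customers", () -> Arrays.asList("c-0", "c-1"), "CustomerMapper");
    format.addInput("transactions", () -> Arrays.asList("t-0"), "TransactionMapper");
    return format;
  }

  public static void main(String[] args) {
    for (TaggedSplit s : example().getSplits()) {
      System.out.println(s.inputName + " / " + s.split + " -> " + s.mapperName);
    }
  }
}
```

Because every tagged split carries its input name, the same mechanism could also serve the user story of letting a Mapper know which input it is processing.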

Created in 2020 by Google Inc.