...

Code Block
/**
 * Batch Sink that writes to a FileSet in text format.
 * Each record will be written as a single line, with record fields separated by a configurable separator.
 *
 * StructuredRecord is the first parameter because that is the input to the sink.
 * The second and third parameters are the key and value expected by Hadoop's {@link TextOutputFormat}.
 */
@Plugin(type = BatchSink.PLUGIN_TYPE)
@Name(TextFileSetSink.NAME)
@Description("Writes to a FileSet in text format.")
public class TextFileSetSink extends BatchSink<StructuredRecord, NullWritable, Text> {
  public static final String NAME = "TextFileSet";
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    public static final String FILESET_NAME = "fileSetName";
    public static final String OUTPUT_DIR = "outputDir";
    public static final String FIELD_SEPARATOR = "fieldSeparator";

    // The name annotation tells CDAP what the property name is. It is optional, and defaults to the variable name.
    // Note: only primitives (including boxed types) and String are supported.
    @Name(FILESET_NAME)
    @Description("The name of the FileSet to write to.")
    private String fileSetName;

    // Macro enabled properties can be set to a placeholder value ${key} when the pipeline is deployed.
    // At runtime, the value for 'key' can be given and substituted in.
    @Macro
    @Name(OUTPUT_DIR)
    @Description("The FileSet directory to write to.")
    private String outputDir;

    @Nullable
    @Name(FIELD_SEPARATOR)
    @Description("The separator to use to join input record fields together. Defaults to ','.")
    private String fieldSeparator;

    // Use a no-args constructor to set field defaults.
    public Conf() {
      fileSetName = "";
      fieldSeparator = ",";
    }
  }

  // CDAP will pass in a config with its fields populated based on the configuration given when creating the pipeline.
  public TextFileSetSink(Conf config) {
    this.config = config;
  }

  // configurePipeline is called exactly once when the pipeline is being created.
  // Any static configuration should be performed here.
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // create the FileSet here.
    pipelineConfigurer.createDataset(config.fileSetName,
                                     FileSet.class,
                                     FileSetProperties.builder()
                                       .setInputFormat(TextInputFormat.class)
                                       .setOutputFormat(TextOutputFormat.class)
                                       .setEnableExploreOnCreate(true)
                                       .setExploreFormat("text")
                                       .setExploreSchema("text string")
                                       .build()
    );
  }

// prepareRun is called before every pipeline run, and is used to configure what the output should be,
// as well as any arguments the output should use. It is called by the client that is submitting the batch job.
  @Override
  public void prepareRun(BatchSinkContext context) throws Exception {
    Map<String, String> arguments = new HashMap<>();
    FileSetArguments.setOutputPath(arguments, config.outputDir);
    context.addOutput(Output.ofDataset(config.fileSetName, arguments));
  }

  // onRunFinish is called at the end of the pipeline run by the client that submitted the batch job.
  @Override
  public void onRunFinish(boolean succeeded, BatchSinkContext context) {
    // perform any actions that should happen at the end of the run.
  }

  // initialize is called by each job executor before any call to transform is made.
  // This occurs at the start of the batch job run, after the job has been successfully submitted.
  // For example, if mapreduce is the execution engine, each mapper will call initialize at the start of the program.
  @Override
  public void initialize(BatchRuntimeContext context) throws Exception {
    super.initialize(context);
    // create any resources required by transform()
  }

  // destroy is called by each job executor at the end of its life.
  // For example, if mapreduce is the execution engine, each mapper will call destroy at the end of the program.
  @Override
  public void destroy() {
    // clean up any resources created by initialize
  }

  @Override
  public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, Text>> emitter) throws Exception {
    StringBuilder joinedFields = new StringBuilder();
    Iterator<Schema.Field> fieldIter = input.getSchema().getFields().iterator();
    if (!fieldIter.hasNext()) {
      // shouldn't happen
      return;
    }

    Object val = input.get(fieldIter.next().getName());
    if (val != null) {
      joinedFields.append(val);
    }
    while (fieldIter.hasNext()) {
      String fieldName = fieldIter.next().getName();
      joinedFields.append(config.fieldSeparator);
      val = input.get(fieldName);
      if (val != null) {
        joinedFields.append(val);
      }
    }
    emitter.emit(new KeyValue<>(NullWritable.get(), new Text(joinedFields.toString())));
  }

}

Lineage

For plugins that write data to non-CDAP sinks, the lineage is registered using the outputName provided when addOutput() is invoked on BatchSinkContext in prepareRun(). Note that the outputName should be a valid DatasetId. For example:

...
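As a rough sketch of this pattern (the reference name, output format, and configuration below are placeholders, not an actual plugin), a prepareRun() for a non-CDAP sink might register its output, and therefore its lineage, like this:

Code Block
// A hedged sketch only: 'externalTextFiles' and the output configuration are made up for illustration.
@Override
public void prepareRun(BatchSinkContext context) throws Exception {
  // The output name doubles as the lineage reference, so it must be a valid DatasetId.
  context.addOutput(Output.of("externalTextFiles", new OutputFormatProvider() {
    @Override
    public String getOutputFormatClassName() {
      return TextOutputFormat.class.getName();
    }

    @Override
    public Map<String, String> getOutputFormatConfiguration() {
      // Placeholder Hadoop property pointing the output format at a directory.
      return Collections.singletonMap("mapreduce.output.fileoutputformat.outputdir", "/tmp/example/output");
    }
  }));
}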

In order to implement a Batch Aggregator, you extend the BatchAggregator class. Unlike a Transform, which operates on a single record at a time, a BatchAggregator operates on a collection of records.

Methods

  • configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.

  • initialize(): Initialize the Batch Aggregator. Guaranteed to be executed before any call to the plugin’s groupBy or aggregate methods. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.

  • destroy(): Destroy any resources created by initialize. Guaranteed to be executed after all calls to the plugin’s groupBy or aggregate methods have been made. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.

  • groupBy(): This method will be called for every object that is received from the previous stage. It returns zero or more group keys for each object it receives. Objects with the same group key will be grouped together for the aggregate method.

  • aggregate(): This method is called after every object has been assigned its group keys, once for each group key emitted by the groupBy method. It receives a group key as well as an iterator over all objects that had that group key. Objects emitted by this method are the output for this stage.

...

Code Block
/**
 * Aggregator that counts how many times each word appears in records input to the aggregator.
 */
@Plugin(type = BatchAggregator.PLUGIN_TYPE)
@Name(WordCountAggregator.NAME)
@Description("Counts how many times each word appears in all records input to the aggregator.")
public class WordCountAggregator extends BatchAggregator<String, StructuredRecord, StructuredRecord> {
  public static final String NAME = "WordCount";
  public static final Schema OUTPUT_SCHEMA = Schema.recordOf(
    "wordCount",
    Schema.Field.of("word", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("count", Schema.of(Schema.Type.LONG))
  );
  private static final Pattern WHITESPACE = Pattern.compile("\\s");
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The field from the input records containing the words to count.")
    private String field;
  }

  public WordCountAggregator(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // any static configuration validation should happen here.
    // We will check that the field is in the input schema and is of type string.
    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
    // a null input schema means it is unknown until runtime, or it is not constant
    if (inputSchema != null) {
      // if the input schema is constant and known at configure time, check that the input field exists and is a string.
      Schema.Field inputField = inputSchema.getField(config.field);
      if (inputField == null) {
        throw new IllegalArgumentException(
          String.format("Field '%s' does not exist in input schema %s.", config.field, inputSchema));
      }
      Schema fieldSchema = inputField.getSchema();
      Schema.Type fieldType = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getType() : fieldSchema.getType();
      if (fieldType != Schema.Type.STRING) {
        throw new IllegalArgumentException(
          String.format("Field '%s' is of illegal type %s. Must be of type %s.",
                        config.field, fieldType, Schema.Type.STRING));
      }
    }
    // set the output schema so downstream stages will know their input schema.
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
  }

  @Override
  public void groupBy(StructuredRecord input, Emitter<String> groupKeyEmitter) throws Exception {
    String val = input.get(config.field);
    if (val == null) {
      return;
    }

    for (String word : WHITESPACE.split(val)) {
      groupKeyEmitter.emit(word);
    }
  }

  @Override
  public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
                        Emitter<StructuredRecord> emitter) throws Exception {
    long count = 0;
    while (groupValues.hasNext()) {
      groupValues.next();
      count++;
    }
    emitter.emit(StructuredRecord.builder(OUTPUT_SCHEMA).set("word", groupKey).set("count", count).build());
  }
}

Batch Joiner Plugin

A BatchJoiner plugin is used to join records over a batch of data. It can be used in both batch and real-time data pipelines. A join takes place in two steps: a joinOn step followed by a merge step.

...

To implement a Batch Joiner, you extend the BatchJoiner class. Unlike a Transform, which operates on a single record at a time, a BatchJoiner operates on a collection of records.

Methods

  • configurePipeline(): Used to create any streams or datasets, or perform any validation on the application configuration that is required by this plugin.

  • initialize(): Initialize the Batch Joiner. Guaranteed to be executed before any call to the plugin’s joinOn or merge methods. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.

  • prepareRun(): Prepares a pipeline run. This is run every time before a pipeline runs to help set up the run. Here you can set properties such as the number of partitions to use when joining and the join key class, if it is not known at compile time.

  • destroy(): Destroy any resources created by the initialize method. Guaranteed to be executed after all calls to the plugin’s joinOn or merge methods have been made. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.

  • joinOn(): This method will be called for every object that is received from the previous stage. It returns a join key for each object it receives. Objects with the same join key will be grouped together for the merge method.

  • getJoinConfig(): This method will be called by the CDAP Pipeline to find out the type of join to be performed. The config specifies which input stages are requiredInputs. Records from a required input will always be present in the merge() method. Records from a non-required input will only be present in the merge() method if they meet the join criteria. In other words, if there are no required inputs, a full outer join is performed. If all inputs are required inputs, an inner join is performed.

  • merge(): This method is called after each object has been assigned a join key. It receives a join key, an iterator over all objects with that join key, and the stage that emitted each object. Objects emitted by this method are the output for this stage.

...
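As a minimal sketch of how these methods fit together (the stage names 'customers' and 'purchases', the field names, and the output schema below are illustrative placeholders, not an actual plugin), a joiner might look like this:

Code Block
/**
 * Sketch of a joiner that joins records from a 'customers' stage and a 'purchases' stage
 * on a shared 'id' field.
 */
@Plugin(type = BatchJoiner.PLUGIN_TYPE)
@Name("CustomerPurchaseJoiner")
@Description("Joins customer records to purchase records on customer id.")
public class CustomerPurchaseJoiner extends BatchJoiner<String, StructuredRecord, StructuredRecord> {
  private static final Schema OUTPUT_SCHEMA = Schema.recordOf(
    "customerPurchase",
    Schema.Field.of("id", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("item", Schema.nullableOf(Schema.of(Schema.Type.STRING)))
  );

  @Override
  public String joinOn(String stageName, StructuredRecord record) throws Exception {
    // every input stage is assumed to have a string 'id' field
    return record.get("id");
  }

  @Override
  public JoinConfig getJoinConfig() throws Exception {
    // 'customers' is the only required input, so every customer appears in the output,
    // while 'purchases' records appear only when they match a customer
    return new JoinConfig(Collections.singletonList("customers"));
  }

  @Override
  public StructuredRecord merge(String joinKey, Iterable<JoinElement<StructuredRecord>> joinRow) throws Exception {
    StructuredRecord.Builder builder = StructuredRecord.builder(OUTPUT_SCHEMA).set("id", joinKey);
    for (JoinElement<StructuredRecord> element : joinRow) {
      StructuredRecord record = element.getInputRecord();
      if ("customers".equals(element.getStageName())) {
        builder.set("name", record.get("name"));
      } else {
        // if a customer has multiple purchases with this key, this simple sketch keeps the last one seen
        builder.set("item", record.get("item"));
      }
    }
    return builder.build();
  }
}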

In order to implement a Spark Compute Plugin, you extend the SparkCompute class.

Methods

  • configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.

  • transform(): This method is given a Spark RDD (Resilient Distributed Dataset) containing every object that is received from the previous stage. This method then performs Spark operations on the input to transform it into an output RDD that will be sent to the next stage.

...

Code Block
/**
 * SparkCompute plugin that counts how many times each word appears in records input to the compute stage.
 */
@Plugin(type = SparkCompute.PLUGIN_TYPE)
@Name(WordCountCompute.NAME)
@Description("Counts how many times each word appears in all records input to the aggregator.")
public class WordCountCompute extends SparkCompute<StructuredRecord, StructuredRecord> {
  public static final String NAME = "WordCount";
  public static final Schema OUTPUT_SCHEMA = Schema.recordOf(
    "wordCount",
    Schema.Field.of("word", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("count", Schema.of(Schema.Type.LONG))
  );
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The field from the input records containing the words to count.")
    private String field;
  }

  public WordCountCompute(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // any static configuration validation should happen here.
    // We will check that the field is in the input schema and is of type string.
    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
    if (inputSchema != null) {
      WordCount wordCount = new WordCount(config.field);
      wordCount.validateSchema(inputSchema);
    }
    // set the output schema so downstream stages will know their input schema.
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
  }

  @Override
  public JavaRDD<StructuredRecord> transform(SparkExecutionPluginContext sparkExecutionPluginContext,
                                             JavaRDD<StructuredRecord> javaRDD) throws Exception {
    WordCount wordCount = new WordCount(config.field);
    return wordCount.countWords(javaRDD)
      .flatMap(new FlatMapFunction<Tuple2<String, Long>, StructuredRecord>() {
        @Override
        public Iterable<StructuredRecord> call(Tuple2<String, Long> stringLongTuple2) throws Exception {
          List<StructuredRecord> output = new ArrayList<>();
          output.add(StructuredRecord.builder(OUTPUT_SCHEMA)
                       .set("word", stringLongTuple2._1())
                       .set("count", stringLongTuple2._2())
                       .build());
          return output;
        }
      });
  }
}

Spark Sink Plugin

A SparkSink plugin is used to perform computations on a collection of input records and optionally write output data. It can only be used in batch data pipelines. A SparkSink is similar to a SparkCompute plugin, except that it has no output. In a SparkSink, you have access to anything you could do in a Spark program. For example, one common use case is to train a machine-learning model in this plugin.

In order to implement a Spark Sink Plugin, you extend the SparkSink class.

Methods

  • configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.

  • run(): This method is given a Spark RDD (Resilient Distributed Dataset) containing every object that is received from the previous stage. This method then performs Spark operations on the input, and usually saves the result to a dataset.

...

Code Block
/**
 * SparkSink plugin that counts how many times each word appears in records input to it and stores the result in
 * a KeyValueTable.
 */
@Plugin(type = SparkSink.PLUGIN_TYPE)
@Name(WordCountSink.NAME)
@Description("Counts how many times each word appears in all records input to the aggregator.")
public class WordCountSink extends SparkSink<StructuredRecord> {
  public static final String NAME = "WordCount";
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The field from the input records containing the words to count.")
    private String field;

    @Description("The name of the KeyValueTable to write to.")
    private String tableName;
  }

  public WordCountSink(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // any static configuration validation should happen here.
    // We will check that the field is in the input schema and is of type string.
    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
    if (inputSchema != null) {
      WordCount wordCount = new WordCount(config.field);
      wordCount.validateSchema(inputSchema);
    }
    pipelineConfigurer.createDataset(config.tableName, KeyValueTable.class, DatasetProperties.EMPTY);
  }

  @Override
  public void prepareRun(SparkPluginContext sparkPluginContext) throws Exception {
    // no-op
  }

  @Override
  public void run(SparkExecutionPluginContext sparkExecutionPluginContext,
                  JavaRDD<StructuredRecord> javaRDD) throws Exception {
    WordCount wordCount = new WordCount(config.field);
    JavaPairRDD<byte[], byte[]> outputRDD = wordCount.countWords(javaRDD)
      .mapToPair(new PairFunction<Tuple2<String, Long>, byte[], byte[]>() {
        @Override
        public Tuple2<byte[], byte[]> call(Tuple2<String, Long> stringLongTuple2) throws Exception {
          return new Tuple2<>(Bytes.toBytes(stringLongTuple2._1()), Bytes.toBytes(stringLongTuple2._2()));
        }
      });
    sparkExecutionPluginContext.saveAsDataset(outputRDD, config.tableName);
  }
}

Streaming Source Plugin

A Streaming Source plugin is used as a source in real-time data pipelines. It is used to fetch a Spark DStream, which is an object that represents a collection of Spark RDDs and that produces a new RDD every batch interval of the pipeline.

In order to implement a Streaming Source Plugin, you extend the StreamingSource class.

Methods

  • configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.

  • getStream(): Returns the JavaDStream that will be used as a source in the pipeline.

...

Code Block
@Plugin(type = StreamingSource.PLUGIN_TYPE)
@Name("Twitter")
@Description("Twitter streaming source.")
public class TwitterStreamingSource extends StreamingSource<StructuredRecord> {
  private final TwitterStreamingConfig config;

  /**
   * Config class for TwitterStreamingSource.
   */
  public static class TwitterStreamingConfig extends PluginConfig implements Serializable {
    private static final long serialVersionUID = 4218063781909515444L;

    private String consumerKey;

    private String consumerSecret;

    private String accessToken;

    private String accessTokenSecret;

    private String referenceName;
  }

  public TwitterStreamingSource(TwitterStreamingConfig config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    pipelineConfigurer.getStageConfigurer().setOutputSchema(TwitterConstants.SCHEMA);
  }

  @Override
  public JavaDStream<StructuredRecord> getStream(StreamingContext context) throws Exception {
    JavaStreamingContext javaStreamingContext = context.getSparkStreamingContext();
    // lineage for this source will be tracked with this reference name
    context.registerLineage(config.referenceName);

    // Create authorization from user-provided properties
    ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
    configurationBuilder.setDebugEnabled(false)
      .setOAuthConsumerKey(config.consumerKey)
      .setOAuthConsumerSecret(config.consumerSecret)
      .setOAuthAccessToken(config.accessToken)
      .setOAuthAccessTokenSecret(config.accessTokenSecret);
    Authorization authorization = new OAuthAuthorization(configurationBuilder.build());

    return TwitterUtils.createStream(javaStreamingContext, authorization).map(
      new Function<Status, StructuredRecord>() {
        public StructuredRecord call(Status status) {
          return convertTweet(status);
        }
      }
    );
  }

  private StructuredRecord convertTweet(Status tweet) {
    // logic to convert a Twitter Status into a CDAP StructuredRecord
  }

}

Lineage

The lineage is registered using the referenceName provided when invoking registerLineage() on StreamingContext in getStream(). Note that the referenceName should be a valid DatasetId.

...

For example, consider a pipeline with a batchInterval of 10 seconds. The pipeline uses a windower that has a size of 60 and a slide interval of 30. The input into the windower will be micro batches containing 10 seconds of data. Every 30 seconds, the windower will output a batch of data containing the past 60 seconds of data, meaning the previous six micro batches that it received as input.

...

In order to implement a Windower Plugin, you extend the Windower class.

Methods

  • configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.

  • getWidth(): Returns the width, in seconds, of windows created by this plugin. Must be a multiple of the batchInterval of the pipeline.

  • getSlideInterval(): Returns the slide interval, in seconds, of windows created by this plugin. Must be a multiple of the batchInterval of the pipeline.

...
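As a minimal sketch of a windower matching the example above, a 60-second window that slides every 30 seconds (the plugin name and config property names are placeholders, not an actual plugin):

Code Block
@Plugin(type = Windower.PLUGIN_TYPE)
@Name("FixedWindow")
@Description("Groups the previous 'width' seconds of data every 'slideInterval' seconds.")
public class FixedWindow extends Windower {
  private final Conf config;

  /**
   * Config properties for the plugin. Both values are in seconds and must be multiples
   * of the batchInterval of the pipeline.
   */
  public static class Conf extends PluginConfig {
    @Description("Width of the window in seconds.")
    private Long width;

    @Description("Slide interval of the window in seconds.")
    private Long slideInterval;

    public Conf() {
      width = 60L;
      slideInterval = 30L;
    }
  }

  public FixedWindow(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // no static configuration needed for this sketch
  }

  @Override
  public long getWidth() {
    return config.width;
  }

  @Override
  public long getSlideInterval() {
    return config.slideInterval;
  }
}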