Action Plugin

An Action plugin runs arbitrary logic at the start or end of a batch data pipeline.

In order to implement an Action plugin, you extend the Action class. Only one method is required to be implemented: run().

Example:

/**
 * Action that moves files from one fileset into another, optionally filtering files that match a regex.
 */
@Plugin(type = Action.PLUGIN_TYPE)
@Name(FilesetMoveAction.NAME)
@Description("Action that moves files from one fileset into another, optionally filtering files that match a regex.")
public class FilesetMoveAction extends Action {
  public static final String NAME = "FilesetMove";
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    public static final String SOURCE_FILESET = "sourceFileset";
    public static final String DEST_FILESET = "destinationFileset";
    public static final String FILTER_REGEX = "filterRegex";

    @Name(SOURCE_FILESET)
    @Description("The fileset to move files from.")
    private String sourceFileset;

    @Name(DEST_FILESET)
    @Description("The fileset to move files to.")
    private String destinationFileset;

    @Nullable
    @Name(FILTER_REGEX)
    @Description("Filter any files whose name matches this regex. Defaults to '^\\.', which will filter any files " +
      "that begin with a period.")
    private String filterRegex;

    // set defaults for properties in a no-argument constructor.
    public Conf() {
      filterRegex = "^\\.";
    }
  }

  public FilesetMoveAction(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    Pattern.compile(config.filterRegex);
  }

  @Override
  public void run(ActionContext context) throws Exception {
    context.execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        FileSet sourceFileSet = context.getDataset(config.sourceFileset);
        FileSet destinationFileSet = context.getDataset(config.destinationFileset);

        Pattern pattern = Pattern.compile(config.filterRegex);

        for (Location sourceFile : sourceFileSet.getBaseLocation().list()) {
          if (pattern.matcher(sourceFile.getName()).find()) {
            continue;
          }
          Location destFile = destinationFileSet.getBaseLocation().append(sourceFile.getName());
          sourceFile.renameTo(destFile);
        }
      }
    });
  }
}

Post-run Action Plugin

A PostAction plugin runs arbitrary logic after the end of a pipeline run. It can be set to execute only if the run completed successfully, only if it failed, or in either case.

The difference between a PostAction and an Action that is placed at the end of a pipeline is that a PostAction will always be executed even if the pipeline run fails, while an Action will only be executed if every stage preceding it successfully runs.

In order to implement a Post-run Action plugin, you extend the PostAction class. Only one method is required to be implemented: run().

Example:

/**
 * Post run action that deletes files in a FileSet that match a configurable regex.
 */
@Plugin(type = PostAction.PLUGIN_TYPE)
@Name(FilesetDeletePostAction.NAME)
@Description("Post run action that deletes files in a FileSet that match a configurable regex if the run succeeded.")
public class FilesetDeletePostAction extends PostAction {
  public static final String NAME = "FilesetDelete";
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    public static final String FILESET_NAME = "filesetName";
    public static final String DELETE_REGEX = "deleteRegex";
    public static final String DIRECTORY = "directory";

    @Name(FILESET_NAME)
    @Description("The fileset to delete files from.")
    private String filesetName;

    @Name(DELETE_REGEX)
    @Description("Delete files that match this regex.")
    private String deleteRegex;

    // Macro enabled properties can be set to a placeholder value ${key} when the pipeline is deployed.
    // At runtime, the value for 'key' can be given and substituted in.
    @Macro
    @Name(DIRECTORY)
    @Description("The fileset directory to delete files from.")
    private String directory;
  }

  public FilesetDeletePostAction(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    Pattern.compile(config.deleteRegex);
  }

  @Override
  public void run(BatchActionContext context) throws Exception {
    if (!context.isSuccessful()) {
      return;
    }

    FileSet fileSet = context.getDataset(config.filesetName);
    Pattern pattern = Pattern.compile(config.deleteRegex);
    for (Location fileLocation : fileSet.getBaseLocation().append(config.directory).list()) {
      if (pattern.matcher(fileLocation.getName()).find()) {
        fileLocation.delete();
      }
    }
  }
}

Batch Source Plugin

A BatchSource plugin is used as the source of a batch data pipeline. It is used to prepare and configure the input of a pipeline run.

In order to implement a Batch Source, you extend the BatchSource class. You need to define the types of the KEY and VALUE that the Batch Source will receive and the type of object that it will emit to the subsequent stage (which could be either a Transformation or a Batch Sink). After defining the types, only one method is required to be implemented: prepareRun().

Example:

/**
 * Batch Source that reads from a FileSet that has its data formatted as text.
 *
 * LongWritable is the first parameter because that is the key used by Hadoop's {@link TextInputFormat}.
 * Similarly, Text is the second parameter because that is the value used by Hadoop's {@link TextInputFormat}.
 * {@link StructuredRecord} is the third parameter because that is what the source will output.
 * All the plugins included with Hydrator operate on StructuredRecord.
 */
@Plugin(type = BatchSource.PLUGIN_TYPE)
@Name(TextFileSetSource.NAME)
@Description("Reads from a FileSet that has its data formatted as text.")
public class TextFileSetSource extends BatchSource<LongWritable, Text, StructuredRecord> {
  public static final String NAME = "TextFileSet";
  public static final Schema OUTPUT_SCHEMA = Schema.recordOf(
    "textRecord",
    Schema.Field.of("position", Schema.of(Schema.Type.LONG)),
    Schema.Field.of("text", Schema.of(Schema.Type.STRING))
  );
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    public static final String FILESET_NAME = "fileSetName";
    public static final String FILES = "files";
    public static final String CREATE_IF_NOT_EXISTS = "createIfNotExists";
    public static final String DELETE_INPUT_ON_SUCCESS = "deleteInputOnSuccess";

    // The name annotation tells CDAP what the property name is. It is optional, and defaults to the variable name.
    // Note: only primitives (including boxed types) and String are supported
    @Name(FILESET_NAME)
    @Description("The name of the FileSet to read from.")
    private String fileSetName;

    // Macro enabled properties can be set to a placeholder value ${key} when the pipeline is deployed.
    // At runtime, the value for 'key' can be given and substituted in.
    @Macro
    @Name(FILES)
    @Description("A comma separated list of files in the FileSet to read.")
    private String files;

    // A nullable field tells CDAP that this is an optional field.
    @Nullable
    @Name(CREATE_IF_NOT_EXISTS)
    @Description("Whether to create the FileSet if it doesn't already exist. Defaults to false.")
    private Boolean createIfNotExists;

    @Nullable
    @Name(DELETE_INPUT_ON_SUCCESS)
    @Description("Whether to delete the data read by the source after the run succeeds. Defaults to false.")
    private Boolean deleteInputOnSuccess;

    // Use a no-args constructor to set field defaults.
    public Conf() {
      createIfNotExists = false;
      deleteInputOnSuccess = false;
    }
  }

  // CDAP will pass in a config with its fields populated based on the configuration given when creating the pipeline.
  public TextFileSetSource(Conf config) {
    this.config = config;
  }

  // configurePipeline is called exactly once when the pipeline is being created.
  // Any static configuration should be performed here.
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // if the user has set createIfNotExists to true, create the FileSet here.
    if (config.createIfNotExists) {
      pipelineConfigurer.createDataset(config.fileSetName,
                                       FileSet.class,
                                       FileSetProperties.builder()
                                         .setInputFormat(TextInputFormat.class)
                                         .setOutputFormat(TextOutputFormat.class)
                                         .setEnableExploreOnCreate(true)
                                         .setExploreFormat("text")
                                         .setExploreSchema("text string")
                                         .build()
      );
    }
    // set the output schema of this stage so that stages further down the pipeline will know their input schema.
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
  }

  // prepareRun is called before every pipeline run, and is used to configure what the input should be,
  // as well as any arguments the input should use. It is called by the client that is submitting the batch job.
  @Override
  public void prepareRun(BatchSourceContext context) throws IOException {
    Map<String, String> arguments = new HashMap<>();
    FileSetArguments.setInputPaths(arguments, config.files);
    context.setInput(Input.ofDataset(config.fileSetName, arguments));
  }

  // onRunFinish is called at the end of the pipeline run by the client that submitted the batch job.
  @Override
  public void onRunFinish(boolean succeeded, BatchSourceContext context) {
    // perform any actions that should happen at the end of the run.
    // in our case, we want to delete the data read during this run if the run succeeded.
    if (succeeded && config.deleteInputOnSuccess) {
      Map<String, String> arguments = new HashMap<>();
      FileSetArguments.setInputPaths(arguments, config.files);
      FileSet fileSet = context.getDataset(config.fileSetName, arguments);
      for (Location inputLocation : fileSet.getInputLocations()) {
        try {
          inputLocation.delete(true);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }

  // initialize is called by each job executor before any call to transform is made.
  // This occurs at the start of the batch job run, after the job has been successfully submitted.
  // For example, if mapreduce is the execution engine, each mapper will call initialize at the start of the program.
  @Override
  public void initialize(BatchRuntimeContext context) throws Exception {
    super.initialize(context);
    // create any resources required by transform()
  }

  // destroy is called by each job executor at the end of its life.
  // For example, if mapreduce is the execution engine, each mapper will call destroy at the end of the program.
  @Override
  public void destroy() {
    // clean up any resources created by initialize
  }

  // transform is used to transform the key-value pair output by the input into objects output by this source.
  // The output should be a StructuredRecord if you want the source to be compatible with the plugins included
  // with Hydrator.
  @Override
  public void transform(KeyValue<LongWritable, Text> input, Emitter<StructuredRecord> emitter) throws Exception {
    emitter.emit(StructuredRecord.builder(OUTPUT_SCHEMA)
                   .set("position", input.getKey().get())
                   .set("text", input.getValue().toString())
                   .build()
    );
  }
}

Lineage

For plugins that fetch data from non-CDAP sources, the lineage is registered using the inputName provided when setInput() is invoked on BatchSourceContext in prepareRun(). Note that the inputName should be a valid DatasetId. For example:

@Override
public void prepareRun(BatchSourceContext context) throws Exception {
  ...
  context.setInput(Input.of("myExternalSource", myInputFormatProvider));
}

Lineage will be tracked using myExternalSource.

Batch Sink Plugin

A BatchSink plugin is used to write data in either batch or real-time data pipelines. It is used to prepare and configure the output of a batch of data from a pipeline run.

In order to implement a Batch Sink, you extend the BatchSink class. Similar to a Batch Source, you need to define the types of the KEY and VALUE that the Batch Sink will write in the batch job and the type of object that it will accept from the previous stage (which could be either a Transformation or a Batch Source).

After defining the types, only one method is required to be implemented: prepareRun().

Example:

/**
 * Batch Sink that writes to a FileSet in text format.
 * Each record will be written as a single line, with record fields separated by a configurable separator.
 *
 * StructuredRecord is the first parameter because that is the input to the sink.
 * The second and third parameters are the key and value expected by Hadoop's {@link TextOutputFormat}.
 */
@Plugin(type = BatchSink.PLUGIN_TYPE)
@Name(TextFileSetSink.NAME)
@Description("Writes to a FileSet in text format.")
public class TextFileSetSink extends BatchSink<StructuredRecord, NullWritable, Text> {
  public static final String NAME = "TextFileSet";
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    public static final String FILESET_NAME = "fileSetName";
    public static final String OUTPUT_DIR = "outputDir";
    public static final String FIELD_SEPARATOR = "fieldSeparator";

    // The name annotation tells CDAP what the property name is. It is optional, and defaults to the variable name.
    // Note: only primitives (including boxed types) and String are supported
    @Name(FILESET_NAME)
    @Description("The name of the FileSet to write to.")
    private String fileSetName;

    // Macro enabled properties can be set to a placeholder value ${key} when the pipeline is deployed.
    // At runtime, the value for 'key' can be given and substituted in.
    @Macro
    @Name(OUTPUT_DIR)
    @Description("The FileSet directory to write to.")
    private String outputDir;

    @Nullable
    @Name(FIELD_SEPARATOR)
    @Description("The separator to use to join input record fields together. Defaults to ','.")
    private String fieldSeparator;

    // Use a no-args constructor to set field defaults.
    public Conf() {
      fileSetName = "";
      fieldSeparator = ",";
    }
  }

  // CDAP will pass in a config with its fields populated based on the configuration given when creating the pipeline.
  public TextFileSetSink(Conf config) {
    this.config = config;
  }

  // configurePipeline is called exactly once when the pipeline is being created.
  // Any static configuration should be performed here.
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // create the FileSet here.
    pipelineConfigurer.createDataset(config.fileSetName,
                                     FileSet.class,
                                     FileSetProperties.builder()
                                       .setInputFormat(TextInputFormat.class)
                                       .setOutputFormat(TextOutputFormat.class)
                                       .setEnableExploreOnCreate(true)
                                       .setExploreFormat("text")
                                       .setExploreSchema("text string")
                                       .build()
    );
  }

  // prepareRun is called before every pipeline run, and is used to configure what the output should be,
  // as well as any arguments the output should use. It is called by the client that is submitting the batch job.
  @Override
  public void prepareRun(BatchSinkContext context) throws Exception {
    Map<String, String> arguments = new HashMap<>();
    FileSetArguments.setOutputPath(arguments, config.outputDir);
    context.addOutput(Output.ofDataset(config.fileSetName, arguments));
  }

  // onRunFinish is called at the end of the pipeline run by the client that submitted the batch job.
  @Override
  public void onRunFinish(boolean succeeded, BatchSinkContext context) {
    // perform any actions that should happen at the end of the run.
  }

  // initialize is called by each job executor before any call to transform is made.
  // This occurs at the start of the batch job run, after the job has been successfully submitted.
  // For example, if mapreduce is the execution engine, each mapper will call initialize at the start of the program.
  @Override
  public void initialize(BatchRuntimeContext context) throws Exception {
    super.initialize(context);
    // create any resources required by transform()
  }

  // destroy is called by each job executor at the end of its life.
  // For example, if mapreduce is the execution engine, each mapper will call destroy at the end of the program.
  @Override
  public void destroy() {
    // clean up any resources created by initialize
  }

  @Override
  public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, Text>> emitter) throws Exception {
    StringBuilder joinedFields = new StringBuilder();
    Iterator<Schema.Field> fieldIter = input.getSchema().getFields().iterator();
    if (!fieldIter.hasNext()) {
      // shouldn't happen
      return;
    }

    Object val = input.get(fieldIter.next().getName());
    if (val != null) {
      joinedFields.append(val);
    }
    while (fieldIter.hasNext()) {
      String fieldName = fieldIter.next().getName();
      joinedFields.append(config.fieldSeparator);
      val = input.get(fieldName);
      if (val != null) {
        joinedFields.append(val);
      }
    }
    emitter.emit(new KeyValue<>(NullWritable.get(), new Text(joinedFields.toString())));
  }

}

Lineage

For plugins that write data to non-CDAP sinks, the lineage is registered using the outputName provided when addOutput() is invoked on BatchSinkContext in prepareRun(). Note that the outputName should be a valid DatasetId. For example:

@Override
public void prepareRun(BatchSinkContext context) throws Exception {
  ...
  context.addOutput(Output.of("myExternalSink", myOutputFormatProvider));
}

Lineage will be tracked using myExternalSink.

Transformation Plugin

A Transform plugin is used to convert one input record into zero or more output records. It can be used in both batch and real-time data pipelines.

The only method that needs to be implemented is: transform().

Below is an example of a StringCase transform that converts specific fields to lowercase or uppercase.

/**
 * Transform that converts specific fields to lowercase or uppercase.
 */
@Plugin(type = Transform.PLUGIN_TYPE)
@Name(StringCaseTransform.NAME)
@Description("Transforms configured fields to lowercase or uppercase.")
public class StringCaseTransform extends Transform<StructuredRecord, StructuredRecord> {
  public static final String NAME = "StringCase";
  private final Conf config;
  private Set<String> upperFields;
  private Set<String> lowerFields;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    public static final String UPPER_FIELDS = "upperFields";
    public static final String LOWER_FIELDS = "lowerFields";
    private static final Pattern SPLIT_ON = Pattern.compile("\\s*,\\s*");

    // nullable means this property is optional
    @Nullable
    @Name(UPPER_FIELDS)
    @Description("A comma separated list of fields to uppercase. Each field must be of type String.")
    private String upperFields;

    @Nullable
    @Name(LOWER_FIELDS)
    @Description("A comma separated list of fields to lowercase. Each field must be of type String.")
    private String lowerFields;

    private Set<String> getUpperFields() {
      return parseToSet(upperFields);
    }

    private Set<String> getLowerFields() {
      return parseToSet(lowerFields);
    }

    private Set<String> parseToSet(String str) {
      Set<String> set = new HashSet<>();
      if (str == null || str.isEmpty()) {
        return set;
      }
      for (String element : SPLIT_ON.split(str)) {
        set.add(element);
      }
      return set;
    }
  }

  public StringCaseTransform(Conf config) {
    this.config = config;
  }

  // configurePipeline is called only once, when the pipeline is deployed. Static validation should be done here.
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
    // the output schema is always the same as the input schema
    Schema inputSchema = stageConfigurer.getInputSchema();

    // if schema is null, that means it is either not known until runtime, or it is variable
    if (inputSchema != null) {
      // if the input schema is constant and known at configure time, check that all configured fields are strings
      for (String fieldName : config.getUpperFields()) {
        validateFieldIsString(inputSchema, fieldName);
      }
      for (String fieldName : config.getLowerFields()) {
        validateFieldIsString(inputSchema, fieldName);
      }
    }

    stageConfigurer.setOutputSchema(inputSchema);
  }

  // initialize is called once at the start of each pipeline run
  @Override
  public void initialize(TransformContext context) throws Exception {
    upperFields = config.getUpperFields();
    lowerFields = config.getLowerFields();
  }

  // transform is called once for each record that goes into this stage
  @Override
  public void transform(StructuredRecord record, Emitter<StructuredRecord> emitter) throws Exception {
    StructuredRecord.Builder builder = StructuredRecord.builder(record.getSchema());
    for (Schema.Field field : record.getSchema().getFields()) {
      String fieldName = field.getName();
      if (upperFields.contains(fieldName)) {
        builder.set(fieldName, record.get(fieldName).toString().toUpperCase());
      } else if (lowerFields.contains(fieldName)) {
        builder.set(fieldName, record.get(fieldName).toString().toLowerCase());
      } else {
        builder.set(fieldName, record.get(fieldName));
      }
    }
    emitter.emit(builder.build());
  }

  private void validateFieldIsString(Schema schema, String fieldName) {
    Schema.Field inputField = schema.getField(fieldName);
    if (inputField == null) {
      throw new IllegalArgumentException(
        String.format("Field '%s' does not exist in input schema %s.", fieldName, schema));
    }
    Schema fieldSchema = inputField.getSchema();
    Schema.Type fieldType = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getType() : fieldSchema.getType();
    if (fieldType != Schema.Type.STRING) {
      throw new IllegalArgumentException(
        String.format("Field '%s' is of illegal type %s. Must be of type %s.",
                      fieldName, fieldType, Schema.Type.STRING));
    }
  }
}

If you wanted, you could add a user metric to the transform method indicating the number of fields changed. User metrics can be queried by using the CDAP Metrics Microservices:

public void transform(StructuredRecord record, Emitter<StructuredRecord> emitter) throws Exception {
  int fieldsChanged = 0;
  . . .
      builder.set(fieldName, record.get(fieldName).toString().toUpperCase());
      fieldsChanged += 1;
  . . .
  getContext().getMetrics().count("fieldsChanged", fieldsChanged);
}

Error Transformation Plugin

An ErrorTransform plugin is a special type of Transform that consumes error records emitted from the previous stages instead of output records. It is used to transform an ErrorRecord into zero or more output records. In addition to the actual error object, an ErrorRecord exposes the stage the error was emitted from, an error code, and an error message. Errors can be emitted by BatchSource, Transform, and BatchAggregator plugins using the Emitter they receive. An ErrorTransform can be used in both batch and real-time data pipelines.
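
For example, a Transform could route records that fail validation into the error flow through the Emitter's emitError() method, which takes an InvalidEntry wrapping the invalid record, an error code, and an error message. The snippet below is a minimal sketch; the 'price' field name and the error code 31 are hypothetical:

@Override
public void transform(StructuredRecord record, Emitter<StructuredRecord> emitter) throws Exception {
  // hypothetical validation: treat records without a 'price' field as errors
  Object price = record.get("price");
  if (price == null) {
    // the invalid record, error code, and error message become available to a downstream ErrorTransform as an ErrorRecord
    emitter.emitError(new InvalidEntry<>(31, "price field is missing", record));
    return;
  }
  emitter.emit(record);
}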

The only method that needs to be implemented is: transform().

Below is an example of an ErrorCollector that adds the error stage, code, and message to each record it receives.

/**
 * Adds the error code and error message to each record, then emits it.
 */
@Plugin(type = ErrorTransform.PLUGIN_TYPE)
@Name("ErrorCollector")
public class ErrorCollector extends ErrorTransform<StructuredRecord, StructuredRecord> {
  private final Config config;

  public ErrorCollector(Config config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
    if (inputSchema != null) {
      if (inputSchema.getField(config.messageField) != null) {
        throw new IllegalArgumentException(String.format(
          "Input schema already contains message field %s. Please set messageField to a different value.",
          config.messageField));
      }
      if (inputSchema.getField(config.codeField) != null) {
        throw new IllegalArgumentException(String.format(
          "Input schema already contains code field %s. Please set codeField to a different value.",
          config.codeField));
      }
      if (inputSchema.getField(config.stageField) != null) {
        throw new IllegalArgumentException(String.format(
          "Input schema already contains stage field %s. Please set stageField to a different value.",
          config.stageField));
      }
      Schema outputSchema = getOutputSchema(config, inputSchema);
      pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
    }
  }

  @Override
  public void transform(ErrorRecord<StructuredRecord> input, Emitter<StructuredRecord> emitter) throws Exception {
    StructuredRecord invalidRecord = input.getRecord();
    StructuredRecord.Builder output = StructuredRecord.builder(getOutputSchema(config, invalidRecord.getSchema()));
    for (Schema.Field field : invalidRecord.getSchema().getFields()) {
      output.set(field.getName(), invalidRecord.get(field.getName()));
    }
    if (config.messageField != null) {
      output.set(config.messageField, input.getErrorMessage());
    }
    if (config.codeField != null) {
      output.set(config.codeField, input.getErrorCode());
    }
    if (config.stageField != null) {
      output.set(config.stageField, input.getStageName());
    }
    emitter.emit(output.build());
  }

  private static Schema getOutputSchema(Config config, Schema inputSchema) {
    List<Schema.Field> fields = new ArrayList<>();
    fields.addAll(inputSchema.getFields());
    if (config.messageField != null) {
      fields.add(Schema.Field.of(config.messageField, Schema.of(Schema.Type.STRING)));
    }
    if (config.codeField != null) {
      fields.add(Schema.Field.of(config.codeField, Schema.of(Schema.Type.INT)));
    }
    if (config.stageField != null) {
      fields.add(Schema.Field.of(config.stageField, Schema.of(Schema.Type.STRING)));
    }
    return Schema.recordOf("error" + inputSchema.getRecordName(), fields);
  }

  /**
   * The plugin config
   */
  public static class Config extends PluginConfig {
    @Nullable
    @Description("The name of the error message field to use in the output schema. " +
      "If this is not specified, the error message will be dropped.")
    private String messageField;

    @Nullable
    @Description("The name of the error code field to use in the output schema. " +
      "If this is not specified, the error code will be dropped.")
    private String codeField;

    @Nullable
    @Description("The name of the error stage field to use in the output schema. " +
      "If this is not specified, the error stage will be dropped.")
    private String stageField;

  }
}

Alert Publisher Plugin

An AlertPublisher plugin is a special type of plugin that consumes alerts emitted from previous stages instead of output records. Alerts are meant to be uncommon events that need to be acted on in some other program. Alerts contain a payload, which is just a map of strings containing any relevant data. An alert publisher is responsible for writing the alerts to some system, where they can be read and acted upon by some external program. For example, a plugin may write alerts to Kafka. Alerts may not be published immediately after they are emitted; it is up to the processing engine to decide when to publish alerts.

The only method that needs to be implemented is: publish(Iterator<Alert> alerts).

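Below is a minimal sketch of an alert publisher that simply logs each alert it receives. It is not one of the shipped plugins: the LoggingAlertPublisher name is made up, and the Alert accessors used (getStageName() and getPayload()) are assumptions based on the description above. A real publisher would typically write the alerts to an external system such as Kafka.

/**
 * Alert publisher that logs every alert emitted by the previous stages.
 */
@Plugin(type = AlertPublisher.PLUGIN_TYPE)
@Name("LoggingAlertPublisher")
@Description("Logs every alert emitted by the previous stages.")
public class LoggingAlertPublisher extends AlertPublisher {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingAlertPublisher.class);

  @Override
  public void publish(Iterator<Alert> alerts) throws Exception {
    while (alerts.hasNext()) {
      Alert alert = alerts.next();
      // each alert carries the name of the stage that emitted it and a payload of key-value pairs
      LOG.info("Alert from stage '{}' with payload {}", alert.getStageName(), alert.getPayload());
    }
  }
}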

Batch Aggregator Plugin

A BatchAggregator plugin is used to compute aggregates over a batch of data. It can be used in both batch and real-time data pipelines. An aggregation takes place in two steps: groupBy and then aggregate.

In order to implement a Batch Aggregator, you extend the BatchAggregator class. Unlike a Transform, which operates on a single record at a time, a BatchAggregator operates on a collection of records.

Example:

/**
 * Aggregator that counts how many times each word appears in records input to the aggregator.
 */
@Plugin(type = BatchAggregator.PLUGIN_TYPE)
@Name(WordCountAggregator.NAME)
@Description("Counts how many times each word appears in all records input to the aggregator.")
public class WordCountAggregator extends BatchAggregator<String, StructuredRecord, StructuredRecord> {
  public static final String NAME = "WordCount";
  public static final Schema OUTPUT_SCHEMA = Schema.recordOf(
    "wordCount",
    Schema.Field.of("word", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("count", Schema.of(Schema.Type.LONG))
  );
  private static final Pattern WHITESPACE = Pattern.compile("\\s");
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The field from the input records containing the words to count.")
    private String field;
  }

  public WordCountAggregator(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // any static configuration validation should happen here.
    // We will check that the field is in the input schema and is of type string.
    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
    // a null input schema means it is unknown until runtime, or it is not constant
    if (inputSchema != null) {
      // if the input schema is constant and known at configure time, check that the input field exists and is a string.
      Schema.Field inputField = inputSchema.getField(config.field);
      if (inputField == null) {
        throw new IllegalArgumentException(
          String.format("Field '%s' does not exist in input schema %s.", config.field, inputSchema));
      }
      Schema fieldSchema = inputField.getSchema();
      Schema.Type fieldType = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getType() : fieldSchema.getType();
      if (fieldType != Schema.Type.STRING) {
        throw new IllegalArgumentException(
          String.format("Field '%s' is of illegal type %s. Must be of type %s.",
                        config.field, fieldType, Schema.Type.STRING));
      }
    }
    // set the output schema so downstream stages will know their input schema.
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
  }

  @Override
  public void groupBy(StructuredRecord input, Emitter<String> groupKeyEmitter) throws Exception {
    String val = input.get(config.field);
    if (val == null) {
      return;
    }

    for (String word : WHITESPACE.split(val)) {
      groupKeyEmitter.emit(word);
    }
  }

  @Override
  public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
                        Emitter<StructuredRecord> emitter) throws Exception {
    long count = 0;
    while (groupValues.hasNext()) {
      groupValues.next();
      count++;
    }
    emitter.emit(StructuredRecord.builder(OUTPUT_SCHEMA).set("word", groupKey).set("count", count).build());
  }
}

Batch Joiner Plugin

A BatchJoiner plugin is used to join records over a batch of data. It can be used in both batch and real-time data pipelines. A join takes place in two steps: a joinOn step followed by a merge step.

  1. In the joinOn step, the joiner creates a join key for each input record. The CDAP pipeline will then take all records that have the same join key and collect them into a group.

  2. The merge step is then called. In this step, the plugin receives a list of all the records with the same join key, based on the type of join (either an inner or outer join). It is then up to the plugin to decide what to emit, which becomes the final output of the stage.

To implement a Batch Joiner, you extend the BatchJoiner class. Unlike a Transform, which operates on a single record at a time, a BatchJoiner operates on a collection of records.

Spark Compute Plugin

A SparkCompute plugin is used to transform a collection of input records into a collection of output records. It can be used in both batch and real-time data pipelines. It is similar to a Transform, except instead of transforming its input record by record, it transforms an entire collection. In a SparkCompute plugin, you are given access to anything you would be able to do in a Spark program.

In order to implement a Spark Compute Plugin, you extend the SparkCompute class.

Example:

/**
 * SparkCompute plugin that counts how many times each word appears in records input to the compute stage.
 */
@Plugin(type = SparkCompute.PLUGIN_TYPE)
@Name(WordCountCompute.NAME)
@Description("Counts how many times each word appears in all records input to the aggregator.")
public class WordCountCompute extends SparkCompute<StructuredRecord, StructuredRecord> {
  public static final String NAME = "WordCount";
  public static final Schema OUTPUT_SCHEMA = Schema.recordOf(
    "wordCount",
    Schema.Field.of("word", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("count", Schema.of(Schema.Type.LONG))
  );
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The field from the input records containing the words to count.")
    private String field;
  }

  public WordCountCompute(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // any static configuration validation should happen here.
    // We will check that the field is in the input schema and is of type string.
    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
    if (inputSchema != null) {
      WordCount wordCount = new WordCount(config.field);
      wordCount.validateSchema(inputSchema);
    }
    // set the output schema so downstream stages will know their input schema.
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
  }

  @Override
  public JavaRDD<StructuredRecord> transform(SparkExecutionPluginContext sparkExecutionPluginContext,
                                             JavaRDD<StructuredRecord> javaRDD) throws Exception {
    WordCount wordCount = new WordCount(config.field);
    return wordCount.countWords(javaRDD)
      .flatMap(new FlatMapFunction<Tuple2<String, Long>, StructuredRecord>() {
        @Override
        public Iterable<StructuredRecord> call(Tuple2<String, Long> stringLongTuple2) throws Exception {
          List<StructuredRecord> output = new ArrayList<>();
          output.add(StructuredRecord.builder(OUTPUT_SCHEMA)
                       .set("word", stringLongTuple2._1())
                       .set("count", stringLongTuple2._2())
                       .build());
          return output;
        }
      });
  }
}

Spark Sink Plugin

A SparkSink plugin is used to perform computations on a collection of input records and optionally write output data. It can only be used in batch data pipelines. A SparkSink is similar to a SparkCompute plugin except that it has no output. In a SparkSink, you are given access to anything you would be able to do in a Spark program. For example, one common use case is to train a machine-learning model in this plugin.

In order to implement a Spark Sink Plugin, you extend the SparkSink class.

Example:

/**
 * SparkSink plugin that counts how many times each word appears in records input to it and stores the result in
 * a KeyValueTable.
 */
@Plugin(type = SparkSink.PLUGIN_TYPE)
@Name(WordCountSink.NAME)
@Description("Counts how many times each word appears in all records input to the aggregator.")
public class WordCountSink extends SparkSink<StructuredRecord> {
  public static final String NAME = "WordCount";
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The field from the input records containing the words to count.")
    private String field;

    @Description("The name of the KeyValueTable to write to.")
    private String tableName;
  }

  public WordCountSink(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // any static configuration validation should happen here.
    // We will check that the field is in the input schema and is of type string.
    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
    if (inputSchema != null) {
      WordCount wordCount = new WordCount(config.field);
      wordCount.validateSchema(inputSchema);
    }
    pipelineConfigurer.createDataset(config.tableName, KeyValueTable.class, DatasetProperties.EMPTY);
  }

  @Override
  public void prepareRun(SparkPluginContext sparkPluginContext) throws Exception {
    // no-op
  }

  @Override
  public void run(SparkExecutionPluginContext sparkExecutionPluginContext,
                  JavaRDD<StructuredRecord> javaRDD) throws Exception {
    WordCount wordCount = new WordCount(config.field);
    JavaPairRDD outputRDD = wordCount.countWords(javaRDD)
      .mapToPair(new PairFunction<Tuple2<String, Long>, byte[], byte[]>() {
        @Override
        public Tuple2<byte[], byte[]> call(Tuple2<String, Long> stringLongTuple2) throws Exception {
          return new Tuple2<>(Bytes.toBytes(stringLongTuple2._1()), Bytes.toBytes(stringLongTuple2._2()));
        }
      });
    sparkExecutionPluginContext.saveAsDataset(outputRDD, config.tableName);
  }
}

Streaming Source Plugin

A Streaming Source plugin is used as a source in real-time data pipelines. It is used to fetch a Spark DStream, which is an object that represents a collection of Spark RDDs and that produces a new RDD every batch interval of the pipeline.

In order to implement a Streaming Source Plugin, you extend the StreamingSource class.

Example:

@Plugin(type = StreamingSource.PLUGIN_TYPE)
@Name("Twitter")
@Description("Twitter streaming source.")
public class TwitterStreamingSource extends StreamingSource<StructuredRecord> {
  private final TwitterStreamingConfig config;

  /**
   * Config class for TwitterStreamingSource.
   */
  public static class TwitterStreamingConfig extends PluginConfig implements Serializable {
    private static final long serialVersionUID = 4218063781909515444L;

    private String consumerKey;

    private String consumerSecret;

    private String accessToken;

    private String accessTokenSecret;

    private String referenceName;
  }

  public TwitterStreamingSource(TwitterStreamingConfig config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    pipelineConfigurer.getStageConfigurer().setOutputSchema(TwitterConstants.SCHEMA);
  }

  @Override
  public JavaDStream<StructuredRecord> getStream(StreamingContext context) throws Exception {
    JavaStreamingContext javaStreamingContext = context.getSparkStreamingContext();
    // lineage for this source will be tracked with this reference name
    context.registerLineage(config.referenceName);

    // Create authorization from user-provided properties
    ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
    configurationBuilder.setDebugEnabled(false)
      .setOAuthConsumerKey(config.consumerKey)
      .setOAuthConsumerSecret(config.consumerSecret)
      .setOAuthAccessToken(config.accessToken)
      .setOAuthAccessTokenSecret(config.accessTokenSecret);
    Authorization authorization = new OAuthAuthorization(configurationBuilder.build());

    return TwitterUtils.createStream(javaStreamingContext, authorization).map(
      new Function<Status, StructuredRecord>() {
        public StructuredRecord call(Status status) {
          return convertTweet(status);
        }
      }
    );
  }

  private StructuredRecord convertTweet(Status tweet) {
    // logic to convert a Twitter Status into a CDAP StructuredRecord
  }

}

Lineage

The lineage is registered using the referenceName provided when invoking registerLineage() on StreamingContext in getStream(). Note that the referenceName should be a valid DatasetId.

Windower Plugin

A Windower plugin is used in real-time data pipelines to create sliding windows over the data. It does this by combining multiple micro batches into larger batches.

A window is defined by its size and its slide interval. Both are defined in seconds and must be multiples of the batchInterval of the pipeline. The size defines how much data is contained in the window. The slide interval defines how often a window is created.

For example, consider a pipeline with a batchInterval of 10 seconds. The pipeline uses a windower that has a size of 60 and a slide interval of 30. The input into the windower will be micro batches containing 10 seconds of data. Every 30 seconds, the windower will output a batch of data containing the past 60 seconds of data, meaning the previous six micro batches that it received as input.

This also means that each window output will overlap (repeat) some of the data from the previous window. This is useful for calculating aggregates, such as how many "404" responses a website sent out in the past ten seconds, past minute, or past five minutes.

In order to implement a Windower Plugin, you extend the Windower class.

Example:

@Plugin(type = Windower.PLUGIN_TYPE)
@Name("Window")
@Description("Creates a sliding window over the data")
public class Window extends Windower {
  private final Conf conf;

  /**
   * Config for window plugin.
   */
  public static class Conf extends PluginConfig {
    long width;
    long slideInterval;
  }

  public Window(Conf conf) {
    this.conf = conf;
  }

  @Override
  public long getWidth() {
    return conf.width;
  }

  @Override
  public long getSlideInterval() {
    return conf.slideInterval;
  }
}
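
If you want to validate the window configuration when the pipeline is created, a configurePipeline() override can be added to the Window class above, assuming the Windower class exposes the same configurePipeline() hook as the other plugin types shown in this document. The checks below (positive values and a width at least as large as the slide interval) are illustrative assumptions, not requirements imposed by the framework:

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  // assumes Windower exposes configurePipeline(), like the other plugin types above
  if (conf.width <= 0 || conf.slideInterval <= 0) {
    throw new IllegalArgumentException("Both width and slideInterval must be positive numbers of seconds.");
  }
  if (conf.width < conf.slideInterval) {
    throw new IllegalArgumentException("width must be at least as large as slideInterval.");
  }
}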