...

In order to implement an Action plugin, you extend the Action class. Only one method is required to be implemented: run(). A minimal sketch is shown after the method list below.

Methods

  • run(): Used to implement the functionality of the plugin.

  • configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.
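
For illustration, a minimal Action plugin could look like the following sketch. The plugin name, the config property, and the use of a logger are assumptions made for this example rather than anything prescribed by the platform.

Code Block
/**
 * Sketch of an Action plugin that logs a configurable message when its stage runs.
 * The plugin and property names here are illustrative only.
 */
@Plugin(type = Action.PLUGIN_TYPE)
@Name("MessageLogger")
@Description("Logs a configurable message when the pipeline reaches this stage.")
public class MessageLoggerAction extends Action {
  private static final Logger LOG = LoggerFactory.getLogger(MessageLoggerAction.class);
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The message to log.")
    private String message;
  }

  public MessageLoggerAction(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // nothing to validate and no datasets to create for this example
  }

  @Override
  public void run(ActionContext context) throws Exception {
    // run() holds whatever work the action should perform
    LOG.info(config.message);
  }
}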

...

In order to implement a Post-run Action plugin, you extend the PostAction class. Only one method is required to be implemented: run()

Methods

  • run(): Used to implement the functionality of the plugin.

  • configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.

...

Code Block
/**
 * Post run action that deletes files in a FileSet that match a configurable regex.
 */
@Plugin(type = PostAction.PLUGIN_TYPE)
@Name(FilesetDeletePostAction.NAME)
@Description("Post run action that deletes files in a FileSet that match a configurable regex if the run succeeded.")
public class FilesetDeletePostAction extends PostAction {
  public static final String NAME = "FilesetDelete";
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    public static final String FILESET_NAME = "filesetName";
    public static final String DELETE_REGEX = "deleteRegex";
    public static final String DIRECTORY = "directory";

    @Name(FILESET_NAME)
    @Description("The fileset to delete files from.")
    private String filesetName;

    @Name(DELETE_REGEX)
    @Description("Delete files that match this regex.")
    private String deleteRegex;

    // Macro enabled properties can be set to a placeholder value ${key} when the pipeline is deployed.
    // At runtime, the value for 'key' can be given and substituted in.
    @Macro
    @Name(DIRECTORY)
    @Description("The fileset directory to delete files from.")
    private String directory;
  }

  public FilesetDeletePostAction(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    Pattern.compile(config.deleteRegex);
  }

  @Override
  public void run(BatchActionContext context) throws Exception {
    if (!context.isSuccessful()) {
      return;
    }

    FileSet fileSet = context.getDataset(config.filesetName);
    Pattern pattern = Pattern.compile(config.deleteRegex);
    for (Location fileLocation : fileSet.getBaseLocation().append(config.directory).list()) {
      if (pattern.matcher(fileLocation.getName()).find()) {
        fileLocation.delete();
      }
    }
  }
}

Batch Source Plugin

A BatchSource plugin is used as the source of a batch data pipeline. It prepares and configures the input of a pipeline run.

In order to implement a Batch Source, you extend the BatchSource class. You need to define the types of the KEY and VALUE that the Batch Source will receive and the type of object that the Batch Source will emit to the subsequent stage (which could be either a Transformation or a Batch Sink). After defining the types, only one method is required to be implemented: prepareRun()

Methods

  • prepareRun(): Used to configure the input for each run of the pipeline. If the fieldName for a stream or dataset is a macro, its creation will happen during this stage. This is called by the client that will submit the job for the pipeline run.

  • onRunFinish(): Used to run any required logic at the end of a pipeline run. This is called by the client that submitted the job for the pipeline run.

  • configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.

  • initialize(): Initialize the Batch Source. Guaranteed to be executed before any call to the plugin’s transform method. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.

  • destroy(): Destroy any resources created by initialize. Guaranteed to be executed after all calls to the plugin’s transform method have been made. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.

  • transform(): This method will be called for every input key-value pair generated by the batch job. By default, the value is emitted to the subsequent stage.

...

Code Block
/**
 * Batch Source that reads from a FileSet that has its data formatted as text.
 *
 * LongWritable is the first parameter because that is the key used by Hadoop's {@link TextInputFormat}.
 * Similarly, Text is the second parameter because that is the value used by Hadoop's {@link TextInputFormat}.
 * {@link StructuredRecord} is the third parameter because that is what the source will output.
 * All the plugins included with Hydrator operate on StructuredRecord.
 */
@Plugin(type = BatchSource.PLUGIN_TYPE)
@Name(TextFileSetSource.NAME)
@Description("Reads from a FileSet that has its data formatted as text.")
public class TextFileSetSource extends BatchSource<LongWritable, Text, StructuredRecord> {
  public static final String NAME = "TextFileSet";
  public static final Schema OUTPUT_SCHEMA = Schema.recordOf(
    "textRecord",
    Schema.Field.of("position", Schema.of(Schema.Type.LONG)),
    Schema.Field.of("text", Schema.of(Schema.Type.STRING))
  );
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    public static final String FILESET_NAME = "fileSetName";
    public static final String FILES = "files";
    public static final String CREATE_IF_NOT_EXISTS = "createIfNotExists";
    public static final String DELETE_INPUT_ON_SUCCESS = "deleteInputOnSuccess";

    // The name annotation tells CDAP what the property name is. It is optional, and defaults to the variable name.
    // Note: only primitives (including boxed types) and String are supported as property types.
    @Name(FILESET_NAME)
    @Description("The name of the FileSet to read from.")
    private String fileSetName;

    // Macro enabled properties can be set to a placeholder value ${key} when the pipeline is deployed.
    // At runtime, the value for 'key' can be given and substituted in.
    @Macro
    @Name(FILES)
    @Description("A comma separated list of files in the FileSet to read.")
    private String files;

    // A nullable field tells CDAP that this is an optional field.
    @Nullable
    @Name(CREATE_IF_NOT_EXISTS)
    @Description("Whether to create the FileSet if it doesn't already exist. Defaults to false.")
    private Boolean createIfNotExists;

    @Nullable
    @Name(DELETE_INPUT_ON_SUCCESS)
    @Description("Whether to delete the data read by the source after the run succeeds. Defaults to false.")
    private Boolean deleteInputOnSuccess;

    // Use a no-args constructor to set field defaults.
    public Conf() {
      createIfNotExists = false;
      deleteInputOnSuccess = false;
    }
  }

  // CDAP will pass in a config with its fields populated based on the configuration given when creating the pipeline.
  public TextFileSetSource(Conf config) {
    this.config = config;
  }

  // configurePipeline is called exactly once when the pipeline is being created.
  // Any static configuration should be performed here.
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // if the user has set createIfNotExists to true, create the FileSet here.
    if (config.createIfNotExists) {
      pipelineConfigurer.createDataset(config.fileSetName,
                                       FileSet.class,
                                       FileSetProperties.builder()
                                         .setInputFormat(TextInputFormat.class)
                                         .setOutputFormat(TextOutputFormat.class)
                                         .setEnableExploreOnCreate(true)
                                         .setExploreFormat("text")
                                         .setExploreSchema("text string")
                                         .build()
      );
    }
    // set the output schema of this stage so that stages further down the pipeline will know their input schema.
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
  }

  // prepareRun is called before every pipeline run, and is used to configure what the input should be,
  // as well as any arguments the input should use. It is called by the client that is submitting the batch job.
  @Override
  public void prepareRun(BatchSourceContext context) throws IOException {
    Map<String, String> arguments = new HashMap<>();
    FileSetArguments.setInputPaths(arguments, config.files);
    context.setInput(Input.ofDataset(config.fileSetName, arguments));
  }

  // onRunFinish is called at the end of the pipeline run by the client that submitted the batch job.
  @Override
  public void onRunFinish(boolean succeeded, BatchSourceContext context) {
    // perform any actions that should happen at the end of the run.
    // in our case, we want to delete the data read during this run if the run succeeded.
    if (succeeded && config.deleteInputOnSuccess) {
      Map<String, String> arguments = new HashMap<>();
      FileSetArguments.setInputPaths(arguments, config.files);
      FileSet fileSet = context.getDataset(config.fileSetName, arguments);
      for (Location inputLocation : fileSet.getInputLocations()) {
        try {
          inputLocation.delete(true);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }

  // initialize is called by each job executor before any call to transform is made.
  // This occurs at the start of the batch job run, after the job has been successfully submitted.
  // For example, if mapreduce is the execution engine, each mapper will call initialize at the start of the program.
  @Override
  public void initialize(BatchRuntimeContext context) throws Exception {
    super.initialize(context);
    // create any resources required by transform()
  }

  // destroy is called by each job executor at the end of its life.
  // For example, if mapreduce is the execution engine, each mapper will call destroy at the end of the program.
  @Override
  public void destroy() {
    // clean up any resources created by initialize
  }

  // transform is used to transform the key-value pair output by the input into objects output by this source.
  // The output should be a StructuredRecord if you want the source to be compatible with the plugins included
  // with Hydrator.
  @Override
  public void transform(KeyValue<LongWritable, Text> input, Emitter<StructuredRecord> emitter) throws Exception {
    emitter.emit(StructuredRecord.builder(OUTPUT_SCHEMA)
                   .set("position", input.getKey().get())
                   .set("text", input.getValue().toString())
                   .build()
    );
  }
}

Lineage

For plugins that fetch data from non-CDAP sources, the lineage is registered using the inputName provided when setInput() is invoked on BatchSourceContext in prepareRun(). Note that the inputName should be a valid DatasetId. For example:
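
(The following is only a minimal sketch; externalSourceName and ExternalInputFormatProvider are assumptions made for illustration, not names taken from this page.)

Code Block
@Override
public void prepareRun(BatchSourceContext context) throws Exception {
  // 'externalSourceName' is a hypothetical config property; it must be a valid DatasetId,
  // since it is the name under which lineage for this external source is registered.
  // ExternalInputFormatProvider is a hypothetical InputFormatProvider implementation.
  context.setInput(Input.of(config.externalSourceName, new ExternalInputFormatProvider()));
}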

...

After defining the types, only one method is required to be implemented: prepareRun(). A minimal Batch Sink sketch is shown after the method list below.

Methods

  • prepareRun(): Used to configure the output for each run of the pipeline. This is called by the client that will submit the job for the pipeline run.

  • onRunFinish(): Used to run any required logic at the end of a pipeline run. This is called by the client that submitted the job for the pipeline run.

  • configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.

  • initialize(): Initialize the Batch Sink. Guaranteed to be executed before any call to the plugin’s transform method. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.

  • destroy(): Destroy any resources created by initialize. Guaranteed to be executed after all calls to the plugin’s transform method have been made. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.

  • transform(): This method will be called for every object that is received from the previous stage. The logic inside the method will transform the object to the key-value pair expected by the Batch Sink's output format. If you don't override this method, the incoming object is set as the key and the value is set to null.
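
As a sketch only (the class and property names, and the assumption that each record carries a string field named 'text', are illustrative rather than taken from this page):

Code Block
/**
 * Sketch of a Batch Sink that writes records to a FileSet as text.
 * StructuredRecord is the input type; Text and NullWritable are the key and value types
 * expected by Hadoop's TextOutputFormat.
 */
@Plugin(type = BatchSink.PLUGIN_TYPE)
@Name("TextFileSetSink")
@Description("Writes records to a FileSet formatted as text.")
public class TextFileSetSink extends BatchSink<StructuredRecord, Text, NullWritable> {
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The name of the FileSet to write to.")
    private String fileSetName;

    // Macro enabled properties can be set to a placeholder value ${key} when the pipeline is deployed.
    @Macro
    @Description("The output directory in the FileSet to write to.")
    private String outputDir;
  }

  public TextFileSetSink(Conf config) {
    this.config = config;
  }

  // prepareRun configures the output for each run; it is called by the client that submits the job.
  @Override
  public void prepareRun(BatchSinkContext context) throws Exception {
    Map<String, String> arguments = new HashMap<>();
    FileSetArguments.setOutputPath(arguments, config.outputDir);
    context.addOutput(Output.ofDataset(config.fileSetName, arguments));
  }

  // transform converts each incoming record into the key-value pair expected by the output format.
  @Override
  public void transform(StructuredRecord input, Emitter<KeyValue<Text, NullWritable>> emitter) throws Exception {
    // this sketch assumes each record has a string field named 'text'
    String text = input.get("text");
    emitter.emit(new KeyValue<>(new Text(text), NullWritable.get()));
  }
}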

...

The only method that needs to be implemented is: transform(). A minimal sketch is shown after the method list below.

Methods

  • initialize(): Used to perform any initialization step that might be required during the runtime of the Transform. It is guaranteed that this method will be invoked before the transform method.

  • transform(): This method contains the logic that will be applied on each incoming data object. An emitter can be used to pass the results to the subsequent stage.

  • destroy(): Used to perform any cleanup before the plugin shuts down.
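
For illustration, a minimal Transform could look like the following sketch; the plugin name and the configurable field name are assumptions made for this example.

Code Block
/**
 * Sketch of a Transform that upper-cases a configurable string field of each record.
 */
@Plugin(type = Transform.PLUGIN_TYPE)
@Name("StringCase")
@Description("Upper-cases the configured string field of each record.")
public class StringCaseTransform extends Transform<StructuredRecord, StructuredRecord> {
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The name of the string field to upper-case.")
    private String field;
  }

  public StringCaseTransform(Conf config) {
    this.config = config;
  }

  @Override
  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
    // copy every field, upper-casing the configured one, then emit the result to the next stage
    StructuredRecord.Builder builder = StructuredRecord.builder(input.getSchema());
    for (Schema.Field recordField : input.getSchema().getFields()) {
      if (recordField.getName().equals(config.field)) {
        String value = input.get(recordField.getName());
        builder.set(recordField.getName(), value == null ? null : value.toUpperCase());
      } else {
        builder.set(recordField.getName(), input.get(recordField.getName()));
      }
    }
    emitter.emit(builder.build());
  }
}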

...

The only method that needs to be implemented is: transform()

Methods

  • initialize(): Used to perform any initialization step that might be required during the runtime of the ErrorTransform. It is guaranteed that this method will be invoked before the transform method.

  • transform(): This method contains the logic that will be applied on each incoming ErrorRecord object. An emitter can be used to pass the results to the subsequent stage.

  • destroy(): Used to perform any cleanup before the plugin shuts down.

...

Code Block
/**
 * Adds the error code and error message to each record, then emits it.
 */
@Plugin(type = ErrorTransform.PLUGIN_TYPE)
@Name("ErrorCollector")
public class ErrorCollector extends ErrorTransform<StructuredRecord, StructuredRecord> {
  private final Config config;

  public ErrorCollector(Config config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
    if (inputSchema != null) {
      if (inputSchema.getField(config.messageField) != null) {
        throw new IllegalArgumentException(String.format(
          "Input schema already contains message field %s. Please set messageField to a different value.",
          config.messageField));
      }
      if (inputSchema.getField(config.codeField) != null) {
        throw new IllegalArgumentException(String.format(
          "Input schema already contains code field %s. Please set codeField to a different value.",
          config.codeField));
      }
      if (inputSchema.getField(config.stageField) != null) {
        throw new IllegalArgumentException(String.format(
          "Input schema already contains stage field %s. Please set stageField to a different value.",
          config.stageField));
      }
      Schema outputSchema = getOutputSchema(config, inputSchema);
      pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
    }
  }

  @Override
  public void transform(ErrorRecord<StructuredRecord> input, Emitter<StructuredRecord> emitter) throws Exception {
    StructuredRecord invalidRecord = input.getRecord();
    StructuredRecord.Builder output = StructuredRecord.builder(getOutputSchema(config, invalidRecord.getSchema()));
    for (Schema.Field field : invalidRecord.getSchema().getFields()) {
      output.set(field.getName(), invalidRecord.get(field.getName()));
    }
    if (config.messageField != null) {
      output.set(config.messageField, input.getErrorMessage());
    }
    if (config.codeField != null) {
      output.set(config.codeField, input.getErrorCode());
    }
    if (config.stageField != null) {
      output.set(config.stageField, input.getStageName());
    }
    emitter.emit(output.build());
  }

  private static Schema getOutputSchema(Config config, Schema inputSchema) {
    List<Schema.Field> fields = new ArrayList<>();
    fields.addAll(inputSchema.getFields());
    if (config.messageField != null) {
      fields.add(Schema.Field.of(config.messageField, Schema.of(Schema.Type.STRING)));
    }
    if (config.codeField != null) {
      fields.add(Schema.Field.of(config.codeField, Schema.of(Schema.Type.INT)));
    }
    if (config.stageField != null) {
      fields.add(Schema.Field.of(config.stageField, Schema.of(Schema.Type.STRING)));
    }
    return Schema.recordOf("error" + inputSchema.getRecordName(), fields);
  }

  /**
   * The plugin config
   */
  public static class Config extends PluginConfig {
    @Nullable
    @Description("The name of the error message field to use in the output schema. " +
      "If this not specified, the error message will be dropped.")
    private String messageField;

    @Nullable
    @Description("The name of the error code field to use in the output schema. " +
      "If this not specified, the error code will be dropped.")
    private String codeField;

    @Nullable
    @Description("The name of the error stage field to use in the output schema. " +
      "If this not specified, the error stage will be dropped.")
    private String stageField;

  }
}

Alert Publisher Plugin

An AlertPublisher plugin is a special type of plugin that consumes alerts emitted from previous stages instead of output records. Alerts are meant to be uncommon events that need to be acted on in some other program. Alerts contain a payload, which is just a map of strings containing any relevant data. An alert publisher is responsible for writing the alerts to some system, where they can be read and acted upon by some external program. For example, a plugin may write alerts to Kafka. Alerts may not be published immediately after they are emitted; it is up to the processing engine to decide when to publish alerts.

The only method that needs to be implemented is: publish(Iterator<Alert> alerts). A minimal sketch is shown after the method list below.

...

Methods

  • initialize(): Used to perform any initialization step that might be required during the runtime of the AlertPublisher. It is guaranteed that this method will be invoked before the publish method.

  • publish(): This method contains the logic that will publish each incoming Alert.

  • destroy(): Used to perform any cleanup before the plugin shuts down.
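
A minimal sketch of an AlertPublisher that simply writes each alert to the logs (a real publisher would write to an external system such as Kafka; the plugin name here is an assumption made for illustration):

Code Block
/**
 * Sketch of an AlertPublisher that publishes alerts by logging them.
 */
@Plugin(type = AlertPublisher.PLUGIN_TYPE)
@Name("LoggingAlertPublisher")
@Description("Publishes alerts by writing them to the program logs.")
public class LoggingAlertPublisher extends AlertPublisher {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingAlertPublisher.class);

  @Override
  public void publish(Iterator<Alert> alerts) throws Exception {
    // each Alert carries the name of the stage that emitted it and a payload of string key-value pairs
    while (alerts.hasNext()) {
      Alert alert = alerts.next();
      LOG.info("Alert from stage {} with payload {}", alert.getStageName(), alert.getPayload());
    }
  }
}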

...