Creating a Plugin
Action Plugin
An Action plugin runs arbitrary logic at the start or end of a batch data pipeline.
In order to implement an Action plugin, you extend the Action class. Only one method is required to be implemented: run().
Methods
run()
: Used to implement the functionality of the plugin.
configurePipeline()
: Used to perform any validation on the application configuration that is required by this plugin or to create any datasets if the fieldName for a dataset is not a macro.
Example:
/**
* Action that moves files from one fileset into another, optionally filtering files that match a regex.
*/
@Plugin(type = Action.PLUGIN_TYPE)
@Name(FilesetMoveAction.NAME)
@Description("Action that moves files from one fileset into another, optionally filtering files that match a regex.")
public class FilesetMoveAction extends Action {
public static final String NAME = "FilesetMove";
private final Conf config;
/**
* Config properties for the plugin.
*/
public static class Conf extends PluginConfig {
public static final String SOURCE_FILESET = "sourceFileset";
public static final String DEST_FILESET = "destinationFileset";
public static final String FILTER_REGEX = "filterRegex";
@Name(SOURCE_FILESET)
@Description("The fileset to move files from.")
private String sourceFileset;
@Name(DEST_FILESET)
@Description("The fileset to move files to.")
private String destinationFileset;
@Nullable
@Name(FILTER_REGEX)
@Description("Filter any files whose name matches this regex. Defaults to '^\\.', which will filter any files " +
"that begin with a period.")
private String filterRegex;
// set defaults for properties in a no-argument constructor.
public Conf() {
filterRegex = "^\\.";
}
}
public FilesetMoveAction(Conf config) {
this.config = config;
}
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
Pattern.compile(config.filterRegex);
}
@Override
public void run(ActionContext context) throws Exception {
context.execute(new TxRunnable() {
@Override
public void run(DatasetContext context) throws Exception {
FileSet sourceFileSet = context.getDataset(config.sourceFileset);
FileSet destinationFileSet = context.getDataset(config.destinationFileset);
Pattern pattern = Pattern.compile(config.filterRegex);
for (Location sourceFile : sourceFileSet.getBaseLocation().list()) {
if (pattern.matcher(sourceFile.getName()).find()) {
continue;
}
Location destFile = destinationFileSet.getBaseLocation().append(sourceFile.getName());
sourceFile.renameTo(destFile);
}
}
});
}
}
Post-run Action Plugin
A PostAction plugin runs arbitrary logic after the end of a pipeline run. It can be configured to execute only when the run completed successfully, only when it failed, or in either case.
The difference between a PostAction and an Action placed at the end of a pipeline is that a PostAction will always be executed, even if the pipeline run fails, while an Action will only be executed if every stage preceding it ran successfully.
In order to implement a Post-run Action plugin, you extend the PostAction class. Only one method is required to be implemented: run().
Methods
run()
: Used to implement the functionality of the plugin.
configurePipeline()
: Used to perform any validation on the application configuration that is required by this plugin or to create any datasets if the fieldName for a dataset is not a macro.
Example:
/**
* Post run action that deletes files in a FileSet that match a configurable regex.
*/
@Plugin(type = PostAction.PLUGIN_TYPE)
@Name(FilesetDeletePostAction.NAME)
@Description("Post run action that deletes files in a FileSet that match a configurable regex if the run succeeded.")
public class FilesetDeletePostAction extends PostAction {
public static final String NAME = "FilesetDelete";
private final Conf config;
/**
* Config properties for the plugin.
*/
public static class Conf extends PluginConfig {
public static final String FILESET_NAME = "filesetName";
public static final String DELETE_REGEX = "deleteRegex";
public static final String DIRECTORY = "directory";
@Name(FILESET_NAME)
@Description("The fileset to delete files from.")
private String filesetName;
@Name(DELETE_REGEX)
@Description("Delete files that match this regex.")
private String deleteRegex;
// Macro enabled properties can be set to a placeholder value ${key} when the pipeline is deployed.
// At runtime, the value for 'key' can be given and substituted in.
@Macro
@Name(DIRECTORY)
@Description("The fileset directory to delete files from.")
private String directory;
}
public FilesetDeletePostAction(Conf config) {
this.config = config;
}
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
Pattern.compile(config.deleteRegex);
}
@Override
public void run(BatchActionContext context) throws Exception {
if (!context.isSuccessful()) {
return;
}
FileSet fileSet = context.getDataset(config.filesetName);
Pattern pattern = Pattern.compile(config.deleteRegex);
for (Location fileLocation : fileSet.getBaseLocation().append(config.directory).list()) {
if (pattern.matcher(fileLocation.getName()).find()) {
fileLocation.delete();
}
}
}
}
Batch Source Plugin
A BatchSource plugin is used as a source of a batch data pipeline. It is used to prepare and configure the input of a pipeline run.
In order to implement a Batch Source, you extend the BatchSource class. You need to define the types of the KEY and VALUE that the Batch Source will receive and the type of object that the Batch Source will emit to the subsequent stage (which could be either a Transformation or a Batch Sink). After defining the types, only one method is required to be implemented: prepareRun().
Methods
prepareRun()
: Used to configure the input for each run of the pipeline. If the fieldName for a dataset is a macro, the dataset is created during this stage. This is called by the client that will submit the job for the pipeline run.
onRunFinish()
: Used to run any required logic at the end of a pipeline run. This is called by the client that submitted the job for the pipeline run.
configurePipeline()
: Used to perform any validation on the application configuration that is required by this plugin or to create any datasets if the fieldName for a dataset is not a macro.
initialize()
: Initialize the Batch Source. Guaranteed to be executed before any call to the plugin’s transform method. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
destroy()
: Destroy any resources created by initialize. Guaranteed to be executed after all calls to the plugin’s transform method have been made. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
transform()
: This method will be called for every input key-value pair generated by the batch job. By default, the value is emitted to the subsequent stage.
Example:
/**
* Batch Source that reads from a FileSet that has its data formatted as text.
*
* LongWritable is the first parameter because that is the key used by Hadoop's {@link TextInputFormat}.
* Similarly, Text is the second parameter because that is the value used by Hadoop's {@link TextInputFormat}.
* {@link StructuredRecord} is the third parameter because that is what the source will output.
* All the plugins included with Hydrator operate on StructuredRecord.
*/
@Plugin(type = BatchSource.PLUGIN_TYPE)
@Name(TextFileSetSource.NAME)
@Description("Reads from a FileSet that has its data formatted as text.")
public class TextFileSetSource extends BatchSource<LongWritable, Text, StructuredRecord> {
public static final String NAME = "TextFileSet";
public static final Schema OUTPUT_SCHEMA = Schema.recordOf(
"textRecord",
Schema.Field.of("position", Schema.of(Schema.Type.LONG)),
Schema.Field.of("text", Schema.of(Schema.Type.STRING))
);
private final Conf config;
/**
* Config properties for the plugin.
*/
public static class Conf extends PluginConfig {
public static final String FILESET_NAME = "fileSetName";
public static final String FILES = "files";
public static final String CREATE_IF_NOT_EXISTS = "createIfNotExists";
public static final String DELETE_INPUT_ON_SUCCESS = "deleteInputOnSuccess";
// The name annotation tells CDAP what the property name is. It is optional, and defaults to the variable name.
// Note: only primitives (including boxed types) and String are supported as property types.
@Name(FILESET_NAME)
@Description("The name of the FileSet to read from.")
private String fileSetName;
// Macro enabled properties can be set to a placeholder value ${key} when the pipeline is deployed.
// At runtime, the value for 'key' can be given and substituted in.
@Macro
@Name(FILES)
@Description("A comma separated list of files in the FileSet to read.")
private String files;
// The @Nullable annotation tells CDAP that this is an optional field.
@Nullable
@Name(CREATE_IF_NOT_EXISTS)
@Description("Whether to create the FileSet if it doesn't already exist. Defaults to false.")
private Boolean createIfNotExists;
@Nullable
@Name(DELETE_INPUT_ON_SUCCESS)
@Description("Whether to delete the data read by the source after the run succeeds. Defaults to false.")
private Boolean deleteInputOnSuccess;
// Use a no-args constructor to set field defaults.
public Conf() {
createIfNotExists = false;
deleteInputOnSuccess = false;
}
}
// CDAP will pass in a config with its fields populated based on the configuration given when creating the pipeline.
public TextFileSetSource(Conf config) {
this.config = config;
}
// configurePipeline is called exactly once when the pipeline is being created.
// Any static configuration should be performed here.
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
// if the user has set createIfNotExists to true, create the FileSet here.
if (config.createIfNotExists) {
pipelineConfigurer.createDataset(config.fileSetName,
FileSet.class,
FileSetProperties.builder()
.setInputFormat(TextInputFormat.class)
.setOutputFormat(TextOutputFormat.class)
.setEnableExploreOnCreate(true)
.setExploreFormat("text")
.setExploreSchema("text string")
.build()
);
}
// set the output schema of this stage so that stages further down the pipeline will know their input schema.
pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
}
// prepareRun is called before every pipeline run, and is used to configure what the input should be,
// as well as any arguments the input should use. It is called by the client that is submitting the batch job.
@Override
public void prepareRun(BatchSourceContext context) throws IOException {
Map<String, String> arguments = new HashMap<>();
FileSetArguments.setInputPaths(arguments, config.files);
context.setInput(Input.ofDataset(config.fileSetName, arguments));
}
// onRunFinish is called at the end of the pipeline run by the client that submitted the batch job.
@Override
public void onRunFinish(boolean succeeded, BatchSourceContext context) {
// perform any actions that should happen at the end of the run.
// in our case, we want to delete the data read during this run if the run succeeded.
if (succeeded && config.deleteInputOnSuccess) {
Map<String, String> arguments = new HashMap<>();
FileSetArguments.setInputPaths(arguments, config.files);
FileSet fileSet = context.getDataset(config.fileSetName, arguments);
for (Location inputLocation : fileSet.getInputLocations()) {
try {
inputLocation.delete(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
// initialize is called by each job executor before any call to transform is made.
// This occurs at the start of the batch job run, after the job has been successfully submitted.
// For example, if mapreduce is the execution engine, each mapper will call initialize at the start of the program.
@Override
public void initialize(BatchRuntimeContext context) throws Exception {
super.initialize(context);
// create any resources required by transform()
}
// destroy is called by each job executor at the end of its life.
// For example, if mapreduce is the execution engine, each mapper will call destroy at the end of the program.
@Override
public void destroy() {
// clean up any resources created by initialize
}
// transform is used to transform the key-value pair output by the input into objects output by this source.
// The output should be a StructuredRecord if you want the source to be compatible with the plugins included
// with Hydrator.
@Override
public void transform(KeyValue<LongWritable, Text> input, Emitter<StructuredRecord> emitter) throws Exception {
emitter.emit(StructuredRecord.builder(OUTPUT_SCHEMA)
.set("position", input.getKey().get())
.set("text", input.getValue().toString())
.build()
);
}
}
Lineage
For plugins that fetch data from non-CDAP sources, the lineage is registered using the inputName provided when setInput() is invoked on BatchSourceContext in prepareRun(). Note that the inputName should be a valid DatasetId. For example:
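(The following is an illustrative sketch of such a prepareRun(); the InputFormat class and input path are placeholders, not part of the original example.)
@Override
public void prepareRun(BatchSourceContext context) throws Exception {
  // 'myExternalSource' is the inputName used for lineage; it must be a valid DatasetId.
  context.setInput(Input.of("myExternalSource", new InputFormatProvider() {
    @Override
    public String getInputFormatClassName() {
      return TextInputFormat.class.getName();
    }
    @Override
    public Map<String, String> getInputFormatConfiguration() {
      // Illustrative configuration; a real plugin would build this from its config.
      return Collections.singletonMap("mapreduce.input.fileinputformat.inputdir", "/path/to/input");
    }
  }));
}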
Lineage will be tracked using myExternalSource.
Batch Sink Plugin
A BatchSink plugin is used to write data in either batch or real-time data pipelines. It is used to prepare and configure the output of a batch of data from a pipeline run.
In order to implement a Batch Sink, you extend the BatchSink class. Similar to a Batch Source, you need to define the types of the KEY and VALUE that the Batch Sink will write in the batch job and the type of object that it will accept from the previous stage (which could be either a Transformation or a Batch Source).
After defining the types, only one method is required to be implemented: prepareRun().
Methods
prepareRun()
: Used to configure the output for each run of the pipeline. This is called by the client that will submit the job for the pipeline run.
onRunFinish()
: Used to run any required logic at the end of a pipeline run. This is called by the client that submitted the job for the pipeline run.
configurePipeline()
: Used to perform any validation on the application configuration that is required by this plugin or to create any datasets if the fieldName for a dataset is not a macro.
initialize()
: Initialize the Batch Sink. Guaranteed to be executed before any call to the plugin’s transform method. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
destroy()
: Destroy any resources created by initialize. Guaranteed to be executed after all calls to the plugin’s transform method have been made. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
transform()
: This method will be called for every object that is received from the previous stage. The logic inside the method will transform the object to the key-value pair expected by the Batch Sink's output format. If you don't override this method, the incoming object is set as the key and the value is set to null.
Example:
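The original example is omitted here. A minimal, illustrative sketch of a Batch Sink that writes records to a FileSet as text might look like the following; the plugin name TextFileSetSink, its config fields, and the comma-separated output format are assumptions for illustration.
/**
 * Batch Sink that writes to a FileSet that has its data formatted as text.
 *
 * StructuredRecord is the first parameter because that is what the sink takes as input.
 * NullWritable and Text are the second and third parameters because those are the key and value
 * used by Hadoop's {@link TextOutputFormat}.
 */
@Plugin(type = BatchSink.PLUGIN_TYPE)
@Name("TextFileSetSink")
@Description("Writes to a FileSet that has its data formatted as text.")
public class TextFileSetSink extends BatchSink<StructuredRecord, NullWritable, Text> {
  private final Conf config;
  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The name of the FileSet to write to.")
    private String fileSetName;
    @Macro
    @Description("The output directory in the FileSet to write to.")
    private String outputDir;
  }
  public TextFileSetSink(Conf config) {
    this.config = config;
  }
  // prepareRun is called before every pipeline run and configures where the output should be written.
  @Override
  public void prepareRun(BatchSinkContext context) throws Exception {
    Map<String, String> arguments = new HashMap<>();
    FileSetArguments.setOutputPath(arguments, config.outputDir);
    context.addOutput(Output.ofDataset(config.fileSetName, arguments));
  }
  // transform converts each incoming record into the key-value pair expected by the output format.
  @Override
  public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, Text>> emitter) throws Exception {
    // Write the record out as comma-separated field values; a real sink would use a configurable format.
    StringBuilder joined = new StringBuilder();
    for (Schema.Field field : input.getSchema().getFields()) {
      if (joined.length() > 0) {
        joined.append(',');
      }
      Object value = input.get(field.getName());
      joined.append(value);
    }
    emitter.emit(new KeyValue<>(NullWritable.get(), new Text(joined.toString())));
  }
}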
Lineage
For plugins that write data to non-CDAP sinks, the lineage is registered using the outputName provided when addOutput() is invoked on BatchSinkContext in prepareRun(). Note that the outputName should be a valid DatasetId. For example:
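(The following is an illustrative sketch of such a prepareRun(); the OutputFormat class and output path are placeholders, not part of the original example.)
@Override
public void prepareRun(BatchSinkContext context) throws Exception {
  // 'myExternalSink' is the outputName used for lineage; it must be a valid DatasetId.
  context.addOutput(Output.of("myExternalSink", new OutputFormatProvider() {
    @Override
    public String getOutputFormatClassName() {
      return TextOutputFormat.class.getName();
    }
    @Override
    public Map<String, String> getOutputFormatConfiguration() {
      // Illustrative configuration; a real plugin would build this from its config.
      return Collections.singletonMap("mapreduce.output.fileoutputformat.outputdir", "/path/to/output");
    }
  }));
}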
Lineage will be tracked using myExternalSink.
Transformation Plugin
A Transform plugin is used to convert one input record into zero or more output records. It can be used in both batch and real-time data pipelines.
The only method that needs to be implemented is: transform().
Methods
initialize()
: Used to perform any initialization step that might be required during the runtime of the Transform. It is guaranteed that this method will be invoked before the transform method.
transform()
: This method contains the logic that will be applied on each incoming data object. An emitter can be used to pass the results to the subsequent stage.
destroy()
: Used to perform any cleanup before the plugin shuts down.
Below is an example of a StringCase transform that converts specific fields to lowercase or uppercase.
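The full StringCase example is omitted here; a simplified, illustrative sketch that only lowercases the configured fields might look like this (the plugin name and config field are assumptions):
/**
 * Transform that lowercases configured string fields.
 */
@Plugin(type = Transform.PLUGIN_TYPE)
@Name("StringCase")
@Description("Lowercases configured fields.")
public class StringCaseTransform extends Transform<StructuredRecord, StructuredRecord> {
  private final Conf config;
  private Set<String> lowerFields;
  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Nullable
    @Description("A comma separated list of fields to lowercase.")
    private String lowerFields;
  }
  public StringCaseTransform(Conf config) {
    this.config = config;
  }
  @Override
  public void initialize(TransformContext context) throws Exception {
    super.initialize(context);
    lowerFields = new HashSet<>();
    if (config.lowerFields != null) {
      Collections.addAll(lowerFields, config.lowerFields.split(","));
    }
  }
  @Override
  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
    // Copy the input record, lowercasing the values of the configured string fields.
    StructuredRecord.Builder builder = StructuredRecord.builder(input.getSchema());
    for (Schema.Field field : input.getSchema().getFields()) {
      String name = field.getName();
      Object value = input.get(name);
      if (lowerFields.contains(name) && value instanceof String) {
        builder.set(name, ((String) value).toLowerCase());
      } else {
        builder.set(name, value);
      }
    }
    emitter.emit(builder.build());
  }
}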
If you wanted, you could add to the transform method a user metric indicating the number of fields changed. User metrics can be queried by using the CDAP Metrics Microservices.
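Emitting such a metric from the transform method might look like the following fragment; the metric name fieldsChanged, the counter variable, and the use of getContext() to reach the stage metrics are assumptions for illustration.
// inside transform(), after changing the case of the configured fields:
// emit a user metric counting how many fields were changed in this record
getContext().getMetrics().count("fieldsChanged", numFieldsChanged);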
Error Transformation Plugin
An ErrorTransform plugin is a special type of Transform that consumes error records emitted from the previous stages instead of output records. It is used to transform an ErrorRecord to zero or more output records. In addition to the actual error object, an ErrorRecord exposes the stage the error was emitted from, an error code, and an error message. Errors can be emitted by BatchSource, Transform, and BatchAggregator plugins using the Emitter they receive. An ErrorTransform can be used in both batch and real-time data pipelines.
The only method that needs to be implemented is: transform().
Methods
initialize()
: Used to perform any initialization step that might be required during the runtime of the ErrorTransform. It is guaranteed that this method will be invoked before the transform method.
transform()
: This method contains the logic that will be applied on each incoming ErrorRecord object. An emitter can be used to pass the results to the subsequent stage.
destroy()
: Used to perform any cleanup before the plugin shuts down.
Below is an example of an ErrorCollector that adds the error stage, code, and message to each record it receives.
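The full ErrorCollector example is omitted here; a simplified, illustrative sketch of the same idea might look like this (the output field names errStage, errCode, and errMsg are assumptions):
@Plugin(type = ErrorTransform.PLUGIN_TYPE)
@Name("ErrorCollector")
@Description("Adds the error stage, code, and message to each error record it receives.")
public class ErrorCollector extends ErrorTransform<StructuredRecord, StructuredRecord> {
  @Override
  public void transform(ErrorRecord<StructuredRecord> input, Emitter<StructuredRecord> emitter) throws Exception {
    StructuredRecord invalidRecord = input.getRecord();
    Schema errorSchema = getErrorSchema(invalidRecord.getSchema());
    StructuredRecord.Builder output = StructuredRecord.builder(errorSchema);
    // copy the original fields, then add the error information exposed by the ErrorRecord
    for (Schema.Field field : invalidRecord.getSchema().getFields()) {
      output.set(field.getName(), invalidRecord.get(field.getName()));
    }
    output.set("errStage", input.getStageName());
    output.set("errCode", input.getErrorCode());
    output.set("errMsg", input.getErrorMessage());
    emitter.emit(output.build());
  }
  // build an output schema that extends the input schema with the error fields
  private static Schema getErrorSchema(Schema inputSchema) {
    List<Schema.Field> fields = new ArrayList<>(inputSchema.getFields());
    fields.add(Schema.Field.of("errStage", Schema.of(Schema.Type.STRING)));
    fields.add(Schema.Field.of("errCode", Schema.of(Schema.Type.INT)));
    fields.add(Schema.Field.of("errMsg", Schema.of(Schema.Type.STRING)));
    return Schema.recordOf("error" + inputSchema.getRecordName(), fields);
  }
}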
Alert Publisher Plugin
An AlertPublisher plugin is a special type of plugin that consumes alerts emitted from previous stages instead of output records. Alerts are meant to be uncommon events that need to be acted on in some other program. Alerts contain a payload, which is just a map of strings containing any relevant data. An alert publisher is responsible for writing the alerts to some system, where they can be read and acted upon by an external program. For example, a plugin may write alerts to Kafka. Alerts may not be published immediately after they are emitted; it is up to the processing engine to decide when to publish alerts.
The only method that needs to be implemented is: publish(Iterator<Alert> alerts).
Methods
initialize()
: Used to perform any initialization step that might be required during the runtime of the AlertPublisher. It is guaranteed that this method will be invoked before the publish method.
publish()
: This method contains the logic that will publish each incoming Alert.
destroy()
: Used to perform any cleanup before the plugin shuts down.
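This section has no example; a minimal, illustrative sketch of an alert publisher that simply writes alerts to the pipeline logs might look like this (the plugin name and the use of SLF4J logging are assumptions):
@Plugin(type = AlertPublisher.PLUGIN_TYPE)
@Name("LoggingAlertPublisher")
@Description("Writes each alert it receives to the pipeline logs.")
public class LoggingAlertPublisher extends AlertPublisher {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingAlertPublisher.class);
  @Override
  public void publish(Iterator<Alert> alerts) throws Exception {
    while (alerts.hasNext()) {
      Alert alert = alerts.next();
      // an alert exposes the stage that emitted it and a map of strings as its payload
      LOG.info("Alert from stage {} with payload {}", alert.getStageName(), alert.getPayload());
    }
  }
}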
Batch Aggregator Plugin
A BatchAggregator plugin is used to compute aggregates over a batch of data. It is used in both batch and real-time data pipelines. An aggregation takes place in two steps: groupBy and then aggregate.
In the groupBy step, the aggregator creates zero or more group keys for each input record. Before the aggregate step occurs, the CDAP pipeline will take all records that have the same group key, and collect them into a group. If a record does not have any of the group keys, it is filtered out. If a record has multiple group keys, it will belong to multiple groups.
The aggregate step is then called. In this step, the plugin receives group keys and all records that had that group key. It is then left to the plugin to decide what to do with each of the groups.
In order to implement a Batch Aggregator, you extend the BatchAggregator class. Unlike a Transform, which operates on a single record at a time, a BatchAggregator operates on a collection of records.
Methods
configurePipeline()
: Used to perform any validation on the application configuration that is required by this plugin or to create any datasets if the fieldName for a dataset is not a macro.
initialize()
: Initialize the Batch Aggregator. Guaranteed to be executed before any call to the plugin’s groupBy or aggregate methods. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
destroy()
: Destroy any resources created by initialize. Guaranteed to be executed after all calls to the plugin’s groupBy or aggregate methods have been made. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
groupBy()
: This method will be called for every object that is received from the previous stage. It returns zero or more group keys for each object it receives. Objects with the same group key will be grouped together for the aggregate method.
aggregate()
: This method is called after every object has been assigned its group keys. It is called once for each group key emitted by the groupBy method and receives the group key as well as an iterator over all objects that had that group key. Objects emitted in this method are the output for this stage.
Example:
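The original example is omitted here. A minimal, illustrative sketch of an aggregator that counts how many records share each value of a configurable field might look like the following; the plugin name FieldCount and its config are assumptions.
@Plugin(type = BatchAggregator.PLUGIN_TYPE)
@Name("FieldCount")
@Description("Counts the number of records for each value of a configurable field.")
public class FieldCountAggregator extends BatchAggregator<String, StructuredRecord, StructuredRecord> {
  private static final Schema OUTPUT_SCHEMA = Schema.recordOf(
    "fieldCount",
    Schema.Field.of("value", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("count", Schema.of(Schema.Type.LONG))
  );
  private final Conf config;
  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The field to group on. The field must be of type string.")
    private String fieldName;
  }
  public FieldCountAggregator(Conf config) {
    this.config = config;
  }
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
  }
  @Override
  public void groupBy(StructuredRecord input, Emitter<String> emitter) throws Exception {
    // emit one group key per record: the value of the configured field
    String value = input.get(config.fieldName);
    if (value != null) {
      emitter.emit(value);
    }
  }
  @Override
  public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
                        Emitter<StructuredRecord> emitter) throws Exception {
    // count how many records had this group key and emit one output record per key
    long count = 0;
    while (groupValues.hasNext()) {
      groupValues.next();
      count++;
    }
    emitter.emit(StructuredRecord.builder(OUTPUT_SCHEMA)
                   .set("value", groupKey)
                   .set("count", count)
                   .build());
  }
}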
Batch Joiner Plugin
A BatchJoiner plugin is used to join records over a batch of data. It can be used in both batch and real-time data pipelines. A join takes place in two steps: a joinOn step followed by a merge step.
In the joinOn step, the joiner creates a join key for each input record. The CDAP pipeline will then take all records that have the same join key and collect them into a group.
The merge step is then called. In this step, the plugin receives all the records with the same join key, as determined by the type of join (either an inner or outer join). It is then up to the plugin to decide what to emit; whatever it emits becomes the output of the stage.
To implement a Batch Joiner, you extend the BatchJoiner class. Unlike a Transform, which operates on a single record at a time, a BatchJoiner operates on a collection of records.
Methods
configurePipeline()
: Used to create any datasets, or perform any validation on the application configuration that is required by this plugin.
initialize()
: Initialize the Batch Joiner. Guaranteed to be executed before any call to the plugin’s joinOn or merge methods. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
prepareRun()
: Prepares a pipeline run. This is run every time before a pipeline runs to help set up the run. Here you can set properties such as the number of partitions to use when joining and the join key class, if it is not known at compile time.
destroy()
: Destroy any resources created by the initialize method. Guaranteed to be executed after all calls to the plugin’s joinOn or merge methods have been made. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
joinOn()
: This method will be called for every object that is received from the previous stage. This method returns a join key for each object it receives. Objects with the same join key will be grouped together for the merge method.
getJoinConfig()
: This method will be called by the CDAP pipeline to find out the type of join to be performed. The config specifies which input stages are requiredInputs. Records from a required input will always be present in the merge() method. Records from a non-required input will only be present in the merge() method if they meet the join criteria. In other words, if there are no required inputs, a full outer join is performed. If all inputs are required inputs, an inner join is performed.
merge()
: This method is called after each object has been assigned a join key. The method receives a join key, an iterator over all objects with that join key, and the stage that emitted the object. Objects emitted by this method are the output for this stage.
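This section has no example; the following is an illustrative sketch of a joiner that joins all inputs on a single configurable field and simply re-emits the joined records. The exact JoinConfig constructor and JoinElement accessors shown are assumptions, so treat this as a sketch rather than a definitive implementation.
@Plugin(type = BatchJoiner.PLUGIN_TYPE)
@Name("FieldJoiner")
@Description("Joins records from all input stages on a configurable field.")
public class FieldJoiner extends BatchJoiner<String, StructuredRecord, StructuredRecord> {
  private final Conf config;
  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The field to join on. Must be a string field present in every input.")
    private String joinField;
    @Description("A comma separated list of input stages whose records are required in the join.")
    private String requiredInputs;
  }
  public FieldJoiner(Conf config) {
    this.config = config;
  }
  @Override
  public String joinOn(String stageName, StructuredRecord record) throws Exception {
    // the value of the configured field is used as the join key for every input stage
    return record.get(config.joinField);
  }
  @Override
  public JoinConfig getJoinConfig() {
    // assumption: JoinConfig takes the collection of required input stage names
    return new JoinConfig(Arrays.asList(config.requiredInputs.split(",")));
  }
  @Override
  public void merge(String joinKey, Iterable<JoinElement<StructuredRecord>> joinRow,
                    Emitter<StructuredRecord> emitter) throws Exception {
    // assumption: JoinElement exposes the joined record; a real joiner would combine
    // the records from the different stages into a single output record
    for (JoinElement<StructuredRecord> element : joinRow) {
      emitter.emit(element.getInputRecord());
    }
  }
}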
Spark Compute Plugin
A SparkCompute plugin is used to transform a collection of input records into a collection of output records. It can be used in both batch and real-time data pipelines. It is similar to a Transform, except instead of transforming its input record by record, it transforms an entire collection. In a SparkCompute plugin, you are given access to anything you would be able to do in a Spark program.
In order to implement a Spark Compute Plugin, you extend the SparkCompute class.
Methods
configurePipeline()
: Used to perform any validation on the application configuration that is required by this plugin or to create any datasets if the fieldName for a dataset is not a macro.
transform()
: This method is given a Spark RDD (Resilient Distributed Dataset) containing every object that is received from the previous stage. This method then performs Spark operations on the input to transform it into an output RDD that will be sent to the next stage.
Example:
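The original example is omitted here. A minimal, illustrative sketch of a Spark Compute plugin that filters out records whose configured field is null might look like the following; the plugin name and config are assumptions.
@Plugin(type = SparkCompute.PLUGIN_TYPE)
@Name("NonNullFilter")
@Description("Filters out records whose configured field is null.")
public class NonNullFilterCompute extends SparkCompute<StructuredRecord, StructuredRecord> {
  private final Conf config;
  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The field that must be non-null for a record to be kept.")
    private String fieldName;
  }
  public NonNullFilterCompute(Conf config) {
    this.config = config;
  }
  @Override
  public JavaRDD<StructuredRecord> transform(SparkExecutionPluginContext context,
                                             JavaRDD<StructuredRecord> input) throws Exception {
    // copy the field name into a local variable so the lambda does not capture the whole plugin
    String fieldName = config.fieldName;
    // any Spark operation can be applied to the input RDD; here we simply filter records
    return input.filter(record -> record.get(fieldName) != null);
  }
}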
Spark Sink Plugin
A SparkSink plugin is used to perform computations on a collection of input records and optionally write output data. It can only be used in batch data pipelines. A SparkSink is similar to a SparkCompute plugin except that it has no output. In a SparkSink, you are given access to anything you would be able to do in a Spark program. For example, one common use case is to train a machine-learning model in this plugin.
In order to implement a Spark Sink Plugin, you extend the SparkSink class.
Methods
configurePipeline()
: Used to perform any validation on the application configuration that is required by this plugin or to create any datasets if the fieldName for a dataset is not a macro.
run()
: This method is given a Spark RDD (Resilient Distributed Dataset) containing every object that is received from the previous stage. This method then performs Spark operations on the input, and usually saves the result to a dataset.
Example:
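The original example is omitted here. A minimal, illustrative sketch of a Spark Sink that simply counts the records it receives and writes the count to the logs might look like the following; the plugin name and the use of SLF4J logging are assumptions.
@Plugin(type = SparkSink.PLUGIN_TYPE)
@Name("RecordCountSink")
@Description("Counts the records it receives and writes the count to the logs.")
public class RecordCountSink extends SparkSink<StructuredRecord> {
  private static final Logger LOG = LoggerFactory.getLogger(RecordCountSink.class);
  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // no-op; datasets or arguments needed by the run could be set up here
  }
  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<StructuredRecord> input) throws Exception {
    // any Spark operation can be applied to the input RDD; here we just count it
    LOG.info("Received {} records.", input.count());
  }
}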
Streaming Source Plugin
A Streaming Source plugin is used as a source in real-time data pipelines. It is used to fetch a Spark DStream, which is an object that represents a collection of Spark RDDs and that produces a new RDD every batch interval of the pipeline.
In order to implement a Streaming Source Plugin, you extend the StreamingSource class.
Methods
configurePipeline()
: Used to perform any validation on the application configuration that is required by this plugin or to create any datasets if the fieldName for a dataset is not a macro.
getStream()
: Returns the JavaDStream that will be used as a source in the pipeline.
Example:
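The original example is omitted here. A minimal, illustrative sketch of a Streaming Source that reads lines of text from a TCP socket might look like the following; the plugin name, config fields, and the use of getSparkStreamingContext() to reach the underlying JavaStreamingContext are assumptions.
@Plugin(type = StreamingSource.PLUGIN_TYPE)
@Name("SocketTextSource")
@Description("Reads lines of text from a TCP socket.")
public class SocketTextSource extends StreamingSource<StructuredRecord> {
  private static final Schema SCHEMA = Schema.recordOf(
    "textLine", Schema.Field.of("line", Schema.of(Schema.Type.STRING)));
  private final Conf config;
  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The host to read from.")
    private String host;
    @Description("The port to read from.")
    private Integer port;
  }
  public SocketTextSource(Conf config) {
    this.config = config;
  }
  @Override
  public JavaDStream<StructuredRecord> getStream(StreamingContext context) throws Exception {
    // the StreamingContext exposes the underlying JavaStreamingContext,
    // so any Spark Streaming receiver can be used to build the DStream
    return context.getSparkStreamingContext()
      .socketTextStream(config.host, config.port)
      .map(line -> StructuredRecord.builder(SCHEMA).set("line", line).build());
  }
}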
Lineage
The lineage is registered using the referenceName provided when invoking registerLineage() on StreamingContext in getStream(). Note that the referenceName should be a valid DatasetId.
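Inside getStream(), registering lineage might look like this fragment (the reference name myStreamingSource is illustrative):
// 'myStreamingSource' is the referenceName used for lineage; it must be a valid DatasetId
context.registerLineage("myStreamingSource");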
Windower Plugin
A Windower plugin is used in real-time data pipelines to create sliding windows over the data. It does this by combining multiple micro batches into larger batches.
A window is defined by its size and its slide interval. Both are defined in seconds and must be multiples of the batchInterval of the pipeline. The size defines how much data is contained in the window. The slide interval defines how often a window is created.
For example, consider a pipeline with a batchInterval of 10 seconds. The pipeline uses a windower that has a size of 60 and a slide interval of 30. The input into the windower will be micro batches containing 10 seconds of data. Every 30 seconds, the windower will output a batch of data containing the past 60 seconds of data, meaning the previous six micro batches that it received as input.
This also means that each window output will overlap (repeat) some of the data from the previous window. This is useful for calculating aggregates, such as how many "404" responses a website sent out in the past ten seconds, past minute, or past five minutes.
In order to implement a Windower Plugin, you extend the Windower class.
Methods
configurePipeline()
: Used to perform any validation on the application configuration that is required by this plugin or to create any datasets if the fieldName for a dataset is not a macro.
getWidth()
: Return the width in seconds of windows created by this plugin. Must be a multiple of the batchInterval of the pipeline.
getSlideInterval()
: Get the slide interval in seconds of windows created by this plugin. Must be a multiple of the batchInterval of the pipeline.
Example:
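The original example is omitted here. A minimal, illustrative sketch of a windower whose width and slide interval come from its configuration might look like the following; the plugin name and config fields are assumptions.
@Plugin(type = Windower.PLUGIN_TYPE)
@Name("Window")
@Description("Creates sliding windows over the data.")
public class SlidingWindower extends Windower {
  private final Conf config;
  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("Width of the window in seconds. Must be a multiple of the pipeline's batchInterval.")
    private Long width;
    @Description("Slide interval in seconds. Must be a multiple of the pipeline's batchInterval.")
    private Long slideInterval;
  }
  public SlidingWindower(Conf config) {
    this.config = config;
  }
  @Override
  public long getWidth() {
    return config.width;
  }
  @Override
  public long getSlideInterval() {
    return config.slideInterval;
  }
}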