...
Code Block |
---|
/**
 * Batch Sink that writes to a FileSet in text format.
 * Each record will be written as a single line, with record fields separated by a configurable separator.
 *
 * StructuredRecord is the first parameter because that is the input to the sink.
 * The second and third parameters are the key and value expected by Hadoop's {@link TextOutputFormat}.
 */
@Plugin(type = BatchSink.PLUGIN_TYPE)
@Name(TextFileSetSink.NAME)
@Description("Writes to a FileSet in text format.")
public class TextFileSetSink extends BatchSink<StructuredRecord, NullWritable, Text> {
  public static final String NAME = "TextFileSet";
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    public static final String FILESET_NAME = "fileSetName";
    public static final String OUTPUT_DIR = "outputDir";
    public static final String FIELD_SEPARATOR = "fieldSeparator";

    // The name annotation tells CDAP what the property name is. It is optional, and defaults to the variable name.
    // Note: only primitives (including boxed types) and strings are supported.
    @Name(FILESET_NAME)
    @Description("The name of the FileSet to write to.")
    private String fileSetName;

    // Macro-enabled properties can be set to a placeholder value ${key} when the pipeline is deployed.
    // At runtime, the value for 'key' can be given and substituted in.
    @Macro
    @Name(OUTPUT_DIR)
    @Description("The FileSet directory to write to.")
    private String outputDir;

    @Nullable
    @Name(FIELD_SEPARATOR)
    @Description("The separator to use to join input record fields together. Defaults to ','.")
    private String fieldSeparator;

    // Use a no-args constructor to set field defaults.
    public Conf() {
      fileSetName = "";
      fieldSeparator = ",";
    }
  }

  // CDAP will pass in a config with its fields populated based on the configuration given when creating the pipeline.
  public TextFileSetSink(Conf config) {
    this.config = config;
  }

  // configurePipeline is called exactly once when the pipeline is being created.
  // Any static configuration should be performed here.
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // create the FileSet here.
    pipelineConfigurer.createDataset(config.fileSetName,
                                     FileSet.class,
                                     FileSetProperties.builder()
                                       .setInputFormat(TextInputFormat.class)
                                       .setOutputFormat(TextOutputFormat.class)
                                       .setEnableExploreOnCreate(true)
                                       .setExploreFormat("text")
                                       .setExploreSchema("text string")
                                       .build()
    );
  }

  // prepareRun is called before every pipeline run, and is used to configure what the output should be,
  // as well as any arguments the output should use. It is called by the client that is submitting the batch job.
  @Override
  public void prepareRun(BatchSinkContext context) throws Exception {
    Map<String, String> arguments = new HashMap<>();
    FileSetArguments.setOutputPath(arguments, config.outputDir);
    context.addOutput(Output.ofDataset(config.fileSetName, arguments));
  }

  // onRunFinish is called at the end of the pipeline run by the client that submitted the batch job.
  @Override
  public void onRunFinish(boolean succeeded, BatchSinkContext context) {
    // perform any actions that should happen at the end of the run.
  }

  // initialize is called by each job executor before any call to transform is made.
  // This occurs at the start of the batch job run, after the job has been successfully submitted.
  // For example, if mapreduce is the execution engine, each mapper will call initialize at the start of the program.
  @Override
  public void initialize(BatchRuntimeContext context) throws Exception {
    super.initialize(context);
    // create any resources required by transform()
  }

  // destroy is called by each job executor at the end of its life.
  // For example, if mapreduce is the execution engine, each mapper will call destroy at the end of the program.
  @Override
  public void destroy() {
    // clean up any resources created by initialize
  }

  // transform is called once per input record. It joins the record's field values with the
  // configured separator and emits the result as a single line of text.
  @Override
  public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, Text>> emitter) throws Exception {
    StringBuilder joinedFields = new StringBuilder();
    Iterator<Schema.Field> fieldIter = input.getSchema().getFields().iterator();
    if (!fieldIter.hasNext()) {
      // shouldn't happen
      return;
    }

    // null field values are written as empty strings
    Object val = input.get(fieldIter.next().getName());
    if (val != null) {
      joinedFields.append(val);
    }
    while (fieldIter.hasNext()) {
      String fieldName = fieldIter.next().getName();
      joinedFields.append(config.fieldSeparator);
      val = input.get(fieldName);
      if (val != null) {
        joinedFields.append(val);
      }
    }
    emitter.emit(new KeyValue<>(NullWritable.get(), new Text(joinedFields.toString())));
  }
}
|
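The field-joining behavior of transform() can be tried outside CDAP. The following is a minimal stand-alone sketch, not plugin code: the class FieldJoinSketch and the method joinFields are hypothetical names, and a plain list of values stands in for the fields of a StructuredRecord. It shows that a null field contributes an empty string, so the separator still appears for it.

```java
import java.util.Arrays;
import java.util.List;

/** Stand-alone illustration of the field-joining logic in transform(); not CDAP code. */
class FieldJoinSketch {

  /** Joins values with the separator; a null value contributes an empty string. */
  static String joinFields(List<Object> values, String separator) {
    StringBuilder joined = new StringBuilder();
    for (int i = 0; i < values.size(); i++) {
      if (i > 0) {
        joined.append(separator);
      }
      Object val = values.get(i);
      if (val != null) {
        joined.append(val);
      }
    }
    return joined.toString();
  }

  public static void main(String[] args) {
    // Arrays.asList permits null elements
    System.out.println(joinFields(Arrays.asList("alice", null, 42), ",")); // prints alice,,42
  }
}
```

Unlike the plugin, which returns without emitting anything when the schema has no fields, this sketch returns an empty string for an empty list.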
Lineage
For plugins that write data to non-CDAP sinks, the lineage is registered using the outputName provided when addOutput() is invoked on BatchSinkContext in prepareRun(). Note that the outputName should be a valid DatasetId. For example:
...
In order to implement a Batch Aggregator, you extend the BatchAggregator class. Unlike a Transform, which operates on a single record at a time, a BatchAggregator operates on a collection of records.
Methods
configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.
initialize(): Initialize the Batch Aggregator. Guaranteed to be executed before any call to the plugin’s groupBy or aggregate methods. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
destroy(): Destroy any resources created by initialize. Guaranteed to be executed after all calls to the plugin’s groupBy or aggregate methods have been made. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
groupBy(): This method is called for every object that is received from the previous stage. It returns zero or more group keys for each object it receives. Objects with the same group key are grouped together for the aggregate method.
aggregate(): This method is called after every object has been assigned its group keys. It is called once for each group key emitted by the groupBy method. It receives a group key as well as an iterator over all objects that had that group key. Objects emitted in this method are the output for this stage.
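The two-phase contract of groupBy() and aggregate() can be illustrated without CDAP. The sketch below is a simplified stand-in, not the framework's implementation: the class GroupByAggregateSketch and its run method are hypothetical, plain strings stand in for records, and an in-memory map plays the role of the engine that collects records under each group key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of the groupBy/aggregate contract; not CDAP code. */
class GroupByAggregateSketch {

  /** groupBy step: zero or more group keys per input record (here, one per word). */
  static List<String> groupBy(String record) {
    List<String> keys = new ArrayList<>();
    for (String word : record.trim().split("\\s+")) {
      if (!word.isEmpty()) {
        keys.add(word);
      }
    }
    return keys;
  }

  /** aggregate step: called once per group key with all records in that group. */
  static long aggregate(String groupKey, List<String> groupValues) {
    return groupValues.size();
  }

  /** Stand-in for the engine: collect records under each key, then aggregate each group. */
  static Map<String, Long> run(List<String> records) {
    Map<String, List<String>> groups = new LinkedHashMap<>();
    for (String record : records) {
      for (String key : groupBy(record)) {
        groups.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
      }
    }
    Map<String, Long> output = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> group : groups.entrySet()) {
      output.put(group.getKey(), aggregate(group.getKey(), group.getValue()));
    }
    return output;
  }

  public static void main(String[] args) {
    // the empty record yields zero group keys and so contributes to no group
    System.out.println(run(Arrays.asList("a b", "b c", ""))); // prints {a=1, b=2, c=1}
  }
}
```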
...
Code Block |
---|
/**
 * Aggregator that counts how many times each word appears in records input to the aggregator.
 */
@Plugin(type = BatchAggregator.PLUGIN_TYPE)
@Name(WordCountAggregator.NAME)
@Description("Counts how many times each word appears in all records input to the aggregator.")
public class WordCountAggregator extends BatchAggregator<String, StructuredRecord, StructuredRecord> {
  public static final String NAME = "WordCount";
  public static final Schema OUTPUT_SCHEMA = Schema.recordOf(
    "wordCount",
    Schema.Field.of("word", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("count", Schema.of(Schema.Type.LONG))
  );
  private static final Pattern WHITESPACE = Pattern.compile("\\s");
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The field from the input records containing the words to count.")
    private String field;
  }

  public WordCountAggregator(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // any static configuration validation should happen here.
    // We will check that the field is in the input schema and is of type string.
    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
    // a null input schema means it's unknown until runtime, or it's not constant
    if (inputSchema != null) {
      // if the input schema is constant and known at configure time, check that the input field exists and is a string.
      Schema.Field inputField = inputSchema.getField(config.field);
      if (inputField == null) {
        throw new IllegalArgumentException(
          String.format("Field '%s' does not exist in input schema %s.", config.field, inputSchema));
      }
      Schema fieldSchema = inputField.getSchema();
      Schema.Type fieldType = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getType() : fieldSchema.getType();
      if (fieldType != Schema.Type.STRING) {
        throw new IllegalArgumentException(
          String.format("Field '%s' is of illegal type %s. Must be of type %s.",
                        config.field, fieldType, Schema.Type.STRING));
      }
    }
    // set the output schema so downstream stages will know their input schema.
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
  }

  // emit one group key per word in the configured field; records with a null field emit no keys.
  @Override
  public void groupBy(StructuredRecord input, Emitter<String> groupKeyEmitter) throws Exception {
    String val = input.get(config.field);
    if (val == null) {
      return;
    }
    for (String word : WHITESPACE.split(val)) {
      groupKeyEmitter.emit(word);
    }
  }

  // called once per word; the number of values in the group is the number of times the word was emitted.
  @Override
  public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
                        Emitter<StructuredRecord> emitter) throws Exception {
    long count = 0;
    while (groupValues.hasNext()) {
      groupValues.next();
      count++;
    }
    emitter.emit(StructuredRecord.builder(OUTPUT_SCHEMA).set("word", groupKey).set("count", count).build());
  }
}
|
Batch Joiner Plugin
A BatchJoiner plugin is used to join records over a batch of data. It can be used in both batch and real-time data pipelines. A join takes place in two steps: a joinOn step followed by a merge step.
...
To implement a Batch Joiner, you extend the BatchJoiner class. Unlike a Transform, which operates on a single record at a time, a BatchJoiner operates on a collection of records.
Methods
configurePipeline(): Used to create any streams or datasets, or perform any validation on the application configuration that is required by this plugin.
initialize(): Initialize the Batch Joiner. Guaranteed to be executed before any call to the plugin’s joinOn or merge methods. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
prepareRun(): Prepares a pipeline run. This is run every time before a pipeline runs to help set up the run. Here you can set properties such as the number of partitions to use when joining and the join key class, if it is not known at compile time.
destroy(): Destroy any resources created by the initialize method. Guaranteed to be executed after all calls to the plugin’s joinOn or merge methods have been made. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
joinOn(): This method is called for every object that is received from the previous stage. It returns a join key for each object it receives. Objects with the same join key are grouped together for the merge method.
getJoinConfig(): This method is called by the CDAP Pipeline to find out the type of join to be performed. The config specifies which input stages are requiredInputs. Records from a required input will always be present in the merge() method. Records from a non-required input will only be present in the merge() method if they meet the join criteria. In other words, if there are no required inputs, a full outer join is performed. If all inputs are required inputs, an inner join is performed.
merge(): This method is called after each object has been assigned a join key. It receives a join key, an iterator over all objects with that join key, and the stage that emitted the object. Objects emitted by this method are the output for this stage.
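The effect of required inputs on the join type can be sketched in plain Java. This is one reading of the description of getJoinConfig() above, not CDAP's implementation: the class RequiredInputsSketch, its join method, and the sample stage names are all hypothetical, and each stage holds at most one record per join key to keep the example short. A join key produces output only if every required input has a record for it.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Plain-Java illustration of required inputs; none of these names are CDAP APIs. */
class RequiredInputsSketch {

  /**
   * inputs maps a stage name to its records, keyed by join key.
   * Returns, per surviving join key, the merged row (stage name to record).
   */
  static Map<String, Map<String, String>> join(Map<String, Map<String, String>> inputs,
                                               Set<String> requiredInputs) {
    // every join key seen in any input, as the joinOn step would produce
    Set<String> joinKeys = new TreeSet<>();
    for (Map<String, String> records : inputs.values()) {
      joinKeys.addAll(records.keySet());
    }

    Map<String, Map<String, String>> output = new TreeMap<>();
    for (String joinKey : joinKeys) {
      boolean allRequiredPresent = true;
      for (String stage : requiredInputs) {
        Map<String, String> records = inputs.getOrDefault(stage, Collections.<String, String>emptyMap());
        if (!records.containsKey(joinKey)) {
          allRequiredPresent = false;
        }
      }
      if (!allRequiredPresent) {
        continue;
      }
      // merge step: collect the record from each stage that has this join key
      Map<String, String> merged = new TreeMap<>();
      for (Map.Entry<String, Map<String, String>> input : inputs.entrySet()) {
        if (input.getValue().containsKey(joinKey)) {
          merged.put(input.getKey(), input.getValue().get(joinKey));
        }
      }
      output.put(joinKey, merged);
    }
    return output;
  }

  /** Two hypothetical input stages used by main. */
  static Map<String, Map<String, String>> sampleInputs() {
    Map<String, String> customers = new TreeMap<>();
    customers.put("1", "alice");
    customers.put("2", "bob");
    Map<String, String> orders = new TreeMap<>();
    orders.put("2", "book");
    orders.put("3", "pen");
    Map<String, Map<String, String>> inputs = new LinkedHashMap<>();
    inputs.put("customers", customers);
    inputs.put("orders", orders);
    return inputs;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> inputs = sampleInputs();
    Set<String> none = new TreeSet<>();
    Set<String> all = new TreeSet<>(inputs.keySet());
    Set<String> customersOnly = Collections.singleton("customers");
    System.out.println("no required inputs:  " + join(inputs, none).keySet());          // [1, 2, 3]
    System.out.println("all inputs required: " + join(inputs, all).keySet());           // [2]
    System.out.println("customers required:  " + join(inputs, customersOnly).keySet()); // [1, 2]
  }
}
```

With no required inputs every key survives (full outer join), with all inputs required only key 2 survives (inner join), and requiring only the customers stage behaves like a left outer join from customers.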
...
In order to implement a Spark Compute Plugin, you extend the SparkCompute class.
Methods
configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.
transform(): This method is given a Spark RDD (Resilient Distributed Dataset) containing every object that is received from the previous stage. This method then performs Spark operations on the input to transform it into an output RDD that will be sent to the next stage.
...
Code Block |
---|
/**
 * SparkCompute plugin that counts how many times each word appears in records input to the compute stage.
 */
@Plugin(type = SparkCompute.PLUGIN_TYPE)
@Name(WordCountCompute.NAME)
@Description("Counts how many times each word appears in all records input to the aggregator.")
public class WordCountCompute extends SparkCompute<StructuredRecord, StructuredRecord> {
  public static final String NAME = "WordCount";
  public static final Schema OUTPUT_SCHEMA = Schema.recordOf(
    "wordCount",
    Schema.Field.of("word", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("count", Schema.of(Schema.Type.LONG))
  );
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The field from the input records containing the words to count.")
    private String field;
  }

  public WordCountCompute(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // any static configuration validation should happen here.
    // We will check that the field is in the input schema and is of type string.
    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
    if (inputSchema != null) {
      WordCount wordCount = new WordCount(config.field);
      wordCount.validateSchema(inputSchema);
    }
    // set the output schema so downstream stages will know their input schema.
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
  }

  @Override
  public JavaRDD<StructuredRecord> transform(SparkExecutionPluginContext sparkExecutionPluginContext,
                                             JavaRDD<StructuredRecord> javaRDD) throws Exception {
    WordCount wordCount = new WordCount(config.field);
    // convert each (word, count) pair into an output record
    return wordCount.countWords(javaRDD)
      .flatMap(new FlatMapFunction<Tuple2<String, Long>, StructuredRecord>() {
        @Override
        public Iterable<StructuredRecord> call(Tuple2<String, Long> stringLongTuple2) throws Exception {
          List<StructuredRecord> output = new ArrayList<>();
          output.add(StructuredRecord.builder(OUTPUT_SCHEMA)
                       .set("word", stringLongTuple2._1())
                       .set("count", stringLongTuple2._2())
                       .build());
          return output;
        }
      });
  }
}
|
Spark Sink Plugin
A SparkSink plugin is used to perform computations on a collection of input records and optionally write output data. It can only be used in batch data pipelines. A SparkSink is similar to a SparkCompute plugin except that it has no output. In a SparkSink, you are given access to anything you would be able to do in a Spark program. For example, one common use case is to train a machine-learning model in this plugin.
In order to implement a Spark Sink Plugin, you extend the SparkSink class.
Methods
configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.
run(): This method is given a Spark RDD (Resilient Distributed Dataset) containing every object that is received from the previous stage. This method then performs Spark operations on the input, and usually saves the result to a dataset.
...
Code Block |
---|
/**
 * SparkSink plugin that counts how many times each word appears in records input to it and stores the result in
 * a KeyValueTable.
 */
@Plugin(type = SparkSink.PLUGIN_TYPE)
@Name(WordCountSink.NAME)
@Description("Counts how many times each word appears in all records input to the aggregator.")
public class WordCountSink extends SparkSink<StructuredRecord> {
  public static final String NAME = "WordCount";
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The field from the input records containing the words to count.")
    private String field;

    @Description("The name of the KeyValueTable to write to.")
    private String tableName;
  }

  public WordCountSink(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // any static configuration validation should happen here.
    // We will check that the field is in the input schema and is of type string.
    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
    if (inputSchema != null) {
      WordCount wordCount = new WordCount(config.field);
      wordCount.validateSchema(inputSchema);
    }
    pipelineConfigurer.createDataset(config.tableName, KeyValueTable.class, DatasetProperties.EMPTY);
  }

  @Override
  public void prepareRun(SparkPluginContext sparkPluginContext) throws Exception {
    // no-op
  }

  @Override
  public void run(SparkExecutionPluginContext sparkExecutionPluginContext,
                  JavaRDD<StructuredRecord> javaRDD) throws Exception {
    WordCount wordCount = new WordCount(config.field);
    // convert each (word, count) pair into the byte[] key and value stored in the KeyValueTable
    JavaPairRDD<byte[], byte[]> outputRDD = wordCount.countWords(javaRDD)
      .mapToPair(new PairFunction<Tuple2<String, Long>, byte[], byte[]>() {
        @Override
        public Tuple2<byte[], byte[]> call(Tuple2<String, Long> stringLongTuple2) throws Exception {
          return new Tuple2<>(Bytes.toBytes(stringLongTuple2._1()), Bytes.toBytes(stringLongTuple2._2()));
        }
      });
    sparkExecutionPluginContext.saveAsDataset(outputRDD, config.tableName);
  }
}
|
Streaming Source Plugin
A Streaming Source plugin is used as a source in real-time data pipelines. It is used to fetch a Spark DStream, which is an object that represents a collection of Spark RDDs and that produces a new RDD every batch interval of the pipeline.
In order to implement a Streaming Source Plugin, you extend the StreamingSource class.
Methods
configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.
getStream(): Returns the JavaDStream that will be used as a source in the pipeline.
...
Code Block |
---|
@Plugin(type = StreamingSource.PLUGIN_TYPE)
@Name("Twitter")
@Description("Twitter streaming source.")
public class TwitterStreamingSource extends StreamingSource<StructuredRecord> {
  private final TwitterStreamingConfig config;

  /**
   * Config class for TwitterStreamingSource.
   */
  public static class TwitterStreamingConfig extends PluginConfig implements Serializable {
    private static final long serialVersionUID = 4218063781909515444L;
    private String consumerKey;
    private String consumerSecret;
    private String accessToken;
    private String accessTokenSecret;
    private String referenceName;
  }

  public TwitterStreamingSource(TwitterStreamingConfig config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    pipelineConfigurer.getStageConfigurer().setOutputSchema(TwitterConstants.SCHEMA);
  }

  @Override
  public JavaDStream<StructuredRecord> getStream(StreamingContext context) throws Exception {
    JavaStreamingContext javaStreamingContext = context.getSparkStreamingContext();
    // lineage for this source will be tracked with this reference name
    context.registerLineage(config.referenceName);

    // Create authorization from user-provided properties
    ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
    configurationBuilder.setDebugEnabled(false)
      .setOAuthConsumerKey(config.consumerKey)
      .setOAuthConsumerSecret(config.consumerSecret)
      .setOAuthAccessToken(config.accessToken)
      .setOAuthAccessTokenSecret(config.accessTokenSecret);
    Authorization authorization = new OAuthAuthorization(configurationBuilder.build());

    return TwitterUtils.createStream(javaStreamingContext, authorization).map(
      new Function<Status, StructuredRecord>() {
        public StructuredRecord call(Status status) {
          return convertTweet(status);
        }
      }
    );
  }

  private StructuredRecord convertTweet(Status tweet) {
    // logic to convert a Twitter Status into a CDAP StructuredRecord
  }
}
|
Lineage
The lineage is registered using the referenceName provided when invoking registerLineage() on StreamingContext in getStream(). Note that the referenceName should be a valid DatasetId.
...
For example, consider a pipeline with a batchInterval of 10 seconds. The pipeline uses a windower that has a size of 60 seconds and a slide interval of 30 seconds. The input into the windower will be micro batches containing 10 seconds of data. Every 30 seconds, the windower will output a batch of data containing the past 60 seconds of data, meaning the previous six micro batches that it received as input.
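The arithmetic in this example can be checked with a small simulation. The sketch below is an illustration only, not how CDAP or Spark implements windowing: the class WindowSketch and its windows method are hypothetical, micro batches are represented by their sequence numbers starting at 1, and it assumes that early windows simply contain whatever micro batches have arrived so far.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation of which micro batches fall into each window; not CDAP code. */
class WindowSketch {

  /**
   * Returns, for each window emitted, the sequence numbers of the micro batches it contains.
   * All durations are in seconds; width and slide are assumed to be multiples of batchInterval.
   */
  static List<List<Integer>> windows(int totalMicroBatches, int batchInterval, int width, int slide) {
    int batchesPerWindow = width / batchInterval; // 60 / 10 = 6 in the example above
    int batchesPerSlide = slide / batchInterval;  // 30 / 10 = 3 in the example above
    List<List<Integer>> result = new ArrayList<>();
    for (int last = batchesPerSlide; last <= totalMicroBatches; last += batchesPerSlide) {
      // assumption: before a full window has elapsed, the window holds only what has arrived
      int first = Math.max(1, last - batchesPerWindow + 1);
      List<Integer> window = new ArrayList<>();
      for (int i = first; i <= last; i++) {
        window.add(i);
      }
      result.add(window);
    }
    return result;
  }

  public static void main(String[] args) {
    // 12 micro batches of 10 seconds each cover two minutes of input
    for (List<Integer> window : windows(12, 10, 60, 30)) {
      System.out.println(window);
    }
  }
}
```

For two minutes of input, the simulation emits four windows; from the second window on, each holds six micro batches and overlaps the previous window by three.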
...
In order to implement a Windower Plugin, you extend the Windower class.
Methods
configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.
getWidth(): Return the width in seconds of windows created by this plugin. Must be a multiple of the batchInterval of the pipeline.
getSlideInterval(): Get the slide interval in seconds of windows created by this plugin. Must be a multiple of the batchInterval of the pipeline.
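The multiple-of-batchInterval constraint on getWidth() and getSlideInterval() could be checked up front. The following is a minimal sketch of such a check, assuming all three values are available in seconds; the class WindowConfigCheck and its methods are hypothetical and are not part of the Windower API.

```java
/** Hypothetical helper that checks windower settings against the pipeline batchInterval. */
class WindowConfigCheck {

  /** Throws IllegalArgumentException unless width and slide interval are positive multiples of batchInterval. */
  static void validate(long batchIntervalSeconds, long widthSeconds, long slideIntervalSeconds) {
    if (batchIntervalSeconds <= 0) {
      throw new IllegalArgumentException("batchInterval must be positive");
    }
    requireMultiple("width", widthSeconds, batchIntervalSeconds);
    requireMultiple("slide interval", slideIntervalSeconds, batchIntervalSeconds);
  }

  private static void requireMultiple(String name, long valueSeconds, long batchIntervalSeconds) {
    if (valueSeconds <= 0 || valueSeconds % batchIntervalSeconds != 0) {
      throw new IllegalArgumentException(String.format(
        "%s of %d seconds is not a positive multiple of the batchInterval of %d seconds",
        name, valueSeconds, batchIntervalSeconds));
    }
  }

  public static void main(String[] args) {
    validate(10, 60, 30); // accepted: 60 and 30 are both multiples of 10
    try {
      validate(10, 45, 30); // rejected: 45 is not a multiple of 10
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```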
...