...
- A developer should be able to create pipelines that contain aggregations (GROUP BY -> count/sum/unique)
- A developer should be able to create a pipeline with multiple sources, with one running after the other
- A developer should be able to use a Spark ML job as a pipeline stage
- A developer should be able to rerun failed pipeline runs without reconfiguring the pipeline
- A developer should be able to de-duplicate records in a pipeline
- A developer should be able to join multiple branches of a pipeline
- A developer should be able to use an Explore action as a pipeline stage
- A developer should be able to create pipelines that contain Spark Streaming jobs
- A developer should be able to create pipelines that run based on various conditions, including input data availability and Kafka events
Design:
Story 1: Group By -> Aggregations
Option 1:
Introduce a new plugin type "aggregation". More generally, to support additional plugin types in a generic way, we want to refactor the config:
```
{
  "stages": [
    {
      "name": "inputTable",
      "plugin": {
        "name": "Table",
        "type": "batchsource", // new field
        "properties": { }
      }
    },
    {
      "name": "aggStage",
      "plugin": {
        "name": "RecordAggregator",
        "type": "aggregation", // new plugin type
        "properties": {
          "groupBy": "id",
          // the value of "functions" is itself JSON, passed as a string property
          "functions": "{
            \"totalPrice\": { \"name\": \"sum\", \"properties\": { \"column\": \"price\" } },
            \"numTransactions\": { \"name\": \"count\" }
          }"
        }
      }
    }
  ],
  "connections": [
    { "from": "inputTable", "to": "aggStage" }
  ]
}
```
...
Option 2: Keep the current structure of the config, with the plugin type at the top level:
```
{
"sources": [
{
"name": "inputTable",
"plugin": {
"name": "Table",
"type": "batchsource", // new field
"properties": {
}
}
}
],
"aggregations": [
{
"name": "aggStage",
"groupBy": "id",
"aggregations": [
{
"columnName": "totalPrice",
"plugin": {
"name": "sum",
"properties": {
"column": "price"
}
}
},
{
"columnName": "numTransactions",
"plugin": {
"name": "count"
}
}
]
}
],
"connections": [
{ "from": "inputTable", "to": "aggStage" }
]
}
```
Java APIs for plugin developers
```java
public abstract class Aggregation<INPUT_TYPE, GROUP_BY, RECORD_TYPE, OUTPUT_TYPE> {

  public abstract void groupBy(INPUT_TYPE input, Emitter<KeyValue<GROUP_BY, RECORD_TYPE>> emitter);

  public abstract void aggregate(GROUP_BY groupKey, Iterable<RECORD_TYPE> groupRecords,
                                 Emitter<OUTPUT_TYPE> emitter);
}

@Plugin(type = "aggregation")
@Name("record")
public class RecordAggregation
  extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord, StructuredRecord> {
  private final AggConfig config;

  public static class AggConfig extends PluginConfig {
    private String groupBy;
    // ideally this would be Map<String, FunctionConfig> functions
    private String functions;
  }

  public void configurePipeline(PipelineConfigurer configurer) {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    // for each function: usePlugin(id, type, name, properties);
  }

  @Override
  public void groupBy(StructuredRecord input,
                      Emitter<KeyValue<StructuredRecord, StructuredRecord>> emitter) {
    // key = new record from input with only the fields in config.groupBy
    // emitter.emit(new KeyValue<>(key, input));
  }

  @Override
  public void aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords,
                        Emitter<StructuredRecord> emitter) {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    // for each function:
    //   for (StructuredRecord record : groupRecords) { function.update(record); }
    // build a record from the group key and each function's aggregate() value
    // emit the record
  }
}

public abstract class AggregationFunction<RECORD_TYPE, OUTPUT_TYPE> {
  public abstract void update(RECORD_TYPE record);
  public abstract OUTPUT_TYPE aggregate();
}

@Plugin(type = "aggregationFunction")
@Name("sum")
public class SumAggregation extends AggregationFunction<StructuredRecord, Number> {
  private final SumConfig config;
  private Number sum;

  public static class SumConfig extends PluginConfig {
    private String column;
  }

  @Override
  public void update(StructuredRecord record) {
    // get the type of config.column and initialize sum to the right type based on that,
    // then add the record's value for config.column to the running sum
  }

  @Override
  public Number aggregate() {
    return sum;
  }
}
```
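For comparison, the "count" function referenced in the config above could look something like this (a minimal illustrative sketch against the proposed AggregationFunction API, not part of the proposal itself):
```java
@Plugin(type = "aggregationFunction")
@Name("count")
public class CountAggregation extends AggregationFunction<StructuredRecord, Long> {
  private long count = 0;

  @Override
  public void update(StructuredRecord record) {
    // every record in the group increments the count; column values are ignored
    count++;
  }

  @Override
  public Long aggregate() {
    return count;
  }
}
```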
Note: This would also satisfy user story 5 (de-duplication), since a unique can be implemented as an Aggregation plugin: group by the fields you want to be unique, ignore the Iterable<> in aggregate(), and just emit the group key, as in the sketch below.
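A rough sketch of such a plugin, assuming the Aggregation API above (the DedupConfig class and its uniqueFields property are hypothetical, for illustration only):
```java
@Plugin(type = "aggregation")
@Name("dedup")
public class DedupAggregation
  extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord, StructuredRecord> {
  // hypothetical config: a comma-separated list of fields to de-duplicate on
  private final DedupConfig config;

  public static class DedupConfig extends PluginConfig {
    private String uniqueFields;
  }

  @Override
  public void groupBy(StructuredRecord input,
                      Emitter<KeyValue<StructuredRecord, StructuredRecord>> emitter) {
    // key = record containing only the fields in config.uniqueFields
    // emitter.emit(new KeyValue<>(key, input));
  }

  @Override
  public void aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords,
                        Emitter<StructuredRecord> emitter) {
    // the grouped records are ignored; emitting the key once per group de-duplicates
    emitter.emit(groupKey);
  }
}
```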
...