...
- A developer should be able to create pipelines that contain aggregations (GROUP BY -> count/sum/unique)
- A developer should be able to create a pipeline with multiple sources, where one source runs after the other completes
- A developer should be able to use a Spark ML job as a pipeline stage
- A developer should be able to rerun failed pipeline runs without reconfiguring the pipeline
- A developer should be able to de-duplicate records in a pipeline
- A developer should be able to join multiple branches of a pipeline
- A developer should be able to use an Explore action as a pipeline stage
- A developer should be able to create pipelines that contain Spark Streaming jobs
- A developer should be able to create pipelines that run based on various conditions, including input data availability and Kafka events
...
Code Block |
---|
Option 1:
{
"stages": [
{
"name": "inputTable",
"plugin": {
"name": "Table",
"type": "batchsource", // new field
"properties": {
}
}
},
{
"name": "aggStage",
"plugin": {
"name": "RecordAggregator",
"type": "aggregation", // new plugin type
"properties": {
"groupBy": "id",
"functions": "{
"totalPrice": {
"name": "sum",
"properties": {
"column": "price"
}
},
"numTransactions": {
"name": "count"
}
}"
}
}
}
],
"connections": [
{ "from": "inputTable", "to": "aggStage" }
]
}
Option 2:
{
"sources": [
{
"name": "inputTable",
"plugin": {
"name": "Table",
"type": "batchsource", // new field
"properties": {
}
}
}
],
"aggregations": [
{
"name": "aggStage",
"groupBy": "id",
"aggregations": [
{
"columnName": "totalPrice",
"plugin": {
"name": "sum",
"properties": {
"column": "price"
}
}
},
{
"columnName": "numTransactions",
"plugin": {
"name": "count"
}
}
]
}
],
"connections": [
{ "from": "inputTable", "to": "aggStage" }
]
}
public abstract class Aggregation<INPUT_TYPE, GROUP_BY, RECORD_TYPE, OUTPUT_TYPE> {
public abstract void groupBy(INPUT_TYPE input, Emitter<KeyValue<GROUP_BY, RECORD_TYPE>> emitter);
public abstract void aggregate(GROUP_BY groupKey, Iterable<RECORD_TYPE> groupRecords, Emitter<OUTPUT_TYPE> emitter);
}
@Plugin(type = "aggregation")
@Name("record")
public class RecordAggregation extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord, StructuredRecord> {
private final AggConfig config;
public static class AggConfig extends PluginConfig {
private String groupBy;
// ideally this would be Map<String, FunctionConfig> functions
private String functions;
}
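// Sketch of the FunctionConfig referenced below; its fields (the function plugin name and
// its properties, mirroring the per-function JSON above) are assumptions, not part of the proposal.
public static class FunctionConfig {
String name;
Map<String, String> properties;
}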
public void configurePipeline(PipelineConfigurer configurer) {
Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
// for each function:
//   usePlugin(id, type, name, properties);
}
public void groupBy(StructuredRecord input, Emitter<KeyValue<StructuredRecord, StructuredRecord>> emitter) {
// key = new record from input with only fields in config.groupBy
// emitter.emit(new KeyValue<>(key, input));
}
public void aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords, Emitter<StructuredRecord> emitter) {
Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
// single pass over the group: feed each record to every function
for (StructuredRecord record : groupRecords) {
// for each function:
//   function.update(record);
}
// build the output record from the group key and each function's final value
// for each function:
//   val = function.aggregate();
// emit record
}
}
public abstract class AggregationFunction<RECORD_TYPE, OUTPUT_TYPE> {
public abstract void update(RECORD_TYPE record);
public abstract OUTPUT_TYPE aggregate();
}
@Plugin(type = "aggregationFunction")
@Name("sum")
public class SumAggregation extends AggregationFunction<StructuredRecord, Number> {
private final SumConfig config;
private Number sum;
public static class SumConfig extends PluginConfig {
private String column;
}
public void update(StructuredRecord record) {
// get the type of config.column and initialize sum to the matching numeric type
sum += record.get(config.column);
}
public Number aggregate() {
return sum;
}
}
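// For completeness, a sketch of the "count" function referenced in the JSON above, following
// the same AggregationFunction contract; the class body here is an assumption, not part of the proposal.
@Plugin(type = "aggregationFunction")
@Name("count")
public class CountAggregation extends AggregationFunction<StructuredRecord, Number> {
private long count;
public void update(StructuredRecord record) {
count++;
}
public Number aggregate() {
return count;
}
}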
|
Story 2: Multiple Sources
Option 1: Introduce the concept of "phases". Each phase has its own DAG. Phases can be connected, with connections denoting control flow rather than data flow.
Code Block |
---|
{
"phases": [
{
"name": "phase1",
"stages": [...],
"connections": [...]
},
{
"name": "phase2",
"stages": [...],
"connections": [...]
}
],
"connections": [
{ "from": "phase1", "to": "phase2" }
]
} |
Option 2: Introduce a "runcondition" plugin type. Connections into a run condition imply control flow rather than data flow.
Code Block |
---|
{ "stages": [ { "name": "customersTable", "plugin": { "name": "Database", "type": "batchsource", ... } }, { "name": "customersFiles", "plugin": { "name": "TPFSParquet", "type": "batchsink", ... } }, { "name": "afterDump", "plugin": { "name": "AlwaysRun", "type": "runcondition" } }, { "name": "purchasesTable", "plugin": { "name": "Database", "type": "batchsource" } }, { "name": "purchasesFiles", "plugin": { "name": "TPFSParquet", "type": "batchsink", ... } }, ], "connections": [ { "from": "customersTable", "to": "customersFiles" }, { "from": "customersFiles", "to": "afterDump" }, { "from": "afterDump", "to": "purchasesTable" }, { "from": "purchasesTable", "to": "purchasesFiles" } ] } |