...
Code Block |
---|
{ "stages": [ { "name": "inputTable", "plugin": { "name": "Table", "type": "batchsource", // new field "properties": { } } }, { "name": "aggStage", "plugin": { "name": "GroupByAggregate", "type": "aggregation", // new plugin type "properties": { "groupBy": "id", "functions": "[ { "columnName": "totalPrice", "plugin": { "name": "sum", "properties": { "column": "price" } } }, { "columnName": "numTransactions", "plugin": { "name": "count" } } ]" } } } ], "connections": [ { "from": "inputTable", "to": "aggStage" } ] } |
...
numTransactions",
"plugin": {
"name": "count"
}
}
]"
}
}
}
],
"connections": [
{ "from": "inputTable", "to": "aggStage" }
]
} |
One problem with this is that the plugin property "functions" is itself a JSON string describing the plugins to use. This is not easy for somebody to configure by hand, though it could perhaps be simplified with a dedicated UI widget type.
Option 2: Keep the current structure of the config, with plugin type at the top level.
Code Block |
---|
{ "sources": [ { "name": "inputTable", "plugin": { "name": "Table", "type": "batchsource", // new field "properties": { } } } ], "aggregations": [ { "name": "aggStage", "groupBy": "id", "aggregations": [ { "columnName": "totalPrice", "plugin": { "name": "sum", "properties": { "column": "price" } } }, { "columnName": "numTransactions", "plugin": { "name": "count" } } ] } ], "connections": [ { "from": "inputTable", "to": "aggStage" } ] } |
This would give the config more structure, but we would lose the generality of plugin types (aggregation stages would need their own top-level section instead of being just another plugin type).
Java APIs for plugin developers. This is essentially MapReduce (groupBy is the map step, aggregate is the reduce step), so 'Aggregation' is probably a bad name for it.
Code Block |
---|
public abstract class Aggregation<INPUT_TYPE, GROUP_BY, RECORD_TYPE, OUTPUT_TYPE> { public abstract groupBy(INPUT_TYPE input, Emitter<KeyValue<GROUP_BY, RECORD_TYPE>> emitter); public abstract aggregate(GROUP_BY groupKey, Iterable<RECORD_TYPE> groupRecords, Emitter<OUTPUT_TYPE> emitter); } @Plugin(type = "aggregation") @Name("recordGroupByAggregate") public RecordAggregation extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord, StructuredRecord> { private static final AggConfig config; public static class AggConfig extends PluginConfig { private String groupBy; // ideally this would be Map<String, FunctionConfig> functions private String functions; } public void configurePipeline(PipelineConfigurer configurer) { Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE); for each function: usePlugin(id, type, name, properties); } public groupBy(StructuredRecord input, Emitter<KeyValue<StructuredRecord, StructuredRecord>> emitter) { // key = new record from input with only fields in config.groupBy // emitter.emit(new KeyValue<>(key, input)); } public aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords, Emitter<StructuredRecord> emitter) { Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE); for each function: val = function.aggregate(groupRecords); for (StructuredRecord record : groupRecords) { function.update(record); } // build record from group key and function values for each function: val = function.aggregate(); // emit record } } public abstract class AggregationFunction<RECORD_TYPE, OUTPUT_TYPE> { public abstract void update(RECORD_TYPE record); public abstract OUTPUT_TYPE aggregate(); } @Plugin(type = "aggregationFunction") @Name("sum") public SumAggregation extends AggregationFunction<StructuredRecord, Number> { private final SumConfig config; private Number sum; public static class SumConfig extends PluginConfig { private String column; } public void 
update(StructuredRecord record) { // get type of config.column, initialize sum to right type based on that sum += record; } public Number aggregate() { return sum; } } |
...