...
Introduce a new plugin type "aggregation".
Code Block |
---|
Option 1: { "stages": [ { "name": "inputTable", "plugin": { "name": "Table", "type": "batchsource", // new field "properties": { } } }, { "name": "aggStage", "plugin": { "name": "RecordAggregator", "type": "aggregation", // new plugin type "properties": { "groupBy": "id", "functions": "{ "totalPrice": { "name": "sum", "properties": { "column": "price" } }, "numTransactions": { "name": "count" } }" } } } ], "connections": [ { "from": "inputTable", "to": "aggStage" } ] } Option 2: { "sources": [ { "name": "inputTable", "plugin": { "name": "Table", "type": "batchsource", // new field "properties": { } } } ], "aggregations": [ { "name": "aggStage", "groupBy": "id", "aggregations": [ { "columnName": "totalPrice", "plugin": { "name": "sum", "properties": { "column": "price" } } }, { "columnName": "numTransactions", "plugin": { "name": "count" } } ] } ], "connections": [ { "from": "inputTable", "to": "aggStage" } ] } public abstract class Aggregation<INPUT_TYPE, GROUP_BY, RECORD_TYPE, OUTPUT_TYPE> { public abstract groupBy(INPUT_TYPE input, Emitter<KeyValue<GROUP_BY, RECORD_TYPE>> emitter); public abstract aggregate(GROUP_BY groupKey, Iterable<RECORD_TYPE> groupRecords, Emitter<OUTPUT_TYPE> emitter); } @Plugin(type = "aggregation") @Name("record") public RecordAggregation extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord, StructuredRecord> { private static final AggConfig config; public static class AggConfig extends PluginConfig { private String groupBy; // ideally this would be Map<String, FunctionConfig> functions private String functions; } public void configurePipeline(PipelineConfigurer configurer) { Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE); for each function: usePlugin(id, type, name, properties); } public groupBy(StructuredRecord input, Emitter<KeyValue<StructuredRecord, StructuredRecord>> emitter) { // key = new record from input with only fields in config.groupBy // emitter.emit(new KeyValue<>(key, input)); } public aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords, Emitter<StructuredRecord> emitter) { Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE); for each function: val = function.aggregate(groupRecords); for (StructuredRecord record : groupRecords) { function.update(record); } // build record from group key and function values for each function: val = function.aggregate(); // emit record } } public abstract class AggregationFunction<RECORD_TYPE, OUTPUT_TYPE> { public abstract void update(RECORD_TYPE record); public abstract OUTPUT_TYPE aggregate(); } @Plugin(type = "aggregationFunction") @Name("sum") public SumAggregation extends AggregationFunction<StructuredRecord, Number> { private final SumConfig config; private Number sum; public static class SumConfig extends PluginConfig { private String column; } public void update(StructuredRecord record) { // get type of config.column, initialize sum to right type based on that sum += record; } public Number aggregate() { return sum; } } |
...