...
Code Block |
---|
public abstract class Aggregation<GROUP_BY, RECORD_TYPE, OUTPUT_TYPE> { public abstract void groupBy(RECORD_TYPE input, Emitter<KeyValue<GROUPEmitter<GROUP_BY, RECORD_TYPE>> BY> emitter); public abstract void aggregate(GROUP_BY groupKey, Iterable<RECORD_TYPE> groupRecords, Emitter<OUTPUT_TYPE> emitter); } @Plugin(type = "aggregation") @Name("GroupByAggregate") public GroupByAggregate extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord> { private static final AggConfig config; public static class AggConfig extends PluginConfig { private String groupBy; // ideally this would be Map<String, FunctionConfig> functions private String functions; } public void configurePipeline(PipelineConfigurer configurer) { Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE); for each function: usePlugin(id, type, name, properties); } public groupBy(StructuredRecord input, Emitter<KeyValue<StructuredRecord, StructuredRecord>>Emitter<StructuredRecord> emitter) { // key = new record from input with only fields in config.groupBy Set<String> //fields = config.groupBy.split(","); emitter.emit(new KeyValue<>recordSubset(keyinput, inputfields)); } public publicvoid aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords, Emitter<StructuredRecord> emitterinitialize() { Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE); for each function: val = function.aggregate(groupRecords); } for (StructuredRecord record : public void aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords, Emitter<StructuredRecord> emitter) { // reset all functions for (StructuredRecord record : groupRecords) { foreach function: function.update(record); } // build record from group key and function values for each function: val = function.aggregate(); // emit record } } public abstract class AggregationFunction<RECORD_TYPE, OUTPUT_TYPE> { public abstract void reset(); public abstract void update(RECORD_TYPE record); public abstract OUTPUT_TYPE aggregate(); } @Plugin(type = "aggregationFunction") @Name("sum") public SumAggregation extends AggregationFunction<StructuredRecord, Number> { private final SumConfig config; private Number sum; public static class SumConfig extends PluginConfig { private String column; } public void update(StructuredRecord record) { // get type of config.column, initialize sum to right type based on that sum += (casted to correct thing) record.get(config.column); } public Number aggregate() { return sum; } } |
...