Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
public abstract class Aggregation<GROUP_BY, RECORD_TYPE, OUTPUT_TYPE> {
 
  public abstract void groupBy(RECORD_TYPE input, Emitter<KeyValue<GROUPEmitter<GROUP_BY, RECORD_TYPE>> BY> emitter);
 
  public abstract void aggregate(GROUP_BY groupKey, Iterable<RECORD_TYPE> groupRecords, Emitter<OUTPUT_TYPE> emitter);
 
}

@Plugin(type = "aggregation")
@Name("GroupByAggregate")
public GroupByAggregate extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord> {
  private static final AggConfig config;
  
  public static class AggConfig extends PluginConfig {
    private String groupBy;
 
    // ideally this would be Map<String, FunctionConfig> functions
    private String functions;
  }
 
  public void configurePipeline(PipelineConfigurer configurer) {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    for each function:
      usePlugin(id, type, name, properties);
  }
 
  public groupBy(StructuredRecord input, Emitter<KeyValue<StructuredRecord, StructuredRecord>>Emitter<StructuredRecord> emitter) {
    // key = new record from input with only fields in config.groupBy
    Set<String>  //fields = config.groupBy.split(",");
    emitter.emit(new KeyValue<>recordSubset(keyinput, inputfields));
  }
 
  public publicvoid aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords, Emitter<StructuredRecord> emitterinitialize() {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    for each function:
      val = function.aggregate(groupRecords);
  }
 for
(StructuredRecord record : public void aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords, Emitter<StructuredRecord> emitter) {
    // reset all functions
    for (StructuredRecord record : groupRecords) {
      foreach function:
        function.update(record);
    }
    // build record from group key and function values
    for each function:
      val = function.aggregate();
    // emit record
  }
 
}
 
public abstract class AggregationFunction<RECORD_TYPE, OUTPUT_TYPE> {
  public abstract void reset();
  public abstract void update(RECORD_TYPE record);
  public abstract OUTPUT_TYPE aggregate();
}
 
@Plugin(type = "aggregationFunction")
@Name("sum")
public SumAggregation extends AggregationFunction<StructuredRecord, Number> {
  private final SumConfig config;
  private Number sum;
  
  public static class SumConfig extends PluginConfig {
    private String column;
  }
 
  public void update(StructuredRecord record) {
    // get type of config.column, initialize sum to right type based on that
    sum += (casted to correct thing) record.get(config.column);
  }
 
  public Number aggregate() {
    return sum;
  }
}

...