Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
public abstract class Aggregation<GROUP_BY, RECORD_TYPE, OUTPUT_TYPE> {
 
  public abstract groupBy(RECORD_TYPE input, Emitter<KeyValue<GROUP_BY, RECORD_TYPE>> emitter);
 
  public abstract aggregate(GROUP_BY groupKey, Iterable<RECORD_TYPE> groupRecords, Emitter<OUTPUT_TYPE> emitter);
 
}

/**
 * Design sketch (pseudo-code) of an aggregation plugin that groups records on the fields
 * named in {@code config.groupBy} and applies every configured aggregation function to
 * each group.
 *
 * The "for each function:" lines are pseudo-code; gson, MAP_TYPE and usePlugin are assumed
 * to be provided elsewhere.
 */
@Plugin(type = "aggregation")
@Name("GroupByAggregate")
public class RecordAggregationGroupByAggregate extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord> {
  // Per-instance configuration, declared the same way as in SumAggregation.
  private final AggConfig config;

  /** Plugin configuration: the group-by fields and the functions to apply. */
  public static class AggConfig extends PluginConfig {
    // Fields that make up the group key (exact string format not specified here).
    private String groupBy;

    // ideally this would be Map<String, FunctionConfig> functions
    // Until then it is a JSON string that is parsed into that map with gson.
    private String functions;
  }

  /**
   * Registers one plugin per configured aggregation function.
   *
   * @param configurer the pipeline configurer
   */
  public void configurePipeline(PipelineConfigurer configurer) {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    for each function:
      usePlugin(id, type, name, properties);
  }

  /**
   * Emits the input record keyed by a record that holds only the group-by fields.
   *
   * @param input the record to group
   * @param emitter receives the (group key, input) pair
   */
  @Override
  public void groupBy(StructuredRecord input, Emitter<KeyValue<StructuredRecord, StructuredRecord>> emitter) {
    // key = new record from input with only fields in config.groupBy
    // emitter.emit(new KeyValue<>(key, input));
  }

  /**
   * Feeds every record of the group to every function, then builds and emits one output
   * record from the group key and the function results.
   *
   * @param groupKey the key shared by every record in {@code groupRecords}
   * @param groupRecords the records of the group
   * @param emitter receives the aggregated record
   */
  @Override
  public void aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords, Emitter<StructuredRecord> emitter) {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    // Single pass over the group: each function sees each record exactly once.
    for (StructuredRecord record : groupRecords) {
      for each function:
        function.update(record);
    }
    // build record from group key and function values
    for each function:
      val = function.aggregate();
    // emit record
  }

}
 
/**
 * A function that is fed records one at a time and produces a single aggregate value.
 *
 * @param <RECORD_TYPE> type of the records passed to {@link #update}
 * @param <OUTPUT_TYPE> type of the value returned by {@link #aggregate}
 */
public abstract class AggregationFunction<RECORD_TYPE, OUTPUT_TYPE> {

  /**
   * Takes one more record into account.
   *
   * @param record the record to include in the aggregate
   */
  public abstract void update(RECORD_TYPE record);

  /**
   * Returns the aggregate value.
   *
   * @return the aggregate value
   */
  public abstract OUTPUT_TYPE aggregate();
}
 
@Plugin(type = "aggregationFunction")
@Name("sum")
public SumAggregation extends AggregationFunction<StructuredRecord, Number> {
  private final SumConfig config;
  private Number sum;
  
  public static class SumConfig extends PluginConfig {
    private String column;
  }
 
  public void update(StructuredRecord record) {
    // get type of config.column, initialize sum to right type based on that
    sum += record;
  }
 
  public Number aggregate() {
    return sum;
  }
}

...