Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

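Introduce a new plugin type, "aggregation", which groups input records by a key and computes aggregate functions (for example, sum and count) over each group.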

Code Block
Option 1:
// Option 1: every pipeline element, including the aggregation, is listed under a single
// "stages" array, and each stage names its plugin through a "plugin" object.
{
  "stages": [
    {
      "name": "inputTable",
      "plugin": {
        "name": "Table",
        "type": "batchsource",  // new field
        "properties": {
        }
      }
    },
    {
      "name": "aggStage",
      "plugin": {
        // NOTE(review): the plugin class sketched later in this document is annotated
        // @Name("record"), not "RecordAggregator" -- confirm which name is intended.
        "name": "RecordAggregator",
        "type": "aggregation",  // new plugin type
        "properties": {
          "groupBy": "id",
          // The functions are passed as one string property whose content is JSON;
          // the keys appear to be the output column names -- TODO confirm.
          "functions": "{
            "totalPrice": {
              "name": "sum",
              "properties": {
                "column": "price"
              }
            },
            "numTransactions": {
              "name": "count"
            }
          }"
        }
      }
    }
  ],
  // Stages are wired together by name.
  "connections": [
    { "from": "inputTable", "to": "aggStage" } 
  ]
}
 
Option 2:
// Option 2: sources and aggregations are listed in separate top-level arrays, and the
// aggregation stage carries "groupBy" and its functions as structured fields rather than
// as plugin properties.
{
  "sources": [
    {
      "name": "inputTable",
      "plugin": {
        "name": "Table",
        "type": "batchsource",  // new field
        "properties": {
        }
      }
    }
  ],
  "aggregations": [
    {
      "name": "aggStage",
      "groupBy": "id",
      // One entry per aggregate: "columnName" paired with the function plugin to use
      // (presumably the output column name -- TODO confirm).
      "aggregations": [
        {
          "columnName": "totalPrice",
          "plugin": {
            "name": "sum",
            "properties": {
              "column": "price"
            }
          }
        },
        {
          "columnName": "numTransactions",
          "plugin": {
            "name": "count"
          }
        }
      ]
    }
  ],
  // Stages are wired together by name.
  "connections": [
    { "from": "inputTable", "to": "aggStage" } 
  ]
}
 
public abstract class Aggregation<INPUT_TYPE, GROUP_BY, RECORD_TYPE, OUTPUT_TYPE> {
 
  public abstract groupBy(INPUT_TYPE input, Emitter<KeyValue<GROUP_BY, RECORD_TYPE>> emitter);
 
  public abstract aggregate(GROUP_BY groupKey, Iterable<RECORD_TYPE> groupRecords, Emitter<OUTPUT_TYPE> emitter);
 
}

/**
 * Aggregation plugin (type "aggregation", name "record") over {@code StructuredRecord}s.
 *
 * <p>Records are grouped by the fields named in {@code AggConfig.groupBy}; the aggregation
 * functions described by {@code AggConfig.functions} are then applied to each group.
 * The method bodies are pseudo-code that sketches the intended flow.
 */
@Plugin(type = "aggregation")
@Name("record")
public class RecordAggregation extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord, StructuredRecord> {
  // Instance field (not static): declared the same way as SumAggregation's config field.
  private final AggConfig config;

  /** Configuration properties of this plugin. */
  public static class AggConfig extends PluginConfig {
    // field(s) of the input record to group by
    private String groupBy;

    // ideally this would be Map<String, FunctionConfig> functions
    // for now a JSON string that is parsed into Map<String, FunctionConfig> where needed
    private String functions;
  }

  /**
   * Registers every configured aggregation function as a plugin.
   *
   * @param configurer the pipeline configurer
   */
  public void configurePipeline(PipelineConfigurer configurer) {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    for each function:
      usePlugin(id, type, name, properties);
  }

  /**
   * Emits the input record keyed by a record that holds only the group-by fields.
   *
   * @param input the record to assign to a group
   * @param emitter receives the (group key, input record) pair
   */
  @Override
  public void groupBy(StructuredRecord input, Emitter<KeyValue<StructuredRecord, StructuredRecord>> emitter) {
    // key = new record from input with only fields in config.groupBy
    // emitter.emit(new KeyValue<>(key, input));
  }

  /**
   * Runs every configured function over the records of one group and emits a single record
   * built from the group key and the function results.
   *
   * @param groupKey the key of the group
   * @param groupRecords the records of the group
   * @param emitter receives the aggregated record
   */
  @Override
  public void aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords, Emitter<StructuredRecord> emitter) {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    // feed every record of the group to every function; AggregationFunction accumulates
    // through update(record) and exposes its result through the no-argument aggregate()
    for (StructuredRecord record : groupRecords) {
      for each function:
        function.update(record);
    }
    // build record from group key and function values
    for each function:
      val = function.aggregate();
    // emit record
  }

}
 
// Base class for a single aggregate function (for example the "sum" plugin in this
// document). Records are supplied one at a time through update(), and the result is read
// with aggregate().
// RECORD_TYPE is the type of the records consumed; OUTPUT_TYPE is the type of the result.
public abstract class AggregationFunction<RECORD_TYPE, OUTPUT_TYPE> {
 
  // Takes one more record into account.
  public abstract void update(RECORD_TYPE record);
  // Returns the aggregate; RecordAggregation.aggregate in this document calls it after the
  // update() calls for a group.
  public abstract OUTPUT_TYPE aggregate();
}
 
@Plugin(type = "aggregationFunction")
@Name("sum")
public SumAggregation extends AggregationFunction<StructuredRecord, Number> {
  private final SumConfig config;
  private Number sum;
  
  public static class SumConfig extends PluginConfig {
    private String column;
  }
 
  public void update(StructuredRecord record) {
    // get type of config.column, initialize sum to right type based on that
    sum += record;
  }
 
  public Number aggregate() {
    return sum;
  }
}
 

...