Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
{
  "stages": [
    {
      "name": "inputTable",
      "plugin": {
        "name": "Table",
        "type": "batchsource",  // new field
        "properties": {
        }
      }
    },
    {
      "name": "aggStage",
      "plugin": {
        "name": "GroupByAggregate",
        "type": "aggregation",  // new plugin type
        "properties": {
          "groupBy": "id",
          "functions": "[
            {
              "columnName": "totalPrice",
              "plugin": {
                "name": "sum",
                "properties": {
                  "column": "price"
                }
              }
            },
            {
              "columnName": "numTransactions",
              "plugin": {
                "name": "count"
              }
            }
          ]"
        }
      }
    }
  ],
  "connections": [
    { "from": "inputTable", "to": "aggStage" } 
  ]
}

 

...

numTransactions",
              "plugin": {
                "name": "count"
              }
            }
          ]"
        }
      }
    }
  ],
  "connections": [
    { "from": "inputTable", "to": "aggStage" } 
  ]
}

One problem with this approach is that the plugin property "functions" is itself a JSON string describing the plugins to use. This is not easy to configure by hand, but it could perhaps be simplified with a dedicated UI widget type.

 

Option 2: Keep the current structure of the config, with plugin type at the top level.

Code Block
{
  "sources": [
    {
      "name": "inputTable",
      "plugin": {
        "name": "Table",
        "type": "batchsource",  // new field
        "properties": {
        }
      }
    }
  ],
  "aggregations": [
    {
      "name": "aggStage",
      "groupBy": "id",
      "aggregations": [
        {
          "columnName": "totalPrice",
          "plugin": {
            "name": "sum",
            "properties": {
              "column": "price"
            }
          }
        },
        {
          "columnName": "numTransactions",
          "plugin": {
            "name": "count"
          }
        }
      ]
    }
  ],
  "connections": [
    { "from": "inputTable", "to": "aggStage" } 
  ]
}

This would give the config more structure, but we would lose the generalization of plugin types.

 

Java APIs for plugin developers. This is essentially MapReduce (a groupBy phase followed by a per-group aggregate phase), so 'Aggregation' is probably not the best name for it.

Code Block
public abstract class Aggregation<INPUT_TYPE, GROUP_BY, RECORD_TYPE, OUTPUT_TYPE> {
 
  public abstract groupBy(INPUT_TYPE input, Emitter<KeyValue<GROUP_BY, RECORD_TYPE>> emitter);
 
  public abstract aggregate(GROUP_BY groupKey, Iterable<RECORD_TYPE> groupRecords, Emitter<OUTPUT_TYPE> emitter);
 
}

@Plugin(type = "aggregation")
@Name("recordGroupByAggregate")
public RecordAggregation extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord, StructuredRecord> {
  private static final AggConfig config;
  
  public static class AggConfig extends PluginConfig {
    private String groupBy;
 
    // ideally this would be Map<String, FunctionConfig> functions
    private String functions;
  }
 
  public void configurePipeline(PipelineConfigurer configurer) {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    for each function:
      usePlugin(id, type, name, properties);
  }
 
  public groupBy(StructuredRecord input, Emitter<KeyValue<StructuredRecord, StructuredRecord>> emitter) {
    // key = new record from input with only fields in config.groupBy
    // emitter.emit(new KeyValue<>(key, input));
  }
 
  public aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords, Emitter<StructuredRecord> emitter) {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    for each function:
      val = function.aggregate(groupRecords);
    for (StructuredRecord record : groupRecords) {
      function.update(record);
    }
    // build record from group key and function values
    for each function:
      val = function.aggregate();
    // emit record
  }
 
}
 
/**
 * A stateful aggregate that is computed incrementally, one record at a time.
 *
 * <p>Records are fed in through {@link #update}; the result is read with {@link #aggregate}.
 *
 * @param <RECORD_TYPE> type of the records being aggregated
 * @param <OUTPUT_TYPE> type of the aggregated result
 */
public abstract class AggregationFunction<RECORD_TYPE, OUTPUT_TYPE> {

  /**
   * Folds one more record into this function's running state.
   *
   * @param record the next record to include
   */
  public abstract void update(RECORD_TYPE record);

  /**
   * Produces the aggregate of the records passed to {@link #update}.
   *
   * @return the aggregated value
   */
  public abstract OUTPUT_TYPE aggregate();
}
 
@Plugin(type = "aggregationFunction")
@Name("sum")
public SumAggregation extends AggregationFunction<StructuredRecord, Number> {
  private final SumConfig config;
  private Number sum;
  
  public static class SumConfig extends PluginConfig {
    private String column;
  }
 
  public void update(StructuredRecord record) {
    // get type of config.column, initialize sum to right type based on that
    sum += record;
  }
 
  public Number aggregate() {
    return sum;
  }
}

...