...

  1. A developer should be able to create pipelines that contain aggregations (GROUP BY -> count/sum/unique)
  2. A developer should be able to create a pipeline with multiple sources, with one source read after the other
  3. A developer should be able to use a Spark ML job as a pipeline stage
  4. A developer should be able to rerun failed pipeline runs without reconfiguring the pipeline
  5. A developer should be able to de-duplicate records in a pipeline
  6. A developer should be able to join multiple branches of a pipeline
  7. A developer should be able to use an Explore action as a pipeline stage
  8. A developer should be able to create pipelines that contain Spark Streaming jobs
  9. A developer should be able to create pipelines that run based on various conditions, including input data availability and Kafka events

Design:

Story 1: Group By -> Aggregations

Option 1:

Introduce a new plugin type, "aggregation". More generally, to support additional plugin types in a generic way, we want to refactor the config:

Code Block
{
  "stages": [
    {
      "name": "inputTable",
      "plugin": {
        "name": "Table",
        "type": "batchsource",  // new field
        "properties": {
        }
      }
    },
    {
      "name": "aggStage",
      "plugin": {
        "name": "RecordAggregator",
        "type": "aggregation",  // new plugin type
        "properties": {
          "groupBy": "id",
          "functions": "{
            "totalPrice": {
              "name": "sum",
              "properties": {
                "column": "price"
              }
            },
            "numTransactions": {
              "name": "count"
            }
          }"
        }
      }
    }
  ],
  "connections": [
    { "from": "inputTable", "to": "aggStage" } 
  ]
}

 

...

Option 2: Keep the current structure of the config, with the plugin types at the top level.

Code Block
{
  "sources": [
    {
      "name": "inputTable",
      "plugin": {
        "name": "Table",
        "type": "batchsource",  // new field
        "properties": {
        }
      }
    }
  ],
  "aggregations": [
    {
      "name": "aggStage",
      "groupBy": "id",
      "aggregations": [
        {
          "columnName": "totalPrice",
          "plugin": {
            "name": "sum",
            "properties": {
              "column": "price"
            }
          }
        },
        {
          "columnName": "numTransactions",
          "plugin": {
            "name": "count"
          }
        }
      ]
    }
  ],
  "connections": [
    { "from": "inputTable", "to": "aggStage" } 
  ]
}

Java APIs for plugin developers

Code Block
public abstract class Aggregation<INPUT_TYPE, GROUP_BY, RECORD_TYPE, OUTPUT_TYPE> {
  
  public abstract void groupBy(INPUT_TYPE input, Emitter<KeyValue<GROUP_BY, RECORD_TYPE>> emitter);
  
  public abstract void aggregate(GROUP_BY groupKey, Iterable<RECORD_TYPE> groupRecords, Emitter<OUTPUT_TYPE> emitter);
  
}

@Plugin(type = "aggregation")
@Name("record")
public class RecordAggregation extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord, StructuredRecord> {
  private final AggConfig config;
  
  public static class AggConfig extends PluginConfig {
    private String groupBy;
  
    // ideally this would be Map<String, FunctionConfig> functions
    private String functions;
  }
  
  public void configurePipeline(PipelineConfigurer configurer) {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    // for each function: usePlugin(id, type, name, properties);
  }
 
  public void groupBy(StructuredRecord input, Emitter<KeyValue<StructuredRecord, StructuredRecord>> emitter) {
    // key = new record from input with only fields in config.groupBy
    // emitter.emit(new KeyValue<>(key, input));
  }
 
  public void aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords, Emitter<StructuredRecord> emitter) {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    // instantiate each configured function, then feed it every record in the group
    for (StructuredRecord record : groupRecords) {
      // for each function: function.update(record);
    }
    // build an output record from the group key and each function's
    // aggregate() value, then emit it
    // for each function: val = function.aggregate();
    // emitter.emit(outputRecord);
  }
 
}
  
public abstract class AggregationFunction<RECORD_TYPE, OUTPUT_TYPE> {
  
  public abstract void update(RECORD_TYPE record);
  public abstract OUTPUT_TYPE aggregate();
}
  
@Plugin(type = "aggregationFunction")
@Name("sum")
public class SumAggregation extends AggregationFunction<StructuredRecord, Number> {
  private final SumConfig config;
  private Number sum;
  
  public static class SumConfig extends PluginConfig {
    private String column;
  }
  
  public void update(StructuredRecord record) {
    // get type of config.column, initialize sum to the right type based on that
    // sum += record.get(config.column);
  }
  
  public Number aggregate() {
    return sum;
  }
}
 
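For comparison, the count function referenced by "numTransactions" in the configs above could look something like the following sketch against the same AggregationFunction API (the CountAggregation class name is assumed here, not part of the design above):

Code Block
@Plugin(type = "aggregationFunction")
@Name("count")
public class CountAggregation extends AggregationFunction<StructuredRecord, Long> {
  // no config needed: count ignores column values entirely
  private long count = 0;

  public void update(StructuredRecord record) {
    // every record in the group increments the count
    count++;
  }

  public Long aggregate() {
    return count;
  }
}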

Note: This would also satisfy user story 5: a unique can be implemented as an Aggregation plugin, where you group by the fields you want to de-duplicate on, ignore the Iterable<> in aggregate, and just emit the group key (see the sketch below).
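For illustration, a minimal sketch of such a de-duplicating plugin against the Aggregation API above (the UniqueAggregation class name and the fields property are hypothetical):

Code Block
@Plugin(type = "aggregation")
@Name("unique")
public class UniqueAggregation extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord, StructuredRecord> {
  private final UniqueConfig config;

  public static class UniqueConfig extends PluginConfig {
    // comma-separated list of fields to de-duplicate on (hypothetical property)
    private String fields;
  }

  public void groupBy(StructuredRecord input, Emitter<KeyValue<StructuredRecord, StructuredRecord>> emitter) {
    // key = new record containing only the fields in config.fields
    // emitter.emit(new KeyValue<>(key, input));
  }

  public void aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords, Emitter<StructuredRecord> emitter) {
    // all records in a group share the same key fields, so the group key
    // itself is the de-duplicated record; ignore groupRecords
    emitter.emit(groupKey);
  }
}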

...