...

  •  User stories documented (Albert/Alvin)
  •  User stories reviewed (Nitin)
  •  Design documented (Albert/Alvin)
  •  Design reviewed (Andreas/Terence)
  •  Feature merged (Albert/Alvin)
  •  Examples and guides (Albert/Alvin)
  •  Integration tests (Albert/Alvin) 
  •  Documentation for feature (Albert/Alvin)
  •  Blog post

...

The developer wants to load webpage click and view data (customer id, timestamp, action, url) into a partitioned fileset. After loading the data, the developer wants to de-duplicate records and calculate how many times each customer clicked and viewed over the past hour, past day, and past month.
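
As a rough illustration of the end result this use case asks for, the sketch below de-duplicates click/view records in memory and counts events per customer and action within a time window. The `ClickEvent` and `ClickStats` names, the field types, and the "customerId/action" key format are assumptions made for the example; none of this is pipeline or plugin code.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical record shape for the scenario: (customer id, timestamp, action, url).
final class ClickEvent {
  final String customerId;
  final long timestampMillis;
  final String action; // "click" or "view"
  final String url;

  ClickEvent(String customerId, long timestampMillis, String action, String url) {
    this.customerId = customerId;
    this.timestampMillis = timestampMillis;
    this.action = action;
    this.url = url;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof ClickEvent)) {
      return false;
    }
    ClickEvent other = (ClickEvent) o;
    return customerId.equals(other.customerId) && timestampMillis == other.timestampMillis
        && action.equals(other.action) && url.equals(other.url);
  }

  @Override
  public int hashCode() {
    return Objects.hash(customerId, timestampMillis, action, url);
  }
}

final class ClickStats {
  // Drops exact duplicates, then counts events per "customerId/action" whose
  // timestamp falls within [now - windowMillis, now].
  static Map<String, Long> countSince(List<ClickEvent> events, long now, long windowMillis) {
    Map<String, Long> counts = new TreeMap<>();
    for (ClickEvent e : new LinkedHashSet<>(events)) {
      if (e.timestampMillis >= now - windowMillis && e.timestampMillis <= now) {
        counts.merge(e.customerId + "/" + e.action, 1L, Long::sum);
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    long hour = 60L * 60 * 1000;
    long now = 100 * hour;
    List<ClickEvent> events = Arrays.asList(
        new ClickEvent("c1", now - 10, "click", "/home"),
        new ClickEvent("c1", now - 10, "click", "/home"),   // exact duplicate
        new ClickEvent("c1", now - 2 * hour, "view", "/cart"),
        new ClickEvent("c2", now - 5, "view", "/home"));
    System.out.println(countSince(events, now, hour));       // past hour
    System.out.println(countSince(events, now, 24 * hour));  // past day
  }
}
```

In a pipeline, the de-duplication and the per-window counts would be separate stages; here they are folded into one method only to keep the example short.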

User Stories:

  1. (3.4) A developer should be able to create pipelines that contain aggregations (GROUP BY -> count/sum/unique)
  2. (3.5) A developer should be able to control which parts of a pipeline run before others. For example, one source -> sink branch running before another source -> sink branch.
  3. (3.4) A developer should be able to use a Spark ML job as a pipeline stage
  4. (3.4) A developer should be able to rerun failed pipeline runs without reconfiguring the pipeline
  5. (3.4) A developer should be able to de-duplicate records in a pipeline
  6. (3.5) A developer should be able to join multiple branches of a pipeline
  7. (3.5) A developer should be able to use an Explore action as a pipeline stage
  8. (3.5) A developer should be able to create pipelines that contain Spark Streaming jobs
  9. (3.5) A developer should be able to create pipelines that run based on various conditions, including input data availability and Kafka events

Design:

Story 1: Group By -> Aggregations

Option 1:

Introduce a new plugin type "aggregator".  In general, to support more and more plugin types in a generic way, we want to refactor the config:

Code Block
{
  "stages": [
    {
      "name": "inputTable",
      "plugin": {
        "name": "Table",
        "type": "batchsource",  // new field
        "properties": {
        }
      }
    },
    {
      "name": "aggStage",
      "plugin": {
        "name": "GroupByAggregate",
        "type": "aggregator",  // new plugin type
        "properties": {
          "groupBy": "id",
          "functions": "[
            {
              "columnName": "totalPrice",
              "plugin": {
                "name": "sum",
                "properties": {
                  "column": "price"
                }
              }
            },
            {
              "columnName": "numTransactions",
              "plugin": {
                "name": "count"
              }
            }
          ]"
        }
      }
    }
  ],
  "connections": [
    { "from": "inputTable", "to": "aggStage" }
  ]
}

One problem with this is that the plugin property "functions" is itself JSON describing the plugins to use.  This is not easy for somebody to configure by hand, but it could perhaps be simplified by a UI widget type.

 

Java APIs for plugin developers.  It is basically MapReduce, so 'Aggregation' is probably a bad name for this.  Need to see if this fits into Spark.  Would we have to remove the emitters?

Code Block
public abstract class Aggregation<GROUP_BY, RECORD_TYPE, OUTPUT_TYPE> {
 
  public abstract void groupBy(RECORD_TYPE input, Emitter<GROUP_BY> emitter);
 
  public abstract void aggregate(GROUP_BY groupKey, Iterable<RECORD_TYPE> groupRecords, Emitter<OUTPUT_TYPE> emitter);
 
}

@Plugin(type = "aggregation")
@Name("GroupByAggregate")
public class GroupByAggregate extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord> {
  private final AggConfig config;
  
  public static class AggConfig extends PluginConfig {
    private String groupBy;
 
    // ideally this would be Map<String, FunctionConfig> functions
    private String functions;
  }
 
  public void configurePipeline(PipelineConfigurer configurer) {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    for each function:
      usePlugin(id, type, name, properties);
  }
 
  public void groupBy(StructuredRecord input, Emitter<StructuredRecord> emitter) {
    // key = new record from input with only fields in config.groupBy
    Set<String> fields = new HashSet<>(Arrays.asList(config.groupBy.split(",")));
    emitter.emit(recordSubset(input, fields));
  }
 
  public void initialize() {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    for each function:
      // presumably: instantiate the function plugin registered in configurePipeline
  }
 
  public void aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords, Emitter<StructuredRecord> emitter) {
    // reset all functions
    for (StructuredRecord record : groupRecords) {
      for each function:
        function.update(record);
    }
    // build record from group key and function values
    for each function:
      val = function.aggregate();
    // emit record
  }
 
}
 
public abstract class AggregationFunction<RECORD_TYPE, OUTPUT_TYPE> {
  public abstract void reset();
  public abstract void update(RECORD_TYPE record);
  public abstract OUTPUT_TYPE aggregate();
}
 
@Plugin(type = "aggregationFunction")
@Name("sum")
public class SumAggregation extends AggregationFunction<StructuredRecord, Number> {
  private final SumConfig config;
  private Number sum;
  
  public static class SumConfig extends PluginConfig {
    private String column;
  }
 
  public void reset() {
    sum = null;  // re-initialized to the right type on the next update
  }
 
  public void update(StructuredRecord record) {
    // get type of config.column, initialize sum to right type based on that
    sum += (cast to the correct type) record.get(config.column);
  }
 
  public Number aggregate() {
    return sum;
  }
}
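
To make the groupBy/aggregate contract concrete, here is a minimal, self-contained sketch that runs an aggregation of the same shape in memory. It is not CDAP code: `SketchEmitter`, `SketchAggregation`, `SumAndCount`, and `InMemoryRunner` are names invented for this example, a plain `Map<String, Object>` stands in for StructuredRecord, and the runner only imitates the shuffle that MapReduce (or Spark) would perform between the two calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local stand-in for the Emitter used in the design; not the real CDAP interface.
interface SketchEmitter<T> {
  void emit(T value);
}

// Same shape as the Aggregation class proposed above.
abstract class SketchAggregation<GROUP_BY, RECORD_TYPE, OUTPUT_TYPE> {
  abstract void groupBy(RECORD_TYPE input, SketchEmitter<GROUP_BY> emitter);

  abstract void aggregate(GROUP_BY groupKey, Iterable<RECORD_TYPE> groupRecords,
                          SketchEmitter<OUTPUT_TYPE> emitter);
}

// Groups by "id" and emits the sum of "price" plus a record count per group.
final class SumAndCount
    extends SketchAggregation<String, Map<String, Object>, Map<String, Object>> {
  @Override
  void groupBy(Map<String, Object> input, SketchEmitter<String> emitter) {
    emitter.emit((String) input.get("id"));
  }

  @Override
  void aggregate(String groupKey, Iterable<Map<String, Object>> groupRecords,
                 SketchEmitter<Map<String, Object>> emitter) {
    double totalPrice = 0;
    long numTransactions = 0;
    for (Map<String, Object> record : groupRecords) {
      totalPrice += ((Number) record.get("price")).doubleValue();
      numTransactions++;
    }
    Map<String, Object> out = new LinkedHashMap<>();
    out.put("id", groupKey);
    out.put("totalPrice", totalPrice);
    out.put("numTransactions", numTransactions);
    emitter.emit(out);
  }
}

final class InMemoryRunner {
  // Plays the role of the shuffle: collect records per emitted key, then aggregate each group.
  static <K, R, O> List<O> run(SketchAggregation<K, R, O> agg, List<R> input) {
    Map<K, List<R>> groups = new LinkedHashMap<>();
    for (R record : input) {
      agg.groupBy(record, key -> groups.computeIfAbsent(key, k -> new ArrayList<>()).add(record));
    }
    List<O> output = new ArrayList<>();
    for (Map.Entry<K, List<R>> group : groups.entrySet()) {
      agg.aggregate(group.getKey(), group.getValue(), output::add);
    }
    return output;
  }

  static Map<String, Object> purchase(String id, double price) {
    Map<String, Object> record = new HashMap<>();
    record.put("id", id);
    record.put("price", price);
    return record;
  }

  public static void main(String[] args) {
    List<Map<String, Object>> input =
        Arrays.asList(purchase("a", 1.5), purchase("b", 2.0), purchase("a", 3.0));
    System.out.println(run(new SumAndCount(), input));
  }
}
```

Because groupBy emits through an emitter rather than returning a value, a record can land in zero, one, or several groups. That flexibility is one thing that would be lost if the emitters were removed to fit Spark.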

Note: This would also satisfy user story 5. A unique can be implemented as an Aggregation plugin: group by the fields you want to be unique, ignore the Iterable<> in aggregate, and just emit the group key.
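
The de-duplication idea in the note can be sketched as follows. This is an in-memory illustration under the same assumptions as before: `UniqueSketch` and its helpers are hypothetical names, and a `Map<String, Object>` stands in for StructuredRecord. The grouped records are collected but never read; each distinct group key becomes one output record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UniqueSketch {
  // groupBy step: keep only the values of the fields to de-duplicate on, in a fixed order.
  static List<Object> groupKey(Map<String, Object> record, List<String> uniqueFields) {
    List<Object> key = new ArrayList<>();
    for (String field : uniqueFields) {
      key.add(record.get(field));
    }
    return key;
  }

  // Shuffle plus aggregate: records are grouped by key, then one output record is
  // built from each key. The list of grouped records is ignored on purpose.
  static List<Map<String, Object>> unique(List<Map<String, Object>> input,
                                          List<String> uniqueFields) {
    Map<List<Object>, List<Map<String, Object>>> groups = new LinkedHashMap<>();
    for (Map<String, Object> record : input) {
      groups.computeIfAbsent(groupKey(record, uniqueFields), k -> new ArrayList<>()).add(record);
    }
    List<Map<String, Object>> output = new ArrayList<>();
    for (List<Object> key : groups.keySet()) {
      Map<String, Object> out = new LinkedHashMap<>();
      for (int i = 0; i < uniqueFields.size(); i++) {
        out.put(uniqueFields.get(i), key.get(i));
      }
      output.add(out);
    }
    return output;
  }

  static Map<String, Object> click(String customerId, String url, long timestamp) {
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("customerId", customerId);
    record.put("url", url);
    record.put("timestamp", timestamp);
    return record;
  }

  public static void main(String[] args) {
    List<Map<String, Object>> input = Arrays.asList(
        click("c1", "/home", 1L), click("c1", "/home", 2L), click("c2", "/home", 3L));
    System.out.println(unique(input, Arrays.asList("customerId", "url")));
  }
}
```

Note that the output contains only the group-by fields. Keeping other fields (for example the first timestamp seen) would require reading the grouped records after all.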

Story 2: Control Flow (Not Reviewed, WIP)

Option 1: Introduce different types of connections: one for data flow, one for control flow.

Code Block
{
  "stages": [
    {
      "name": "customersTable",
      "plugin": {
        "name": "Database",
        "type": "batchsource", ...
      }
    },    
    {
      "name": "customersFiles",
      "plugin": {
        "name": "TPFSParquet",
        "type": "batchsink", ...
      }
    },
    {
      "name": "purchasesTable",
      "plugin": {
        "name": "Database",
        "type": "batchsource"
      }
    },
    {
      "name": "purchasesFiles",
      "plugin": {
        "name": "TPFSParquet",
        "type": "batchsink", ...
      }
    }
  ],
  "connections": [
    { "from": "customersTable", "to": "customersFiles", "type": "data" },
    { "from": "customersFiles", "to": "purchasesTable", "type": "control" },
    { "from": "purchasesTable", "to": "purchasesFiles", "type": "data" }
  ]
}
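
One way the application could interpret typed connections is sketched below: stages linked by "data" connections are grouped into a branch, and "control" connections decide the order in which branches run. The `Connection` and `BranchPlanner` classes are assumptions made for this illustration and do not reflect how the application is actually implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class Connection {
  final String from;
  final String to;
  final String type; // "data" or "control", as in the config above

  Connection(String from, String to, String type) {
    this.from = from;
    this.to = to;
    this.type = type;
  }
}

final class BranchPlanner {
  // Follows parent links to the representative stage of a branch.
  private static String find(Map<String, String> parent, String stage) {
    parent.putIfAbsent(stage, stage);
    String root = stage;
    while (!parent.get(root).equals(root)) {
      root = parent.get(root);
    }
    return root;
  }

  // Returns the branches (sets of stage names) in an order that respects control flow.
  static List<Set<String>> plan(List<Connection> connections) {
    Map<String, String> parent = new LinkedHashMap<>();
    for (Connection c : connections) {
      String fromRoot = find(parent, c.from);
      String toRoot = find(parent, c.to);
      if ("data".equals(c.type)) {
        parent.put(toRoot, fromRoot); // merge the two branches
      }
    }
    Map<String, Set<String>> branches = new LinkedHashMap<>();
    for (String stage : new ArrayList<>(parent.keySet())) {
      branches.computeIfAbsent(find(parent, stage), k -> new LinkedHashSet<>()).add(stage);
    }
    // For each branch, the branches that must finish before it starts.
    Map<String, Set<String>> mustRunAfter = new LinkedHashMap<>();
    for (Connection c : connections) {
      if ("control".equals(c.type)) {
        mustRunAfter.computeIfAbsent(find(parent, c.to), k -> new LinkedHashSet<>())
            .add(find(parent, c.from));
      }
    }
    List<Set<String>> ordered = new ArrayList<>();
    Set<String> done = new LinkedHashSet<>();
    while (done.size() < branches.size()) {
      boolean progressed = false;
      for (String root : branches.keySet()) {
        Set<String> deps = mustRunAfter.getOrDefault(root, new LinkedHashSet<>());
        if (!done.contains(root) && done.containsAll(deps)) {
          done.add(root);
          ordered.add(branches.get(root));
          progressed = true;
        }
      }
      if (!progressed) {
        throw new IllegalArgumentException("control connections form a cycle");
      }
    }
    return ordered;
  }

  public static void main(String[] args) {
    System.out.println(plan(Arrays.asList(
        new Connection("customersTable", "customersFiles", "data"),
        new Connection("customersFiles", "purchasesTable", "control"),
        new Connection("purchasesTable", "purchasesFiles", "data"))));
  }
}
```

The branches this produces correspond to what a "phases" layout would spell out explicitly in the config.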

An alternative could be to introduce the concept of "phases", where connections between phases are always control flow and connections within a phase are always data flow.

Code Block
{
  "phases": [
    {
      "name": "phase1",
      "stages": [
        {
          "name": "customersTable",
          "plugin": {
            "name": "Database",
            "type": "batchsource", ...
          }
        },    
        {
          "name": "customersFiles",
          "plugin": {
            "name": "TPFSParquet",
            "type": "batchsink", ...
          }
        }
      ],
      "connections": [
        { "from": "customersTable", "to": "customersFiles" }
      ]
    },
    {
      "name": "phase2",
      "stages": [
        {
          "name": "purchasesTable",
          "plugin": {
            "name": "Database",
            "type": "batchsource", ...
          }
        },
        {
          "name": "purchasesFiles",
          "plugin": {
            "name": "TPFSParquet",
            "type": "batchsink", ...
          }
        }
      ],
      "connections": [
        { "from": "purchasesTable", "to": "purchasesFiles" }
      ]
    }
  ],
  "connections": [
    { "from": "phase1", "to": "phase2" }
  ]
}

 

Option 2: Make it so that connections into certain plugin types imply control flow rather than data flow.  For example, introduce a "condition" plugin type.  Connections into a condition imply control flow rather than data flow.  Similarly, connections into an "action" plugin type would imply control flow.

Code Block
{
  "stages": [
    {
      "name": "customersTable",
      "plugin": {
        "name": "Database",
        "type": "batchsource", ...
      }
    },    
    {
      "name": "customersFiles",
      "plugin": {
        "name": "TPFSParquet",
        "type": "batchsink", ...
      }
    },
    {
      "name": "afterDump",
      "plugin": {
        "name": "AlwaysRun",
        "type": "condition"
      }
    },
    {
      "name": "purchasesTable",
      "plugin": {
        "name": "Database",
        "type": "batchsource", ...
      }
    },
    {
      "name": "purchasesFiles",
      "plugin": {
        "name": "TPFSParquet",
        "type": "batchsink", ...
      }
    }
  ],
  "connections": [
    { "from": "customersTable", "to": "customersFiles" },
    { "from": "customersFiles", "to": "afterDump" },
    { "from": "afterDump", "to": "purchasesTable" },
    { "from": "purchasesTable", "to": "purchasesFiles" }
  ]
}

You could also say that connections into a source imply control flow, or connections into an action imply control flow.
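
A small sketch of how such an implied rule might be evaluated: the flow type of a connection is looked up from the plugin type of its target stage. The `ImpliedFlow` class, the "control"/"data" labels, and the idea of passing the set of control-implying types as a parameter are assumptions for illustration only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ImpliedFlow {
  // Returns "control" when the target stage's plugin type is in controlTargetTypes,
  // otherwise "data". Both labels are illustrative, not part of any existing config.
  static String flowType(String toStage, Map<String, String> pluginTypeByStage,
                         Set<String> controlTargetTypes) {
    String pluginType = pluginTypeByStage.get(toStage);
    if (pluginType == null) {
      throw new IllegalArgumentException("unknown stage: " + toStage);
    }
    return controlTargetTypes.contains(pluginType) ? "control" : "data";
  }

  public static void main(String[] args) {
    Map<String, String> types = new HashMap<>();
    types.put("customersTable", "batchsource");
    types.put("customersFiles", "batchsink");
    types.put("afterDump", "condition");
    types.put("purchasesTable", "batchsource");
    types.put("purchasesFiles", "batchsink");

    // The condition/action rule, plus the variant where connections into a
    // source also imply control flow.
    Set<String> controlTargets =
        new HashSet<>(Arrays.asList("condition", "action", "batchsource"));

    String[][] connections = {
        {"customersTable", "customersFiles"},
        {"customersFiles", "afterDump"},
        {"afterDump", "purchasesTable"},
        {"purchasesTable", "purchasesFiles"}};
    for (String[] c : connections) {
      System.out.println(c[0] + " -> " + c[1] + ": " + flowType(c[1], types, controlTargets));
    }
  }
}
```

With only "condition" and "action" in the set, the afterDump -> purchasesTable connection would be classified as data flow even though a condition emits no records, which is why the source variant is included in the demo.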

Story 3: Spark ML in a pipeline

Add a plugin type "sparksink" that is treated like a sink.  When present, a Spark program will be used to read data, transform it, then send all transformed results to the sparksink plugin.

Code Block
{
  "stages": [
    {
      "name": "customersTable",
      "plugin": {
        "name": "Database",
        "type": "batchsource", ...
      }
    },    
    {
      "name": "categorizer",
      "plugin": {
        "name": "SVM",
        "type": "sparksink", ...
      }
    },
    {
      "name": "models",
      "plugin": {
        "name": "Table",
        "type": "batchsink", ...
      }
    }
  ],
  "connections": [
    { "from": "customersTable", "to": "categorizer" }
  ]
}

...

Story 6: Join (Not Reviewed, WIP)

Add a join plugin type.  Different implementations could be inner join, left outer join, etc.

Code Block
{
  "stages": [
    {
      "name": "customers",
      "plugin": {
        "name": "Table",
        "type": "batchsource", ...
      }
    },
    {
      "name": "purchases",
      "plugin": {
        "name": "Table",
        "type": "batchsource", ...
      }
    },    
    {
      "name": "customerPurchaseJoin",
      "plugin": {
        "name": "inner",
        "type": "join",
        "properties": {
          "left": "customers.id",
          "right": "purchases.id",
          "rename": "customers.name:customername,purchases.name:itemname"
        }
      }
    },
    ...
  ],
  "connections": [
    { "from": "customers", "to": "customerPurchaseJoin" },
    { "from": "purchases", "to": "customerPurchaseJoin" },
    { "from": "customerPurchaseJoin", "to": "sink" }
  ]
}

Java API for the join plugin type: joins might just be built into the app.  Otherwise, the interface is basically MapReduce.
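
To show what an "inner" join with the "left", "right", and "rename" properties from the config might produce, here is a self-contained in-memory sketch. `InnerJoinSketch` is a hypothetical name, a `Map<String, Object>` stands in for StructuredRecord, and prefixing unrenamed output fields with the stage name ("customers.id") is an assumption; the design above does not say how unrenamed fields are named.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class InnerJoinSketch {
  // Parses "customers.name:customername,purchases.name:itemname" into a lookup map.
  static Map<String, String> parseRename(String rename) {
    Map<String, String> result = new LinkedHashMap<>();
    for (String pair : rename.split(",")) {
      String[] parts = pair.split(":");
      result.put(parts[0].trim(), parts[1].trim());
    }
    return result;
  }

  // Copies fields into the output as "stage.field", unless a rename entry applies.
  private static void copyFields(String stage, Map<String, Object> record,
                                 Map<String, String> rename, Map<String, Object> out) {
    for (Map.Entry<String, Object> field : record.entrySet()) {
      String qualified = stage + "." + field.getKey();
      out.put(rename.getOrDefault(qualified, qualified), field.getValue());
    }
  }

  // Indexes the right input by its join key, then pairs each left record with
  // every right record sharing that key. Unmatched records on either side are dropped.
  static List<Map<String, Object>> innerJoin(
      String leftStage, List<Map<String, Object>> left, String leftKey,
      String rightStage, List<Map<String, Object>> right, String rightKey,
      Map<String, String> rename) {
    Map<Object, List<Map<String, Object>>> rightByKey = new HashMap<>();
    for (Map<String, Object> r : right) {
      rightByKey.computeIfAbsent(r.get(rightKey), k -> new ArrayList<>()).add(r);
    }
    List<Map<String, Object>> joined = new ArrayList<>();
    for (Map<String, Object> l : left) {
      List<Map<String, Object>> matches =
          rightByKey.getOrDefault(l.get(leftKey), Collections.emptyList());
      for (Map<String, Object> r : matches) {
        Map<String, Object> out = new LinkedHashMap<>();
        copyFields(leftStage, l, rename, out);
        copyFields(rightStage, r, rename, out);
        joined.add(out);
      }
    }
    return joined;
  }

  static Map<String, Object> row(Object id, String name) {
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("id", id);
    record.put("name", name);
    return record;
  }

  public static void main(String[] args) {
    List<Map<String, Object>> customers = Arrays.asList(row(1, "Ann"), row(2, "Bob"));
    List<Map<String, Object>> purchases =
        Arrays.asList(row(1, "pen"), row(1, "ink"), row(3, "cup"));
    Map<String, String> rename =
        parseRename("customers.name:customername,purchases.name:itemname");
    System.out.println(
        innerJoin("customers", customers, "id", "purchases", purchases, "id", rename));
  }
}
```

A left outer join would differ only in emitting the left record once when no right record matches, which suggests the join variants could share most of their implementation.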