...
- User stories documented (Albert/Alvin)
- User stories reviewed (Nitin)
- Design documented (Albert/Alvin)
- Design reviewed (Andreas/Terence)
- Feature merged (Albert/Alvin)
- Examples and guides (Albert/Alvin)
- Integration tests (Albert/Alvin)
- Documentation for feature (Albert/Alvin)
- Blog post
...
The developer wants to load webpage click and view data (customer id, timestamp, action, url) into a partitioned fileset. After loading the data, the developer wants to de-duplicate records and calculate how many times each customer clicked and viewed over the past hour, past day, and past month.
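As a rough illustration of the computation this use case asks for, here is a self-contained sketch in plain Java, independent of any pipeline framework. The `Event` record shape mirrors the fields named above; the window handling and method names are assumptions for illustration only.

```java
import java.util.*;

public class ClickCounts {
    // A single webpage event: (customer id, timestamp in millis, action, url).
    public record Event(String customerId, long timestamp, String action, String url) {}

    // De-duplicate exact-duplicate events, then count events per customer
    // that fall within the window of 'windowMillis' ending at 'now'.
    public static Map<String, Integer> countsInWindow(List<Event> events, long now, long windowMillis) {
        Set<Event> deduped = new LinkedHashSet<>(events);  // records get equals/hashCode for free
        Map<String, Integer> counts = new HashMap<>();
        for (Event e : deduped) {
            if (now - e.timestamp() <= windowMillis) {
                counts.merge(e.customerId(), 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

In a real pipeline the de-duplication and the hour/day/month counts would each be a stage; here they are collapsed into one method purely to show the intended semantics.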
User Stories:
- (3.4) A developer should be able to create pipelines that contain aggregations (GROUP BY -> count/sum/unique)
- (3.5) A developer should be able to control some parts of the pipeline running before others. For example, one source -> sink branch running before another source -> sink branch.
- (3.4) A developer should be able to use a Spark ML job as a pipeline stage
- (3.4) A developer should be able to rerun failed pipeline runs without reconfiguring the pipeline
- (3.4) A developer should be able to de-duplicate records in a pipeline
- (3.5) A developer should be able to join multiple branches of a pipeline
- (3.5) A developer should be able to use an Explore action as a pipeline stage
- (3.5) A developer should be able to create pipelines that contain Spark Streaming jobs
- (3.5) A developer should be able to create pipelines that run based on various conditions, including input data availability and Kafka events
Design:
Story 1: Group By -> Aggregations
Option 1:
Introduce a new plugin type "aggregation". In general, to support more and more plugin types in a generic way, we want to refactor the config:
Code Block:

    Option 1:

    {
      "stages": [
        {
          "name": "inputTable",
          "plugin": {
            "name": "Table",
            "type": "batchsource",    // new field
            "properties": { }
          }
        },
        {
          "name": "aggStage",
          "plugin": {
            "name": "GroupByAggregate",
            "type": "aggregation",    // new plugin type
            "properties": {
              "groupBy": "id",
              "functions": "[
                {
                  "columnName": "totalPrice",
                  "plugin": {
                    "name": "sum",
                    "properties": { "column": "price" }
                  }
                },
                {
                  "columnName": "numTransactions",
                  "plugin": { "name": "count" }
                }
              ]"
            }
          }
        }
      ],
      "connections": [
        { "from": "inputTable", "to": "aggStage" }
      ]
    }

    Option 2:

    {
      "sources": [
        {
          "name": "inputTable",
          "plugin": {
            "name": "Table",
            "type": "batchsource",
            "properties": { }
          }
        }
      ],
      "aggregations": [
        {
          "name": "aggStage",
          "groupBy": "id",
          "aggregations": [
            {
              "columnName": "totalPrice",
              "plugin": {
                "name": "sum",
                "properties": { "column": "price" }
              }
            },
            {
              "columnName": "numTransactions",
              "plugin": { "name": "count" }
            }
          ]
        }
      ],
      "connections": [
        { "from": "inputTable", "to": "aggStage" }
      ]
    }
One problem with this is that the plugin property "functions" is itself a JSON string describing the plugins to use. This is not easy for somebody to configure by hand, but it could perhaps be simplified by a UI widget type.
Java APIs for plugin developers: this is basically MapReduce, so 'Aggregation' is probably a bad name for it. We need to see if this fits into Spark; would we have to remove the emitters?
Code Block:

    public abstract class Aggregation<GROUP_BY, RECORD_TYPE, OUTPUT_TYPE> {

      public abstract void groupBy(RECORD_TYPE input, Emitter<GROUP_BY> emitter);

      public abstract void aggregate(GROUP_BY groupKey, Iterable<RECORD_TYPE> groupRecords,
                                     Emitter<OUTPUT_TYPE> emitter);
    }

    @Plugin(type = "aggregation")
    @Name("GroupByAggregate")
    public class GroupByAggregate extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord> {
      private static final AggConfig config;

      public static class AggConfig extends PluginConfig {
        private String groupBy;
        // ideally this would be Map<String, FunctionConfig> functions
        private String functions;
      }

      public void configurePipeline(PipelineConfigurer configurer) {
        Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
        for each function:
          usePlugin(id, type, name, properties);
      }

      public void initialize() {
        Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
      }

      public void groupBy(StructuredRecord input, Emitter<StructuredRecord> emitter) {
        // key = new record from input with only fields in config.groupBy
        Set<String> fields = config.groupBy.split(",");
        emitter.emit(recordSubset(input, fields));
      }

      public void aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords,
                            Emitter<StructuredRecord> emitter) {
        // reset all functions
        for (StructuredRecord record : groupRecords) {
          for each function:
            function.update(record);
        }
        // build record from group key and function values
        for each function:
          val = function.aggregate();
        // emit record
      }
    }

    public abstract class AggregationFunction<RECORD_TYPE, OUTPUT_TYPE> {

      public abstract void reset();

      public abstract void update(RECORD_TYPE record);

      public abstract OUTPUT_TYPE aggregate();
    }

    @Plugin(type = "aggregationFunction")
    @Name("sum")
    public class SumAggregation extends AggregationFunction<StructuredRecord, Number> {
      private final SumConfig config;
      private Number sum;

      public static class SumConfig extends PluginConfig {
        private String column;
      }

      public void update(StructuredRecord record) {
        // get the type of config.column, initialize sum to the right type based on that
        sum += (cast to the correct type) record.get(config.column);
      }

      public Number aggregate() {
        return sum;
      }
    }

    An alternative, with an explicit INPUT_TYPE and a key-value output from groupBy:

    public abstract class Aggregation<INPUT_TYPE, GROUP_BY, RECORD_TYPE, OUTPUT_TYPE> {

      public abstract void groupBy(INPUT_TYPE input, Emitter<KeyValue<GROUP_BY, RECORD_TYPE>> emitter);

      public abstract void aggregate(GROUP_BY groupKey, Iterable<RECORD_TYPE> groupRecords,
                                     Emitter<OUTPUT_TYPE> emitter);
    }

    @Plugin(type = "aggregation")
    @Name("record")
    public class RecordAggregation extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord, StructuredRecord> {
      private static final AggConfig config;

      public static class AggConfig extends PluginConfig {
        private String groupBy;
        // ideally this would be Map<String, FunctionConfig> functions
        private String functions;
      }

      public void configurePipeline(PipelineConfigurer configurer) {
        Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
        for each function:
          usePlugin(id, type, name, properties);
      }

      public void groupBy(StructuredRecord input, Emitter<KeyValue<StructuredRecord, StructuredRecord>> emitter) {
        // key = new record from input with only fields in config.groupBy
        // emitter.emit(new KeyValue<>(key, input))
      }

      public void aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords,
                            Emitter<StructuredRecord> emitter) {
        Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
        for each function:
          val = function.aggregate(groupRecords);
        // build record from group key and function values
        // emit record
      }
    }
Note: This would also satisfy user story 5, where a unique can be implemented as an Aggregation plugin: you group by the fields you want to be unique, then ignore the Iterable<> in aggregate and just emit the group key.
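To make the two-phase groupBy/aggregate contract concrete, here is a minimal in-memory sketch in plain Java with no CDAP types. The driver loop and the use of `Function`/`BiFunction` in place of plugin classes are assumptions for illustration, not the proposed API:

```java
import java.util.*;
import java.util.function.*;

public class GroupByEngine {
    // Drive the two-phase contract: first map every record to its group key,
    // then hand each key and its collected records to the aggregate step.
    public static <K, R, O> List<O> run(List<R> input,
                                        Function<R, K> groupBy,
                                        BiFunction<K, List<R>, O> aggregate) {
        // Phase 1: group records by key (insertion-ordered for determinism).
        Map<K, List<R>> groups = new LinkedHashMap<>();
        for (R record : input) {
            groups.computeIfAbsent(groupBy.apply(record), k -> new ArrayList<>()).add(record);
        }
        // Phase 2: one aggregate call per group, emitting one output each.
        List<O> out = new ArrayList<>();
        for (Map.Entry<K, List<R>> e : groups.entrySet()) {
            out.add(aggregate.apply(e.getKey(), e.getValue()));
        }
        return out;
    }
}
```

A sum aggregation passes an aggregate step that folds the group's records; the "unique" case from the note above is simply `(key, records) -> key`, ignoring the records entirely.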
Story 2: Control Flow (Not Reviewed, WIP)
Option 1: Introduce different types of connections: one for data flow, one for control flow.
Code Block:

    {
      "stages": [
        {
          "name": "customersTable",
          "plugin": { "name": "Database", "type": "batchsource", ... }
        },
        {
          "name": "customersFiles",
          "plugin": { "name": "TPFSParquet", "type": "batchsink", ... }
        },
        {
          "name": "purchasesTable",
          "plugin": { "name": "Database", "type": "batchsource", ... }
        },
        {
          "name": "purchasesFiles",
          "plugin": { "name": "TPFSParquet", "type": "batchsink", ... }
        }
      ],
      "connections": [
        { "from": "customersTable", "to": "customersFiles", "type": "data" },
        { "from": "customersFiles", "to": "purchasesTable", "type": "control" },
        { "from": "purchasesTable", "to": "purchasesFiles", "type": "data" }
      ]
    }
An alternative could be to introduce the concept of "phases": connections between phases are always control flow, and connections within a phase are data flow.
Code Block:

    {
      "phases": [
        {
          "name": "phase1",
          "stages": [
            {
              "name": "customersTable",
              "plugin": { "name": "Database", "type": "batchsource", ... }
            },
            {
              "name": "customersFiles",
              "plugin": { "name": "TPFSParquet", "type": "batchsink", ... }
            }
          ],
          "connections": [
            { "from": "customersTable", "to": "customersFiles" }
          ]
        },
        {
          "name": "phase2",
          "stages": [
            {
              "name": "purchasesTable",
              "plugin": { "name": "Database", "type": "batchsource", ... }
            },
            {
              "name": "purchasesFiles",
              "plugin": { "name": "TPFSParquet", "type": "batchsink", ... }
            }
          ],
          "connections": [
            { "from": "purchasesTable", "to": "purchasesFiles" }
          ]
        }
      ],
      "connections": [
        { "from": "phase1", "to": "phase2" }
      ]
    }
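Since connections between phases are pure control flow, the app would have to run phases in dependency order. A plausible way to compute that order is a topological sort over the phase connections; the sketch below is plain Java with no CDAP types, and the edge representation is an assumption for illustration:

```java
import java.util.*;

public class PhaseOrder {
    // Order phases so that every control-flow edge {from, to} runs 'from'
    // before 'to' (Kahn's algorithm over the phase connections).
    public static List<String> order(Set<String> phases, List<String[]> connections) {
        Map<String, Integer> indegree = new HashMap<>();
        Map<String, List<String>> next = new HashMap<>();
        for (String p : phases) indegree.put(p, 0);
        for (String[] edge : connections) {
            next.computeIfAbsent(edge[0], k -> new ArrayList<>()).add(edge[1]);
            indegree.merge(edge[1], 1, Integer::sum);
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : indegree.entrySet()) {
            if (e.getValue() == 0) ready.add(e.getKey());
        }
        List<String> out = new ArrayList<>();
        while (!ready.isEmpty()) {
            String p = ready.poll();
            out.add(p);
            for (String n : next.getOrDefault(p, List.of())) {
                if (indegree.merge(n, -1, Integer::sum) == 0) ready.add(n);
            }
        }
        return out;   // a cycle in the phase graph would leave phases out
    }
}
```

A real implementation would also reject configs whose phase graph contains a cycle; here a cycle simply produces a shorter list.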
Option 2: Make it so that connections into certain plugin types imply control flow rather than data flow. For example, introduce a "condition" plugin type: connections into a condition imply control flow rather than data flow. Similarly, connections into an "action" plugin type would imply control flow.
Code Block:

    {
      "stages": [
        {
          "name": "customersTable",
          "plugin": { "name": "Database", "type": "batchsource", ... }
        },
        {
          "name": "customersFiles",
          "plugin": { "name": "TPFSParquet", "type": "batchsink", ... }
        },
        {
          "name": "afterDump",
          "plugin": { "name": "AlwaysRun", "type": "condition" }
        },
        {
          "name": "purchasesTable",
          "plugin": { "name": "Database", "type": "batchsource", ... }
        },
        {
          "name": "purchasesFiles",
          "plugin": { "name": "TPFSParquet", "type": "batchsink", ... }
        }
      ],
      "connections": [
        { "from": "customersTable", "to": "customersFiles" },
        { "from": "customersFiles", "to": "afterDump" },
        { "from": "afterDump", "to": "purchasesTable" },
        { "from": "purchasesTable", "to": "purchasesFiles" }
      ]
    }
You could also say that connections into a source imply control flow, or connections into an action imply control flow.
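Under this option, the app could classify each connection purely from the target stage's plugin type. The sketch below shows one plausible classification pass over the example pipeline's connections; it is plain Java with no CDAP types, and the edge/stage representations are assumptions for illustration:

```java
import java.util.*;

public class ControlFlowSplit {
    // An edge's meaning is implied by the target stage's plugin type:
    // edges into a "condition" or "action" stage are control flow, and
    // (per the alternative above) so are edges into a source.
    // 'stageTypes' maps stage name -> plugin type; each edge is {from, to}.
    public static Map<String, List<String[]>> classify(List<String[]> connections,
                                                       Map<String, String> stageTypes) {
        Map<String, List<String[]>> result = new HashMap<>();
        result.put("control", new ArrayList<>());
        result.put("data", new ArrayList<>());
        for (String[] edge : connections) {
            String toType = stageTypes.get(edge[1]);
            boolean control = "condition".equals(toType)
                || "action".equals(toType)
                || "batchsource".equals(toType);
            result.get(control ? "control" : "data").add(edge);
        }
        return result;
    }
}
```

Applied to the Option 2 example, customersFiles -> afterDump and afterDump -> purchasesTable come out as control flow, while the source -> sink edges remain data flow.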
Story 3: Spark ML in a pipeline
Option 1: Add a plugin type "sparksink" that is treated like a sink. When present, a Spark program will be used to read data, transform it, then send all transformed results to the sparksink plugin.
Code Block:

    {
      "stages": [
        {
          "name": "customersTable",
          "plugin": { "name": "Database", "type": "batchsource", ... }
        },
        {
          "name": "categorizer",
          "plugin": { "name": "SVM", "type": "sparksink", ... }
        }
      ],
      "connections": [
        { "from": "customersTable", "to": "categorizer" }
      ]
    }

Option 2: Add a plugin type "sparkML" that is treated like a transform. But instead of being a stage inside a mapper, it is a program in a workflow. The application will create a transient dataset to act as the input into the program, or an explicit source can be given.
Code Block:

    {
      "stages": [
        {
          "name": "customersTable",
          "plugin": { "name": "Database", "type": "batchsource", ... }
        },
        {
          "name": "categorizer",
          "plugin": { "name": "SVM", "type": "sparkML", ... }
        },
        {
          "name": "models",
          "plugin": { "name": "Table", "type": "batchsink", ... }
        }
      ],
      "connections": [
        { "from": "customersTable", "to": "categorizer" },
        { "from": "categorizer", "to": "models" }
      ]
    }
...
Story 6: Join (Not Reviewed, WIP)
Add a join plugin type. Different implementations could be inner join, left outer join, etc.
Code Block:

    {
      "stages": [
        {
          "name": "customers",
          "plugin": { "name": "Table", "type": "batchsource", ... }
        },
        {
          "name": "purchases",
          "plugin": { "name": "Table", "type": "batchsource", ... }
        },
        {
          "name": "customerPurchaseJoin",
          "plugin": {
            "name": "inner",
            "type": "join",
            "properties": {
              "left": "customers.id",
              "right": "purchases.id",
              "rename": "customers.name:customername,purchases.name:itemname"
            }
          }
        },
        ...
      ],
      "connections": [
        { "from": "customers", "to": "customerPurchaseJoin" },
        { "from": "purchases", "to": "customerPurchaseJoin" },
        { "from": "customerPurchaseJoin", "to": "sink" }
      ]
    }
Java API for the join plugin type: these might just be built into the app. Otherwise, the interface is basically MapReduce.
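The semantics the example config asks for (inner join on customers.id = purchases.id, with per-side column renames) can be sketched as a plain in-memory hash join. This is an illustration of the intended behavior, not the plugin API; rows are modeled as column-name -> value maps, which is an assumption of this sketch:

```java
import java.util.*;

public class InnerJoinSketch {
    // Hash join on one key column per side. 'leftRename'/'rightRename' map
    // original column names to output names, mirroring the "rename" property.
    public static List<Map<String, Object>> join(List<Map<String, Object>> left, String leftKey,
                                                 List<Map<String, Object>> right, String rightKey,
                                                 Map<String, String> leftRename,
                                                 Map<String, String> rightRename) {
        // Build phase: index the left side by its join key.
        Map<Object, List<Map<String, Object>>> byKey = new HashMap<>();
        for (Map<String, Object> row : left) {
            byKey.computeIfAbsent(row.get(leftKey), k -> new ArrayList<>()).add(row);
        }
        // Probe phase: emit one renamed output row per matching pair.
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> r : right) {
            for (Map<String, Object> l : byKey.getOrDefault(r.get(rightKey), List.of())) {
                Map<String, Object> joined = new HashMap<>();
                l.forEach((col, val) -> joined.put(leftRename.getOrDefault(col, col), val));
                r.forEach((col, val) -> joined.put(rightRename.getOrDefault(col, col), val));
                out.add(joined);
            }
        }
        return out;
    }
}
```

A left outer join implementation would differ only in also emitting left rows with no match, which is why different join flavors fit naturally as different plugins of one "join" type.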