Cask Hydrator++
Goals:
- Enable users to easily create and run workflows that are more complicated than the purely functional pipelines supported by Hydrator today. Users should be able to configure what they want a workflow to do without worrying about how it is done.
- Non-functional workflow nodes must be pluggable so that more can be added.
Checklist
- User stories documented (Albert/Alvin)
- User stories reviewed (Nitin)
- Design documented (Albert/Alvin)
- Design reviewed (Andreas/Terence)
- Feature merged (Albert/Alvin)
- Examples and guides (Albert/Alvin)
- Integration tests (Albert/Alvin)
- Documentation for feature (Albert/Alvin)
- Blog post
Use Cases:
Use Case 1: Spam Classifier
The developer wants a workflow that runs every 15 minutes. The workflow loads all emails (email id, sender, recipient, subject, content, time, etc.) sent by the developer's organization into a Table. The workflow also reads from another feed to update the Table and set an 'isSpam' column on emails that recipients have marked as spam. Once a day, the developer wants to run a Spark job that creates a spam classifier model.
Use Case 2: Data Join and Modeling
The developer wants to load data from a 'customer' table that contains a customer id and various attributes such as age, gender, email, etc. The developer also wants to load data from a 'purchases' table that contains a customer id, item id, purchase time, and purchase price. The developer then wants to join both tables on customer id and run a collaborative filtering algorithm to generate a model that can be used to recommend shopping items to people.
Use Case 3: Aggregations
The developer wants to load webpage click and view data (customer id, timestamp, action, url) into a partitioned fileset. After loading the data, the developer wants to de-duplicate records and calculate how many times each customer clicked and viewed over the past hour, past day, and past month.
User Stories:
- (3.4) A developer should be able to create pipelines that contain aggregations (GROUP BY -> count/sum/unique)
- (3.5) A developer should be able to control the order in which parts of a pipeline run. For example, one source -> sink branch running before another source -> sink branch.
- (3.4) A developer should be able to use a Spark ML job as a pipeline stage
- (3.4) A developer should be able to rerun failed pipeline runs without reconfiguring the pipeline
- (3.4) A developer should be able to de-duplicate records in a pipeline
- (3.5) A developer should be able to join multiple branches of a pipeline
- (3.5) A developer should be able to use an Explore action as a pipeline stage
- (3.5) A developer should be able to create pipelines that contain Spark Streaming jobs
- (3.5) A developer should be able to create pipelines that run based on various conditions, including input data availability and Kafka events
Design:
Story 1: Group By -> Aggregations
Option 1:
Introduce a new plugin type "aggregator". To support additional plugin types in a generic way, we want to refactor the config so that each stage declares its plugin type:
{
  "stages": [
    {
      "name": "inputTable",
      "plugin": {
        "name": "Table",
        "type": "batchsource", // new field
        "properties": { }
      }
    },
    {
      "name": "aggStage",
      "plugin": {
        "name": "GroupByAggregate",
        "type": "aggregator", // new plugin type
        "properties": {
          "groupBy": "id",
          "functions": "[
            {
              "columnName": "totalPrice",
              "plugin": {
                "name": "sum",
                "properties": { "column": "price" }
              }
            },
            {
              "columnName": "numTransactions",
              "plugin": { "name": "count" }
            }
          ]"
        }
      }
    }
  ],
  "connections": [
    { "from": "inputTable", "to": "aggStage" }
  ]
}
One problem with this is that the plugin property "functions" is itself a JSON string describing the plugins to use. This is not easy for somebody to configure by hand, but it could perhaps be simplified by a UI widget type.
Java APIs for plugin developers. This is basically MapReduce, so 'Aggregation' is probably a bad name for it. We need to see whether this fits into Spark; would we have to remove the emitters?
public abstract class Aggregation<GROUP_BY, RECORD_TYPE, OUTPUT_TYPE> {
  public abstract void groupBy(RECORD_TYPE input, Emitter<GROUP_BY> emitter);
  public abstract void aggregate(GROUP_BY groupKey, Iterable<RECORD_TYPE> groupRecords,
                                 Emitter<OUTPUT_TYPE> emitter);
}

@Plugin(type = "aggregator")
@Name("GroupByAggregate")
public class GroupByAggregate extends Aggregation<StructuredRecord, StructuredRecord, StructuredRecord> {
  private final AggConfig config;

  public static class AggConfig extends PluginConfig {
    private String groupBy;
    // ideally this would be Map<String, FunctionConfig> functions
    private String functions;
  }

  public void configurePipeline(PipelineConfigurer configurer) {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    // for each function: usePlugin(id, type, name, properties);
  }

  public void groupBy(StructuredRecord input, Emitter<StructuredRecord> emitter) {
    // key = new record from input with only the fields in config.groupBy
    Set<String> fields = new HashSet<>(Arrays.asList(config.groupBy.split(",")));
    emitter.emit(recordSubset(input, fields));
  }

  public void initialize() {
    Map<String, FunctionConfig> functions = gson.fromJson(config.functions, MAP_TYPE);
    // for each function: instantiate the function plugin registered in configurePipeline
  }

  public void aggregate(StructuredRecord groupKey, Iterable<StructuredRecord> groupRecords,
                        Emitter<StructuredRecord> emitter) {
    // for each function: function.reset();
    for (StructuredRecord record : groupRecords) {
      // for each function: function.update(record);
    }
    // build record from group key and function values
    // for each function: val = function.aggregate();
    // emit record
  }
}

public abstract class AggregationFunction<RECORD_TYPE, OUTPUT_TYPE> {
  public abstract void reset();
  public abstract void update(RECORD_TYPE record);
  public abstract OUTPUT_TYPE aggregate();
}

@Plugin(type = "aggregationFunction")
@Name("sum")
public class SumAggregation extends AggregationFunction<StructuredRecord, Number> {
  private final SumConfig config;
  private Number sum;

  public static class SumConfig extends PluginConfig {
    private String column;
  }

  public void reset() {
    sum = null;
  }

  public void update(StructuredRecord record) {
    // get type of config.column, initialize sum to the right type based on that
    // sum += (cast to the correct type) record.get(config.column);
  }

  public Number aggregate() {
    return sum;
  }
}
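To make the data flow concrete, here is a minimal, self-contained sketch of the same group-by/aggregate contract in plain Java. It is not the Hydrator API: java.util.Map stands in for StructuredRecord, the Emitter is replaced by a returned list, grouping is on a single field, and the names AggregationSketch, Sum, Count, and purchase are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: plain Maps stand in for StructuredRecord.
class AggregationSketch {

  // Mirrors the proposed AggregationFunction contract: reset, update, aggregate.
  interface AggregationFunction {
    void reset();
    void update(Map<String, Object> record);
    Number aggregate();
  }

  static class Sum implements AggregationFunction {
    private final String column;
    private double sum;

    Sum(String column) {
      this.column = column;
    }

    public void reset() {
      sum = 0;
    }

    public void update(Map<String, Object> record) {
      sum += ((Number) record.get(column)).doubleValue();
    }

    public Number aggregate() {
      return sum;
    }
  }

  static class Count implements AggregationFunction {
    private long count;

    public void reset() {
      count = 0;
    }

    public void update(Map<String, Object> record) {
      count++;
    }

    public Number aggregate() {
      return count;
    }
  }

  // Groups records by one field, then runs every function over each group.
  static List<Map<String, Object>> groupByAggregate(
      List<Map<String, Object>> records, String groupBy,
      Map<String, AggregationFunction> functions) {
    // "Map" side: bucket records by group key, preserving first-seen order.
    Map<Object, List<Map<String, Object>>> groups = new LinkedHashMap<>();
    for (Map<String, Object> record : records) {
      groups.computeIfAbsent(record.get(groupBy), k -> new ArrayList<>()).add(record);
    }
    // "Reduce" side: one output record per group.
    List<Map<String, Object>> output = new ArrayList<>();
    for (Map.Entry<Object, List<Map<String, Object>>> group : groups.entrySet()) {
      Map<String, Object> out = new LinkedHashMap<>();
      out.put(groupBy, group.getKey());
      for (Map.Entry<String, AggregationFunction> fn : functions.entrySet()) {
        fn.getValue().reset();
        for (Map<String, Object> record : group.getValue()) {
          fn.getValue().update(record);
        }
        out.put(fn.getKey(), fn.getValue().aggregate());
      }
      output.add(out);
    }
    return output;
  }

  // Builds a two-field test record.
  static Map<String, Object> purchase(String id, double price) {
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("id", id);
    record.put("price", price);
    return record;
  }

  public static void main(String[] args) {
    Map<String, AggregationFunction> functions = new LinkedHashMap<>();
    functions.put("totalPrice", new Sum("price"));
    functions.put("numTransactions", new Count());
    List<Map<String, Object>> records = new ArrayList<>();
    records.add(purchase("a", 2.0));
    records.add(purchase("b", 5.0));
    records.add(purchase("a", 3.0));
    System.out.println(groupByAggregate(records, "id", functions));
  }
}
```

The function instances are reused across groups, which is why reset() is part of the contract. A Spark-based implementation would probably need a different shape, since the whole group is iterated in one place here.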
Note: This would also satisfy user story 5 (de-duplication). Unique can be implemented as an Aggregation plugin that groups by the fields that should be unique, ignores the Iterable<> in aggregate, and just emits the group key.
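The de-duplication idea can be sketched in a few lines. This is an illustration under assumptions, not plugin code: Maps stand in for StructuredRecord, and UniqueSketch and unique are hypothetical names. The group key is the projection of each record onto the unique fields, and each distinct key is emitted once.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: de-duplication expressed as "group by, then emit only the key".
class UniqueSketch {

  static List<Map<String, Object>> unique(List<Map<String, Object>> records,
                                          List<String> uniqueFields) {
    // Each distinct key corresponds to one group; the group's records are ignored.
    Set<Map<String, Object>> keys = new LinkedHashSet<>();
    for (Map<String, Object> record : records) {
      Map<String, Object> key = new LinkedHashMap<>();
      for (String field : uniqueFields) {
        key.put(field, record.get(field));
      }
      keys.add(key);
    }
    return new ArrayList<>(keys);
  }

  public static void main(String[] args) {
    List<Map<String, Object>> records = new ArrayList<>();
    for (String url : new String[] {"/home", "/cart", "/home"}) {
      Map<String, Object> record = new LinkedHashMap<>();
      record.put("customer", "c1");
      record.put("url", url);
      records.add(record);
    }
    List<String> fields = new ArrayList<>();
    fields.add("customer");
    fields.add("url");
    System.out.println(unique(records, fields));
  }
}
```

Note that the output contains only the unique fields. Keeping other fields would require choosing one record per group in aggregate instead of emitting the key.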
Story 2: Control Flow (Not Reviewed, WIP)
Option 1: Introduce different types of connections: one for data flow and one for control flow.
{
  "stages": [
    { "name": "customersTable", "plugin": { "name": "Database", "type": "batchsource", ... } },
    { "name": "customersFiles", "plugin": { "name": "TPFSParquet", "type": "batchsink", ... } },
    { "name": "purchasesTable", "plugin": { "name": "Database", "type": "batchsource" } },
    { "name": "purchasesFiles", "plugin": { "name": "TPFSParquet", "type": "batchsink", ... } }
  ],
  "connections": [
    { "from": "customersTable", "to": "customersFiles", "type": "data" },
    { "from": "customersFiles", "to": "purchasesTable", "type": "control" },
    { "from": "purchasesTable", "to": "purchasesFiles", "type": "data" }
  ]
}
An alternative is to introduce the concept of "phases", where connections between phases are always control flow and connections within a phase are always data flow:
{
  "phases": [
    {
      "name": "phase1",
      "stages": [
        { "name": "customersTable", "plugin": { "name": "Database", "type": "batchsource", ... } },
        { "name": "customersFiles", "plugin": { "name": "TPFSParquet", "type": "batchsink", ... } }
      ],
      "connections": [
        { "from": "customersTable", "to": "customersFiles" }
      ]
    },
    {
      "name": "phase2",
      "stages": [
        { "name": "purchasesTable", "plugin": { "name": "Database", "type": "batchsource" } },
        { "name": "purchasesFiles", "plugin": { "name": "TPFSParquet", "type": "batchsink", ... } }
      ],
      "connections": [
        { "from": "purchasesTable", "to": "purchasesFiles" }
      ]
    }
  ],
  "connections": [
    { "from": "phase1", "to": "phase2" }
  ]
}
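The typed-connection form and the phase form appear to carry the same information, at least when every control connection links stages that are not otherwise joined by data connections. The sketch below illustrates that assumption by deriving phases from typed connections: stages linked by "data" connections are merged into one phase, and each "control" connection becomes a connection between phases. PhaseSketch, Connection, assignPhases, and phaseConnections are hypothetical names, and every connection is assumed to reference a declared stage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; none of these classes exist in Hydrator.
class PhaseSketch {

  static class Connection {
    final String from;
    final String to;
    final String type; // "data" or "control"

    Connection(String from, String to, String type) {
      this.from = from;
      this.to = to;
      this.type = type;
    }
  }

  // Follows parent pointers to the representative stage of a phase.
  static String find(Map<String, String> parent, String stage) {
    while (!parent.get(stage).equals(stage)) {
      stage = parent.get(stage);
    }
    return stage;
  }

  // Maps each stage to a phase number; stages linked by data connections share a phase.
  static Map<String, Integer> assignPhases(List<String> stages, List<Connection> connections) {
    Map<String, String> parent = new LinkedHashMap<>();
    for (String stage : stages) {
      parent.put(stage, stage);
    }
    for (Connection c : connections) {
      if ("data".equals(c.type)) {
        parent.put(find(parent, c.from), find(parent, c.to));
      }
    }
    // Number phases in the order their first stage is declared.
    Map<String, Integer> phaseOfRoot = new LinkedHashMap<>();
    Map<String, Integer> phases = new LinkedHashMap<>();
    for (String stage : stages) {
      String root = find(parent, stage);
      if (!phaseOfRoot.containsKey(root)) {
        phaseOfRoot.put(root, phaseOfRoot.size());
      }
      phases.put(stage, phaseOfRoot.get(root));
    }
    return phases;
  }

  // Rewrites control connections between stages as connections between phases.
  static List<List<Integer>> phaseConnections(Map<String, Integer> phases,
                                              List<Connection> connections) {
    List<List<Integer>> result = new ArrayList<>();
    for (Connection c : connections) {
      if ("control".equals(c.type)) {
        result.add(Arrays.asList(phases.get(c.from), phases.get(c.to)));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> stages = Arrays.asList(
        "customersTable", "customersFiles", "purchasesTable", "purchasesFiles");
    List<Connection> connections = Arrays.asList(
        new Connection("customersTable", "customersFiles", "data"),
        new Connection("customersFiles", "purchasesTable", "control"),
        new Connection("purchasesTable", "purchasesFiles", "data"));
    Map<String, Integer> phases = assignPhases(stages, connections);
    System.out.println(phases);
    System.out.println(phaseConnections(phases, connections));
  }
}
```

For the four-stage customers/purchases example, this yields two phases with a single connection from the first to the second, matching the "phases" config.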
Option 2: Make connections into certain plugin types imply control flow rather than data flow. For example, introduce a "condition" plugin type; connections into a condition would imply control flow rather than data flow. Similarly, connections into an "action" plugin type would imply control flow.
{
  "stages": [
    { "name": "customersTable", "plugin": { "name": "Database", "type": "batchsource", ... } },
    { "name": "customersFiles", "plugin": { "name": "TPFSParquet", "type": "batchsink", ... } },
    { "name": "afterDump", "plugin": { "name": "AlwaysRun", "type": "condition" } },
    { "name": "purchasesTable", "plugin": { "name": "Database", "type": "batchsource" } },
    { "name": "purchasesFiles", "plugin": { "name": "TPFSParquet", "type": "batchsink", ... } }
  ],
  "connections": [
    { "from": "customersTable", "to": "customersFiles" },
    { "from": "customersFiles", "to": "afterDump" },
    { "from": "afterDump", "to": "purchasesTable" },
    { "from": "purchasesTable", "to": "purchasesFiles" }
  ]
}
You could also say that connections into a source imply control flow, or connections into an action imply control flow.
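A small sketch of how the implied-control-flow rule could be evaluated, assuming the app knows each stage's plugin type. The set of plugin types that imply control flow is passed in as a parameter, since which types belong in it is still an open question here. ImpliedFlowSketch and classify are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a connection is control flow when its target's plugin type says so.
class ImpliedFlowSketch {

  // Each connection is a {from, to} pair. Returns "from->to" mapped to "control" or "data".
  static Map<String, String> classify(List<String[]> connections,
                                      Map<String, String> stageTypes,
                                      Set<String> controlTypes) {
    Map<String, String> kinds = new LinkedHashMap<>();
    for (String[] c : connections) {
      String targetType = stageTypes.get(c[1]);
      kinds.put(c[0] + "->" + c[1], controlTypes.contains(targetType) ? "control" : "data");
    }
    return kinds;
  }

  public static void main(String[] args) {
    Map<String, String> stageTypes = new LinkedHashMap<>();
    stageTypes.put("customersTable", "batchsource");
    stageTypes.put("customersFiles", "batchsink");
    stageTypes.put("afterDump", "condition");
    stageTypes.put("purchasesTable", "batchsource");
    stageTypes.put("purchasesFiles", "batchsink");
    List<String[]> connections = Arrays.asList(
        new String[] {"customersTable", "customersFiles"},
        new String[] {"customersFiles", "afterDump"},
        new String[] {"afterDump", "purchasesTable"},
        new String[] {"purchasesTable", "purchasesFiles"});
    Set<String> controlTypes = new HashSet<>(Arrays.asList("condition", "action", "batchsource"));
    System.out.println(classify(connections, stageTypes, controlTypes));
  }
}
```

With only "condition" and "action" in the set, the connection from afterDump into purchasesTable is classified as data flow, even though no records flow out of a condition. Treating connections into a source as control flow as well would cover that case.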
Story 3: Spark ML in a pipeline
Add a plugin type "sparksink" that is treated like a sink. When one is present, a Spark program is used to read the data, transform it, and send all transformed results to the sparksink plugin.
{
  "stages": [
    { "name": "customersTable", "plugin": { "name": "Database", "type": "batchsource", ... } },
    { "name": "categorizer", "plugin": { "name": "SVM", "type": "sparksink", ... } },
    { "name": "models", "plugin": { "name": "Table", "type": "batchsink", ... } }
  ],
  "connections": [
    { "from": "customersTable", "to": "categorizer" }
  ]
}
Story 6: Join (Not Reviewed, WIP)
Add a join plugin type. Different implementations could be inner join, left outer join, etc.
{
  "stages": [
    { "name": "customers", "plugin": { "name": "Table", "type": "batchsource", ... } },
    { "name": "purchases", "plugin": { "name": "Table", "type": "batchsource", ... } },
    {
      "name": "customerPurchaseJoin",
      "plugin": {
        "name": "inner",
        "type": "join",
        "properties": {
          "left": "customers.id",
          "right": "purchases.id",
          "rename": "customers.name:customername,purchases.name:itemname"
        }
      }
    },
    ...
  ],
  "connections": [
    { "from": "customers", "to": "customerPurchaseJoin" },
    { "from": "purchases", "to": "customerPurchaseJoin" },
    { "from": "customerPurchaseJoin", "to": "sink" }
  ]
}
Java API for join plugin type: these might just be built into the app. Otherwise the interface is basically MapReduce.
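As a rough illustration of the MapReduce-style shape, the sketch below groups both inputs by their join key and emits the cross product of matching records per key. It is an assumption-laden sketch rather than a proposed API: Maps stand in for StructuredRecord, output fields are qualified as "stage.field" instead of applying the "rename" property, and JoinSketch, groupByKey, innerJoin, and record are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an inner join done as "group by key, then combine per key".
class JoinSketch {

  // "Map" side: bucket records by the value of their join key.
  static Map<Object, List<Map<String, Object>>> groupByKey(
      List<Map<String, Object>> records, String key) {
    Map<Object, List<Map<String, Object>>> groups = new LinkedHashMap<>();
    for (Map<String, Object> record : records) {
      groups.computeIfAbsent(record.get(key), k -> new ArrayList<>()).add(record);
    }
    return groups;
  }

  // Copies a record's fields into out, qualifying each name as "<stage>.<field>".
  static void putQualified(Map<String, Object> out, String stage, Map<String, Object> record) {
    for (Map.Entry<String, Object> e : record.entrySet()) {
      out.put(stage + "." + e.getKey(), e.getValue());
    }
  }

  // "Reduce" side: for each key present on both sides, emit every left/right pairing.
  static List<Map<String, Object>> innerJoin(
      String leftStage, List<Map<String, Object>> left, String leftKey,
      String rightStage, List<Map<String, Object>> right, String rightKey) {
    Map<Object, List<Map<String, Object>>> leftGroups = groupByKey(left, leftKey);
    Map<Object, List<Map<String, Object>>> rightGroups = groupByKey(right, rightKey);
    List<Map<String, Object>> output = new ArrayList<>();
    for (Map.Entry<Object, List<Map<String, Object>>> group : leftGroups.entrySet()) {
      List<Map<String, Object>> matches = rightGroups.get(group.getKey());
      if (matches == null) {
        continue; // an inner join drops keys without a match
      }
      for (Map<String, Object> l : group.getValue()) {
        for (Map<String, Object> r : matches) {
          Map<String, Object> joined = new LinkedHashMap<>();
          putQualified(joined, leftStage, l);
          putQualified(joined, rightStage, r);
          output.add(joined);
        }
      }
    }
    return output;
  }

  // Builds a record from alternating field names and values.
  static Map<String, Object> record(Object... fieldsAndValues) {
    Map<String, Object> record = new LinkedHashMap<>();
    for (int i = 0; i + 1 < fieldsAndValues.length; i += 2) {
      record.put((String) fieldsAndValues[i], fieldsAndValues[i + 1]);
    }
    return record;
  }

  public static void main(String[] args) {
    List<Map<String, Object>> customers = new ArrayList<>();
    customers.add(record("id", 1, "name", "alice"));
    customers.add(record("id", 2, "name", "bob"));
    List<Map<String, Object>> purchases = new ArrayList<>();
    purchases.add(record("id", 1, "name", "book"));
    purchases.add(record("id", 3, "name", "cup"));
    System.out.println(innerJoin("customers", customers, "id", "purchases", purchases, "id"));
  }
}
```

A left outer join would differ only in the unmatched-key branch, where it would emit the left record with no right-side fields instead of skipping it.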
Created in 2020 by Google Inc.