Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
There have been multiple requests to support sending records to different outputs depending on some condition. Other similar pipelining solutions have this capability.
Goals
The goal is to support pipeline use cases that require sending some records to one output stage and other records to another output stage.
Use Cases
A pipeline is processing records that contain a union field. The field is a union of 5 possible records. The pipeline developer wants to insert a stage that will have five different output ports, one for each possible schema in the union.
A pipeline is processing records that contain a nullable 'email' field. Most records contain an email, but some do not. The pipeline developer wants to insert a stage that sends all records with empty emails on to a join stage to populate the email field.
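The routing decision in the second use case can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the proposed plugin API: records are modelled as field maps, the port names `hasEmail` and `missingEmail` are hypothetical, and an empty string is treated the same as a null email.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a splitter stage; none of these names are CDAP APIs.
class EmailRouter {
  static final String HAS_EMAIL = "hasEmail";         // assumed port name
  static final String MISSING_EMAIL = "missingEmail"; // assumed port name

  /** Picks the output port for one record, modelled here as a field map. */
  static String portFor(Map<String, Object> record) {
    Object email = record.get("email");
    // Assumption: a null value and an empty string both count as "no email".
    boolean empty = email == null || email.toString().isEmpty();
    return empty ? MISSING_EMAIL : HAS_EMAIL;
  }

  /** Groups records by the port they would be sent to. */
  static Map<String, List<Map<String, Object>>> route(List<Map<String, Object>> records) {
    Map<String, List<Map<String, Object>>> byPort = new HashMap<>();
    byPort.put(HAS_EMAIL, new ArrayList<>());
    byPort.put(MISSING_EMAIL, new ArrayList<>());
    for (Map<String, Object> record : records) {
      byPort.get(portFor(record)).add(record);
    }
    return byPort;
  }
}
```

In a pipeline, the `missingEmail` port would feed the join stage, while the `hasEmail` port would connect to whatever stage comes next.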
User Stories
- As a pipeline developer, I want to insert a stage that sends records to different output ports
- As a pipeline developer, I want to know how many output ports a stage supports
- As a pipeline developer, I want to see the schema for each output port
- As a plugin developer, I want to specify which records are sent to which output port
- As a plugin developer, I want to be able to specify different output ports depending on input schema and configuration
- As a plugin developer, I want to be able to modify records before they are sent to an output port
Design
The pipeline config must be extended. Each connection from a splitter to another stage must also name the splitter output it originates from:
    {
      "stages": [
        { "name": "A", ... },
        { "name": "splitter", ... },
        { "name": "B", ... },
        { "name": "C", ... }
      ],
      "connections": [
        { "from": "A", "to": "splitter" },
        { "from": "splitter", "output": "0", "to": "B" },
        { "from": "splitter", "output": "1", "to": "C" }
      ]
    }
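One way to read the extended connections list is as a lookup keyed by source stage and output name. The sketch below is a hypothetical, self-contained model of that lookup; the `Connection` class and `targets` method are illustrative names rather than part of the pipeline config API, and a null `output` is assumed to mean a stage with a single output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical model of the "connections" array; not an actual CDAP class.
class ConnectionSketch {
  /** One connection entry; output is null for stages that have a single output. */
  static final class Connection {
    final String from;
    final String output;
    final String to;

    Connection(String from, String output, String to) {
      this.from = from;
      this.output = output;
      this.to = to;
    }
  }

  /** Returns the stages that receive records from the given stage and output. */
  static List<String> targets(List<Connection> connections, String from, String output) {
    List<String> result = new ArrayList<>();
    for (Connection c : connections) {
      if (c.from.equals(from) && Objects.equals(c.output, output)) {
        result.add(c.to);
      }
    }
    return result;
  }

  /** The connections from the config above. */
  static List<Connection> example() {
    return Arrays.asList(
        new Connection("A", null, "splitter"),
        new Connection("splitter", "0", "B"),
        new Connection("splitter", "1", "C"));
  }
}
```

With the example connections, looking up stage `splitter` with output `"0"` yields only `B`, and output `"1"` yields only `C`.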
We will introduce a new 'splitter' plugin type that can send records to multiple outputs. Each output can be configured with its own schema. The plugin interface will be:
    public abstract class Splitter<T> implements StageLifecycle<SplitterContext>, MultiOutputPipelineConfigurable {
      public static final String PLUGIN_TYPE = "splitter";

      /**
       * Configure the pipeline. This is run once when the pipeline is being published.
       * This is where you perform any static logic, like creating required datasets, performing schema validation,
       * setting output schema, and things of that nature.
       *
       * @param configurer the configurer used to add required datasets and streams
       */
      @Override
      public void configurePipeline(MultiOutputPipelineConfigurer configurer) {
        // no-op
      }

      /**
       * Initialize the splitter.
       */
      @Override
      public void initialize(SplitterContext context) throws Exception {
        // no-op
      }

      /**
       * Destroy the splitter.
       */
      @Override
      public void destroy() {
        // no-op
      }

      /**
       * Send the input record, possibly modified, to zero or more named outputs.
       */
      public abstract void transform(T input, MultiOutputEmitter<T> emitter);
    }

    public interface MultiOutputEmitter<T> {
      // Emit a record to the named output.
      void emit(String output, Object record);

      // Emit an invalid input entry as an error.
      void emitError(InvalidEntry<T> invalidEntry);
    }

    public interface MultiOutputPipelineConfigurable {
      void configurePipeline(MultiOutputPipelineConfigurer multiOutputPipelineConfigurer);
    }

    public interface MultiOutputPipelineConfigurer extends PipelineConfigurer {
      MultiOutputStageConfigurer getMultiOutputStageConfigurer();
    }

    public interface MultiOutputStageConfigurer {
      Schema getInputSchema();

      // Keys are output names, values are the schema of records sent to that output.
      void setOutputSchemas(Map<String, Schema> outputSchemas);
    }
For example, a plugin that splits based on a union field can be implemented like so:
    public class UnionFieldSplitter extends Splitter<StructuredRecord> {
      private Conf conf;

      @Override
      public void configurePipeline(MultiOutputPipelineConfigurer configurer) {
        MultiOutputStageConfigurer stageConfigurer = configurer.getMultiOutputStageConfigurer();
        Schema inputSchema = stageConfigurer.getInputSchema();
        // The input schema may be unknown at configure time.
        if (inputSchema != null) {
          stageConfigurer.setOutputSchemas(getOutputs(inputSchema));
        }
      }

      @Override
      public void transform(StructuredRecord input, MultiOutputEmitter<StructuredRecord> emitter) {
        Schema.Field splitField = input.getSchema().getField(conf.field);
        if (splitField == null) {
          emitter.emitError(new InvalidEntry<>(100, "Field " + conf.field + " does not exist in input schema.", input));
          return;
        }
        if (splitField.getSchema().getType() != Schema.Type.UNION) {
          emitter.emitError(new InvalidEntry<>(101, "Field " + conf.field + " is not of type union, but is of type " +
                                                 splitField.getSchema().getType(), input));
          return;
        }

        StructuredRecord val = input.get(conf.field);
        // Replace the union field with the concrete schema of this record's value.
        List<Schema.Field> fields = new ArrayList<>(input.getSchema().getFields().size());
        for (Schema.Field field : input.getSchema().getFields()) {
          fields.add(field.getName().equals(conf.field) ? Schema.Field.of(conf.field, val.getSchema()) : field);
        }

        // The output name is the index of the matching schema within the union.
        int i = 0;
        for (Schema unionSchema : splitField.getSchema().getUnionSchemas()) {
          if (val.getSchema().equals(unionSchema)) {
            break;
          }
          i++;
        }

        StructuredRecord.Builder builder =
          StructuredRecord.builder(Schema.recordOf(input.getSchema().getName() + "." + i, fields));
        for (Schema.Field field : input.getSchema().getFields()) {
          builder.set(field.getName(), input.get(field.getName()));
        }
        emitter.emit(String.valueOf(i), builder.build());
      }

      // Builds one output schema per schema in the union, keyed by the schema's index in the union.
      private Map<String, Schema> getOutputs(Schema inputSchema) {
        Schema.Field splitField = inputSchema.getField(conf.field);
        if (splitField == null) {
          throw new IllegalArgumentException("Field " + conf.field + " does not exist in input schema.");
        }
        if (splitField.getSchema().getType() != Schema.Type.UNION) {
          throw new IllegalArgumentException("Field " + conf.field + " must be a union field, but is of type " +
                                               splitField.getSchema().getType());
        }

        // Copy the input fields and remember where the split field sits.
        List<Schema.Field> fields = new ArrayList<>();
        int i = 0;
        int splitFieldIndex = 0;
        for (Schema.Field inputField : inputSchema.getFields()) {
          fields.add(inputField);
          if (inputField.getName().equals(conf.field)) {
            splitFieldIndex = i;
          }
          i++;
        }

        Map<String, Schema> outputs = new HashMap<>();
        i = 0;
        for (Schema unionSchema : splitField.getSchema().getUnionSchemas()) {
          fields.set(splitFieldIndex, Schema.Field.of(conf.field, unionSchema));
          // Copy the list so later iterations do not affect schemas that were already created.
          outputs.put(String.valueOf(i), Schema.recordOf(inputSchema.getName() + "." + i, new ArrayList<>(fields)));
          i++;
        }
        return outputs;
      }

      @Path("outputSchemas")
      public Map<String, Schema> outputSchemas(GetSchemasRequest request) {
        this.conf = request;
        return getOutputs(request.inputSchema);
      }

      public static class Conf extends PluginConfig {
        @Description("The field to split on. Must be a union field.")
        private String field;
      }

      public static class GetSchemasRequest extends Conf {
        private Schema inputSchema;
      }
    }
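A splitter's transform logic could be exercised without a running pipeline by handing it an emitter that records what was sent to each output. The sketch below is a minimal, self-contained illustration of that idea. `PortEmitter` is a simplified stand-in for the proposed MultiOutputEmitter (error emission is left out), and `CollectingEmitter` and the type-based `transform` are hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical test helper; a simplified mirror of the proposed emitter interface.
class EmitterSketch {
  /** Simplified stand-in for MultiOutputEmitter, without emitError. */
  interface PortEmitter {
    void emit(String output, Object record);
  }

  /** Remembers every emitted record, grouped by output name. */
  static final class CollectingEmitter implements PortEmitter {
    private final Map<String, List<Object>> emitted = new LinkedHashMap<>();

    @Override
    public void emit(String output, Object record) {
      emitted.computeIfAbsent(output, k -> new ArrayList<>()).add(record);
    }

    /** Returns the records sent to an output, or an empty list if none were sent. */
    List<Object> get(String output) {
      return emitted.getOrDefault(output, new ArrayList<>());
    }
  }

  /** Illustrative transform: routes numeric inputs to one output and everything else to another. */
  static void transform(Object input, PortEmitter emitter) {
    emitter.emit(input instanceof Number ? "numbers" : "other", input);
  }
}
```

A test would call `transform` for each input and then assert on the contents of `get("numbers")` and `get("other")`.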
An alternative approach is to modify existing plugins and allow them to write to multiple outputs. This, however, would add additional complexity to those plugin types, and would encourage people to write plugins that are not as modular. It would also have a performance impact, as splitting in Spark must be implemented as multiple filters on the same data.
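The cost of filter-based splitting can be illustrated without Spark. The following sketch uses plain Java collections, not any Spark API: it splits a list by running one filter per output and counts how often the routing function is evaluated. All names in it are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java illustration only; this does not use or model the Spark API.
class FilterSplitSketch {
  /** Counts how many times the routing function has been evaluated. */
  static int evaluations = 0;

  /** Splits data by running one full filter pass per output. */
  static <T> Map<String, List<T>> splitByFilters(List<T> data, List<String> outputs,
                                                 Function<T, String> router) {
    Map<String, List<T>> result = new LinkedHashMap<>();
    for (String output : outputs) {
      List<T> matching = data.stream()
          .filter(record -> {
            evaluations++; // every record is examined again for every output
            return output.equals(router.apply(record));
          })
          .collect(Collectors.toList());
      result.put(output, matching);
    }
    return result;
  }
}
```

With two outputs and four records, the routing function runs eight times, because each record is examined once per output.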
API changes
New Programmatic APIs
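A new Splitter plugin type, along with the MultiOutputEmitter, MultiOutputPipelineConfigurable, MultiOutputPipelineConfigurer, and MultiOutputStageConfigurer interfaces.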
Deprecated Programmatic APIs
None
New REST APIs
None
Deprecated REST APIs
None
CLI Impact or Changes
None
UI Impact or Changes
The UI will need enhancements to support multiple outputs. It must be able to display each output of a stage, connect each output to a downstream stage, and manage the schema for each output.
Security Impact
None
Impact on Infrastructure Outages
None
Test Scenarios
Test ID | Test Description | Expected Results |
---|---|---|
Releases
Release X.Y.Z
Release X.Y.Z
Related Work
- Work #1
- Work #2
- Work #3