...
- The field-level lineage API will not be available to Source plugins. The data pipeline application can determine the output schema of a Source plugin, whether it is supplied at configure time or at runtime as a macro. [TODO: Talked to Albert earlier; a few changes are required to perform schema propagation if the schema is supplied as part of a macro. File a JIRA for this.] The data pipeline application can also know which dataset the Source is reading from. [TODO: File a JIRA to keep the mapping of stage to dataset in the app.] So for any stage subsequent to the Source, the app can call the platform and correctly specify the input fields (a combination of Schema.Field and Source).
- It is still possible for the Source to add fields to the output schema, for example, the file path in the case of the File source plugin. Should the file path also be associated with the Source?
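The app-side bookkeeping described in the bullets above could look roughly like the following minimal sketch. It assumes a hypothetical `SourceFieldResolver` that records, for each Source stage, the dataset it reads and the field names of its output schema (plain strings stand in for `Schema.Field`). It then qualifies each field with its dataset to form the inputs seen by the next stage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical app-side helper: tracks which dataset each Source stage reads
// and turns that Source's output fields into dataset-qualified inputs.
public class SourceFieldResolver {

  // A field qualified by the dataset (Source) it was read from.
  public static final class QualifiedInput {
    final String dataset;
    final String field;

    QualifiedInput(String dataset, String field) {
      this.dataset = dataset;
      this.field = field;
    }

    @Override
    public String toString() {
      return dataset + "." + field;
    }
  }

  // Stage name -> dataset read by that Source stage.
  private final Map<String, String> stageToDataset = new HashMap<>();
  // Stage name -> output field names taken from that Source's output schema.
  private final Map<String, List<String>> stageToFields = new HashMap<>();

  public void registerSource(String stage, String dataset, List<String> outputFields) {
    stageToDataset.put(stage, dataset);
    stageToFields.put(stage, new ArrayList<>(outputFields));
  }

  // Returns the qualified inputs for the stage immediately after the given Source.
  public List<QualifiedInput> inputsFrom(String sourceStage) {
    String dataset = stageToDataset.get(sourceStage);
    if (dataset == null) {
      throw new IllegalArgumentException("Unknown source stage: " + sourceStage);
    }
    List<QualifiedInput> inputs = new ArrayList<>();
    for (String field : stageToFields.get(sourceStage)) {
      inputs.add(new QualifiedInput(dataset, field));
    }
    return inputs;
  }

  public static void main(String[] args) {
    SourceFieldResolver resolver = new SourceFieldResolver();
    resolver.registerSource("fileSource", "ns1.customers", Arrays.asList("offset", "body"));
    // Prints [ns1.customers.offset, ns1.customers.body]
    System.out.println(resolver.inputsFrom("fileSource"));
  }
}
```

Extra fields a Source adds on its own, such as the file path, would simply appear in the registered field list under this sketch; whether they should be attributed to the Source is the open question above.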
Transform and Sink plugins will be able to provide field-level lineage using the following API, which will be available to the prepareRun method through the context.
```java
import java.util.List;

public interface LineageRecorder {
  // Records the field operations performed by the plugin
  void record(List<FieldOperation> operations);
}
```
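As a rough illustration of how a Transform might use this interface, the sketch below records two operations from a stand-in `prepareRun`. To stay self-contained it reduces `FieldOperation` to a name plus input and output field names. `InMemoryLineageRecorder`, the `Concat` and `Drop` operations, and the field names are hypothetical; a real plugin would obtain the recorder from the prepareRun context rather than construct one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LineageRecorderExample {

  interface LineageRecorder {
    void record(List<FieldOperation> operations);
  }

  // Simplified stand-in for FieldOperation: field names instead of Schema.Field.
  static final class FieldOperation {
    final String name;
    final List<String> inputs;
    final List<String> outputs;

    FieldOperation(String name, List<String> inputs, List<String> outputs) {
      this.name = name;
      this.inputs = inputs;
      this.outputs = outputs;
    }
  }

  // Hypothetical recorder that keeps operations in memory.
  static final class InMemoryLineageRecorder implements LineageRecorder {
    final List<FieldOperation> recorded = new ArrayList<>();

    @Override
    public void record(List<FieldOperation> operations) {
      recorded.addAll(operations);
    }
  }

  // Stand-in for a transform's prepareRun: describes how inputs map to outputs.
  static void prepareRun(LineageRecorder recorder) {
    FieldOperation concat = new FieldOperation(
        "Concat", Arrays.asList("first_name", "last_name"), Collections.singletonList("full_name"));
    // A drop produces no output fields.
    FieldOperation drop = new FieldOperation(
        "Drop", Collections.singletonList("ssn"), Collections.emptyList());
    recorder.record(Arrays.asList(concat, drop));
  }

  public static void main(String[] args) {
    InMemoryLineageRecorder recorder = new InMemoryLineageRecorder();
    prepareRun(recorder);
    for (FieldOperation op : recorder.recorded) {
      System.out.println(op.name + ": " + op.inputs + " -> " + op.outputs);
    }
  }
}
```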
There are a few differences between the FieldOperation class available to plugins and the one used by the platform. Mainly, the FieldOperation class available to plugins has no notion of a Source, and it lets the plugin specify whether metadata is propagated from the input fields to the output fields.
```java
import co.cask.cdap.api.data.schema.Schema;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nullable;

// Input.java
public class Input {
  Schema.Field field;
}

// Output.java
public class Output {
  Schema.Field field;
}

// FieldOperation.java
public class FieldOperation {
  // Operation name
  String name;
  // Optional detailed description of the operation
  String description;
  // Set of input fields that participate in the operation
  Set<Input> inputs;
  // Set of output fields generated as part of this operation.
  // Note that this can be null, for example in the case of a "Drop Field" operation.
  // However, if a field is dropped and it is not present in the destination
  // dataset, it cannot be reached in the lineage graph.
  @Nullable
  Set<Output> outputs;
  // Flag that determines whether metadata from the inputs is propagated to the outputs
  boolean propagateMetadata;

  private FieldOperation(Builder builder) {
    this.name = builder.name;
    this.description = builder.description;
    this.inputs = builder.inputs;
    this.outputs = builder.outputs;
    this.propagateMetadata = builder.propagateMetadata;
  }

  // Creates a builder for an operation with the given name
  public static Builder builder(String name) {
    return new Builder(name);
  }

  // Builder for the FieldOperation
  public static class Builder {
    String name;
    String description;
    Set<Input> inputs;
    Set<Output> outputs;
    boolean propagateMetadata;

    private Builder(String name) {
      this.name = name;
      inputs = new HashSet<>();
      outputs = new HashSet<>();
    }

    public Builder setDescription(String description) {
      this.description = description;
      return this;
    }

    public Builder addInput(Input input) {
      inputs.add(input);
      return this;
    }

    public Builder addInputs(Collection<Input> inputs) {
      this.inputs.addAll(inputs);
      return this;
    }

    public Builder addOutput(Output output) {
      outputs.add(output);
      return this;
    }

    public Builder addOutputs(Collection<Output> outputs) {
      this.outputs.addAll(outputs);
      return this;
    }

    public Builder withMetadataPropagationEnabled() {
      this.propagateMetadata = true;
      return this;
    }

    public FieldOperation build() {
      return new FieldOperation(this);
    }
  }
}
```
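The note on dropped fields follows from how lineage would be traversed. The sketch below assumes lineage is computed by walking backwards from the fields present in the destination dataset, from each operation's outputs to its inputs. `Op` is a simplified, hypothetical stand-in for `FieldOperation`, and the field names are made up.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LineageReachability {

  // Simplified operation: field names in, field names out.
  static final class Op {
    final String name;
    final List<String> inputs;
    final List<String> outputs;

    Op(String name, List<String> inputs, List<String> outputs) {
      this.name = name;
      this.inputs = inputs;
      this.outputs = outputs;
    }
  }

  // Breadth-first walk from the destination fields back through every operation
  // that produced a visited field; returns all fields reached along the way.
  static Set<String> reachable(List<Op> ops, Set<String> destinationFields) {
    Set<String> visited = new HashSet<>(destinationFields);
    Deque<String> queue = new ArrayDeque<>(destinationFields);
    while (!queue.isEmpty()) {
      String field = queue.poll();
      for (Op op : ops) {
        if (op.outputs.contains(field)) {
          for (String in : op.inputs) {
            if (visited.add(in)) {
              queue.add(in);
            }
          }
        }
      }
    }
    return visited;
  }

  public static void main(String[] args) {
    List<Op> ops = Arrays.asList(
        new Op("Concat", Arrays.asList("first_name", "last_name"), Collections.singletonList("full_name")),
        new Op("Drop", Collections.singletonList("ssn"), Collections.emptyList()));
    Set<String> destination = new HashSet<>(Collections.singletonList("full_name"));
    Set<String> fields = reachable(ops, destination);
    System.out.println(fields.contains("first_name")); // true
    System.out.println(fields.contains("ssn"));        // false
  }
}
```

Because the `Drop` operation has no outputs, no destination field ever leads back to `ssn`, which matches the comment on the `outputs` field.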
When metadata propagation is enabled for a field operation, metadata from the input fields will be propagated to the output fields. We need to clearly define what happens when a field operation has multiple inputs. Should the output fields get the union of the metadata? If so, how are conflicts resolved?
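To make the open question concrete, here is one possible merge policy, offered only as an assumption and not as a decided design: the output field receives the union of the input fields' tags, while for a property key that inputs set to different values, the first input in declaration order wins and the key is reported as a conflict. `FieldMetadata`, `MergeResult`, and `merge` are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class MetadataMerge {

  // Hypothetical per-field metadata: key/value properties plus tags.
  static final class FieldMetadata {
    final Map<String, String> properties = new LinkedHashMap<>();
    final Set<String> tags = new TreeSet<>();
  }

  static final class MergeResult {
    final FieldMetadata merged = new FieldMetadata();
    final Set<String> conflictingKeys = new TreeSet<>();
  }

  // Tags are unioned; for properties, the first input to set a key wins and any
  // later input with a different value marks that key as conflicting.
  static MergeResult merge(List<FieldMetadata> inputs) {
    MergeResult result = new MergeResult();
    for (FieldMetadata input : inputs) {
      result.merged.tags.addAll(input.tags);
      for (Map.Entry<String, String> e : input.properties.entrySet()) {
        String existing = result.merged.properties.putIfAbsent(e.getKey(), e.getValue());
        if (existing != null && !existing.equals(e.getValue())) {
          result.conflictingKeys.add(e.getKey());
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    FieldMetadata firstName = new FieldMetadata();
    firstName.tags.add("pii");
    firstName.properties.put("owner", "sales");
    FieldMetadata lastName = new FieldMetadata();
    lastName.tags.add("name");
    lastName.properties.put("owner", "marketing");

    MergeResult r = merge(Arrays.asList(firstName, lastName));
    System.out.println(r.merged.tags);       // [name, pii]
    System.out.println(r.merged.properties); // {owner=sales}
    System.out.println(r.conflictingKeys);   // [owner]
  }
}
```

Reporting conflicting keys instead of silently overwriting them would let the platform surface the conflict; other policies, such as dropping conflicting keys entirely, fit the same structure.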