This document describes the API for the Field Level Lineage feature. Please see corresponding examples and user stories for more details about the feature.
Platform API
Input to the field operation can be Source (in case of Source plugin) or another field(in case of Transform plugin).
Code Block language java // Represents input to the Field operations public abstract class Input { // Schema Namefield which is uniquelyinput identifiesto the inputoperation StringSchema.Field namefield; // Description associated with the input Source information if the field belongs to the source/dataset @Nullable StringSource descriptionsource; // Create input from thea source datasetField. Note that this willSince Schema can be nested, plain String cannot be // alwaysused beto datasetuniquely asidentify eventhe forField externalin sourcesthe suchSchema, as Kafka,multiple FileFields can have same //name we create dataset for// lineagebut purposedifferent asnesting. identifiedIn byorder theto Referenceuniquely Name.identify a Field from publicthe staticSchema Inputwe ofDataset(Stringwill name) { // need ...an enhancement in the }CDAP platform so that each publicField staticcan Inputhold ofDataset(String namespace, String name) { ... } // Create input from a Field. Since Schema can be nested, plain String cannot be the transient // reference to the parent Field. From these references, then we can create unique field path. public static Input ofField(Schema.Field field) { ... } // usedCreate toinput uniquelyfrom identify the Field which belongs into the Source Schema, as multiplepublic Fieldsstatic canInput have same name // but different nesting. In order to uniquely identify a Field from the Schema we will ofField(Source source, Schema.Field field) { ... } } // Represents the Source dataset information. public class Source { // needNamespace anassociated enhancement inwith the CDAPDataset. platform so that each Field// canThis holdis therequired transientsince programs can read //the referencedata tofrom thedifferent parent Fieldnamespace. From these references, thenString wenamespace; can create unique field path.// Name of the publicsource staticdataset Input ofField(Schema.Field field) {String name; ... // Properties associated }with }the source //dataset. Concrete implementation of the Input// whichThis representscan thepotentially Sourcestore publicplugin classproperties DatasetInput extends Input {for context. // NamespaceFor associatedexample within thecase Dataset.of KafkaConsumer source, properties can //include Thisbroker isid, requiredlist sinceof programstopics canetc. read the data fromMap<String, different namespace. String namespaceString> properties; }
Output of field operation can only be field.
Code Block // PropertiesRepresent associatedOutput within the Input.field operation public class // This can potentially store plugin properties for context. Output { Schema.Field field; // For example in case of KafkaConsumer source, properties can include broker id, list of topics etc. Map<String, String> properties; } // Concrete implementation of the Input which represents the Field. public class FieldInput extends Input { Schema.Field field; }
Output of field operation can only be field (This is to confirm, otherwise we would need hierarchy similar to the Input side).
Code Block // Represent Output in the field operation public class Output { Schema.Field field; }
Field operation consists of one or more Input and one or more Output along with the name and its description.
Code Block language java public class FieldOperation { }
Field operation consists of one or more Input and one or more Output along with the name and its description.
Code Block language java public class FieldOperation { // Operation name String name; // Optional detailed description about the operation String description; // Set of input fields participate in the operation Set<Input> inputs; // Set of output fields generated as a part of this operation // Note that this can be null for example in case of "Drop Field" operation. // OperationHowever nameif the field is Stringdropped name;and its not present in the //destination Optional detailed description about the// operationdataset it cannot be Stringreached description;in the lineage graph. //@Nullable Set of input fieldsSet<Output> participateoutputs; in the operation // Builder Set<Input>for inputs;the FieldOperation public //static SetBuilder of{ output fields generated as a part ofString thisname; operation // Note thatString thisdescription; can be null for example in caseSet<Input> ofinputs; "Drop Field" operation. //Set<Output> Howeveroutputs; if the field is dropped and its not presentprivate in the destinationBuilder(String name) { // dataset it cannot bethis.name reached= inname; the lineage graph. @Nullable inputs = Set<Output>new outputsHashSet<>(); // Builder for theoutputs FieldOperation= public static Builder {new HashSet<>(); } String name; public Builder setDescription(String description;) { Set<Input> inputs; this.description = description; Set<Output> outputs; return this; private Builder(String name) { } this.namepublic = name; Builder addInput(Input input) { inputs = new HashSet<>inputs.add(input); outputs = new HashSet<>()return this; } public Builder setDescriptionaddInputs(StringCollection<Input> descriptioninputs) { this.description = descriptioninputs.addAll(inputs); return this; } public Builder addInputaddOutput(InputOutput inputoutput) { inputsoutputs.add(inputoutput); return this; } public Builder addInputsaddOutputs(Collection<Input>Collection<Output> inputsoutputs) { this.inputsoutputs.addAll(inputsoutputs); return this; } } public Builder addOutput(Output output) { outputs.add(output); return this; } }
List of field operations can be supplied to the platform through LineageRecorder interface. Program runtime context (such as MapReduceContext) can implement this interface.
List of fieldCode Block language java /** * This interface provides methods that will allow programs to record the field level operations. */ public interface LineageRecorder { /** public Builder addOutputs(Collection<Output> outputs) { this.outputs.addAll(outputs); * Record the field level operations against the given destination. * * return@param this;destination the destination for which to record }field operations }* @param fieldOperations The list of field }
can be supplied to the platform through program runtime contextoperations
.
Example usage: Consider a simple MapReduce program which does concatenation of the two fields from the source. The Field level operations emitted would look like below -Code Block language java /** * This interface provides*/ methods that will allow programsvoid to record the field level operations. */ public interface LineageRecorder { /** * Record the field level operations against the given destination. * * @param destination the destination for which to record field operations * @param fieldOperations The list of field operations. */ void record(Destination destination, List<FieldOperation> fieldOperations); } // Destination represents the dataset of which the fields will be part of. public class Destination { // Namespacerecord(Destination destination, List<FieldOperation> fieldOperations); } // Destination represents the dataset of which the fields will be part of. public class Destination { // Namespace associated with the Dataset. // This is required since programs can read the data from different namespace. String namespace; // Name of the Dataset String name; // Description associated with the Dataset. String description; // Properties associated with the Dataset. // This iscan requiredpotentially sincestore programsplugin canproperties readof the dataSink fromfor different namespacecontext. String namespace; // NameFor ofexample thein Datasetcase of KafkaProducer sink, Stringproperties name;can include broker id, list //of Descriptiontopics associatedetc. with the Dataset. Map<String, String> String description; // Properties associated with the Dataset. // This can potentially store plugin properties of the Sink for context. // For example in case of KafkaProducer sink, properties can include broker id, list of topics etc. Map<String, String> properties; }
properties; }
Example usage: Consider a simple MapReduce program which does concatenation of the two fields from the source. The Field level operations emitted would look like below -
Code Block language java public class NoramlizerMapReduce extends AbstractMapReduce { public static final Schema OUTPUT_SCHEMA = Schema.recordOf("user", Schema.Field.of("UID", Schema.of(Schema.Type.LONG)), Schema.Field.of("Name", Schema.of(Schema.Type.STRING)), Schema.Field.of("DOB", Schema.of(Schema.Type.STRING)), Schema.Field.of("Zip", Schema.of(Schema.Type.INT))); ... public void initialize() throws Exception { MapReduceContext context = getContext(); context.addInput(Input.ofDataset("Users")); context.addOutput(Output.ofDataset("NormalizedUserProfiles")); DatasetProperties inputProperties = context.getAdmin().getDatasetProperties("Users"); Schema inputSchema = inputProperties.getProperties().get(DatasetProperties.SCHEMA); DatasetProperties outputProperties = context.getAdmin().getDatasetProperties("NormalizedUserProfiles"); Schema outputSchema = outputProperties.getProperties().get(DatasetProperties.SCHEMA); ... List<FieldOperation> operations = new ArrayList<>(); Source source = Source.of("Users"); FieldOperation.Builder builder = new FieldOperation.Builder("Concat"); builder .setDescription("Concatenating the FirstName and LastName fields to create Name field.") .addInput(Input.ofField(source, inputSchema.getField("FirstName"))) .addInput(Input.ofField(source, inputSchema.getField("LastName"))) .addOutput(Output.ofField(outputSchema.getField("Name"))) operations.add(builder.build()); builder = new FieldOperation.Builder("Drop"); builder .setDescription("deleting the FirstName field") .addInput(Input.ofField(source, inputSchema.getField("FirstName"))) operations.add(builder.build()); builder = new FieldOperation.Builder("Drop"); builder .setDescription("deleting the LastName field") .addInput(Input.ofField(source, inputSchema.getField("LastName"))) operations.add(builder.build()); context.record(Destination.of("NormalizedUserProfiles"), operations); ... } ... }
For some input datasets (such as Filesets?), DatasetProperties.SCHEMA may not be available since not all datasets have schema associated with it. So the program will have to explicitly create Fields based on the logic of the program. Consider a WordCount MapReduce program which reads lines from the files and create ouput dataset with words and correpsonding counts. We need to create 3 different fields in the program to represent this as shown below -
Code Block language java public class WordCount extends AbstractMapReduce { @Override public void initialize() throws Exception { MapReduceContext context = getContext(); Job job = context.getHadoopJob(); job.setMapperClass(Tokenizer.class); job.setReducerClass(Counter.class); job.setNumReduceTasks(1); String inputDataset = context.getRuntimeArguments().get("input"); String outputDataset = context.getRuntimeArguments().get("output"); context.addInput(Input.ofDataset(inputDataset)); context.addOutput(Output.ofDataset(outputDataset)); // Create dummy Field for the lineage Schema.Field record = Schema.Field.of("record", Schema.of(Schema.Type.String)); // Fields from the output dataset Schema.Field word = Schema.Field.of("word", Schema.of(Schema.Type.String)); Schema.Field count = Schema.Field.of("count", Schema.of(Schema.Type.Long)); Source source = Source.of(inputDataset); List<FieldOperation> operations = new ArrayList<>(); builder = new FieldOperation.Builder("Create"); builder .setDescription("Creating Word and Count fields") .addInput(source, Input.ofField(record)) .addOutput(Input.ofField(word)) .addOutput(Input.ofField(count)) operations.add(builder.build()); context.record(Destination.of(outputDataset), operations); ... } }
Plugin API
- Field level lineage API will not be available to the Source plugins. Data pipeline application can determine the output schema of the Source plugin which is supplied at configure time or runtime as a macro. [TODO: Talked to Albert earlier and few changes required to perform the schema propagation if it is supplied as a part of macro. File a JIRA for this]. Data pipeline application can also know which dataset, the source is reading from. [TODO: File JIRA for this to keep mapping of the stage and dataset in the app]. So for any stage subsequent to the Source, it is possible for app to call the platform with correctly specifying the input fields (combination of Schema.Field and Source).
- It is still possible for the source to add additional fields to the output schema, for example, file path in case of the File source plugin. Should file path is also be associated with the Source?
Transform plugins and Sink plugins will be able to provide the field level lineage using following API. This API will be available to the prepareRun method through context.
Code Block language java public interface LineageRecorder { void record(List<FieldOperation> operations); }
There are few differences between the FieldOperation class available to the plugins and the one from platform. Mainly the FieldOperation class available to the plugins will not have notion of the Source in it and it will be able to assign the metadata.
Code Block language java public class NoramlizerMapReduce extends AbstractMapReduce { public static final Schema OUTPUT_SCHEMA = Schema.recordOf("user", Schema.Field.of("UID", Schema.of(Schema.Type.LONG)), Input { Schema.Field field; } public class Output { Schema.Field field; } public class FieldOperation { // Operation name String name; // Optional detailed description about the operation String description; // Set of input fields participate in the operation Set<Input> inputs; // Set of output fields generated as a part of this operation // Schema.Field.of("Name", Schema.of(Schema.Type.STRING)), Note that this can be null for example in case of "Drop Field" operation. // However if the field is dropped and its not present in the destination // dataset it cannot be reached in the lineage graph. @Nullable Set<Output> outputs; // Boolean flag to Schema.Field.of("DOB", Schema.of(Schema.Type.STRING)), determine wheteher the metadata from the inputs to outputs is propagated boolean propagateMetadata; // Builder for the FieldOperation public static Builder { String name; String description; Set<Input> inputs; Schema.Field.of("Zip", Schema.of(Schema.Type.INT))); ...Set<Output> outputs; public void initialize() throws Exception {boolean propagateMetadata; MapReduceContext context =private getContextBuilder(String name); { context.addInput(Input.ofDataset("Users")); this.name = name; context.addOutput(Output.ofDataset("NormalizedUserProfiles")); DatasetPropertiesinputs inputProperties= = context.getAdmin().getDatasetProperties("Users"new HashSet<>(); Schema inputSchemaoutputs = new inputProperties.getPropertiesHashSet<>().get(DatasetProperties.SCHEMA);; } DatasetProperties outputProperties = context.getAdmin().getDatasetProperties("NormalizedUserProfiles"); public Builder setDescription(String description) { this.description = description; Schema outputSchema = outputProperties.getProperties().get(DatasetProperties.SCHEMA); return this; ... } List<FieldOperation> operations = new ArrayList<>(); public Builder addInput(Input input) { FieldOperation.Builder builder = new FieldOperationinputs.Builderadd("Concat"input); builder return this; } .setDescription("Concatenating the FirstName and LastName fields topublic createBuilder Name field.")addInputs(Collection<Input> inputs) { this.addInput(Inputinputs.ofField(inputSchema.getField("FirstName")))addAll(inputs); .addInput(Input.ofField(inputSchema.getField("LastName")))return this; } .addOutput(Output.ofField(outputSchema.getField("Name"))) public Builder operations.add(builder.build()); addOutput(Output output) { builder = new FieldOperationoutputs.Builderadd("Drop"output); builder return this; } .setDescription("deleting the FirstName field") public Builder .addInput(Input.ofField(inputSchema.getField("FirstName")))addOutputs(Collection<Output> outputs) { operationsthis.add(builderoutputs.buildaddAll()outputs); builder = new FieldOperation.Builder("Drop")return this; builder} .setDescription("deleting thepublic LastNameBuilder field"withMetadataPropagationEnabled() { .addInput(Input.ofField(inputSchema.getField("LastName")))this.propagateMetadata = true; operations.add(builder.build()); return this; context.record(Destination.of("NormalizedUserProfiles"), operations); } } ... } ... }
When metadata propagation is enabled during field operation, metadata from the input fields will be propagated to the output fields. We will need to clearly define what happens when the field operation has multiple inputs. Should the output fields get union of metadata? How the conflitcts are resolved then?