...
Input to a field operation can be a Source (in the case of a Source plugin) or another field (in the case of a Transform plugin).
Code Block language java
// Represents input to the Field operations
public abstract class Input {
  // Name which uniquely identifies the input
  String name;

  // Description associated with the input
  String description;

  // Create input from the source dataset. Note that this will always be a dataset,
  // as even for external sources such as Kafka or File we create a dataset for
  // lineage purposes, identified by the Reference Name.
  public static Input ofDataset(String name) { ... }

  public static Input ofDataset(String namespace, String name) { ... }

  // Create input from a Field. Since the Schema can be nested, a plain String cannot be
  // used to uniquely identify the Field in the Schema, as multiple Fields can have the same
  // name but different nesting. In order to uniquely identify a Field from the Schema we will
  // need an enhancement in the CDAP platform so that each Field can hold a transient
  // reference to its parent Field. From these references we can then create a unique field path.
  public static Input ofField(Schema.Field field) { ... }
}

// Concrete implementation of the Input which represents the Source
public class DatasetInput extends Input {
  // Namespace associated with the Dataset.
  // This is required since programs can read data from a different namespace.
  String namespace;

  // Properties associated with the Input.
  // This can potentially store plugin properties for context.
  // For example, in case of a KafkaConsumer source, properties can include broker id, list of topics etc.
  Map<String, String> properties;
}

// Concrete implementation of the Input which represents the Field.
public class FieldInput extends Input {
  Schema.Field field;
}
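For illustration, below is a minimal sketch of how a plugin might construct these inputs; the namespace, dataset name, and the inputSchema variable used here are hypothetical.

Code Block language java
// A minimal sketch, assuming the factory methods above; all names are illustrative.
Input usersInput = Input.ofDataset("default", "Users");                   // input representing a source dataset
// Assumes an inputSchema is already available to the plugin or program.
Input firstNameInput = Input.ofField(inputSchema.getField("FirstName"));  // input representing a field of that schema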
Output of a field operation can only be a field. (This is to be confirmed; otherwise we would need a hierarchy similar to the one on the Input side.)
Code Block language java
// Represents Output in the field operation
public class Output {
  Schema.Field field;
}
A field operation consists of a name, an optional description, one or more Inputs, and optionally one or more Outputs (Outputs can be absent, for example for a "Drop Field" operation).
Code Block language java
public class FieldOperation {
  // Operation name
  String name;

  // Optional detailed description about the operation
  String description;

  // Set of input fields that participate in the operation
  Set<Input> inputs;

  // Set of output fields generated as a part of this operation.
  // Note that this can be null, for example in case of a "Drop Field" operation.
  // However, if the field is dropped and is not present in the destination
  // dataset, it cannot be reached in the lineage graph.
  @Nullable
  Set<Output> outputs;

  // Builder for the FieldOperation
  public static class Builder {
    String name;
    String description;
    Set<Input> inputs;
    Set<Output> outputs;

    public Builder(String name) {
      this.name = name;
      inputs = new HashSet<>();
      outputs = new HashSet<>();
    }

    public Builder setDescription(String description) {
      this.description = description;
      return this;
    }

    public Builder addInput(Input input) {
      inputs.add(input);
      return this;
    }

    public Builder addInputs(Collection<Input> inputs) {
      this.inputs.addAll(inputs);
      return this;
    }

    public Builder addOutput(Output output) {
      outputs.add(output);
      return this;
    }

    public Builder addOutputs(Collection<Output> outputs) {
      this.outputs.addAll(outputs);
      return this;
    }

    public FieldOperation build() { ... }
  }
}
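As a quick illustration of the Builder above, the sketch below constructs a hypothetical "Rename" operation; the operation name, description, field names, and schema variables are illustrative only.

Code Block language java
// A minimal sketch, assuming the Builder above and schemas obtained elsewhere;
// the "Rename" operation and the Zipcode/Zip fields are hypothetical.
FieldOperation rename = new FieldOperation.Builder("Rename")
  .setDescription("Renaming the Zipcode field to Zip")
  .addInput(Input.ofField(inputSchema.getField("Zipcode")))
  .addOutput(Output.ofField(outputSchema.getField("Zip")))
  .build();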
A list of field operations can be supplied to the platform through the LineageRecorder interface. Program runtime contexts (such as MapReduceContext) can implement this interface.
Code Block language java
/**
 * This interface provides methods that will allow programs to record the field level operations.
 */
public interface LineageRecorder {

  /**
   * Record the field level operations against the given destination.
   *
   * @param destination the destination for which to record field operations
   * @param fieldOperations the list of field operations
   */
  void record(Destination destination, List<FieldOperation> fieldOperations);
}

// Destination represents the dataset that the fields will be part of.
public class Destination {
  // Namespace associated with the Dataset.
  // This is required since programs can write data to a different namespace.
  String namespace;

  // Name of the Dataset
  String name;

  // Description associated with the Dataset.
  String description;

  // Properties associated with the Dataset.
  // This can potentially store plugin properties of the Sink for context.
  // For example, in case of a KafkaProducer sink, properties can include broker id, list of topics etc.
  Map<String, String> properties;
}
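One way a program runtime context could satisfy this contract is to simply buffer the recorded operations per destination and let the platform publish them when the run completes. The sketch below is only an illustration of that assumption; the class name and the accessor method are hypothetical.

Code Block language java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A minimal sketch, assuming the runtime context buffers operations per destination
// (and that Destination implements equals/hashCode); how the platform persists them
// afterwards is out of scope here.
public class BufferingLineageRecorder implements LineageRecorder {
  private final Map<Destination, List<FieldOperation>> recorded = new HashMap<>();

  @Override
  public void record(Destination destination, List<FieldOperation> fieldOperations) {
    recorded.computeIfAbsent(destination, d -> new ArrayList<>()).addAll(fieldOperations);
  }

  // Hypothetical accessor the platform could use to publish the buffered operations,
  // for example at the end of the program run.
  public Map<Destination, List<FieldOperation>> getRecordedOperations() {
    return Collections.unmodifiableMap(recorded);
  }
}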
Example usage: consider a simple MapReduce program which concatenates two fields from the source. The field-level operations emitted would look like the following:
Code Block language java
public class NormalizerMapReduce extends AbstractMapReduce {
  public static final Schema OUTPUT_SCHEMA = Schema.recordOf("user",
    Schema.Field.of("UID", Schema.of(Schema.Type.LONG)),
    Schema.Field.of("Name", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("DOB", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("Zip", Schema.of(Schema.Type.INT)));
  ...

  public void initialize() throws Exception {
    MapReduceContext context = getContext();
    context.addInput(Input.ofDataset("Users"));
    context.addOutput(Output.ofDataset("NormalizedUserProfiles"));
    DatasetProperties inputProperties = context.getAdmin().getDatasetProperties("Users");
    Schema inputSchema = Schema.parseJson(inputProperties.getProperties().get(DatasetProperties.SCHEMA));
    DatasetProperties outputProperties = context.getAdmin().getDatasetProperties("NormalizedUserProfiles");
    Schema outputSchema = Schema.parseJson(outputProperties.getProperties().get(DatasetProperties.SCHEMA));
    ...

    List<FieldOperation> operations = new ArrayList<>();

    FieldOperation.Builder builder = new FieldOperation.Builder("Concat");
    builder
      .setDescription("Concatenating the FirstName and LastName fields to create the Name field.")
      .addInput(Input.ofField(inputSchema.getField("FirstName")))
      .addInput(Input.ofField(inputSchema.getField("LastName")))
      .addOutput(Output.ofField(outputSchema.getField("Name")));
    operations.add(builder.build());

    builder = new FieldOperation.Builder("Drop");
    builder
      .setDescription("Deleting the FirstName field")
      .addInput(Input.ofField(inputSchema.getField("FirstName")));
    operations.add(builder.build());

    builder = new FieldOperation.Builder("Drop");
    builder
      .setDescription("Deleting the LastName field")
      .addInput(Input.ofField(inputSchema.getField("LastName")));
    operations.add(builder.build());

    context.record(Destination.of("NormalizedUserProfiles"), operations);
    ...
  }
  ...
}
...