Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

This document describes the API for the Field Level Lineage feature. Please see corresponding examples and user stories for more details about the feature.

Platform API

  1. Input to the field operation can be Source (in case of Source plugin) or another field(in case of Transform plugin).

    Code Block
    languagejava
    // Represents input to the Field operations
    public abstract class Input {
       // Schema Namefield which is uniquelyinput identifiesto the inputoperation
       StringSchema.Field namefield;
     
       // Description associated with the input Source information if the field belongs to the source/dataset
       @Nullable
       StringSource descriptionsource;
     
       // Create input from thea source datasetField. Note that this willSince Schema can be nested, plain String cannot be 
       // alwaysused beto datasetuniquely asidentify eventhe forField externalin sourcesthe suchSchema, as Kafka,multiple FileFields can have same //name
    we create dataset for// lineagebut purposedifferent asnesting. identifiedIn byorder theto Referenceuniquely Name.identify a Field from publicthe staticSchema Inputwe ofDataset(Stringwill name)
    {   // need ...an enhancement in the }CDAP  platform so that each publicField staticcan Inputhold ofDataset(String namespace, String name) {
        ...
       }
    
    
       // Create input from a Field. Since Schema can be nested, plain String cannot be 
       // used to uniquely identify the Field in the Schema, as multiple Fields can have same name
       // but different nesting. In order to uniquely identify a Field from the Schema we will the transient
       // reference to the parent Field. From these references, then we can create unique field path.
       public static Input ofField(Schema.Field field) {
        ...
       }
     
       // Create input from the Field which belongs to the Source
       public static Input ofField(Source source, Schema.Field field) {
        ...
       }
    }
     
    // Represents the Source dataset information.
    public class Source {
       // needNamespace anassociated enhancementwith in the CDAPDataset. platform
    so that each Field// canThis holdis therequired transientsince programs can read //the referencedata tofrom thedifferent parent Fieldnamespace.
    From these references, thenString wenamespace;
    can 
    create unique field path.// Properties associated with publicthe static Input ofField(Schema.Field field) {source dataset. 
       //  ...
       }
    }
     
    // Concrete implementation of the Input which represents the Source
    public class DatasetInput extends Input {
       // Namespace associated with the Dataset. 
       // This is required since programs can read the data from different namespace.
       String namespace;
     
       // Properties associated with the Input. 
       // This can potentially store plugin properties for context.
       // For example in case of KafkaConsumer source, properties can include broker id, list of topics etc.
       Map<String, String> properties;
    }
     
    // Concrete implementation of the Input which represents the Field.
    public class FieldInput extends Input {
       Schema.Field field;
    }This can potentially store plugin properties for context.
       // For example in case of KafkaConsumer source, properties can include broker id, list of topics etc.
       Map<String, String> properties;
    }
    
  2. Output of field operation can only be field (This is to confirm, otherwise we would need hierarchy similar to the Input side)field.

    Code Block
    // Represent Output in the field operation
    public class Output {
       Schema.Field field;   
    }
  3. Field operation consists of one or more Input and one or more Output along with the name and its description.

    Code Block
    languagejava
    public class FieldOperation {
       // Operation name
       String name;
     
       // Optional detailed description about the operation
       String description;
     
       // Set of input fields participate in the operation 
       Set<Input> inputs;
     
       // Set of output fields generated as a part of this operation
       // Note that this can be null for example in case of "Drop Field" operation.
       // However if the field is dropped and its not present in the destination 
       // dataset it cannot be reached in the lineage graph.
       @Nullable
       Set<Output> outputs;
     
       // Builder for the FieldOperation
       public static Builder {
          String name;
          String description;
          Set<Input> inputs;
          Set<Output> outputs;
     
          private Builder(String name) {
            this.name = name;
            inputs = new HashSet<>();
            outputs = new HashSet<>();
          }
     
          public Builder setDescription(String description) {
             this.description = description;
             return this;
          }
     
          public Builder addInput(Input input) {
             inputs.add(input);
             return this;
          }
    
    
          public Builder addInputs(Collection<Input> inputs) {
             this.inputs.addAll(inputs);
             return this;
          }
     
          public Builder addOutput(Output output) {
             outputs.add(output);
             return this;
          }   
    
    
          public Builder addOutputs(Collection<Output> outputs) {
             this.outputs.addAll(outputs);
             return this;
          }   
       }       
    }
  4. List of field operations can be supplied to the platform through LineageRecorder interface. Program runtime context (such as MapReduceContext) can implement this interface.

    Code Block
    languagejava
    /**
     * This interface provides methods that will allow programs to record the field level operations.
     */
    public interface LineageRecorder {
        /**
         * Record the field level operations against the given destination.
         *
         * @param destination the destination for which to record field operations
         * @param fieldOperations The list of field operations.
        */
        void record(Destination destination, List<FieldOperation> fieldOperations);
    }
    
    
    // Destination represents the dataset of which the fields will be part of. 
    public class Destination {
       // Namespace associated with the Dataset. 
       // This is required since programs can read the data from different namespace.
       String namespace;
     
       // Name of the Dataset
       String name;
     
       // Description associated with the Dataset.
       String description;
     
       // Properties associated with the Dataset. 
       // This can potentially store plugin properties of the Sink for context.
       // For example in case of KafkaProducer sink, properties can include broker id, list of topics etc.
       Map<String, String> properties;
    }
  5. Example usage: Consider a simple MapReduce program which does concatenation of the two fields from the source. The Field level operations emitted would look like below -

    Code Block
    languagejava
    public class NoramlizerMapReduce extends AbstractMapReduce {
    public static final Schema OUTPUT_SCHEMA = Schema.recordOf("user", Schema.Field.of("UID", Schema.of(Schema.Type.LONG)),
                                                                Schema.Field.of("Name", Schema.of(Schema.Type.STRING)),
                                                                Schema.Field.of("DOB", Schema.of(Schema.Type.STRING)),
                                                                Schema.Field.of("Zip", Schema.of(Schema.Type.INT)));
    ...
       public void initialize() throws Exception {
          MapReduceContext context = getContext();
          context.addInput(Input.ofDataset("Users"));
          context.addOutput(Output.ofDataset("NormalizedUserProfiles"));
          DatasetProperties inputProperties = context.getAdmin().getDatasetProperties("Users");
          Schema inputSchema = inputProperties.getProperties().get(DatasetProperties.SCHEMA);
          DatasetProperties outputProperties = context.getAdmin().getDatasetProperties("NormalizedUserProfiles");
          Schema outputSchema = outputProperties.getProperties().get(DatasetProperties.SCHEMA);
    
          ...
          List<FieldOperation> operations = new ArrayList<>();
          
          Source source = Source.of("Users");
          FieldOperation.Builder builder = new FieldOperation.Builder("Concat");
          builder
             .setDescription("Concatenating the FirstName and LastName fields to create Name field.")
             .addInput(Input.ofField(source, inputSchema.getField("FirstName")))
             .addInput(Input.ofField(source, inputSchema.getField("LastName")))
             .addOutput(Output.ofField(outputSchema.getField("Name")))
          operations.add(builder.build());
    
    
          builder = new FieldOperation.Builder("Drop");
          builder
             .setDescription("deleting the FirstName field")
             .addInput(Input.ofField(source, inputSchema.getField("FirstName")))
          operations.add(builder.build());
     
          builder = new FieldOperation.Builder("Drop");
          builder
             .setDescription("deleting the LastName field")
             .addInput(Input.ofField(source, inputSchema.getField("LastName")))
          operations.add(builder.build());
    
          context.record(Destination.of("NormalizedUserProfiles"), operations);
          ...   
       }
    ...
    }
  6. For some input datasets (such as Filesets?), DatasetProperties.SCHEMA may not be available since not all datasets have schema associated with it. So the program will have to explicitly create Fields based on the logic of the program. Consider a WordCount MapReduce program which reads lines from the files and create ouput dataset with words and correpsonding counts. We need to create 3 different fields in the program to represent this as shown below -

    Code Block
    languagejava
    public class WordCount extends AbstractMapReduce {
    
      @Override
      public void initialize() throws Exception {
        MapReduceContext context = getContext();
        Job job = context.getHadoopJob();
        job.setMapperClass(Tokenizer.class);
        job.setReducerClass(Counter.class);
        job.setNumReduceTasks(1);
    
        String inputDataset = context.getRuntimeArguments().get("input");
        String outputDataset = context.getRuntimeArguments().get("output");
        context.addInput(Input.ofDataset(inputDataset));
        context.addOutput(Output.ofDataset(outputDataset));
    
    
        // Create dummy FieldsField for the lineage
        Schema.Field record = Schema.Field.of("record", Schema.of(Schema.Type.String));));
     
        // Fields from the output dataset
        Schema.Field word = Schema.Field.of("word", Schema.of(Schema.Type.String));
        Schema.Field count = Schema.Field.of("count", Schema.of(Schema.Type.Long));
        
    
        List<FieldOperation> operations = new ArrayList<>();
          
        FieldOperation.Builder builder = new FieldOperation.Builder("Read"Long));
        builder
        Source source .setDescription("Reading the input files")
          .addInput(Input.ofDataset= Source.of(inputDataset));  
    
       .addOutput(record) List<FieldOperation> operations =  operations.add(builder.build()new ArrayList<>();
         builder = new FieldOperation.Builder("Create");
        builder
          .setDescription("Creating Word and Count fields")
          .addInput(source, Input.ofField(record))
          .addOutput(Input.ofField(word))
          .addOutput(Input.ofField(count)) 
        operations.add(builder.build());
        context.record(Destination.of(outputDataset), operations);
        ...
      }
    }