...

  1. Input to a field operation can be a Source (in the case of a Source plugin) or another field (in the case of a Transform plugin).

    Code Block
    languagejava
    // Represents input to the Field operations
    public abstract class Input {
       // Name which uniquely identifies the input
       String name;
       // Description associated with the input
       String description;
     
       // Create input from the source dataset. Note that this will always be a
       // dataset, since even for external sources such as Kafka or File we create
       // a dataset for lineage purposes, identified by the Reference Name.
       public static Input ofDataset(String name) {
        ...
       }
     
       public static Input ofDataset(String namespace, String name) {
        ...
       }
    
    
       // Create input from a Field. Since a Schema can be nested, a plain String cannot be
       // used to uniquely identify a Field in the Schema, as multiple Fields can have the
       // same name but different nesting. In order to uniquely identify a Field from the
       // Schema, we will need an enhancement in the CDAP platform so that each Field can
       // hold a transient reference to its parent Field. From these references we can then
       // create a unique field path.
       public static Input ofField(Schema.Field field) {
        ...
       }
    }
     
    // Concrete implementation of the Input which represents the Source
    public class DatasetInput extends Input {
       // Namespace associated with the Dataset. 
       // This is required since programs can read data from a different namespace.
       String namespace;
     
       // Properties associated with the Input.
       // This can potentially store plugin properties for context.
       // For example, in the case of a KafkaConsumer source, properties can include the broker id, the list of topics, etc.
       Map<String, String> properties;
    }
     
    // Concrete implementation of the Input which represents the Field.
    public class FieldInput extends Input {
       Schema.Field field;
    }
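
    The comment on ofField above calls for unique field paths for nested Schemas. Below is a minimal sketch of how such a path could be derived once the proposed platform enhancement gives each Field a transient reference to its parent; Field.getParent() and fieldPath() are hypothetical and not part of the current CDAP API.

    Code Block
    languagejava
    // Hypothetical helper: walks the proposed parent references to build a unique,
    // dot-separated path for a nested field, e.g. "address.zip".
    // Requires java.util.ArrayDeque and java.util.Deque.
    public static String fieldPath(Schema.Field field) {
       Deque<String> parts = new ArrayDeque<>();
       for (Schema.Field current = field; current != null; current = current.getParent()) {
          parts.addFirst(current.getName());
       }
       return String.join(".", parts);
    }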
  2. Output of a field operation can only be a field. (This is still to be confirmed; otherwise we would need a hierarchy similar to the one on the Input side.)

    Code Block
    languagejava
    // Represents the Output of a field operation
    public class Output {
       Schema.Field field;   
    }
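
    The example in item 5 below constructs outputs through Output.ofField, which is not shown above. A minimal sketch of that factory, mirroring Input.ofField, could be:

    Code Block
    languagejava
    // Hypothetical factory mirroring Input.ofField; assumed by the example in item 5
    public static Output ofField(Schema.Field field) {
      ...
    }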
  3. A field operation consists of one or more Inputs and zero or more Outputs, along with a name and an optional description.

    Code Block
    languagejava
    public class FieldOperation {
       // Operation name
       String name;
     
       // Optional detailed description about the operation
       String description;
     
       // Set of inputs participating in the operation
       Set<Input> inputs;
     
       // Set of output fields generated as a part of this operation
       // Note that this can be null, for example in the case of a "Drop Field" operation.
       // However, if a field is dropped and is not present in the destination
       // dataset, it cannot be reached in the lineage graph.
       @Nullable
       Set<Output> outputs;
     
       // Builder for the FieldOperation
       public static class Builder {
          String name;
          String description;
          Set<Input> inputs;
          Set<Output> outputs;
     
          public Builder(String name) {
            this.name = name;
            inputs = new HashSet<>();
            outputs = new HashSet<>();
          }
     
          public Builder setDescription(String description) {
             this.description = description;
             return this;
          }
     
          public Builder addInput(Input input) {
             inputs.add(input);
             return this;
          }
    
          public Builder addInputs(Collection<Input> inputs) {
             this.inputs.addAll(inputs);
             return this;
          }
     
          public Builder addOutput(Output output) {
             outputs.add(output);
             return this;
          }   
    
          public Builder addOutputs(Collection<Output> outputs) {
             this.outputs.addAll(outputs);
             return this;
          }

          // Builds the FieldOperation instance; used in the example in item 5
          public FieldOperation build() {
           ...
          }
       }
    }
  4. A list of field operations can be supplied to the platform through the LineageRecorder interface. Program runtime contexts (such as MapReduceContext) can implement this interface.

    Code Block
    languagejava
    /**
     * This interface provides methods that will allow programs to record the field level operations.
     */
    public interface LineageRecorder {
        /**
         * Record the field level operations against the given destination.
         *
         * @param destination the destination for which to record field operations
         * @param fieldOperations the list of field operations
         */
        void record(Destination destination, List<FieldOperation> fieldOperations);
    }
    
    // Destination represents the dataset that the recorded fields will be part of.
    public class Destination {
       // Namespace associated with the Dataset. 
       // This is required since programs can write data to a different namespace.
       String namespace;
     
       // Name of the Dataset
       String name;
     
       // Description associated with the Dataset.
       String description;
     
       // Properties associated with the Dataset.
       // This can potentially store plugin properties of the Sink for context.
       // For example, in the case of a KafkaProducer sink, properties can include the broker id, the list of topics, etc.
       Map<String, String> properties;
    }
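
    The example in item 5 below calls Destination.of, which is not defined above. A minimal sketch of such a convenience factory, assuming the current ("default") namespace is used when none is given, could be:

    Code Block
    languagejava
    // Hypothetical convenience factories assumed by the example in item 5
    public static Destination of(String name) {
       // Assumption: fall back to the "default" namespace when none is supplied
       return of("default", name);
    }

    public static Destination of(String namespace, String name) {
      ...
    }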
  5. Example usage: Consider a simple MapReduce program that concatenates two fields from the source. The field level operations emitted would look like the following:

    Code Block
    languagejava
    public class NormalizerMapReduce extends AbstractMapReduce {
       public static final Schema OUTPUT_SCHEMA = Schema.recordOf("user", Schema.Field.of("UID", Schema.of(Schema.Type.LONG)),
                                                                  Schema.Field.of("Name", Schema.of(Schema.Type.STRING)),
                                                                  Schema.Field.of("DOB", Schema.of(Schema.Type.STRING)),
                                                                  Schema.Field.of("Zip", Schema.of(Schema.Type.INT)));
    ...
       public void initialize() throws Exception {
          MapReduceContext context = getContext();
          context.addInput(Input.ofDataset("Users"));
          context.addOutput(Output.ofDataset("NormalizedUserProfiles"));
          DatasetProperties inputProperties = context.getAdmin().getDatasetProperties("Users");
          // The schema is stored as a JSON string in the dataset properties; parse it back into a Schema
          Schema inputSchema = Schema.parseJson(inputProperties.getProperties().get(DatasetProperties.SCHEMA));
          DatasetProperties outputProperties = context.getAdmin().getDatasetProperties("NormalizedUserProfiles");
          Schema outputSchema = Schema.parseJson(outputProperties.getProperties().get(DatasetProperties.SCHEMA));
    
          ...
          List<FieldOperation> operations = new ArrayList<>();
          
          FieldOperation.Builder builder = new FieldOperation.Builder("Concat");
          builder
             .setDescription("Concatenating the FirstName and LastName fields to create Name field.")
             .addInput(Input.ofField(inputSchema.getField("FirstName")))
             .addInput(Input.ofField(inputSchema.getField("LastName")))
             .addOutput(Output.ofField(outputSchema.getField("Name")));
          operations.add(builder.build());
    
          builder = new FieldOperation.Builder("Drop");
          builder
             .setDescription("deleting the FirstName field")
             .addInput(Input.ofField(inputSchema.getField("FirstName")))
          operations.add(builder.build());
     
          builder = new FieldOperation.Builder("Drop");
          builder
             .setDescription("deleting the LastName field")
             .addInput(Input.ofField(inputSchema.getField("LastName")))
          operations.add(builder.build());
    
          context.record(Destination.of("NormalizedUserProfiles"), operations);
          ...   
       }
    ...
    }

...