Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 11 Next »

This document explains the design for storage and retrieval of the Field Level Lineage information.

Example: Consider a sample pipeline which performs join between HR dataset and Person dataset. The joined data is then normalized(to rename some fields, drop some fields, and create ID) before storing data into the data lake. Following diagram explains the sample pipeline which is reading from 2 file sources. Note that the 2D boxes represent the SCHEMA that is flowing through the pipeline while 3D boxes represents the pipeline stages.

 

In the lineage view, we show high level information first as shown below. Note that 'HR File', 'Person File', and 'Employee Data' are name of the input and output datasets, as indicated by the Reference name in the plugin properties.

Next detail level view contains the clickable fields from the input and output datasets. Note that 2D boxes represents fields belonging to the datasets. Since input datasets are of type file which does not have schema yet, plugin can provide any String name for it. In this case we are using "HR Record" and "Person Record" as name.

 

Once user clicks on particular field, field level lineage graph can be displayed.

Example: Graph for field ID, where circle represents the fields and edges represents operations with names in bubbles.

Note that "body" field is generated from "HR Record" as well as "Person Record". To distinguish it while storing we might need to prefix it with the stage name. - To distinguish it we can associate the name of the stage as a property with the Operation itself.

As an additional information for the source and target datasets we might want to show the associated properties such as file path, regex used etc.

Store:

Based on the above example, we want following pieces of information to be stored in the "FieldLevelLineage" dataset

  1. Properties associated with the Dataset. For example: File path, name of the directory associated with the "HR File", Broker Id, Topic name etc associated with the Kafka plugin. This will be single row per dataset per namespace. If the same dataset is used in multiple pipelines, but with different configurations the properties will be union of both. 
  2. Fields associated with the Dataset. This will be single row per dataset per namespace. We will store each field as a separate column in this row. The value of the column can be additional properties such as creation time, last update time, runid responsible for last update etc.
  3. Lineage associated with the each field from the target dataset. For each field belonging to each target dataset, and for each run of the pipeline writing to that dataset, there will be one row.

Example: With one run of the pipeline shown above, following will be the sample data in the store.

Row KeyColumn KeyValueNote
MyNamespace:HRFileProperties

inputDir=/data/2017/hr

regex=*.csv

failOnError=false

One Row per namespace per dataset
MyNamespace: PersonFileProperties

inputDir=/data/2017/person

regex=*.csv

failOnError=false

One Row per namespace per dataset
MyNamespace:EmployeeDataProperties

rowid=ID

/*should we store schema too? what if that changes per run?*/

One Row per namespace per dataset
MyNamespace:EmployeeData:AllFieldsID

/* We may not necessarily required to store any value*/

created_time:12345678

updated_time:12345678

last_updated_by:runid_X

One Row per namespace per dataset
MyNamespace:EmployeeData:AllFieldsName  
MyNamespace:EmployeeData:AllFieldsDepartment  
MyNamespace:EmployeeData:AllFieldsContactDetails  
MyNamespace:EmployeeData:AllFieldsJoiningDate  
MyNamespace:EmployeeData:ID:<runidX-inverted-start-time>:runidXLineage

Please see the full JSON below.

 

One row per run if field is part of target
MyNamespace:EmployeeData:Name:<runidX-inverted-start-time>:runidXLineageSimilar JSONOne row per run if field is part of target
MyNamespace:EmployeeData:ContactDetails:<runidX-inverted-start-time>:runidXLineageSimilar JSONOne row per run if field is part of target
MyNamespace:EmployeeData:JoiningDate:<runidX-inverted-start-time>:runidXLineageSimilar JSONOne row per run if field is part of target

JSON stored for ID field:

{
  "sources": [
    {
      "name": "PersonFile",
      "properties": {
        "inputPath": "/data/2017/persons",
        "regex": "*.csv"
      }
    },
    {
      "name": "HRFile",
      "properties": {
        "inputPath": "/data/2017/hr",
        "regex": "*.csv"
      }
    }
  ],
  "targets": [
    {
      "name": "Employee Data"
    }
  ],
  "operations": [
    {
      "inputs": [
        {
          "name": "PersonRecord",
          "properties": {
            "source": "PersonFile"
          }
        }
      ],
      "outputs": [
        {
          "name": "body"
        }
      ],
      "name": "READ",
      "description": "Read Person file.",
      "properties": {
        "stage": "Person File Reader"
      }
    },
    {
      "inputs": [
        {
          "name": "body"
        }
      ],
      "outputs": [
        {
          "name": "SSN"
        }
      ],
      "name": "PARSE",
      "description": "Parse the body field",
      "properties": {
        "stage": "Person File Parser"
      }
    },
    {
      "inputs": [
        {
          "name": "HRRecord",
          "properties": {
            "source": "HRFile"
          }
        }
      ],
      "outputs": [
        {
          "name": "body"
        }
      ],
      "name": "READ",
      "description": "Read HR file.",
      "properties": {
        "stage": "HR File Reader"
      }
    },
    {
      "inputs": [
        {
          "name": "body"
        }
      ],
      "outputs": [
        {
          "name": "Employee_Name"
        },
        {
          "name": "Dept_Name"
        }
      ],
      "name": "PARSE",
      "description": "Parse the body field",
      "properties": {
        "stage": "HR File Parser"
      }
    },
    {
      "inputs": [
        {
          "name": "Employee_Name"
        },
        {
          "name": "Dept_Name"
        },
        {
          "name": "SSN"
        }
      ],
      "outputs": [
        {
          "name": "ID",
          "properties": {
            "target": "Employee Data"
          }
        }
      ],
      "name": "GenerateID",
      "description": "Generate unique Employee Id",
      "properties": {
        "stage": "Field Normalizer"
      }
    }
  ]
}

Few things to note:

  1. When platform receives the LineageGraph from the app, processing of the graph would be done before storing the data so the retrieval is straightforward.
  2. In the above pipeline, "HR File Parser" stage parses the body and generate fields "Employee_Name", "Dept_Name", "Salary", and "Start_Date". However the actual JSON stored for the ID field only contains operation related to the "Employee_Name" and "Dept_Name" since these are the only fields involved in the "ID" generation and not "Salary" and "Start_Date".

Retrieval:

Following REST APIs are available:

  1. Get the list of fields in the dataset.

    GET /v3/namespaces/<namespace-id>/datasets/<dataset-id>/fields?start=<start-ts>&end=<end-ts>
     
    Where:
    namespace-id: namespace name
    dataset-id: dataset name
    start-ts: starting timestamp(inclusive) in seconds
    end-ts: ending timestamp(exclusive) in seconds for lineage
     
    Sample Response:
    [
      {
        "name": "ID",
        "properties": {
          "creation_time": 12345678,
          "last_update_time": 12345688,
          "last_modified_run": "runid_x"
        }
      },
      {
        "name": "name",
        "properties": {
          "creation_time": 12345678,
          "last_update_time": 12345688,
          "last_modified_run": "runid_x"
        }
      },
      {
        "name": "Department",
        "properties": {
          "creation_time": 12345678,
          "last_update_time": 12345688,
          "last_modified_run": "runid_x"
        }
      },
      {
        "name": "ContactDetails",
        "properties": {
          "creation_time": 12345678,
          "last_update_time": 12345688,
          "last_modified_run": "runid_x"
        }
      },
      {
        "name": "JoiningDate",
        "properties": {
          "creation_time": 12345678,
          "last_update_time": 12345688,
          "last_modified_run": "runid_x"
        }
      }
    ]
  2. Get the properties associated with the dataset.

    GET /v3/namespaces/<namespace-id>/datasets/<dataset-id>/properties?start=<start-ts>&end=<end-ts>
    
    Where:
    namespace-id: namespace name
    dataset-id: dataset name
    start-ts: starting timestamp(inclusive) in seconds
    end-ts: ending timestamp(exclusive) in seconds for lineage
    Sample Response:
    [
       {
          "programRun": "run1",
          "properties": {
            "inputPath": "/data/2017/hr",
            "regex": "*.csv"
          } 
       },
       {
          "programRun": "run2",  
          "properties": {
            "inputPath": "/data/2017/anotherhrdata",
            "regex": "*.csv"
          }
       }
    ]
  3. Get the lineage associated with the particular field in a dataset.

    GET /v3/namespaces/<namespace-id>/datasets/<dataset-id>/fields/<field-name>/lineage?start=<start-ts>&end=<end-ts>
     
    Where:
    namespace-id: namespace name
    dataset-id: dataset name
    field-name: name of the field for which lineage information to be retrieved
    start-ts: starting timestamp(inclusive) in seconds
    end-ts: ending timestamp(exclusive) in seconds for lineage

    Sample response:

    {
      "startTimeInSeconds": 1442863938,
      "endTimeInSeconds": 1442881938,
      "paths": [
       ....
           list of paths which represent the different ways field is created 
       ....
      ] 
    }
     
    Each path will look as follows:
     {
      "sources": [
        {
          "name": "PersonFile",
          "properties": {
            "inputPath": "/data/2017/persons",
            "regex": "*.csv"
          }
        },
        {
          "name": "HRFile",
          "properties": {
            "inputPath": "/data/2017/hr",
            "regex": "*.csv"
          }
        }
      ],
      "targets": [
        {
          "name": "Employee Data"
        }
      ],
      "operations": [
        {
          "inputs": [
            {
              "name": "PersonRecord",
              "properties": {
                "source": "PersonFile"
              }
            }
          ],
          "outputs": [
            {
              "name": "body"
            }
          ],
          "name": "READ",
          "description": "Read Person file.",
          "properties": {
            "stage": "Person File Reader"
          }
        },
        {
          "inputs": [
            {
              "name": "body"
            }
          ],
          "outputs": [
            {
              "name": "SSN"
            }
          ],
          "name": "PARSE",
          "description": "Parse the body field",
          "properties": {
            "stage": "Person File Parser"
          }
        },
        {
          "inputs": [
            {
              "name": "HRRecord",
              "properties": {
                "source": "HRFile"
              }
            }
          ],
          "outputs": [
            {
              "name": "body"
            }
          ],
          "name": "READ",
          "description": "Read HR file.",
          "properties": {
            "stage": "HR File Reader"
          }
        },
        {
          "inputs": [
            {
              "name": "body"
            }
          ],
          "outputs": [
            {
              "name": "Employee_Name"
            },
            {
              "name": "Dept_Name"
            }
          ],
          "name": "PARSE",
          "description": "Parse the body field",
          "properties": {
            "stage": "HR File Parser"
          }
        },
        {
          "inputs": [
            {
              "name": "Employee_Name"
            },
            {
              "name": "Dept_Name"
            },
            {
              "name": "SSN"
            }
          ],
          "outputs": [
            {
              "name": "ID",
              "properties": {
                "target": "Employee Data"
              }
            }
          ],
          "name": "GenerateID",
          "description": "Generate unique Employee Id",
          "properties": {
            "stage": "Field Normalizer"
          }
        }
      ],
      "runs": [
        "runidX",
        "runidY",
        "runidZ"
      ]
    }

 

 

 

 

 

 

  • No labels