Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 17 Current »

This document explains the design for storage and retrieval of the Field Level Lineage information.

Access Pattern:

  1. For a given dataset, find out the high level lineage (field mapping between source and destination datasets and not the detail operations which caused this conversion) going in backward direction within a given time range. Note that the response should be multi-level. For example, consider a case where "Employee" dataset is generated from "Person", "HR", and "Skills" datasets. Response would contain the field mappings between source datasets ("Person", "HR", and "Skills") and "Employee" dataset. However it is also possible that the source datasets are created/updated in the given time range. So response should also include the field mappings between the datasets which created the source datasets and source datasets themselves.  
  2. For a given dataset, find out the high level lineage (field mapping between source and destination datasets and not the detail operations which caused this conversion) going in forward direction within a given time range. Similar to the above query, response need to be multi-level.
  3. Given a dataset and field name, find out detail lineage (field mapping between the source and destination datasets along with the operations which caused this conversion) going in the backward direction. Response will only contain the operations belonging to the single level.
  4. Given a dataset and field name, find out detail lineage (field mapping between the source and destination datasets along with the operations which caused this conversion) going in the forward direction. Response will only contain the operations belonging to the single level.

REST API:

  1. Given a dataset and time range, get the high level lineage both in forward and backward direction.

    GET /v3/namespaces/<namespace-id>/endpoints/<endpoint-name>/fields/lineage?start=<start-ts>&end=<end-ts>&level=<level>
    
    
    Where:
    namespace-id: namespace name
    endpoint-name: name of the endpoint
    start-ts: starting timestamp(inclusive) in seconds
    end-ts: ending timestamp(exclusive) in seconds for lineage
    level: how many hops to make in backward/forward direction
    
    
    Sample response:
    [
      ...
      list of lineage mappings
      ...
    ]
    
    
    where each lineage mapping will be of the form:
    
    
    {
      "source": {
         "namespace": "ns",
         "name": "Person" 
      },
      "Destination": {
         "namespace": "ns",
         "name": "Employee"
      },
      "fieldmap": [
         { "from": "id", "to": "id" },
         { "from": "first_name", "to": "name"},
         { "from": "last_name", "to": "name"}
      ] 
    }
  2. Given a dataset and field, find out the detailed lineage.

    GET /v3/namespaces/<namespace-id>/endpoints/<endpoint-id>/fields/<field-name>/lineage?start=<start-ts>&end=<end-ts>&direction=<backward/forward>
     
    Where:
    namespace-id: namespace name
    endpoint-id: endpoint name
    field-name: name of the field for which lineage information to be retrieved
    start-ts: starting timestamp(inclusive) in seconds
    end-ts: ending timestamp(exclusive) in seconds for lineage
    direction: backward or forward
    
    
    Sample response:
    {
      [
       ...
          list of nodes
       ...
      ],
      [
       ...
          list of operations
       ...
      ],
      [
       ...
          list of connections
       ...
      ]    
    }
    
    where each Node is an object representing field. Node has id which is uniquely identifies the Node (combination of origin and name) and label which is used to display on the UI. Node can have optional sourceEndPoint and destinationEndPoint members which represents if this node is generated directly from Source EndPoint or written to the Destination EndPoint.
     
    {
      "id": "origin.fieldname",
      "label": "fieldname"
      "sourceEndPoint": {
         "name": "file",
         "namespace": "ns" 
      }  
    }
    
    
    each Operation is represented as 
    {
      "name": "IDENTITY",
      "description": "description associated with the operation". 
    }
    each Connection represents transformation between two nodes with operation name that caused it:
    {
      "from": "Node1.id",
      "to": "Node2.id",
      "operation": "opname"
    }
    
    

Store:

Based on the above example, we want following pieces of information to be stored in the "FieldLevelLineage" dataset


  1. Properties associated with the Dataset. For example: File path, name of the directory associated with the "HR File", Broker Id, Topic name etc associated with the Kafka plugin. This will be single row per dataset per namespace per run of the pipeline.
  2. Fields associated with the Dataset. This will be single row per dataset per namespace per run of the pipeline. We will store each field as a separate column in this row.
  3. Lineage information associated with the target dataset. For each target dataset per run there will be single row which will contain the entire lineage graph.

Example: With one run of the pipeline shown above, following will be the sample data in the store.

Row KeyColumn KeyValueNote
MyNamespace:HRFile:<runidX-inverted-start-time>:runidXProperties

inputDir=/data/2017/hr

regex=*.csv

failOnError=false

One Row per namespace per dataset per run
MyNamespace: PersonFile:<runidX-inverted-start-time>:runidXProperties

inputDir=/data/2017/person

regex=*.csv

failOnError=false

One Row per namespace per dataset per run
MyNamespace:EmployeeData:<runidX-inverted-start-time>:runidXProperties

rowid=ID

/*should we store schema too? what if that changes per run?*/

One Row per namespace per dataset per run
MyNamespace:EmployeeData:AllFields:<runidX-inverted-start-time>:runidXID

/* We may not necessarily required to store any value*/

created_time:12345678

updated_time:12345678

last_updated_by:runid_X

One Row per namespace per dataset per run
MyNamespace:EmployeeData:AllFields:<runidX-inverted-start-time>:runidXName

MyNamespace:EmployeeData:AllFields:<runidX-inverted-start-time>:runidXDepartment

MyNamespace:EmployeeData:AllFields:<runidX-inverted-start-time>:runidXContactDetails

MyNamespace:EmployeeData:AllFields:<runidX-inverted-start-time>:runidXJoiningDate

MyNamespace:EmployeeData:<runidX-inverted-start-time>:runidXLineage

JSON representation of the LineageGraph provided by app to the platform.


One row per run per target dataset

JSON stored for ID field:

{
  "sources": [
    {
      "name": "PersonFile",
      "properties": {
        "inputPath": "/data/2017/persons",
        "regex": "*.csv"
      }
    },
    {
      "name": "HRFile",
      "properties": {
        "inputPath": "/data/2017/hr",
        "regex": "*.csv"
      }
    }
  ],
  "targets": [
    {
      "name": "Employee Data"
    }
  ],
  "operations": [
    {
      "inputs": [
        {
          "name": "PersonRecord",
          "properties": {
            "source": "PersonFile"
          }
        }
      ],
      "outputs": [
        {
          "name": "body"
        }
      ],
      "name": "READ",
      "description": "Read Person file.",
      "properties": {
        "stage": "Person File Reader"
      }
    },
    {
      "inputs": [
        {
          "name": "body"
        }
      ],
      "outputs": [
        {
          "name": "SSN"
        }
      ],
      "name": "PARSE",
      "description": "Parse the body field",
      "properties": {
        "stage": "Person File Parser"
      }
    },
    {
      "inputs": [
        {
          "name": "HRRecord",
          "properties": {
            "source": "HRFile"
          }
        }
      ],
      "outputs": [
        {
          "name": "body"
        }
      ],
      "name": "READ",
      "description": "Read HR file.",
      "properties": {
        "stage": "HR File Reader"
      }
    },
    {
      "inputs": [
        {
          "name": "body"
        }
      ],
      "outputs": [
        {
          "name": "Employee_Name"
        },
        {
          "name": "Dept_Name"
        }
      ],
      "name": "PARSE",
      "description": "Parse the body field",
      "properties": {
        "stage": "HR File Parser"
      }
    },
    {
      "inputs": [
        {
          "name": "Employee_Name"
        },
        {
          "name": "Dept_Name"
        },
        {
          "name": "SSN"
        }
      ],
      "outputs": [
        {
          "name": "ID",
          "properties": {
            "target": "Employee Data"
          }
        }
      ],
      "name": "GenerateID",
      "description": "Generate unique Employee Id",
      "properties": {
        "stage": "Field Normalizer"
      }
    }
  ]
}

Few things to note:

  1. When platform receives the LineageGraph from the app, processing of the graph would be done before storing the data so the retrieval is straightforward.
  2. In the above pipeline, "HR File Parser" stage parses the body and generate fields "Employee_Name", "Dept_Name", "Salary", and "Start_Date". However the actual JSON stored for the ID field only contains operation related to the "Employee_Name" and "Dept_Name" since these are the only fields involved in the "ID" generation and not "Salary" and "Start_Date".

Retrieval:

Following REST APIs are available:

  1. Get the list of fields in the dataset.

    GET /v3/namespaces/<namespace-id>/datasets/<dataset-id>/fields?start=<start-ts>&end=<end-ts>
     
    Where:
    namespace-id: namespace name
    dataset-id: dataset name
    start-ts: starting timestamp(inclusive) in seconds
    end-ts: ending timestamp(exclusive) in seconds for lineage
     
    Sample Response:
    [
      {
        "name": "ID",
        "properties": {
          "creation_time": 12345678,
          "last_update_time": 12345688,
          "last_modified_run": "runid_x"
        }
      },
      {
        "name": "name",
        "properties": {
          "creation_time": 12345678,
          "last_update_time": 12345688,
          "last_modified_run": "runid_x"
        }
      },
      {
        "name": "Department",
        "properties": {
          "creation_time": 12345678,
          "last_update_time": 12345688,
          "last_modified_run": "runid_x"
        }
      },
      {
        "name": "ContactDetails",
        "properties": {
          "creation_time": 12345678,
          "last_update_time": 12345688,
          "last_modified_run": "runid_x"
        }
      },
      {
        "name": "JoiningDate",
        "properties": {
          "creation_time": 12345678,
          "last_update_time": 12345688,
          "last_modified_run": "runid_x"
        }
      }
    ]
  2. Get the properties associated with the dataset.

    GET /v3/namespaces/<namespace-id>/datasets/<dataset-id>/properties?start=<start-ts>&end=<end-ts>
    
    Where:
    namespace-id: namespace name
    dataset-id: dataset name
    start-ts: starting timestamp(inclusive) in seconds
    end-ts: ending timestamp(exclusive) in seconds for lineage
    Sample Response:
    [
       {
          "programRun": "run1",
          "properties": {
            "inputPath": "/data/2017/hr",
            "regex": "*.csv"
          } 
       },
       {
          "programRun": "run2",  
          "properties": {
            "inputPath": "/data/2017/anotherhrdata",
            "regex": "*.csv"
          }
       }
    ]
  3. Get the lineage associated with the particular field in a dataset.

    GET /v3/namespaces/<namespace-id>/datasets/<dataset-id>/fields/<field-name>/lineage?start=<start-ts>&end=<end-ts>
     
    Where:
    namespace-id: namespace name
    dataset-id: dataset name
    field-name: name of the field for which lineage information to be retrieved
    start-ts: starting timestamp(inclusive) in seconds
    end-ts: ending timestamp(exclusive) in seconds for lineage

    Sample response:

    {
      "startTimeInSeconds": 1442863938,
      "endTimeInSeconds": 1442881938,
      "paths": [
       ....
           list of paths which represent the different ways field is created 
       ....
      ] 
    }
     
    Each path will look as follows:
     {
      "sources": [
        {
          "name": "PersonFile",
          "properties": {
            "inputPath": "/data/2017/persons",
            "regex": "*.csv"
          }
        },
        {
          "name": "HRFile",
          "properties": {
            "inputPath": "/data/2017/hr",
            "regex": "*.csv"
          }
        }
      ],
      "targets": [
        {
          "name": "Employee Data"
        }
      ],
      "operations": [
        {
          "inputs": [
            {
              "name": "PersonRecord",
              "properties": {
                "source": "PersonFile"
              }
            }
          ],
          "outputs": [
            {
              "name": "body"
            }
          ],
          "name": "READ",
          "description": "Read Person file.",
          "properties": {
            "stage": "Person File Reader"
          }
        },
        {
          "inputs": [
            {
              "name": "body"
            }
          ],
          "outputs": [
            {
              "name": "SSN"
            }
          ],
          "name": "PARSE",
          "description": "Parse the body field",
          "properties": {
            "stage": "Person File Parser"
          }
        },
        {
          "inputs": [
            {
              "name": "HRRecord",
              "properties": {
                "source": "HRFile"
              }
            }
          ],
          "outputs": [
            {
              "name": "body"
            }
          ],
          "name": "READ",
          "description": "Read HR file.",
          "properties": {
            "stage": "HR File Reader"
          }
        },
        {
          "inputs": [
            {
              "name": "body"
            }
          ],
          "outputs": [
            {
              "name": "Employee_Name"
            },
            {
              "name": "Dept_Name"
            }
          ],
          "name": "PARSE",
          "description": "Parse the body field",
          "properties": {
            "stage": "HR File Parser"
          }
        },
        {
          "inputs": [
            {
              "name": "Employee_Name"
            },
            {
              "name": "Dept_Name"
            },
            {
              "name": "SSN"
            }
          ],
          "outputs": [
            {
              "name": "ID",
              "properties": {
                "target": "Employee Data"
              }
            }
          ],
          "name": "GenerateID",
          "description": "Generate unique Employee Id",
          "properties": {
            "stage": "Field Normalizer"
          }
        }
      ],
      "runs": [
        "runidX",
        "runidY",
        "runidZ"
      ]
    }







  • No labels