This document explains the design for storage and retrieval of the Field Level Lineage information.
Example
Access Pattern:
...
In the lineage view, we show high-level information first, as shown below. Note that 'HR File', 'Person File', and 'Employee Data' are the names of the input and output datasets, as indicated by the Reference name in the plugin properties.
The next level of detail contains the clickable fields from the input and output datasets. Note that the 2D boxes represent fields belonging to the datasets. Since the input datasets are of type file, which does not have a schema yet, the plugin can provide any String name for them. In this case we are using "HR Record" and "Person Record" as the names.
Once the user clicks on a particular field, the field level lineage graph can be displayed.
Example: Graph for the field ID, where circles represent fields and edges represent operations, with the operation names in bubbles.
Note that the "body" field is generated from "HR Record" as well as "Person Record". To distinguish the two while storing, one option is to prefix the field name with the stage name. Alternatively, we can associate the name of the stage as a property with the Operation itself.
As additional information for the source and target datasets, we might want to show the associated properties, such as file path, regex used, etc.
Store:
Based on the above example, we want the following pieces of information to be stored in the "FieldLevelLineage" dataset:
- Properties associated with the dataset. For example: the file path and directory name associated with "HR File", or the Broker Id, Topic name, etc. associated with the Kafka plugin. There will be a single row per dataset per namespace. If the same dataset is used in multiple pipelines with different configurations, the stored properties will be the union of both.
- Fields associated with the dataset. There will be a single row per dataset per namespace. We will store each field as a separate column in this row. The value of the column can hold additional properties such as creation time, last update time, the run id responsible for the last update, etc.
- Lineage associated with each field of the target dataset. For each field belonging to each target dataset, and for each run of the pipeline writing to that dataset, there will be one row.
Example: With one run of the pipeline shown above, the store will contain the following sample data.
Row Key | Column Key | Value | Note |
---|---|---|---|
MyNamespace:HRFile | Properties | inputDir=/data/2017/hr regex=*.csv failOnError=false | One Row per namespace per dataset |
MyNamespace:PersonFile | Properties | inputDir=/data/2017/person regex=*.csv failOnError=false | One Row per namespace per dataset |
MyNamespace:EmployeeData | Properties | rowid=ID /*should we store schema too? what if that changes per run?*/ | One Row per namespace per dataset |
MyNamespace:EmployeeData:AllFields | ID | /* We may not necessarily required to store any value*/ created_time:12345678 updated_time:12345678 last_updated_by:runid_X | One Row per namespace per dataset |
MyNamespace:EmployeeData:AllFields | Name | ||
MyNamespace:EmployeeData:AllFields | Department | ||
MyNamespace:EmployeeData:AllFields | ContactDetails | ||
MyNamespace:EmployeeData:AllFields | JoiningDate | ||
MyNamespace:EmployeeData:ID:<runidX-inverted-start-time>:runidX | Lineage | Please see the full JSON below. | One row per run if field is part of target |
MyNamespace:EmployeeData:Name:<runidX-inverted-start-time>:runidX | Lineage | Similar JSON | One row per run if field is part of target |
MyNamespace:EmployeeData:ContactDetails:<runidX-inverted-start-time>:runidX | Lineage | Similar JSON | One row per run if field is part of target |
MyNamespace:EmployeeData:JoiningDate:<runidX-inverted-start-time>:runidX | Lineage | Similar JSON | One row per run if field is part of target |
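The lineage row keys above embed an inverted start time so that, within one field, the most recent run sorts first in a lexicographically ordered store. The following is a minimal sketch of how such a key could be built. The helper names are hypothetical, and the inversion against the largest signed 64-bit value and the zero-padding are assumptions, since this document does not define how the inversion is done.

```python
# Hypothetical helpers; only the key layout is taken from the sample table.
LONG_MAX = 2**63 - 1  # assumption: invert against the largest signed 64-bit value


def inverted_start_time(start_time_secs: int) -> int:
    """Later start times map to smaller numbers."""
    return LONG_MAX - start_time_secs


def lineage_row_key(namespace: str, dataset: str, field: str,
                    start_time_secs: int, run_id: str) -> str:
    inverted = inverted_start_time(start_time_secs)
    # Zero-pad to 19 digits so string order matches numeric order.
    return f"{namespace}:{dataset}:{field}:{inverted:019d}:{run_id}"


older = lineage_row_key("MyNamespace", "EmployeeData", "ID", 1442863938, "runidX")
newer = lineage_row_key("MyNamespace", "EmployeeData", "ID", 1442881938, "runidY")

# The newer run sorts before the older one, so a forward scan sees it first.
assert newer < older
```

With this layout, a scan starting at the prefix `MyNamespace:EmployeeData:ID:` would return runs from newest to oldest.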
JSON stored for ID field:
Code Block |
---|
{
"sources": [
{
"name": "PersonFile",
"properties": {
"inputPath": "/data/2017/persons",
"regex": "*.csv"
}
},
{
"name": "HRFile",
"properties": {
"inputPath": "/data/2017/hr",
"regex": "*.csv"
}
}
],
"targets": [
{
"name": "Employee Data"
}
],
"operations": [
{
"inputs": [
{
"name": "PersonRecord",
"properties": {
"source": "PersonFile"
}
}
],
"outputs": [
{
"name": "body"
}
],
"name": "READ",
"description": "Read Person file.",
"properties": {
"stage": "Person File Reader"
}
},
{
"inputs": [
{
"name": "body"
}
],
"outputs": [
{
"name": "SSN"
}
],
"name": "PARSE",
"description": "Parse the body field",
"properties": {
"stage": "Person File Parser"
}
},
{
"inputs": [
{
"name": "HRRecord",
"properties": {
"source": "HRFile"
}
}
],
"outputs": [
{
"name": "body"
}
],
"name": "READ",
"description": "Read HR file.",
"properties": {
"stage": "HR File Reader"
}
},
{
"inputs": [
{
"name": "body"
}
],
"outputs": [
{
"name": "Employee_Name"
},
{
"name": "Dept_Name"
}
],
"name": "PARSE",
"description": "Parse the body field",
"properties": {
"stage": "HR File Parser"
}
},
{
"inputs": [
{
"name": "Employee_Name"
},
{
"name": "Dept_Name"
},
{
"name": "SSN"
}
],
"outputs": [
{
"name": "ID",
"properties": {
"target": "Employee Data"
}
}
],
"name": "GenerateID",
"description": "Generate unique Employee Id",
"properties": {
"stage": "Field Normalizer"
}
}
]
} |
A few things to note:
- When the platform receives the LineageGraph from the app, it processes the graph before storing the data, so that retrieval is straightforward.
- In the above pipeline, the "HR File Parser" stage parses the body and generates the fields "Employee_Name", "Dept_Name", "Salary", and "Start_Date". However, the JSON actually stored for the ID field only contains the operation outputs "Employee_Name" and "Dept_Name", since these are the only fields involved in generating "ID"; "Salary" and "Start_Date" are not.
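The pruning described above can be sketched as a backward walk from the target field over the operations. This is only an illustration under simplifying assumptions: operations are plain dictionaries whose inputs and outputs are lists of field names, fields are matched by name alone (so the two "body" fields are not distinguished by stage), and the function name `prune_operations` is hypothetical. The extra "FORMAT" operation is invented for the example to show an operation being dropped.

```python
def prune_operations(operations, target_field):
    """Keep only the operations (and outputs) that contribute to target_field."""
    needed = {target_field}
    selected = set()
    changed = True
    while changed:  # repeat until no new contributing operation is found
        changed = False
        for index, op in enumerate(operations):
            if index not in selected and needed.intersection(op["outputs"]):
                selected.add(index)
                needed.update(op["inputs"])
                changed = True
    # Drop outputs that nothing on the path to the target depends on.
    return [
        {**operations[i],
         "outputs": [f for f in operations[i]["outputs"] if f in needed]}
        for i in sorted(selected)
    ]


operations = [
    {"name": "READ", "stage": "Person File Reader",
     "inputs": ["PersonRecord"], "outputs": ["body"]},
    {"name": "PARSE", "stage": "Person File Parser",
     "inputs": ["body"], "outputs": ["SSN"]},
    {"name": "READ", "stage": "HR File Reader",
     "inputs": ["HRRecord"], "outputs": ["body"]},
    {"name": "PARSE", "stage": "HR File Parser",
     "inputs": ["body"],
     "outputs": ["Employee_Name", "Dept_Name", "Salary", "Start_Date"]},
    {"name": "GenerateID", "stage": "Field Normalizer",
     "inputs": ["Employee_Name", "Dept_Name", "SSN"], "outputs": ["ID"]},
    # Illustrative operation that does not contribute to "ID".
    {"name": "FORMAT", "stage": "Field Normalizer",
     "inputs": ["Start_Date"], "outputs": ["JoiningDate"]},
]

pruned = prune_operations(operations, "ID")
hr_parse = next(op for op in pruned if op["stage"] == "HR File Parser")
```

Here `pruned` keeps the five operations on the path to "ID", and the "HR File Parser" entry lists only "Employee_Name" and "Dept_Name" as outputs.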
Retrieval:
The following REST APIs are available:
...
- For a given dataset, find the high-level lineage (the field mapping between source and destination datasets, without the detailed operations that caused the conversion) going in the backward direction within a given time range. Note that the response should be multi-level. For example, consider a case where the "Employee" dataset is generated from the "Person", "HR", and "Skills" datasets. The response would contain the field mappings between the source datasets ("Person", "HR", and "Skills") and the "Employee" dataset. However, it is also possible that the source datasets themselves were created or updated in the given time range. So the response should also include the field mappings between the source datasets and the datasets from which they were created.
- For a given dataset, find the high-level lineage (the field mapping between source and destination datasets, without the detailed operations that caused the conversion) going in the forward direction within a given time range. As with the previous query, the response needs to be multi-level.
- Given a dataset and field name, find the detailed lineage (the field mapping between the source and destination datasets, along with the operations that caused the conversion) going in the backward direction. The response will only contain the operations belonging to a single level.
- Given a dataset and field name, find the detailed lineage (the field mapping between the source and destination datasets, along with the operations that caused the conversion) going in the forward direction. The response will only contain the operations belonging to a single level.
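The multi-level backward query in the first access pattern can be sketched as a breadth-first walk over dataset-to-dataset field mappings with a hop limit. This is an illustration only: the mapping shape (plain "source" and "destination" names), the function name `backward_lineage`, and the "RawPerson" dataset are assumptions made for the example, and the time-range filter is left out for brevity.

```python
from collections import deque


def backward_lineage(mappings, dataset, level):
    """Collect field mappings reachable from `dataset` within `level` backward hops."""
    by_destination = {}
    for mapping in mappings:
        by_destination.setdefault(mapping["destination"], []).append(mapping)

    result = []
    seen = {dataset}
    queue = deque([(dataset, 0)])
    while queue:
        current, depth = queue.popleft()
        if depth >= level:
            continue  # hop limit reached, do not expand further
        for mapping in by_destination.get(current, []):
            result.append(mapping)
            source = mapping["source"]
            if source not in seen:  # guard against cycles and repeated expansion
                seen.add(source)
                queue.append((source, depth + 1))
    return result


mappings = [
    {"source": "Person", "destination": "Employee",
     "fieldmap": [{"from": "id", "to": "id"}]},
    {"source": "HR", "destination": "Employee",
     "fieldmap": [{"from": "dept", "to": "department"}]},
    {"source": "Skills", "destination": "Employee",
     "fieldmap": [{"from": "skill", "to": "skills"}]},
    # Hypothetical upstream dataset that produced "Person".
    {"source": "RawPerson", "destination": "Person",
     "fieldmap": [{"from": "pid", "to": "id"}]},
]

one_hop = backward_lineage(mappings, "Employee", level=1)
two_hops = backward_lineage(mappings, "Employee", level=2)
```

With one hop, only the three mappings into "Employee" are returned; with two hops, the mapping from "RawPerson" into "Person" is included as well. The forward query would be the same walk with the roles of source and destination swapped.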
REST API:
Given a dataset and time range, get the high level lineage both in forward and backward direction.
Code Block |
---|
GET /v3/namespaces/<namespace-id>/endpoints/<endpoint-name>/fields/lineage?start=<start-ts>&end=<end-ts>&level=<level>
Where:
namespace-id: namespace name
endpoint-name: name of the endpoint
start-ts: starting timestamp (inclusive) in seconds
end-ts: ending timestamp (exclusive) in seconds for lineage
level: how many hops to make in backward/forward direction
Sample response:
[
  ... list of lineage mappings ...
]
where each lineage mapping will be of the form:
{
  "source": { "namespace": "ns", "name": "Person" },
  "Destination": { "namespace": "ns", "name": "Employee" },
  "fieldmap": [
    { "from": "id", "to": "id" },
    { "from": "first_name", "to": "name" },
    { "from": "last_name", "to": "name" }
  ]
} |
Get the properties associated with the dataset.
Code Block |
---|
GET /v3/namespaces/<namespace-id>/datasets/<dataset-id>/properties?start=<start-ts>&end=<end-ts>
Where:
namespace-id: namespace name
dataset-id: dataset name
start-ts: starting timestamp (inclusive) in seconds
end-ts: ending timestamp (exclusive) in seconds for lineage
Sample Response:
[
  { "programRun": "run1", "properties": { "inputPath": "/data/2017/hr", "regex": "*.csv" } },
  { "programRun": "run2", "properties": { "inputPath": "/data/2017/anotherhrdata", "regex": "*.csv" } }
] |
Given a dataset and field, find out the detailed lineage.
Code Block |
---|
GET /v3/namespaces/<namespace-id>/endpoints/<endpoint-id>/fields/<field-name>/lineage?start=<start-ts>&end=<end-ts>&direction=<backward/forward>
Where:
namespace-id: namespace name
endpoint-id: endpoint name
field-name: name of the field for which lineage information is to be retrieved
start-ts: starting timestamp (inclusive) in seconds
end-ts: ending timestamp (exclusive) in seconds for lineage
direction: backward or forward
Sample response:
{
  "startTimeInSeconds": 1442863938,
  "endTimeInSeconds": 1442881938,
  "paths": [
    .... list of paths which represent the different ways the field is created ....
  ]
}
Each path will look as follows:
{
  ... list of nodes ...
  ... list of operations ...
  ... list of connections ...
}
where each Node is an object representing a field. A Node has an id, which uniquely identifies the Node (a combination of origin and name), and a label, which is used for display in the UI. A Node can have optional sourceEndPoint and destinationEndPoint members, which indicate whether the node is generated directly from a Source EndPoint or written to a Destination EndPoint.
{
  "id": "origin.fieldname",
  "label": "fieldname",
  "sourceEndPoint": { "name": "file", "namespace": "ns" }
}
Each Operation is represented as:
{
  "name": "IDENTITY",
  "description": "description associated with the operation"
}
Each Connection represents a transformation between two nodes, with the name of the operation that caused it:
{
  "from": "Node1.id",
  "to": "Node2.id",
  "operation": "opname"
} |
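Since each Connection links two nodes through the operation that caused the transformation, the list of connections can in principle be derived from the operations themselves. The sketch below shows one way to do that. It is a simplification: the function name is hypothetical, operations carry plain field names as inputs and outputs, and those names are used directly as node ids instead of the origin-qualified ids described above.

```python
def connections_from_operations(operations):
    """Emit one connection per (input field, output field) pair of each operation."""
    connections = []
    for op in operations:
        for source in op["inputs"]:
            for target in op["outputs"]:
                connections.append(
                    {"from": source, "to": target, "operation": op["name"]}
                )
    return connections


generate_id = {
    "name": "GenerateID",
    "inputs": ["Employee_Name", "Dept_Name", "SSN"],
    "outputs": ["ID"],
}

connections = connections_from_operations([generate_id])
```

For the "GenerateID" operation this yields three connections, one from each input field to "ID".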
Store:
Field level lineage information will be stored in the "FieldLevelLineage" dataset.
This dataset will have the following rows:
Data row: This row will store the actual operations data against the checksum of the operations.
Row Key | column: d |
---|---|
c\|<checksum-value> | FieldLineageInfo object |
Backward lineage row: From the perspective of the destination endpoints, the operations represent the backward lineage. A separate row will be created for each destination.
Row Key | column: c | column: p |
---|---|---|
b \| <endpoint_ns> \| <endpoint_name> \| <inverted-start-time> \| <id.run> | <checksum> | <program-run-id> |
Forward lineage row: From the perspective of the source endpoints, the operations represent the forward lineage. A separate row will be created for each source.
Row Key | column: c | column: p |
---|---|---|
f \| <endpoint_ns> \| <endpoint_name> \| <inverted-start-time> \| <id.run> | <checksum> | <program-run-id> |
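Storing the operations once under their checksum means that repeated runs with identical operations add only small backward and forward rows. The following sketch illustrates that layout with an in-memory dictionary standing in for the dataset. The checksum algorithm (SHA-256 over canonical JSON), the function names, and the sample ids are assumptions; this document does not specify them.

```python
import hashlib
import json


def operations_checksum(operations):
    # Assumption: hash a canonical JSON form; the actual algorithm is not specified here.
    canonical = json.dumps(operations, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def build_rows(operations, sources, destinations,
               inverted_start_time, run_id, program_run_id):
    """Return {row_key: {column: value}} for one run, following the layout above."""
    checksum = operations_checksum(operations)
    rows = {f"c|{checksum}": {"d": operations}}  # data row, shared across runs
    for prefix, endpoints in (("b", destinations), ("f", sources)):
        for namespace, name in endpoints:
            key = f"{prefix}|{namespace}|{name}|{inverted_start_time}|{run_id}"
            rows[key] = {"c": checksum, "p": program_run_id}
    return rows


operations = [{"name": "READ", "inputs": ["HRRecord"], "outputs": ["body"]}]
sources = [("ns", "HRFile"), ("ns", "PersonFile")]
destinations = [("ns", "EmployeeData")]

store = {}
store.update(build_rows(operations, sources, destinations, 9000, "run1", "program.run1"))
store.update(build_rows(operations, sources, destinations, 8000, "run2", "program.run2"))

data_rows = [key for key in store if key.startswith("c|")]
```

After two runs with the same operations, `store` holds a single data row plus one backward row and two forward rows per run.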
FieldLineageInfo object will store following information:
- Collection<Operation>: operations representing the field lineage.
- Checksum of operations.
- Set of source endpoints.
- Set of destination endpoints.
- High-level bi-directional mapping of fields from source endpoints to destination endpoints. This serves access patterns 1 and 2 described above.
- For each field of a source endpoint, there would be a graph from that field to the destination fields.
- For each field of a destination endpoint, there would be a graph leading into that field from the different source fields.
Open questions:
- Per the UI, we also want to show the types of fields, which we currently do not accept through the API.
- What constitutes the dataset schema? For example, for a fileset, should we assume that the fields generated by the READ operations are part of the schema?