Checklist
- User stories documented
- User stories reviewed
- Design reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Overview
This addition will allow users to see the history of directives applied to a column of data.
Goals
- Users should be able to see lineage information (i.e. directives) for columns.
- Storing lineage information should have minimal to no impact on the Wrangler application.
User Stories
- As a user, I should be able to see the directives applied to a column of data.
- As a user, I should be able to see the directives applied to a column of data over any period of time.
- As a user, I should be able to see how any column got to its current state, as well as other columns that were impacted by it.
- As a user, I should be able to add tags and properties to specific columns of data (stretch).
Design
- Save directives for each column in AST format after parsing, along with the necessary metadata (time, dataset/stream name/ID, etc.).
- Use TMS to send the information to the platform.
- Unmarshal and store in HBase.
- Access to lineage should only be available through the platform.
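To make the TMS step concrete, the payload could carry a column's parsed directives together with the metadata listed above. This is only a sketch: the LineageMessage record, its field names, and the delimiter-based encode() are invented for illustration and are not part of the design.

```java
import java.util.List;

// Hypothetical TMS payload shape: one column's directive history plus the
// metadata the design lists (time, dataset/stream name). All names here are
// invented for illustration.
record LineageMessage(long timestamp, String dataset, String column,
                      List<String> directives) {

  /** A crude single-line rendering, standing in for real serialization. */
  public String encode() {
    return timestamp + "|" + dataset + "|" + column + "|" + String.join(";", directives);
  }
}
```

A real implementation would serialize to JSON (the design elsewhere uses Gson) rather than a delimited string.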
Questions
- How to get source and sink datasets?
- How to ensure this works with multiple transform nodes, even just wrangler nodes?
- How to send and consume data from wrangler to CDAP?
- Does ParseTree have all necessary information for every directive?
Approach
Computing Lineage Approach:
Compute lineage by backtracking through the directives, without looking at the data.
Advantages:
- No instance variables added to step classes
- Faster
Disadvantages:
- Requires a stricter rule on directives, i.e. every rename must give both the old and the new name. See * below for why.
*Backtracking example: start with columns A, B, C. The previous directive is "set-columns A B C". The directive before that is "lowercase <column>", where <column> is nameOfOwner. There is no way of knowing which column nameOfOwner refers to without looking at the data.
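The backtracking idea can be sketched as follows. This is a hypothetical simplification (the Directive record and lineageOf() are invented; real directives are parsed Step classes): it walks the directive list in reverse, and only because a rename carries both the old and the new name can the walk switch column names without inspecting the data.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of backtracking lineage without looking at data.
class BacktrackSketch {

  // Simplified directive record: a name plus the columns it mentions.
  // For "rename", cols[0] is the old name and cols[1] is the new name.
  record Directive(String name, String... cols) {
    boolean touches(String column) {
      for (String c : cols) {
        if (c.equals(column)) {
          return true;
        }
      }
      return false;
    }
  }

  /** Returns the directives that affected {@code column}, newest first. */
  static List<String> lineageOf(String column, List<Directive> applied) {
    Deque<String> result = new ArrayDeque<>();
    String current = column;
    // Walk from the last applied directive back to the first.
    for (int i = applied.size() - 1; i >= 0; i--) {
      Directive d = applied.get(i);
      if (!d.touches(current)) {
        continue;
      }
      result.addLast(d.name() + " " + String.join(" ", d.cols()));
      // A rename with explicit old/new names lets the walk continue
      // under the column's earlier name.
      if (d.name().equals("rename") && d.cols()[1].equals(current)) {
        current = d.cols()[0];
      }
    }
    return List.copyOf(result);
  }
}
```

If the rename directive carried only one name (as in the nameOfOwner example above), the `current = d.cols()[0]` step would be impossible, which is exactly the stricter rule the disadvantage refers to.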
API changes
New Programmatic APIs:
TransformStep:
TransformStep is a Java interface that represents a modification done to a dataset by a transform stage.
Code Block
import com.sun.istack.internal.Nullable;
/**
 * <p>A TransformStep represents a modification done by a Transform.</p>
 */
public interface TransformStep {
  /**
   * @return the name of this modification
   */
  String getName();
  /**
   * @return additional information about this modification
   */
  @Nullable
  String getInformation();
}
FieldLevelLineage:
FieldLevelLineage is a Java interface that contains all the necessary field-level lineage information to be sent to the CDAP platform, for transform stages.
Code Block
import java.util.List;
import java.util.Map;
/**
* <p>FieldLevelLineage is a DataType for computing lineage for each field in a dataset.
 * An instance of this type can be sent to the platform through the API, for transform stages.</p>
*/
public interface FieldLevelLineage {
/**
* <p>A BranchingTransformStepNode represents a linking between a {@link TransformStep} and a field of data.
* It contains data about how this TransformStep affected this field.</p>
*/
interface BranchingTransformStepNode {
/**
* @return the index of the TransformStep in {@link #getSteps()}
*/
int getTransformStepNumber();
/**
* @return true if this TransformStep does not add this field
*/
boolean continueBackward(); // continue down
/**
* @return true if this TransformStep does not drop this field
*/
boolean continueForward(); // continue up
/**
* This map should contain every other field that was impacted by this field in this TransformStep
* @return A map from field name to the index of the next TransformStep using {@link #getLineage()}
* Usage: getLineage().get(field).get(index).
*/
Map<String, Integer> getImpactedBranches(); // getUpBranches
/**
* This map should contain every other field that impacted this field with this TransformStep
* @return A map from field name to the index of the previous transformation step in {@link #getLineage()}
* Usage: getLineage().get(field).get(index).
*/
Map<String, Integer> getImpactingBranches(); // getDownBranches
}
/**
* @return a list of all TransformSteps executed by this Transform in order.
*/
List<TransformStep> getSteps();
/**
* @return a mapping of field names to a list of {@link BranchingTransformStepNode} in reverse order.
*/
Map<String, List<BranchingTransformStepNode>> getLineage();
}
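A consumer of this interface would walk a field's node list (which getLineage() returns in reverse order) and stop once a step reports that it added the field, i.e. continueBackward() is false. The sketch below uses invented stand-in types rather than the real CDAP interfaces:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical walk of the FieldLevelLineage shape described
// above: per field, a reverse-ordered list of step nodes, each flagging
// whether the walk can continue past it. These types are stand-ins, not
// the real CDAP interfaces.
class LineageWalk {

  // Stand-in for BranchingTransformStepNode: a step index plus the
  // "this step did not add the field, keep going back" flag.
  record StepNode(int stepNumber, boolean continueBackward) {}

  /**
   * Collects the names of the steps that touched {@code field}, newest
   * first, stopping after the step that added the field.
   */
  static List<String> history(String field, List<String> steps,
                              Map<String, List<StepNode>> lineage) {
    List<String> result = new ArrayList<>();
    for (StepNode node : lineage.getOrDefault(field, List.of())) {
      result.add(steps.get(node.stepNumber()));
      if (!node.continueBackward()) {
        break; // this step added the field; nothing earlier applies
      }
    }
    return result;
  }
}
```

The same pattern, with continueForward() and the impacted/impacting branch maps, would support forward traversal and cross-column jumps.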
WranglerFieldLevelLineage:
WranglerFieldLevelLineage is the Wrangler implementation of the FieldLevelLineage interface. An instance should be initialized per wrangler node, passing in the list of final columns (the output schema) and the name of the wrangler node.
store() takes a ParseTree and stores all the necessary information into lineage.
Lineage for each column is kept in the lineage instance variable, a map from column name to AST.
Code Block
class WranglerFieldLevelLineage {
  private enum Type { ADD, DROP, MODIFY, READ, RENAME }

  class BranchingStepNode {
    boolean continueUp, continueDown;
    String directive;
    Map<String, Integer> upBranches, downBranches;
    // constructors, toString(), ...
  }

  private final String name;
  private final long startTime;
  private Set<String> startColumns;
  private final Set<String> endColumns;
  private final transient Set<String> currentColumns;
  private final Map<String, List<BranchingStepNode>> lineage;

  public WranglerFieldLevelLineage(String name, String[] columnNames) {...}

  // getters
  // helpers for store

  public void store(ParseTree tree) {
    /*
     * Go through the tree one directive at a time.
     * For each column associated with the directive, get its label.
     * Store the directive for this column into lineage based on that label.
     */
  }

  // toString()
}
Parse Tree should contain all columns affected per directive.
Labels:
- All columns should be labeled with one of: {Read, Drop, Modify, Add, Rename}
- Read: the column's name or values are read, including cases that read values and then modify them, e.g. "filter-rows-on..."
- Drop: the column is dropped
- Add: the column is added
- Rename: the column's name is replaced with another name
- Modify: the column's values are altered in a way that does not fit any of the other categories, e.g. "lowercase"
For Read, Drop, Modify, and Add, the column and associated label should look like -> Column: Name, Label: add.
For Rename, the column and associated label should look like -> Column: body_5 DOB, Label: rename. // Some way of carrying both names is needed; currently a space separates the old and new name. Swap example: "A B, Label: rename" and "B A, Label: rename". Both the old and new names must somehow be part of the information being sent.
For Read, Modify, and Add there is another option: instead of a column name, the ParseTree can return one of {"all columns", "all columns minus _ _ _ _", "all columns formatted %s_%d"} along with the label, e.g. Column: "all columns minus body", Label: add. "all columns" refers to all columns present in the dataset after execution of this step. The format string only accepts %s and %d.
For Rename and Drop this option is not available; the names of all columns involved must be returned explicitly.
**Assumption (can be changed): in the ParseTree, all columns should be listed in order of impact, i.e. if the directive is "copy A A_copy", "A, Label: read" should come before "A_copy, Label: add".
Algorithm visual: Example wrangler application --> lineage --> ASTs for each column
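Since a Rename label currently packs both names into one space-separated string (e.g. "body_5 DOB"), decoding it is a simple split. A hypothetical helper (names invented) showing the decoding, and implicitly the caveat: column names containing spaces would break this encoding.

```java
// Hypothetical helper showing how the space-separated rename encoding
// ("<old> <new>", e.g. "body_5 DOB") could be decoded. A column name that
// itself contains a space would break this, which is why the document
// treats the encoding as tentative.
class RenameLabel {
  static String oldName(String encoded) {
    return encoded.substring(0, encoded.indexOf(' '));
  }

  static String newName(String encoded) {
    return encoded.substring(encoded.indexOf(' ') + 1);
  }
}
```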
FieldLevelLineageStorageNode:
FieldLevelLineageStorageNode is a Java interface that represents an element being stored in HBase.
These nodes are created and connected in an AST format by FieldLevelLineageStorageGraph.
Code Block
/**
* <p>A FieldLevelLineageStorageNode represents a node of field-level lineage information.</p>
*/
public interface FieldLevelLineageStorageNode {
/**
* @return the ID of the pipeline
*/
ProgramRunId getPipeline();
/**
* @return the name of the stage
*/
String getStage();
/**
* @return the name of the field
*/
String getField();
}
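Because the storage graph in the next section deduplicates nodes through maps and tables (e.g. its nodeRetriever map), any concrete node type needs value-based equals() and hashCode() over the three identifiers. A hypothetical sketch, with String standing in for ProgramRunId:

```java
import java.util.Objects;

// Hypothetical concrete storage node. String stands in for ProgramRunId;
// the point is value-based equality so equal nodes collapse in maps/tables.
final class StageFieldNode {
  private final String pipeline;
  private final String stage;
  private final String field;

  StageFieldNode(String pipeline, String stage, String field) {
    this.pipeline = pipeline;
    this.stage = stage;
    this.field = field;
  }

  String getPipeline() { return pipeline; }
  String getStage() { return stage; }
  String getField() { return field; }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof StageFieldNode)) return false;
    StageFieldNode that = (StageFieldNode) o;
    return pipeline.equals(that.pipeline) && stage.equals(that.stage) && field.equals(that.field);
  }

  @Override
  public int hashCode() {
    return Objects.hash(pipeline, stage, field);
  }
}
```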
FieldLevelLineageStorageGraph:
FieldLevelLineageStorageGraph is a Java class that transforms many instances of FieldLevelLineage into a graph of FieldLevelLineageStorageNodes for storage.
One instance of this type should be created per pipeline. It is created from many instances of FieldLevelLineage, a graph representation of the pipeline, a mapping between reference names and stage names for sources/sinks, and the ProgramRunId.
Code Block
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Table;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class FieldLevelLineageStorageGraph {
  private final ProgramRunId pipelineId;
  private final PipelinePhase pipeline; // Or another type of graph of the pipeline
  private final Map<String, FieldLevelLineage> stages;
  private final Map<String, String> stageToDataSet;
  private final Map<FieldLevelLineageStorageNode, FieldLevelLineageStorageNode> nodeRetriever;
  private final Table<String, String, DataSetFieldNode> history; // Storage
  private final Map<FieldStepNode, TransformStep> stepInformation; // Storage
  private final ListMultimap<FieldLevelLineageStorageNode, FieldLevelLineageStorageNode> pastEdges; // Storage
  private final ListMultimap<FieldLevelLineageStorageNode, FieldLevelLineageStorageNode> futureEdges; // Storage

  public FieldLevelLineageStorageGraph(ProgramRunId pipelineId, PipelinePhase pipeline,
                                       Map<String, FieldLevelLineage> stages,
                                       Map<String, String> stageToDataSet) {
    this.pipelineId = pipelineId;
    this.pipeline = pipeline;
    this.stages = stages;
    this.stageToDataSet = stageToDataSet;
    this.nodeRetriever = new HashMap<>();
    this.history = HashBasedTable.create();
    this.stepInformation = new HashMap<>();
    this.pastEdges = ArrayListMultimap.create();
    this.futureEdges = ArrayListMultimap.create();
  }

  // helpers for make()

  private void make() {...} // makes the graph
}
Visual:
New REST APIs
Path | Method | Description | Response |
---|---|---|---|
/v3/namespaces/{namespace-id}/datasets/{dataset-id}/columns/{column-id}/lineage?start=<start-ts>&end=<end-ts>&maxLevels=<max-levels> | GET | Returns list of directives applied to the specified column in the specified dataset | 200: Successful Response TBD, but will contain a Tree representation |
/v3/namespaces/{namespace-id}/streams/{stream-id}/columns/{column-id}/lineage?start=<start-ts>&end=<end-ts>&maxLevels=<max-levels> | GET | Returns list of directives applied to the specified column in the specified stream | 200: Successful Response TBD, but will contain a Tree representation |
CLI Impact or Changes
TBD
UI Impact or Changes
- Option 1: Add an interface to the metadata table when viewing a dataset, to see the lineage of columns, possibly by clicking on a column. When a column is clicked it will look something like:
- Option 2: Show all columns at once directly on the lineage tab after clicking on a dataset, with tabs to switch between field level and dataset level:
Security Impact
Expected to be none; TBD.
Impact on Infrastructure Outages
Storage is in HBase, the key being dataset name + field name + a read/lineage flag. For each pipeline run, a node is added under this key; impact TBD.
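A minimal sketch of how such a row key could be assembled. The ':' delimiter and the class name are assumptions for illustration; a real HBase key would be bytes and would need delimiter-safe encoding of the name components.

```java
// Hypothetical row-key builder for the scheme described above:
// dataset name + field name + a flag separating the "read" tree from the
// "lineage" tree. The ':' delimiter is an assumption for illustration.
class LineageRowKey {
  static String of(String dataset, String field, boolean read) {
    return dataset + ":" + field + ":" + (read ? "read" : "lineage");
  }
}
```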
Test Scenarios
Test ID | Test Description | Expected Results |
---|---|---|
1 | Tests all directives | All Step subclasses should be properly parsed containing all correct columns with correct labels |
2 | Multiple datasets/streams | Lineages are correctly shown between different datasets/streams |
3 | Tests all store() | FieldLevelLineage.store() always correctly stores step |
Releases
Release 4.3.0
Release 4.4.0
Related Work
- Fixing TextDirectives and parsing of directives in general