Metadata and Data Discovery

Overview

This is a design document for the Metadata and Data Discovery feature in CDAP. Metadata and Data Discovery will allow CDAP applications and datasets to be annotated with both business and system metadata. This will enable users to:

  1. Track lineage and provenance of datasets
  2. Discover datasets and applications based on Metadata

Requirements

Id | Requirement | Priority | Description/Comments
R1 | CDAP should provide a Java API to annotate metadata | H
  • Java API is only for business metadata
  • Only String data-type will be supported for values
R2 | CDAP should have the ability to distinguish the type of metadata | H
  • Scopes : SYSTEM, USER

  • If added using Java API, then it is Scope.USER

  • If added using REST API, then it is Scope.USER
  • If added via a Program (workflow token, data quality, etc), it is Scope.SYSTEM

  • If added by the CDAP System, it is Scope.SYSTEM

  • Open question: Is the schema part of metadata? What about other dataset properties?

R3 | Metadata can be associated with Datasets (all metadata) as well as Applications (business and generic metadata) | H
  • What System Metadata can be tagged on an Application?

    • creation time
    • created by
    • last update time
    • last updated by
    • ... any more?
  • What System Metadata can be tagged on a Dataset?
    • last updated by (program)
    • last update time
    • ... any more?
  • Is this for lineage tracking purposes only? Or is this general purpose? If it is for general purpose store and retrieval, then we may have to alter storage.
R4 | User should have the ability to annotate Applications or Datasets with business metadata using the REST API / CLI, and programmatically using the API | H
R5 | User should have the ability to retrieve all business metadata using the REST API / CLI | H
  • We should allow retrieval of System/Generic metadata as well.

R6 | User should have the ability to search for Datasets and Applications based on generic metadata | H
  • free text search or exact match

  • Do we need an auto-complete interface?

  • Solr, Elastic Search, Apache Blur, Cloudera Search (index HBase), Lily

R7 | User should have the ability to search for Datasets and Applications based on business metadata | H
R8 | User should have the ability to view metadata in the UI: for Datasets in the Dataset View and for Applications in the Application View | H
R9 | User should have the ability to distinguish the type of metadata being viewed in the UI | H
R10 | User should have the ability to search for an Application or Dataset by its metadata in the UI | H
R11 | The CDAP system should automatically annotate datasets with machine-generated metadata | M

Enumerate:

  • creation time
  • created by
  • last update time
  • last updated by
  • Any more? All updates? Last N updates?

R12 | User should have the ability to search for Datasets based on machine-generated metadata; specifically, we start with field names rather than values | M
R13 | User should have the ability to search for Datasets based on the schema fields associated with the Dataset | M
R14 | User should have the ability to publish business tags to the notification system | H
R15 | User should have the ability to specify a filter on the machine-generated metadata to be associated with a Dataset | L | Enumerate. What kind of filters?

User Stories

In no specific order...

Id | Description | Requirements fulfilled
U1 | As a user, I should be able to tag applications, programs, streams and datasets with business metadata at deploy time | R1
U2 | As a user, I should be able to tag applications, programs, streams and datasets with business metadata after deployment | R4
U3 | As a user, I should be able to view business and system metadata separately | R2, R3, R6
U4 | As a user, I should be able to view the metadata and runtime information with which the CDAP system has automatically tagged applications and datasets | R11
U5 | As a user, I should be able to retrieve all the business and system metadata associated with applications and datasets | R5, R8
U6 | As a user, I should be able to view the lineage and provenance for a dataset in a given time period |
U7 | As a user, I should be able to discover/search datasets based on metadata (business or system) | R6, R7, R10
U8 | As a user, I should be able to discover/search datasets based on fields in the dataset schema | R13
U9 | As a user, I should be able to publish metadata to the notification system | R14
U10 | As a user, I should be able to specify filters on the kind of metadata that is automatically added to datasets and applications | R15

User Interactions

Interaction 1

  1. User goes to the metadata discovery page
  2. User types in a metadata keyword in the search box
  3. User gets a list of all possible metadata keys in the search results
  4. User selects (clicks) a single metadata key
  5. User gets a list of all datasets and applications that were annotated with the selected metadata key
  6. User selects a dataset from the above list
  7. User is taken to the dataset detail page
  8. User has the option to view the lineage of the dataset, for a given time interval

Interaction 2

  1. User is shown all available metadata keys
  2. User follows Interaction 1 from point 4 onwards

Interaction 3

  1. User is on a page showing a lineage diagram for a dataset for the specified time interval
  2. User clicks on a program in the lineage diagram
  3. User is shown all runs for the selected program in the specified time interval
  4. User selects a runId
  5. User is shown all metadata for the selected run

Interaction 4

  1. User is on a dataset/application/program detail page
  2. User clicks a button to view metadata
  3. User is shown all metadata for the selected dataset/application/program/runId

Scope (3.2)

In Scope:

Metadata Annotation and Retrieval

  • Allow users to update and retrieve Scope.USER Metadata through REST endpoints
  • Automatically associate the application and runId with each dataset access
  • Publish business metadata to Kafka topic.

Lineage

  • View lineage on a dataset through REST API

Search and Discovery

  • Allow prefix search on a single metadata key or value (either the key alone, or a combination of key and value).

Out of Scope 

Metadata Annotation and Retrieval

  • No Java Clients/CLI support
  • Annotating dataset/stream accesses as read/write is out of scope. 
  • No support for annotating applications or datasets programmatically

Lineage

  • Lineage will not be directional.

Search and Discovery

  • No support for boolean expressions.
  • No free-text search.

Architecture

The Metadata Service will be implemented as a separate service that runs in YARN. One of the primary reasons for this is that it cuts across AppFabric and DataFabric; running it separately also allows us to scale it independently. This service will expose REST APIs for interaction, most of which will be public (routable) while some may be private (non-routable). It will support the following kinds of interactions:

  1. Direct interactions from users for setting and retrieving metadata for applications and datasets
  2. Interactions from AppFabric/Dataset Service during (the start/end of) program runs for annotating that a program accessed a dataset (as input or output).

This service will also be responsible for computing and serving the lineage of a dataset in a specified time interval.

An additional service may (TBD) also be exposed to fulfill the requirement of metadata search. This service may be a YARN service that runs Solr or Lucene to index metadata.

Design

Metadata System

Metadata Types

The metadata system supports the following kinds of metadata:

  1. Business (User) Metadata: Metadata that users annotate, using either the Java API or the REST API.
  2. System Metadata: Metadata that is annotated by the CDAP system. This may include two kinds of metadata:
    1. Generic Metadata: Metadata that is added by default to every application/dataset: created_by, creation_time, last_updated_by, last_update_time (any more?)
    2. Runtime Metadata: Metadata that contains runtime information such as the Workflow Token, Data Quality Stats, Program/Dataset Runtime Arguments, and Preferences. We may not store this data in the metadata service, and may instead reference it in the current MDS by runId.
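The scope rules from requirement R2 and the metadata types above can be summarized in code. The following is a minimal sketch only; `Scope`, `Origin`, and `MetadataScopes` are hypothetical names chosen for illustration, not CDAP APIs.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical model (not CDAP's API) of the two metadata scopes and of where
// each kind of metadata comes from, following requirement R2.
enum Scope { USER, SYSTEM }

enum Origin { JAVA_API, REST_API, PROGRAM, CDAP_SYSTEM }

final class MetadataScopes {
  private static final Map<Origin, Scope> SCOPE_BY_ORIGIN = new EnumMap<>(Origin.class);

  static {
    // Business (user) metadata: annotated through the Java API or the REST API.
    SCOPE_BY_ORIGIN.put(Origin.JAVA_API, Scope.USER);
    SCOPE_BY_ORIGIN.put(Origin.REST_API, Scope.USER);
    // System metadata: added by a program run (runtime metadata) or by CDAP
    // itself (generic metadata such as creation_time).
    SCOPE_BY_ORIGIN.put(Origin.PROGRAM, Scope.SYSTEM);
    SCOPE_BY_ORIGIN.put(Origin.CDAP_SYSTEM, Scope.SYSTEM);
  }

  private MetadataScopes() {}

  static Scope scopeFor(Origin origin) {
    return SCOPE_BY_ORIGIN.get(origin);
  }
}
```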

System Metadata Updates

Updates to system metadata will happen at four different times:

  1. App/Dataset Deployment: To update System Metadata
  2. Program start: To update the history table for the dataset access with the run
  3. During Program Run: To record accesses to datasets that are not set as input or output datasets but are updated by the program during its run
  4. Program end: To update the history table for the dataset access with the end time (and the metadata values?)

We will use Solr or Lucene as an external search and indexing engine. An overview of the investigation of various available options can be found at External Search and Indexing Engine Investigation.

CDAP Search System Service

To support Solr in fault-tolerant mode, there will be a new CDAP system service, running in YARN, that acts as an adapter for the search and indexing engine.

The primary role of the proposed search system service is to manage the external search and indexing engine and to act as a proxy between the CDAP master and that engine.

Indexing and Searching Metadata Records Using IndexedTable

In the 3.2 release, we will support search on metadata records using the key*=value* and key* format rather than free text search.

To do this, we will store the metadata records in an IndexedTable, which maintains a second table that acts as an inverted index over the metadata records.

Store

The metadata system will be composed of the following tables:

Business Metadata Table

This table contains the most recent metadata associated with a dataset or an application. It contains no historical data. Its purpose is to optimally serve the annotate and retrieve APIs for business metadata.

RowKey:

<target-type><target-id><metadata-type><key>

where:

<target-type>: APP/PROGRAM/DATASET/STREAM

<target-id>: app-id (ns+app) / program-id (ns+app+pgtype+pgm) / dataset-id (ns+dataset) / stream-id (ns+stream)

<metadata-type>: "p" for a property (key-value) record, "t" for a tag record

<key>: the key

Columns:

Key:Value | Value

We store both Key:Value and Value in the columns to make indexing with an IndexedTable easier. This approach also lets us support prefix searches on either the value alone or the key:value combination.
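To illustrate why both columns are stored, the sketch below uses an in-memory sorted map as a stand-in for the IndexedTable's index table (an assumption for illustration; `MetadataPrefixIndex` is a hypothetical name). Each property is indexed under both `key:value` and `value`, so a single prefix range scan answers both a value-prefix query and a key:value-prefix query.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// In-memory stand-in for the IndexedTable-based index (not CDAP code): every
// property is indexed under both "key:value" and "value", and a sorted map
// answers prefix queries with a single range scan.
final class MetadataPrefixIndex {
  // index entry ("owner:finance" or "finance") -> ids of entities carrying it
  private final NavigableMap<String, Set<String>> index = new TreeMap<>();

  void addProperty(String entityId, String key, String value) {
    index.computeIfAbsent(key + ":" + value, k -> new TreeSet<>()).add(entityId);
    index.computeIfAbsent(value, k -> new TreeSet<>()).add(entityId);
  }

  // Entries sharing a prefix are contiguous in sorted order, so the scan
  // starts at the prefix and stops at the first entry that no longer matches.
  Set<String> searchPrefix(String prefix) {
    Set<String> result = new TreeSet<>();
    for (Map.Entry<String, Set<String>> e : index.tailMap(prefix, true).entrySet()) {
      if (!e.getKey().startsWith(prefix)) {
        break;
      }
      result.addAll(e.getValue());
    }
    return result;
  }
}
```

For example, indexing `owner=finance` lets both `fin` and `owner:fin` match. One consequence of this layout is that a value which itself contains `:` cannot be distinguished from a key:value entry, so the real encoding may need an escaping rule.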

When an app is deleted, its business metadata is also deleted from this table.

Lineage Table

This table contains historical information about program runs and the datasets they accessed for read or write. It serves as an audit trail. The primary goal of this table is to enable computing the lineage of a dataset.

RowKey:

This table can have six kinds of row keys:

Dataset access from a CDAP Program

d<dataset-id><inverted-start-time>p<run-id><access-type>
p<run-id><inverted-start-time>d<dataset-id><access-type>

where:

d: Identifies a dataset access

<dataset-id>: The dataset id, containing ns+dataset

<inverted-start-time>: The start time of the run, inverted because HBase 0.96 does not support reverse scans

p: Identifies that this access was made from a CDAP Program

<run-id>: The run id, containing ns+app+program+runId

<access-type>: r/w/rw

Stream access from a CDAP Program

s<stream-id><inverted-start-time>p<run-id><access-type>
p<run-id><inverted-start-time>s<stream-id><access-type>

where:

s: Identifies a stream access

<stream-id>: The stream id, containing ns+stream

<inverted-start-time>: The start time of the run, inverted because HBase 0.96 does not support reverse scans

p: Identifies that this access was made from a CDAP Program

<run-id>: The run id, containing ns+app+program+runId

<access-type>: r/w/rw


Stream access from an external entity

s<stream-id><inverted-start-time>e<external-entity-id><access-type>
e<external-entity-id><inverted-start-time>s<stream-id><access-type>

where:

s: Identifies a stream access

<stream-id>: The stream id, containing ns+stream

<inverted-start-time>: The start time of the run, inverted because HBase 0.96 does not support reverse scans

e: Identifies that this access was made from an external entity

<external-entity-id>: The id of the external entity, a combination of the source query param specified in the modified Stream Write REST APIs below and an access timestamp (e.g., a day timestamp). Including a timestamp in the external entity id is acceptable, since we will not compute lineage for external entities.

<access-type>: r/w/rw
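The row key layouts above can be sketched as byte arrays. This is a minimal sketch under stated assumptions: the inverted start time is taken to be `Long.MAX_VALUE - startTime`, stored as an 8-byte big-endian long, and each id is terminated by a 0 byte; the document does not fix these encodings, and `LineageRowKeys` is a hypothetical helper. The stream and external-entity keys follow the same pattern with the `s` and `e` prefixes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch of the dataset-access row keys in the Lineage Table. Assumptions (not
// fixed by the design): inverted time = Long.MAX_VALUE - startMillis stored as
// an 8-byte big-endian long, and each id is UTF-8 followed by a 0 byte so that
// one id cannot be mistaken for a prefix of another.
final class LineageRowKeys {
  private LineageRowKeys() {}

  static long invert(long timeMillis) {
    return Long.MAX_VALUE - timeMillis;
  }

  // d<dataset-id><inverted-start-time>p<run-id><access-type>
  static byte[] datasetFirst(String datasetId, long runStartMillis, String runId, String accessType) {
    return build('d', datasetId, runStartMillis, 'p', runId, accessType);
  }

  // p<run-id><inverted-start-time>d<dataset-id><access-type>
  static byte[] runFirst(String runId, long runStartMillis, String datasetId, String accessType) {
    return build('p', runId, runStartMillis, 'd', datasetId, accessType);
  }

  private static byte[] build(char firstType, String firstId, long runStartMillis,
                              char secondType, String secondId, String accessType) {
    byte[] first = firstId.getBytes(StandardCharsets.UTF_8);
    byte[] second = secondId.getBytes(StandardCharsets.UTF_8);
    // The access type is null in 3.2; encoded as empty here (an assumption).
    byte[] access = accessType == null ? new byte[0] : accessType.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(first.length + second.length + access.length + 12);
    buf.put((byte) firstType).put(first).put((byte) 0)
        .putLong(invert(runStartMillis))  // ByteBuffer is big-endian by default
        .put((byte) secondType).put(second).put((byte) 0)
        .put(access);
    return buf.array();
  }

  // HBase orders rows by unsigned lexicographic byte comparison.
  static int compareRows(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }
}
```

Because HBase compares row keys as unsigned bytes, inverting the start time makes the most recent runs of a dataset sort first, which lets a forward scan start from `time2` without reverse-scan support.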

Columns:

<stop-time><metadata>

<stop-time> - Stop time of the program in ms, -1 if program is still running

<metadata> - JSON containing business and system metadata, even though storing system metadata could mean a duplication. This will be null for dataset and stream rows. This will not contain run information like runtime arguments, workflow token etc. These will be looked up from the run record table when required.
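The interplay between the update points listed under System Metadata Updates and the `<stop-time>` column can be sketched in memory. This is an illustration only: `LineageLog` and its methods are hypothetical, and a plain list stands in for the Lineage Table. Rows are written with stop-time -1 while the run is active and receive the run's stop time when it ends; every row carries the run's start time, matching the `<inverted-start-time>` component of the row key.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory sketch (hypothetical names) of how the <stop-time> column could be
// maintained: access rows are written with stop-time -1 while the run is
// active, and every row of the run receives the stop time when it ends.
final class LineageLog {
  static final long RUNNING = -1L;

  static final class Row {
    final String runId;
    final String dataId;
    final long runStartMillis;  // rows carry the run's start time, not the access time
    long stopMillis = RUNNING;

    Row(String runId, String dataId, long runStartMillis) {
      this.runId = runId;
      this.dataId = dataId;
      this.runStartMillis = runStartMillis;
    }
  }

  private final List<Row> rows = new ArrayList<>();

  // Called at program start for declared inputs/outputs, and again during the
  // run for datasets that were not declared up front.
  void recordAccess(String runId, String dataId, long runStartMillis) {
    rows.add(new Row(runId, dataId, runStartMillis));
  }

  // Called at program end.
  void recordRunEnd(String runId, long stopMillis) {
    for (Row row : rows) {
      if (row.runId.equals(runId)) {
        row.stopMillis = stopMillis;
      }
    }
  }

  List<Row> rowsFor(String dataId) {
    List<Row> result = new ArrayList<>();
    for (Row row : rows) {
      if (row.dataId.equals(dataId)) {
        result.add(row);
      }
    }
    return result;
  }
}
```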

Lineage Computation

Lineage can be computed for streams and datasets using the lineage table. Lineage computation is a breadth first search on the lineage table.

Given a dataset dataset1 and a time range time1 and time2, the following scan and filter will compute one iteration of the breadth first search.

Scan

start row: d-dataset1-inverted(time2)

stop row: d-dataset1-inverted(min(running-program-start-times))

Filter

stop-time == -1 OR stop-time >= time1

This scan is intended to select the runs that satisfy the following condition: (time1 <= stop-time < time2) OR (start-time < time2 AND (stop-time == -1 OR stop-time >= time2))

Restricted to the scanned rows, the scan and filter together select:

min(running-program-start-times) <= start-time < time2 AND (stop-time == -1 OR stop-time >= time1)

Expressed in terms of the inverted start time stored in the row key:

inverted(min(running-program-start-times)) >= inverted(start-time) > inverted(time2) AND (stop-time == -1 OR stop-time >= time1)

The above scan gives us all programs that accessed dataset1 in the given time range. To continue the breadth first search, we repeat the scan for each program found (using its p<run-id> rows) to reach the datasets and streams it accessed. Note that each node in the graph needs a separate scan to move the search forward.
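The breadth first search can be sketched over in-memory access records. This sketch rests on stated assumptions: it applies the overlap condition `start-time < time2 AND (stop-time == -1 OR stop-time >= time1)` (equivalent to the condition stated above) directly instead of modelling the HBase scan bounds, treats edges as undirected (lineage is not directional in 3.2), and interprets `maxLevels` from the lineage REST API as the number of hops. `LineageBfs` and `Access` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Breadth-first lineage sketch over in-memory access records (illustrative,
// not CDAP code). Edges are undirected, matching the 3.2 scope. A record is
// used only if its run overlaps the requested interval:
//   start < time2 && (stop == -1 || stop >= time1)
final class LineageBfs {
  static final class Access {
    final String dataId;
    final String programId;
    final long start;
    final long stop;  // -1 while the run is still active

    Access(String dataId, String programId, long start, long stop) {
      this.dataId = dataId;
      this.programId = programId;
      this.start = start;
      this.stop = stop;
    }
  }

  static boolean overlaps(Access a, long time1, long time2) {
    return a.start < time2 && (a.stop == -1 || a.stop >= time1);
  }

  // Returns the data and program ids reachable from startId within maxLevels hops.
  static Set<String> lineage(List<Access> accesses, String startId, long time1, long time2, int maxLevels) {
    Set<String> visited = new LinkedHashSet<>();
    visited.add(startId);
    Deque<String> frontier = new ArrayDeque<>();
    frontier.add(startId);
    for (int level = 0; level < maxLevels && !frontier.isEmpty(); level++) {
      Deque<String> next = new ArrayDeque<>();
      for (String node : frontier) {
        // Stands in for the per-node scan described above.
        for (Access a : accesses) {
          if (!overlaps(a, time1, time2)) {
            continue;
          }
          String neighbour = a.dataId.equals(node) ? a.programId
              : a.programId.equals(node) ? a.dataId : null;
          if (neighbour != null && visited.add(neighbour)) {
            next.add(neighbour);
          }
        }
      }
      frontier = next;
    }
    return visited;
  }
}
```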

 

Open question: When is metadata stored in this table?

  • Every time a dataset access happens - It will make querying expensive
  • First access - May be less performant, since it will include a read + write
  • Latest access - Easy to implement, and query, but may show inconsistent metadata for the same program run over time, if the metadata is updated using a REST API call during the run.

Open question: Should we store some notion of the app spec/app version at the time of the run, since today there is no way to get that information given a run-id?

When an app is deleted, its historical data is not deleted.

In the lineage table, for 3.2, the access type will always be null.

REST APIs

The following REST APIs will be exposed from the metadata service.

Purpose | API | Body | Response | Routable | Comments | Approved?
Annotate business metadata for datasets
POST /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/properties
{
  "key1" : "value1",
  "key2" : "value2",
  //...
}

200: Successful

404: Dataset not found in specified namespace

Yes
  • New keys are added.
  • Existing keys are updated.

Annotate business metadata for apps
POST /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/properties
{
  "key1" : "value1",
  "key2" : "value2",
  //...
}

200: Successful

404: App not found in specified namespace

Yes
  • New keys are added.
  • Existing keys are updated.

Annotate business metadata for programs
POST /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/properties
{
  "key1" : "value1",
  "key2" : "value2",
  //...
}

200: Successful

404: Program not found in specified namespace

Yes
  • New keys are added.
  • Existing keys are updated.

Annotate business metadata for streams
POST /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/properties
{
  "key1" : "value1",
  "key2" : "value2",
  //...
}

200: Successful

404: Stream not found in specified namespace

Yes
  • New keys are added.
  • Existing keys are updated.

Record run-level dataset access metadata

Note: will be part of a client call.

POST /v3/metadata/history
{
  "programRun": {
    "namespace": "ns1",
    "app": "app1",
    "programType": "MAPREDUCE",
    "name": "pgm1",
    "runId": "r1"
  },
  "data": {
    "namespace": "ns1",
    "type": "DATASET",  // or "STREAM"
    "name": "d1"
  },
  "accessType": "in",
  "metadata": {
    "user": {
      "key1": "value1",
      ...
    },
    "system": {
      "k1": "v1",
      ...
    }
  }
}

200: Successful

404: App/dataset not found in specified namespace (with a descriptive error message)

No
  • Update behavior TBD
  • How do streams fit in here?
Retrieve business metadata for datasets
GET /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/properties
N/A

200: Successful

404: Dataset not found in specified namespace

{
  "key1" : "value1",
  "key2" : "value2",
  //...
}
Yes 
Retrieve business metadata for apps
GET /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/properties
N/A

200: Successful

404: App not found in specified namespace

{
  "key1" : "value1",
  "key2" : "value2",
  //...
}
Yes 
Retrieve business metadata for programs
GET /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/properties
N/A

200: Successful

404: Program not found in specified namespace

{
  "key1" : "value1",
  "key2" : "value2",
  //...
}
Yes 
Retrieve business metadata for streams
GET /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/properties
N/A

200: Successful

404: Stream not found in specified namespace

{
  "key1" : "value1",
  "key2" : "value2",
  //...
}
Yes 
Retrieve system metadata for datasets
GET /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/properties/system
N/A

200: Successful

404: Dataset not found in specified namespace

{
  "key1" : "value1",
  "key2" : "value2",
  //...
}
Yes
  • Not supported in 3.2
  • Why is scope not a query param?
Retrieve system metadata for apps
GET /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/properties/system
N/A

200: Successful

404: App not found in specified namespace

{
  "key1" : "value1",
  "key2" : "value2",
  //...
}
Yes
  • Not supported in 3.2
Retrieve system metadata for programs
GET /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/properties/system
N/A

200: Successful

404: Program not found in specified namespace

{
  "key1" : "value1",
  "key2" : "value2",
  //...
}
Yes
  • Not supported in 3.2
Retrieve system metadata for streams
GET /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/properties/system
N/A

200: Successful

404: Stream not found in specified namespace

{
  "key1" : "value1",
  "key2" : "value2",
  //...
}
Yes
  • Not supported in 3.2
Delete all business metadata for datasets
DELETE /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/properties
N/A

200: Successful

404: Dataset not found in specified namespace

Yes
Delete selected key from business metadata for datasets
DELETE /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/properties/{key}
N/A

200: Successful

404: Dataset not found in specified namespace

Yes
Delete all business metadata for apps
DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/properties
N/A

200: Successful

404: App not found in specified namespace

Yes
Delete selected key from business metadata for apps
DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/properties/{key}
N/A

200: Successful

404: App not found in specified namespace

Yes
Delete all business metadata for programs
DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/properties
N/A

200: Successful

404: Program not found in specified namespace

Yes
Delete selected key from business metadata for programs
DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/properties/{key}
N/A

200: Successful

404: Program not found in specified namespace

Yes
Delete all business metadata for streams
DELETE /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/properties
N/A

200: Successful

404: Stream not found in specified namespace

Yes
Delete selected key from business metadata for streams
DELETE /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/properties/{key}
N/A

200: Successful

404: Stream not found in specified namespace

Yes
Search Datasets containing business metadata
GET /v3/namespaces/{namespace-id}/metadata/search?query=term&target=dataset

N/A

200: Successful

["dataset1", "dataset2"]
Yes
  • Only prefix search supported in 3.2.
  • Supported formats:
    • Value Prefix
    • Key:Value Prefix
Search Apps containing business metadata
GET /v3/namespaces/{namespace-id}/metadata/search?query=term&target=app

N/A

200: Successful

["app1", "app2"]
Yes
  • Only prefix search supported in 3.2.
  • Supported formats:
    • Value Prefix
    • Key:Value Prefix
Search Programs containing business metadata
GET /v3/namespaces/{namespace-id}/metadata/search?query=term&target=program

N/A

200: Successful

["program1", "program2"]
Yes
  • Only prefix search supported in 3.2.
  • Supported formats:
    • Value Prefix
    • Key:Value Prefix
Search Streams containing business metadata
GET /v3/namespaces/{namespace-id}/metadata/search?query=term&target=stream

N/A

200: Successful

["stream1", "stream2"]
Yes
  • Only prefix search supported in 3.2.
  • Supported formats:
    • Value Prefix
    • Key:Value Prefix
View Dataset Lineage
GET /v3/namespaces/{namespace-id}/datasets/{dataset-id}/lineage?start=<start-ts>&end=<end-ts>&maxLevels=<max-levels>
N/A

200: Successful

Response TBD, but will contain a DAG representation

Yes 
View Stream Lineage
GET /v3/namespaces/{namespace-id}/streams/{stream-id}/lineage?start=<start-ts>&end=<end-ts>&maxLevels=<max-levels>
N/A

200: Successful

Response TBD, but will contain a DAG representation

Yes 
View Run Id Accesses
GET /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/runs/{run-id}/metadata/accesses
N/A

200: Successful

Response Body TBD

Yes
  • TODO: Figure out a better name
  • May not be part of 3.2
View Dataset Lineage after specified stage
POST /v3/namespaces/{namespace-id}/datasets/{dataset-id}/lineage/next

TODO

Note: Query params of /lineage will become part of POST body.

Yes
  • Will not be supported in 3.2
View Stream Lineage after specified stage
POST /v3/namespaces/{namespace-id}/streams/{stream-id}/lineage/next

TODO

Note: Query params of /lineage will become part of POST body.

Yes
  • Will not be supported in 3.2
Add tags to a dataset
POST /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/tags
["tag1", "tag2"]

200: Successful

404: Dataset not found in specified namespace

Yes

Add tags to an app
POST /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/tags
["tag1", "tag2"]

200: Successful

404: App not found in specified namespace

Yes

Add tags to a program
POST /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/tags
["tag1", "tag2"]

200: Successful

404: Program not found in specified namespace

Yes

Add tags to a stream
POST /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/tags
["tag1", "tag2"]

200: Successful

404: Stream not found in specified namespace

Yes

Retrieve dataset tags
GET /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/tags
N/A
["tag1", "tag2"]
Yes
Retrieve app tags
GET /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/tags
N/A
["tag1", "tag2"]
Yes
Retrieve program tags
GET /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/tags
N/A
["tag1", "tag2"]
Yes
Retrieve stream tags
GET /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/tags
N/A
["tag1", "tag2"]
Yes
Remove all dataset tags
DELETE /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/tags
N/A

200: Successful

404: Dataset not found in specified namespace

Yes
Remove specified dataset tag
DELETE /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/tags/{tag}
N/A

200: Successful

404: Dataset not found in specified namespace

Yes
Remove all app tags
DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/tags
N/A

200: Successful

404: App not found in specified namespace

Yes
Remove specified app tag
DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/tags/{tag}
N/A

200: Successful

404: App not found in specified namespace

Yes
Remove all program tags
DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/tags
N/A

200: Successful

404: Program not found in specified namespace

Yes
Remove specified program tag
DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/tags/{tag}
N/A

200: Successful

404: Program not found in specified namespace

Yes
Remove all stream tags
DELETE /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/tags
N/A

200: Successful

404: Stream not found in specified namespace

Yes
Remove specified stream tag
DELETE /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/tags/{tag}
N/A

200: Successful

404: Stream not found in specified namespace

Yes
Remove all business metadata for a dataset
DELETE /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata
N/A

200: Successful

404: Dataset not found in specified namespace

Yes
  • Removes all properties and tags from a dataset.
  • Not supported in 3.2.
Remove all business metadata for an app
DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/metadata
N/A

200: Successful

404: App not found in specified namespace

Yes
  • Removes all properties and tags from an app.
  • Not supported in 3.2.
Remove all business metadata for a program
DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata
N/A

200: Successful

404: Program not found in specified namespace

Yes
  • Removes all properties and tags from a program.
  • Not supported in 3.2.
Remove all business metadata for a stream
DELETE /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata
N/A

200: Successful

404: Stream not found in specified namespace

Yes
  • Removes all properties and tags from a stream.
  • Not supported in 3.2.
Get all business metadata for a dataset
GET /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata
N/A

200: Successful

404: Dataset not found in specified namespace

Yes
  • Retrieves all properties and tags for a dataset.
  • Not supported in 3.2.
Get all business metadata for an app
GET /v3/namespaces/{namespace-id}/apps/{app-id}/metadata
N/A

200: Successful

404: App not found in specified namespace

Yes
  • Retrieves all properties and tags for an app.
  • Not supported in 3.2.
Get all business metadata for a program
GET /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata
N/A

200: Successful

404: Program not found in specified namespace

Yes
  • Retrieves all properties and tags for a program.
  • Not supported in 3.2.
Get all business metadata for a stream
GET /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata
N/A

200: Successful

404: Stream not found in specified namespace

Yes
  • Retrieves all properties and tags for a stream.
  • Not supported in 3.2.
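All of the entity-scoped endpoints above share the same shape: an entity path followed by `/metadata/properties` or `/metadata/tags`. The sketch below builds (but does not send) some of these requests with `java.net.http.HttpRequest` from Java 11. It is an assumption-laden illustration rather than a CDAP client: `MetadataRequests` is a hypothetical class, the base URL is a placeholder, and the JSON encoding is deliberately minimal.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Client-side sketch (hypothetical class, not a CDAP client) that builds, but
// does not send, requests for the proposed metadata endpoints. The base URL
// and the minimal JSON encoding are assumptions for illustration.
final class MetadataRequests {
  private final String baseUrl;

  MetadataRequests(String baseUrl) {
    this.baseUrl = baseUrl;
  }

  static String datasetPath(String ns, String dataset) {
    return "/v3/namespaces/" + seg(ns) + "/datasets/" + seg(dataset);
  }

  static String streamPath(String ns, String stream) {
    return "/v3/namespaces/" + seg(ns) + "/streams/" + seg(stream);
  }

  static String programPath(String ns, String app, String programType, String program) {
    return "/v3/namespaces/" + seg(ns) + "/apps/" + seg(app) + "/" + seg(programType) + "/" + seg(program);
  }

  HttpRequest addProperties(String entityPath, Map<String, String> properties) {
    StringJoiner json = new StringJoiner(",", "{", "}");
    properties.forEach((k, v) -> json.add(quote(k) + ":" + quote(v)));
    return post(entityPath + "/metadata/properties", json.toString());
  }

  HttpRequest addTags(String entityPath, List<String> tags) {
    StringJoiner json = new StringJoiner(",", "[", "]");
    tags.forEach(t -> json.add(quote(t)));
    return post(entityPath + "/metadata/tags", json.toString());
  }

  HttpRequest deleteProperty(String entityPath, String key) {
    return HttpRequest.newBuilder(URI.create(baseUrl + entityPath + "/metadata/properties/" + seg(key)))
        .DELETE()
        .build();
  }

  HttpRequest search(String ns, String query, String target) {
    String url = baseUrl + "/v3/namespaces/" + seg(ns) + "/metadata/search?query="
        + URLEncoder.encode(query, StandardCharsets.UTF_8) + "&target=" + target;
    return HttpRequest.newBuilder(URI.create(url)).GET().build();
  }

  private HttpRequest post(String path, String body) {
    return HttpRequest.newBuilder(URI.create(baseUrl + path))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  // Percent-encodes one path segment (URLEncoder uses '+' for spaces, which is
  // only valid in query strings).
  private static String seg(String s) {
    return URLEncoder.encode(s, StandardCharsets.UTF_8).replace("+", "%20");
  }

  static String quote(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }
}
```

For example, `addProperties(datasetPath("default", "purchases"), Map.of("owner", "finance"))` yields a POST to `.../datasets/purchases/metadata/properties` with body `{"owner":"finance"}`, and `search("default", "owner:fin", "dataset")` yields `.../metadata/search?query=owner%3Afin&target=dataset`.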

Lineage Output JSON

{
  "start": "1441310434000",
  "end": "1441320599000",
  
  "relations": 
  [
    {
      "data": "stream.default.purchaseStream",
      "program": "flow.default.PurchaseHistory.PurchaseFlow",
      "access": "read",
      "runs": ["283-afsd032-adsf90", "283-0rwedfk-09wrff"],
      "component": ["reader"]
    },
    {
      "data": "dataset.default.purchases",
      "program": "flow.default.PurchaseHistory.PurchaseFlow",
      "access": "write",
      "runs": ["283-afsd032-adsf90", "283-0rwedfk-09wrff"],
      "component": ["collector"]
    },
    {
      "data": "dataset.default.frequentCustomers",
      "program": "flow.default.PurchaseHistory.PurchaseFlow",
      "access": "write",
      "runs": ["283-0rwedfk-09wrff"],
      "component": ["collector"]
    },
    {
      "data": "dataset.default.purchases",
      "program": "flow.default.PurchaseHistory.PurchaseFlow",
      "access": "read",
      "runs": ["283-0rwedfk-09wrff"],
      "component": ["collector"]
    },
    {
      "data": "dataset.default.purchases",
      "program": "mapreduce.default.PurchaseHistory.PurchaseHistoryBuilder",
      "access": "read",
      "runs": ["283-afsd032-adsf90"]
    },
    {
      "data": "dataset.default.history",
      "program": "mapreduce.default.PurchaseHistory.PurchaseHistoryBuilder",
      "access": "write",
      "runs": ["283-99sd032-adsf90", "283-88wedfk-09wrff"]
    },
    {
      "data": "dataset.default.frequentCustomers",
      "program": "mapreduce.default.PurchaseHistory.PurchaseHistoryBuilder",
      "access": "write",
      "runs": ["283-99sd032-adsf90", "283-88wedfk-09wrff"]
    },
    {
      "data": "dataset.default.history",
      "program": "service.default.PurchaseHistory.PurchaseHistoryService",
      "runs": ["283-zsed032-adsf90"]
    }
  ],
    
  "programs":
  {
    "flow.default.PurchaseHistory.PurchaseFlow":
    {
      "id": 
      {
        "namespace": "default",
        "application": "PurchaseHistory",
        "type": "flow",
        "id": "PurchaseFlow"
      }
    },
    "mapreduce.default.PurchaseHistory.PurchaseHistoryBuilder":
    {
      "id": 
      {
        "namespace": "default",
        "application": "PurchaseHistory",
        "type": "mapreduce",
        "id": "PurchaseHistoryBuilder"
      }
    },
    "service.default.PurchaseHistory.PurchaseHistoryService":
    {
      "id": 
      {
        "namespace": "default",
        "application": "PurchaseHistory",
        "type": "service",
        "id": "PurchaseHistoryService"
      }
    }
  },
  
  "data": 
  {
    "dataset.default.frequentCustomers":
    {
      "id": 
      {
        "namespace": "default",
        "type": "dataset",
        "id": "frequentCustomers"
      }
    },
    "dataset.default.history":
    {
      "id": 
      {
        "namespace": "default",
        "type": "dataset",
        "id": "history"
      }
    },
    "dataset.default.purchases":
    {
      "id": 
      {
        "namespace": "default",
        "type": "dataset",
        "id": "purchases"
      }
    },
    "stream.default.purchaseStream":
    {
      "id": 
      {
        "namespace": "default",
        "type": "stream",
        "id": "purchaseStream"
      }
    }
  }
}
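The `relations` array aggregates individual run-level accesses. One way to produce it, sketched below under the assumption that accesses are first read as (data, program, access, run id) tuples, is to group by the (data, program, access) triple and collect the run ids. `LineageRelations` is a hypothetical name, and the `component` field and JSON serialization are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch (illustrative types, JSON serialization omitted) of collapsing
// per-run accesses into the "relations" array: accesses with the same
// (data, program, access) triple become one relation whose "runs" lists every
// run id. A null access models 3.2, where the access type is not recorded.
final class LineageRelations {
  static final class Access {
    final String data, program, access, runId;

    Access(String data, String program, String access, String runId) {
      this.data = data;
      this.program = program;
      this.access = access;
      this.runId = runId;
    }
  }

  static final class Relation {
    final String data, program, access;
    final Set<String> runs = new LinkedHashSet<>();  // keeps first-seen order

    Relation(String data, String program, String access) {
      this.data = data;
      this.program = program;
      this.access = access;
    }
  }

  static List<Relation> collapse(List<Access> accesses) {
    Map<List<String>, Relation> byTriple = new LinkedHashMap<>();
    for (Access a : accesses) {
      // Arrays.asList (unlike List.of) accepts null elements and compares by value.
      List<String> triple = Arrays.asList(a.data, a.program, a.access);
      byTriple.computeIfAbsent(triple, t -> new Relation(a.data, a.program, a.access)).runs.add(a.runId);
    }
    return new ArrayList<>(byTriple.values());
  }
}
```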

 

Java APIs

Applications

TBD - Out of scope for 3.2

Datasets

The following dataset APIs will be updated to help CDAP compute lineage.

We will add the ability for datasets/streams to programmatically annotate accesses as read/write. Details TBD. This is out of scope for 3.2.

For 3.2, users will not be able to tell if a dataset/stream access is a read, write or read/write. They will only be able to see connections.

UseDataset

The @UseDataset annotation will be extended to accept an accessType. accessType can be IN, OUT or UNKNOWN, to indicate whether the dataset is being used as input or output. If a program both reads from and writes to a dataset, it will have both IN and OUT accessType.

getDataset()

The getDataset API will be extended to accept an accessType. accessType can be IN, OUT or UNKNOWN, to indicate whether the dataset is being used as input or output. If a program both reads from and writes to a dataset, it will have both IN and OUT accessType.

Both API updates will be backward-compatible: for usages that don't specify an accessType, we will presume it to be UNKNOWN.
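A sketch of how a defaulted `accessType` keeps existing code compiling is shown below. It does not reproduce CDAP's `@UseDataset`; `UseDatasetSketch`, `AccessType`, and `AccessTypes` are hypothetical stand-ins that only illustrate the proposed semantics (omitted means UNKNOWN, and a read-write dataset declares both IN and OUT).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-ins for the proposed accessType extension; not CDAP code.
enum AccessType { IN, OUT, UNKNOWN }

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface UseDatasetSketch {
  String value();

  // Defaulting to UNKNOWN is what keeps existing usages source-compatible.
  AccessType[] accessType() default {AccessType.UNKNOWN};
}

final class AccessTypes {
  private AccessTypes() {}

  // Reads the access types declared on an annotated field.
  static Set<AccessType> declaredOn(Class<?> owner, String fieldName) {
    UseDatasetSketch ann;
    try {
      ann = owner.getDeclaredField(fieldName).getAnnotation(UseDatasetSketch.class);
    } catch (NoSuchFieldException e) {
      throw new IllegalArgumentException("No field " + fieldName, e);
    }
    if (ann == null) {
      throw new IllegalArgumentException(fieldName + " is not annotated");
    }
    Set<AccessType> types = EnumSet.noneOf(AccessType.class);
    types.addAll(Arrays.asList(ann.accessType()));
    return types;
  }
}

class ExampleProgram {
  @UseDatasetSketch("purchases") Object legacy;  // no accessType given: UNKNOWN
  @UseDatasetSketch(value = "history", accessType = AccessType.IN) Object input;
  @UseDatasetSketch(value = "frequentCustomers", accessType = {AccessType.IN, AccessType.OUT}) Object readWrite;
}
```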

Stream

No Java API changes are needed. Whenever a StreamWriter receives a call to write to a stream, we will capture and register the call as an OUT access. Every other stream interaction will be registered as an IN access.
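One way to capture writes without changing the user-facing API is a decorator around the writer. In this sketch, `SimpleStreamWriter`, `StreamAccessRecorder`, and `LineageStreamWriter` are hypothetical, simplified stand-ins; CDAP's real StreamWriter interface has a different shape.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical access types for streams; only IN and OUT are needed here.
enum AccessType { IN, OUT }

// Simplified stand-in for a stream writer (not CDAP's real StreamWriter API).
interface SimpleStreamWriter {
  void write(String stream, String event);
}

// Hypothetical recorder of "stream:ACCESS" entries that would feed lineage.
class StreamAccessRecorder {
  private final List<String> registered = new ArrayList<>();

  // Writes made through a stream writer are registered as OUT.
  void onWrite(String stream) {
    registered.add(stream + ":" + AccessType.OUT);
  }

  // Every other stream interaction is registered as IN.
  void onOtherAccess(String stream) {
    registered.add(stream + ":" + AccessType.IN);
  }

  List<String> registered() {
    return registered;
  }
}

// Decorator: registers an OUT access, then delegates the actual write.
class LineageStreamWriter implements SimpleStreamWriter {
  private final SimpleStreamWriter delegate;
  private final StreamAccessRecorder recorder;

  LineageStreamWriter(SimpleStreamWriter delegate, StreamAccessRecorder recorder) {
    this.delegate = delegate;
    this.recorder = recorder;
  }

  @Override
  public void write(String stream, String event) {
    recorder.onWrite(stream);
    delegate.write(stream, event);
  }
}
```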

Existing REST API changes

Stream

Stream write REST APIs (the single-event, async, and batch enqueue endpoints) will accept an optional source query parameter that identifies the source of the events being written to the stream.

Existing API | New API | Comments | Approved?
POST /v3/namespaces/{namespace-id}/streams/{stream-id} | POST /v3/namespaces/{namespace-id}/streams/{stream-id}?source=<source> | Additional source QueryParam |
POST /v3/namespaces/{namespace-id}/streams/{stream-id}/async | POST /v3/namespaces/{namespace-id}/streams/{stream-id}/async?source=<source> | Additional source QueryParam |
POST /v3/namespaces/{namespace-id}/streams/{stream-id}/batch | POST /v3/namespaces/{namespace-id}/streams/{stream-id}/batch?source=<source> | Additional source QueryParam |
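A client-side sketch of building these endpoint URLs with the optional source parameter. `StreamEnqueueUris` and `enqueueUri` are hypothetical helpers, and the base URL passed in is a placeholder for the CDAP router address. Namespace and stream IDs are assumed to be valid CDAP identifiers that need no escaping; the source value is URL-encoded. `URLEncoder.encode(String, Charset)` requires Java 10 or later.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

class StreamEnqueueUris {
  // endpointSuffix is "" for single-event enqueue, or "/async" or "/batch".
  // source may be null or empty, in which case the existing URL is returned unchanged.
  static String enqueueUri(String baseUrl, String namespace, String stream,
                           String endpointSuffix, String source) {
    String path = baseUrl + "/v3/namespaces/" + namespace + "/streams/" + stream + endpointSuffix;
    if (source == null || source.isEmpty()) {
      return path; // source is optional, so old clients keep working
    }
    // Form-style encoding: spaces become '+', reserved characters are percent-encoded.
    return path + "?source=" + URLEncoder.encode(source, StandardCharsets.UTF_8);
  }
}
```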

Although a source can be specified with every stream event enqueue, we will write to the Lineage table at most once per day for a given stream-id and source.
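The once-per-day rule can be sketched as an in-memory guard keyed on (stream-id, source, day). `DailyLineageDeduplicator` is a hypothetical name, and bucketing days in UTC is an assumption, since the design does not say which time zone defines a day. A production version would also need to survive restarts and evict keys from past days.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory guard; not persistent and never evicts old keys.
class DailyLineageDeduplicator {
  private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

  // Thread-safe set of (streamId, source, epochDay) keys already written.
  private final Set<List<?>> written = ConcurrentHashMap.newKeySet();

  // Returns true only for the first enqueue of a (stream, source) pair on a given UTC day.
  boolean shouldWrite(String streamId, String source, long timestampMillis) {
    long epochDay = Math.floorDiv(timestampMillis, MILLIS_PER_DAY);
    // Set.add returns false if the key was already present.
    return written.add(Arrays.asList(streamId, source, epochDay));
  }
}
```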

Notifications

Whenever metadata on a CDAP entity (Dataset/Stream/Application/Program) is updated, the metadata system will publish a notification using the Notification System. External systems that want to subscribe to these notifications will have to:

  1. Write a worker that subscribes to these notifications.
  2. Have the worker take any action it wants when a notification is received (write to a dataset, send the notification to an external API, write to an external filesystem, start a program, etc.).

Implementation of the worker is out of scope for this document.

Whenever metadata on a CDAP entity (Dataset/Stream/Application/Program) is added, updated, or deleted, the metadata system will publish a notification to a Kafka topic. The Kafka topic name will be configurable. External systems that want to receive these notifications will have to subscribe to this Kafka topic. Each message will contain a JSON representation of the following class:

/*
 * Copyright © 2015 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package co.cask.cdap.proto;

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

/**
 * Represents a Metadata change for a given {@link Id.NamespacedId}, including its previous state, the new state after
 * the change occurred, the change itself, and the time that the change occurred.
 */
public final class MetadataChangeRecord {
  private final MetadataRecord previous;
  private final MetadataRecord updated;
  private final MetadataDiffRecord changes;
  private final Id.NamespacedId targetId;
  private final long updateTime;
  private final String updater;

  public MetadataChangeRecord(MetadataRecord previous, MetadataRecord updated, MetadataDiffRecord changes,
                              Id.NamespacedId targetId, long updateTime) {
    this(previous, updated, changes, targetId, updateTime, null);
  }

  public MetadataChangeRecord(MetadataRecord previous, MetadataRecord updated, MetadataDiffRecord changes,
                              Id.NamespacedId targetId, long updateTime, @Nullable String updater) {
    this.previous = previous;
    this.updated = updated;
    this.changes = changes;
    this.targetId = targetId;
    this.updateTime = updateTime;
    this.updater = updater;
  }

  public MetadataRecord getPrevious() {
    return previous;
  }

  public MetadataRecord getUpdated() {
    return updated;
  }

  public MetadataDiffRecord getChanges() {
    return changes;
  }

  public Id.NamespacedId getTargetId() {
    return targetId;
  }

  public long getUpdateTime() {
    return updateTime;
  }

  @Nullable
  public String getUpdater() {
    return updater;
  }

  /**
   * Represents the metadata state for a given {@link Id.NamespacedId} at a point in time
   */
  public static final class MetadataRecord {
    private final Map<String, String> properties;
    private final List<String> tags;

    public MetadataRecord(Map<String, String> properties, List<String> tags) {
      this.properties = properties;
      this.tags = tags;
    }

    public Map<String, String> getProperties() {
      return properties;
    }

    public List<String> getTags() {
      return tags;
    }
  }

  /**
   * Represents the changes between the previous and the new record
   */
  public static final class MetadataDiffRecord {
    private final MetadataRecord additions;
    private final MetadataRecord deletions;

    public MetadataDiffRecord(MetadataRecord additions, MetadataRecord deletions) {
      this.additions = additions;
      this.deletions = deletions;
    }

    public MetadataRecord getAdditions() {
      return additions;
    }

    public MetadataRecord getDeletions() {
      return deletions;
    }
  }
}
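The `changes` field can be derived from `previous` and `updated`. The sketch below shows one possible derivation using self-contained stand-in types (`Snapshot`, `MetadataDiffSketch`) rather than the classes above. Treating a changed property value as a deletion of the old pair plus an addition of the new pair is an assumption; this design does not specify it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Simplified stand-in for MetadataChangeRecord.MetadataRecord.
final class Snapshot {
  final Map<String, String> properties;
  final List<String> tags;

  Snapshot(Map<String, String> properties, List<String> tags) {
    this.properties = properties;
    this.tags = tags;
  }
}

// Simplified stand-in for MetadataChangeRecord.MetadataDiffRecord.
final class MetadataDiffSketch {
  final Snapshot additions;
  final Snapshot deletions;

  private MetadataDiffSketch(Snapshot additions, Snapshot deletions) {
    this.additions = additions;
    this.deletions = deletions;
  }

  static MetadataDiffSketch between(Snapshot previous, Snapshot updated) {
    // Properties that are new, or whose value changed, go into additions (new value).
    Map<String, String> addedProps = new HashMap<>();
    for (Map.Entry<String, String> e : updated.properties.entrySet()) {
      if (!Objects.equals(previous.properties.get(e.getKey()), e.getValue())) {
        addedProps.put(e.getKey(), e.getValue());
      }
    }
    // Properties that were removed, or whose value changed, go into deletions (old value).
    Map<String, String> deletedProps = new HashMap<>();
    for (Map.Entry<String, String> e : previous.properties.entrySet()) {
      if (!Objects.equals(updated.properties.get(e.getKey()), e.getValue())) {
        deletedProps.put(e.getKey(), e.getValue());
      }
    }
    // Tags are compared as plain set differences, preserving list order.
    List<String> addedTags = new ArrayList<>(updated.tags);
    addedTags.removeAll(previous.tags);
    List<String> deletedTags = new ArrayList<>(previous.tags);
    deletedTags.removeAll(updated.tags);
    return new MetadataDiffSketch(new Snapshot(addedProps, addedTags),
                                  new Snapshot(deletedProps, deletedTags));
  }
}
```

For example, changing `owner` from `alice` to `bob` yields `owner=bob` in the additions and `owner=alice` in the deletions, so a subscriber can reconstruct either state from the diff alone.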

 

Caveats/Assumptions

  1. Metadata can only be added to datasets created or updated with version 3.2 (the first version containing this feature) or later. There will not be a tool for tagging historical data.
  2. We will only support Strings as the datatype for metadata values. Other types will not be supported, at least to begin with.
  3. The Usage Dataset has some overlap with the Metadata System. However, at least to begin with, there will be no integration with the Usage Dataset. The Usage Dataset can be thought of as a purely static representation of dataset usage, with no notion of runtime.

CLI

TBD

Questions/TODOs

  1. Find a better term for metadata.
  2. Which term is better: USER or BUSINESS?
  3. Should we build metadata as a list of tags or a list of key-value pairs?
    1. It will be stored as key-value pairs in the backend. The UI can display it as delimiter-separated values if that is what the mocks show.
  4. How do we store tags so that they are searchable?

 

Created in 2020 by Google Inc.