5.0 Metadata
Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
In CDAP, we want the ability for a user to tag the data, so that data can be identified and utilized.
Goals
Clearly state the design goals/requirements for this feature
User Stories
Metadata user stories are grouped into the following categories.
High-level user stories
Pipeline Configuration
- As a CDAP data analyst (pipeline developer), Frank should be able to configure in the UI what metadata is generated by pipelines:
  - what metadata is propagated from source to sink and how it is projected
  - what metrics are saved as operational metadata for the sink (or the output object in the sink)
  - what metadata is converted to a field in every record, for example source filename or ingest timestamp
  - ...
Programmatically Defining Metadata
- As a CDAP application/pipeline plugin/data prep directive developer, Dan should be able to programmatically retrieve or add metadata for a given entity through the program context
- As a CDAP application/pipeline plugin/data prep directive developer, Dan should be able to programmatically emit field lineage as metadata
- As a CDAP application/pipeline plugin/data prep directive developer, Dan should be able to programmatically access the "in-flight" metadata for the current program run
- As a developer, when setting a metadata field, Dan should be able to specify programmatically how this field was computed (from what input metadata). For example: field "origin" of file X in sink dataset A was computed from field "id" of source B.
- As a CDAP user, David should be able to add metadata during entity creation through all possible ways of entity creation (APIs, code, pipelines) for every possible CDAP entity
- As a CDAP developer, David should be able to access field lineage and metadata provenance programmatically through the program/plugin context
- As a CDAP developer, David should be able to access operational metadata (e.g. metrics from previous stage)
Custom Metadata
- As a CDAP user, David should be able to add metadata to elements inside CDAP which are not entities, for example files in a fileset or schema fields in a dataset.
- As a CDAP user, David should be able to use all existing metadata APIs and programmatic approaches to add metadata to any CDAP element, irrespective of whether it is a CDAP entity or not.
- As a CDAP user, David should be able to see the audit and lineage details of metadata added to CDAP elements and not just CDAP entities.
Discoverability
- As a CDAP user, David should be able to search for and discover CDAP elements based on the metadata associated with them.
- As a CDAP user, David should be able to browse the metadata structure/model in CDAP
Metadata Provenance
- As a CDAP admin, Walter should be able to see a detailed audit trail of all the metadata added to any element whether it's a CDAP entity or not, including who did it, when it happened, and how (what API/Pipeline was used)
- As a CDAP admin, Walter should be able to see the audit/lineage of metadata for CDAP elements at a granular level: for example, the user who added the metadata, or, if the metadata was generated by the system, its cause.
- As a CDAP user, David wants to see metadata provenance visualized in the UI
- As a CDAP user, David wants to retrieve metadata provenance through a REST API
Integrations
- As a CDAP admin, Walter should be able to import/export metadata
- As a CDAP admin, Walter should be able to configure CDAP metadata to be synced from/to external sources or enterprise metadata repositories.
- As a pipeline plugin developer, David wants to push or pull metadata to or from an external metadata repository.
Actionable Metadata
- As a plugin developer, David wants to take action based on the metadata of a field. For example, validate that a field tagged as "price" is a positive number, or anonymize all fields tagged as "pii".
- As a plugin developer, David wants to take action based on the type of a field. For example, lower-case all String fields.
- As a plugin developer, David wants to take action based on the metadata of the input file, for example, validate a checksum.
- [As a CDAP operator, I want to define policies for metadata-based actions]
Automatic Tagging
- As a CDAP user, David should be able to do automatic metadata tagging in CDAP based on some predefined rules.
- As a CDAP data analyst (pipeline developer), Frank should be able to propagate metadata from sources to sinks in Hydrator pipelines: for example, being able to propagate the size of the data in the source, its checksum, and its creation date to the sink.
- As a CDAP admin, Walter should be able to access system-generated operational metadata. Operational metadata may include things like which pipeline processed a record.
Manual tagging: As a member of a data ingestion team, I want to manually apply one or more tags to data, so that the data can be identified and utilized, e.g. PII, sensitive, highly confidential, source system, marketing segmentation, data owner.
- HDFS Files: apply a tag to a file, a tag to an attribute
- Hive Table: a tag to a schema, a tag to a table, a tag to a column
- HBase Table: a tag to a table, a tag to a column, a tag to a family
Automatic tagging based on rules: As a member of a data ingestion team, I want the data management tool to allocate tags and classifications of data based on predefined rule sets, so that I can understand and utilize the data. (This will most likely fall under the rules engine.)
Design
The following design targets two user stories from above which can be summarized as:
- Being able to add metadata to fields of dataset
- Being able to track the lineage of fields of dataset
Metadata Resource Representation Format
One of the pain points of the existing metadata APIs is that they only allow metadata to be associated with CDAP entities. This is very restrictive for enterprises that want the capability to tag and discover resources in CDAP (for example, a field of a schema or a file of a fileset) which are not CDAP entities.
To address this, we need a generic way of specifying resources (entities and non-entities) in CDAP. We propose the following generic way of specifying resources for metadata annotation.
A map of string to string allows users to specify any resource in CDAP while also supporting existing CDAP entities. For example:
- An existing CDAP entity, like a dataset with datasetId 'myDataset', can be specified as:
  Map<namespace=myNamespace, dataset=myDataset>
- Field 'empName' of dataset 'myDataset' can be specified as:
  Map<namespace=myNamespace, dataset=myDataset, field=empName>
- File 'part01' of a fileset 'myFileset' can be specified as:
  Map<namespace=myNamespace, dataset=myFileset, file=part01>
- This free-form map allows us to represent any resource in the CDAP metadata system, irrespective of whether it is present in CDAP or not. For example, an external MySQL table can be represented as:
  Map<database=myDatabase, table=myTable>
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Defines a Metadata Resource.
 */
@Beta
public final class Resource {
  private final Map<String, String> details;

  private Resource(Map<String, String> details) {
    this.details = details;
  }

  public Map<String, String> getDetails() {
    return details;
  }

  public static Builder builder() {
    return new Builder();
  }

  public static class Builder {
    private final Map<String, String> details = new LinkedHashMap<>();

    public Builder add(String k, String v) {
      details.put(k, v);
      return this;
    }

    public Resource build() {
      return new Resource(Collections.unmodifiableMap(details));
    }
  }

  public static Resource fromEntityId(EntityId entityId) {
    // converts the EntityId to a Metadata Resource
  }
}
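As an illustration, the examples from the list above could be built through the proposed Builder. This is only a usage sketch against the proposed API, not final usage:

// Usage sketch (proposed API): a Resource for a schema field of a dataset
Resource fieldResource = Resource.builder()
    .add("namespace", "myNamespace")
    .add("dataset", "myDataset")
    .add("field", "empName")
    .build();

// ... and for a file of a fileset
Resource fileResource = Resource.builder()
    .add("namespace", "myNamespace")
    .add("dataset", "myFileset")
    .add("file", "part01")
    .build();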
Overview of API changes
Our existing metadata APIs will need to change to allow users to specify the above generic metadata target. All our existing metadata APIs are built around EntityId as the target of the metadata system. Since an EntityId is just a set of key-value pairs with well-defined key names, it can easily be represented as the Metadata Resource presented above.
For example, a MapReduce program with ProgramId 'myProgram' can be represented as a map with the following key-value pairs:
Map<namespace=myNamespace, application=myApplication, appVersion=1.0.0-SNAPSHOT, programName=myProgram, programType=mapreduce>
CDAP internal metadata APIs will be changed to accept a Resource rather than an EntityId. For example, the following APIs in MetadataAdmin:
void addProperties(NamespacedEntityId namespacedEntityId, Map<String, String> properties)
  throws NotFoundException, InvalidMetadataException;
void addTags(NamespacedEntityId namespacedEntityId, String... tags)
  throws NotFoundException, InvalidMetadataException;
Set<MetadataRecord> getMetadata(NamespacedEntityId namespacedEntityId) throws NotFoundException;
will change to
void addProperties(Resource resource, Map<String, String> properties)
  throws NotFoundException, InvalidMetadataException;
void addTags(Resource resource, String... tags)
  throws NotFoundException, InvalidMetadataException;
Set<MetadataRecord> getMetadata(Resource resource) throws NotFoundException;
In addition to the new metadata APIs, we will also introduce utility methods and public APIs that allow users to add metadata by directly specifying an EntityId and/or easily convert an EntityId to a Resource for the metadata system.
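A sketch of how an existing EntityId-based caller could migrate, assuming the changed MetadataAdmin signatures and the Resource.fromEntityId converter above (the metadataAdmin variable and the tag/property values are illustrative):

// Sketch: migrating an EntityId-based call to the Resource-based API
DatasetId datasetId = new DatasetId("myNamespace", "myDataset");
metadataAdmin.addTags(Resource.fromEntityId(datasetId), "pii", "sensitive");

// A non-entity element (a schema field) goes through the same API
Resource field = Resource.builder()
    .add("namespace", "myNamespace")
    .add("dataset", "myDataset")
    .add("field", "empName")
    .build();
metadataAdmin.addProperties(field, Collections.singletonMap("classification", "pii"));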
Program/Plugin Level APIs
Field Level Lineage
Once we have the capability to add metadata to the fields of a dataset, we can store field-level lineage as metadata of the field. It will be the responsibility of the plugin/plugin developer to provide the correct lineage information to CDAP, to be recorded through publicly exposed APIs. The lineage information can then easily be stored as metadata of the field using the above-mentioned CDAP APIs.
For example, suppose a join plugin reads 'fieldOne' and 'fieldTwo' from two different datasets, 'datasetOne' and 'datasetTwo', joins them, and then writes to 'fieldThree' of 'datasetThree'. This lineage can be stored as metadata of the field 'fieldThree' of 'datasetThree', where
Resource: Map<namespace=myNamespace, dataset=datasetThree, field=fieldThree>
Properties: Map<lineage={operationName=join; Operands=[(namespace=myNamespace, dataset=datasetOne, field=fieldOne), (namespace=myNamespace, dataset=datasetTwo, field=fieldTwo)]}>
Note that the lineage information depends on the run, i.e. the runId of the pipeline that is generating the data, so the storage should be able to store it by a unique <resource, runId> pair.
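A hedged sketch of how the join plugin above might record this through the proposed APIs; the string encoding of the 'lineage' property and the metadataAdmin variable are illustrative, not final:

// Sketch: recording the join lineage on the output field (encoding is illustrative)
Resource outputField = Resource.builder()
    .add("namespace", "myNamespace")
    .add("dataset", "datasetThree")
    .add("field", "fieldThree")
    .build();

String lineage = "{operationName=join; operands=["
    + "(namespace=myNamespace, dataset=datasetOne, field=fieldOne), "
    + "(namespace=myNamespace, dataset=datasetTwo, field=fieldTwo)]}";

// The runId of the current run must also be attached so that lineage can be
// stored per <resource, runId> pair (see Storage and Indexing below).
metadataAdmin.addProperties(outputField, Collections.singletonMap("lineage", lineage));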
Storage and Indexing
To be able to record lineage information as metadata, we would like to record the metadata emitted by a pipeline for every runId, and also be able to index it on the basis of runId.
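One possible (purely illustrative) way to key such records is a composite of the resource's key-value pairs and the runId, which keeps lookups by <resource, runId> cheap. This sketch assumes the Resource's LinkedHashMap-backed details map gives a stable key order:

// Illustrative sketch: composite storage key for per-run metadata
private static String storageKey(Resource resource, String runId) {
  StringBuilder key = new StringBuilder();
  // Relies on the insertion-ordered (LinkedHashMap-backed) details map
  for (Map.Entry<String, String> part : resource.getDetails().entrySet()) {
    key.append(part.getKey()).append('=').append(part.getValue()).append(':');
  }
  return key.append(runId).toString();
}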
Authorization for Metadata
Allowing metadata to be added to CDAP resources (non-entities) opens the question of authorization enforcement (i.e. who can add metadata to these resources). Since these resources are not entities, we cannot currently have policies defined for them.
Even though CDAP resources are not predefined, we can depend on the fact that these resources generally live under some CDAP entity. For example, schema fields are always associated with a dataset, and a file in a fileset is always associated with the dataset itself, so we can perform authorization on these entities. If such a relationship does not exist, we can depend on the fact that the resource exists under a namespace. In the case of external resources that do not even exist under a namespace, we can enforce on the instanceId if needed.
For example, to add metadata to a schema field, the user must have privileges on the dataset.
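A sketch of the fallback chain described above (enclosing dataset, then namespace, then CDAP instance); the helper name and structure are illustrative only:

// Illustrative sketch: derive the entity on which authorization is enforced
private EntityId enforcementEntity(Resource resource, String instanceName) {
  Map<String, String> details = resource.getDetails();
  if (details.containsKey("namespace") && details.containsKey("dataset")) {
    // Fields, files, partitions, etc. live under a dataset: enforce on the dataset
    return new DatasetId(details.get("namespace"), details.get("dataset"));
  }
  if (details.containsKey("namespace")) {
    // No enclosing entity: fall back to the namespace
    return new NamespaceId(details.get("namespace"));
  }
  // External resources outside any namespace: fall back to the CDAP instance
  return new InstanceId(instanceName);
}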
Open Questions
- Schema is not a resource/entity in CDAP. If it is discovered with metadata associated with it, how is this schema presented to the user in the UI?
  No special way of displaying the schema; the dataset associated with it will be shown.
- Given a schema which was discovered through metadata, does the dataset associated with it need to be presented to the user?
  Yes.
- One dataset's schema field can be written/overwritten by different pipelines which compute that field in different ways. Do the different possible lineages need to be captured separately for a field?
  We need to find out how to store this information. Probably we need to record the runId associated with every record and the lineage associated with every runId.
- Since a field can be written/overwritten by different pipelines which compute the information in different ways, it is possible that out of 10 records, 6 records' values were overwritten by the latest run of one pipeline, say pipeline B, while the other 4 still have values written by pipeline A. Do individual values need to store which lineage computed them?
  See above.
- Does the lineage history need to be tracked?
  Lineage history is not relevant.
- How is metadata for a schema applied to external sinks (datasets) which CDAP does not know about, like a Kudu table?
  It is associated with the external dataset.
- How does discovery work in the above case?
  ?
- Recording lineage at the file level
- What are the different possibilities of search?
- Do we need to support mathematical operators such as >, <, <=, etc.? In that case the data needs to be treated as numbers. Does the user need to specify the type of the metadata being added?
- Do we need to support relational operators in search queries? For example: list all datasets
- Metadata now has a class/type (business, operational, technical); do we need capabilities to filter metadata on this?
- How are resources like files, partitions, etc., which are not CDAP entities and which CDAP does not know about, presented in the UI when discovered through metadata?
  To be designed.
- Does the metadata history need to be periodically cleaned up to save space?
  Ideally it should be, and the duration should be configurable.
- What are the goals for metadata history, e.g. to calculate differences?
  ?
Option 1: Allow tagging operations only from within the program lifecycle methods, initialize and destroy.
/**
 * This interface is used by CDAP programs to tag certain entities.
 */
public interface Tagger {

  /**
   * Attaches the specified set of tags to the given entity.
   * This method can be used when the user wants to attach the specified set of tags
   * to a Table, for example, where the Entity represents the Table dataset.
   */
  void tag(Entity entity, Set<String> tags);

  /**
   * For a given entity and key, adds the specified set of tags.
   * This method can be used when the user wants to attach the specified set of tags
   * at a more granular level within the entity. For example, in order to attach the
   * tag "PII" to the column "UserId" in the Table dataset, this method can be used
   * with the entity representing the Table dataset and the key "UserId". The same
   * method can be used to tag a column or column family, where the key represents
   * the name of the column or column family.
   */
  void tag(Entity entity, String key, Set<String> tags);
}
RuntimeContext can extend this interface to make these methods available to the lifecycle methods of the program.
// Sample usage of the methods. Note that tag() takes a Set of tags.
public class PurchaseHistoryBuilder extends AbstractMapReduce {

  public void initialize() throws Exception {
    MapReduceContext context = getContext();
    Job job = context.getHadoopJob();
    job.setReducerClass(PerUserReducer.class);
    context.addInput(Input.ofDataset("purchases"), PurchaseMapper.class);

    // Purchases is transactional data and contains cost information
    context.tag(Entity.name("purchases").ofType(Type.Dataset),
                new HashSet<>(Arrays.asList("transactional data", "cost")));

    // UserId is a PII field
    context.tag(Entity.name("purchases").ofType(Type.Dataset), "UserId",
                Collections.singleton("PII"));

    context.addOutput(Output.ofDataset("history"));
  }
}
Pros:
- Design is simpler.
- Low impact on the performance during program execution.
Cons:
- A bit restrictive, since tagging happens in the lifecycle methods of the program and not for each individual record.
Option 2: Allow tagging operations from program lifecycle methods as well as from executor containers, such as map and reduce tasks and Spark executors.
Similar to metrics, the implementation of the Tagger interface will be injected into the executor tasks (map and reduce tasks for a MapReduce program, Spark executors in the case of Spark jobs). Then, from the map tasks for example, the implementation can write to the storage system.
Sample usage is as follows:
public static class PurchaseMapper extends Mapper<byte[], Purchase, Text, Purchase> {
  private Metrics mapMetrics;
  private Tagger tagger;

  @Override
  public void map(byte[] key, Purchase purchase, Context context)
    throws IOException, InterruptedException {
    String user = purchase.getCustomer();
    if (purchase.getPrice() > 100000) {
      mapMetrics.count("purchases.large", 1);
    }
    // Tag only when the PII field is actually present in the record
    if (purchase.getUserId() != null) {
      // Use the injected Tagger; Hadoop's Context does not expose tagging
      tagger.tag(Entity.name("purchases").ofType(Type.Dataset), "UserId",
                 Collections.singleton("PII"));
    }
    context.write(new Text(user), purchase);
  }
}
Pros:
- Allows tagging based on the actual data in the record which gives more control to the user.
Cons:
- The MapReduce/Spark frameworks do not provide a built-in way of collecting information (except counters and accumulators) from the executor tasks, so CDAP will have to handle it. One approach is the one shown above: injecting an implementation of the Tagger. However, it makes the implementation complex.
- Since the call is made at the record level, the impact on performance will be higher.
Approach
Approach #1
Approach #2
API changes
New Programmatic APIs
void addProperties(Map<String, String> resource, MetadataType type, Map<String, String> properties)
  throws Exception;
void addTags(Map<String, String> resource, MetadataType type, String... tags) throws Exception;
Set<MetadataRecord> getMetadata(Map<String, String> resource) throws Exception;
Set<MetadataRecord> getMetadata(MetadataScope scope, Map<String, String> resource)
  throws NotFoundException;
void removeMetadata(Map<String, String> resource) throws NotFoundException;
void removeProperties(Map<String, String> resource, String... keys) throws NotFoundException;
void removeTags(Map<String, String> resource, String... tags) throws NotFoundException;
MetadataSearchResponse search(String namespaceId, String searchQuery,
    Set<EntityTypeSimpleName> types, SortInfo sortInfo, int offset, int limit,
    int numCursors, String cursor, boolean showHidden, Set<EntityScope> entityScope)
  throws Exception;
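A hedged usage sketch against the signatures above; the MetadataType value and the property names are illustrative:

// Sketch: annotating a non-entity resource through the new programmatic API
Map<String, String> fieldResource = new LinkedHashMap<>();
fieldResource.put("namespace", "myNamespace");
fieldResource.put("dataset", "myDataset");
fieldResource.put("field", "empName");

addProperties(fieldResource, MetadataType.BUSINESS, Collections.singletonMap("classification", "pii"));
Set<MetadataRecord> metadata = getMetadata(fieldResource);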
Deprecated Programmatic APIs
New REST APIs
Deprecated REST API
All of the EntityId-based REST APIs in MetadataHttpHandler.
CLI Impact or Changes
- Impact #1
- Impact #2
- Impact #3
UI Impact or Changes
- Impact #1
- Impact #2
- Impact #3
Security Impact
What is the impact on authorization, and how does the design take care of this aspect?
Impact on Infrastructure Outages
System behavior (if applicable, document the impact of downstream component failures [YARN, HBase, etc.]) and how the design takes care of these aspects.
Test Scenarios
Test ID | Test Description | Expected Results |
---|---|---|
Releases
Release X.Y.Z
Release X.Y.Z
Related Work
- Work #1
- Work #2
- Work #3
Future work