Data Quality Application

Overview

Motivation

The goal of this application is to provide users with an extensible CDAP application that helps them determine the quality of their data. Users can assess data quality using out-of-the-box functionality and libraries, and can extend the application with their own aggregation functions and customizations. Finally, we’d like to provide users with a way to query the results of the quality metric computations via RESTful APIs.

Example Use Cases

  • Suppose the user wants to take in a stream of Apache access log data (CLF format), generate several histograms of status codes and referrers (partitioned by time), and then query ranges of timestamps for aggregated histogram data.
  • Suppose instead that the user wants to find out which IP addresses were trying to access a given service, and the related Apache access log data is stored in a table partitioned by time. The application could be configured to read the IP addresses from the table and compute a unique-values aggregation on each set of IP addresses for each time interval. The user could then query the aggregated data for the relevant timestamps.

Requirements

  • Read in multiple data formats from a generic source

    • Example: read in from stream of CLF data

  • Data quality should be assessed in batch, and the results should be computed at periodic intervals (e.g., each hour)

  • A MapReduce program that takes log data as input, applies an aggregation function, and stores the results of the aggregation in a table.

    • The table should be partitioned by time

  • Allow the user to query aggregated data over various time intervals.

  • Library of aggregation functions

  • Allow the aggregation function logic to be pluggable (allow users to plug in their own aggregation functions)

  • Allow the source type and input data format to be configurable

  • UI
  • Determine when there are anomalies/suspicious patterns (using the aggregated data).

    • Explore if it’s possible to attach a level of confidence to these findings

    • Alert the user via email when such things occur

    • Examples:

      • Suppose 90% of the status codes are consistently 200’s and then, in the adjacent time interval, 70% of the incoming status codes are a mix of 404’s and 500’s. We would want to detect this shift and alert the user via email.

      • Suppose the frequencies of referrers over a standard-length time interval are fairly evenly distributed until a certain interval, at which point the frequencies become heavily skewed towards one referrer. We would want to be able to alert the user that this referrer is likely a bot (possibly even with a confidence level).
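
One simple way to make the two examples above concrete is to compare the histograms of adjacent intervals with a distance measure. The sketch below uses total variation distance and a fixed threshold; both are assumptions chosen for illustration (the design does not specify a detection method), and the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative only: flags a large shift between two adjacent-interval histograms. */
public class DistributionShift {

  /** Sums the counts of a histogram. */
  static double total(Map<String, Integer> histogram) {
    double sum = 0;
    for (int count : histogram.values()) {
      sum += count;
    }
    return sum;
  }

  /**
   * Total variation distance between the two histograms after normalizing
   * each one to fractions: 0.0 means identical shape, 1.0 means disjoint.
   * Returns 0.0 if either histogram is empty (nothing to compare).
   */
  static double totalVariationDistance(Map<String, Integer> previous,
                                       Map<String, Integer> current) {
    double previousTotal = total(previous);
    double currentTotal = total(current);
    if (previousTotal == 0 || currentTotal == 0) {
      return 0.0;
    }
    Set<String> keys = new HashSet<>(previous.keySet());
    keys.addAll(current.keySet());
    double sum = 0.0;
    for (String key : keys) {
      double p = previous.getOrDefault(key, 0) / previousTotal;
      double q = current.getOrDefault(key, 0) / currentTotal;
      sum += Math.abs(p - q);
    }
    return sum / 2.0;
  }

  /** The threshold is a tuning knob assumed for this sketch, not a value from the design. */
  static boolean isSuspicious(Map<String, Integer> previous,
                              Map<String, Integer> current, double threshold) {
    return totalVariationDistance(previous, current) > threshold;
  }
}
```

With a previous interval of 90% 200’s and a current interval where 70% of the codes are 404’s and 500’s, the distance is large, so a modest threshold would trigger an alert. Attaching a confidence level would need a proper statistical test, which this sketch does not attempt.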

Design

Implicit Table Schema

  • The reducer writes the results of the data aggregations to a Table using the following implicit schema:
    • Row key : v.SourceID.FieldName.Timestamp
    • Column key: Aggregation function name
    • Value: Aggregate for a field over some time window
  • In addition, the reducer writes the list of aggregation types available for each source and time interval combination using the following implicit schema:
    • Row key: a.SourceID.Timestamp
    • Column key: FieldName
    • Value: A list of objects -- each object corresponds to an aggregation type.
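
The two row-key layouts above can be sketched as plain string builders. This is only an illustration: the class and method names are hypothetical, and the byte-level encoding the application actually uses (via FieldTimestampKey.getTableRowKey() in the code later in this document) is not specified here.

```java
/** Illustrative only: renders the two implicit row-key layouts as strings. */
public class RowKeys {

  /** Row key for aggregated values: v.SourceID.FieldName.Timestamp */
  static String valueRowKey(String sourceId, String fieldName, long timestamp) {
    return "v." + sourceId + "." + fieldName + "." + timestamp;
  }

  /** Row key for the list of available aggregation types: a.SourceID.Timestamp */
  static String aggregationTypesRowKey(String sourceId, long timestamp) {
    return "a." + sourceId + "." + timestamp;
  }
}
```

Note that decimal timestamps in a string key only sort correctly in a range scan when they all have the same number of digits; a fixed-width binary encoding avoids that problem, which is presumably why the real key is built as bytes.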

Services

We need two services: one for aggregated data that can be combined in a meaningful way over multiple timestamps, and one for data that cannot be combined because combining such aggregations (like in the case of multiple standard deviations corresponding to multiple timestamps) is nonsensical.
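
To illustrate the distinction, the sketch below merges two per-interval histograms (which gives an exact result) and shows that the same is not true of medians. The class name and the numbers are made up for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: histograms combine exactly across intervals, medians do not. */
public class CombinabilityDemo {

  /** Adds the counts of two histograms key by key. */
  static Map<String, Integer> mergeHistograms(Map<String, Integer> first,
                                              Map<String, Integer> second) {
    Map<String, Integer> merged = new HashMap<>(first);
    for (Map.Entry<String, Integer> entry : second.entrySet()) {
      merged.merge(entry.getKey(), entry.getValue(), Integer::sum);
    }
    return merged;
  }

  /** Median of the given values; the input array is not modified. */
  static double median(double... values) {
    double[] sorted = values.clone();
    Arrays.sort(sorted);
    int n = sorted.length;
    return n % 2 == 1 ? sorted[n / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
  }
}
```

For instance, intervals with values {1, 2, 3} and {10, 20, 30, 40, 50} have medians 2 and 30, but the median of all eight values is 15, which cannot be recovered from the two stored medians alone. That is why such aggregations are served as a timeseries rather than combined.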

  • Service for combinable aggregations (example: frequency/histogram)

    • Handler for retrieving aggregations

      • Returns a JSON of aggregated values
      • Request format: /v1/source/{sourceID}/fields/{fieldname}/aggregations/{aggregationName}?startTimestamp={start_ts}&endTimestamp={end_ts}
      • Path param: source ID
      • Path param: field name
      • Path param: aggregation type
      • Query param: start timestamp
      • Query param: end timestamp
      • The service will return a JSON containing the aggregated values that were queried. For example, a discrete values histogram over a given time range might be returned in the following format:
{
  "data": {
    "200": 504,
    "400": 12,
    "404": 23,
    "500": 2
  }
}
    • Handler for retrieving the fields that have been aggregated over for a given sourceID.

      • Returns a JSON of field names

      • Request format: /v1/source/{sourceID}/fields/?startTimestamp={start_ts}&endTimestamp={end_ts}

      • Path param: source ID
      • Query param: start timestamp
      • Query param: end timestamp
      • The service will return a JSON containing the available field names for the given parameters (sourceID, time interval), each mapped to its available aggregation types. For example, for stream1 between timestamps 1423371600 and 1423372900, the service might return something of the following format:
{  
   "content_length":[  
      {  
         "name":"median",
         "combinable":false
      },
      {  
         "name":"max",
         "combinable":true
      }
   ]
}
    • Handler for retrieving the aggregation types for a given sourceID, fieldName, and time interval.

      • Returns a JSON of aggregation types

      • Request format: /v1/source/{sourceID}/fields/{fieldname}/aggregations?startTimestamp={start_ts}&endTimestamp={end_ts}

      • Path param: source ID

      • Path param: field name

      • Query param: start timestamp

      • Query param: end timestamp

The service will return a JSON containing the available aggregation types for the given parameters (sourceID, fieldName, time interval). Here is an example of the format of something the service might return if the user wants to see the available aggregation functions for stream1, content_length, between timestamps 1423371600 and 1423372900

  [  
   {  
      "name":"median",
      "combinable":false
   },
   {  
      "name":"max",
      "combinable":true
   }
]

  • Service for non-combinable aggregations (example: standard deviation)

    • Handler for retrieving aggregations

      • Returns a timeseries
      • Request format: /v1/source/{sourceID}/fields/{fieldName}/timeseries/{aggregationType}?startTimestamp={start_ts}&endTimestamp={end_ts}

      • Path param: source ID

      • Path param: field name

      • Path param: aggregation type

      • Query param: start timestamp

      • Query param: end timestamp

This service will return a JSON containing the aggregated values that were queried. For instance, if a user queries the average of content length over a given time range, the service will likely return something of the following format:

 {  
   "timeValues":[  
      {  
         "timestamp":1423371600,
         "data":122240
      },
      {  
         "timestamp":1423375200,
         "data":122240
      },
      {  
         "timestamp":1423378800,
         "data":121732
      },
      {  
         "timestamp":1423382400,
         "data":122240
      },
      {  
         "timestamp":1423386000,
         "data":121732
      },
      {  
         "timestamp":1423389600,
         "data":122240
      },
      {  
         "timestamp":1423393200,
         "data":121732
      },
      {  
         "timestamp":1423396800,
         "data":47327
      }
   ]
}
    • Handler for retrieving the field names that have been aggregated over for a given sourceID

      • This will be identical to the second handler for combinable aggregations mentioned above.

    • Handler for retrieving the aggregation types for a given sourceID, fieldName, and time interval.

      • This will be identical to the third handler for combinable aggregations mentioned above.
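
As a rough illustration of the request formats above, the following helper assembles the paths for the two aggregation-retrieval handlers. The class and method names are hypothetical, and the sketch assumes that source IDs, field names, and aggregation names are already URL-safe (no encoding is applied).

```java
/** Illustrative only: builds request paths for the two aggregation-retrieval handlers. */
public class DataQualityRequests {

  /** Query string shared by both handlers. */
  private static String timeRange(long startTimestamp, long endTimestamp) {
    return "?startTimestamp=" + startTimestamp + "&endTimestamp=" + endTimestamp;
  }

  /** Path for the combinable-aggregations handler. */
  static String combinableAggregation(String sourceId, String fieldName, String aggregationName,
                                      long startTimestamp, long endTimestamp) {
    return "/v1/source/" + sourceId + "/fields/" + fieldName + "/aggregations/" + aggregationName
        + timeRange(startTimestamp, endTimestamp);
  }

  /** Path for the non-combinable (timeseries) handler. */
  static String timeseries(String sourceId, String fieldName, String aggregationType,
                           long startTimestamp, long endTimestamp) {
    return "/v1/source/" + sourceId + "/fields/" + fieldName + "/timeseries/" + aggregationType
        + timeRange(startTimestamp, endTimestamp);
  }
}
```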

Library of Aggregation Functions

We want to provide a set of basic aggregation functions that users can use out of the box and also use as templates for their own custom aggregation functions. The following are some of the functions we’d like to include:

For discrete values:

  1. For numerical values:

    1. Generate histogram (sum of frequencies for each field)

    2. Average

    3. Standard deviation

    4. Median

  2. For categorical values:

    1. Generate histogram (sum of frequencies for each field)  

    2. Unique values

For continuous values:

  1. Average

  2. Standard deviation

  3. Median

  4. Determine buckets, and generate histogram
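
For the last item, one possible approach is fixed-width bucketing, sketched below. The bucket width is an assumed input; the design does not say how buckets are determined, and the names here are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: histogram of continuous values using fixed-width buckets. */
public class BucketedHistogram {

  /**
   * Counts values per bucket. Bucket i covers the half-open range
   * [i * bucketWidth, (i + 1) * bucketWidth). Keys are bucket indices.
   */
  static Map<Long, Integer> histogram(double[] values, double bucketWidth) {
    if (bucketWidth <= 0) {
      throw new IllegalArgumentException("bucketWidth must be positive");
    }
    Map<Long, Integer> counts = new TreeMap<>();
    for (double value : values) {
      long bucket = (long) Math.floor(value / bucketWidth);
      counts.merge(bucket, 1, Integer::sum);
    }
    return counts;
  }
}
```

Because every interval uses the same bucket boundaries, histograms built this way can be summed bucket by bucket, just like the discrete values histogram. Data-driven bucket boundaries would not have that property unless they were fixed up front.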

Structure of Aggregation Functions  

The structure of the built-in aggregation functions is important because it dictates the structure of users’ custom aggregation functions.

Interfaces

 

 

 

/**
* Basic Aggregation Function Interface: all aggregation function classes, combinable or
* non-combinable, should implement these methods.
* @param <T> Type of the aggregation
*/
public interface BasicAggregationFunction<T> {

 /**
 * Incrementally add values to the aggregation
 * @param value is a DataQualityWritable that we want to add to the running aggregation
 */
 void add(DataQualityWritable value);

 /**
 * Return the aggregation accumulated using add() in the form of a byte[]
 * @return a byte[] that represents the final aggregated value
 */
 byte[] aggregate();

 /**
 * Deserialize a given value appropriately  
 */
 T deserialize(byte[] value);
}



 

 

 

/**
* Aggregation Function Interface
* This is for aggregations that still make sense when combined.
*
* An example of such a function would be a discrete values histogram. If several
* histograms (each corresponding to a different time interval) were combined,
* the result would be a histogram representing the frequencies of
* the various values over the combined time interval.
* @param <T> Aggregation type
*/
public interface CombinableAggregationFunction<T> extends BasicAggregationFunction {

 /**
 * Retrieve a combined aggregation
 * @return T is the type of the combined aggregation
 */

 T retrieveAggregation();

 /**
 * Combine existing aggregations one-by-one
 * @param values is a byte[] that we want to add to the running combined aggregation
 */

 void combine(byte[] values);
}
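
To show the shape of a non-combinable function, here is a minimal running-average sketch. So that it is self-contained, it uses a simplified stand-in interface (SimpleAggregationFunction, hypothetical) that takes a plain double instead of a DataQualityWritable; a real implementation would implement BasicAggregationFunction as declared above. The 8-byte serialization is also just one possible choice.

```java
import java.nio.ByteBuffer;

/** Illustrative only: a running average in the add/aggregate/deserialize style. */
public class RunningAverageSketch {

  /** Simplified stand-in for BasicAggregationFunction; takes a double for brevity. */
  interface SimpleAggregationFunction<T> {
    void add(double value);
    byte[] aggregate();
    T deserialize(byte[] value);
  }

  static class RunningAverage implements SimpleAggregationFunction<Double> {
    private double sum;
    private long count;

    @Override
    public void add(double value) {
      sum += value;
      count++;
    }

    /** Serializes the average as 8 bytes; NaN if no values were added. */
    @Override
    public byte[] aggregate() {
      double average = count == 0 ? Double.NaN : sum / count;
      return ByteBuffer.allocate(Double.BYTES).putDouble(average).array();
    }

    @Override
    public Double deserialize(byte[] value) {
      return ByteBuffer.wrap(value).getDouble();
    }
  }
}
```

Storing only the average makes the function non-combinable. If the sum and count were stored instead, averages from different intervals could be merged exactly, at the cost of a slightly larger stored value.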

 

 

Application API Usages

Usage of void add(DataQualityWritable value) and byte[] aggregate()

 

 

@Override
public void reduce(Text key, Iterable<DataQualityWritable> values, Context context)
 throws IOException, InterruptedException {
 if (fieldsSet.contains(key.toString()) || fieldsCSV.equals("")) {
  try {
    Class<?> aggregationClass = Class.forName("data.quality.app.functions." +      
        aggregationFunctionClassName);
    BasicAggregationFunction instance = (BasicAggregationFunction)
        aggregationClass.newInstance();
    for (DataQualityWritable value : values) {
      instance.add(value);
    }
    FieldTimestampKey fieldTimestampKey = new FieldTimestampKey(timeKey, key.toString());
    context.write(fieldTimestampKey.getTableRowKey(), new Put(fieldTimestampKey.getTableRowKey(),
       Bytes.toBytes(aggregationFunctionClassName), instance.aggregate()));
  } catch (ClassNotFoundException | RuntimeException | InstantiationException |
      IllegalAccessException e) {
    throw new RuntimeException(e);
  }
 }
}

 

 

Usage of T retrieveAggregation() and void combine(byte[] values)

 

 

 

/**
* Handler class for Data Quality Histogram Service
*/

@Path("/v1")
public static final class HistogramLookup extends AbstractHttpServiceHandler {
 @UseDataSet("logDataStore")
 Table logDataStore;

 @Path("fields/{fieldName}/aggregations/{aggregationType}")
 @GET
 public void handler(HttpServiceRequest request, HttpServiceResponder responder,
                    @PathParam("fieldName") String fieldName, @PathParam("aggregationType")
                    String aggregationType,
                    @QueryParam("startTimestamp") @DefaultValue("0") long startTimestamp,
                    @QueryParam("endTimestamp") @DefaultValue("9223372036854775807")
                    long endTimestamp) throws IOException {
  FieldTimestampKey fieldStartTimestampKey = new FieldTimestampKey(startTimestamp,
      fieldName);
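  // The scan stop key is exclusive, so use endTimestamp + 1 to include rows at endTimestamp.
  // Guard against overflow when endTimestamp is Long.MAX_VALUE (the default).
  FieldTimestampKey fieldEndTimestampKey = new FieldTimestampKey(
    endTimestamp == Long.MAX_VALUE ? endTimestamp : endTimestamp + 1, fieldName);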
  Scanner scanner = logDataStore.scan(fieldStartTimestampKey.getTableRowKey(),
                                      fieldEndTimestampKey.getTableRowKey());
  try {
    Class<?> aggregationClass = Class.forName("data.quality.app.functions." +
      aggregationType);
    CombinableAggregationFunction aggregationClassInstance =
      (CombinableAggregationFunction) aggregationClass.newInstance();
    Row row;
    byte[] aggregationTypeBytes = Bytes.toBytes(aggregationType);
    try {
      while ((row = scanner.next()) != null) {
        Map<byte[], byte[]> columnsMapBytes = row.getColumns();
        byte[] output = columnsMapBytes.get(aggregationTypeBytes);
        if (output != null) {
          aggregationClassInstance.combine(output);
        }
      }
    } finally {
      scanner.close();
    }
    Object output = aggregationClassInstance.retrieveAggregation();
    responder.sendJson(output == null ? 404 : 200, output == null ? aggregationType +
      " is not a valid CombinableAggregationFunction" : output);
  } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
    throw new RuntimeException(e);
  }
 }
}

 

 

Example Aggregation Function

Below is code demonstrating what a discrete values histogram aggregation function would look like using the aforementioned APIs.

 

 

 

/**
* Discrete Values Histogram Aggregation Function
*/

public class DiscreteValuesHistogram
 implements CombinableAggregationFunction<Map<String, Integer>> {
 private static final Gson GSON = new Gson();
 private static final Type TOKEN_TYPE_MAP_STRING_INTEGER = new TypeToken<Map<String, 
   Integer>>() { }.getType();
 private Map<String, Integer> histogramMap = Maps.newHashMap();
 private Map<String, Integer> aggregatedHistogramMap = Maps.newHashMap();

 public void combine(byte[] value) {
  Map<String, Integer> outputMap = deserialize(value);
  for (Map.Entry<String, Integer> entry : outputMap.entrySet()) {
    Integer val = aggregatedHistogramMap.get(entry.getKey());
    aggregatedHistogramMap.put(entry.getKey(), val == null ? entry.getValue() : val + 
       entry.getValue());
  }
 }

 public Map<String, Integer> deserialize(byte[] valueBytes) {
  return GSON.fromJson(Bytes.toString(valueBytes), TOKEN_TYPE_MAP_STRING_INTEGER);
 }

 public Map<String, Integer> retrieveAggregation() {
  return aggregatedHistogramMap.isEmpty() ? null : aggregatedHistogramMap;
 }

 public void add(DataQualityWritable value) {
  Integer mapVal = histogramMap.get(value.get().toString());
  if (mapVal != null) {
    histogramMap.put(value.get().toString(), mapVal + 1);
  } else {
    histogramMap.put(value.get().toString(), 1);
  }
 }

 public byte[] aggregate() {
  return Bytes.toBytes(GSON.toJson(histogramMap));
 }
}

 

 

Aggregation Function Customization

Users can build custom aggregation functions using the APIs described above in a manner that is similar to the example above. After writing a custom class for an aggregation function, they will simply have to pass in their custom aggregation class name into the application configuration JSON.
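
A hedged sketch of how a configured class name could be turned into an instance is given below. The Aggregation interface and CountingAggregation class are hypothetical stand-ins so that the example is self-contained. It uses getDeclaredConstructor().newInstance() because Class.newInstance() has been deprecated since Java 9, and it checks the loaded type before casting.

```java
/** Illustrative only: loads an aggregation function class by name via reflection. */
public class FunctionLoader {

  /** Hypothetical minimal interface standing in for BasicAggregationFunction. */
  public interface Aggregation {
    void add(String value);
    String result();
  }

  /** Hypothetical example function: counts the values it has seen. */
  public static class CountingAggregation implements Aggregation {
    private int count;

    public CountingAggregation() { }

    @Override
    public void add(String value) {
      count++;
    }

    @Override
    public String result() {
      return Integer.toString(count);
    }
  }

  /** Instantiates the named class, failing with a clear message if it is unusable. */
  static Aggregation load(String className) {
    try {
      Class<?> clazz = Class.forName(className);
      if (!Aggregation.class.isAssignableFrom(clazz)) {
        throw new IllegalArgumentException(className + " is not an aggregation function");
      }
      return (Aggregation) clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      // Covers a missing class, a missing no-arg constructor, and instantiation failures
      throw new IllegalArgumentException("Cannot load aggregation function " + className, e);
    }
  }
}
```

Validating the class name once, when the configuration is read, would surface a misspelled or incompatible class before any MapReduce run starts, rather than inside the reducer.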

End-to-End Example

  • Jumping back to the first use case mentioned under Example Use Cases: Suppose the user wants to take in a stream of CLF log data to generate several histograms partitioned by time of distributions of status codes and referrers and then wants to be able to query ranges of timestamps for aggregated histogram data.

  • They would create a Data Quality Application by specifying the following config JSON

{
   "aggregationName":"discreteValuesHistogram",
   "fieldsCSV":"status, referrer",
   "workflowScheduleMinutes":5,
   "dataInputFormat":"clf",
   "sourceType":"stream"
}

 

  • After some time, suppose the user wants to query the stored data from timestamp 1436220867548 to 1436220868900

    • The service requests would look like this:

      • /v1/fields/status/aggregations/discreteValuesHistogram?startTimestamp=1436220867548&endTimestamp=1436220868900

      • /v1/fields/referrer/aggregations/discreteValuesHistogram?startTimestamp=1436220867548&endTimestamp=1436220868900

  • The results produced by the service would look something like the following for status codes (the results for the referrer query would be of an identical format):

{  
   "200":504,
   "400":12,
   "404":23,
   "500":2
}
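
One detail of the configuration in this example is that the fieldsCSV value ("status, referrer") contains a space after the comma. The sketch below shows one way such a value could be parsed into the set of field names that the reducer checks. The class and method names are hypothetical; treating an empty value as "aggregate every field" mirrors the fieldsCSV.equals("") check in the reducer code earlier in this document.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Illustrative only: parses the fieldsCSV config value into a set of field names. */
public class FieldsCsv {

  /** Splits on commas, trims whitespace, and drops empty entries. */
  static Set<String> parse(String fieldsCSV) {
    Set<String> fields = new LinkedHashSet<>();
    for (String part : fieldsCSV.split(",")) {
      String trimmed = part.trim();
      if (!trimmed.isEmpty()) {
        fields.add(trimmed);
      }
    }
    return fields;
  }

  /** An empty field set means that every field should be aggregated. */
  static boolean shouldAggregate(Set<String> fields, String fieldName) {
    return fields.isEmpty() || fields.contains(fieldName);
  }
}
```

Without the trim step, the second entry would be " referrer" with a leading space and would never match the reducer key "referrer".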

UI

The idea is to have a timeline on which the user can select a time range and sourceID. Once that selection is made, the user will be shown a list of fields over which they can aggregate. Once the user selects a field from this list, they will be shown a list of aggregation types. Once the user picks an aggregation type, they will be served a JSON containing the aggregated values that correspond to the selected time range, sourceID, field, and aggregation type.

Implementation Plan

 

  1. Start with getting an application to work end-to-end with the following requirements:

    1. A MapReduce job ingests a stream of CLF data, generates a histogram of the data for every timestamp, and stores the aggregation in map form in a Table.

    2. Write a workflow

    3. Write two services:

      1. One for querying combinable aggregations

      2. One for querying non-combinable (basic) aggregations

  2. Make the input to the stream generic

  3. Build out the aggregation function structure to be as generic as possible (as well as pluggable)

  4. Build a library of aggregation functions for various kinds of data: discrete and categorical, continuous, etc.

  5. Make the source generic

  6. Write a UI
  7. Implement anomaly/suspicious pattern detection (first pass should be fairly basic)

  8. Explore whether it is possible to attach a confidence level scheme to the anomaly/suspicious pattern detection to improve results.

  9. Send users alerts when such anomalies/suspicious patterns are detected.





Created in 2020 by Google Inc.