Overview

Motivation

The goal of this application is to provide users with an extensible CDAP application that helps them determine the quality of their data. Users will be able to assess the quality of their data using out-of-the-box functionality and libraries, and to extend the application with their own aggregation functions and customizations. Finally, we’d like to provide users with a way to query the results of the quality metric computations via RESTful APIs.

...

  • Suppose the user wants to take in a stream of Apache access log data (CLF format), generate several histograms of status codes and referrers (partitioned by time), and then query ranges of timestamps for the aggregated histogram data.
  • As another example, suppose a user wants to figure out which IP addresses were trying to access a given service, and the related Apache access log data is stored in a table partitioned by time. The application could be configured to read the IP addresses from the table and compute a unique aggregation on each set of IP addresses for each time interval. After that, the user could once again query the aggregated data by specifying the relevant timestamps.
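
As a rough illustration of the first use case, the sketch below parses Common Log Format lines and counts status codes. The class name `ClfStatusHistogram`, its methods, and the regular expression are assumptions made for this example and are not part of the application; time partitioning is left out for brevity.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: counts status codes found in Common Log Format lines. */
final class ClfStatusHistogram {
  // CLF layout: host ident authuser [date] "request" status bytes
  private static final Pattern CLF = Pattern.compile(
      "^(\\S+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"([^\"]*)\" (\\d{3}) (\\S+)");

  /** Returns the status code of a CLF line, or null if the line does not parse. */
  static String statusOf(String line) {
    Matcher m = CLF.matcher(line);
    return m.find() ? m.group(6) : null;
  }

  /** Builds a status-code histogram, skipping lines that do not parse. */
  static Map<String, Integer> histogram(Iterable<String> lines) {
    Map<String, Integer> counts = new TreeMap<>();
    for (String line : lines) {
      String status = statusOf(line);
      if (status != null) {
        counts.merge(status, 1, Integer::sum);
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    String ok = "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "
        + "\"GET /apache_pb.gif HTTP/1.0\" 200 2326";
    String missing = "127.0.0.1 - - [10/Oct/2000:13:56:02 -0700] "
        + "\"GET /nope.html HTTP/1.0\" 404 209";
    System.out.println(histogram(Arrays.asList(ok, ok, missing)));
  }
}
```

In the application itself, this counting would happen inside an aggregation function, with one histogram stored per field and time interval.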

Requirements

  • Read in multiple data formats from a generic source

    • Example: read in from stream of CLF data

  • Data quality should be assessed in batch, and the results should be computed at periodic intervals (e.g. each hour)

  • MapReduce program that takes log data as input, applies an aggregation function, and stores the results of the aggregation in a table.

    • The table should be partitioned by time

  • Allow the user to query aggregated data over various time intervals.

  • Library of aggregation functions

  • Allow the aggregation function logic to be pluggable (allow users to plug in their own aggregation functions)

  • Allow the source type and input data format to be configurable

  • UI
  • Determine when there are anomalies/suspicious patterns (using the aggregated data).

    • Explore if it’s possible to attach a level of confidence to these findings

    • Alert the user via email when such things occur

    • Examples:

      • Suppose 90% of the status codes are consistently 200’s and all of a sudden, in the adjacent time interval, 70% of the status codes coming in are a mix of 404’s and 500’s. We would want to be able to determine that this is a situation worth alerting the user via email over.

      • Suppose the frequencies of referrers over a standard-length time interval are fairly evenly distributed up until a certain interval, at which point the frequencies are heavily skewed towards one referrer. We would want to alert the user that this referrer is likely a bot (possibly even with a confidence level)
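
One possible way to flag the kind of shift described in these examples is to compare the normalized histograms of two adjacent intervals. The sketch below uses total variation distance; the choice of that measure, the class name `HistogramShift`, and the threshold are assumptions for illustration, not a decided design.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative only: measures how far a histogram moved between two intervals. */
final class HistogramShift {

  /** Total variation distance between the two normalized histograms, in [0, 1]. */
  static double distance(Map<String, Integer> previous, Map<String, Integer> current) {
    double prevTotal = total(previous);
    double currTotal = total(current);
    if (prevTotal == 0 || currTotal == 0) {
      return 0.0; // an empty interval gives nothing to compare against
    }
    Set<String> keys = new HashSet<>(previous.keySet());
    keys.addAll(current.keySet());
    double sum = 0.0;
    for (String key : keys) {
      double p = previous.getOrDefault(key, 0) / prevTotal;
      double q = current.getOrDefault(key, 0) / currTotal;
      sum += Math.abs(p - q);
    }
    return sum / 2.0;
  }

  /** Flags the interval when the distance exceeds a caller-chosen (hypothetical) threshold. */
  static boolean isSuspicious(Map<String, Integer> previous, Map<String, Integer> current,
                              double threshold) {
    return distance(previous, current) > threshold;
  }

  private static double total(Map<String, Integer> histogram) {
    double total = 0;
    for (int count : histogram.values()) {
      total += count;
    }
    return total;
  }

  public static void main(String[] args) {
    Map<String, Integer> previous = new HashMap<>();
    previous.put("200", 90);
    previous.put("404", 5);
    previous.put("500", 5);
    Map<String, Integer> current = new HashMap<>();
    current.put("200", 30);
    current.put("404", 35);
    current.put("500", 35);
    System.out.println(isSuspicious(previous, current, 0.5));
  }
}
```

With the status-code example above (90% 200s followed by 70% 404s and 500s), the distance is about 0.6, so any threshold below that would trigger an alert. Attaching a confidence level would need a proper statistical test, which this sketch does not attempt.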

Design

Implicit Table Schema

...

  • Service for combinable aggregations (example: frequency/histogram)

    • Handler for retrieving aggregations

      • Returns a JSON of aggregated values
      • Request format: /v1/source/{sourceID}/fields/{fieldname}/aggregations/{aggregationName}?startTimestamp={start_ts}&endTimestamp={end_ts}
      • Path param: source ID
      • Path param: field name
      • Path param: aggregation type
      • Query param: start timestamp
      • Query param: end timestamp
      • The service will return a JSON containing the aggregated values that were queried. Here is an example of the format of something the service might return if the user wants to see a discrete values histogram over a given time range:

...

    • Handler for retrieving the fields that have been aggregated over for a given sourceID.

      • Returns a JSON of field names

      • Request format: /v1/source/{sourceID}/fields/?startTimestamp={start_ts}&endTimestamp={end_ts}

      • Path param: source ID
      • Query param: start timestamp
      • Query param: end timestamp
      • The service will return a JSON containing the available field names for the given parameters (sourceID, time interval). Here is an example of the format of something the service might return if the user wants to see the available fields, and the aggregation functions available for each, for stream1 between timestamps 1423371600 and 1423372900:
Code Block
{  
   "content_length":[  
      {  
         "name":"median",
         "combinable":false
      },
      {  
         "name":"max",
         "combinable":true
      }
   ]
}
    • Handler for retrieving the aggregation types for a given sourceID, fieldName, and time interval.

      • Returns a JSON of aggregation types

      • Request format: /v1/source/{sourceID}/fields/{fieldname}/aggregations?startTimestamp={start_ts}&endTimestamp={end_ts}

      • Path param: source ID

      • Path param: field name

      • Query param: start timestamp

      • Query param: end timestamp
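
To make the three request formats above concrete, here is a small sketch that assembles them from their parameters. The helper class `DataQualityRequests` and its method names are hypothetical, and the parameters are assumed to be URL-safe already, so no escaping is done.

```java
/** Illustrative only: builds request paths in the formats listed above. */
final class DataQualityRequests {

  /** /v1/source/{sourceID}/fields/{fieldname}/aggregations/{aggregationName}?... */
  static String aggregation(String sourceId, String fieldName, String aggregationName,
                            long startTs, long endTs) {
    return "/v1/source/" + sourceId + "/fields/" + fieldName
        + "/aggregations/" + aggregationName + timeRange(startTs, endTs);
  }

  /** /v1/source/{sourceID}/fields/?... */
  static String fields(String sourceId, long startTs, long endTs) {
    return "/v1/source/" + sourceId + "/fields/" + timeRange(startTs, endTs);
  }

  /** /v1/source/{sourceID}/fields/{fieldname}/aggregations?... */
  static String aggregationTypes(String sourceId, String fieldName, long startTs, long endTs) {
    return "/v1/source/" + sourceId + "/fields/" + fieldName
        + "/aggregations" + timeRange(startTs, endTs);
  }

  // Both timestamps are passed as query parameters on every endpoint.
  private static String timeRange(long startTs, long endTs) {
    return "?startTimestamp=" + startTs + "&endTimestamp=" + endTs;
  }

  public static void main(String[] args) {
    System.out.println(fields("stream1", 1423371600L, 1423372900L));
    System.out.println(aggregationTypes("stream1", "content_length", 1423371600L, 1423372900L));
    System.out.println(aggregation("stream1", "content_length", "DiscreteValuesHistogram",
        1423371600L, 1423372900L));
  }
}
```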

...

This is important because it dictates the structure of users’ custom aggregation functions.

Interfaces

 

 

 

Code Block
languagejava
/**
* Basic Aggregation Function Interface: All aggregation function classes -- combinable or
* non-combinable -- should implement these methods
* @param <T> Type of the aggregation
*/

public interface BasicAggregationFunction<T> {

 /**
 * Incrementally add values to the aggregation
 * @param value is a DataQualityWritable that we want to add to the running aggregation
 */
  void add(DataQualityWritable value);

 /**
 * Return the aggregation accumulated using add() in the form of a byte[]
 * @return a byte[] that represents the final aggregated value
 */
  byte[] aggregate();

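 /**
 * Deserialize a serialized aggregation (such as the byte[] returned by aggregate())
 * @param value is the byte[] to deserialize
 * @return the deserialized aggregation of type T
 */
  T deserialize(byte[] value);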
}



 

 

 

Code Block
languagejava
/**
* Aggregation Function Interface
* This is for aggregations that still make sense when combined.
*
* An example of such a function would be a discrete values histogram. If several
* histograms (each corresponding to a different time interval) were combined,
* the result would be a histogram that represents the frequencies of
* the various values over the combined time interval
* @param <T> Aggregation type
*/

public interface CombinableAggregationFunction<T> extends BasicAggregationFunction {

 /**
 * Retrieve a combined aggregation
 * @return T is the type of the combined aggregation
 */

 T retrieveAggregation();

 /**
 * Combine existing aggregations one-by-one
 * @param values is a byte[] that we want to add to the running combined aggregation
 */

 void combine(byte[] values);
}
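
The difference between combinable and non-combinable aggregations can be seen with plain numbers. The sketch below is independent of the interfaces above (the class `CombinabilityDemo` and its sample values are made up for illustration): the maximum over a combined time range can be recovered from per-interval maxima, whereas the median cannot be recovered from per-interval medians.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative only: why "max" is combinable and "median" is not. */
final class CombinabilityDemo {

  static long max(List<Long> values) {
    return Collections.max(values);
  }

  static double median(List<Long> values) {
    List<Long> sorted = new ArrayList<>(values);
    Collections.sort(sorted);
    int n = sorted.size();
    return n % 2 == 1
        ? sorted.get(n / 2)
        : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
  }

  public static void main(String[] args) {
    List<Long> hour1 = Arrays.asList(1L, 2L, 100L);
    List<Long> hour2 = Arrays.asList(3L, 4L, 5L, 6L, 7L);
    List<Long> both = new ArrayList<>(hour1);
    both.addAll(hour2);

    // Combining per-interval maxima gives the true maximum of the whole range.
    long combinedMax = max(Arrays.asList(max(hour1), max(hour2)));
    System.out.println(combinedMax == max(both)); // true

    // Combining per-interval medians does not give the true median.
    double medianOfMedians = median(Arrays.asList(2L, 5L)); // medians of hour1 and hour2
    System.out.println(medianOfMedians == median(both)); // false
  }
}
```

This is why a non-combinable function such as a median would only implement BasicAggregationFunction, while a function such as a maximum or a histogram can also implement CombinableAggregationFunction.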

 

 

Application API Usages

Usage of void add(DataQualityWritable value) and byte[] aggregate()

 

 

Code Block
languagejava
@Override
public void reduce(Text key, Iterable<DataQualityWritable> values, Context context)
 throws IOException, InterruptedException {
 if (fieldsSet.contains(key.toString()) || fieldsCSV.equals("")) {
  try {
    Class<?> aggregationClass = Class.forName("data.quality.app.functions." +      
        aggregationFunctionClassName);
    BasicAggregationFunction instance = (BasicAggregationFunction)
        aggregationClass.newInstance();
    for (DataQualityWritable value : values) {
      instance.add(value);
    }
    FieldTimestampKey fieldTimestampKey = new FieldTimestampKey(timeKey, key.toString());
    context.write(fieldTimestampKey.getTableRowKey(), new Put(fieldTimestampKey.getTableRowKey(),
       Bytes.toBytes(aggregationFunctionClassName), instance.aggregate()));
  } catch (ClassNotFoundException | RuntimeException | InstantiationException |
      IllegalAccessException e) {
    throw new RuntimeException(e);
  }
 }
}
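
The reducer above picks the aggregation function by class name at run time, which is what makes the logic pluggable. The sketch below isolates that step with stand-in types: `PluginLoader`, `Aggregation`, and `Counter` are hypothetical names used only here. It uses `getDeclaredConstructor().newInstance()` because `Class.newInstance()` is deprecated in newer Java versions, and it checks the loaded type up front so that a wrong class name fails with a clear message.

```java
/** Illustrative only: loads a pluggable aggregation class by name. */
final class PluginLoader {

  /** Hypothetical stand-in for BasicAggregationFunction. */
  interface Aggregation {
    void add(String value);
    int result();
  }

  /** Hypothetical plug-in that counts how many values it has seen. */
  public static final class Counter implements Aggregation {
    private int count;

    public Counter() { }

    @Override
    public void add(String value) {
      count++;
    }

    @Override
    public int result() {
      return count;
    }
  }

  /** Instantiates className through its no-argument constructor. */
  static Aggregation load(String className) {
    try {
      // asSubclass rejects classes that do not implement Aggregation.
      Class<? extends Aggregation> cls =
          Class.forName(className).asSubclass(Aggregation.class);
      return cls.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalArgumentException("Not a usable aggregation class: " + className, e);
    }
  }

  public static void main(String[] args) {
    Aggregation aggregation = load(Counter.class.getName());
    aggregation.add("200");
    aggregation.add("404");
    System.out.println(aggregation.result()); // 2
  }
}
```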

 

 

Usage of T retrieveAggregation() and void combine(byte[] values)

 

 

 

Code Block
languagejava
/**
* Handler class for Data Quality Histogram Service
*/

@Path("/v1")
public static final class HistogramLookup extends AbstractHttpServiceHandler {
 @UseDataSet("logDataStore")
 Table logDataStore;

 @Path("fields/{fieldName}/aggregations/{aggregationType}")
 @GET
 public void handler(HttpServiceRequest request, HttpServiceResponder responder,
                    @PathParam("fieldName") String fieldName, @PathParam("aggregationType")
                    String aggregationType,
                    @QueryParam("startTimestamp") @DefaultValue("0") long startTimestamp,
                    @QueryParam("endTimestamp") @DefaultValue("9223372036854775807")
                    long endTimestamp) throws IOException {
  FieldTimestampKey fieldStartTimestampKey = new FieldTimestampKey(startTimestamp,
      fieldName);
  // scan rows inclusive of endTimestamp
  FieldTimestampKey fieldEndTimestampKey =
    new FieldTimestampKey(endTimestamp + 1, fieldName);
  Scanner scanner = logDataStore.scan(fieldStartTimestampKey.getTableRowKey(),
                                      fieldEndTimestampKey.getTableRowKey());
  try {
    Class<?> aggregationClass = Class.forName("data.quality.app.functions." +
      aggregationType);
    CombinableAggregationFunction aggregationClassInstance =
      (CombinableAggregationFunction) aggregationClass.newInstance();
    Row row;
    byte[] aggregationTypeBytes = Bytes.toBytes(aggregationType);
    try {
      while ((row = scanner.next()) != null) {
        Map<byte[], byte[]> columnsMapBytes = row.getColumns();
        byte[] output = columnsMapBytes.get(aggregationTypeBytes);
        if (output != null) {
          aggregationClassInstance.combine(output);
        }
      }
    } finally {
      scanner.close();
    }
    Object output = aggregationClassInstance.retrieveAggregation();
    responder.sendJson(output == null ? 404 : 200, output == null ? aggregationType +
      " is not a valid CombinableAggregationFunction" : output);
  } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
    throw new RuntimeException(e);
  }
 }
}

 

 

...

Below is code demonstrating what a discrete values histogram aggregation function would look like using the aforementioned APIs.

 

 

 

Code Block
languagejava
/**
* Discrete Values Histogram Aggregation Function
*/

public class DiscreteValuesHistogram
 implements BasicAggregationFunction, CombinableAggregationFunction<Map<String, Integer>> {
 private static final Gson GSON = new Gson();
 private static final Type TOKEN_TYPE_MAP_STRING_INTEGER = new TypeToken<Map<String, 
   Integer>>() { }.getType();
 private Map<String, Integer> histogramMap = Maps.newHashMap();
 private Map<String, Integer> aggregatedHistogramMap = Maps.newHashMap();

 public void combine(byte[] value) {
  Map<String, Integer> outputMap = deserialize(value);
  for (Map.Entry<String, Integer> entry : outputMap.entrySet()) {
    Integer val = aggregatedHistogramMap.get(entry.getKey());
    aggregatedHistogramMap.put(entry.getKey(), val == null ? entry.getValue() : val + 
       entry.getValue());
  }
 }

 public Map<String, Integer> deserialize(byte[] valueBytes) {
  return GSON.fromJson(Bytes.toString(valueBytes), TOKEN_TYPE_MAP_STRING_INTEGER);
 }

 public Map<String, Integer> retrieveAggregation() {
  return aggregatedHistogramMap.isEmpty() ? null : aggregatedHistogramMap;
 }

 public void add(DataQualityWritable value) {
  Integer mapVal = histogramMap.get(value.get().toString());
  if (mapVal != null) {
    histogramMap.put(value.get().toString(), mapVal + 1);
  } else {
    histogramMap.put(value.get().toString(), 1);
  }
 }

 public byte[] aggregate() {
  return Bytes.toBytes(GSON.toJson(histogramMap));
 }
}
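
The class above keeps two maps because it is used in two phases: the MapReduce job calls add() and aggregate() once per time interval, and the service later calls combine() and retrieveAggregation() across the stored intervals. The sketch below walks through that lifecycle with stand-ins so that it runs without CDAP or Gson: `HistogramLifecycle` is a hypothetical class, values are plain strings rather than DataQualityWritable, and the `key=count` line format replaces the JSON serialization.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: the add/aggregate and combine/retrieve phases of a histogram. */
final class HistogramLifecycle {
  private final Map<String, Integer> histogram = new TreeMap<>(); // batch phase
  private final Map<String, Integer> combined = new TreeMap<>();  // query phase

  void add(String value) {
    histogram.merge(value, 1, Integer::sum);
  }

  /** Serializes as one "key=count" line per entry (keys must not contain newlines). */
  byte[] aggregate() {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, Integer> entry : histogram.entrySet()) {
      sb.append(entry.getKey()).append('=').append(entry.getValue()).append('\n');
    }
    return sb.toString().getBytes(StandardCharsets.UTF_8);
  }

  /** Adds one stored per-interval histogram to the running combined histogram. */
  void combine(byte[] serialized) {
    String text = new String(serialized, StandardCharsets.UTF_8);
    for (String line : text.split("\n")) {
      if (line.isEmpty()) {
        continue;
      }
      int sep = line.lastIndexOf('=');
      combined.merge(line.substring(0, sep),
          Integer.parseInt(line.substring(sep + 1)), Integer::sum);
    }
  }

  Map<String, Integer> retrieveAggregation() {
    return combined.isEmpty() ? null : combined;
  }

  public static void main(String[] args) {
    // Batch phase: one instance per time interval, as in the reducer.
    HistogramLifecycle hour1 = new HistogramLifecycle();
    hour1.add("200");
    hour1.add("200");
    hour1.add("404");
    HistogramLifecycle hour2 = new HistogramLifecycle();
    hour2.add("200");
    hour2.add("500");

    // Query phase: a fresh instance combines the stored byte[] values.
    HistogramLifecycle query = new HistogramLifecycle();
    query.combine(hour1.aggregate());
    query.combine(hour2.aggregate());
    System.out.println(query.retrieveAggregation()); // {200=3, 404=1, 500=1}
  }
}
```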

 

 

...