Order By

Introduction 

Order By is an aggregator plugin that sorts rows on the fields you specify, each in ascending or descending order.

Use-case

A user is processing a web access log and, as part of the data pipeline, aggregates response codes. The pipeline uses the “GROUP BY” aggregation plugin to count the web responses in each response category. At the end of the pipeline, the user wants to sort the results by count before they are written to the file.

User Stories

  • User should be able to specify a single field or a composite of several fields for sorting the records in the output

  • User should be able to specify a field (of a basic type) from a nested structure for sorting the records

  • User should be able to specify, for each field, whether the records are sorted in ascending or descending order

  • User can specify only basic types (String, Int, Long, Short, Float, Double, Byte) as sort keys; if any other type is specified, an error is thrown to notify the user
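The last user story can be sketched as a small validation step. This is only an illustration: the `FieldType` enum and the `SortKeyTypeCheck` class are hypothetical stand-ins, and a real plugin would presumably read the field type from the input schema instead.

```java
import java.util.EnumSet;
import java.util.Set;

class SortKeyTypeCheck {
  // Hypothetical type tags (assumption); not the plugin's actual schema types.
  enum FieldType { STRING, INT, LONG, SHORT, FLOAT, DOUBLE, BYTE, ARRAY, MAP, RECORD }

  // The basic types that the user story allows as sort keys.
  static final Set<FieldType> SORTABLE = EnumSet.of(
      FieldType.STRING, FieldType.INT, FieldType.LONG, FieldType.SHORT,
      FieldType.FLOAT, FieldType.DOUBLE, FieldType.BYTE);

  // Throws if the field cannot be used as a sort key.
  static void validate(String fieldName, FieldType type) {
    if (!SORTABLE.contains(type)) {
      throw new IllegalArgumentException(
          "Field '" + fieldName + "' has type " + type + ", which cannot be used as a sort key");
    }
  }

  public static void main(String[] args) {
    validate("Age", FieldType.INT);            // accepted
    try {
      validate("shippingAddress", FieldType.RECORD);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());      // rejected with an error for the user
    }
  }
}
```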

Example

The following simple example shows how Order By works.

Input

First Name   Last Name   Age   Zip Code
----------   ---------   ---   --------
Joltie       Root        29    32826
Henry        Zilka       62    96789
Baby         Trump       10    76563
Donald       Trump       70    34566
Ivanka       Trump       34    94306
Bipasha      Basu        39    67543
BabyII       Trump       10    32816

 

Configuration is specified as follows

  • Input Schema

    • First Name, String

    • Last Name, String

    • Age, Int

    • Zip Code, Long

  • Sort by 

    • Last Name, Ascending

    • Age, Ascending

    • Zip Code, Descending

  • Output Schema

    • First Name, String

    • Last Name, String

    • Age, Int

    • Zip Code, Long

Output is as follows

 

First Name   Last Name   Age   Zip Code
----------   ---------   ---   --------
Bipasha      Basu        39    67543
Joltie       Root        29    32826
Baby         Trump       10    76563
BabyII       Trump       10    32816
Ivanka       Trump       34    94306
Donald       Trump       70    34566
Henry        Zilka       62    96789
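The ordering in this example can be reproduced with a plain Java comparator chain. This is a minimal sketch that only illustrates the sort semantics (Last Name ascending, Age ascending, Zip Code descending) on the rows of the Input table; the `Person` class is a hypothetical stand-in and the code is not the plugin's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class OrderByExample {
  // Hypothetical stand-in for one input row (assumption).
  static final class Person {
    final String firstName;
    final String lastName;
    final int age;
    final long zipCode;

    Person(String firstName, String lastName, int age, long zipCode) {
      this.firstName = firstName;
      this.lastName = lastName;
      this.age = age;
      this.zipCode = zipCode;
    }
  }

  // Last Name ascending, then Age ascending, then Zip Code descending.
  static final Comparator<Person> ORDER =
      Comparator.comparing((Person p) -> p.lastName)
          .thenComparingInt(p -> p.age)
          .thenComparing(Comparator.comparingLong((Person p) -> p.zipCode).reversed());

  // The rows of the Input table, in their original order.
  static List<Person> sampleInput() {
    return Arrays.asList(
        new Person("Joltie", "Root", 29, 32826L),
        new Person("Henry", "Zilka", 62, 96789L),
        new Person("Baby", "Trump", 10, 76563L),
        new Person("Donald", "Trump", 70, 34566L),
        new Person("Ivanka", "Trump", 34, 94306L),
        new Person("Bipasha", "Basu", 39, 67543L),
        new Person("BabyII", "Trump", 10, 32816L));
  }

  // Sorts a copy of the input and returns the first names in sorted order.
  static List<String> sortedFirstNames() {
    List<Person> rows = new ArrayList<>(sampleInput());
    rows.sort(ORDER);
    List<String> names = new ArrayList<>();
    for (Person p : rows) {
      names.add(p.firstName);
    }
    return names;
  }

  public static void main(String[] args) {
    // prints [Bipasha, Joltie, Baby, BabyII, Ivanka, Donald, Henry]
    System.out.println(sortedFirstNames());
  }
}
```

The two rows with Last Name "Trump" and Age 10 are the only ones where the third sort field decides the order, which is why the row with zip code 76563 comes before the row with 32816.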

Implementation Tips

  • Investigate how the ‘Group Comparator’ and ‘Sort Comparator’ work together and how they can be used to achieve the functionality of this plugin.

  • Build a simple map-reduce program to understand how the above functionality works: implement a Sort Comparator using StructuredRecord.

  • If the above works, then the Data Pipeline Application Template needs to be modified to set the sort comparator class, and this shouldn’t affect the other plugins.

Design 

The order by plugin will use the Secondary Sort technique to sort the values (in ascending or descending order) passed to each reducer.
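The idea behind secondary sort can be simulated in plain Java without any Hadoop classes. The sketch below is an illustration only, with hypothetical names (`SecondarySortSketch`, `Pair`): the "sort comparator" orders composite keys by natural key and then by value, and the "group comparator" step collects consecutive entries with the same natural key into one group, as a single reduce() call would see them. It assumes a single reducer, so partitioning is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SecondarySortSketch {
  // Hypothetical composite key: a natural key plus the value to sort on.
  static final class Pair {
    final String naturalKey;
    final int value;

    Pair(String naturalKey, int value) {
      this.naturalKey = naturalKey;
      this.value = value;
    }
  }

  // Returns, per natural key, the values in the order one reducer would receive them.
  static Map<String, List<Integer>> run(List<Pair> mapOutput, boolean ascending) {
    // "Sort comparator": natural key first, then the value in the requested direction.
    Comparator<Pair> byValue = Comparator.comparingInt((Pair p) -> p.value);
    Comparator<Pair> sortComparator =
        Comparator.comparing((Pair p) -> p.naturalKey)
            .thenComparing(ascending ? byValue : byValue.reversed());

    List<Pair> sorted = new ArrayList<>(mapOutput);
    sorted.sort(sortComparator);

    // "Group comparator": entries with the same natural key share one group.
    Map<String, List<Integer>> groups = new LinkedHashMap<>();
    for (Pair p : sorted) {
      groups.computeIfAbsent(p.naturalKey, k -> new ArrayList<>()).add(p.value);
    }
    return groups;
  }

  public static void main(String[] args) {
    List<Pair> mapOutput = Arrays.asList(
        new Pair("Trump", 70), new Pair("Basu", 39), new Pair("Trump", 10), new Pair("Trump", 34));
    System.out.println(run(mapOutput, true));   // prints {Basu=[39], Trump=[10, 34, 70]}
    System.out.println(run(mapOutput, false));  // prints {Basu=[39], Trump=[70, 34, 10]}
  }
}
```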

The plugin consists of the following classes:

CompositeKey.java

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom Writable for the composite key.
 * Skeleton only: write() and readFields() are still to be added.
 */
public class CompositeKey implements WritableComparable<CompositeKey> {
  // StructuredRecord, received as a JSON string from the mapper.
  private String structureRecordJSON;
  // List of fields to sort on, received as a JSON string from the mapper.
  private String sortFieldsJSON;

  /**
   * This comparison controls the sort order of the keys.
   */
  @Override
  public int compareTo(CompositeKey other) {
    // TODO: compare the StructuredRecord objects parsed from the JSON strings,
    // field by field, in the order given by sortFieldsJSON.
    throw new UnsupportedOperationException("Not implemented yet");
  }
}

CompositeKeyMapper.java

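import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper class for Order By.
 */
public class CompositeKeyMapper extends Mapper<LongWritable, Text, CompositeKey, Text> {
  /**
   * @param key blank object received as the key
   * @param value StructuredRecord received as a JSON string
   * @param context object containing the configuration properties for the current job
   */
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // TODO: build a CompositeKey from the record and the sort fields, then emit it.
  }
}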

CompositeKeyReducer.java

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer class for Order By.
 */
public class CompositeKeyReducer extends Reducer<CompositeKey, Text, CompositeKey, Text> {
  /**
   * @param key CompositeKey used for sorting
   * @param values StructuredRecords received as JSON strings
   * @param context object containing the configuration properties for the current job
   */
  @Override
  protected void reduce(CompositeKey key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    // TODO: write out the records, which arrive in sorted key order.
  }
}

CompositeKeyComparator.java

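import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom sort class to sort the received structured records.
 */
public class CompositeKeyComparator extends WritableComparator {

  public CompositeKeyComparator() {
    // Have the parent class create CompositeKey instances so that keys are
    // deserialized before compare() is called.
    super(CompositeKey.class, true);
  }

  @Override
  public int compare(WritableComparable wc1, WritableComparable wc2) {
    CompositeKey compositeKey1 = (CompositeKey) wc1;
    CompositeKey compositeKey2 = (CompositeKey) wc2;
    return compositeKey1.compareTo(compositeKey2);
  }
}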

 

  • The plugin will also implement CompositeGroupComparator.java (a custom group comparator) to control which keys are grouped together into a single call to the reduce() method, and CompositePartitioner.java (a custom partitioner class) to ensure that all data with the same key (a combination of some of the specified keys) is sent to the same reducer.

  • The prepareRun() method will register all the custom classes required for Order By.

  • To sort on a field of a nested record, the field name should be the fully qualified name, starting from the parent.

    {
      "productName": "Angels & Demons",
      "quantity": 1,
      "shippingAddress": {
        "city": "Columbia",
        "state": "Illinois",
        "zip": 12568
      },
      "billingAddress": {
        "city": "California City",
        "state": "California",
        "zip": 11220
      }
    }

    For example, to sort the above order in descending order of the zip code in shippingAddress, the field should be specified as "shippingAddress->zip:desc".
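Resolving a fully qualified field name such as shippingAddress->zip can be sketched as a walk through the nested record. The sketch below uses nested `Map` objects as a stand-in for nested records, which is an assumption made only to keep the example self-contained; `NestedFieldLookup` and its methods are hypothetical names, and billingAddress is left out of the sample for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class NestedFieldLookup {
  // Follows a "parent->child" path through nested maps and returns the leaf value.
  static Object resolve(Map<String, Object> record, String path) {
    Object current = record;
    for (String part : path.split("->")) {
      if (!(current instanceof Map)) {
        throw new IllegalArgumentException("'" + part + "' in '" + path + "' has no parent record");
      }
      Map<?, ?> map = (Map<?, ?>) current;
      if (!map.containsKey(part)) {
        throw new IllegalArgumentException("Unknown field '" + part + "' in '" + path + "'");
      }
      current = map.get(part);
    }
    return current;
  }

  // Part of the sample order shown above, built from nested maps.
  static Map<String, Object> sampleOrder() {
    Map<String, Object> shipping = new LinkedHashMap<>();
    shipping.put("city", "Columbia");
    shipping.put("state", "Illinois");
    shipping.put("zip", 12568);

    Map<String, Object> order = new LinkedHashMap<>();
    order.put("productName", "Angels & Demons");
    order.put("quantity", 1);
    order.put("shippingAddress", shipping);
    return order;
  }

  public static void main(String[] args) {
    System.out.println(resolve(sampleOrder(), "shippingAddress->zip"));  // prints 12568
    System.out.println(resolve(sampleOrder(), "productName"));           // prints Angels & Demons
  }
}
```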

 

Example

This example accepts a nested structured record containing shippingAddress and billingAddress records, and sorts the records in ascending order of productName and descending order of the zip code in shippingAddress.

{
  "name": "OrderBy",
  "plugin": {
    "name": "OrderBy",
    "type": "transform",
    "label": "OrderBy",
    "properties": {
      "sortFieldList": "productName:asc,shippingAddress->zip:desc"
    }
  }
}
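One way the sortFieldList value could be parsed is sketched below. The format is inferred from the example value alone (comma-separated entries of the form field:asc or field:desc); the `SortFieldListParser` class is hypothetical and the plugin's actual parsing may differ.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

class SortFieldListParser {
  // Maps each field path to true (ascending) or false (descending), in the order listed.
  static Map<String, Boolean> parse(String sortFieldList) {
    Map<String, Boolean> result = new LinkedHashMap<>();
    for (String entry : sortFieldList.split(",")) {
      String[] parts = entry.trim().split(":");
      if (parts.length != 2) {
        throw new IllegalArgumentException("Expected 'field:asc' or 'field:desc' but got '" + entry + "'");
      }
      String field = parts[0].trim();
      String order = parts[1].trim().toLowerCase(Locale.ROOT);
      if (!order.equals("asc") && !order.equals("desc")) {
        throw new IllegalArgumentException("Unknown sort order '" + parts[1] + "' for field '" + field + "'");
      }
      result.put(field, order.equals("asc"));
    }
    return result;
  }

  public static void main(String[] args) {
    // prints {productName=true, shippingAddress->zip=false}
    System.out.println(parse("productName:asc,shippingAddress->zip:desc"));
  }
}
```

A `LinkedHashMap` keeps the fields in the order they were listed, which matters because earlier fields take precedence when sorting.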

 

 

 

Checklist

User stories documented 
User stories reviewed 
Design documented 
Design reviewed 
Feature merged 
Examples and guides 
Integration tests 
Documentation for feature 
Short video demonstrating the feature

 

Created in 2020 by Google Inc.