Multiple File Set Sink

Introduction

As users build increasingly generic pipelines that are configured at runtime through macros, they need to write to multiple sinks from a single plugin, because the DAG cannot be modified once a pipeline is deployed. This plugin is designed to be configured dynamically at runtime so that a single run can write to multiple filesets.

Use case(s)

  • A user would like to create a single pipeline that can process files from different clients. The pipeline is configured entirely through macros, so each run can be customized for the client file being read. Each file contains lines that start with identifiers such as 101 and 202, and lines with different identifiers should be sent to different datasets. Each client uses a different set of identifiers and wants to write to a different number of output filesets, with different names. The user would combine this plugin with conditional directives from Wrangler to configure the outputs dynamically and write each record to the appropriate fileset.

  • This plugin would almost always be configured through macros at pipeline runtime.
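Because nearly every property is expected to arrive as a macro, the values the sink finally sees are plain strings after substitution. The following is a minimal sketch of that substitution step, assuming simple ${name} references resolved from runtime arguments; the function name resolve_macros and the argument names client and idPrefix are hypothetical, and this simplified version does not handle nested macros or macro functions.

```python
import re

# Matches a simple ${name} reference. Nested macros and macro functions
# are deliberately out of scope for this sketch.
MACRO_PATTERN = re.compile(r"\$\{([^${}]+)\}")


def resolve_macros(value: str, runtime_args: dict) -> str:
    """Replace each ${name} in value with runtime_args[name].

    Raises KeyError when a referenced macro has no runtime argument, so a
    misconfigured run fails early instead of writing to the wrong fileset.
    """
    def substitute(match):
        name = match.group(1)
        if name not in runtime_args:
            raise KeyError(f"no runtime argument for macro '{name}'")
        return runtime_args[name]

    return MACRO_PATTERN.sub(substitute, value)


# Hypothetical runtime arguments for one client's run.
args = {"client": "acme", "idPrefix": "202"}
print(resolve_macros("${client}_fileset0", args))          # prints: acme_fileset0
print(resolve_macros('id.startsWith("${idPrefix}")', args))  # prints: id.startsWith("202")
```

Failing on an unknown macro, rather than leaving it in place, is a design choice: a dataset name containing a literal "${...}" would otherwise silently create an unintended fileset.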

User Stories

  • As a pipeline developer, I would like to change the number of filesets being written to dynamically at runtime, with the destination of each record determined by the data in that record.

  • As a pipeline developer, I would like to define a different schema for each fileset I write to, so this plugin must support a separate schema for each output fileset.

  • As a pipeline developer, I would like to determine which fileset to write each record to based on one or more fields in the record.

  • As a pipeline developer, I would like to be able to compress the data written to each of these datasets.

  • As a pipeline developer, I would like to be able to partition the filesets by the time of each pipeline run.
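The routing requirement in these stories can be sketched as one predicate per fileset, each evaluated against every record. This is a minimal illustration, not the plugin's implementation: the route function, the fileset names, and the amount field are hypothetical. Note that a record may match several filesets, or none.

```python
from typing import Callable, Dict, List

Record = Dict[str, str]
Predicate = Callable[[Record], bool]


def route(record: Record, routes: Dict[str, Predicate]) -> List[str]:
    """Return the names of every fileset whose predicate accepts the record.

    A record may match several filesets or none; in this sketch an
    unmatched record is simply not written anywhere.
    """
    return [name for name, accepts in routes.items() if accepts(record)]


# Hypothetical rules: two based on the identifier prefix, one on another field.
routes = {
    "clientA_101": lambda r: r["id"].startswith("101"),
    "clientA_202": lambda r: r["id"].startswith("202"),
    "high_value": lambda r: int(r["amount"]) > 1000,
}

print(route({"id": "202X", "amount": "5000"}, routes))
# prints: ['clientA_202', 'high_value']
```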

Plugin Type

Batch Sink 

Configurable

This section defines properties that are configurable for this plugin. 

| User Facing Name | Required | Default   | Description                      |
|------------------|----------|-----------|----------------------------------|
| Sink Name        | Y        | None      | Name of the sink plugin.         |
| Type             | Y        | Automatic | Always batch sink ("batchsink"). |

Output File Sets (outputFileSets): each entry in this list configures one output fileset with the following properties.

| User Facing Name                       | Required | Default                         | Description |
|----------------------------------------|----------|---------------------------------|-------------|
| Dataset Name (datasetName)             | Y        | None                            | Name of the PartitionedFileSet to which records are written. If it does not exist, it is created. (Macro-enabled) |
| Expression (expression)                | Y        | None                            | Filter expression evaluated against each record; records for which it is true are written to this fileset. It can test the record ID as well as other fields, for example id.startsWith("201") && salary >= 5000. |
| Dataset Target Path (datasetTargetPaths) | N      | [Namespace]/data/[Dataset name] | Path where the data is written. |
| FileSet Properties (filesetProperties) | N        | None                            | The Parquet schema of the records written to the sink, as a JSON object. When writing to multiple filesets, the schemas of all filesets should be combined into a single JSON document. (Macro-enabled) |
| Compression Codec (compressionCodec)   | N        | None                            | Compression codec applied to the output data. Valid values are None, Snappy, GZip, and LZO. |
| Schema (schema)                        | Y        | None                            | Schema of the records written to this fileset, as a JSON object. Must be specified manually. |
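The defaults and allowed values in the table can be checked before any data is written. The sketch below assumes the JSON key names used in the example configuration (datasetName, expression, schema, compressionCodec, datasetTargetPaths); the function normalize_output_filesets is hypothetical, and it treats an empty string the same as an absent value.

```python
import json
from typing import Any, Dict, List

# Values from the Compression Codec row of the table above.
VALID_CODECS = {"None", "Snappy", "GZip", "LZO"}
# Properties marked as required, named as in the example JSON.
REQUIRED_KEYS = ("datasetName", "expression", "schema")


def normalize_output_filesets(config_json: str, namespace: str) -> List[Dict[str, Any]]:
    """Validate each outputFileSets entry and fill in the documented defaults."""
    config = json.loads(config_json)
    normalized = []
    for index, fileset in enumerate(config["outputFileSets"]):
        missing = [key for key in REQUIRED_KEYS if not fileset.get(key)]
        if missing:
            raise ValueError(f"outputFileSets[{index}] is missing {missing}")
        # An absent or empty codec means no compression.
        codec = fileset.get("compressionCodec") or "None"
        if codec not in VALID_CODECS:
            raise ValueError(f"outputFileSets[{index}] has invalid codec {codec!r}")
        name = fileset["datasetName"]
        # An empty target path falls back to [Namespace]/data/[Dataset name].
        path = fileset.get("datasetTargetPaths") or f"{namespace}/data/{name}"
        normalized.append({**fileset, "compressionCodec": codec, "datasetTargetPaths": path})
    return normalized


example = json.dumps({"outputFileSets": [{
    "datasetName": "fileset0",
    "expression": 'id.startsWith("202")',
    "datasetTargetPaths": "",
    "schema": {"type": "record", "name": "fileset0",
               "fields": [{"name": "id", "type": "string"}]},
}]})
result = normalize_output_filesets(example, "default")
print(result[0]["datasetTargetPaths"], result[0]["compressionCodec"])
# prints: default/data/fileset0 None
```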

 

Example

This example writes to three PartitionedFileSets named fileset0, fileset1, and fileset2. Each fileset is written in Parquet format, compressed with Snappy, using its own schema. Every time the pipeline runs, the output of that run is stored in a new partition of each PartitionedFileSet:

{
  "name": "Multiple File Set Sink",
  "type": "batchsink",
  "outputFileSets": [
    {
      "compressionCodec": "Snappy",
      "datasetName": "fileset0",
      "datasetTargetPaths": "",
      "expression": "(id.startsWith(\"201\") || id.startsWith(\"202\") || id.startsWith(\"203\")) && salary >= 5000 && salary <= 7000",
      "filesetProperties": "",
      "schema": {
        "type": "record",
        "name": "fileset0",
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "first_name", "type": "string" },
          { "name": "last_name", "type": "string" },
          { "name": "sex", "type": "string" },
          { "name": "address", "type": "string" },
          { "name": "salary", "type": "string" }
        ]
      }
    },
    {
      "compressionCodec": "Snappy",
      "datasetName": "fileset1",
      "datasetTargetPaths": "",
      "expression": "(id.startsWith(\"201\") || id.startsWith(\"202\") || id.startsWith(\"203\")) && salary >= 5000 && salary <= 7000",
      "filesetProperties": "",
      "schema": {
        "type": "record",
        "name": "sales",
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "first_name", "type": "string" },
          { "name": "salary", "type": "string" }
        ]
      }
    },
    {
      "compressionCodec": "Snappy",
      "datasetName": "fileset2",
      "datasetTargetPaths": "",
      "expression": "(id.startsWith(\"201\") || id.startsWith(\"202\") || id.startsWith(\"203\")) && salary >= 5000 && salary <= 7000",
      "filesetProperties": "",
      "schema": {
        "type": "record",
        "name": "sales",
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "first_name", "type": "string" },
          { "name": "sex", "type": "string" },
          { "name": "address", "type": "string" },
          { "name": "salary", "type": "string" }
        ]
      }
    }
  ]
}

 

The original source data is as follows:

| id         | first_name | last_name | sex | address                          | salary |
|------------|------------|-----------|-----|----------------------------------|--------|
| 202EMPLPIM | Madeline   | Heine     | F   | 22 Rochester St, Uhome, WY       | 5000   |
| 202EMPLPIM | Margaret   | Morehead  | F   | 100 Commerce Cr, Springfield, IL | 6000   |
| 201EMPLPIM | Sean       | Froula    | M   | 2105 8th St, Uhome, WY           | 7000   |
| 203EMPLPIM | Jennifer   | Costello  | F   | 21 Walker Rd, Uhome, WY          | 8000   |
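The expression language is not fixed by this design (see the Wrangler question under Design), so the following sketch hand-translates the fileset0 expression from the example into a Python predicate and applies it to the rows above. It assumes the comparison is numeric; because the example schema declares salary as a string, the sketch converts it explicitly with int(). The names SOURCE_ROWS and fileset0_expression are hypothetical.

```python
# Source rows from the table above.
SOURCE_ROWS = [
    {"id": "202EMPLPIM", "first_name": "Madeline", "last_name": "Heine", "sex": "F",
     "address": "22 Rochester St, Uhome, WY", "salary": "5000"},
    {"id": "202EMPLPIM", "first_name": "Margaret", "last_name": "Morehead", "sex": "F",
     "address": "100 Commerce Cr, Springfield, IL", "salary": "6000"},
    {"id": "201EMPLPIM", "first_name": "Sean", "last_name": "Froula", "sex": "M",
     "address": "2105 8th St, Uhome, WY", "salary": "7000"},
    {"id": "203EMPLPIM", "first_name": "Jennifer", "last_name": "Costello", "sex": "F",
     "address": "21 Walker Rd, Uhome, WY", "salary": "8000"},
]


def fileset0_expression(row: dict) -> bool:
    """Python equivalent of the example expression:
    (id.startsWith("201") || id.startsWith("202") || id.startsWith("203"))
        && salary >= 5000 && salary <= 7000
    """
    salary = int(row["salary"])  # the schema declares salary as a string
    return row["id"].startswith(("201", "202", "203")) and 5000 <= salary <= 7000


matched = [row["first_name"] for row in SOURCE_ROWS if fileset0_expression(row)]
print(matched)  # prints: ['Madeline', 'Margaret', 'Sean']
```

Jennifer is excluded because her salary of 8000 exceeds the upper bound, which matches the fileset0 output that follows.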

 

 

After the sink runs, the records are split into the following filesets:

fileset0

| id         | first_name | last_name | sex | address                          | salary |
|------------|------------|-----------|-----|----------------------------------|--------|
| 202EMPLPIM | Madeline   | Heine     | F   | 22 Rochester St, Uhome, WY       | 5000   |
| 202EMPLPIM | Margaret   | Morehead  | F   | 100 Commerce Cr, Springfield, IL | 6000   |
| 201EMPLPIM | Sean       | Froula    | M   | 2105 8th St, Uhome, WY           | 7000   |

 

fileset1

| id         | first_name | salary |
|------------|------------|--------|
| 202EMPLPIM | Madeline   | 5000   |
| 202EMPLPIM | Margaret   | 6000   |

 

fileset2

| id         | first_name | sex | address                          | salary |
|------------|------------|-----|----------------------------------|--------|
| 202EMPLPIM | Madeline   | F   | 22 Rochester St, Uhome, WY       | 5000   |
| 202EMPLPIM | Margaret   | F   | 100 Commerce Cr, Springfield, IL | 6000   |
| 203EMPLPIM | Jennifer   | F   | 21 Walker Rd, Uhome, WY          | 8000   |
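Each output fileset keeps only the columns named in its own schema, as fileset1 and fileset2 show. A minimal sketch of that projection step, assuming the Avro-style record schemas used in the example configuration; the function name project is hypothetical.

```python
import json
from typing import Dict


def project(record: Dict[str, str], schema_json: str) -> Dict[str, str]:
    """Return only the fields named in the fileset's record schema, in schema order."""
    schema = json.loads(schema_json)
    field_names = [field["name"] for field in schema["fields"]]
    missing = [name for name in field_names if name not in record]
    if missing:
        raise KeyError(f"record lacks fields required by the schema: {missing}")
    return {name: record[name] for name in field_names}


# The fileset1 schema from the example configuration.
FILESET1_SCHEMA = json.dumps({
    "type": "record",
    "name": "sales",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "first_name", "type": "string"},
        {"name": "salary", "type": "string"},
    ],
})

madeline = {"id": "202EMPLPIM", "first_name": "Madeline", "last_name": "Heine",
            "sex": "F", "address": "22 Rochester St, Uhome, WY", "salary": "5000"}
print(project(madeline, FILESET1_SCHEMA))
# prints: {'id': '202EMPLPIM', 'first_name': 'Madeline', 'salary': '5000'}
```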

 

Design

  • Investigate whether the Wrangler core library can be used to evaluate the expression for this plugin

  • Can the compression codec be set separately for each dataset, or must it be set at the job level?

  • How can we write to multiple filesets from the same transform function?
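One possible answer to the last two questions is to have the single transform tag each record with its destination and let the output layer fan the tagged records out, each destination carrying its own codec setting. Hadoop MapReduce's MultipleOutputs class follows a similar idea of named outputs written from one task. The sketch below only shows the grouping logic under that assumption; fan_out and its input format are hypothetical and no files are actually written or compressed.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

TaggedRecord = Tuple[str, dict]  # (destination dataset name, record)


def fan_out(tagged: Iterable[TaggedRecord], codecs: Dict[str, str]) -> Dict[str, dict]:
    """Group tagged records per destination and attach that destination's codec.

    Datasets without an explicit codec fall back to "None" (no compression).
    """
    batches: Dict[str, List[dict]] = defaultdict(list)
    for dataset_name, record in tagged:
        batches[dataset_name].append(record)
    return {
        name: {"codec": codecs.get(name, "None"), "records": rows}
        for name, rows in batches.items()
    }


tagged = [
    ("fileset0", {"id": "202EMPLPIM"}),
    ("fileset1", {"id": "202EMPLPIM"}),
    ("fileset0", {"id": "201EMPLPIM"}),
]
plan = fan_out(tagged, {"fileset0": "Snappy"})
print({name: (p["codec"], len(p["records"])) for name, p in plan.items()})
# prints: {'fileset0': ('Snappy', 2), 'fileset1': ('None', 1)}
```

Keeping the codec per destination rather than per job would allow each fileset's Compression Codec property to be honored independently, if the underlying output format supports it.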

 

Security

Limitation(s)

  • This plugin can only write to time-partitioned filesets.
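Since each run adds one partition keyed by time, the sink needs a partition location derived from the run's time. The sketch assumes the run's logical start time is available in epoch milliseconds; the partition_path function and its year/month/day/hour-minute layout are hypothetical and not necessarily the layout used by time-partitioned filesets.

```python
from datetime import datetime, timezone


def partition_path(base_path: str, logical_start_ms: int) -> str:
    """Build a hypothetical time-based partition path for one pipeline run (UTC)."""
    t = datetime.fromtimestamp(logical_start_ms / 1000, tz=timezone.utc)
    return f"{base_path}/{t:%Y/%m/%d/%H-%M}"


print(partition_path("default/data/fileset0", 1_577_836_800_000))
# prints: default/data/fileset0/2020/01/01/00-00
```

Using UTC avoids two runs in different time zones mapping to the same or overlapping partitions.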

 

Future Work

  • Add the ability to write to all different types of datasets

  • Add the ability to write to datasets with different partitioning schemes

Test Case(s)

  • Test case #1

  • Test case #2

Sample Pipeline

Please attach one or more sample pipeline(s) and associated data. 

Pipeline #1

Pipeline #2

 

 


Checklist

User stories documented 
User stories reviewed 
Design documented 
Design reviewed 
Feature merged 
Examples and guides 
Integration tests 
Documentation for feature 
Short video demonstrating the feature

Created in 2020 by Google Inc.