General 

...

To support use cases of migrating files from on-premises systems to Google Cloud, there is a need for comprehensive file handling capabilities. This includes FileList, FileCompression, FileDecompression, FileEncryption, FileDecryption, etc. Right now the plugins supported on CDAP do not operate on file constructs but rather at the record level. There are a few file-level plugins available in CDAP, such as FileMove and FileDelete, and this set needs to be expanded.


Use Case



Proposed Design 

...

  1. FileList Plugin (BatchSource plugin) - Implement a new FileList plugin with capabilities similar to the current FileSource plugin, but instead of actually reading the file contents it would just pass the file names, as full URIs, to the subsequent actions in the pipeline (see the schema sketch after this list).
  2. FileCompress Plugin (Transform or Action plugin) - Implement a compression plugin, similar to the Field Compression plugin, that accepts an input file URI, reads the file, compresses it, stores the result temporarily on the same node, and emits the compressed file's URI so it can be used by the next processing action.
  3. Invoke the current Google Cloud Storage plugin to persist the file to Cloud Storage.
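As a hedged illustration of the contract between these stages (not part of the existing plugins), the records exchanged could each carry a single string field; package names assume a recent io.cdap.cdap release and the field names are illustrative.

import io.cdap.cdap.api.data.schema.Schema;

public class FileHandlingSchemas {
  // Hypothetical output schema for the FileList stage: one record per file,
  // carrying only the full URI of the file. The field name "fileName" is illustrative.
  public static final Schema FILE_LIST_OUTPUT = Schema.recordOf(
      "fileRecord",
      Schema.Field.of("fileName", Schema.of(Schema.Type.STRING)));

  // Hypothetical output schema for the FileCompress stage: where the compressed
  // copy was written on the local node, for the next stage / GCS sink to pick up.
  public static final Schema FILE_COMPRESS_OUTPUT = Schema.recordOf(
      "compressedFileRecord",
      Schema.Field.of("compressedFileName", Schema.of(Schema.Type.STRING)));
}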

...

This is a BatchSource plugin similar to the current FileSource plugin, but it only lists the file names, as full URIs, and does not actually read the contents of the files.
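A minimal sketch of the listing logic this plugin would need, assuming the Hadoop FileSystem API is used so that HDFS, local, and other supported URIs all work; the class and method names are illustrative, not existing plugin code.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class FileLister {

  // Returns the full URIs of all files under the given path, without reading their contents.
  public static List<String> listFileUris(String path, boolean recursive) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = new Path(path).getFileSystem(conf);
    List<String> uris = new ArrayList<>();

    RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(path), recursive);
    while (it.hasNext()) {
      uris.add(it.next().getPath().toUri().toString());
    }
    return uris;
  }
}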

Plugin

...

Properties

Section | Field | Type | Description
Plugin Basic Configuration | Path | Text | Provide the path for the file or directory.
Plugin Basic Configuration | Recursive Processing | Boolean | List files recursively (True / False).
Output Schema | FileName | String | One record per file, containing the file name as a full URI.
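
A hedged sketch of how the properties above might map onto a CDAP PluginConfig class; the property names, package names (a recent io.cdap.cdap release), and nullability of the recursive flag are assumptions.

import javax.annotation.Nullable;

import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.plugin.PluginConfig;

public class FileListConfig extends PluginConfig {

  @Name("path")
  @Description("Path of the file or directory to list.")
  @Macro
  private String path;

  @Name("recursive")
  @Description("Whether to list files recursively. Defaults to false.")
  @Macro
  @Nullable
  private Boolean recursive;

  public String getPath() {
    return path;
  }

  public boolean isRecursive() {
    return recursive != null && recursive;
  }
}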

Implementation Logic 


Base the code on the FileSource / AbstractFileSource plugin. Override the PathTrackingInputFormat (I have checked PathTrackingBlobInputFormat; I can probably take that, override it without a delegate, and use that).

...

@Override
public StructuredRecord.Builder getCurrentValue() {
  String fieldName = schema.getFields().iterator().next().getName();
  // Instead of the actual file contents (val), set the file path, including the file name.
  // filePath is assumed to be tracked by the record reader as the full URI of the current file.
  return StructuredRecord.builder(schema).set(fieldName, filePath);
}
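
If the delegate is dropped entirely, the record reader itself can also stay trivial: one record per file split, never opening the file. This is a sketch, assuming one split per file and simplifying the value type to a plain String (the real plugin would build a StructuredRecord as shown above).

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Sketch: emits a single record per split containing only the file path.
public class FileNameOnlyRecordReader extends RecordReader<NullWritable, String> {

  private String filePath;
  private boolean emitted = false;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) {
    filePath = ((FileSplit) split).getPath().toUri().toString();
  }

  @Override
  public boolean nextKeyValue() {
    if (emitted) {
      return false;
    }
    emitted = true;
    return true;
  }

  @Override
  public NullWritable getCurrentKey() {
    return NullWritable.get();
  }

  @Override
  public String getCurrentValue() {
    return filePath;
  }

  @Override
  public float getProgress() {
    return emitted ? 1.0f : 0.0f;
  }

  @Override
  public void close() {
    // Nothing to close; the file contents are never read.
  }
}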

...


This plugin will take the input file name passed from the FileList plugin, obtain an input stream for the URI, compress the file using the Gzip or Snappy libraries, and store the compressed copy locally on the node.
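
A minimal sketch of that compression step, assuming the source file is readable through the Hadoop FileSystem API and that GZIP output to a local temp file is acceptable; the helper class and method names are illustrative.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.Files;
import java.util.zip.GZIPOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileCompressor {

  // Reads the file at the given URI, gzips it, and writes the result to a local
  // temp file on the node. Returns the local path of the compressed copy.
  public static String compressToLocal(String fileUri, Configuration conf) throws IOException {
    Path source = new Path(URI.create(fileUri));
    FileSystem fs = source.getFileSystem(conf);

    java.nio.file.Path local = Files.createTempFile(source.getName(), ".gz");
    try (InputStream in = fs.open(source);
         OutputStream out = new GZIPOutputStream(Files.newOutputStream(local))) {
      byte[] buffer = new byte[8192];
      int read;
      while ((read = in.read(buffer)) != -1) {
        out.write(buffer, 0, read);
      }
    }
    return local.toUri().toString();
  }
}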


Plugin Properties

Section | Field | Type | Description
Plugin Basic Configuration | FileName | Text | Full URI of the file.
Plugin Basic Configuration | Compression Type | Dropdown | Snappy / Gzip.
Output Schema | CompressedFileName | Text | Location and name of the compressed file (stored on the local node).

...


Plugin Basic Configuration | FileName | Text | Full name of the file including path (URI).
Plugin Basic Configuration | Compression | Dropdown | Snappy / Gzip.
Output Schema | Compressed FileName (TBD) | Text | Record with the compressed file name as a full URI.
Output Schema | Compressed Content (TBD) | File Stream | Compressed file contents as a stream (open question; see Queries below).




Implement a new plugin for compression, using the existing FieldCompression plugin as a starting point with the following modification:

@Override
public void transform(StructuredRecord in, Emitter<StructuredRecord> emitter) throws Exception {
  StructuredRecord.Builder builder = StructuredRecord.builder(outSchema);

  Schema inSchema = in.getSchema();
  List<Field> inFields = inSchema.getFields();

  // Iterate through the input fields. Check if the field name is present
  // in the fields that need to be compressed; if it is not, write it
  // to the output as-is.
  for (Field field : inFields) {
    String fileName = field.getName();

    // For the field carrying the file URI: open the file, get the input stream,
    // pass it to the GZIP compression APIs, and store the compressed file locally on the node.
  }
}
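
Assuming the compressToLocal helper sketched earlier, the per-record logic of the loop above could reduce to roughly the following; the field names fileName and compressedFileName are placeholders matching the hypothetical schemas.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.Emitter;

public class FileCompressTransformSketch {

  // Sketch: read the URI emitted by FileList, compress it locally, and emit the
  // local location of the compressed copy for the next stage.
  public static void transformOneRecord(StructuredRecord in, Emitter<StructuredRecord> emitter,
                                        Schema outSchema, Configuration conf) throws IOException {
    String inputUri = in.get("fileName");
    String compressedUri = FileCompressor.compressToLocal(inputUri, conf);
    emitter.emit(StructuredRecord.builder(outSchema)
                   .set("compressedFileName", compressedUri)
                   .build());
  }
}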


Queries 

  1. Since this processes at the file level, should this just be an Action plugin, or does it need to be a Transform plugin?
  2. Should this plugin just store the file locally and pass along the new compressed file name, or should it actually read the file, compress the contents, and pass the compressed contents on as a stream? The issue with a stream is that if the file is large it might not be efficient, or we might have to do everything in memory, which is not ideal.
  3. If we store the files locally, we need some way to clean them up after processing (a cleanup sketch follows below).
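
For query 3, one hedged option is to write all compressed copies under a run-scoped temp directory and delete it from the plugin's destroy() hook once the stage finishes; a minimal sketch of the cleanup itself, assuming such a directory exists:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class LocalTempCleanup {

  // Recursively deletes a run-scoped temp directory, deleting children before parents.
  public static void deleteRecursively(Path tempDir) throws IOException {
    if (!Files.exists(tempDir)) {
      return;
    }
    try (Stream<Path> paths = Files.walk(tempDir)) {
      paths.sorted(Comparator.reverseOrder())
           .forEach(p -> p.toFile().delete());
    }
  }
}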