

General 


To support use cases that migrate files from on-premises systems to Google Cloud, there is a need for comprehensive file handling capabilities. This includes FileList, FileCompression, FileDecompression, FileEncryption, FileDecryption, etc. Right now the plugins supported on CDAP do not operate on file constructs but rather at the record level. A few file-level plugins are available in CDAP, such as FileMove and FileDelete, and this set needs to be expanded.


UseCase 1



Proposed Design 




  1. FileList Plugin - BatchSource Plugin - Implement a new FileList plugin (BatchSource plugin) with capabilities similar to the current FileSource plugin. Instead of reading the file contents, it passes only the file names, with full URI, to the following actions in the pipeline.
  2. FileCompressEncrypt Sink Plugin - SparkSink Plugin - Implement a new plugin of type SparkSink that reads the file names from the FileList plugin, compresses each file using gzip / snappy, encrypts it using a PGP public key, and persists it to Google Cloud Storage, HDFS, or the file system.

FileList Plugin 

This is a BatchSource plugin similar to the current FileSource plugin, but it only lists the file names with full URI and does not read the contents of the files.

Plugin Properties

Section | Field | Type | Description
Basic Configuration | Path | String | Path of the file or directory (text field). This should also support other file sources such as FTP / SFTP.
Basic Configuration | Recursive Processing | Boolean | List files recursively. True / False
Output Schema | FileName | String | FileNameList: each record contains the full file name URI.
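The listing behavior described by the Path and Recursive Processing properties can be illustrated with plain Java NIO. This is only a minimal sketch for local paths, not CDAP plugin code: the class name FileListSketch and the method listFileUris are hypothetical, and FTP / SFTP sources are not covered.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper (not a CDAP class): lists file URIs without reading file contents.
final class FileListSketch {

  // Returns the full URI of every regular file found at or under root.
  // When recursive is false, only root itself and its direct children are considered.
  static List<String> listFileUris(Path root, boolean recursive) throws IOException {
    int depth = recursive ? Integer.MAX_VALUE : 1;
    try (Stream<Path> paths = Files.walk(root, depth)) {
      return paths
          .filter(Files::isRegularFile)                      // skip directories
          .map(p -> p.toAbsolutePath().toUri().toString())   // e.g. a file: URI
          .sorted()
          .collect(Collectors.toList());
    }
  }
}
```

Each returned string corresponds to one output record of the proposed FileName schema. If the path points at a single file, the result contains just that file.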

Implementation Logic 

Base the code on the FileSource plugin. Override the PathTrackingInputFormat. (I have checked PathTrackingBlobInputFormat; I can probably take that and override it directly, without a delegate.)

Use PathTrackingBlobInputFormat 

@Override
public StructuredRecord.Builder getCurrentValue() {
  String fieldName = schema.getFields().iterator().next().getName();
  // Open question from the proposal: instead of the actual record, pass in the
  // full path (URI) of the file. "filePath" is a hypothetical variable holding it.
  return StructuredRecord.builder(schema).set(fieldName, filePath);
}

...

String | Record with FileName with full URI


Queries 

  • If SFTP or FTP needs to be supported, it is not clear how the credential information can be passed to the next step in the process.

FileCompressEncrypt Plugin


This plugin takes the input file name passed from the FileList plugin and gets the file input stream using the URI. It then compresses the file using the Gzip or Snappy libraries and stores it locally on the node, encrypts the file using a PGP public key, and persists the file.
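The compression step can be sketched with the JDK's built-in gzip support. This is a minimal illustration only, under several assumptions: the input URI is a local file: URI, the class name GzipCompressSketch and the method gzipToLocal are hypothetical, and Snappy compression and PGP encryption (both of which need third-party libraries) are left out.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper (not a CDAP class): gzip a file into a local directory.
final class GzipCompressSketch {

  // Compresses the file at inputUri into outputDir/<name>.gz and returns the new path.
  // Assumption: inputUri is a file: URI that the default file system can resolve.
  static Path gzipToLocal(URI inputUri, Path outputDir) throws IOException {
    Path input = Paths.get(inputUri);
    Path output = outputDir.resolve(input.getFileName().toString() + ".gz");
    try (InputStream in = Files.newInputStream(input);
         OutputStream out = new GZIPOutputStream(Files.newOutputStream(output))) {
      byte[] buffer = new byte[8192];
      int n;
      // Stream the file through the compressor in fixed-size chunks.
      while ((n = in.read(buffer)) != -1) {
        out.write(buffer, 0, n);
      }
    }
    return output;
  }
}
```

Streaming in chunks keeps memory use flat for large files. The encryption step would wrap the compressed output in a second stream before it is persisted.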


Plugin Properties

Section | Field | Type | Description
Basic Configuration | Input FileName | String | Full name of the file, including path (URI).
Basic Configuration | Compress File | Boolean | True / False
Basic Configuration | Compression Algorithm | String (List) | Gzip / Snappy. Applicable only if Compress File is set to true.
Basic Configuration | Encrypt File | Boolean | True
Basic Configuration | PGP Public Key Path | String | Location of the PGP public key (path to file).
Basic Configuration | PGP Public Key Access Userid | String | User ID to access the public key in case security is enabled.
Basic Configuration | PGP Public Key Access password | String | Password to access the key file.
Basic Configuration | OutFilePath | String | Path to store the output file from the sink. The output file name follows the format <InputfileName Suffix>.gz.pgp. The file path URI can point to the local file system, HDFS, or GCS (Google Cloud Storage).
Basic Configuration | MoveInput | Boolean | True / False. Move the source input file to a different path so that the next run of the pipeline does not process the same file again.
Basic Configuration | MoveFilePath | String | Path to move the input file to on successful processing.

Use the existing FieldCompression plugin as a starting point for the compression step. Open the file, get the input file stream, pass it to the GZIP compression APIs, and store the file locally on the node.
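How the output file name could be derived from the compress and encrypt settings is sketched below. This is an interpretation, not a confirmed rule: it assumes the input file name is kept as the prefix, that ".gz" and ".pgp" are appended in that order, and that Snappy output would use a ".snappy" extension, which the document does not specify. The class and method names are hypothetical.

```java
// Hypothetical helper (not a CDAP class): derives the sink's output file name.
final class OutputNameSketch {

  // Appends the compression and encryption suffixes to the input file name.
  // Assumption: ".snappy" is used for Snappy; the document only shows ".gz".
  static String outputFileName(String inputFileName, boolean compress,
                               String algorithm, boolean encrypt) {
    StringBuilder name = new StringBuilder(inputFileName);
    if (compress) {
      if ("gzip".equalsIgnoreCase(algorithm)) {
        name.append(".gz");
      } else if ("snappy".equalsIgnoreCase(algorithm)) {
        name.append(".snappy");
      } else {
        throw new IllegalArgumentException("Unsupported compression algorithm: " + algorithm);
      }
    }
    if (encrypt) {
      name.append(".pgp");
    }
    return name.toString();
  }
}
```

The algorithm argument is only inspected when compress is true, mirroring the rule that Compression Algorithm applies only if Compress File is set.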



Queries 

  1. What is the best approach to track processed files so they are not processed again? The proposal is to move the input files to a different directory after successful processing, so they do not get picked up in the next run.
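The move-after-processing proposal can be sketched for local files as follows. This is a minimal illustration with hypothetical names (MoveProcessedSketch, moveProcessed); it assumes source and target are on a local file system and overwrites a file of the same name in the target directory, which a real plugin may not want.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper (not a CDAP class): moves a processed input file out of the source path.
final class MoveProcessedSketch {

  // Moves input into moveDir (creating it if needed) and returns the new location.
  // Call this only after the file has been processed successfully.
  static Path moveProcessed(Path input, Path moveDir) throws IOException {
    Files.createDirectories(moveDir);
    Path target = moveDir.resolve(input.getFileName());
    // Assumption: replacing an existing file of the same name is acceptable.
    return Files.move(input, target, StandardCopyOption.REPLACE_EXISTING);
  }
}
```

Because the file leaves the input directory, a later listing of that directory no longer returns it, which is what prevents reprocessing.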



UseCase 2





FileDecompressDecrypt Plugin

The plugin supports decrypting files using a PGP private key and decompressing them.


Plugin Properties

Section | Field | Type | Description
Basic Configuration | Path | String | Path containing the file name or a directory of files.
Basic Configuration | Recursive Processing | Boolean | True / False
Basic Configuration | DeCompress File | Boolean | True / False
Basic Configuration | DeCompression Algorithm | String (List) | Gzip / Snappy. Applicable only if DeCompress File is set to true.
Basic Configuration | Decrypt File | Boolean | True
Basic Configuration | PGP Private Key Path | String | Location of the PGP private key (path to file).
Basic Configuration | PGP Private Key Access Userid | String | User ID to access the private key in case security is enabled.
Basic Configuration | PGP Private Key Access password | String | Password to access the key file.
Basic Configuration | MoveInput | Boolean | True / False. Move the source input file to a different path so that the next run of the pipeline does not process the same file again.
Basic Configuration | MoveFilePath | String | Path to move the input file to on successful processing.
Output Schema | Output | String | Each row read from the file.
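The decompression half of this plugin, which emits one output record per row, can be sketched with the JDK's gzip reader. This is a minimal illustration under stated assumptions: the file is already decrypted (PGP decryption needs a third-party library and is not shown), it is gzip-compressed UTF-8 text, and the names GzipRowReaderSketch and readRows are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;

// Hypothetical helper (not a CDAP class): reads the rows of a gzip-compressed text file.
final class GzipRowReaderSketch {

  // Returns each line of the decompressed file as one row.
  // Assumption: the content is UTF-8 text and has already been decrypted.
  static List<String> readRows(Path gzFile) throws IOException {
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(
        new GZIPInputStream(Files.newInputStream(gzFile)), StandardCharsets.UTF_8))) {
      return reader.lines().collect(Collectors.toList());
    }
  }
}
```

Collecting all rows into a list keeps the sketch short. A pipeline source would more likely emit rows one at a time instead of holding the whole file in memory.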