Whole file ingest (preserving file boundaries)

Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

Users can use the command-line tool DistCp to copy files between clusters with the same or different filesystems. We currently have Hydrator plugins that support batch file copying within filesystems such as local HDFS, FTP, and Windows Share. However, these plugins all have restrictions. For example, we have file batch source plugins but no corresponding file batch sink plugins, so files emitted from the source cannot be written to a filesystem while preserving the file and directory structure. Action plugins such as SFTP Copy only support copying within the SFTP server, which prevents users from moving files off the server. We wish to add Hydrator plugins that let users perform whole-file copies between arbitrary filesystems/databases from the CDAP UI.

Goals

According to this user request, our new plugin ideally should have the following features: 

  1. Should support file copying between the following file systems:
    1. Local directories
    2. Remote and local HDFS
    3. Amazon S3
    4. FTP
  2. Should support failover. It should resume where it left off after a restart or failure.
  3. Should have a UI where we can see progress.
  4. Should expose metrics for each process: number of files copied, size, and time.
  5. Should check network bandwidth and display the estimated completion time.
  6. Should preserve the timestamp of each file as it is at the source.
  7. Should allow path filters to be specified through the UI on the fly.
  8. Should support file permission configurations.
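
As a rough illustration of the completion-time estimate mentioned in the goals, the sketch below derives the remaining time from the throughput observed so far. This is an assumption on our part: it extrapolates from bytes already copied rather than probing network bandwidth directly, and the class and method names (EtaEstimator, remainingMillis) are hypothetical.

```java
/** Hypothetical helper: estimates remaining copy time from observed throughput. */
class EtaEstimator {

  /**
   * Returns the estimated remaining time in milliseconds, or -1 when no
   * estimate is possible yet (nothing copied or no time elapsed).
   */
  static long remainingMillis(long totalBytes, long copiedBytes, long elapsedMillis) {
    if (copiedBytes <= 0 || elapsedMillis <= 0) {
      return -1;
    }
    if (copiedBytes >= totalBytes) {
      return 0;
    }
    // remaining / throughput, where throughput = copiedBytes / elapsedMillis
    double remaining = (double) (totalBytes - copiedBytes);
    return Math.round(remaining * elapsedMillis / copiedBytes);
  }
}
```

For example, if 250 of 1000 bytes were copied in 5 seconds, the observed rate is 50 bytes per second and the remaining 750 bytes are estimated to take 15 seconds.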

User Stories 

  • As a cluster administrator, I want to migrate all my files and preserve file structures when upgrading to a newer cluster. 
  • As a data analyst, I want to retrieve files that contain data from some remote FTP location and store them in my cluster, which runs HDFS.
  • As a cluster administrator, I'm only interested in files with specific file names and wish to copy them to some other location.
  • As a data analyst, I want to copy the results of my experiments to some remote cluster to share them.
  • As a pipeline developer, I want to be able to configure the number of files per split for the source to use.

Design

Our high level pipeline design can be split into 2 major parts: a file reader source and a file writer sink.

  • File Reader Source should contain the following configurations:
    1. The address of the source cluster/ server.
    2. The type of filesystem we wish to copy from.
    3. The base path/ directory of all the files we wish to copy.
    4. A regular expression filter that chooses which files we wish to copy.
    5. Whether or not to check recursively into deeper directories.
    6. The number of files each mapper reads (similar in concept to maxSplitSize in the File Batch Source plugin).
    7. Additional configurations such as username and password.
  • File Writer Sink should contain the following configurations:
    1. The address of the target cluster/ server.
    2. The type of filesystem we wish to copy into.
    3. The base path/ directory we wish to copy files into.
    4. Whether or not to overwrite existing files.
    5. Additional configurations such as username and password.
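
To make the interaction of the base path, regular expression filter, and recursion options more concrete, here is a minimal sketch that selects files from a local directory using java.nio. The local filesystem is used only for demonstration, and the names FileSelector and select are hypothetical rather than part of any existing plugin.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: picks the files a source would emit metadata for. */
class FileSelector {

  /**
   * Lists regular files under basePath whose file name fully matches
   * fileNameRegex. When recursive is false, only direct children are considered.
   */
  static List<Path> select(Path basePath, String fileNameRegex, boolean recursive)
      throws IOException {
    Pattern pattern = Pattern.compile(fileNameRegex);
    int maxDepth = recursive ? Integer.MAX_VALUE : 1;
    try (Stream<Path> paths = Files.walk(basePath, maxDepth)) {
      return paths
          .filter(Files::isRegularFile)
          .filter(p -> pattern.matcher(p.getFileName().toString()).matches())
          .sorted()
          .collect(Collectors.toList());
    }
  }
}
```

In this sketch the filter is applied to the file name only; matching against the full path would be an equally reasonable choice and is left open here.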

 

Approach

Approach #1

Instead of having the source read file data and pass it through the pipeline to the sink, we pass down only the file metadata from the source (path, name, timestamp, size, ...). The information required to access the source database must also be passed down the pipeline, because the sink does not have it at configure time. The sink gathers file metadata from its input, reads each file from the source directories, and writes it to the target location. This approach lets us copy at file-level granularity (all operations depend on file metadata and nothing else) and avoids passing large chunks of data along the pipeline. It also lets us preserve attributes such as permissions and timestamps.

 

File Copy Source Plugin

FileCopySource
 
public abstract class AbstractFileCopySource extends ReferenceBatchSource<String, FileAttribute, StructuredRecord> {
  private final AbstractFileCopySourceConfig config;

  public AbstractFileCopySource(AbstractFileCopySourceConfig config) {
    super(config);
    this.config = config;
  }

  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    /**
     * Load configurations from the UI and check that they are valid.
     * Possibly initiate connections to databases and check that the
     * provided credentials are valid.
     */
  }

  public void initialize(BatchRuntimeContext context) {
    /**
     * Initialize a record transformer that transforms file metadata
     * into StructuredRecords.
     *
     */
  }

  public void prepareRun(BatchSourceContext context) {
    /**
     * Connect to the source databases. Set up input format to read the necessary
     * file metadata.
     *
     */
  }

  public void transform(KeyValue<String, FileAttribute> input, Emitter<StructuredRecord> emitter) {
    /**
     * Convert file metadata to StructuredRecord and emit.
     * Will also have to emit information required to connect to the source database such that
     * the sink can pull file data from the source.
     */
  }

  public abstract class AbstractFileCopySourceConfig extends ReferencePluginConfig {
    public AbstractFileCopySourceConfig(String name) {
      super(name);
    }

    /**
     * Additional configurations here for specific databases.
     */
  }
}
 
 

To support the source plugin, we need an InputFormat that reads file metadata from the file source.

sourceinputformat
 
public class MetadataInputFormat extends FileInputFormat<String, FileAttribute> {
  public MetadataInputFormat() {

  }

  @Override
  public RecordReader<String, FileAttribute> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new MetadataRecordReader();
  }

  /**
   * Returns key that contains file path.
   * Returns value that contains file attributes.
   */
  public class MetadataRecordReader extends RecordReader<String, FileAttribute> {

    public MetadataRecordReader() {
      super();
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
      /**
       * Initialize a reader that can traverse through all the files to be read from
       * in the source directory
       **/
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      /**
       * Advance to the next file to read.
       * Placeholder: returns false (no more files) until traversal is implemented.
       */
      return false;
    }
  }
}
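
The "number of files per split" setting could be applied when the input format computes its splits. The following is a minimal sketch of the grouping step only, using plain strings in place of file paths; the names SplitPlanner and plan are hypothetical, and a real implementation would wrap each group in an InputSplit.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: groups file paths into fixed-size batches, one per split. */
class SplitPlanner {

  /** Partitions filePaths into consecutive groups of at most filesPerSplit entries. */
  static List<List<String>> plan(List<String> filePaths, int filesPerSplit) {
    if (filesPerSplit <= 0) {
      throw new IllegalArgumentException("filesPerSplit must be positive");
    }
    List<List<String>> splits = new ArrayList<>();
    for (int start = 0; start < filePaths.size(); start += filesPerSplit) {
      int end = Math.min(start + filesPerSplit, filePaths.size());
      // Copy the sublist so each split owns its own list.
      splits.add(new ArrayList<>(filePaths.subList(start, end)));
    }
    return splits;
  }
}
```

Grouping by file count keeps the configuration simple; grouping by total bytes would balance mappers better when file sizes vary widely, but it is not what the configuration above describes.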
 

File metadata should include the following fields:

  • File name
  • File path
  • File size
  • Date / Timestamp
  • User / Permission

We will have to create our own FileAttribute class to normalize this data.
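
One possible shape for such a class is sketched below. The field names and the fromLocalPath factory are assumptions made for illustration; the sketch reads attributes from the local filesystem via java.nio only, whereas the real class would be populated from each supported filesystem and would probably also need to be serializable for Hadoop.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.PosixFilePermissions;

/** Hypothetical normalized view of one file's metadata. */
class FileAttribute {
  final String fileName;
  final String fullPath;
  final long sizeBytes;
  final long modifiedMillis;
  final String owner;
  final String permission; // e.g. "rw-r--r--"; empty when no POSIX view is available

  FileAttribute(String fileName, String fullPath, long sizeBytes,
                long modifiedMillis, String owner, String permission) {
    this.fileName = fileName;
    this.fullPath = fullPath;
    this.sizeBytes = sizeBytes;
    this.modifiedMillis = modifiedMillis;
    this.owner = owner;
    this.permission = permission;
  }

  /** Builds a FileAttribute from a file on the local filesystem. */
  static FileAttribute fromLocalPath(Path path) throws IOException {
    BasicFileAttributes basic = Files.readAttributes(path, BasicFileAttributes.class);
    boolean posix = path.getFileSystem().supportedFileAttributeViews().contains("posix");
    String permission = posix
        ? PosixFilePermissions.toString(Files.getPosixFilePermissions(path))
        : "";
    return new FileAttribute(
        path.getFileName().toString(),
        path.toAbsolutePath().toString(),
        basic.size(),
        basic.lastModifiedTime().toMillis(),
        Files.getOwner(path).getName(),
        permission);
  }
}
```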

File copy sink plugin

After the sink receives metadata for every file, it initiates connections with both the source and destination filesystems. We have to create a new OutputFormat class with the following capabilities:

  • Can read file contents given file metadata and database access information.
  • Can create and maintain a list of files to be copied.
  • Has a static method that sets the destination folder.
  • Can create folders in the destination filesystem.
  • Reads and writes files using streams.

Given the list of files to be copied, the OutputFormat reads each file's contents from an input stream and writes them to an output stream on the destination filesystem.
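
The stream-based copy can be sketched as follows. This is a minimal example under stated assumptions: it copies between two local paths with java.nio instead of going through Hadoop's filesystem abstractions, and the names StreamCopier and copy are hypothetical. It creates missing destination folders, copies through a fixed-size buffer, and carries over the source's modification time.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper: copies one file through streams, preserving its timestamp. */
class StreamCopier {

  /** Copies source to destination and returns the number of bytes written. */
  static long copy(Path source, Path destination) throws IOException {
    Path parent = destination.getParent();
    if (parent != null) {
      Files.createDirectories(parent); // create target folders if necessary
    }
    long copied = 0;
    byte[] buffer = new byte[8192];
    try (InputStream in = Files.newInputStream(source);
         OutputStream out = Files.newOutputStream(destination)) {
      int read;
      while ((read = in.read(buffer)) != -1) {
        out.write(buffer, 0, read);
        copied += read;
      }
    }
    // Keep the source's modification time on the copy.
    Files.setLastModifiedTime(destination, Files.getLastModifiedTime(source));
    return copied;
  }
}
```

The returned byte count could feed the per-file metrics listed in the goals.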

FileCopySink
 
public abstract class AbstractFileCopySink extends ReferenceBatchSink {
  private final AbstractFileCopySinkConfig config;

  public AbstractFileCopySink(AbstractFileCopySinkConfig config) {
    super(config);
    this.config = config;
  }

  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    /**
     * Load configurations from the UI and check that they are valid.
     * Possibly initiate connections to databases and check that the
     * provided credentials are valid.
     */
  }

  public void initialize(BatchRuntimeContext context) {

  }

  public void prepareRun(BatchSinkContext context) {
    /**
     * Connect to the destination databases. Create target directories if necessary.
     */
  }

  public void transform(StructuredRecord input, Emitter<NullWritable> emitter) {
    /**
     * 1. Emit file metadata. The OutputFormat will read from the source
     *    database according to the metadata.
     * 2. Emit credentials required to connect to the source database.
     */
  }

  public class AbstractFileCopySinkConfig extends ReferencePluginConfig {
    public AbstractFileCopySinkConfig(String name) {
      super(name);
    }
    /**
     * Additional configurations for the file sink
     */
  }
}
 

In addition, we can add a configuration option that controls whether or not to overwrite files in the destination folder (similar to DistCp). If this option is disabled (i.e. no overwriting), the sink will have to scan the destination folder and drop files that already exist there from the copy list.
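
A minimal sketch of that filtering step, assuming the copy list holds paths relative to the destination base directory and that the destination is reachable as a local path. The names OverwriteFilter and filesToCopy are hypothetical.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: applies the overwrite option to a copy list. */
class OverwriteFilter {

  /**
   * Returns the relative paths that should be copied. With overwrite enabled
   * every path is kept; otherwise paths that already exist under
   * destinationBase are dropped.
   */
  static List<String> filesToCopy(List<String> relativePaths, Path destinationBase,
                                  boolean overwrite) {
    if (overwrite) {
      return new ArrayList<>(relativePaths);
    }
    List<String> result = new ArrayList<>();
    for (String relative : relativePaths) {
      if (!Files.exists(destinationBase.resolve(relative))) {
        result.add(relative);
      }
    }
    return result;
  }
}
```

Note that this check is by existence only; it does not compare sizes or timestamps, so a partially written destination file would be skipped when overwriting is disabled.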

API changes

Deprecated Programmatic APIs

We will remove plugins such as the FTP source and SFTP source. File copy action plugins with limited functionality (the Windows Share Copy and HDFS Move plugins) will also be removed.

New REST APIs

Path | Method | Description | Response Code | Response
/v3/apps/<app-id> | GET | Returns the application spec for a given application | 200 - On success; 404 - When application is not available; 500 - Any internal errors |

 

     

Deprecated REST API

Path | Method | Description
/v3/apps/<app-id> | GET | Returns the application spec for a given application

UI Impact or Changes

  • New interfaces that help visualize file copying progress and bandwidth used can be added. 
  • We should support dynamic plugin configuration menus, i.e. different configuration options appear depending on particular configuration choices. We need this so that we can combine different plugins with the same functionality (HDFS, S3, ...) into a single configurable plugin.

Security Impact 

  • Users will possibly have to provide security credentials to remote clusters. 
  • Data pulled from a local secure cluster will be visible to outside parties. Users should make sure sensitive data is only moved from one secure cluster to another secure cluster. We could implement a check at configure time that raises a warning about this issue.

Questions and Concerns 

  1. What happens when a pipeline crashes? Is it possible to continue the copying job after restart?
    1. Crashes during the file metadata transfer phase: We can clear the metadata cache in the sink and retransfer the file metadata from the source. This is not a very expensive operation, so it can simply be restarted.
    2. Crashes during file copy phase at the sink. ???

 

Test Scenarios

Test ID | Test Description | Expected Results
   
   
   
   

Releases

Release X.Y.Z

Release X.Y.Z

Related Work

 

Future work

 

Created in 2020 by Google Inc.