Snowflake database plugin

Introduction

A separate database plugin to support Snowflake-specific features and configurations.

Use-Case

  • Users can choose and install Snowflake source and sink plugins.

  • Users should see the Snowflake logo on the plugin configuration page for a better experience.

  • Users should get relevant information from the tooltips:

    • Each tooltip should accurately describe what the field is used for.

  • Users should not have to specify any redundant configuration.

  • Users should get field-level lineage for the source and sink being used.

  • Reference documentation should be updated to account for the changes.

  • The source code for the Snowflake database plugin should be placed in a repository under the data-integrations organization.

  • The data pipeline using the source and sink plugins should run on both the MapReduce and Spark engines.

User Stories

  • Users should be able to install the Snowflake-specific database source and sink plugins from the Hub.

  • Users should have each tooltip accurately describe what its field does.

  • Users should get field-level lineage information for the Snowflake source and sink.

  • Users should be able to set up a pipeline without specifying redundant information.

  • Users should get updated reference documents for the Snowflake source and sink.

  • Users should be able to read all supported DB data types.

Plugin Type

  • Batch Source
  • Batch Sink
  • Real-time Source
  • Real-time Sink
  • Action
  • Post-Run Action
  • Aggregate
  • Join
  • Spark Model
  • Spark Compute

Snowflake Overview

Snowflake is an analytic data warehouse provided as Software-as-a-Service (SaaS). Snowflake provides a data warehouse that is faster, easier to use, and far more flexible than traditional data warehouse offerings.

Snowflake’s data warehouse is not built on an existing database or “big data” software platform such as Hadoop. The Snowflake data warehouse uses a new SQL database engine with a unique architecture designed for the cloud. To the user, Snowflake has many similarities to other enterprise data warehouses, but also has additional functionality and unique capabilities.

Snowflake Bulk API

Using JDBC for loading data has performance limitations, so Snowflake provides bulk APIs for loading data.

The COPY INTO <table> command loads data from staged files into an existing table. The files must already be staged in one of the following locations:

  • Named internal stage (or table/user stage). Files can be staged using the PUT command. 

  • Named external stage that references an external location (AWS S3, Google Cloud Storage, or Microsoft Azure). 

  • External location (AWS S3, Google Cloud Storage, or Microsoft Azure).

Example:

-- Stages
copy into mytable from '@mystage/path 1/file 1.csv';
copy into mytable from '@%mytable/path 1/file 1.csv';
copy into mytable from '@~/path 1/file 1.csv';

-- S3 bucket
copy into mytable from 's3://mybucket 1/prefix 1/file 1.csv';

-- Azure container
copy into mytable from 'azure://myaccount.blob.core.windows.net/mycontainer/encrypted_files/file 1.csv';

Also, there is an option to use Snowpipe to load data continuously.

COPY INTO <location> command unloads data from a table (or query) into one or more files in one of the following locations:

  • Named internal stage (or table/user stage). The files can then be downloaded from the stage/location using the GET command.

  • Named external stage that references an external location (AWS S3, Google Cloud Storage, or Microsoft Azure).

  • External location (AWS S3, Google Cloud Storage, or Microsoft Azure).

Example:

-- Stages
copy into '@mystage/path 1/file 1.csv' from mytable;
copy into '@%mytable/path 1/file 1.csv' from mytable;
copy into '@~/path 1/file 1.csv' from mytable;

-- S3 bucket
copy into 's3://mybucket 1/prefix 1/file 1.csv' from mytable;

-- Azure container
copy into 'azure://myaccount.blob.core.windows.net/mycontainer/encrypted_files/file 1.csv' from mytable;

Design Tips

JDBC Driver API Support: https://docs.snowflake.net/manuals/user-guide/jdbc-api.html

Loading Data into Snowflake: https://docs.snowflake.net/manuals/user-guide-data-load.html

Design

The suggestion is to create a new Maven sub-module in the database-plugins repo under the data-integrations organization.

Action Plugins

The proposal is to create a Snowflake Data Loading Action Plugin that uses the COPY INTO <table> command to load data files into Snowflake, and a Snowflake Data Unloading Action Plugin that uses the COPY INTO <location> command to unload data from a table (or query) into one or more files.

In addition, a Snowflake Action Plugin that runs an arbitrary SQL query should be created.
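
For illustration, the core of such an Action plugin could be as simple as executing the configured statement over JDBC. The following is a minimal sketch assuming the standard Snowflake JDBC driver is on the classpath; the URL, credentials, and statement are placeholders rather than the final plugin configuration:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Properties;

public class SnowflakeActionSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder connection details; the real plugin would take these from its configuration.
    String url = "jdbc:snowflake://myaccount.snowflakecomputing.com/?db=MYDB&schema=PUBLIC";
    Properties props = new Properties();
    props.put("user", "myuser");
    props.put("password", "mypassword");

    try (Connection connection = DriverManager.getConnection(url, props);
         Statement statement = connection.createStatement()) {
      // Execute the user-supplied SQL statement, e.g. a COPY INTO for loading/unloading.
      statement.execute("COPY INTO mytable FROM @mystage/path1/");
    }
  }
}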

Batch Source/Sink Plugins

Option 1

The first option is to create a new Maven sub-module in the database-plugins repo under the data-integrations organization for the Batch Source/Sink Plugins and implement them using JDBC, in the same way as the other DB plugins.
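
For illustration, under this option the source would read the configured import query through a plain JDBC result set, roughly as sketched below; the connection details and query are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class SnowflakeJdbcReadSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder connection details and import query.
    String url = "jdbc:snowflake://myaccount.snowflakecomputing.com/?db=MYDB";
    Properties props = new Properties();
    props.put("user", "myuser");
    props.put("password", "mypassword");

    try (Connection connection = DriverManager.getConnection(url, props);
         Statement statement = connection.createStatement();
         ResultSet rows = statement.executeQuery("SELECT * FROM orderstiny")) {
      while (rows.next()) {
        // In the plugin, each row would be mapped to a record using the result set metadata.
        System.out.println(rows.getString(1));
      }
    }
  }
}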

Option 2

Using JDBC for loading data has performance limitations, so we can utilize Snowflake's bulk APIs for loading data.

Source Plugin

There is an option for the Source plugin to unload data from a table (or query) into one or more files using the COPY INTO <location> command; the plugin will then read those files directly.

Example that unloads the result of a query into a named internal stage (my_stage) using a folder/filename prefix (result/data_), a named file format (myformat), and gzip compression:

copy into @my_stage/result/data_ from (select * from orderstiny)
file_format=(format_name='myformat' compression='gzip');

+---------------+-------------+--------------+
| rows_unloaded | input_bytes | output_bytes |
|---------------+-------------+--------------|
|            73 |        8339 |         3322 |
+---------------+-------------+--------------+

Data files can then be downloaded directly from an internal stage to a stream:

// Download a gzipped staged file from the user stage, decompressing on the fly.
Connection connection = DriverManager.getConnection(url, prop);
InputStream out = connection.unwrap(SnowflakeConnection.class)
    .downloadStream("~", DEST_PREFIX + "/" + TEST_DATA_FILE + ".gz", true);

Files can be stored in one of the following locations:

  • Named internal stage (or table/user stage). The files can then be downloaded from the stage/location using the GET command.

  • Named external stage that references an external location (AWS S3, Google Cloud Storage, or Microsoft Azure).

  • External location (AWS S3, Google Cloud Storage, or Microsoft Azure).

The proposal is to use an internal stage, since files can then be downloaded directly using the SnowflakeConnection#downloadStream method.

Source Splitter

The proposal is to determine the number of splits from the number of staged files created by the COPY INTO <location> command. The number of resulting files can be controlled using the MAX_FILE_SIZE copy option. The proposal is therefore to add a "Maximum Split Size" source configuration property that maps to the MAX_FILE_SIZE copy option, as sketched below.
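
As a rough illustration of how the "Maximum Split Size" property could feed the unload step, the sketch below issues COPY INTO <location> with a MAX_FILE_SIZE bound; the stage, prefix, and query names are placeholders:

import java.sql.Connection;
import java.sql.Statement;

public class SnowflakeUnloadSketch {
  // Unload the import query into staged files no larger than maxFileSizeBytes so that
  // each resulting file can back one input split. Stage, prefix, and query are placeholders.
  static void unload(Connection connection, long maxFileSizeBytes) throws Exception {
    String sql = "COPY INTO @my_stage/result/data_"
        + " FROM (SELECT * FROM orderstiny)"
        + " MAX_FILE_SIZE = " + maxFileSizeBytes;  // fed by the "Maximum Split Size" property
    try (Statement statement = connection.createStatement()) {
      statement.execute(sql);
    }
  }
}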

The LIST command returns a list of files that have been staged.

Example of listing the files that match a regular expression (i.e. all file names containing the string data_0) in a named stage (my_csv_stage) with a prefix (/analysis/):

list @my_csv_stage/analysis/ pattern='.*data_0.*';

+--------------------+------+----------------------------------+------------------------------+
| name               | size | md5                              | last_modified                |
|--------------------+------+----------------------------------+------------------------------|
| employees01.csv.gz |  288 | a851f2cc56138b0cd16cb603a97e74b1 | Tue, 9 Jan 2018 15:31:44 GMT |
| employees02.csv.gz |  288 | 125f5645ea500b0fde0cdd5f54029db9 | Tue, 9 Jan 2018 15:31:44 GMT |
| employees03.csv.gz |  304 | eafee33d3e62f079a054260503ddb921 | Tue, 9 Jan 2018 15:31:45 GMT |
| employees04.csv.gz |  304 | 9984ab077684fbcec93ae37479fa2f4d | Tue, 9 Jan 2018 15:31:44 GMT |
| employees05.csv.gz |  304 | 8ad4dc63a095332e158786cb6e8532d0 | Tue, 9 Jan 2018 15:31:44 GMT |
+--------------------+------+----------------------------------+------------------------------+
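
Building on that, the splitter could enumerate the staged files over JDBC and create one split per file name, roughly as in the sketch below; the stage and prefix are placeholders:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

public class SnowflakeSplitterSketch {
  // Enumerate the staged files produced by COPY INTO <location>; each name becomes one split.
  static List<String> listStagedFiles(Connection connection) throws Exception {
    List<String> files = new ArrayList<>();
    try (Statement statement = connection.createStatement();
         // Placeholder stage/prefix; LIST returns name, size, md5, and last_modified columns.
         ResultSet rs = statement.executeQuery("LIST @my_stage/result/")) {
      while (rs.next()) {
        files.add(rs.getString("name"));
      }
    }
    return files;
  }
}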

Sink Plugin

For the sink, it's possible to write data to internal stage files first (following the file sizing recommendations) and then use the COPY INTO <table> command.

Example of loading files from a named internal stage into a table:

copy into mytable from @my_int_stage;

Data files can be uploaded directly from a stream to an internal stage:

Connection connection = DriverManager.getConnection(url, prop);
File file = new File("/tmp/test.csv");
FileInputStream fileInputStream = new FileInputStream(file);

// upload file stream to user stage
connection.unwrap(SnowflakeConnection.class)
    .uploadStream("MYSTAGE", "testUploadStream", fileInputStream, "destFile.csv", true);

Files can be staged in one of the following locations:

  • Named internal stage (or table/user stage). Files can be staged using the PUT command.

  • Named external stage that references an external location (AWS S3, Google Cloud Storage, or Microsoft Azure).

  • External location (AWS S3, Google Cloud Storage, or Microsoft Azure).

The proposal is to use an internal stage, since files can be uploaded directly using the SnowflakeConnection#uploadStream method.
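
Putting the two steps together, a minimal sketch of the sink flow under these assumptions follows; the stage, prefix, file, and table names are placeholders:

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Statement;
import net.snowflake.client.jdbc.SnowflakeConnection;

public class SnowflakeSinkSketch {
  // Stage a batch of CSV records on an internal stage, then load it into the target table.
  static void write(Connection connection, String csvBatch) throws Exception {
    InputStream data = new ByteArrayInputStream(csvBatch.getBytes(StandardCharsets.UTF_8));
    // Upload the batch as a compressed file under the "batch" prefix of the named stage.
    connection.unwrap(SnowflakeConnection.class)
        .uploadStream("my_int_stage", "batch", data, "part-0.csv", true);
    try (Statement statement = connection.createStatement()) {
      // COPY INTO auto-detects the gzip compression applied by uploadStream.
      statement.execute("COPY INTO mytable FROM @my_int_stage/batch");
    }
  }
}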

Option 3

Although it is possible to create a Batch Sink Plugin that accepts file locations and uses the COPY INTO <table> command (or Snowpipe) to load data files into Snowflake, this does not seem like a good approach:

  • The COPY INTO <table> command uses the locations of the raw data files, so it won't be possible to use this sink in the same way as other database sinks.

  • No transformations can be done on the actual data within CDAP, since we operate on file locations rather than on the records themselves. It is still possible to perform transformations on the Snowflake side using Transformation Parameters.

Source Properties

Option 1

| Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
| --- | --- | --- | --- | --- | --- | --- |
| General | Label | Label for the UI. | | | | textbox |
| | Reference Name | Name used to uniquely identify this source for lineage. | | | referenceName | textbox |
| | Account Name | Full name of the Snowflake account. | | | accountName | textbox |
| | Database | Database name to connect to. | | | database | textbox |
| | Import Query | Query used to import data. | | | importQuery | textarea |
| Credentials | Username | User identity for connecting to the specified database. | | | username | textbox |
| | Password | Password to use to connect to the specified database. | | | password | password |
| Key Pair Authentication | Key Pair Authentication Enabled | If true, the plugin performs key pair authentication. | True, False | False | keyPairEnabled | toggle |
| | Key File Path | Path to the private key file. | | | path | textbox |
| OAuth2 | OAuth2 Enabled | If true, the plugin performs OAuth2 authentication. | True, False | False | oauth2Enabled | toggle |
| | Auth URL | Endpoint for the authorization server used to retrieve the authorization code. | | | authUrl | textbox |
| | Token URL | Endpoint for the resource server, which exchanges the authorization code for an access token. | | | tokenUrl | textbox |
| | Client ID | Client identifier obtained during the application registration process. | | | clientId | textbox |
| | Client Secret | Client secret obtained during the application registration process. | | | clientSecret | password |
| | Scopes | Scope of the access request; may have multiple space-separated values. | | | scopes | textbox |
| | Refresh Token | Token used to receive the access token, which is the end product of OAuth2. | | | refreshToken | textbox |
| Advanced | Bounding Query | Returns the min and max of the values of the 'splitBy' field, e.g. 'SELECT MIN(id), MAX(id) FROM table'. Not required if numSplits is set to one. | | | boundingQuery | textarea |
| | Split-By Field Name | Field name used to generate splits. Not required if numSplits is set to one. | | | splitBy | textbox |
| | Number of Splits to Generate | Number of splits to generate. | | | numSplits | textbox |
| | Connection Arguments | A list of arbitrary string tag/value pairs as connection arguments. See: https://docs.snowflake.net/manuals/user-guide/jdbc-configure.html#jdbc-driver-connection-string | | | connectionArguments | keyvalue |

Option 2

| Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
| --- | --- | --- | --- | --- | --- | --- |
| General | Label | Label for the UI. | | | | textbox |
| | Reference Name | Name used to uniquely identify this source for lineage. | | | referenceName | textbox |
| | Account Name | Full name of the Snowflake account. | | | accountName | textbox |
| | Database | Database name to connect to. | | | database | textbox |
| | Import Query | Query used to import data. | | | importQuery | textarea |
| Credentials | Username | User identity for connecting to the specified database. | | | username | textbox |
| | Password | Password to use to connect to the specified database. | | | password | password |
| Key Pair Authentication | Key Pair Authentication Enabled | If true, the plugin performs key pair authentication. | True, False | False | keyPairEnabled | toggle |
| | Key File Path | Path to the private key file. | | | path | textbox |
| OAuth2 | OAuth2 Enabled | If true, the plugin performs OAuth2 authentication. | True, False | False | oauth2Enabled | toggle |