The Multiple Database Tables batch source plugin is available in the Hub.

Plugin version: 1.4.0

Reads from multiple tables within a database using JDBC. This source is often used with the Google Cloud Storage Multi Files Sink and Google BigQuery Multi Table Sink.

The source outputs a record for each row in the tables it reads, with each record containing an additional field that holds the name of the table the record came from. In addition, for each table it reads, this plugin sets pipeline arguments where the key is multisink.[tablename] and the value is the schema of the table. This makes it work with the Google Cloud Storage Multi Files Sink and Google BigQuery Multi Table Sink.

note

Note: Although you can change table names with runtime arguments and the JavaScript transformation, you cannot view lineage for the pipeline if you do this. It’s recommended to avoid changing table names in plugins downstream from the source plugins.

Note: Although you can change table names with runtime arguments and the JavaScript transformation, you cannot view lineage for the pipeline if you do this. It’s recommended to avoid changing table names in plugins downstream from the source plugins.

Configuration

Property

Macro Enabled?

Version Introduced

Description

Reference Name

No

Required. Name used to uniquely identify this source for lineage, annotating metadata, etc.

JDBC Connection String

Yes

Required. The JDBC connection string to the database. For example: jdbc:mysql://HOST/DATABASE.

JDBC Plugin Name

No

Optional. The name of the JDBC plugin to use.

Database User Name

Yes

Optional. The username to use when connecting to the database.

Database User Password

Yes

Optional. The password to use when connecting to the database.

Data Selection Mode

Yes

The operation mode for this plugin. Select one of the following:

  • Table Allow List : Define the list of database tables to include.

  • Table Block List: Define the list of database tables to exclude.

  • Custom SQL Statements: Manually define each of the SQL Statement to execute.

Depending on the selected Data Selection Mode, the properties listed in the rest of the table will be available.

Schema Name Pattern

Yes

Optional. A pattern that defines which schemas should be used to list the tables. Any schema whose name matches the pattern will read. If not specified, all schema will be read. Pattern syntax is specific to the type of database that is being connected to.

Table Name Pattern

Yes

Optional. A pattern that defines which tables should be read from. Any table whose name matches the pattern will read. If not specified, all tables will be read. Pattern syntax is specific to the type of database that is being connected to.

Where Clause

Yes

Optional. Filters which records needs to be consumed from each table: i.e. where updated_at > '2018-08-20 00:00:00'. The where clause will be applied to every table that is being read. Therefore, all the columns that are mentioned in the where clause should be present in each table.

Allow List of Table Names

Yes

Optional. Used in conjunction with Table Name Pattern, this configuration specifies tables to be read. If no value is specified in the white list, all tables matching the Table Name Pattern will be read. By default, reads all tables matching the Table Name Pattern.

Block List of Table Names

Yes

Optional. Used in conjunction with Table Name Pattern, this configuration specifies the tables to be skipped. By default, the black list is empty which means no tables will be skipped.

SQL Statements

Yes

Optional. List of SQL statements to execute. Each statement will be handled as a different partition. When submitting this statements using the API, use a semicolon ; as a separator. If the SQL statement includes a semicolon ; character, you will need to escape it using \;. This option is only displayed when the Data Selection Mode is SQL Statements.

Table Aliases

Yes

Optional. List of aliases to use for the datasets generated by the supplied SQL statements. If supplied, the Table Alias for each SQL statement will be used as the Table Name value for each record. The SQL statements will be matched to a Table Alias based on the order in which they appear on the list. This option is only displayed when the Data Selection Mode is SQL Statements.

Table Name Field

No

Optional. The name of the field that holds the table name. Must not be the name of any table column that will be read.

Default is ‘tablename’.

Enable Auto Commit

Yes

Optional. Whether to enable auto-commit for queries run by this source. In most cases, set to NO. If you use a JDBC driver that results in an error when the commit operation is run, set to YES.

Default is NO.

Splits Per Table

Yes

Optional. The number of splits per table.

This option is only displayed when the Data Selection Mode is either Allow List ot Tables or Block List of Tables.

Default is 1.

Fetch Size

Yes

1.3.2

The number of rows to fetch at a time per split. Larger fetch size can result in faster import, with the tradeoff of higher memory usage.

Default is 1000.

Query Timeout (Seconds)

No

Optional. The query timeout in seconds.

Transaction Isolation Level

Yes

Optional. The transaction isolation level for queries run by this sink. For more information, see https://docs.oracle.com/javadb/10.8.3.0/ref/rrefjavcsti.html. The Phoenix JDBC driver will throw an exception if the Phoenix database does not have transactions enabled and this setting is set to true. For drivers like that, this should be set to TRANSACTION_NONE.

Default is TRANSACTION_SERIALIZABLE.

Error Handling Mode

No

Optional. How to handle error handling.

Default is Fail pipeline.

Custom SQL Statements

When using the Data Selection Mode called SQL Statements, the supplied list of SQL statements will be executed as supplied using the specified database connection.

If the query contains a semicolon ; character as part of the query, this character must be escaped using a backslash \;.

Every record that is generated will have a Table Name field (defined by the Table Name Field property, which defaults to tablename) that identifies the source of each record.

If a Table Alias for a given SQL statement is specified, this value will be returned as the Table Name for the records generated using this SQL statement.

It a Table Alias for a given statement is not provided, the Table Names will be generated using a best-guess approach based on the JDBC API’s Result Set Metadata. Please note that not all JDBC drivers offer this functionality. If multiple tables are used in one statement, the resulting table name will be a concatenation of all the distinct table names present in the returned rows for this table, in order of first appearance.

If the Table Name(s) cannot be derived from the Result Set Metadata (because the JDBC driver doesn’t support this functionality, for example), the derived Table Name attribute will be in the pattern of sql_statement_<number> where the Number is the position of the SQL statement in the list of statements (starting at position 1).

See the Derived Table Name Examples section for more details.

Example

This example reads from all tables in the ‘customers’ database on host ‘host123.example.net’:

Property

Value

Reference Name

src1

JDBC Connection String

jdbc:mysql://host123.example.net/customers

JDBC Plugin Name

mysql

Splits Per Table

2

Suppose you have two tables in the ‘customers’ database, where ID column is the primary key in both tables. The first table is named ‘accounts’ and contains:

ID

name

email

0

Samuel

sjax@example.net

1

Alice

a@example.net

2

Bob

b@example.net

3

John

j@example.net

The second is named ‘activity’ and contains:

ID

userid

item

action

0

0

shirt123

view

1

0

carxyz

view

2

0

shirt123

buy

3

0

coffee

view

4

1

cola

buy

5

1

pepsi

buy

You will have 4 splits (2 per each table) with such queries:

SELECT * FROM accounts WHERE ( ID >= 0 ) AND ( ID < 1 )
SELECT * FROM accounts WHERE ( ID >= 2 ) AND ( ID <= 3 )
SELECT * FROM activity WHERE ( ID >= 0 ) AND ( ID < 3 )
SELECT * FROM activity WHERE ( ID >= 3 ) AND ( ID <= 5 )

The output of the the source will be the following records:

ID

name

email

tablename

0

Samuel

sjax@example.net

accounts

1

Alice

a@example.net

accounts

2

Bob

b@example.net

accounts

3

John

j@example.net

accounts

ID

userid

item

action

tablename

0

0

shirt123

view

activity

1

0

carxyz

view

activity

2

0

shirt123

buy

activity

3

0

coffee

view

activity

4

1

cola

buy

activity

5

1

pepsi

buy

activity

Derived Table Names

Only one table per query

Here are some example tables and the derived table names:

The resulting records for the following query:

SELECT 
  users.id, 
  users.name 
FROM 
  users 
WHERE 
  users.name = 'John'

Will have the table name users.

Joined tables

If the query joins multiple tables, the order in which the columns are returned defines the derived table name:

The resulting records for the following query:

SELECT 
  u.id, 
  c.id 
FROM 
  users u JOIN 
  comments c ON u.id = c.user_id 
WHERE 
  u.name = 'John'

Will have the table name users_comments. However, the resulting records for the following query:

SELECT 
  c.id, 
  u.id 
FROM 
  users u JOIN 
  comments c ON u.id = c.user_id 
WHERE 
  u.name = 'John'

Will have the table name comments_users as the first returned column comes from the comments table.

Note that, as mentioned earlier, the table names are concatenated in order of first appearance, without duplicates:

SELECT 
   u.id, 
   s.upvotes, 
   s.downvotes, 
   c.id, 
   u.username,
   c.timestamp,
   s.id
FROM 
   users u JOIN 
   comments c ON u.id = c.user_id JOIN 
   scores s ON s.comment_id = c.id
WHERE 
  u.name = 'John'

Will have the table name users_scores_comments.

If table names cannot be derived

There are instances where a table name cannot be derived. In this case, table names will follow the following pattern:

sql_statement_

The <number> in this statement will be the position of this query in the list of statements, starting with position 1.

Examples

For example, if the JDBC driver is able to provide the Table Name from a SQL statement ResultSet, the derived table name will look as follows:

SQL Statement

Table Alias

JDBC Driver support

Derived Table Name

SELECT * FROM table1

my_alias_1

Yes

my_alias_1

SELECT * FROM table2

<empty>

Yes

table2

SELECT * FROM users JOIN orders ON …

<empty?

Yes

users_orders

In case the JDBC driver is NOT able to provide the Table Name from a SQL statement ResultSet, the derived table name will look as follows:

SQL Statement

Table Alias

JDBC Driver support

Derived Table Name

SELECT * FROM table1

my_alias_1

No

my_alias_1

SELECT * FROM table2

<empty>

No

sql_statement_2

SELECT * FROM users JOIN orders ON …

<empty?

No

sql_statement_3