Google BigQuery Multi Table Sink
Plugin version: 0.22.0
Writes to multiple BigQuery tables. BigQuery is Google's serverless, highly scalable, enterprise data warehouse. Data is first written to a temporary location on Google Cloud Storage, and then loaded into BigQuery from there.
The plugin expects that the tables it needs to write to will be provided as pipeline arguments, where the key is multisink.[TABLE_NAME] and the value is the table schema in Avro format.
Note: Although you can change table names with runtime arguments and the JavaScript transformation, you cannot view lineage for the pipeline if you do this. It’s recommended to avoid changing table names in plugins downstream from the source plugins.
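For example, to route records to a hypothetical customers table, you would set a pipeline argument like the following (the Example section below walks through a complete pipeline):
Key: multisink.customers
Value:
{
  "name": "customers",
  "type": "record",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "name", "type": "string" }
  ]
}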
Credentials
If the plugin is run on a Google Cloud Dataproc cluster, the service account key does not need to be provided and can be set to 'auto-detect'. Credentials will be automatically read from the cluster environment.
If the plugin is not run on a Dataproc cluster, the path to a service account key must be provided. The service account key can be found on the Dashboard in the Cloud Platform Console. Make sure the account key has permission to access BigQuery and Google Cloud Storage. The service account key file needs to be available on every node in your cluster and must be readable by all users running the job.
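For reference, a downloaded service account key is a JSON file whose contents typically look like the following (values redacted and shortened; the exact fields are generated by Google Cloud when the key is created):
{
  "type": "service_account",
  "project_id": "my-project",
  "private_key_id": "...",
  "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
  "client_email": "my-service-account@my-project.iam.gserviceaccount.com",
  "client_id": "...",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token"
}
When Service Account Type is set to JSON, the entire contents of this file go into the Service Account JSON property; when it is set to File Path, only the path to the file is provided.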
Configuration
Property | Macro Enabled? | Version Introduced | Description |
---|---|---|---|
Use Connection | No | 6.7.0/0.20.0 | Optional. Whether to use a connection. If a connection is used, you do not need to provide the credentials. |
Connection | Yes | 6.7.0/0.20.0 | Optional. Name of the connection to use. Project and service account information will be provided by the connection. You can also provide the connection name through a macro. |
Project ID | Yes | | Optional. Google Cloud Project ID, which uniquely identifies a project. It can be found on the Dashboard in the Google Cloud Platform Console. This is the project that the BigQuery job will run in. Default is auto-detect. |
Dataset Project ID | Yes | | Optional. Project the dataset belongs to. This is only required if the dataset is not in the same project that the BigQuery job will run in. If no value is given, it will default to the configured Project ID. |
Service Account Type | Yes | 6.3.0/0.16.0 | Optional. Service account key used for authorization. Select File Path or JSON. If you select File Path, enter the Service Account File Path. If you select JSON, enter the Service Account JSON. Default is File Path. |
Service Account File Path | Yes | | Optional. Path on the local file system of the service account key used for authorization. Can be set to 'auto-detect' when running on a Dataproc cluster. When running on other clusters, the file must be present on every node in the cluster. Default is auto-detect. |
Service Account JSON | Yes | 6.3.0/0.16.0 | Optional. Contents of the service account JSON file. |
Reference Name | No | | Required. Name used to uniquely identify this sink for lineage, annotating metadata, etc. |
Dataset | Yes | | Required. Dataset the tables belong to. A dataset is contained within a specific project. Datasets are top-level containers that are used to organize and control access to tables and views. If the dataset does not exist, it will be created. |
Truncate Table | Yes | | Optional. Whether or not to truncate the table before writing to it. Default is False. |
Temporary Bucket Name | Yes | | Optional. Google Cloud Storage bucket to store temporary data in. It will be automatically created if it does not exist, but will not be automatically deleted. Temporary data will be deleted after it is loaded into BigQuery. If it is not provided, a unique bucket will be created and then deleted after the run finishes. |
GCS Upload Request Chunk Size | Yes | | Optional. GCS upload request chunk size in bytes. Default is 8388608 bytes. |
Split Field | Yes | | Optional. The name of the field that will be used to determine which table to write to. Default is tablename. |
Allow flexible schemas in Output | Yes | | Optional. When enabled, this sink will write out records with arbitrary schemas. Records may not have a well-defined schema depending on the source. When disabled, table schemas must be passed in pipeline arguments. |
Update Table Schema | Yes | | Optional. Whether the BigQuery table schema should be modified when it does not match the schema expected by the pipeline. Compatible changes (such as adding nullable fields or relaxing required fields to nullable) are applied automatically; incompatible schema changes will result in pipeline failure. Default is False. |
Location | Yes | | Optional. The location where the BigQuery datasets will get created. This value is ignored if the dataset or temporary bucket already exists. Default is US. |
Encryption Key Name | Yes | 6.5.1/0.18.1 | Optional. The GCP customer-managed encryption key (CMEK) used to encrypt data written to any bucket, dataset, or table created by the plugin. More information can be found in the customer-managed encryption keys (CMEK) documentation. |
Example
Suppose the input records are:
id | name | email | tablename |
---|---|---|---|
0 | Samuel | sjax@example.net | accounts |
1 | Alice | a@example.net | accounts |
userid | item | action | tablename |
---|---|---|---|
0 | shirt123 | view | activity |
0 | carxyz | view | activity |
0 | shirt123 | buy | activity |
0 | coffee | view | activity |
1 | cola | buy | activity |
The plugin will expect two pipeline arguments telling it to write the first two records to an accounts table and the other records to an activity table.
Pipeline runtime arguments with the table schemas in Avro format:
Key: multisink.accounts
Value:
{
  "name": "accounts",
  "type": "record",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "name", "type": "string" },
    { "name": "email", "type": "string" }
  ]
}
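The second argument follows the same pattern, with the field types inferred from the activity records above:
Key: multisink.activity
Value:
{
  "name": "activity",
  "type": "record",
  "fields": [
    { "name": "userid", "type": "long" },
    { "name": "item", "type": "string" },
    { "name": "action", "type": "string" }
  ]
}
As a sketch, assuming the standard CDAP program lifecycle endpoint and the default router port, both arguments could be passed as runtime arguments when starting the pipeline over REST (host, namespace, and pipeline name are placeholders, and each schema is supplied as an escaped JSON string):
POST http://<cdap-host>:11015/v3/namespaces/default/apps/<pipeline-name>/workflows/DataPipelineWorkflow/start
{
  "multisink.accounts": "{\"name\":\"accounts\",\"type\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"}]}",
  "multisink.activity": "{\"name\":\"activity\",\"type\":\"record\",\"fields\":[{\"name\":\"userid\",\"type\":\"long\"},{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"action\",\"type\":\"string\"}]}"
}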
Troubleshooting
Missing permission to create a temporary bucket
If your pipeline failed with the following error in the log:
com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
POST https://storage.googleapis.com/storage/v1/b?project=projectId&projection=full
{
"code" : 403,
"errors" : [ {
"domain" : "global",
"message" : "xxxxxxxxxxxx-compute@developer.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project.",
"reason" : "forbidden"
} ],
"message" : "xxxxxxxxxxxx-compute@developer.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project."
}
xxxxxxxxxxxx-compute@developer.gserviceaccount.com is the service account you specified in this plugin. This means the temporary bucket you specified in this plugin doesn't exist. CDF/CDAP is trying to create the temporary bucket, but the specified service account doesn't have the permission. You must grant the "GCE Storage Bucket Admin" role on the project identified by the Project ID you specified in this plugin to the service account. If you think you already granted the role, check if you granted the role to the wrong project (for example, the one identified by the Dataset Project ID).
Missing permission to run BigQuery jobs
If your pipeline failed with the following error in the log:
POST https://bigquery.googleapis.com/bigquery/v2/projects/xxxx/jobs
{
"code" : 403,
"errors" : [ {
"domain" : "global",
"message" : "Access Denied: Project xxxx: User does not have bigquery.jobs.create permission in project xxxx",
"reason" : "accessDenied"
} ],
"message" : "Access Denied: Project xxxx: User does not have bigquery.jobs.create permission in project xxxx.",
"status" : "PERMISSION_DENIED"
}
xxxx is the Project ID you specified in this plugin. This means the specified service account doesn't have the permission to run BigQuery jobs. You must grant the "BigQuery Job User" role on the project identified by the Project ID you specified in this plugin to the service account. If you think you already granted the role, check if you granted the role on the wrong project (for example, the one identified by the Dataset Project ID).
Missing permission to create the BigQuery dataset
If your pipeline failed with the following error in the log:
POST https://www.googleapis.com/bigquery/v2/projects/xxxx/datasets?prettyPrint=false
{
"code" : 403,
"errors" : [ {
"domain" : "global",
"message" : "Access Denied: Project xxxx: User does not have bigquery.datasets.create permission in project xxxx.",
"reason" : "accessDenied"
} ],
"message" : "Access Denied: Project xxxx: User does not have bigquery.datasets.create permission in project xxxx.",
"status" : "PERMISSION_DENIED"
}
xxxx is the Dataset Project ID you specified in this plugin. This means the dataset specified in this plugin doesn't exist. CDF/CDAP is trying to create the dataset, but the service account you specified in this plugin doesn't have the permission. You must grant the "BigQuery Data Editor" role on the project identified by the Dataset Project ID you specified in this plugin to the service account. If you think you already granted the role, check if you granted the role on the wrong project (for example, the one identified by the Project ID).
Missing permission to create the BigQuery table
If your pipeline failed with the following error in the log:
POST https://bigquery.googleapis.com/bigquery/v2/projects/xxxx/jobs
{
"code" : 403,
"errors" : [ {
"domain" : "global",
"message" : "Access Denied: Dataset xxxx:mysql_bq_perm: Permission bigquery.tables.create denied on dataset xxxx:mysql_bq_perm (or it may not exist).",
"reason" : "accessDenied"
} ],
"message" : "Access Denied: Dataset xxxx:mysql_bq_perm: Permission bigquery.tables.create denied on dataset xxxx:mysql_bq_perm (or it may not exist).",
"status" : "PERMISSION_DENIED"
}
xxxx is the Dataset Project ID you specified in this plugin. This means the table specified in this plugin doesn't exist. CDF/CDAP is trying to create the table, but the service account you specified in this plugin doesn't have the permission. You must grant the "BigQuery Data Editor" role on the project identified by the Dataset Project ID you specified in this plugin to the service account. If you think you already granted the role, check if you granted the role on the wrong project (for example, the one identified by the Project ID).
Missing permission to read the BigQuery dataset
If your pipeline failed with the following error in the log:
com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
GET https://www.googleapis.com/bigquery/v2/projects/xxxx/datasets/mysql_bq_perm?prettyPrint=false
{
"code" : 403,
"errors" : [ {
"domain" : "global",
"message" : "Access Denied: Dataset xxxx:mysql_bq_perm: Permission bigquery.datasets.get denied on dataset xxxx:mysql_bq_perm (or it may not exist).",
"reason" : "accessDenied"
} ],
"message" : "Access Denied: Dataset xxxx:mysql_bq_perm: Permission bigquery.datasets.get denied on dataset xxxx:mysql_bq_perm (or it may not exist).",
"status" : "PERMISSION_DENIED"
}
xxxx is the Dataset Project ID you specified in this plugin. The service account you specified in this plugin doesn't have the permission to read the dataset you specified in this plugin. You must grant the "BigQuery Data Editor" role on the project identified by the Dataset Project ID you specified in this plugin to the service account. If you think you already granted the role, check if you granted the role on the wrong project (for example, the one identified by the Project ID).
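In each of the cases above, the fix is to grant the named role to the service account on the correct project. Assuming the standard predefined role IDs (BigQuery Job User is roles/bigquery.jobUser and BigQuery Data Editor is roles/bigquery.dataEditor), the project's IAM policy should then contain a binding similar to the following (the service account and role shown are illustrative):
{
  "bindings": [
    {
      "role": "roles/bigquery.jobUser",
      "members": [
        "serviceAccount:xxxxxxxxxxxx-compute@developer.gserviceaccount.com"
      ]
    }
  ]
}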
Data Type Mappings from CDAP to BigQuery
The following table lists CDAP data types and the corresponding BigQuery data types for updates and upserts.
For inserts, the type conversions are the same as those used in loading Avro data to BigQuery. For more information, see Avro conversions.
Note: Support for the datetime data type was introduced in CDAP 6.4.0.
CDAP Schema Data Type | BigQuery Data Type |
---|---|
array | repeated |
boolean | bool |
bytes | bytes |
date | date |
datetime | datetime, string |
decimal | numeric, bignumeric Note: Support for bignumeric was added in CDAP 6.7.0. |
double / float | float64 |
enum | unsupported |
int / long | int64 |
map | unsupported |
record | struct |
string | string, datetime (ISO 8601 format) |
time | time |
timestamp | timestamp |
union | unsupported |
For more information on BigQuery data types, see Standard SQL Data Types.
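For example, a decimal field declared in one of the multisink.* Avro schemas with the standard Avro decimal logical type (the precision and scale below are illustrative) loads into BigQuery as NUMERIC, or as BIGNUMERIC when the precision or scale exceeds the NUMERIC limits:
{ "name": "price", "type": { "type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2 } }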