Performing upserts on a PostgreSQL database

The Database sink, PostgreSQL sink, and CloudSQL PostgreSQL sink all perform inserts when writing data to the target database.

To upsert data instead of inserting it, create a pipeline in which a PostgreSQL sink writes to a staging table, and then add a PostgreSQL Execute action plugin after the sink. The PostgreSQL Execute action performs the upsert from the staging table to the final table.

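For example, consider a pipeline that reads data from a file stored in Cloud Storage, performs some data quality checks in Wrangler, inserts the data into the PostgreSQL staging table, upserts the data from the staging table to the final table, and then truncates the staging table. Truncate the staging table after the upsert completes so that each run starts with an empty staging table. Otherwise, rows left over from an earlier run are upserted again, and if they share an id with newly staged rows, the INSERT ... ON CONFLICT DO UPDATE statement fails because PostgreSQL does not allow a single statement to update the same row twice.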

The Database Command in the PostgreSQL Execute action looks like:

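BEGIN;
-- Insert new rows; when an id already exists, update that row instead.
INSERT INTO final_table (id, date)
SELECT id, date FROM staging_table
ON CONFLICT (id) DO UPDATE
SET date = excluded.date;  -- excluded refers to the row proposed for insertion
-- Empty the staging table in the same transaction as the upsert.
TRUNCATE staging_table;
COMMIT;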
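
The ON CONFLICT (id) clause only works if PostgreSQL can find a unique index or constraint on the id column of the final table. As a minimal sketch, the two tables might be defined as follows. The column types are assumptions made for illustration; only the table and column names come from the command above.

```sql
-- Hypothetical definitions; the column types are assumed, not taken from the pipeline.
CREATE TABLE IF NOT EXISTS final_table (
    id   integer PRIMARY KEY,  -- unique key that ON CONFLICT (id) relies on
    date date
);

-- The staging table mirrors the final table's columns but has no key,
-- so the sink can insert rows without constraint checks.
CREATE TABLE IF NOT EXISTS staging_table (
    id   integer,
    date date
);
```

Leaving the key off the staging table keeps the sink's plain inserts from failing on duplicates, and the conflict handling happens only in the upsert step.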

Tip: You can also add a PostgreSQL Execute action plugin before the source and configure the Database Command to truncate the staging table before the pipeline reads data from the source. You still need the PostgreSQL Execute action after the sink to perform the upsert from the staging table to the final table.
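
For the action that runs before the source, the Database Command could be as small as the following sketch, assuming the staging table is named staging_table as in the example above:

```sql
-- Runs before the source: start the run with an empty staging table.
TRUNCATE TABLE staging_table;
```

Truncating up front also clears rows left behind by a previous run that failed before it reached the upsert.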

Created in 2020 by Google Inc.