Change Data Capture

Introduction 

One common use case is that of a user running a relational database with multiple tables. They would like to create copies of those tables in a data warehouse like BigQuery in a single, simple operation. All existing data should be copied first, then new changes (inserts, updates, deletes) that are applied to the relational db tables should be reflected in the BigQuery tables within minutes. Newly created tables in the relational db should automatically appear in BigQuery. Tables that are deleted in the relational db should be deleted in BigQuery. Compatible schema changes should also be reflected.

Pipelines are usually not suitable for these types of use cases, which more closely resemble replication than incremental loads. It is possible to incrementally load data from a single table to a single BigQuery table if the table never has deletes or updates and has a monotonically increasing column. Most users do not have a write pattern like this, so a better solution is required. 

Goals

Design a way for users to easily create a continuously updating copy of their existing data.

Terminology

  • Replicator - A program that reads changes applied to some source storage system and applies (or replicates) them to a target storage system

  • Source - The storage system to replicate from. The first version focuses on relational databases.

  • Target - The storage system to replicate to. The first version focuses on data warehouses like BigQuery, Redshift, or Snowflake.

  • DDL event - An event involving a structure in the source, such as the creation, alteration, or deletion of a table

  • DML event - An event involving data in the source, such as the insertion, update, or deletion of a row in a table

User Stories

  1. As a data admin, I want to be able to replicate data from Oracle, MySQL, or SQL Server

  2. As a data admin, I want to be able to replicate data into BigQuery, Spanner, Redshift, or Snowflake

  3. As a data admin, I want to have an SLO to know that X% of the time, my data is replicated within Y minutes

  4. As a data admin, if an event failed to replicate for any reason, I want the replicator to retry for a configurable amount of time before stopping the replicator

  5. As a data admin, I want to know how many times the replicator failed to replicate an event

  6. As a data admin, I do not want any events to be lost even if the replicator crashes

  7. As a data admin, I do not want duplicate data in the target even if the replicator crashes

  8. As a data admin, I want to be able to tell how far behind my target tables are compared to my source tables

  9. As a data admin, I want to have some metrics around how quickly events are being replicated

  10. As a data admin, I want to be able to pause and resume a replicator

  11. As a data admin, I want to be able to delete a replicator

  12. As a data admin, I want to be able to select a subset of source tables to replicate to my target

  13. As a data admin, I want supported DDL events to be replicated to my destination system

  14. As a data admin, I want to be able to see logs about my replicator in case there are issues (out of memory, permissions errors, etc)

  15. As a data admin, I want to be able to find documentation about what type of database setup I need to perform on my source database

  16. As a data admin, I want to be able to test that my replicator is correctly configured before running it

  17. As a data admin, I want to track field level lineage for tables that were replicated

Design

Approach

At a high level, replicators are implemented by a new CDAP application that defines new 'DeltaSource' and 'DeltaTarget' plugin interfaces.

A DeltaSource is responsible for reading change events from a database and translating them into an ordered sequence of standard DDLEvents and DMLEvents. Sources begin by taking a snapshot of the current state of the database, then begin consuming change events from that moment on. Each event contains an Offset, which is monotonically increasing and unique (at least within a single replicator). Given an offset, the source must be able to start reading events from that offset.

A DeltaTarget is responsible for taking the ordered sequence of events and replicating them to a storage system, as well as telling the app that it has finished replicating an event, allowing the app to store the offset for that event. Events will be sent to a target exactly once during normal operation, but can be sent more than once (at least once) in error scenarios. Once an offset has been successfully persisted, events prior to that offset will never be seen again.

Change events are represented as:

class DDLEvent {
  Offset offset;
  long sequenceNumber;
  String transactionId;
  boolean isSnapshot;
  // "CREATE_DATABASE" | "DROP_DATABASE" | "CREATE_TABLE" | "DROP_TABLE" |
  // "TRUNCATE_TABLE" | "ALTER_TABLE" | "RENAME_TABLE"
  DDLOperation operation;
  Schema schema;
  String database;
  String prevTable; // used by renames
  String table;
  List<String> primaryKey;
}

class DMLEvent {
  Offset offset;
  long sequenceNumber;
  String transactionId;
  boolean isSnapshot;
  DMLOperation operation; // "INSERT" | "DELETE" | "UPDATE" | "COMMIT"
  String database;
  String table;
  StructuredRecord before; // null unless operation is "UPDATE" or "DELETE"
  StructuredRecord row;
}

interface Offset {
  // serialize the offset fields into the DataOutput
  void write(DataOutput out) throws IOException;

  // deserialize offset fields from the DataInput
  void readFields(DataInput in) throws IOException;
}

Each DeltaSource is responsible for defining its own Offset implementation. This is because different sources require different information to know where to start reading from. For example, MySQL offsets correspond to a binlog file name and a position within that file. SQL Server offsets correspond to a change tracking sequence number for the database.
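As an illustration, a MySQL-style offset could look roughly like the sketch below. The class name MySqlOffset, its fields, and the roundTrip helper are hypothetical and not part of the design; the Offset interface is repeated only so the snippet compiles on its own. Ordering by file name assumes zero-padded binlog names such as mysql-bin.000003.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Mirrors the Offset interface from the design above.
interface Offset {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}

// Hypothetical offset for a MySQL source: a binlog file name plus a position in that file.
class MySqlOffset implements Offset, Comparable<MySqlOffset> {
  private String binlogFile = "";
  private long position;

  // No-arg constructor so readFields can populate an empty instance.
  MySqlOffset() { }

  MySqlOffset(String binlogFile, long position) {
    this.binlogFile = binlogFile;
    this.position = position;
  }

  String getBinlogFile() { return binlogFile; }

  long getPosition() { return position; }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(binlogFile);
    out.writeLong(position);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    binlogFile = in.readUTF();
    position = in.readLong();
  }

  // Compare by file name first, then by position within the file.
  // Lexicographic comparison is an assumption that holds for zero-padded names.
  @Override
  public int compareTo(MySqlOffset other) {
    int byFile = binlogFile.compareTo(other.binlogFile);
    return byFile != 0 ? byFile : Long.compare(position, other.position);
  }

  // Serializes and deserializes an offset, as an application might when persisting it.
  static MySqlOffset roundTrip(MySqlOffset offset) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      offset.write(new DataOutputStream(bytes));
      MySqlOffset copy = new MySqlOffset();
      copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      return copy;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A SQL Server offset would carry different fields (for example a single change tracking sequence number), which is why the interface only fixes how an offset is serialized, not what it contains.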

The sequence number is a monotonically increasing number generated by the application, equal to the number of changes emitted by the source. Sources are only responsible for attaching an Offset to each Event they emit. The application will then generate a sequence number and attach it to the event before sending it to the target. This is done because a monotonically increasing number makes it much easier for targets to implement their logic in an idempotent way, which is required to correctly handle failure scenarios. In addition, the sequence number is used as a gauge metric to track progress being made by the replicator. 
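A minimal sketch of how the application might attach sequence numbers is shown below. All names here (SourceEvent, SequencedEvent, Sequencer) are hypothetical simplifications, and representing the offset as a String is an assumption made only to keep the example short.

```java
// Hypothetical, simplified stand-in for an event emitted by a source.
class SourceEvent {
  final String offset;     // opaque source position (a String only for brevity)
  final String operation;  // e.g. "CREATE_TABLE" or "INSERT"

  SourceEvent(String offset, String operation) {
    this.offset = offset;
    this.operation = operation;
  }
}

// The same event after the application has attached a sequence number.
class SequencedEvent {
  final long sequenceNumber;
  final SourceEvent event;

  SequencedEvent(long sequenceNumber, SourceEvent event) {
    this.sequenceNumber = sequenceNumber;
    this.event = event;
  }
}

// Assigns monotonically increasing sequence numbers in emission order.
class Sequencer {
  private long next;

  // firstSequenceNumber is 0 for a new replicator; on restart it is assumed
  // to be one more than the last persisted sequence number.
  Sequencer(long firstSequenceNumber) {
    this.next = firstSequenceNumber;
  }

  SequencedEvent attach(SourceEvent event) {
    return new SequencedEvent(next++, event);
  }
}
```

Because the source never sees the sequence number, the same numbering scheme works for every source regardless of what its Offset looks like.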

Examples

With Primary Key

Source

In this example, suppose the following queries are run on the source database:

CREATE DATABASE myDB;
CREATE TABLE customers (id int, name varchar(50), PRIMARY KEY (id));
INSERT INTO customers (id, name) VALUES (0, 'alice');
UPDATE customers SET id=1 WHERE id=0;
UPDATE customers SET id=2 WHERE id=1;
DELETE FROM customers WHERE id=2;
INSERT INTO customers (id, name) VALUES (0, 'Alice'), (1, 'blob');
UPDATE customers SET name='Bob' WHERE id='1';

The source generates the following DDL events:

| offset | operation | database | table | schema | primary key |
|---|---|---|---|---|---|
| <binlog:mysql-bin.000003, pos:1424> | CREATE_DATABASE | myDB | | | |
| <binlog:mysql-bin.000003, pos:1462> | CREATE_TABLE | myDB | customers | id int, name varchar(50) | id |

followed by the following DML events:

| offset | operation | database | table | before | row |
|---|---|---|---|---|---|
| <binlog:mysql-bin.000003, pos:1462> | INSERT | myDB | customers | | <id:0, name:alice> |
| <binlog:mysql-bin.000003, pos:1482> | UPDATE | myDB | customers | <id:0, name:alice> | <id:1, name:alice> |
| <binlog:mysql-bin.000003, pos:1493> | UPDATE | myDB | customers | <id:1, name:alice> | <id:2, name:alice> |
| <binlog:mysql-bin.000003, pos:1519> | DELETE | myDB | customers | <id:2, name:alice> | <id:2> |
| <binlog:mysql-bin.000003, pos:1538> | INSERT | myDB | customers | | <id:0, name:Alice> |
| <binlog:mysql-bin.000003, pos:1557> | INSERT | myDB | customers | | <id:1, name:blob> |
| <binlog:mysql-bin.000003, pos:1598> | UPDATE | myDB | customers | <id:1, name:blob> | <id:1, name:Bob> |

The sequence number is attached by the application; the source is only responsible for attaching an offset to each event and defining how to compare offsets. In the MySQL case, offsets are compared by filename first, then by position within the file.

Target

The BigQuery target batches DML events together and writes a batch of events to GCS. Once in GCS, it runs a BigQuery load job to load the changes into a staging table. Finally, it runs a Merge query to merge events from the staging table into the actual target table. Once that is complete, it persists the latest sequence number of events contained in the batch. DDL events are not batched together.

For event #0, the target creates a BQ dataset named 'myDB'. Since it must assume the event occurs at least once, it checks if the dataset exists before creating it. After creating the dataset, the target calls a method that tells the application to remember that the event was replicated. The application stores the offset and sequence number for that event.
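The check-then-create pattern for event #0 can be sketched as follows. InMemoryWarehouse is a hypothetical in-memory stand-in for the warehouse, not the BigQuery client API, and the lastCommittedSequenceNumber field stands in for the call that tells the application the event was replicated.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory stand-in for the target warehouse (not a real client API).
class InMemoryWarehouse {
  private final Set<String> datasets = new HashSet<>();
  private int createCalls;

  boolean datasetExists(String name) {
    return datasets.contains(name);
  }

  // Fails on a duplicate create, like a real warehouse typically would.
  void createDataset(String name) {
    if (!datasets.add(name)) {
      throw new IllegalStateException("dataset already exists: " + name);
    }
    createCalls++;
  }

  int getCreateCalls() {
    return createCalls;
  }
}

// Applies CREATE_DATABASE events so that a replayed event is harmless.
class DdlTarget {
  private final InMemoryWarehouse warehouse;
  private long lastCommittedSequenceNumber = -1;

  DdlTarget(InMemoryWarehouse warehouse) {
    this.warehouse = warehouse;
  }

  void applyCreateDatabase(long sequenceNumber, String database) {
    // The event may arrive more than once, so check before creating.
    if (!warehouse.datasetExists(database)) {
      warehouse.createDataset(database);
    }
    // Stand-in for telling the application that the event was replicated.
    lastCommittedSequenceNumber = sequenceNumber;
  }

  long getLastCommittedSequenceNumber() {
    return lastCommittedSequenceNumber;
  }
}
```

If the replicator crashes after creating the dataset but before the commit is recorded, the event is delivered again and the existence check turns the second delivery into a no-op.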

For event #1, a staging table '_staging_customers' is created that records values for the row before and after the change along with 3 extra columns – batchId, sequenceNum, and operation. Batch id is just the current timestamp of the load job. The table is partitioned on batchId and clustered on sequenceNum. This allows efficiently selecting data for a specific batchId while ordering by sequenceNum. Note that a BQ 'integer' is 8 bytes, equivalent to a Java long.

| _batch_id (timestamp) | _sequence_num (integer) | _operation (string) | _before_id (integer) | _before_name (string) | id (integer) | name (string) |
|---|---|---|---|---|---|---|

The actual target table 'customers' is also created with the same schema as the source table, except with the sequence number as an additional column:

| _sequence_num (integer) | id (integer) | name (string) |
|---|---|---|

For events #2-8, the target may decide to batch them together in different ways. Supposing they all get batched together, after the load job, the staging table looks like:

| _batch_id | _sequence_num | _operation | _before_id | _before_name | id | name |
|---|---|---|---|---|---|---|
| 1234567890 | 2 | INSERT | | | 0 | alice |
| 1234567890 | 3 | UPDATE | 0 | alice | 1 | alice |
| 1234567890 | 4 | UPDATE | 1 | alice | 2 | alice |
| 1234567890 | 5 | DELETE | 2 | alice | 2 | |
| 1234567890 | 6 | INSERT | | | 0 | Alice |
| 1234567890 | 7 | INSERT | | | 1 | blob |
| 1234567890 | 8 | UPDATE | 1 | blob | 1 | Bob |

A merge query is then run to merge changes from the staging table into the final target table:

MERGE myDB.customers AS T
USING ($DIFF_QUERY) AS D
ON T.id = D._before_id
WHEN MATCHED AND D._operation = "DELETE" THEN
  DELETE
WHEN MATCHED AND D._operation IN ("INSERT", "UPDATE") THEN
  UPDATE SET id = D.id, name = D.name
WHEN NOT MATCHED AND D._operation IN ("INSERT", "UPDATE") THEN
  INSERT (id, name) VALUES (id, name)

Where the $DIFF_QUERY is:

SELECT A.*
FROM
  (SELECT * FROM myDB._staging_customers
   WHERE _batch_id = 1234567890 AND _sequence_num > $LATEST_APPLIED) AS A
LEFT OUTER JOIN
  (SELECT * FROM myDB._staging_customers
   WHERE _batch_id = 1234567890 AND _sequence_num > $LATEST_APPLIED) AS B
ON A.id = B._before_id AND A._sequence_num < B._sequence_num
WHERE B._before_id IS NULL

The diff query is responsible for getting the latest change for each primary key. With the example above, it results in:

| _batch_id | _sequence_num | _operation | _before_id | _before_name | id | name |
|---|---|---|---|---|---|---|
| 1234567890 | 5 | DELETE | 2 | alice | 2 | |
| 1234567890 | 6 | INSERT | | | 0 | Alice |
| 1234567890 | 8 | UPDATE | | | 1 | Bob |
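The selection performed by the diff query can also be sketched in memory, which may make the self-join easier to follow: a staged row is kept only if no later row in the same batch has a _before_id equal to its id. The class names StagedChange and DiffQuery are hypothetical, and the input list is assumed to already contain rows from a single batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// One row of the staging table (batch id omitted: the list holds a single batch).
class StagedChange {
  final long sequenceNum;
  final String operation;
  final Integer beforeId;   // null for INSERT
  final String beforeName;  // null for INSERT
  final Integer id;
  final String name;        // null for DELETE

  StagedChange(long sequenceNum, String operation, Integer beforeId, String beforeName,
               Integer id, String name) {
    this.sequenceNum = sequenceNum;
    this.operation = operation;
    this.beforeId = beforeId;
    this.beforeName = beforeName;
    this.id = id;
    this.name = name;
  }
}

class DiffQuery {
  // Keeps row A unless some row B has B.beforeId == A.id and a larger sequence number,
  // mirroring the LEFT OUTER JOIN ... WHERE B._before_id IS NULL in the SQL.
  static List<StagedChange> latestChanges(List<StagedChange> staged, long latestApplied) {
    List<StagedChange> candidates = new ArrayList<>();
    for (StagedChange change : staged) {
      if (change.sequenceNum > latestApplied) {
        candidates.add(change);
      }
    }
    List<StagedChange> result = new ArrayList<>();
    for (StagedChange a : candidates) {
      boolean superseded = false;
      for (StagedChange b : candidates) {
        if (b.beforeId != null && b.beforeId.equals(a.id) && b.sequenceNum > a.sequenceNum) {
          superseded = true;
          break;
        }
      }
      if (!superseded) {
        result.add(a);
      }
    }
    return result;
  }

  // The seven staged rows from the example above.
  static List<StagedChange> exampleBatch() {
    return Arrays.asList(
        new StagedChange(2, "INSERT", null, null, 0, "alice"),
        new StagedChange(3, "UPDATE", 0, "alice", 1, "alice"),
        new StagedChange(4, "UPDATE", 1, "alice", 2, "alice"),
        new StagedChange(5, "DELETE", 2, "alice", 2, null),
        new StagedChange(6, "INSERT", null, null, 0, "Alice"),
        new StagedChange(7, "INSERT", null, null, 1, "blob"),
        new StagedChange(8, "UPDATE", 1, "blob", 1, "Bob"));
  }
}
```

Calling DiffQuery.latestChanges(DiffQuery.exampleBatch(), 1) keeps the rows with sequence numbers 5, 6, and 8, matching the result above.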

The $LATEST_APPLIED variable is the max sequence number seen in the target table. This is required to ensure idempotency – events that are replayed should not be re-inserted into the final target table. The latest applied sequence number can be tracked in memory by the target, except for the first time it sees the table, where it will need to run a SELECT MAX(_sequence_num) query. 
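One way the in-memory tracking described here might look is sketched below. LatestAppliedTracker is a hypothetical name, and the ToLongFunction passed to it is a stand-in for running SELECT MAX(_sequence_num) against the target table; no real query is issued in this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Tracks the latest applied sequence number per table, querying the target only once per table.
class LatestAppliedTracker {
  private final Map<String, Long> latestByTable = new HashMap<>();
  // Stand-in for "SELECT MAX(_sequence_num)" on the given target table.
  private final ToLongFunction<String> maxSequenceNumQuery;

  LatestAppliedTracker(ToLongFunction<String> maxSequenceNumQuery) {
    this.maxSequenceNumQuery = maxSequenceNumQuery;
  }

  // Returns the cached value, running the query only the first time a table is seen.
  long latestApplied(String table) {
    Long cached = latestByTable.get(table);
    if (cached == null) {
      cached = maxSequenceNumQuery.applyAsLong(table);
      latestByTable.put(table, cached);
    }
    return cached;
  }

  // Replayed events (sequence number at or below the latest applied) are skipped.
  boolean shouldApply(String table, long sequenceNum) {
    return sequenceNum > latestApplied(table);
  }

  // Called after a merge completes; never moves the value backwards.
  void recordApplied(String table, long sequenceNum) {
    if (sequenceNum > latestApplied(table)) {
      latestByTable.put(table, sequenceNum);
    }
  }
}
```

After a restart the in-memory map is empty, so the first batch for each table pays for one query and later batches do not.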

Note: When there is a primary key, it is possible to implement the target in such a way that it doesn't need the additional sequence number column, exactly matching the source schema. However, this complicates the idempotency story, as the target would need to ensure that load and merge jobs are not run on data that was previously seen, requiring more complicated logic around using specific GCS object names and BQ load job ids.

Without Primary Key

If no primary key exists, a very similar set of steps occurs, except BigQuery will use all of the columns as the "primary key". 

Note: SQL Server doesn't allow enabling change tracking on a table without a primary key.

Source

Suppose the following queries are run on the source database:

CREATE DATABASE myDB;
CREATE TABLE customers (name varchar(50));
INSERT INTO customers (name) VALUES ('alice'), ('alice'), ('Bob');
UPDATE customers SET name = 'Alyce' WHERE name = 'alice';
UPDATE customers SET name = 'Alice' WHERE name = 'Alyce';
DELETE FROM customers WHERE name = 'alice';

The source generates the following DDL events:

| offset | operation | database | table | schema | primary key |
|---|---|---|---|---|---|
| <binlog:mysql-bin.000003, pos:1424> | CREATE_DATABASE | myDB | | | |
| <binlog:mysql-bin.000003, pos:1462> | CREATE_TABLE | myDB | customers | name varchar(50) | |

followed by the following DML events:

| offset | transaction id | operation | database | table | before | row |
|---|---|---|---|---|---|---|
| <binlog:mysql-bin.000003, pos:1462> | 0 | INSERT | myDB | customers | | <name:alice> |
| <binlog:mysql-bin.000003, pos:1482> | 0 | INSERT | myDB | customers | | <name:alice> |
| <binlog:mysql-bin.000003, pos:1493> | 0 | INSERT | myDB | customers | | <name:Bob> |
| <binlog:mysql-bin.000003, pos:1519> | 1 | UPDATE | myDB | customers | <name:alice> | <name:Alyce> |
| <binlog:mysql-bin.000003, pos:1538> | 1 | UPDATE | myDB | customers | <name:alice> | <name:Alyce> |
| <binlog:mysql-bin.000003, pos:1557> | 2 | UPDATE | myDB | customers | <name:Alyce> | <name:Alice> |
| <binlog:mysql-bin.000003, pos:1598> | 2 | UPDATE | myDB | customers | <name:Alyce> | <name:Alice> |
| <binlog:mysql-bin.000003, pos:1603> | 3 | DELETE | myDB | customers | <name:Alice> | <name:Alice> |
| <binlog:mysql-bin.000003, pos:1605> | 3 | DELETE | myDB | customers | <name:Alice> | <name:Alice> |

Target

The BigQuery target loads the DML events into a staging table:

| _batch_id | _sequence_num | _operation | _before_name |
|---|---|---|---|

Created in 2020 by Google Inc.