Change Data Capture
- 1 Introduction
- 2 Goals
- 3 Terminology
- 4 User Stories
- 5 Design
- 5.1 Approach
- 5.1.1 Examples
- 5.1.1.1 With Primary Key
- 5.1.1.2 Without Primary Key
- 5.2 Transactions
- 5.2.1 Example
- 5.3 Failure Handling
- 5.3.1 Offset/State failures
- 5.3.2 Source/Target failures
- 5.4 Config API
- 5.5 Program Type
- 5.6 Lineage
- 5.7 Metrics
- 5.8 Validation
- 5.9 Why not Pipelines?
- 6 API changes
- 6.1 New Programmatic APIs
- 6.1.1 Plugin Context
- 6.1.2 DeltaSource
- 6.1.2.1 Example
- 6.1.3 DeltaTarget
- 6.1.3.1 Example
- 6.2 System Service
- 6.2.1 REST APIs
- 6.2.2 Data Model
- 6.3 Deprecated Programmatic APIs
- 6.4 New REST APIs
- 6.5 Deprecated REST API
- 6.6 CLI Impact or Changes
- 7 UI Impact or Changes
- 8 Security Impact
- 9 Impact on Infrastructure Outages
- 10 Test Scenarios
- 11 Releases
- 11.1 Release X.Y.Z
- 11.2 Release X.Y.Z
- 12 Related Work
- 13 Future work
Checklist
Introduction
One common use case is that of a user running a relational database with multiple tables. They would like to create copies of those tables in a data warehouse like BigQuery in a single, simple operation. All existing data should be copied first, then new changes (inserts, updates, and deletes) applied to the relational database tables should be reflected in the BigQuery tables within minutes. Newly created tables in the relational database should automatically appear in BigQuery. Tables that are deleted in the relational database should be deleted in BigQuery. Compatible schema changes should also be reflected.
Pipelines are usually not suitable for these types of use cases, which more closely resemble replication than incremental loads. It is possible to incrementally load data from a single table to a single BigQuery table if the table never has deletes or updates and has a monotonically increasing column. Most users do not have a write pattern like this, so a better solution is required.
Goals
Design a way for users to easily create a continuously updating copy of their existing data.
Terminology
Replicator - A program that reads changes applied to a source storage system and applies (replicates) them to a target storage system
Source - The storage system to replicate from. The first version focuses on relational databases.
Target - The storage system to replicate to. The first version focuses on data warehouses like BigQuery, Redshift, or Snowflake.
DDL event - An event involving a structure in the source, such as the creation, alteration, or deletion of a table
DML event - An event involving data in the source, such as the insertion, update, or deletion of a row in a table
User Stories
As a data admin, I want to be able to replicate data from Oracle, MySQL, or SQL Server
As a data admin, I want to be able to replicate data into BigQuery, Spanner, Redshift, or Snowflake
As a data admin, I want to have an SLO to know that X% of the time, my data is replicated within Y minutes
As a data admin, if an event failed to replicate for any reason, I want the replicator to retry for a configurable amount of time before stopping
As a data admin, I want to know how many times the replicator failed to replicate an event
As a data admin, I do not want any events to be lost even if the replicator crashes
As a data admin, I do not want duplicate data in the target even if the replicator crashes
As a data admin, I want to be able to tell how far behind my target tables are compared to my source tables
As a data admin, I want to have some metrics around how quickly events are being replicated
As a data admin, I want to be able to pause and resume a replicator
As a data admin, I want to be able to delete a replicator
As a data admin, I want to be able to select a subset of source tables to replicate to my target
As a data admin, I want supported DDL events to be replicated to my destination system
As a data admin, I want to be able to see logs about my replicator in case there are issues (out of memory, permissions errors, etc)
As a data admin, I want to be able to find documentation about what type of database setup I need to perform on my source database
As a data admin, I want to be able to test that my replicator is correctly configured before running it
As a data admin, I want to track field level lineage for tables that were replicated
Design
Approach
At a high level, replicators are implemented by a new CDAP application that defines new 'DeltaSource' and 'DeltaTarget' plugin interfaces.
A DeltaSource is responsible for reading change events from a database and translating them into an ordered sequence of standard DDLEvents and DMLEvents. Sources start by taking a snapshot of the current state of the database, then consume change events from that moment on. Each event contains an Offset, which is monotonically increasing and unique (at least within a single replicator). Given an offset, the source must be able to start reading events from that offset.
A DeltaTarget is responsible for taking the ordered sequence of events and replicating them to a storage system, as well as telling the app that it has finished replicating an event, allowing the app to store the offset for that event. Events will be sent to a target exactly once during normal operation, but may be sent more than once in error scenarios. Once an offset has been successfully persisted, events prior to that offset will never be seen again.
Change events are represented as:
class DDLEvent {
  Offset offset;
  long sequenceNumber;
  String transactionId;
  boolean isSnapshot;
  DDLOperation operation; // "CREATE_DATABASE" | "DROP_DATABASE" | "CREATE_TABLE" | "DROP_TABLE" | "TRUNCATE_TABLE" | "ALTER_TABLE" | "RENAME_TABLE"
  Schema schema;
  String database;
  String prevTable; // used by renames
  String table;
  List<String> primaryKey;
}

class DMLEvent {
  Offset offset;
  long sequenceNumber;
  String transactionId;
  boolean isSnapshot;
  DMLOperation operation; // "INSERT" | "DELETE" | "UPDATE" | "COMMIT"
  String database;
  String table;
  StructuredRecord before; // null unless operation is "UPDATE" or "DELETE"
  StructuredRecord row;
}

interface Offset {
  // serialize the offset fields into the DataOutput
  void write(DataOutput out) throws IOException;

  // deserialize offset fields from the DataInput
  void readFields(DataInput in) throws IOException;
}

Each DeltaSource is responsible for defining its own Offset implementation. This is because different sources require different information to know where to start reading from. For example, MySQL offsets correspond to a binlog file name and a position within that file. SQL Server offsets correspond to a change tracking sequence number for the database.
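For illustration, a minimal sketch of a binlog-style offset, using hypothetical class and field names rather than the actual MySQL source implementation (relies on java.io.DataInput, java.io.DataOutput, and java.io.IOException):

class MySqlBinlogOffset implements Offset {
  // hypothetical fields: the binlog file currently being read and the position within it
  String binlogFile;
  long position;

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(binlogFile);
    out.writeLong(position);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    binlogFile = in.readUTF();
    position = in.readLong();
  }
}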
The sequence number is a monotonically increasing number generated by the application, equal to the number of changes emitted by the source. Sources are only responsible for attaching an Offset to each Event they emit. The application will then generate a sequence number and attach it to the event before sending it to the target. This is done because a monotonically increasing number makes it much easier for targets to implement their logic in an idempotent way, which is required to correctly handle failure scenarios. In addition, the sequence number is used as a gauge metric to track progress being made by the replicator.
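A rough sketch of how the application could generate sequence numbers; every name below is illustrative rather than a defined API:

// The app restores the last persisted sequence number on startup and increments it
// for every event emitted by the source before handing the event to the target.
long sequenceNumber = offsetStore.loadLastSequenceNumber(); // hypothetical offset/state store
for (DMLEvent event : source.poll()) {                      // hypothetical source call
  event.sequenceNumber = ++sequenceNumber;                  // app-generated, monotonically increasing
  target.applyDML(event);                                   // hypothetical target call
}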
Examples
With Primary Key
Source
In this example, suppose the following queries are run on the source database:
CREATE DATABASE myDB;
CREATE TABLE customers (id int, name varchar(50), PRIMARY KEY (id));
INSERT INTO customers (id, name) VALUES (0, 'alice');
UPDATE customers set id=1 where id=0;
UPDATE customers set id=2 where id=1;
DELETE FROM customers where id=2;
INSERT into customers (id, name) VALUES (0, 'Alice'), (1, 'blob');
UPDATE customers set name='Bob' where id=1;

The source generates the following DDL events:
| offset | operation | database | table | schema | primary key |
|---|---|---|---|---|---|
| <binlog:mysql-bin.000003, pos:1424> | CREATE_DATABASE |  |  |  |  |
| <binlog:mysql-bin.000003, pos:1462> | CREATE_TABLE |  | customers |  | id |
followed by the following DML events:
| offset | operation | database | table | before | row |
|---|---|---|---|---|---|
| <binlog:mysql-bin.000003, pos:1462> | INSERT | myDB | customers |  | <id:0, name:alice> |
| <binlog:mysql-bin.000003, pos:1482> | UPDATE | myDB | customers | <id:0, name:alice> | <id:1, name:alice> |
| <binlog:mysql-bin.000003, pos:1493> | UPDATE | myDB | customers | <id:1, name:alice> | <id:2, name:alice> |
| <binlog:mysql-bin.000003, pos:1519> | DELETE | myDB | customers | <id:2, name:alice> | <id:2> |
| <binlog:mysql-bin.000003, pos:1538> | INSERT | myDB | customers |  | <id:0, name:Alice> |
| <binlog:mysql-bin.000003, pos:1557> | INSERT | myDB | customers |  | <id:1, name:blob> |
| <binlog:mysql-bin.000003, pos:1598> | UPDATE | myDB | customers | <id:1, name:blob> | <id:1, name:Bob> |
The sequence number is attached by the application; the source is only responsible for attaching an offset to each event and defining how to compare offsets. In the MySQL case, offsets are compared by binlog file name first, then by position within the file.
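For example, that ordering could be expressed as follows, reusing the hypothetical MySqlBinlogOffset fields from the earlier sketch (illustrative only; requires java.util.Comparator):

// Order offsets by binlog file name first, then by position within the file.
Comparator<MySqlBinlogOffset> binlogOrder = Comparator
    .comparing((MySqlBinlogOffset o) -> o.binlogFile)
    .thenComparingLong(o -> o.position);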
Target
The BigQuery target batches DML events together and writes a batch of events to GCS. Once in GCS, it runs a BigQuery load job to load the changes into a staging table. Finally, it runs a Merge query to merge events from the staging table into the actual target table. Once that is complete, it persists the latest sequence number of events contained in the batch. DDL events are not batched together.
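A hedged outline of that flow; every helper method below is hypothetical and only illustrates the order of operations described above:

void flushBatch(List<DMLEvent> batch) throws Exception {
  String gcsObject = writeBatchToGcs(batch);                   // 1. stage the batch as an object in GCS
  runLoadJob(gcsObject, "myDB._staging_customers");            // 2. BigQuery load job into the staging table
  runMergeQuery("myDB._staging_customers", "myDB.customers");  // 3. MERGE staging changes into the target table
  long maxSequenceNum = batch.get(batch.size() - 1).sequenceNumber;
  commitSequenceNumber(maxSequenceNum);                        // 4. tell the app the batch has been replicated
}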
For event #0, the target creates a BQ dataset named 'myDB'. Since it must assume the event may be delivered more than once, it checks whether the dataset exists before creating it. After creating the dataset, the target calls a method that tells the application to remember that the event was replicated. The application stores the offset and sequence number for that event.
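A minimal sketch of that existence check, assuming the google-cloud-bigquery Java client (the commitOffset call at the end is a hypothetical application callback):

// Create the 'myDB' dataset only if it does not already exist.
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
if (bigquery.getDataset("myDB") == null) {
  bigquery.create(DatasetInfo.newBuilder("myDB").build());
}
// Once the create succeeds, report the event as replicated so the app can persist
// the offset and sequence number for it.
context.commitOffset(event.offset, event.sequenceNumber);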
For event #1, a staging table '_staging_customers' is created that records values for the row before and after the change, along with 3 extra columns: _batch_id, _sequence_num, and _operation. The batch id is just the timestamp of the load job. The table is partitioned on _batch_id and clustered on _sequence_num, which allows efficiently selecting data for a specific batch while ordering by sequence number. Note that a BQ 'integer' is 8 bytes, equivalent to a Java long.
| _batch_id (timestamp) | _sequence_num (integer) | _operation (string) | _before_id (integer) | _before_name (string) | id (integer) | name (string) |
|---|---|---|---|---|---|---|
|  |  |  |  |  |  |  |
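A sketch of creating this staging table with the google-cloud-bigquery Java client, partitioned on _batch_id and clustered on _sequence_num as described above (illustrative only, not the final implementation; 'bigquery' is the client from the earlier sketch):

Schema stagingSchema = Schema.of(
    Field.of("_batch_id", LegacySQLTypeName.TIMESTAMP),
    Field.of("_sequence_num", LegacySQLTypeName.INTEGER),
    Field.of("_operation", LegacySQLTypeName.STRING),
    Field.of("_before_id", LegacySQLTypeName.INTEGER),
    Field.of("_before_name", LegacySQLTypeName.STRING),
    Field.of("id", LegacySQLTypeName.INTEGER),
    Field.of("name", LegacySQLTypeName.STRING));
StandardTableDefinition stagingDef = StandardTableDefinition.newBuilder()
    .setSchema(stagingSchema)
    .setTimePartitioning(TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("_batch_id").build())
    .setClustering(Clustering.newBuilder().setFields(Collections.singletonList("_sequence_num")).build())
    .build();
bigquery.create(TableInfo.of(TableId.of("myDB", "_staging_customers"), stagingDef));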
The actual target table 'customers' is also created with the same schema as the source table, plus the sequence number as an additional column:
| _sequence_num (integer) | id (integer) | name (string) |
|---|---|---|
|  |  |  |
For events #2-8, the target may decide to batch them together in different ways. Supposing they all get batched together, the staging table looks like this after the load job:
| _batch_id | _sequence_num | _operation | _before_id | _before_name | id | name |
|---|---|---|---|---|---|---|
| 1234567890 | 2 | INSERT |  |  | 0 | alice |
| 1234567890 | 3 | UPDATE | 0 | alice | 1 | alice |
| 1234567890 | 4 | UPDATE | 1 | alice | 2 | alice |
| 1234567890 | 5 | DELETE | 2 | alice | 2 |  |
| 1234567890 | 6 | INSERT |  |  | 0 | Alice |
| 1234567890 | 7 | INSERT |  |  | 1 | blob |
| 1234567890 | 8 | UPDATE | 1 | blob | 1 | Bob |
A merge query is then run to merge changes from the staging table into the final target table:
MERGE myDB.customers as T
USING ($DIFF_QUERY) as D
ON T.id = D._before_id
WHEN MATCHED AND D._operation = "DELETE" THEN
  DELETE
WHEN MATCHED AND D._operation IN ("INSERT", "UPDATE") THEN
  UPDATE SET id = D.id, name = D.name, _sequence_num = D._sequence_num
WHEN NOT MATCHED AND D._operation IN ("INSERT", "UPDATE") THEN
  INSERT (id, name, _sequence_num) VALUES (D.id, D.name, D._sequence_num)

Where the $DIFF_QUERY is:
SELECT A.* FROM
(SELECT * FROM myDB._staging_customers WHERE _batch_id = 1234567890 AND _sequence_num > $LATEST_APPLIED) as A
LEFT OUTER JOIN
(SELECT * FROM myDB._staging_customers WHERE _batch_id = 1234567890 AND _sequence_num > $LATEST_APPLIED) as B
ON A.id = B._before_id AND A._sequence_num < B._sequence_num
WHERE B._before_id IS NULL

The diff query is responsible for getting the latest change for each primary key. With the example above, it results in:
| _batch_id | _sequence_num | _operation | _before_id | _before_name | id | name |
|---|---|---|---|---|---|---|
| 1234567890 | 5 | DELETE | 2 | alice | 2 |  |
| 1234567890 | 6 | INSERT |  |  | 0 | Alice |
| 1234567890 | 8 | UPDATE | 1 | blob | 1 | Bob |
The $LATEST_APPLIED variable is the max sequence number seen in the target table. This is required to ensure idempotency – events that are replayed should not be re-inserted into the final target table. The latest applied sequence number can be tracked in memory by the target, except for the first time it sees the table, where it will need to run a SELECT MAX(_sequence_num) query.
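For that first lookup, the query could be issued like this with the google-cloud-bigquery client (illustrative sketch; 'bigquery' is the client from the earlier sketch and the real target may structure this differently):

// Fetch the maximum sequence number already applied to the target table.
QueryJobConfiguration maxQuery = QueryJobConfiguration.newBuilder(
    "SELECT MAX(_sequence_num) FROM myDB.customers").build();
long latestApplied = 0L;
for (FieldValueList row : bigquery.query(maxQuery).iterateAll()) {
  if (!row.get(0).isNull()) {
    latestApplied = row.get(0).getLongValue();
  }
}
// Later batches keep latestApplied in memory and advance it after each successful merge.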
Note: When there is a primary key, it is possible to implement the target so that it doesn't need the additional sequence number column, exactly matching the source schema. However, this complicates the idempotency story, as the target would need to ensure that load and merge jobs are not run on data that was previously seen, requiring more complicated logic around using specific GCS object names and BQ load job ids.
Without Primary Key
If no primary key exists, a very similar set of steps occurs, except BigQuery will use all of the columns as the "primary key".
Note: SQL Server doesn't allow enabling change tracking on a table without a primary key.
Source
Suppose the following queries are run on the source database:
CREATE DATABASE myDB;
CREATE TABLE customers (name varchar(50));
INSERT INTO customers (name) VALUES ('alice'), ('alice'), ('Bob');
UPDATE customers SET name = 'Alyce' WHERE name = 'alice';
UPDATE customers SET name = 'Alice' WHERE name = 'Alyce';
DELETE FROM customers WHERE name = 'alice';

The source generates the following DDL events:
| offset | operation | database | table | schema | primary key |
|---|---|---|---|---|---|
| <binlog:mysql-bin.000003, pos:1424> | CREATE_DATABASE |  |  |  |  |
| <binlog:mysql-bin.000003, pos:1462> | CREATE_TABLE |  | customers |  |  |
followed by the following DML events:
| offset | transaction id | operation | database | table | before | row |
|---|---|---|---|---|---|---|
| <binlog:mysql-bin.000003, pos:1462> | 0 | INSERT | myDB | customers |  | <name:alice> |
| <binlog:mysql-bin.000003, pos:1482> | 0 | INSERT | myDB | customers |  | <name:alice> |
| <binlog:mysql-bin.000003, pos:1493> | 0 | INSERT | myDB | customers |  | <name:Bob> |
| <binlog:mysql-bin.000003, pos:1519> | 1 | UPDATE | myDB | customers | <name:alice> | <name:Alyce> |
| <binlog:mysql-bin.000003, pos:1538> | 1 | UPDATE | myDB | customers | <name:alice> | <name:Alyce> |
| <binlog:mysql-bin.000003, pos:1557> | 2 | UPDATE | myDB | customers | <name:Alyce> | <name:Alice> |
| <binlog:mysql-bin.000003, pos:1598> | 2 | UPDATE | myDB | customers | <name:Alyce> | <name:Alice> |
| <binlog:mysql-bin.000003, pos:1603> | 3 | DELETE | myDB | customers | <name:Alice> | <name:Alice> |
| <binlog:mysql-bin.000003, pos:1605> | 3 | DELETE | myDB | customers | <name:Alice> | <name:Alice> |
Target
The BigQuery target loads the DML events into a staging table
| _batch_id | _sequence_num | _operation | _before_name |
|---|---|---|---|