Replication Snapshot completion status and State Store Management
The replication process consists of two phases:
Snapshot: The initial backfill, in which all existing data is ingested as INSERT events. This is generally a heavy process.
Replicating: After the snapshot, the incremental change events from the source DB are ingested.
After every successful write to the sink, we store the current state / checkpoint, together with the status of the snapshot, in our internal DB (CDAP 6.6 and below stores it in GCS).
We can inspect the current state / checkpoint via REST APIs. We generally refer to this information as the Offset.
Methods to Trigger CDAP REST APIs
1. Use the UI to access the REST APIs. Navigate to SYSTEM ADMIN > Configuration > Make HTTP Calls.
2. Or use an API client on your system, such as curl. Follow the Microservices Introduction.
Fetch or Read Offset (CDAP 6.7+)
Step 1: Fetch the generation number for the app (pipeline).
The generation number can be fetched via:
GET /v3/namespaces/system/apps/delta/services/assessor/methods/v1/contexts/{namespace}/apps/{app_name}/maxgeneration
| Parameter | Description |
|---|---|
| namespace | The namespace for your application. In most cases ” |
| app_name | The name of your replication pipeline. |
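The Step 1 request path can be assembled programmatically. The following is a minimal sketch, not an official client: the function name `max_generation_path` and the sample values `default` and `my_app_name` are illustrative assumptions, and the host, port, and authentication of your CDAP instance are deliberately left out.

```python
from urllib.parse import quote

# Path prefix copied from the endpoint shown in Step 1.
ASSESSOR_BASE = "/v3/namespaces/system/apps/delta/services/assessor/methods/v1"


def max_generation_path(namespace: str, app_name: str) -> str:
    """Build the Step 1 request path (hypothetical helper, not part of CDAP)."""
    # Percent-encode both values so unusual characters cannot break the path.
    return (
        f"{ASSESSOR_BASE}/contexts/{quote(namespace, safe='')}"
        f"/apps/{quote(app_name, safe='')}/maxgeneration"
    )


if __name__ == "__main__":
    print(max_generation_path("default", "my_app_name"))
```

Prefix the returned path with the base URL of your CDAP instance before sending the GET request with the client of your choice.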
Example Screenshot :
Step 2: Fetch the offset information for each task instance
The offset is maintained per task instance. (When you configure a replication pipeline, there is an option to set the number of tasks, and each task has its own offset information.)
Let's say the generation value we got from Step 1 is generation_num.
The offset information can be fetched via:
GET /v3/namespaces/system/apps/delta/services/assessor/methods/v1/contexts/{namespace}/apps/{app_name}/generations/{generation_num}/instances/{instance_id}/offset
| Parameter | Description |
|---|---|
| namespace | The namespace for your application. In most cases ” |
| app_name | The name of your replication pipeline. |
| generation_num | The value we got from Step 1. It is generally a LONG value. |
| instance_id | The ID of the task. |
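Because each task instance has its own offset, Step 2 is usually repeated once per instance. The sketch below shows one way to loop over the instances. It assumes that instance IDs run from 0 to the number of tasks minus 1 and that the response body is JSON; neither is stated above. The names `offset_path`, `fetch_offsets`, and the injected `http_get` callable are hypothetical, and the HTTP transport is left to the caller.

```python
import json
from typing import Callable, Dict

# Path prefix copied from the endpoint shown in Step 2.
ASSESSOR_BASE = "/v3/namespaces/system/apps/delta/services/assessor/methods/v1"


def offset_path(namespace: str, app_name: str,
                generation_num: int, instance_id: int) -> str:
    """Build the Step 2 request path for one task instance."""
    return (
        f"{ASSESSOR_BASE}/contexts/{namespace}/apps/{app_name}"
        f"/generations/{generation_num}/instances/{instance_id}/offset"
    )


def fetch_offsets(http_get: Callable[[str], str], namespace: str, app_name: str,
                  generation_num: int, num_tasks: int) -> Dict[int, dict]:
    """Fetch and parse the offset of every task instance.

    http_get is any function that takes a request path and returns the
    response body as text (assumption: the body is JSON).
    """
    offsets = {}
    for instance_id in range(num_tasks):  # assumption: IDs are 0..num_tasks-1
        path = offset_path(namespace, app_name, generation_num, instance_id)
        offsets[instance_id] = json.loads(http_get(path))
    return offsets
```

Injecting `http_get` keeps the sketch independent of any particular HTTP library and of how your instance handles authentication.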
Example Screenshot : ( fetching offset info for app name “testsimple” and task no “0” )
Step 3 (optional): Fetch which tables belong to each task / instance_id
Each task / instance handles a particular set of tables. It is responsible for the snapshot and replication process for those tables.
The table-to-instance_id mapping can be fetched via:
GET /v3/namespaces/system/apps/{app_name}/workers/DeltaWorker
Look for properties.table.assignments in the response body.
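Once the DeltaWorker response has been parsed into a dictionary, the assignment property can be pulled out as sketched below. The exact encoding of the `table.assignments` value is not described here, so the sketch assumes it may be a JSON-encoded string and falls back to returning the raw value otherwise. The helper name `table_assignments` is hypothetical.

```python
import json
from typing import Any


def table_assignments(worker_response: dict) -> Any:
    """Return properties["table.assignments"] from a parsed DeltaWorker response.

    Assumption: the value may be a JSON-encoded string. If it is, the decoded
    object is returned; otherwise the raw value is returned unchanged.
    Returns None when the property is absent.
    """
    raw = worker_response.get("properties", {}).get("table.assignments")
    if isinstance(raw, str):
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            return raw  # not JSON after all: hand back the raw string
    return raw
```

Inspect the returned value for your own pipeline before relying on any particular shape.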
Checking SNAPSHOT status:
SQL Server Source
Fetch the offset info using the steps above and look for offset.state.snapshot_completed.
true means the snapshot is completed; false means it is still in progress.
Assuming we get the following offset info using contexts/default/apps/my_app_name/generations/1663319016782/instances/1/offset
Offset response :
This means the task with instance_id = 1, which handles two tables, dbo.persons and dbo.persons_date2 (this can only be confirmed via Step 3), has completed the snapshot process.
If "snapshot_completed": "false" and the pipeline fails or is stopped and is then restarted, it will rerun the snapshot for all the tables assigned to this task (even if some of those tables had already completed their snapshot).
In other words, snapshot progress can only be tracked for a task's group of tables as a whole.
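The SQL Server check can be automated once the offset response has been parsed. The sketch below is illustrative only: it accepts the state either at the top level of the response or under an `offset` key, since the exact response envelope is not shown here, and it treats the flag as the string "true" or "false" as in the example above. The helper names are hypothetical.

```python
def _state(offset_response: dict) -> dict:
    """Return the state map, whether or not it is wrapped in an "offset" key."""
    body = offset_response.get("offset", offset_response)
    return body.get("state") or {}


def _is_true(value) -> bool:
    """Interpret "true" / True as true; anything else, including None, as false."""
    return str(value).strip().lower() == "true"


def sqlserver_snapshot_completed(offset_response: dict) -> bool:
    """True only when state.snapshot_completed is true for this task instance."""
    return _is_true(_state(offset_response).get("snapshot_completed"))
```

A result of False for an instance means that a restart would rerun the snapshot for every table assigned to that instance.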
MYSQL Source
Fetch the offset info using the steps above and look for offset.state.snapshot.
If the value is true, the SNAPSHOT is still in progress.
If the value is missing or false, the SNAPSHOT process is complete.
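Note that the MySQL flag has the opposite sense to the SQL Server one: a true value means the snapshot is still running. A minimal sketch of the check, under the same assumptions about the response envelope (state at the top level or under an `offset` key) and with hypothetical helper names:

```python
def _state(offset_response: dict) -> dict:
    """Return the state map, whether or not it is wrapped in an "offset" key."""
    body = offset_response.get("offset", offset_response)
    return body.get("state") or {}


def mysql_snapshot_completed(offset_response: dict) -> bool:
    """True when state.snapshot is missing or false; False while it is true."""
    value = _state(offset_response).get("snapshot")
    in_progress = str(value).strip().lower() == "true"
    return not in_progress
```

Because a missing key counts as complete, an empty or malformed response would also report completion, so confirm that the response is a valid offset before trusting the result.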
Example of a Successful Snapshot :
Datastream - Oracle Source
Fetch the offset info using the steps above. The snapshot state is at the path offset.state.<COMPOSITE_TABLE_NAME>.snapshot.done, where COMPOSITE_TABLE_NAME is <SCHEMA>_<TABLE_NAME>.
If the value is true, the snapshot process is complete for that table, while a missing or false value means it is still in progress. Note that the snapshot state is maintained per table.
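Since the Datastream state is tracked per table, a per-table lookup is useful. The sketch below is an assumption-laden illustration: it is not clear from the path notation alone whether the state stores one flat key such as `<SCHEMA>_<TABLE_NAME>.snapshot.done` or nested objects, so the hypothetical helper tries the flat key first and then the nested layout.

```python
def _state(offset_response: dict) -> dict:
    """Return the state map, whether or not it is wrapped in an "offset" key."""
    body = offset_response.get("offset", offset_response)
    return body.get("state") or {}


def _is_true(value) -> bool:
    return str(value).strip().lower() == "true"


def oracle_snapshot_done(offset_response: dict, schema: str, table: str) -> bool:
    """True when <SCHEMA>_<TABLE_NAME>.snapshot.done is true for this table."""
    state = _state(offset_response)
    composite = f"{schema}_{table}"
    # Layout 1 (assumption): a single flat key containing dots.
    flat = state.get(f"{composite}.snapshot.done")
    if flat is not None:
        return _is_true(flat)
    # Layout 2 (assumption): nested objects state[composite]["snapshot"]["done"].
    node = state
    for key in (composite, "snapshot", "done"):
        if not isinstance(node, dict):
            return False
        node = node.get(key)
    return _is_true(node)
```

Calling the helper once per replicated table gives a per-table completion report, which the SQL Server and MySQL offsets cannot provide.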
Example of a successful snapshot:
For CDAP 6.6 and below
This information is stored in a GCS location in the customer project.
Steps to view offset info:
1. Export the JSON of the pipeline (on the pipeline UI page, ACTIONS → EXPORT).
2. Look for the directory location offsetBasePath.
3. It should look like "offsetBasePath": "gs://df-123-abc/delta/" (the path scheme depends on your environment; it can be a local file path as well).
4. Navigate to gs://df-123-abc/delta/{namespace}/{app_name}/{generation_num}/{instance_id}. For generation_num, choose the largest value in case there are multiple.
5. Download and view the offset file.
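The path in step 4 can be derived from the exported pipeline JSON, as in the sketch below. The position of offsetBasePath inside the exported JSON is not specified here, so the hypothetical helper `find_offset_base_path` searches the whole document for that key. Listing the generation directories (for example with a GCS client or a file browser) is left to the caller.

```python
from typing import Any, Iterable, Optional


def find_offset_base_path(pipeline_json: Any) -> Optional[str]:
    """Recursively search the exported pipeline JSON for "offsetBasePath"."""
    if isinstance(pipeline_json, dict):
        for key, value in pipeline_json.items():
            if key == "offsetBasePath" and isinstance(value, str):
                return value
            found = find_offset_base_path(value)
            if found is not None:
                return found
    elif isinstance(pipeline_json, list):
        for item in pipeline_json:
            found = find_offset_base_path(item)
            if found is not None:
                return found
    return None


def offset_location(base_path: str, namespace: str, app_name: str,
                    generations: Iterable[int], instance_id: int) -> str:
    """Build the step 4 location, choosing the largest generation number."""
    latest = max(int(g) for g in generations)
    return f"{base_path.rstrip('/')}/{namespace}/{app_name}/{latest}/{instance_id}"
```

Comparing generation numbers as integers rather than as strings avoids picking the wrong directory if the values ever differ in length.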
Created in 2020 by Google Inc.