Replication Snapshot completion status and State Store Management

The replication process consists of two parts:

  1. Snapshot: the initial backfill, in which all existing data is ingested as INSERTs. This is generally a heavy process.

  2. Replicating: after the snapshot, incremental change events from the source database are ingested.

After every successful write to the sink, the current state (checkpoint), including the status of the snapshot, is stored in an internal database (CDAP 6.6 and below store it in GCS instead).


The current state (checkpoint) can be inspected via REST APIs. This information is generally referred to as the offset.

Methods to Call CDAP REST APIs

  1. Use the UI to call the REST APIs.

    1. Navigate to SYSTEM ADMIN > Configuration > Make Http Calls

  2. Alternatively, use an API client on your system, such as curl. Follow

Fetch or Read Offset (CDAP 6.7+)

Step 1: Fetch the generation number for the app (pipeline).

The generation number can be fetched via:

GET /v3/namespaces/system/apps/delta/services/assessor/methods/v1/contexts/{namespace}/apps/{app_name}/maxgeneration

Parameter        Description
namespace        The namespace for your application. In most cases "default" is the correct value.
app_name         The name of your replication pipeline.
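A minimal Python sketch of Step 1 using only the standard library. The base URL of your CDAP instance, the use of an OAuth bearer token for authentication, and the assumption that the response body is a bare integer are all assumptions to adjust for your environment; the helper names are hypothetical.

```python
from typing import Optional
from urllib.parse import quote
from urllib.request import Request, urlopen

# Path prefix of the delta assessor service, taken from the endpoint above.
ASSESSOR_PREFIX = "/v3/namespaces/system/apps/delta/services/assessor/methods/v1/contexts"


def max_generation_url(base_url: str, namespace: str, app_name: str) -> str:
    """Build the Step 1 URL that returns the max generation of a pipeline."""
    return (f"{base_url.rstrip('/')}{ASSESSOR_PREFIX}/{quote(namespace)}"
            f"/apps/{quote(app_name)}/maxgeneration")


def fetch_max_generation(base_url: str, namespace: str, app_name: str,
                         token: Optional[str] = None) -> int:
    """Call the endpoint; assumes (hypothetically) the body is a bare integer."""
    req = Request(max_generation_url(base_url, namespace, app_name))
    if token:
        # Assumption: the instance accepts an OAuth bearer token.
        req.add_header("Authorization", f"Bearer {token}")
    with urlopen(req) as resp:
        return int(resp.read().decode("utf-8").strip())
```

For example, fetch_max_generation("https://<your-cdap-endpoint>", "default", "my_app_name", token=...) would return the generation_num used in Step 2, if the response really is a plain number.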


[Screenshot: Fetching generation number]

Step 2: Fetch the offset information for each task instance

The offset is maintained separately for every task instance. (When you configure a replication pipeline, you can set the number of tasks; each task has its own offset information.)

Let generation_num be the generation value obtained in Step 1.

The offset information can be fetched via:

GET /v3/namespaces/system/apps/delta/services/assessor/methods/v1/contexts/{namespace}/apps/{app_name}/generations/{generation_num}/instances/{instance_id}/offset

Parameter        Description
namespace        The namespace for your application. In most cases "default" is the correct value.
app_name         The name of your replication pipeline.
generation_num   The value obtained in Step 1. It is generally a LONG value.
instance_id      The ID of the task. Tasks are assigned monotonically increasing IDs
                 starting from 0, so depending on the number of tasks you might have
                 to make multiple API calls. Read this Section for more info
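A minimal sketch of Step 2, assuming you already know how many tasks the pipeline was configured with. It builds one offset URL per task instance (IDs 0 to num_tasks - 1) and decodes each response as JSON. The base URL, the bearer-token header and the helper names are illustrative assumptions.

```python
import json
from typing import Dict, List, Optional
from urllib.parse import quote
from urllib.request import Request, urlopen

ASSESSOR_PREFIX = "/v3/namespaces/system/apps/delta/services/assessor/methods/v1/contexts"


def offset_url(base_url: str, namespace: str, app_name: str,
               generation_num: int, instance_id: int) -> str:
    """Build the Step 2 URL for a single task instance."""
    return (f"{base_url.rstrip('/')}{ASSESSOR_PREFIX}/{quote(namespace)}"
            f"/apps/{quote(app_name)}/generations/{int(generation_num)}"
            f"/instances/{int(instance_id)}/offset")


def offset_urls(base_url: str, namespace: str, app_name: str,
                generation_num: int, num_tasks: int) -> List[str]:
    """One URL per task: instance ids start at 0 and increase by 1."""
    return [offset_url(base_url, namespace, app_name, generation_num, i)
            for i in range(num_tasks)]


def fetch_offsets(base_url: str, namespace: str, app_name: str,
                  generation_num: int, num_tasks: int,
                  token: Optional[str] = None) -> Dict[int, dict]:
    """Fetch and JSON-decode the offset of every task instance."""
    offsets: Dict[int, dict] = {}
    urls = offset_urls(base_url, namespace, app_name, generation_num, num_tasks)
    for instance_id, url in enumerate(urls):
        req = Request(url)
        if token:
            # Assumption: bearer-token authentication.
            req.add_header("Authorization", f"Bearer {token}")
        with urlopen(req) as resp:
            offsets[instance_id] = json.loads(resp.read().decode("utf-8"))
    return offsets
```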


[Screenshot: fetching offset info for app name "testsimple" and task no. "0"]


Step 3 (Optional): Find which tables belong to each task / instance_id

Each task/instance handles a particular set of tables and is responsible for the snapshot and replication of those tables.

The table-to-instance_id mapping can be fetched via:

GET /v3/namespaces/{namespace}/apps/{app_name}/workers/DeltaWorker


Look for the table.assignments entry under properties in the response body.
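A small sketch for pulling the mapping out of the DeltaWorker response. It assumes the body is JSON with a top-level properties object, and that the table.assignments value may itself be a JSON-encoded string; both are assumptions, so the function falls back to returning the raw value unchanged.

```python
import json
from typing import Any


def table_assignments(worker_response: str) -> Any:
    """Return properties["table.assignments"] from a DeltaWorker response.

    Assumption: the value may be a JSON-encoded string. If it parses, the
    decoded object is returned; otherwise the raw value (or None if absent).
    """
    spec = json.loads(worker_response)
    raw = spec.get("properties", {}).get("table.assignments")
    if isinstance(raw, str):
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            return raw
    return raw
```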

 

Checking SNAPSHOT status

SQL Server Source

Fetch the offset info using the steps above and look for offset.state.snapshot_completed.

A value of true means the snapshot is complete; false means it is still in progress.
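A minimal sketch of the SQL Server check, operating on an already-decoded offset response. Because the value can appear as the string "true"/"false", both strings and booleans are accepted; treating a missing value as "not completed" is an assumption. The function names are hypothetical.

```python
def _state(offset_response: dict) -> dict:
    """Return offset.state from a decoded offset response (empty if absent)."""
    return (offset_response.get("offset") or {}).get("state") or {}


def sqlserver_snapshot_completed(offset_response: dict) -> bool:
    """SQL Server: offset.state.snapshot_completed is true once the snapshot is done.

    Accepts "true"/"false" strings or booleans; a missing value is treated
    here as not completed (an assumption).
    """
    value = _state(offset_response).get("snapshot_completed")
    return str(value).lower() == "true"
```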

Assume we get the following offset info from contexts/default/apps/my_app_name/generations/1663319016782/instances/1/offset:

[Screenshot: Offset response]


This means the task with instance_id = 1, which handles two tables, dbo.persons and dbo.persons_date2 (confirm this via Step 3 only), has completed the snapshot process.

If "snapshot_completed" is "false" and the pipeline fails or is stopped and then restarted, the snapshot is rerun for all the tables assigned to this task, even if some of those tables had already completed their snapshot.
In other words, snapshot progress can only be tracked for a task's group of tables as a whole, not per table.


MySQL Source

Fetch the offset info using the steps above and look for offset.state.snapshot.

If the value is true: the SNAPSHOT is still in progress.
If the value is missing or false: the SNAPSHOT process is complete.
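Note that the MySQL flag has the opposite sense to the SQL Server one: true means the snapshot is still running. A minimal sketch on a decoded offset response (the function name is hypothetical):

```python
def mysql_snapshot_completed(offset_response: dict) -> bool:
    """MySQL: offset.state.snapshot is true only while the snapshot is running.

    A missing or false value means the snapshot is complete.
    """
    state = (offset_response.get("offset") or {}).get("state") or {}
    return str(state.get("snapshot")).lower() != "true"
```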

[Screenshot: Example of a successful snapshot]


Datastream - Oracle Source

Fetch the offset info using the steps above. The snapshot state is at the path

offset.state.<COMPOSITE_TABLE_NAME>.snapshot.done

where COMPOSITE_TABLE_NAME is <SCHEMA>_<TABLE_NAME>.

If the value is true, the snapshot process is complete for that table; a missing or false value means it is still in progress. Note that the snapshot state is maintained per table.
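A per-table check for the Datastream Oracle source, as a sketch. The text gives the path offset.state.<SCHEMA>_<TABLE_NAME>.snapshot.done but not whether the state stores it as one flat dotted key or as nested objects, so the sketch (as an assumption) tries the flat key first and then the nested form. The schema and table names in the usage are hypothetical.

```python
def datastream_snapshot_done(offset_response: dict, schema: str, table: str) -> bool:
    """Oracle (Datastream): check offset.state.<SCHEMA>_<TABLE>.snapshot.done.

    Assumption: tries a flat key "<SCHEMA>_<TABLE>.snapshot.done" first,
    then nested objects. Missing or false means still in progress.
    """
    state = (offset_response.get("offset") or {}).get("state") or {}
    composite = f"{schema}_{table}"
    value = state.get(f"{composite}.snapshot.done")
    if value is None:
        table_state = state.get(composite)
        if isinstance(table_state, dict):
            snapshot = table_state.get("snapshot")
            if isinstance(snapshot, dict):
                value = snapshot.get("done")
    return str(value).lower() == "true"
```

For example, datastream_snapshot_done(response, "HR", "EMPLOYEES") checks the hypothetical table HR.EMPLOYEES; call it once per table, since the state is tracked per table for this source.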

[Screenshot: Example of a successful snapshot]


For CDAP 6.6 and below

This information is stored in a GCS location in the customer project.

Steps to view the offset info:
1. Export the pipeline JSON (on the pipeline UI page, ACTIONS → EXPORT).

2. Look for the directory location in offsetBasePath.

3. It should look like "offsetBasePath": "gs://df-123-abc/delta/" (the path scheme depends on your environment; it can be a local file path as well).

4. Navigate to gs://df-123-abc/delta/{namespace}/{app_name}/{generation_num}/{instance_id}

generation_num: choose the largest value if there are multiple.

5. Download and view the offset file.
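A sketch of steps 4 and 5's path logic, assuming you have already listed the generation directory names under {offsetBasePath}/{namespace}/{app_name} (for example in the GCS console or with gsutil ls). The function name is hypothetical.

```python
from typing import Iterable, Union


def latest_offset_dir(offset_base_path: str, namespace: str, app_name: str,
                      generations: Iterable[Union[str, int]], instance_id: int) -> str:
    """Return {offsetBasePath}/{namespace}/{app_name}/{largest generation}/{instance_id}."""
    gens = [int(g) for g in generations]
    if not gens:
        raise ValueError("no generation directories found")
    # Compare numerically: as strings, "999" would sort after "1663319016782".
    latest = max(gens)
    return f"{offset_base_path.rstrip('/')}/{namespace}/{app_name}/{latest}/{instance_id}"
```

Converting the directory names to integers before taking the maximum matters because generation numbers can differ in length, and a plain string comparison would pick the wrong one.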

Created in 2020 by Google Inc.