Introduction:

Sometimes, users of CDAP want to replicate their database into an HBase dataset. When they update the original database, they should be able to see the corresponding updates in the HBase dataset.

What we have now and the problem:

Users of CDAP can create an adapter with a Table sink to store data from a source. The Table sink can transform the data from the source and is correctly updated whenever insertions occur in the source database. However, whenever deletions occur, the Table sink does not delete the corresponding entries, and they remain in the table.

Solution:

A SNAPSHOT dataset is a good solution for this: each adapter run produces a complete snapshot of the source, so entries deleted from the source disappear from the dataset as well.


User stories:

Creating adapters with SNAPSHOT source/sink:

Users should be able to create an adapter with a SNAPSHOT dataset as a source or sink. Users should be able to specify the TTL or the minimum number of snapshot versions for the SNAPSHOT dataset. A SNAPSHOT sink should get all the data from the source in each run of the adapter. Whenever insertions, deletions, or updates happen in the source, users should see the corresponding changes in the sink.
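For illustration, a minimal sketch of how an application could create such a dataset instance, assuming the type is registered under the name "snapshot"; the property keys "snapshot.ttl" and "snapshot.min.versions" are placeholders, not confirmed CDAP constants:

import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.dataset.DatasetProperties;

public class SnapshotExampleApp extends AbstractApplication {
  @Override
  public void configure() {
    // Hypothetical property keys; the real SNAPSHOT dataset type may expose different ones.
    createDataset("userSnapshot", "snapshot", DatasetProperties.builder()
        .add("snapshot.ttl", "3600000")       // e.g. expire snapshots older than one hour
        .add("snapshot.min.versions", "2")    // e.g. always keep at least two snapshots
        .build());
  }
}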

 

Reading from the SNAPSHOT datasets:

Users can read from SNAPSHOT datasets in other applications. They will see the data of the current latest snapshot. They will not see any updates to the dataset unless they get the dataset again. Therefore, users should use the SNAPSHOT dataset in batch programs. In a long-running program such as a flow, the user will only ever see one snapshot and will not see any updates.
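For illustration, a sketch of a batch-style consumer, assuming a deployed SNAPSHOT dataset named "userSnapshot" and that SnapshotDataset exposes the usual Table read API:

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.table.Get;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.worker.AbstractWorker;

public class SnapshotReader extends AbstractWorker {
  @Override
  public void run() {
    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        // Each execute() re-acquires the dataset, so it sees whichever snapshot is current now.
        SnapshotDataset snapshot = context.getDataset("userSnapshot");
        Row row = snapshot.get(new Get("x"));  // returns data from the latest snapshot only
      }
    });
  }
}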

 

Design:

My test process and results for the existing table sink:

1. I created an adapter with a DB source and a table sink. Initially the mysql table had 4 entries.

   After one run of mapreduce,

   the table sink got initialized and had 4 entries with the correct values.

2. I inserted one entry in the mysql table.

   After one run of mapreduce,

   the table sink got correctly updated.

3. I deleted one entry from the mysql table.

   After one run of mapreduce,

   the table sink did not reflect the deletion and remained the same.

4. I inserted one entry in the mysql table.

   After one run of mapreduce,

   the table sink correctly updated the inserted entry but the deleted entry was still there.

 

SNAPSHOT dataset design (TBD):

The SNAPSHOT dataset will be a subclass of the Table dataset, since its functionality is similar to a Table's; it will essentially support all the operations that a Table dataset supports.

Like other datasets, several classes are needed:

  1. SnapshotDataset.java, which provides the implementation of the data operations that can be performed on an instance of this dataset.

  2. SnapshotDatasetAdmin.java, which defines the administrative operations the Snapshot dataset will support.

  3. SnapshotDatasetDefinition.java, which defines how to configure the snapshot dataset and how to perform administrative or data-manipulation operations on a dataset instance, by providing the implementations of SnapshotDatasetAdmin and SnapshotDataset.

  4. SnapshotModule.java, which defines the dataset type (a minimal sketch is shown below).
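A minimal sketch of SnapshotModule, assuming the SNAPSHOT dataset reuses the existing "table" type for its embedded tables and that SnapshotDatasetDefinition takes the type name plus the underlying Table definition (the constructor shape is an assumption):

import co.cask.cdap.api.dataset.DatasetAdmin;
import co.cask.cdap.api.dataset.DatasetDefinition;
import co.cask.cdap.api.dataset.module.DatasetDefinitionRegistry;
import co.cask.cdap.api.dataset.module.DatasetModule;
import co.cask.cdap.api.dataset.table.Table;

public class SnapshotModule implements DatasetModule {
  @Override
  public void register(DatasetDefinitionRegistry registry) {
    // Reuse the already-registered "table" type for both the data table and the metadata table.
    DatasetDefinition<Table, DatasetAdmin> tableDef = registry.get("table");
    registry.add(new SnapshotDatasetDefinition("snapshot", tableDef));
  }
}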

     

Approach:

Have two tables for the SNAPSHOT dataset: one is a metadata table used to store the current version, the other is used to store the data. When querying the dataset, we first get the current version from the metadata table and then filter the data table for the corresponding version. Use TTL or minVersion to clean up the out-dated data.

Examples:

Suppose we update the dataset with the following data at time = 10, the two tables will look like:

 

Main data table

rowKey | value | timestamp
x      | 1     | 10
y      | 2     | 10
z      | 3     | 10

 

 

Metadata table

rowKey  | value | timestamp
version | 10    | 12

 

The current version is 10, so when a query happens at this time, all data with timestamp 10 will be returned.

Next, suppose we update the dataset at time = 20: only x and z are updated and y is deleted. The two tables will look like:

 

Main data table

rowKey | value | timestamp | value | timestamp
x      | 1     | 10        | 1     | 20
y      | 2     | 10        |       |
z      | 3     | 10        | 4     | 20

 

 

Metadata table

rowKey  | value | timestamp | value | timestamp
version | 10    | 12        | 20    | 22

 

The latest version in the metadata table is now 20, so all data with timestamp 20 will be returned. For example, a reader first fetches the current version from the metadata table in its own transaction (pseudocode):

metadata = getDataset("metadata", ...);
tx.start();
version = metadata.get("version");
tx.stop();

Consequence of using TTL: if the adapter stops running, no new snapshots are written, so once the TTL expires all the data in the dataset will be deleted.

Consequence of using minVersion: the old data actually remains stored in the dataset, since we are only filtering it out at read time.

What we changed in the implementation:

  1. Do not have a SnapshotDatasetAdmin class:

Original design: have a SnapshotDatasetAdmin.java class.

Change: use the existing CompositeDatasetAdmin.java class instead, since it propagates administrative operations to the given list of DatasetAdmins. A sketch is shown below.
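For illustration, a sketch of how getAdmin in SnapshotDatasetDefinition could delegate to CompositeDatasetAdmin; the field names (metadataTableDef, mainTableDef), the embedded table names, and the exact CompositeDatasetAdmin constructor form are assumptions:

@Override
public DatasetAdmin getAdmin(DatasetContext datasetContext, DatasetSpecification spec,
                             ClassLoader classLoader) throws IOException {
  // Propagate administrative operations (create, drop, truncate, ...) to both embedded tables.
  return new CompositeDatasetAdmin(Arrays.asList(
      metadataTableDef.getAdmin(datasetContext, spec.getSpecification("metadata"), classLoader),
      mainTableDef.getAdmin(datasetContext, spec.getSpecification("maindata"), classLoader)));
}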


  2. Change where the metadata table is updated in the SnapshotSink:

In the original design, the SnapshotSink updates the metadata table in the onRunFinish method.

We now do it in the initialize method instead: since the mapreduce job and onRunFinish run in different transactions, we cannot get the correct transaction id of the mapreduce job in the onRunFinish method.

@Override
public void initialize(BatchSinkContext context) throws Exception {
  ...
  Map<String, String> sinkArgs = new HashMap<>();
  SnapshotDataset snapshotDataset = context.getDataset(snapshotSinkConfig.name, sinkArgs);
  // Record this run's transaction id as the new snapshot version in the metadata table.
  snapshotDataset.updateMetaDataTable(snapshotDataset.getTransactionId());
}


  3. Get the main table when we startTx in the SnapshotDataset instead of in the SnapshotDefinition:

In the original design, the SnapshotDefinition starts a transaction in its getDataset method to read the snapshot version from the metadata table, passes the version as a runtime argument to the main data table, and then gets the main data table.

Now we pass all the arguments needed to get the main data table to the SnapshotDataset. Every time we start a tx in the SnapshotDataset, we read the version from the metadata table and get the main data table with the corresponding arguments.

Reason: we get an exception with the message "Transaction manager is not running" if we try to start a transaction in the getDataset method of SnapshotDefinition, because the TransactionExecutor there is inconsistent with the one used by the mapreduce job, and we do not have a good way to deal with that.

public void startTx(Transaction tx) {
  ...
  // Get the current version from the metadata table.
  Map<String, String> copyOfArguments = new HashMap<>(arguments);
  Long version = getCurrentVersion();
  if (version != null) {
    copyOfArguments.put("version", String.valueOf(version));
  }
  try {
    // Instantiate the main data table, passing the version as a runtime argument.
    mainTable = mainTableDef.getDataset(datasetContext, spec.getSpecification("maindata"),
                                        copyOfArguments, classLoader);
  } catch (Throwable t) {
    LOG.error("Error while creating mainTable", t);
    throw Throwables.propagate(t);
  }
  ...
}


Previous designs and options:

      1. Use HBase versions:

For each column family in HBase, we can set the minimum number of versions and the TTL. Versions older than the TTL will be deleted, but HBase makes sure that at least the configured minimum number of versions is available at any point in time. We can also set a maximum number of versions to avoid keeping too many versions of a column in a row. A sketch is shown below.

In addition, we might need to store the latest timestamp of when the adapter ran, to find the corresponding version.
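A sketch of this option using the plain HBase client API; the table and column-family names are only examples:

// All classes here are from the HBase client API (org.apache.hadoop.hbase).
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf("snapshot_data"));
HColumnDescriptor family = new HColumnDescriptor("d");
family.setTimeToLive(24 * 60 * 60); // seconds; cells older than one day become eligible for deletion
family.setMinVersions(1);           // but at least one version is always retained
family.setMaxVersions(10);          // and never more than ten versions are kept per cell
tableDesc.addFamily(family);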

 

      2. Use transactions to delete all the old data:

We can use CDAP's transaction support to delete all the rows in the table and write the new data to the table within a single transaction.

      3. Use row prefix:

We will use the timestamp of when the mapreduce job finishes as the "version" of the data, and keep all the versions in one big table, with the version as a row-key prefix (see the sketch below).
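A sketch of the row-key layout for this option; the key shape and the use of HBase's Bytes utility are only illustrative:

// "Bytes" is org.apache.hadoop.hbase.util.Bytes.
long version = System.currentTimeMillis();  // e.g. the time the mapreduce run finished
byte[] rowKey = Bytes.add(Bytes.toBytes(version), Bytes.toBytes("x"));  // version prefix + user row key
// A read then scans only the key range that starts with the current version's prefix.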


      4. Use two table switching:

Use two tables and a metadata table to implement the SNAPSHOT dataset.


      5. Use versions instead of TTL in HBase:

We will let the user specify the number of snapshots to keep for the snapshot dataset and keep that many snapshots in it. A metadata table maintains the snapshot ids and the current version number.

SNAPSHOT id | isActive
0           | No
1           | No
0           | Yes
...         | ...

 Another row to store the current snapshot: