Add SNAPSHOT dataset and SNAPSHOT source/sink
Introduction:
Sometimes, users of CDAP want to mirror a relational database into an HBase-backed dataset. When they update the original database, they should see the corresponding updates reflected in that dataset.
What we have now and the problem:
Users of CDAP can create an adapter with a table sink to store data from a source. The table sink can transform the data from the source and is correctly updated whenever insertions occur in the source database. However, whenever deletions occur, the table sink does not delete the corresponding entries, and they remain in the table.
Solution:
A SNAPSHOT dataset is a good solution: each adapter run produces a complete snapshot of the source, so deletions are reflected as well as insertions and updates.
User stories:
Creating adapters with SNAPSHOT source/sink:
Users should be able to create an adapter with a SNAPSHOT dataset as a source or sink. Users should be able to specify the TTL or the minimum number of snapshot versions for the SNAPSHOT dataset. A SNAPSHOT sink should receive all the data from the source in each run of the adapter. Whenever insertions, deletions, or updates happen in the source, users should see the corresponding changes in the sink.
Reading from the SNAPSHOT datasets:
Users can read from SNAPSHOT datasets in other applications. They will see the data of the latest snapshot at the time they obtain the dataset, and will not see any later updates unless they obtain the dataset again. SNAPSHOT datasets are therefore best suited to batch programs; a long-running program such as a flow only ever sees the single snapshot it started with.
Design:
My test process and results for the existing table sink:
1. I created an adapter with a DB source and a table sink. Initially the MySQL table had 4 entries. After one run of the MapReduce job, the table sink was initialized and had 4 entries with the correct values.
2. I inserted one entry into the MySQL table. After one run of the MapReduce job, the table sink was updated correctly.
3. I deleted one entry from the MySQL table. After one run of the MapReduce job, the table sink did not reflect the deletion and remained the same.
4. I inserted one entry into the MySQL table. After one run of the MapReduce job, the table sink correctly contained the newly inserted entry, but the deleted entry was still there.
SNAPSHOT dataset design (TBD):
The SNAPSHOT dataset will be a subclass of the Table dataset, since its functionality is similar; it will support essentially everything a Table dataset supports.
Like other datasets, several classes are needed:
SnapshotDataset.java, which provides the implementation of the data operations that can be performed on instances of this dataset.
SnapshotDatasetAdmin.java, which defines the administrative operations the SNAPSHOT dataset will support.
SnapshotDatasetDefinition.java, which defines how to configure the SNAPSHOT dataset and how to perform administrative or data-manipulation operations on a dataset instance, by providing the SnapshotDatasetAdmin and SnapshotDataset implementations.
SnapshotModule.java, which registers the dataset type (a minimal registration sketch follows this list).
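For illustration, a minimal sketch of what SnapshotModule.java could look like, using CDAP's DatasetModule API and assuming a SnapshotDatasetDefinition constructor that takes a type name and a delegate Table definition (the constructor shape is illustrative, not final):
public class SnapshotModule implements DatasetModule {
  @Override
  public void register(DatasetDefinitionRegistry registry) {
    // Reuse the ordinary "table" definition for the embedded metadata and main data tables.
    DatasetDefinition<Table, DatasetAdmin> tableDef = registry.get("table");
    registry.add(new SnapshotDatasetDefinition("snapshot", tableDef));
  }
}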
Approach:
Description:
Have two tables for the SNAPSHOT dataset: a metadata table used to store the current version, and a main table used to store the data. When getting the dataset, we first read the current version from the metadata table and filter the main table for the corresponding version. TTL or minVersions is used to clean up outdated data.
Designs/Algorithms:
Have two tables:
Metadata table - it has only one row and stores the current version of the SNAPSHOT dataset.
Main table - it stores all the data coming from the adapter. It should be configured with TTL or minVersions to age out old data.
Ways to store the latest version:
For each adapter run, when the MapReduce job writes data to the main table of the SNAPSHOT dataset, the data is assigned the transaction id as its HBase timestamp.
If the adapter run succeeds, we update the metadata table with the transaction id of the MapReduce job in the onRunFinish() method.
When reading from the SNAPSHOT dataset, only the data in the main table whose timestamp matches the transaction id recorded in the metadata table is read.
Examples:
Suppose we update the dataset with the following data at time = 10; the two tables will look like:
Main data table:
rowKey | value | timestamp
x | 1 | 10
y | 2 | 10
z | 3 | 10
Metadata table:
rowKey | value | timestamp
version | 10 | 12
The current version is 10, so a query at this point returns all data with timestamp 10.
Next, suppose we update the dataset at time = 20: only x and z are updated, and y is deleted. The two tables will look like:
Main data table:
rowKey | value | timestamp | value | timestamp
x | 1 | 10 | 1 | 20
y | 2 | 10 | |
z | 3 | 10 | 4 | 20
Metadata table:
rowKey | value | timestamp | value | timestamp
version | 10 | 12 | 20 | 22
The latest version in the metadata table is now 20, so only data with timestamp 20 is returned.
How to filter the result when reading the table:
Make the Table accept a runtime argument that is used to filter the data in the main data table.
Each time getDataset() in SnapshotDatasetDefinition is called:
Call getDataset() for the metadata table -
metadata = getDataset("metadata", ...);
Start a transaction to get the latest version from the metadata table -
tx.start();
version = metadata.get("version");
tx.stop();
Pass the version as an argument to the main data table to filter it and get the dataset -
mainData = getDataset("mainData", ..., "SNAPSHOT" + version);
Use TTL or minVersions to clean up old data:
TTL - old rows are deleted after a certain amount of time has passed.
Consequence of using TTL: if the adapter stops running, all the data in the dataset will eventually be deleted.
minVersions = 1 - keep at least one version of the data in the dataset.
Consequence of using minVersions: old data remains in the dataset, which is acceptable because reads filter by version anyway.
Main challenges (sorted by priority):
How to start a transaction in the getDataset() method:
Use @Inject in the SnapshotDatasetDefinition.
Tested in standalone using TimePartitionedFilesetDefinition; the TransactionExecutorFactory can be injected successfully.
Next: investigate how to start a transaction there.
Modify the Table to receive the version as a runtime argument for the SNAPSHOT dataset:
Create a new constructor for LevelDBTable, HBaseTable, and InMemoryTable that accepts the version as a runtime parameter.
Propagate the change to all the table implementations.
Modify all get()- and scan()-related methods in BufferingTable and all its subclasses to filter the results when a version is provided.
For each result, read the timestamp of each cell and check whether it matches the version (see the filtering sketch after this list).
Let the user choose between TTL and minVersions; both have consequences (specified above).
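For illustration, a minimal sketch of the filtering step, assuming the buffered results are available as a column-to-(timestamp, value) map before being returned (the method name and surrounding BufferingTable/HBaseTable plumbing are illustrative):
// Sketch only: keep just the cells whose timestamp equals the snapshot version.
private NavigableMap<byte[], byte[]> filterByVersion(
    NavigableMap<byte[], NavigableMap<Long, byte[]>> columnsWithVersions, long version) {
  NavigableMap<byte[], byte[]> filtered = new TreeMap<>(Bytes.BYTES_COMPARATOR);
  for (Map.Entry<byte[], NavigableMap<Long, byte[]>> column : columnsWithVersions.entrySet()) {
    byte[] value = column.getValue().get(version);
    if (value != null) {
      filtered.put(column.getKey(), value);
    }
  }
  return filtered;
}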
What we changed in the implementation:
Do not have the SnapshotDatasetAdmin class:
In the design:
Have a SnapshotDatasetAdmin.java class
What we do now:
Use the existing CompositeDatasetAdmin class instead, since it propagates administrative operations to a given list of DatasetAdmins.
Changed how the metadata table is updated in the SnapshotSink:
In the design:
The SnapshotSink updates the metadata table in the onRunFinish() method.
What we do now:
We now do this in the initialize() method instead: because the MapReduce job and onRunFinish() happen in different transactions, we cannot obtain the correct transaction id of the MapReduce job in onRunFinish().
Code snippet of the initialize() method of SnapshotSink.java:
@Override
public void initialize(BatchSinkContext context) throws Exception {
  ...
  // Record the transaction id of this run in the metadata table as the new snapshot version.
  Map<String, String> sinkArgs = new HashMap<>();
  SnapshotDataset snapshotDataset = context.getDataset(snapshotSinkConfig.name, sinkArgs);
  snapshotDataset.updateMetaDataTable(snapshotDataset.getTransactionId());
  ...
}
Get the main table in startTx() of the SnapshotDataset instead of in the SnapshotDatasetDefinition:
In the design:
In the original design, the SnapshotDatasetDefinition starts a transaction in its getDataset() method to read the snapshot version from the metadata table, passes the version as a runtime argument to the main data table, and gets the main data table.
What we do now:
We pass all the arguments needed to get the main data table to the SnapshotDataset. Every time a transaction starts in the SnapshotDataset, we read the version from the metadata table and get the main data table with the corresponding arguments.
Reason: if we try to start a transaction in the getDataset() method of SnapshotDatasetDefinition, we get an exception with the message "Transaction manager is not running", because the TransactionExecutor there is inconsistent with the one used by the MapReduce job, and we do not have a good way to deal with that.
Code snippet of startTx() in SnapshotDataset.java:
public void startTx(Transaction tx) {
  ...
  // Read the current snapshot version from the metadata table and pass it to the
  // main data table as a runtime argument so reads are filtered to that version.
  Map<String, String> copyOfArguments = new HashMap<>(arguments);
  Long version = getCurrentVersion();
  if (version != null) {
    copyOfArguments.put("version", String.valueOf(version));
  }
  try {
    mainTable = mainTableDef.getDataset(datasetContext, spec.getSpecification("maindata"),
                                        copyOfArguments, classLoader);
  } catch (Throwable t) {
    LOG.error("Error while creating the main table", t);
    throw Throwables.propagate(t);
  }
  ...
}
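For reference, getCurrentVersion() used above could look roughly like the following, assuming the metadata table stores the version under a single "version" row and a "value" column (row and column names are illustrative):
private Long getCurrentVersion() {
  // Read the single metadata row that records the latest successful snapshot version.
  Row row = metadataTable.get(Bytes.toBytes("version"));
  if (row == null || row.isEmpty()) {
    // No successful adapter run has been recorded yet.
    return null;
  }
  return row.getLong(Bytes.toBytes("value"));
}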
Previous designs and options:
1. Use HBase versions:
Description:
For each column family of a row in HBase, we can set the minimum number of versions and the TTL. Versions older than the TTL are deleted, but HBase makes sure that at least the configured minimum number of versions is available at any point in time. We can also set the maximum number of versions to avoid keeping too many versions of a column in the row.
In addition, we might need to store the timestamp of the latest adapter run in order to find the corresponding version.
Algorithm:
The SNAPSHOT dataset will have one HBase table, with a TTL passed as a parameter to indicate how long older versions remain in the dataset.
For each write to the SNAPSHOT dataset, all data is written with the same row key. We set the minVersions of the column family for this row key to 1, to make sure there is always at least one version of the data in the dataset, and set maxVersions to 3 to avoid keeping too many versions.
For each read from the SNAPSHOT dataset, we scan the column family of the row key and return all the data with the latest timestamp (see the column family settings sketch below).
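For illustration, a minimal sketch of the column family settings this option relies on, using the plain HBase admin API (table and family names are illustrative):
// Sketch only: keep at least one version, at most three, and expire older cells via TTL.
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf("snapshotDataset"));
HColumnDescriptor family = new HColumnDescriptor("d");
family.setTimeToLive(24 * 60 * 60); // TTL in seconds, e.g. one day
family.setMinVersions(1);           // always keep at least one version, even past the TTL
family.setMaxVersions(3);           // never keep more than three versions
tableDescriptor.addFamily(family);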
Pros:
Performance is good; reads and writes are simple operations on the HBase table.
Older versions are deleted automatically by the TTL whether or not the adapter runs, and at least minVersions versions are guaranteed to remain in the dataset.
Jobs can always get the correct version of the data.
Cons:
Requires some Tephra changes to make sure Hive queries against the SNAPSHOT dataset return the correct output.
Also involves Table API changes to correctly accept the read pointer.
2. Use transactions to delete all the old data:
Description:
We can use CDAP's transaction support to delete all the rows in the table and then write the new data to the table.
Algorithm:
The SNAPSHOT dataset will have one table. Every time the adapter runs a MapReduce job to update the dataset, in the prepareRun() method, before the MapReduce job starts, we will (see the sketch after this list):
Scan the table,
Issue a delete for each row in the scan result.
Then the MapReduce job runs and puts the new data into the dataset.
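For illustration, a minimal sketch of the scan-and-delete step in prepareRun(), using the CDAP Table API (the table variable is illustrative):
// Sketch only: remove every existing row so the new run starts from an empty table.
Scanner scanner = table.scan(null, null);
try {
  Row row;
  while ((row = scanner.next()) != null) {
    table.delete(row.getRow());
  }
} finally {
  scanner.close();
}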
Pros:
Can successfully update the data.
Since the deletes are done in a transaction, existing jobs still read correct data.
After the transaction, new jobs get the latest data.
Cons:
Performance is bad, since we need to scan and delete all the rows.
prepareRun() and the mappers may not run in the same transaction, so jobs reading the dataset between these two transactions will see an empty dataset.
For flows, the dataset has to be read-only.
3. Use row prefix:
Description:
We will use the timestamp of when the MapReduce job finishes as the "version" of the data, and keep all versions in one big table.
Algorithm:
The SNAPSHOT dataset will have one table containing all versions of the data, with a TTL passed as a parameter to indicate how long older versions remain in the dataset.
Every time a MapReduce job succeeds, its timestamp is passed to the dataset as a row-key prefix to indicate the version of the data. We also keep a row key with timestamp 0 to indicate the latest version.
All jobs read the data whose row keys carry the latest version prefix (see the sketch below).
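For illustration, a minimal sketch of how the version prefix could be applied and read back, assuming version is a long and table, originalRowKey are illustrative variables:
// Sketch only: prefix every row key with the snapshot version (the MapReduce finish timestamp).
byte[] prefixedKey = Bytes.add(Bytes.toBytes(version), originalRowKey);
// Reads scan only the row keys that start with the latest version.
Scanner scanner = table.scan(Bytes.toBytes(version), Bytes.toBytes(version + 1));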
Pros:
Can successfully update the data.
Old data is deleted automatically by the TTL.
Jobs get the correct data as long as the adapter keeps running.
Cons:
If the adapter does not run, we may lose all versions of the data in the dataset.
If the adapter runs less frequently than the TTL, there will be periods beyond the TTL when no data is visible at all.
4. Use two-table switching:
Description:
Use two tables and a metadata table to implement the SNAPSHOT dataset.
Algorithm:
The SNAPSHOT dataset will have three tables: a primary table, a secondary table, and a metadata table.
The primary table is the one exposed to the user. It contains all the latest data.
The secondary table is used for table switching. It is "hidden"; users are unaware of its existence.
The metadata table records which table is currently primary.
When a MapReduce job writes data to the dataset, we do the following (see the sketch after this list):
Delete all the data in the secondary table (in the prepareRun() method),
Write all the new data into the secondary table (in the mappers),
Update the metadata table to make the secondary table the primary (if the MapReduce job succeeds).
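For illustration, a minimal sketch of the pointer flip after a successful run, assuming the metadata table keeps the primary table name under a "current" row and "primary" column (both names are illustrative):
// Sketch only: after the MapReduce run succeeds, the freshly written secondary
// table becomes the new primary by updating the metadata pointer.
metadataTable.put(Bytes.toBytes("current"), Bytes.toBytes("primary"), Bytes.toBytes(secondaryTableName));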
Pros:
Easy to implement
Can successfully update the data.
Cons:
Performance is bad, since all old data must be deleted.
While a job is accessing the dataset, the table it is reading may be deleted.
Clients may get exceptions, since SNAPSHOT datasets have three embedded tables and clients already hold pointers to the underlying tables.
5. Use versions instead of TTL in HBase:
Description:
We will let the user specify the number of snapshots for the SNAPSHOT dataset and keep that many snapshots in the dataset. A metadata table maintains the snapshot ids and the current version number.
Algorithm:
Let the user specify the number of snapshots they want in the SNAPSHOT dataset.
Have two tables for the SNAPSHOT dataset: one used to store the data, and one used as a metadata table that stores the current snapshot, sequence number, and snapshot ids.
What the metadata table looks like, e.g. with snapshotNum = 2:
SNAPSHOT id | isActive
0 | No
1 | No
0 | Yes
... | ...
Another row stores the current snapshot.
The snapshot id is stored as a prefix of the row key in the HBaseTable.
When data comes in, we check:
If the number of snapshots in the table is less than the number the user specified, we increment the version number, add it as a prefix to the row keys, and put all the data in the table.
If the number of snapshots in the table equals the configured number, we look up the current snapshot id in the metadata table, delete all data whose snapshot id is (currentSnapshotId + 1) modulo snapshotNum, and write the new data with that id (see the sketch after this list).
When the user reads the dataset, we use the metadata table to find the current snapshot id and read the snapshot with that prefix from the data table.
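For illustration, a minimal sketch of choosing and recording the slot to overwrite once all snapshots exist (the helper method and the metadata row/column names are hypothetical):
// Sketch only: the next run reuses the slot right after the current one, wrapping around.
int nextSnapshotId = (currentSnapshotId + 1) % snapshotNum;
deleteRowsWithPrefix(nextSnapshotId); // hypothetical helper: drops old rows carrying this snapshot id
// ... the MapReduce job writes the new data with nextSnapshotId as the row key prefix ...
metadataTable.put(Bytes.toBytes("current"), Bytes.toBytes("snapshotId"), Bytes.toBytes(nextSnapshotId));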
Pros:
Can successfully update the table.
No need to worry about the TTL any more; the number of snapshots acts as the control. For example, if the user schedules the adapter to run once a day and asks for three snapshots, the data remains in the dataset for three days. When the adapter stops, the data still remains in the dataset.
Other jobs can get correct and consistent data by looking up the metadata table.
Cons:
Performance is not that good - adding the prefix and deleting old rows take some time.