Introduction:

Sometimes, CDAP users want to mirror a database into an HBase dataset. When they update the original database, they should see the corresponding updates in the HBase dataset.

What we have now and the problem:

Users of CDAP can create an adapter with a table sink to store data from a source. The table sink transforms the data from the source and updates correctly whenever rows are inserted into the source database. However, when rows are deleted from the source, the table sink does not delete the corresponding entries, so they remain in the table.

Solution:

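A SNAPSHOT dataset solves this: each adapter run writes all the data from the source, and readers see only the latest snapshot, so entries deleted from the source no longer appear.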


User stories:

Creating adapters with SNAPSHOT source/sink:

Users should be able to create an adapter with a SNAPSHOT dataset as a source or sink. Users should be able to specify the TTL or the minimum number of snapshot versions for the SNAPSHOT dataset. A SNAPSHOT sink should get all the data from the source in each run of the adapter. Whenever insertions, deletions, or updates happen in the source, users should see the corresponding changes in the sink.


Reading from the SNAPSHOT datasets:

Users can read from SNAPSHOT datasets in other applications. They see the data of the latest snapshot at the time they get the dataset, and they will not see any later updates unless they get the dataset again. SNAPSHOT datasets are therefore best used from batch programs. A long-running program such as a flow sees only one snapshot and cannot see any updates.


Design:

My test process and results for the existing table sink:

1. I created an adapter with a DB source and a table sink. Initially the MySQL table had 4 entries.

   After one MapReduce run, the table sink was initialized and had 4 entries with the correct values.

2. I inserted one entry into the MySQL table.

   After one MapReduce run, the table sink was updated correctly.

3. I deleted one entry from the MySQL table.

   After one MapReduce run, the table sink did not reflect the change and remained the same.

4. I inserted one entry into the MySQL table.

   After one MapReduce run, the table sink correctly added the inserted entry, but the deleted entry was still there.
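The behaviour observed in the four steps above can be reproduced with a small model. This is only a sketch: `UpsertOnlySink` is a hypothetical stand-in for a sink that writes whatever rows it is given and never removes anything, and the row keys and values are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a table sink that only upserts the rows it receives.
class UpsertOnlySink {
    private final Map<String, String> rows = new TreeMap<>();

    // One adapter run: copy every row currently in the source into the sink.
    void run(Map<String, String> source) {
        rows.putAll(source); // inserts and updates are applied; nothing is ever removed
    }

    Map<String, String> contents() {
        return new TreeMap<>(rows);
    }
}

class UpsertOnlySinkDemo {
    public static void main(String[] args) {
        Map<String, String> source = new LinkedHashMap<>();
        source.put("1", "a");
        source.put("2", "b");
        source.put("3", "c");
        source.put("4", "d");

        UpsertOnlySink sink = new UpsertOnlySink();
        sink.run(source);      // step 1: sink has 4 entries

        source.put("5", "e");
        sink.run(source);      // step 2: the insert is picked up

        source.remove("2");
        sink.run(source);      // step 3: row "2" is still in the sink

        source.put("6", "f");
        sink.run(source);      // step 4: row "6" appears, row "2" lingers

        System.out.println(sink.contents().keySet()); // [1, 2, 3, 4, 5, 6]
    }
}
```

Because each run only ever adds or overwrites rows, the sink has no way to learn that a row disappeared from the source.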


SNAPSHOT dataset design (TBD):

The SNAPSHOT dataset will be a subclass of the Table dataset, since its functionality will be similar to that of a Table dataset. It will support everything a Table dataset supports.

Like other datasets, several classes are needed:

  1. SnapshotDataset.java, which implements the data operations that can be performed on a dataset instance.

  2. SnapshotDatasetAdmin.java, which defines the administrative operations the Snapshot dataset will support.

  3. SnapshotDatasetDefinition.java, which defines how to configure the Snapshot dataset and how to perform administrative or data operations on a dataset instance, by providing the implementations of SnapshotDatasetAdmin and SnapshotDataset.

  4. SnapshotModule.java, which defines the dataset type.


Approach:

  • Description:

The SNAPSHOT dataset has two tables: a metadata table that stores the current version, and a main table that stores the data. When the dataset is queried, we first get the current version from the metadata table and then filter the main table for that version. TTL or minVersion is used to manage the outdated data.

  • Designs/Algorithms:

    • Have two tables:

      • Metadata table - it has only one row and stores the current version of the SNAPSHOT dataset.

      • Main table - it stores all the data that comes from the adapter. It should be configured with TTL or minVersion to manage the old data.

    • Ways to store the latest version:

      • For each adapter run, when the MapReduce job writes data to the main table of the SNAPSHOT dataset, each cell is written with the job's transaction ID as its HBase timestamp.

      • If the adapter run succeeds, we update the metadata table with the transaction ID of the MapReduce job in the onRunFinish() method.

      • When reading from the SNAPSHOT dataset, only data in the main table whose timestamp matches the transaction ID stored in the metadata table is read.

Examples:

Suppose we update the dataset with the following data at time = 10. The two tables will look like:

 

Main data table

rowKey    value    timestamp
x         1        10
y         2        10
z         3        10

Metadata table

rowKey     value    timestamp
version    10       12

 

The version is 10, so a query at this point returns all data with timestamp 10.

Next, suppose we update the dataset at time = 20. Only x and z are written, because y has been deleted. The two tables will look like:

 

Main data table

rowKey    value    timestamp    value    timestamp
x         1        10           1        20
y         2        10
z         3        10           4        20

Metadata table

rowKey     value    timestamp    value    timestamp
version    10       12           20       22

 

The latest version in the metadata table is now 20, so only data with timestamp 20 is returned (x and z). Row y has no cell with timestamp 20, so it is no longer visible.
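The two runs above can be replayed against a small in-memory model. This is a sketch under stated assumptions, not the proposed implementation: `SnapshotModel`, `writeRun`, and `read` are hypothetical names, plain Java maps stand in for the two tables, and only the `onRunFinish` name is borrowed from the design.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the two tables; this is not the CDAP Table API.
class SnapshotModel {
    // Main table: rowKey -> (timestamp -> value), mimicking versioned cells.
    private final Map<String, TreeMap<Long, String>> mainTable = new TreeMap<>();
    // Metadata table: the single "version" value; null until a run has succeeded.
    private Long currentVersion = null;

    // One adapter run: every cell is stamped with the run's transaction id.
    void writeRun(long txId, Map<String, String> sourceRows) {
        for (Map.Entry<String, String> e : sourceRows.entrySet()) {
            mainTable.computeIfAbsent(e.getKey(), k -> new TreeMap<>()).put(txId, e.getValue());
        }
    }

    // Mirrors the onRunFinish() step: publish the version only if the run succeeded.
    void onRunFinish(boolean succeeded, long txId) {
        if (succeeded) {
            currentVersion = txId;
        }
    }

    // Read path: return only the cells whose timestamp equals the current version.
    Map<String, String> read() {
        Map<String, String> result = new TreeMap<>();
        if (currentVersion == null) {
            return result;
        }
        for (Map.Entry<String, TreeMap<Long, String>> row : mainTable.entrySet()) {
            String value = row.getValue().get(currentVersion);
            if (value != null) {
                result.put(row.getKey(), value);
            }
        }
        return result;
    }
}

class SnapshotModelDemo {
    public static void main(String[] args) {
        SnapshotModel model = new SnapshotModel();

        Map<String, String> run10 = new TreeMap<>();
        run10.put("x", "1");
        run10.put("y", "2");
        run10.put("z", "3");
        model.writeRun(10L, run10);
        model.onRunFinish(true, 10L);
        System.out.println(model.read()); // {x=1, y=2, z=3}

        Map<String, String> run20 = new TreeMap<>();
        run20.put("x", "1");
        run20.put("z", "4");
        model.writeRun(20L, run20);
        model.onRunFinish(true, 20L);
        System.out.println(model.read()); // {x=1, z=4}
    }
}
```

In this model, a run that writes cells but fails leaves the visible snapshot unchanged, because the metadata version is only advanced on success.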

  • How to filter the result when reading the table:

    • Make the table accept a runtime argument that is used to filter the data in the main data table.

    • Each time getDataset() in SnapshotDatasetDefinition is called:

      • Call getDataset for the metadata table:

        metadata = getDataset("metadata",...);

      • Start a transaction to get the latest version from the metadata table:

        tx.start();

        version = metadata.get("version");

        tx.stop();

      • Pass the version as an argument to the main data table to filter and get the dataset:

        mainData = getDataset("mainData", … , "SNAPSHOT"+version);
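The lookup sequence above could be sketched as follows. Everything here is a hypothetical stand-in rather than a CDAP class: the metadata table is a plain map, `inTransaction` only marks where a real transaction would begin and end, and the argument key `snapshot.version` is an assumed name for the runtime argument.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// All names in this class are hypothetical; none of them are CDAP APIs.
class SnapshotLookupSketch {
    static final String VERSION_ROW = "version";
    static final String VERSION_ARG = "snapshot.version"; // assumed runtime-argument key

    // Placeholder for a transaction runner: a real one would start a transaction
    // before running the body and commit or abort it afterwards.
    static <T> T inTransaction(Supplier<T> body) {
        return body.get();
    }

    // Reads the current version from the metadata table and returns the runtime
    // arguments that would be handed to the main data table.
    static Map<String, String> argumentsForMainTable(Map<String, Long> metadataTable,
                                                     Map<String, String> baseArguments) {
        Long version = inTransaction(() -> metadataTable.get(VERSION_ROW));
        Map<String, String> arguments = new HashMap<>(baseArguments); // do not mutate the caller's map
        if (version != null) {
            arguments.put(VERSION_ARG, Long.toString(version));
        }
        return arguments;
    }
}
```

If no run has succeeded yet, the metadata table has no version and the sketch passes the base arguments through unchanged; how the real dataset should behave in that case is left open by the design.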

  • Use TTL or minVersion to manage the old data:

    • TTL - old rows are deleted after a certain time has passed.

      Consequence of using TTL: if the adapter stops, all the data in the dataset will eventually be deleted.

    • minVersion = 1 - keep at least one version of the data in the dataset.

      Consequence of using minVersion: old data actually remains in the dataset, since we only filter it out when reading.
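The difference between the two options can be illustrated with a simplified retention rule for the versions of a single cell. This is a sketch, not HBase code: `RetentionSketch` and `surviving` are hypothetical names, and the rule assumed here is that a version survives if it is younger than the TTL or if it is among the newest `minVersions` versions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simplified retention model for the versions of one cell; an illustration only.
class RetentionSketch {
    // Returns the timestamps that survive at time `now`, newest first.
    // A version survives if it is younger than `ttl`, or if it is one of the
    // newest `minVersions` versions.
    static List<Long> surviving(List<Long> timestamps, long now, long ttl, int minVersions) {
        List<Long> newestFirst = new ArrayList<>(timestamps);
        newestFirst.sort(Collections.reverseOrder());
        List<Long> kept = new ArrayList<>();
        for (int i = 0; i < newestFirst.size(); i++) {
            long ts = newestFirst.get(i);
            boolean withinTtl = now - ts < ttl;
            boolean protectedByMinVersions = i < minVersions;
            if (withinTtl || protectedByMinVersions) {
                kept.add(ts);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Long> versions = Arrays.asList(10L, 20L);
        // The adapter stopped after time 20 and the TTL is 15; look at time 100.
        System.out.println(surviving(versions, 100L, 15L, 0)); // []
        System.out.println(surviving(versions, 100L, 15L, 1)); // [20]
    }
}
```

With TTL alone, the last snapshot expires once the adapter stops producing new ones. With minVersion = 1 the newest version of each cell is kept, which also applies to rows that were deleted from the source, so those rows stay in storage and are only hidden by the version filter.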

  • Main challenges (sorted by priority):

    • How to start a transaction in the getDataset() method.

      • Use Inject in the SnapshotDatasetDefinition.

      • Tested in standalone mode using TimePartitionedFilesetDefinition: the TransactionExecutorFactory can be injected successfully.

      • Next step: investigate how to start a transaction.

    • Modify the Table to receive the version as a runtime argument for the SNAPSHOT dataset.

      • Create a new constructor for LevelDBTable, HBaseTable and InMemoryTable that accepts the version as a runtime parameter.

    • Delegate the change to all the tables.

      • Modify all get()- and scan()-related functions in BufferingTable and all its subclasses to filter the results if the version is provided.

        • For each result, get the timestamp of each cell and check whether it matches the version.
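The per-cell check described above might look like the following sketch. The row shape (column to timestamp to value) and the names `VersionFilterSketch` and `filterRow` are assumptions for illustration; the actual result types used by BufferingTable and its subclasses are not shown here.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical helper; the row shape (column -> timestamp -> value) is assumed.
class VersionFilterSketch {
    // For each column, keeps only the cell whose timestamp equals `version`.
    // If `version` is null, no filtering is applied and the newest cell of each
    // column is returned instead.
    static Map<String, String> filterRow(Map<String, NavigableMap<Long, String>> row, Long version) {
        Map<String, String> out = new TreeMap<>();
        for (Map.Entry<String, NavigableMap<Long, String>> column : row.entrySet()) {
            NavigableMap<Long, String> cells = column.getValue();
            if (cells.isEmpty()) {
                continue;
            }
            String value = (version == null)
                ? cells.lastEntry().getValue()  // unfiltered: newest cell
                : cells.get(version);           // filtered: exact timestamp match
            if (value != null) {
                out.put(column.getKey(), value);
            }
        }
        return out;
    }
}
```

A row whose cells all carry other timestamps comes back empty, which is how a row deleted from the source would drop out of get() and scan() results.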

    • Let the user choose between TTL and minVersion; both have consequences (specified above).
