Custom Dataset Exploration (Deprecated)
Warning: This topic is no longer supported.
It is often useful to be able to explore a dataset in an ad-hoc manner. This can be done using SQL if your dataset fulfills two requirements:
- it defines the schema for each record; and
- it has a method to scan its data record by record.
For CDAP datasets, this is done by implementing the RecordScannable
interface. The CDAP built-in KeyValueTable
and ObjectMappedTable
datasets already implement this and can be used for ad-hoc queries.
Let's take a closer look at the RecordScannable
interface.
Defining the Record Schema
The record schema is given by returning the Java type of each record, and CDAP will derive the record schema from that type:
Type getRecordType();
For example, suppose you have a class Entry
defined as:
class Entry {
private final String key;
private final int value;
...
}
You can implement a record-scannable dataset that uses Entry
as the record type:
class MyDataset ... implements RecordScannable<Entry> {
  ...
  public Type getRecordType() {
    return Entry.class;
  }
}
Note that Java's Class
implements Type
and you can simply return Entry.class
as the record type. CDAP will use reflection to infer a SQL-style schema using the fields of the record type.
In the case of the above class Entry
, the schema will be: (key STRING, value INT)
Limitations
- The record type must be a structured type, that is, a Java class with fields. This is because SQL tables require a structured type at the top level. This means the record type cannot be a primitive, collection, or map type. However, these types may appear nested inside the record type.
- The record type must be that of an actual Java class, not an interface. The same applies to the types of any fields contained in the type. The reason is that interfaces only define methods but not fields; hence, reflection would not be able to derive any fields or types from the interface. The one exception to this rule is that Java collections such as List and Set are supported, as well as the Java Map. This is possible because these interfaces are so commonly used that they deserve special handling. These interfaces are parameterized and require special care, as described in the next section.
- The record type must not be recursive. In other words, it cannot contain any class that directly or indirectly contains a member of that same class. This is because a recursive type cannot be represented as a SQL schema.
- Fields of a class that are declared static or transient are ignored during schema generation. This means that the record type must have at least one non-transient and non-static field. For example, the java.util.Date class has only static and transient fields. Therefore a record type of Date is not supported and will result in an exception when the dataset is created.
- A dataset can only be used in ad-hoc queries if its record type is completely contained in the dataset definition. This means that if the record type is or contains a parameterized type, then the type parameters must be present in the dataset definition. The reason is that the record type must be instantiated when executing an ad-hoc query. If a type parameter depends on the jar file of the application that created the dataset, then this jar file is not available to the query execution runtime. For example, you cannot execute ad-hoc queries over an ObjectStore<MyObject> if MyObject is contained in the application jar. However, if you define your own dataset type MyObjectStore that extends or encapsulates an ObjectStore<MyObject>, then MyObject becomes part of the dataset definition for MyObjectStore.
Parameterized Types
Suppose instead of being fixed to String
and int
, the Entry
class is generic with type parameters for both key and value:
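As a hypothetical sketch (the class body here is an invention of this example), the generic version could look like:

```java
// Generic version of Entry: key and value types become type parameters.
class GenericEntry<KEY, VALUE> {
  private final KEY key;
  private final VALUE value;

  GenericEntry(KEY key, VALUE value) {
    this.key = key;
    this.value = value;
  }

  KEY getKey() { return key; }
  VALUE getValue() { return value; }
}
```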
We should easily be able to implement RecordScannable<GenericEntry<String, Integer>>
by defining getRecordType()
. However, due to Java's runtime type erasure, returning GenericEntry.class
does not convey complete information about the record type. With reflection, CDAP can only determine the names of the two fields, but not their types.
To convey information about the type parameters, we must instead return a ParameterizedType
, which Java's Class
does not implement. An easy way is to use Guava's TypeToken
:
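With Guava, one would return something like new TypeToken<GenericEntry<String, Integer>>() { }.getType(). The stdlib-only sketch below demonstrates the same "super type token" trick that TypeToken relies on; the class name TypeCapture is an invention of this sketch:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Stdlib-only sketch of the "super type token" technique that Guava's
// TypeToken encapsulates: the type arguments of a generic *superclass*
// survive erasure and can be read back via reflection.
abstract class TypeCapture<T> {
  Type capture() {
    // On an anonymous subclass, getGenericSuperclass() yields a
    // ParameterizedType such as TypeCapture<Map<String, Integer>>.
    ParameterizedType superType =
        (ParameterizedType) getClass().getGenericSuperclass();
    return superType.getActualTypeArguments()[0];
  }
}
```

For example, new TypeCapture<java.util.Map<String, Integer>>() { }.capture() returns the full parameterized Map<String, Integer> type rather than just Map.class.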
While this seems a little more complex at first sight, it is the de-facto standard way of dealing with Java type erasure.
Complex Types
Your record type can also contain nested structures, lists, or maps, and they will be mapped to type names as defined in the Hive language manual. For example, if your record type is defined as:
The SQL schema of the dataset would be:
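As a hypothetical illustration (the class and field names below are invented for this sketch), a record type containing a nested structure, a list, and a map could look like:

```java
import java.util.List;
import java.util.Map;

// Hypothetical record type with a nested structure, a list, and a map.
class Purchase {
  String product;
  int quantity;
}

class Customer {
  String name;
  List<Purchase> purchases;
  Map<String, Double> spendingByCategory;
}
```

A record type like Customer would map to a Hive schema along the lines of (name STRING, purchases ARRAY<STRUCT<product: STRING, quantity: INT>>, spendingbycategory MAP<STRING, DOUBLE>); see the Hive language manual for the exact type names.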
Refer to the Hive language manual for more details on schema and data types.
StructuredRecord Type
There are times when your record type cannot be expressed as a plain old Java object. For example, you may want to write a custom dataset whose schema may change depending on the properties it is given. In these situations, you can implement a record-scannable dataset that uses StructuredRecord
as the record type:
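A minimal sketch of the idea, using a stand-in class so that it compiles without the CDAP libraries:

```java
import java.lang.reflect.Type;

// Stand-in for the CDAP StructuredRecord class, present only so this
// sketch is self-contained; a real dataset would use the CDAP class.
class StructuredRecord { }

// Sketch of a dataset whose record type is StructuredRecord; mirrors
// RecordScannable<StructuredRecord>.getRecordType().
class MyStructuredDataset {
  public Type getRecordType() {
    return StructuredRecord.class;
  }
}
```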
The StructuredRecord
class is essentially a map of fields to values, with a Schema
describing the fields and values:
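The stand-in classes below sketch that idea in plain Java; they are not the CDAP API, only an illustration of a schema naming fields and types alongside a record mapping field names to values:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for a schema: an ordered map of field name to type.
class SchemaSketch {
  final Map<String, Class<?>> fields = new LinkedHashMap<>();

  SchemaSketch field(String name, Class<?> type) {
    fields.put(name, type);
    return this;
  }
}

// Simplified stand-in for a structured record: field name to value,
// validated against the schema's field names.
class StructuredRecordSketch {
  final SchemaSketch schema;
  final Map<String, Object> values = new LinkedHashMap<>();

  StructuredRecordSketch(SchemaSketch schema) { this.schema = schema; }

  StructuredRecordSketch set(String name, Object value) {
    if (!schema.fields.containsKey(name)) {
      throw new IllegalArgumentException("No such field: " + name);
    }
    values.put(name, value);
    return this;
  }

  Object get(String name) { return values.get(name); }
}
```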
Datasets that use StructuredRecord
as the record type must also set the schema dataset property when they are created:
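A sketch of supplying the schema as a dataset property at creation time; the property key "schema" and the Avro-style JSON value are assumptions of this sketch, so consult the CDAP documentation for the exact key and format:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: building the dataset properties that declare the record schema.
// Both the "schema" key and the JSON shape are illustrative assumptions.
class SchemaPropertySketch {
  static Map<String, String> datasetProperties() {
    Map<String, String> props = new HashMap<>();
    props.put("schema",
        "{\"type\":\"record\",\"name\":\"entry\",\"fields\":["
        + "{\"name\":\"key\",\"type\":\"string\"},"
        + "{\"name\":\"value\",\"type\":\"int\"}]}");
    return props;
  }
}
```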
Failure to set the schema property will result in errors when enabling exploration on the dataset. The dataset will still be created, but it will not be explorable until the schema property is set correctly through the Microservices. In addition, it is up to the user to ensure that the schema set in the dataset properties matches the schema of records returned by the dataset. Schema mismatches will result in runtime errors.
The CDAP Table
and ObjectMappedTable
datasets implement RecordScannable
in this way and can be used as references.
Scanning Records
The second requirement for enabling SQL queries over a dataset is to provide a means of scanning the dataset record by record. Similar to how the BatchReadable
interface makes datasets readable by MapReduce programs by iterating over pairs of key and value, RecordScannable
iterates over records. You need to implement a method to partition the dataset into splits, and an additional method to create a record scanner for each split:
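As a sketch of the shape of that contract (Split and RecordScanner here are toy stand-ins for the CDAP types of the same names):

```java
import java.lang.reflect.Type;
import java.util.List;

// Toy stand-ins for the CDAP Split and RecordScanner types.
class Split { }

abstract class RecordScanner<RECORD> {
  public abstract boolean nextRecord();
  public abstract RECORD getCurrentRecord();
}

// Sketch of the scanning half of the contract: one method partitions the
// dataset into splits, the other creates a record scanner for each split.
interface RecordScannableSketch<RECORD> {
  Type getRecordType();
  List<Split> getSplits();
  RecordScanner<RECORD> createSplitRecordScanner(Split split);
}
```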
The RecordScanner
is very similar to a SplitReader
; except that instead of nextKeyValue()
, getCurrentKey()
, and getCurrentValue()
, it implements nextRecord()
and getCurrentRecord()
.
Typically, you do not implement these methods from scratch but rely on the BatchReadable
implementation of the underlying Tables and datasets. For example, if your dataset is backed by a Table
:
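A self-contained sketch of the pattern: the dataset delegates getSplits() to the underlying table and wraps the table's split reader, turning each key/value pair into an Entry record. ToyTable, Split, SplitReader, and RecordScanner below are toy stand-ins for the corresponding CDAP types:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class Split { }

abstract class SplitReader<KEY, VALUE> {
  public abstract boolean nextKeyValue();
  public abstract KEY getCurrentKey();
  public abstract VALUE getCurrentValue();
}

abstract class RecordScanner<RECORD> {
  public abstract boolean nextRecord();
  public abstract RECORD getCurrentRecord();
}

class Entry {
  final String key;
  final int value;
  Entry(String key, int value) { this.key = key; this.value = value; }
}

// A toy table with a single split over two fixed rows.
class ToyTable {
  List<Split> getSplits() { return Arrays.asList(new Split()); }

  SplitReader<String, Integer> createSplitReader(Split split) {
    final Iterator<Entry> rows =
        Arrays.asList(new Entry("a", 1), new Entry("b", 2)).iterator();
    return new SplitReader<String, Integer>() {
      private Entry current;
      public boolean nextKeyValue() {
        if (!rows.hasNext()) return false;
        current = rows.next();
        return true;
      }
      public String getCurrentKey() { return current.key; }
      public Integer getCurrentValue() { return current.value; }
    };
  }
}

// The dataset delegates splitting to the table and wraps each key/value
// pair from the split reader into an Entry record.
class MyDataset {
  private final ToyTable table = new ToyTable();

  public List<Split> getSplits() { return table.getSplits(); }

  public RecordScanner<Entry> createSplitRecordScanner(Split split) {
    final SplitReader<String, Integer> reader = table.createSplitReader(split);
    return new RecordScanner<Entry>() {
      public boolean nextRecord() { return reader.nextKeyValue(); }
      public Entry getCurrentRecord() {
        return new Entry(reader.getCurrentKey(), reader.getCurrentValue());
      }
    };
  }
}
```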
While this is straightforward, it is even easier if your dataset already implements BatchReadable
. In that case, you can reuse its implementation of getSplits()
and implement the split record scanner with a helper method (Scannables.splitRecordScanner
) already defined by CDAP. It takes a split reader and a RecordMaker
that transforms a key and value, as produced by the BatchReadable
's split reader, into a record:
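The following is a toy reimplementation of the idea behind Scannables.splitRecordScanner, not the CDAP helper itself: given a split reader and a RecordMaker that turns a key/value pair into a record, it produces a record scanner. All types here are stand-ins:

```java
// Toy stand-ins mirroring the shape of the CDAP types.
abstract class SplitReader<KEY, VALUE> {
  public abstract boolean nextKeyValue();
  public abstract KEY getCurrentKey();
  public abstract VALUE getCurrentValue();
}

abstract class RecordScanner<RECORD> {
  public abstract boolean nextRecord();
  public abstract RECORD getCurrentRecord();
}

// Transforms one key/value pair into one record.
interface RecordMaker<KEY, VALUE, RECORD> {
  RECORD makeRecord(KEY key, VALUE value);
}

class Scannables {
  // Wrap a split reader and a record maker into a record scanner.
  static <K, V, R> RecordScanner<R> splitRecordScanner(
      final SplitReader<K, V> reader, final RecordMaker<K, V, R> maker) {
    return new RecordScanner<R>() {
      public boolean nextRecord() { return reader.nextKeyValue(); }
      public R getCurrentRecord() {
        return maker.makeRecord(reader.getCurrentKey(), reader.getCurrentValue());
      }
    };
  }
}
```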
Note there is an even simpler helper (Scannables.valueRecordScanner
) that derives a split record scanner from a split reader. For each key and value returned by the split reader it ignores the key and returns the value. For example, if your dataset implements BatchReadable<String, Employee>
, then you can implement RecordScannable<Employee>
by defining:
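A toy version of the valueRecordScanner idea (again with stand-in types, not the CDAP classes): wrap a split reader into a record scanner that ignores each key and returns only the value.

```java
// Toy stand-ins mirroring the shape of the CDAP types.
abstract class SplitReader<KEY, VALUE> {
  public abstract boolean nextKeyValue();
  public abstract KEY getCurrentKey();
  public abstract VALUE getCurrentValue();
}

abstract class RecordScanner<RECORD> {
  public abstract boolean nextRecord();
  public abstract RECORD getCurrentRecord();
}

class Scannables {
  // Derive a record scanner that drops the key and yields only the value.
  static <K, V> RecordScanner<V> valueRecordScanner(final SplitReader<K, V> reader) {
    return new RecordScanner<V>() {
      public boolean nextRecord() { return reader.nextKeyValue(); }
      public V getCurrentRecord() { return reader.getCurrentValue(); }
    };
  }
}
```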
An example demonstrating an implementation of RecordScannable
is included in the CDAP Sandbox in the directory examples/Purchase
, namely the PurchaseHistoryStore
.
Writing to Datasets with SQL
Data can be inserted into datasets using SQL. For example, you can write to a dataset named ProductCatalog
with this SQL query:
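A hypothetical INSERT of this shape (the column names and source table are invented for the illustration; note the dataset_ prefix on the table name, explained under Formulating Queries below):

```sql
-- Hypothetical example; the dataset is addressed by its Hive table name.
INSERT INTO TABLE dataset_productcatalog
SELECT id, name, price FROM dataset_newproducts;
```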
For a dataset to support record insertion from SQL queries, it simply has to expose a way to write records into itself.
For CDAP datasets, this is done by implementing the RecordWritable
interface. The system dataset KeyValueTable already implements this and can be used to insert records from SQL queries.
Let's take a closer look at the RecordWritable
interface.
Defining the Record Schema
Just like in the RecordScannable interface, the record schema is given by returning the Java type of each record, using the same method:
Type getRecordType();
The same rules that apply to the type of the RecordScannable
interface apply to the type of the RecordWritable
interface. In fact, if a dataset implements both RecordScannable
and RecordWritable
interfaces, they will have to use identical record types.
Writing Records
To enable inserting SQL query results, a dataset needs to provide a means of writing a record into itself. This is similar to how the BatchWritable
interface makes datasets writable from MapReduce programs by providing a way to write pairs of key and value. You need to implement the RecordWritable
method:
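As a sketch of that contract (the stand-in interface below exists only so the example is self-contained; the name and single-method shape mirror the CDAP RecordWritable interface):

```java
import java.io.IOException;

// Sketch of the writing half of the contract: a single method that
// writes one record into the dataset.
interface RecordWritableSketch<RECORD> {
  void write(RECORD record) throws IOException;
}
```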
Continuing the MyDataset example used above, which showed an implementation of RecordScannable, this example shows an implementation of a RecordWritable dataset that is backed by a Table:
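The sketch below is self-contained rather than CDAP-specific: ToyTable stands in for the CDAP Table API, whereas a real implementation would issue a Put against the underlying Table.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

class Entry {
  final String key;
  final int value;
  Entry(String key, int value) { this.key = key; this.value = value; }
}

// Toy stand-in for a table: a simple in-memory key/value store.
class ToyTable {
  final Map<String, Integer> rows = new HashMap<>();
  void put(String key, int value) { rows.put(key, value); }
}

class MyWritableDataset {
  final ToyTable table = new ToyTable();

  // Mirrors RecordWritable<Entry>.write(Entry): store one record.
  public void write(Entry entry) throws IOException {
    table.put(entry.key, entry.value);
  }
}
```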
Note that a dataset can implement either RecordScannable
, RecordWritable
, or both.
Formulating Queries
When creating your queries, keep these limitations in mind:
- The query syntax of CDAP is a subset of the variant of SQL that was first defined by Apache Hive.
- The SQL commands UPDATE and DELETE are not allowed on CDAP datasets.
- When addressing your datasets in queries, you need to prefix the dataset name with dataset_. For example, if your dataset is named ProductCatalog, then the corresponding table name is dataset_productcatalog. Note that the table name is lower-case.
- If your dataset name contains a '.' or a '-', those characters will be converted to '_' for the Hive table name. For example, if your dataset is named my-table.name, the corresponding Hive table name will be dataset_my_table_name. Beware of name collisions: for example, my.table will use the same Hive table name as my_table.
- You can also configure the table name by setting the dataset property explore.table.name (see Data Exploration).
For more examples of queries, please refer to the Hive language manual.
Created in 2020 by Google Inc.