Couchbase database plugin

Introduction

A separate database plugin to support Couchbase-specific features and configurations.

Use-Case

  • Users can choose and install Couchbase source and sink plugins.

  • Users should see the Couchbase logo on the plugin configuration page for a better experience.

  • Users should get relevant information from the tool tip:

    • The tool tip should describe accurately what each field is used for.

  • Users should not have to specify any redundant configuration.

  • Users should get field level lineage for the source and sink that is being used.

  • Reference documentation should be updated to account for the changes.

  • The source code for the Couchbase database plugin should be placed in a repository under the data-integrations organization.

  • The data pipeline using the source and sink plugins should run on both the MapReduce and Spark engines.

User Stories

  • Users should be able to install Couchbase-specific database source and sink plugins from the Hub

  • Users should have each tool tip accurately describe what each field does

  • Users should get field level lineage information for the Couchbase source and sink 

  • Users should be able to set up a pipeline without specifying redundant information

  • Users should get updated reference document for Couchbase source and sink

  • Users should be able to read all Couchbase data types

Plugin Type

Batch Source
Batch Sink 
Real-time Source
Real-time Sink
Action
Post-Run Action
Aggregate
Join
Spark Model
Spark Compute

Couchbase Overview

Couchbase Server is an open-source, distributed, NoSQL document-oriented engagement database. It exposes a fast key-value store with a managed cache for sub-millisecond data operations, purpose-built indexers for fast queries, and a powerful query engine for executing SQL-like queries. Querying and indexing are handled by dedicated services and by N1QL, a feature-rich, document-oriented query language. Multiple instances of Couchbase Server can be combined into a single cluster. Couchbase Server stores data as items. Each item consists of a key, by which the item is referenced, and an associated value, which must be either binary or a JSON document. Items are stored in named buckets.

The Couchbase Data Model

The Couchbase data model is based on JSON, which provides a simple, lightweight, human-readable notation. It supports basic data types, such as numbers and strings, and complex types, such as embedded documents and arrays. Couchbase Server does not enforce uniformity: document structures can vary.

N1QL Query Language

The N1QL query language embraces the JSON document model and uses SQL-like syntax. In N1QL, you operate on JSON documents, and the result of your operation is another JSON document.

A basic N1QL query has the following parts:

  • SELECT — The fields of each document to return.

  • FROM — The data bucket in which to look.

  • WHERE — The conditions that the document must satisfy.

Here’s an example of a basic N1QL query and the JSON document it returns. The query asks for the country that is associated with the airline Excel Airways:

SELECT country FROM `travel-sample` WHERE name = "Excel Airways";


Note that identifiers (such as bucket names) that contain a hyphen must be enclosed in backtick (`) characters.
The results:

{
  "requestID": "9e1cd084-f45e-4059-9e7a-edec30f60dd2",
  "signature": { "country": "json" },
  "results": [ { "country": "United Kingdom" } ],
  "status": "success",
  "metrics": {
    "elapsedTime": "7.42097249s",
    "executionTime": "7.420925841s",
    "resultCount": 1,
    "resultSize": 51
  }
}


Design Tips

Couchbase Java SDK reference: https://docs.couchbase.com/java-sdk/2.7/start-using-sdk.html

 

Design

The proposal is to create a new Maven project in its own repository: https://github.com/data-integrations/couchbase

Although JDBC drivers for Couchbase are available from Simba and CData, they are commercial products. A better option therefore seems to be the official Couchbase Java SDK, which is licensed under Apache 2.0.

Compatibility

The proposal is to use Java SDK 2.7, which is compatible with Couchbase Server versions 4.0-4.5, 4.6, 5.0-5.5 and 6.0.

Couchbase Server 3.x cannot be supported.

Output schema inference

Couchbase Server 4.5 introduced INFER, a N1QL statement that infers the metadata of documents. It can be used to infer the Output Schema.

Consider a sample bucket, `test-bucket`, which contains the following 3 documents:

[ { "test-bucket": { "boolean": false, "boolean_array": [], "created": 666028241280339, "date_as_string": "2006-03-02T15:04:05.567+08:00", "null": null, "number_array": [], "number_big_int": 0, "number_decimal": 0, "number_double": 0, "number_int": 0, "number_long": 0, "object": { "key": "value" }, "object_array": [], "object_map": { "key": "value" }, "string": "" } }, { "test-bucket": { "boolean": false, "boolean_array": [ true, false ], "created": 666028241251904, "date_as_string": "2006-03-02T15:04:05.567+08:00", "null": null, "number_array": [ -9223372036854775808, 0, 9223372036854775807 ], "number_big_int": 18446744073709552000, "number_decimal": 3.14, "number_double": 179769313486231570000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, "number_int": 2147483647, "number_long": 9223372036854775807, "object": { "key": "value" }, "object_array": [], "object_map": { "key": "value" }, "string": "string_value" } }, { "test-bucket": { "boolean": true, "boolean_array": [ true, false ], "created": 666028240839735, "date_as_string": "2006-01-02T15:04:05.567+08:00", "null": null, "number_array": [ -9223372036854775808, 0, 9223372036854775807 ], "number_big_int": -18446744073709552000, "number_decimal": 3.14, "number_double": 0.000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005, "number_int": -2147483648, "number_long": -9223372036854775808, "object": { "key": "value" }, "object_array": [ { "key": "value1" }, { "key": "value2" }, { "key": "value3" } ], "object_map": 
{ "key": "value" }, "string": "string_value" } } ]

The statement INFER `test-bucket` returns:

[ [ { "#docs": 3, "$schema": "http://json-schema.org/schema#", "Flavor": "", "properties": { "boolean": { "#docs": 3, "%docs": 100, "samples": [ false, true ], "type": "boolean" }, "boolean_array": { "#docs": 3, "%docs": 100, "items": { "type": "boolean" }, "maxItems": 2, "minItems": 0, "samples": [ [], [ true, false ] ], "type": "array" }, "created": { "#docs": 3, "%docs": 100, "samples": [ 666028240839735, 666028241251904, 666028241280339 ], "type": "number" }, "date_as_string": { "#docs": 3, "%docs": 100, "samples": [ "2006-01-02T15:04:05.567+08:00", "2006-03-02T15:04:05.567+08:00" ], "type": "string" }, "null": { "#docs": 3, "%docs": 100, "samples": [ null ], "type": "null" }, "number_array": { "#docs": 3, "%docs": 100, "items": { "type": "number" }, "maxItems": 3, "minItems": 0, "samples": [ [], [ -9223372036854775808, 0, 9223372036854775807 ] ], "type": "array" }, "number_big_int": { "#docs": 3, "%docs": 100, "samples": [ -18446744073709552000, 0, 18446744073709552000 ], "type": "number" }, "number_decimal": { "#docs": 3, "%docs": 100, "samples": [ 0, 3.14 ], "type": "number" }, "number_double": { "#docs": 3, "%docs": 100, "samples": [ 0, 0.000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005, 179769313486231570000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 ], "type": "number" }, "number_int": { "#docs": 3, "%docs": 100, "samples": [ -2147483648, 0, 2147483647 ], "type": "number" }, "number_long": { "#docs": 3, "%docs": 100, "samples": [ 
-9223372036854775808, 0, 9223372036854775807 ], "type": "number" }, "object": { "#docs": 3, "%docs": 100, "properties": { "key": { "#docs": 3, "%docs": 100, "samples": [ "value" ], "type": "string" } }, "samples": [ { "key": "value" } ], "type": "object" }, "object_array": { "#docs": 3, "%docs": 100, "items": { "#docs": 3, "$schema": "http://json-schema.org/schema#", "Flavor": "", "properties": { "key": { "type": "string" } }, "type": "object" }, "maxItems": 3, "minItems": 0, "samples": [ [], [ { "key": "value1" }, { "key": "value2" }, { "key": "value3" } ] ], "type": "array" }, "object_map": { "#docs": 3, "%docs": 100, "properties": { "key": { "#docs": 3, "%docs": 100, "samples": [ "value" ], "type": "string" } }, "samples": [ { "key": "value" } ], "type": "object" }, "string": { "#docs": 3, "%docs": 100, "samples": [ "string_value", "" ], "type": "string" } }, "type": "object" } ] ]

Consider the 'number_double' property. Its value is 0 in the first document, java.lang.Double.MAX_VALUE in the second, and java.lang.Double.MIN_VALUE in the third. INFER reports it as follows:

... "number_double": { "#docs": 3, "%docs": 100, "samples": [ 0, 0.000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005, 179769313486231570000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 ], "type": "number" } ...

 

For each identified attribute, the statement returns the following details:

  • #docs: Specifies the number of documents in the sample that contain this attribute.

  • %docs: Specifies the percentage of documents in the sample that contain this attribute.

  • minItems: If the data type is an array, specifies the minimum number of elements (array size).

  • maxItems: If the data type is an array, specifies the maximum number of elements (array size).

  • samples: Displays a list of sample values for the attribute found in the sample population.

  • type: Specifies the identified data type of the attribute.

Thus, the type and samples details can be used to infer the type of each schema field.
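The first half of that inference, translating the INFER `type` keyword into a CDAP type, can be sketched as a simple lookup. This is a minimal sketch, assuming only the keywords that appear in the output above (boolean, string, array, object, null, number); the enum and class names are hypothetical and merely mirror CDAP type names. Objects are mapped to record here, although the source data types mapping also allows map, and number is deliberately left unresolved because its CDAP type has to be derived from the samples.

```java
import java.util.Optional;

/** Hypothetical subset of CDAP schema types used by this sketch. */
enum InferredType { NULL, BOOLEAN, STRING, ARRAY, RECORD }

/** Hypothetical helper that translates the INFER "type" keyword into a CDAP type. */
final class InferTypeMapper {

    private InferTypeMapper() {
    }

    /**
     * Returns the CDAP type for an INFER type keyword, or Optional.empty() for "number",
     * whose CDAP type (int, long, double or decimal) must be resolved from the samples.
     */
    static Optional<InferredType> fromInferType(String inferType) {
        switch (inferType) {
            case "boolean":
                return Optional.of(InferredType.BOOLEAN);
            case "string":
                return Optional.of(InferredType.STRING);
            case "array":
                return Optional.of(InferredType.ARRAY);
            case "object":
                // An object could also be read as a CDAP map; record is the assumed default here.
                return Optional.of(InferredType.RECORD);
            case "null":
                return Optional.of(InferredType.NULL);
            case "number":
                return Optional.empty();
            default:
                throw new IllegalArgumentException("Unsupported INFER type: " + inferType);
        }
    }
}
```

Keeping number out of the lookup makes the sample-based step explicit instead of hiding a guess inside the mapping.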

However, there are some concerns:

  • Couchbase's number type can be mapped to CDAP int, long, double or decimal. The actual CDAP type can only be inferred on a best-effort basis by analyzing the samples.

    For example, suppose all values of a property are less than java.lang.Integer.MAX_VALUE except one, which equals java.lang.Long.MAX_VALUE. If the samples do not include that value, the inferred CDAP type (int) will be invalid.

  • There is no way to determine whether a field is nullable. The proposal is to make all fields nullable and let the user change this manually.

  • INFER does not honor the SELECT query and returns the attributes of all documents. To restrict the schema fields to those selected by the query, the query would have to be parsed manually.

  • The INFER statement is supported only since Couchbase Server 4.5, so versions in the range [4.0, 4.5) cannot be supported.
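The best-effort number analysis from the first concern can be sketched as follows: parse each sample as a BigDecimal and keep the narrowest type that holds all of them. This is a hypothetical heuristic, not a settled design, and the enum and class names are illustrative. It assumes that any sample with a fractional part means double (samples alone cannot tell double from decimal, so number_decimal with samples [0, 3.14] also becomes double) and that integral samples wider than long mean decimal. Note that INFER renders Double.MAX_VALUE as an integral literal, so for number_double it is the Double.MIN_VALUE sample (written 5E-324 below) that yields double.

```java
import java.math.BigDecimal;
import java.util.List;

/** Hypothetical numeric targets, ordered from narrowest to widest for this heuristic. */
enum NumberType { INT, LONG, DECIMAL, DOUBLE }

/** Hypothetical best-effort inference of a CDAP numeric type from INFER samples. */
final class NumberTypeHeuristic {

    private static final BigDecimal INT_MIN = BigDecimal.valueOf(Integer.MIN_VALUE);
    private static final BigDecimal INT_MAX = BigDecimal.valueOf(Integer.MAX_VALUE);
    private static final BigDecimal LONG_MIN = BigDecimal.valueOf(Long.MIN_VALUE);
    private static final BigDecimal LONG_MAX = BigDecimal.valueOf(Long.MAX_VALUE);

    private NumberTypeHeuristic() {
    }

    /** Returns the narrowest type that can hold every sample (JSON number literals). */
    static NumberType infer(List<String> samples) {
        if (samples.isEmpty()) {
            return NumberType.DOUBLE; // nothing to analyze: assumed fallback
        }
        NumberType result = NumberType.INT;
        for (String sample : samples) {
            NumberType candidate = classify(new BigDecimal(sample));
            // Widen by enum order: INT < LONG < DECIMAL < DOUBLE.
            if (candidate.compareTo(result) > 0) {
                result = candidate;
            }
        }
        return result;
    }

    private static NumberType classify(BigDecimal value) {
        if (value.stripTrailingZeros().scale() > 0) {
            return NumberType.DOUBLE; // has a fractional part
        }
        if (within(value, INT_MIN, INT_MAX)) {
            return NumberType.INT;
        }
        if (within(value, LONG_MIN, LONG_MAX)) {
            return NumberType.LONG;
        }
        return NumberType.DECIMAL; // integral, but wider than long
    }

    private static boolean within(BigDecimal value, BigDecimal min, BigDecimal max) {
        return value.compareTo(min) >= 0 && value.compareTo(max) <= 0;
    }
}
```

As the concern above warns, the result is only as good as the samples: samples [1, 2, 3] yield int even if an unsampled document holds java.lang.Long.MAX_VALUE.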

     

Source Splitter

The proposal is to add a "Number of Splits" source configuration property, which specifies the desired number of splits to divide the query into when reading from Couchbase.

Fewer splits may be created if the query cannot be divided into the desired number of splits.

The default value of this property can be '0', in which case the number of splits is determined by the number of map tasks (controlled by the "mapreduce.job.maps" property):

public List<InputSplit> getSplits(JobContext job) throws IOException {
  ...
  // MRJobConfig.NUM_MAPS is the "mapreduce.job.maps" property; 1 is used if it is not set.
  int targetNumTasks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
  ...
}


A 'SELECT COUNT(*)' query can be used to get the total number of documents, which are then divided between the splits using 'OFFSET' and 'LIMIT'.
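The split planning described above can be sketched as follows. This is a minimal, hypothetical sketch: the class and method names are not part of any existing API, totalDocs is assumed to come from the SELECT COUNT(*) query, and a desired value of 0 falls back to the number of map tasks. The remainder is spread over the first splits so that split sizes differ by at most one. The generated queries also add ORDER BY META().id. This is an assumption made here because N1QL does not guarantee a stable result order without ORDER BY, and paging with OFFSET and LIMIT relies on one.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical planner that divides a query result into OFFSET/LIMIT windows. */
final class SplitPlanner {

    /** One split: a contiguous window of the (ordered) query result. */
    static final class Split {
        final long offset;
        final long limit;

        Split(long offset, long limit) {
            this.offset = offset;
            this.limit = limit;
        }
    }

    private SplitPlanner() {
    }

    /**
     * @param totalDocs     result of the SELECT COUNT(*) query
     * @param desiredSplits value of "Number of Splits"; 0 means "use the number of map tasks"
     * @param numMapTasks   value of "mapreduce.job.maps"
     */
    static List<Split> plan(long totalDocs, int desiredSplits, int numMapTasks) {
        int target = desiredSplits > 0 ? desiredSplits : Math.max(1, numMapTasks);
        // Fewer splits are created when there are fewer documents than requested splits.
        long numSplits = Math.min(target, totalDocs);
        List<Split> splits = new ArrayList<>();
        if (numSplits <= 0) {
            return splits; // nothing to read
        }
        long base = totalDocs / numSplits;
        long remainder = totalDocs % numSplits;
        long offset = 0;
        for (long i = 0; i < numSplits; i++) {
            // The first 'remainder' splits take one extra document each.
            long limit = base + (i < remainder ? 1 : 0);
            splits.add(new Split(offset, limit));
            offset += limit;
        }
        return splits;
    }

    /** Appends the window to a base query; ORDER BY META().id is assumed for a stable order. */
    static String toQuery(String baseQuery, Split split) {
        return baseQuery + " ORDER BY META().id LIMIT " + split.limit + " OFFSET " + split.offset;
    }
}
```

For example, 10 documents and 3 requested splits produce the windows (OFFSET 0, LIMIT 4), (OFFSET 4, LIMIT 3) and (OFFSET 7, LIMIT 3). toQuery assumes the base query does not already contain its own ORDER BY clause.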

 

Source Properties

| Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
|---|---|---|---|---|---|---|
| General | Label | Label for UI. | | | | textbox |
| | Reference Name | Name used to uniquely identify this source for lineage. | | | referenceName | textbox |
| | Nodes | List of nodes to use when connecting to the Couchbase cluster. | | | nodes | csv |
| | Bucket | Couchbase bucket name. | | | bucket | textbox |
| | Select Fields | Comma-separated list of fields to be read. | | * | selectFields | textbox |
| | Conditions | Optional criteria (filters or predicates) that the result documents must satisfy. Corresponds to the WHERE clause in a N1QL SELECT statement. | | | conditions | textbox |
| | Output Schema | Specifies the schema of the documents. | | | schema | schema |
| | Number of Splits | Desired number of splits to divide the query into when reading from Couchbase. Fewer splits may be created if the query cannot be divided into the desired number of splits. If the specified value is zero, the plugin uses the number of map tasks as the number of splits. | | 0 | numSplits | number |
| Credentials | Username | User identity for connecting to Couchbase. | | | username | textbox |
| | Password | Password to use to connect to Couchbase. | | | password | password |
| Error Handling | On Record Error | How to handle errors in record processing. | Skip error, Send to error, Fail pipeline | Fail pipeline | on-error | radio-group (layout: block) |
| Advanced | Sample Size | Specifies the number of documents to randomly sample in the bucket when inferring the schema. The default sample size is 1000 documents. If a bucket contains fewer documents than the specified number, all the documents in the bucket are used. | | 1000 | sampleSize | number |
| | Max Parallelism | Maximum number of CPU cores that can be used to process a query. If the specified value is less than zero or greater than the total number of cores in a cluster, the system uses all available cores in the cluster. | | 0 | maxParallelism | number |
| | Scan Consistency | Specifies the consistency guarantee or constraint for index scanning. | Not Bounded, At Plus, Request Plus, Statement Plus | Not Bounded | scanConsistency | select |
| | Query Timeout | Number of seconds to wait before a query times out. | | 600 | timeout | number |
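The Bucket, Select Fields and Conditions properties map onto the SELECT, FROM and WHERE parts of a basic N1QL query. A minimal sketch of that mapping follows. It assumes the fields are passed through unquoted, so nested paths such as object.key keep working, and only the bucket name is enclosed in backticks, as required for names such as travel-sample. The class name is hypothetical, and the conditions text is inserted verbatim.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical helper that turns the source properties into a N1QL SELECT statement. */
final class SourceQueryBuilder {

    private SourceQueryBuilder() {
    }

    /** Encloses an identifier in backticks, e.g. travel-sample becomes `travel-sample`. */
    static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    /**
     * @param selectFields comma-separated field list, or "*" (the default) for all fields
     * @param bucket       bucket name, always backtick-quoted
     * @param conditions   optional WHERE clause body; null or blank means no filter
     */
    static String build(String selectFields, String bucket, String conditions) {
        String fields = "*";
        if (selectFields != null && !selectFields.trim().isEmpty() && !selectFields.trim().equals("*")) {
            String joined = Arrays.stream(selectFields.split(","))
                .map(String::trim)
                .filter(field -> !field.isEmpty())
                .collect(Collectors.joining(", "));
            if (!joined.isEmpty()) {
                fields = joined;
            }
        }
        StringBuilder query = new StringBuilder("SELECT ")
            .append(fields)
            .append(" FROM ")
            .append(quote(bucket));
        if (conditions != null && !conditions.trim().isEmpty()) {
            query.append(" WHERE ").append(conditions.trim());
        }
        return query.toString();
    }
}
```

With Select Fields set to country, Bucket set to travel-sample and Conditions set to name = "Excel Airways", the sketch reproduces the example query from the N1QL section (without the trailing semicolon).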

Notes:

Source Data Types Mapping

The source requires the Output Schema to be set. Based on the schema, the source expects each field of a document to be of a specific Couchbase data type.

The On Record Error property allows the user to decide whether the pipeline should fail, the record should be skipped, or the record should be sent to the error dataset.
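The three On Record Error options can be sketched as a simple policy check. This is a hypothetical, simplified model: each record is reduced to a single value whose Java class is compared with the expected type, and the enum and class names are illustrative rather than part of the plugin or of CDAP.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the On Record Error property. */
final class RecordErrorHandling {

    enum OnRecordError { SKIP_ERROR, SEND_TO_ERROR, FAIL_PIPELINE }

    /** Records that matched the expected type, plus those routed to the error dataset. */
    static final class Result {
        final List<Object> records = new ArrayList<>();
        final List<Object> errors = new ArrayList<>();
    }

    private RecordErrorHandling() {
    }

    static Result read(List<?> values, Class<?> expectedType, OnRecordError policy) {
        Result result = new Result();
        for (Object value : values) {
            if (expectedType.isInstance(value)) {
                result.records.add(value);
                continue;
            }
            switch (policy) {
                case SKIP_ERROR:
                    break; // drop the record silently
                case SEND_TO_ERROR:
                    result.errors.add(value); // route the record to the error dataset
                    break;
                case FAIL_PIPELINE:
                    throw new IllegalStateException("Unexpected value type for record: " + value);
                default:
                    throw new AssertionError(policy);
            }
        }
        return result;
    }
}
```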

The following table shows which Couchbase data types can be read as which CDAP types.

| Couchbase Data Type | CDAP Schema Data Type |
|---|---|
| Boolean | boolean |
| Number | int, long, double, decimal, string |
| String | string |
| Array | array |
| Object | record, map |

An Object can be read as a record. For example, the following schema:

{"type":"record","name":"object","fields":[{"name":"inner_field","type":"string"}]}

is used for the 'object' field of this document:

{ "object" : { "inner_field" : "val" } }

An Object can also be read as a map. For example, the following schema:

{"type":"map","keys":"string","values":"string"}

can be used for the same 'object' field.


Sink Properties

| Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
|---|---|---|---|---|---|---|
| General | Label | Label for UI. | | | | textbox |
| | Reference Name | Name used to uniquely identify this sink for lineage. | | | referenceName | textbox |
| | Nodes | List of nodes to use when connecting to the Couchbase cluster. | | | nodes | csv |
| | Bucket | Couchbase bucket name. | | | bucket | textbox |
| | Key Field | Specifies which of the incoming fields should be used as the document identifier. The identifier is expected to be of type string. | | | keyField | input-field-selector |
| | Operation | Type of write operation to perform. This can be set to Insert, Replace or Upsert. | Insert, Replace, Upsert | Insert | operation | radio-group |
| Credentials | Username | User identity for connecting to Couchbase. | | | username | textbox |
| | Password | Password to use to connect to Couchbase. | | | password | password |
| Advanced | Batch Size | Size (in number of records) of the batched writes to the Couchbase bucket. Each write to Couchbase carries some overhead, so to maximize bulk write throughput, maximize the amount of data stored per write. Commits of 1 MiB usually provide the best performance. The default value is 100 records. | | 100 | batchSize | number |
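The difference between the three Operation options can be illustrated with an in-memory model keyed by the Key Field value. This is a hypothetical sketch: a HashMap stands in for the bucket, and a rejected write returns false, whereas the Couchbase SDK reports it as an error (Insert fails when the ID already exists, Replace fails when it does not). How the sink surfaces such an error is not specified here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of the sink's write operations, keyed by document ID. */
final class WriteOperations {

    enum Operation { INSERT, REPLACE, UPSERT }

    // Stands in for the Couchbase bucket: document ID -> JSON document.
    private final Map<String, String> bucket = new HashMap<>();

    /** Returns true if the write was applied, false if the operation rejected it. */
    boolean write(Operation operation, String id, String json) {
        switch (operation) {
            case INSERT:
                // Insert only succeeds when no document with this ID exists yet.
                return bucket.putIfAbsent(id, json) == null;
            case REPLACE:
                // Replace only succeeds when a document with this ID already exists.
                return bucket.replace(id, json) != null;
            case UPSERT:
                // Upsert always writes, creating or overwriting the document.
                bucket.put(id, json);
                return true;
            default:
                throw new IllegalArgumentException("Unknown operation: " + operation);
        }
    }

    String get(String id) {
        return bucket.get(id);
    }
}
```

With the default Insert operation, re-running a pipeline over the same keys would be rejected, while Upsert makes the write idempotent.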

Notes:

Sink Data Types Mapping

| CDAP Schema Data Type | Couchbase Data Type |
|---|---|
| boolean | Boolean |
| bytes | base64-encoded String |
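A bytes field has to be turned into a string before it can be stored in a JSON document. A minimal sketch with java.util.Base64 follows. It assumes the standard Base64 alphabet with padding, since the document does not say which variant the sink uses. The class name is hypothetical.

```java
import java.util.Base64;

/** Hypothetical conversion between a CDAP bytes field and the string stored in Couchbase. */
final class BytesFieldEncoder {

    private BytesFieldEncoder() {
    }

    /** Encodes with the standard Base64 alphabet and padding (assumed variant). */
    static String toCouchbaseValue(byte[] bytes) {
        return Base64.getEncoder().encodeToString(bytes);
    }

    /** Decodes a value written by toCouchbaseValue back into the original bytes. */
    static byte[] fromCouchbaseValue(String encoded) {
        return Base64.getDecoder().decode(encoded);
    }
}
```

For example, the UTF-8 bytes of "hello" are stored as the string "aGVsbG8=".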

Created in 2020 by Google Inc.