Confluent Kafka Streaming Source

The Confluent Kafka Streaming source plugin is available for Preview in the Hub.

This source does the following:

  • Reads data from Confluent.

  • Emits a record per message from a specified Kafka topic.

  • Can be used with self-managed Confluent Platform or Confluent Cloud.

  • Supports Schema Registry.

  • In plugin version 2.0.0 and later, supports state management and at-least-once processing.

It can be configured to parse values from the source in the following ways:

  1. User-defined format. Use the Message Format field to choose any format supported by CDAP.

  2. Schema Registry. Requires Schema Registry credentials to be specified. Uses Avro schemas to deserialize Kafka messages. Use the Get Schema button to fetch key and value schemas from the registry.

  3. Binary format. Used if no message format and no Schema Registry credentials were provided.
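The fallback order above can be sketched as follows. This is a hypothetical helper for illustration only, not the plugin's actual code; `parse_value` and its parameters are invented names.

```python
# Sketch of the three value-parsing modes described above
# (hypothetical helper, not the plugin's actual code).
import csv
import io

def parse_value(payload: bytes, message_format=None, schema_registry_url=None):
    """Mimic the fallback order: user-defined format, then Schema Registry,
    then raw bytes."""
    if message_format == "csv":
        # User-defined format: parse the payload as comma-separated values.
        return next(csv.reader(io.StringIO(payload.decode("utf-8"))))
    if schema_registry_url is not None:
        # Schema Registry mode would deserialize with the fetched Avro schema.
        raise NotImplementedError("requires Schema Registry credentials")
    # Binary fallback: no format and no registry, so the payload stays raw bytes.
    return payload

parse_value(b"alice,apple,2,3.99", message_format="csv")
# -> ['alice', 'apple', '2', '3.99']
parse_value(b"\x00\x01")  # -> b'\x00\x01'
```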

Configuration

Reference Name (Macro Enabled: No)
Required. Used to uniquely identify this source for lineage, annotating metadata, etc.

Kafka Brokers (Macro Enabled: Yes)
Required. List of Kafka brokers specified in host1:port1,host2:port2 form.

Kafka Topic (Macro Enabled: Yes)
Required. The Kafka topic to read from.

Topic Partitions (Macro Enabled: Yes)
Optional. List of topic partitions to read from. If not specified, all partitions will be read.

Default Initial Offset (Macro Enabled: Yes)
Optional. The default initial offset for all topic partitions. An offset of -2 means the smallest offset. An offset of -1 means the latest offset. Offsets are inclusive: if an offset of 5 is used, the message at offset 5 will be read. To set different initial offsets for different partitions, use the Initial Partition Offsets property. Default is -1.

Initial Partition Offsets (Macro Enabled: Yes)
Optional. The initial offset for each topic partition. If not specified, all partitions use the same initial offset, determined by the Default Initial Offset property; any partition listed in Topic Partitions but not here also uses the Default Initial Offset. An offset of -2 means the smallest offset. An offset of -1 means the latest offset. Offsets are inclusive: if an offset of 5 is used, the message at offset 5 will be read.
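The interaction between the two offset properties can be illustrated with a small sketch. The helper below is hypothetical (`resolve_start_offset`, `log_start`, and `log_end` are invented names standing in for the broker's actual earliest/latest offsets):

```python
# Hypothetical helper illustrating the offset conventions above:
# -2 = smallest offset, -1 = latest offset, offsets are inclusive.
SMALLEST, LATEST = -2, -1

def resolve_start_offset(partition, initial_partition_offsets,
                         default_initial_offset=LATEST,
                         log_start=0, log_end=100):
    """Return the first offset read for a partition. Per-partition
    overrides win over the default initial offset."""
    offset = initial_partition_offsets.get(partition, default_initial_offset)
    if offset == SMALLEST:
        return log_start
    if offset == LATEST:
        return log_end
    return offset  # inclusive: the message at this offset is read

# Partition 0 is overridden to offset 5; partition 1 falls back to the
# default initial offset of -2 (smallest).
resolve_start_offset(0, {0: 5}, default_initial_offset=SMALLEST)  # -> 5
resolve_start_offset(1, {0: 5}, default_initial_offset=SMALLEST)  # -> 0
```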

Time Field (Macro Enabled: No)
Optional. Name of the field containing the read time of the batch. If not set, no time field is added to output records. If set, this field must be present in the output schema and must be a long.

Key Field (Macro Enabled: No)
Optional. Name of the field containing the message key. If not set, no key field is added to output records. If set, this field must be present in the output schema and must be bytes.

Partition Field (Macro Enabled: No)
Optional. Name of the field containing the partition the message was read from. If not set, no partition field is added to output records. If set, this field must be present in the output schema and must be an int.

Offset Field (Macro Enabled: No)
Optional. Name of the field containing the partition offset the message was read from. If not set, no offset field is added to output records. If set, this field must be present in the output schema and must be a long.

Max Rate Per Partition (Macro Enabled: No)
Optional. Maximum number of records to read per second per partition. Default is 1000.

Additional Kafka Consumer Properties (Macro Enabled: Yes)
Optional. Additional Kafka consumer properties to set.
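For Confluent Cloud, the additional consumer properties are typically standard Kafka client settings. The snippet below shows an illustrative set as a plain dict; the specific values are examples only, and credentials are normally supplied through the Cluster API Key/Secret properties rather than here.

```python
# Illustrative extra Kafka consumer properties (standard client settings;
# the values shown are examples, not requirements).
extra_consumer_props = {
    "security.protocol": "SASL_SSL",   # Confluent Cloud uses SASL over TLS
    "sasl.mechanism": "PLAIN",
    "request.timeout.ms": "30000",
    "fetch.max.bytes": "52428800",
}

# The plugin takes such settings as key-value pairs.
for key, value in sorted(extra_consumer_props.items()):
    print(f"{key}={value}")
```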

Cluster API Key (Macro Enabled: Yes)
Optional. The Confluent API key used for the source.

Cluster API Secret (Macro Enabled: Yes)
Optional. The Confluent API secret used for the source.

Schema Registry URL (Macro Enabled: Yes)
Optional. The Schema Registry endpoint URL.

Schema Registry API Key (Macro Enabled: Yes)
Optional. The Schema Registry API key.

Schema Registry API Secret (Macro Enabled: Yes)
Optional. The Schema Registry API secret.

Value Field (Macro Enabled: Yes)
Optional. The name of the field containing the message value. Required to fetch the schema from Schema Registry.

Message Format (Macro Enabled: No)
Optional. Format of the Kafka event message. Any format supported by CDAP is supported. For example, a value of ‘csv’ will attempt to parse Kafka payloads as comma-separated values. If no format is given, Kafka message payloads will be treated as bytes.

Output Schema (Macro Enabled: No)
Required. Output schema of the source. If you would like the output records to contain a field with the Kafka message key, the schema must include a field of type bytes/nullable bytes or string/nullable string, and you must set the Key Field property to that field’s name. Similarly, if you would like the output records to contain a field with the timestamp of when the record was read, the schema must include a field of type long or nullable long, and you must set the Time Field property to that field’s name. Any field that is not the Time Field or Key Field will be used in conjunction with the format to parse Kafka message payloads. When used with Schema Registry, the schema should be fetched using the Get Schema button.
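The Key Field and Time Field constraints on the output schema can be sketched as a small checker. This is a hypothetical illustration (`check_schema` is an invented name, and the schema is simplified to a name-to-type mapping), not the plugin's own validation code:

```python
# Hypothetical check of the Output Schema constraints described above.
def check_schema(fields, time_field=None, key_field=None):
    """fields: mapping of field name -> type string.

    Per the descriptions above, the time field must be present and a long,
    and the key field must be present and bytes (or string).
    """
    if time_field is not None and fields.get(time_field) != "long":
        raise ValueError(f"time field '{time_field}' must be a long")
    if key_field is not None and fields.get(key_field) not in ("bytes", "string"):
        raise ValueError(f"key field '{key_field}' must be bytes or string")
    return True

check_schema({"readTime": "long", "key": "bytes", "user": "string"},
             time_field="readTime", key_field="key")  # -> True
```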

Example

Read from the ‘purchases’ topic of a Kafka instance running on brokers host1.example.com:9092 and host2.example.com:9092. The source will add a time field named ‘readTime’ that contains a timestamp corresponding to the micro batch in which the record was read, and a field named ‘key’ containing the message key. It parses the Kafka messages using the ‘csv’ format with ‘user’, ‘item’, ‘count’, and ‘price’ as the message schema.

Kafka Brokers: host1.example.com:9092,host2.example.com:9092
Kafka Topic: purchases
Default Initial Offset: -2
Time Field: readTime
Key Field: key
Cluster API Key: ""
Cluster API Secret: ""
Message Format: csv
Output Schema: "schema": "{ \"type\":\"record\", \"name\":\"purchase\", \"fields\":[ {\"name\":\"readTime\",\"type\":\"long\"}, {\"name\":\"key\",\"type\":\"bytes\"}, {\"name\":\"user\",\"type\":\"string\"}, {\"name\":\"item\",\"type\":\"string\"}, {\"name\":\"count\",\"type\":\"int\"}, {\"name\":\"price\",\"type\":\"double\"} ] }"
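The Output Schema value is an Avro-style record schema stored as an escaped JSON string. One quick way to inspect it is to unescape and parse it with the standard library:

```python
import json

# The plugin stores the output schema as an escaped JSON string; once
# unescaped, it is an ordinary Avro-style record schema.
schema_str = ('{ "type":"record", "name":"purchase", "fields":[ '
              '{"name":"readTime","type":"long"}, '
              '{"name":"key","type":"bytes"}, '
              '{"name":"user","type":"string"}, '
              '{"name":"item","type":"string"}, '
              '{"name":"count","type":"int"}, '
              '{"name":"price","type":"double"} ] }')

schema = json.loads(schema_str)
print([f["name"] for f in schema["fields"]])
# -> ['readTime', 'key', 'user', 'item', 'count', 'price']
```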

For each Kafka message read, it will output a record with the schema:

readTime: long
key: bytes
user: string
item: string
count: int
price: double

Note that the readTime field is not derived from the Kafka message, but from the time that the message was read.
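Putting the example together, the record produced for a single message can be sketched as below. The `to_record` helper is hypothetical; the actual parsing is done by the plugin.

```python
# Hypothetical sketch of how one Kafka message becomes an output record:
# readTime and key come from batch/message metadata, the remaining fields
# from csv-parsing the payload.
def to_record(key: bytes, payload: bytes, read_time_ms: int):
    user, item, count, price = payload.decode("utf-8").split(",")
    return {
        "readTime": read_time_ms,  # when the batch was read, not a message field
        "key": key,
        "user": user,
        "item": item,
        "count": int(count),
        "price": float(price),
    }

record = to_record(b"user42", b"alice,apple,2,3.99", read_time_ms=1_600_000_000_000)
# record["user"] -> 'alice', record["count"] -> 2, record["price"] -> 3.99
```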



Created in 2020 by Google Inc.