Kafka Consumer Batch Source

Plugin version: 3.1.0

Kafka Consumer Batch source emits records read from Kafka. It emits a record based on the schema and format you specify; if no schema or format is specified, the message payload is emitted as-is. The source remembers the offset it read up to on the last run and continues from that offset on the next run. The Kafka batch source supports providing additional Kafka properties for the Kafka consumer, reading from Kerberos-enabled Kafka, and limiting the number of records read. This plugin uses the Kafka 2.6.0 Java APIs.

This source is used whenever you want to read from Kafka. For example, you might want to read messages from Kafka and write them to Amazon S3.

If the Kafka instance is SSL-enabled, see Reading from and writing to SSL-enabled Apache Kafka for additional configuration information.
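
For orientation, SSL is configured through standard Kafka consumer properties, which can be supplied via the Additional Kafka Consumer Properties field. Below is a minimal sketch assuming a truststore-only (server-authentication) setup; the paths and password are placeholders, the keys are shown as a key-value map for readability, and the exact set of properties required depends on your cluster's SSL setup (see the linked guide):

```json
{
  "security.protocol": "SSL",
  "ssl.truststore.location": "/path/to/client.truststore.jks",
  "ssl.truststore.password": "<truststore-password>"
}
```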

Configuration

| Property | Macro Enabled? | Version Introduced | Description |
| -------- | -------------- | ------------------ | ----------- |
| Use connection | No | 6.7.0/2.8.0 | Optional. Whether to use a connection. If a connection is used, you do not need to provide the credentials. |
| Connection | Yes | 6.7.0/2.8.0 | Required. Name of the connection to use. You can also use the macro function ${conn(connection-name)}. |
| Kafka Brokers | Yes |  | Required. List of Kafka brokers specified in host1:port1,host2:port2 form. |
| Reference Name | No |  | Required. Name used to uniquely identify this source for lineage, annotating metadata, etc. |
| Kafka Topic | Yes |  | Required. The Kafka topic to read from. |
| Offset Directory | Yes |  | Optional. Directory path used to track the latest offsets read from Kafka. Useful for incrementally processing data from Kafka across subsequent runs. |
| Topic Partitions | Yes |  | Optional. List of topic partitions to read from. If not specified, all partitions will be read. |
| Initial Offsets | Yes |  | Optional. The initial offset for each topic partition. This offset is only used for the first run of the pipeline; any subsequent run reads from the latest offset of the previous run. Offsets are inclusive: if an offset of 5 is used, the message at offset 5 will be read. |
| Key Field | No |  | Optional. Name of the field containing the message key. If this is not set, no key field will be added to output records. If set, this field must be present in the schema property and must be of type bytes. |
| Partition Field | No |  | Optional. Name of the field containing the partition the message was read from. If this is not set, no partition field will be added to output records. If set, this field must be present in the schema property and must be of type int. |
| Offset Field | No |  | Optional. Name of the field containing the partition offset the message was read from. If this is not set, no offset field will be added to output records. If set, this field must be present in the schema property and must be of type long. |
| Max Number Records | Yes |  | Optional. The maximum number of messages the source will read from each topic partition. If a topic partition has fewer messages than this, the source reads to the latest offset. Note that this is an estimate; the actual number of messages read may be smaller. |
| Additional Kafka Consumer Properties | Yes |  | Optional. Additional Kafka consumer properties to set. |
| Format | No |  | Optional. Format of the Kafka event message. Any format supported by CDAP is supported. For example, a value of 'csv' will attempt to parse Kafka payloads as comma-separated values. If no format is given, Kafka message payloads will be treated as bytes. |
| Kerberos Principal | Yes |  | Optional. The Kerberos principal used for the source when Kerberos security is enabled for Kafka. |
| Keytab Location | Yes |  | Optional. The keytab location for the Kerberos principal when Kerberos security is enabled for Kafka. |
| Output Schema | No |  | Required. The output schema for the data. |
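
As a rough illustration, a stage using this source in a CDAP pipeline spec might look like the sketch below. The stage name, the plugin name, and the exact property keys (for example, kafkaBrokers and maxNumberRecords) are assumptions for illustration only and may differ by plugin version; placeholder values are marked with angle brackets. Consult the installed plugin's specification for the authoritative keys.

```json
{
  "name": "KafkaBatchSource",
  "plugin": {
    "name": "Kafka",
    "type": "batchsource",
    "properties": {
      "referenceName": "kafka-src",
      "kafkaBrokers": "host1.example.com:9092,host2.example.com:9092",
      "topic": "purchases",
      "format": "csv",
      "keyField": "key",
      "maxNumberRecords": "1000",
      "schema": "<output schema JSON, as in the Example below>"
    }
  }
}
```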

Example

This example reads from the 'purchases' topic of a Kafka instance running on brokers host1.example.com:9092 and host2.example.com:9092. The source adds a field named key that holds the message key. It parses Kafka messages using the csv format with user, item, count, and price as the message schema.

| Property | Value |
| -------- | ----- |
| Reference Name | kafka-src |
| Kafka Brokers | host1.example.com:9092,host2.example.com:9092 |
| Kafka Topic | purchases |
| Key Field | key |
| Format | csv |
| Output Schema | { "type":"record", "name":"purchase", "fields":[ {"name":"key","type":"bytes"}, {"name":"user","type":"string"}, {"name":"item","type":"string"}, {"name":"count","type":"int"}, {"name":"price","type":"double"} ] } |

For each Kafka message read, the source will output a record with the following schema:

| field name | type |
| ---------- | ---- |
| key | bytes |
| user | string |
| item | string |
| count | int |
| price | double |
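
For instance, a message on the purchases topic whose payload is the CSV text samuel,wallet,1,16.25 (hypothetical sample data) would be parsed by the csv format into a record like:

```json
{
  "key": "<message key bytes>",
  "user": "samuel",
  "item": "wallet",
  "count": 1,
  "price": 16.25
}
```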



Created in 2020 by Google Inc.