Confluent Kafka Sink

The Confluent Kafka sink plugin is available for Preview in the Hub.

Use this sink to write data to Confluent. The sink sends one message to the specified Kafka topic for each record it receives. It can partition the events written to Kafka based on a configurable key, operate in sync or async mode, and apply different compression types to events. It can be used with self-managed Confluent Platform or Confluent Cloud, and it supports Schema Registry.

Configuration

| Property | Macro Enabled? | Version Introduced | Description |
|---|---|---|---|
| Use Connection | No | 2.8.0/6.7.0 | Whether to use a connection. If a connection is used, the brokers do not need to be provided. |
| Connection | Yes | 2.8.0/6.7.0 | Name of the connection to use. Project and service account information will be provided by the connection. You can also use the macro function ${conn(connection_name)}. |
| Reference Name | No | | Required. Used to uniquely identify this sink for lineage, annotating metadata, etc. |
| Kafka Brokers | Yes | | Required. List of Kafka brokers specified in host1:port1,host2:port2 form. |
| Kafka Topic | Yes | | Required. The Kafka topic to write to. |
| Async | Yes | | Required. Specifies whether an acknowledgment is required from the broker that the message was received. Default is No. |
| Compression Type | Yes | | Required. Compression type to apply to messages. Default is none. |
| Time Field | No | | Optional. Name of the field containing the read time of the message. If this is not set, the message is sent with the current timestamp. If set, this field must be present in the input schema and must be of type long. |
| Key Field | Yes | | Optional. Name of the field containing the message key. If this is not set, the message is sent without a key. If set, this field must be present in the schema property and must be of type bytes. |
| Partition Field | No | | Optional. Name of the field containing the partition the message should be written to. If this is not set, the default partition is used for all messages. If set, this field must be present in the schema property and must be of type int. |
| Additional Kafka Producer Properties | Yes | | Optional. Additional Kafka producer properties to set. |
| Cluster API Key | Yes | | Optional. The Confluent API Key used for the sink. |
| Cluster API Secret | Yes | | Optional. The Confluent API Secret used for the sink. |
| Schema Registry URL | Yes | | Optional. The Schema Registry endpoint URL. |
| Schema Registry API Key | Yes | | Optional. The Schema Registry API Key. |
| Schema Registry API Secret | Yes | | Optional. The Schema Registry API Secret. |
| Message Format | Yes | | Optional. Format a structured record should be converted to. Required if used without Schema Registry. |
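
To make the relationship between these settings and standard Kafka producer configuration more concrete, here is a minimal sketch that maps a few of them onto producer property names. The function `build_producer_config` and its parameters are hypothetical. The mapping of the Cluster API Key and Secret to SASL/PLAIN credentials is an assumption based on how Confluent Cloud clients are commonly configured; it does not describe the plugin's internals.

```python
def build_producer_config(brokers, compression_type="none",
                          api_key=None, api_secret=None):
    """Map sink-style settings onto standard Kafka producer property names.

    Hypothetical helper for illustration only; the plugin may build its
    producer configuration differently.
    """
    config = {
        "bootstrap.servers": brokers,          # Kafka Brokers
        "compression.type": compression_type,  # Compression Type
    }
    if api_key and api_secret:
        # Assumption: the API key and secret are supplied as SASL/PLAIN
        # credentials over TLS, as is typical for Confluent Cloud clients.
        config["security.protocol"] = "SASL_SSL"
        config["sasl.mechanism"] = "PLAIN"
        config["sasl.jaas.config"] = (
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            f'username="{api_key}" password="{api_secret}";'
        )
    return config


config = build_producer_config(
    "host1.example.com:9092,host2.example.com:9092",
    compression_type="gzip",
)
print(config)
```

When no API key and secret are given, the sketch leaves the security properties unset, which corresponds to an unauthenticated self-managed cluster.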

Example

This example writes structured records to the Kafka topic ‘alarm’ asynchronously, using the ‘gzip’ compression type. Events are written in CSV format to the Kafka brokers at host1.example.com:9092 and host2.example.com:9092. The Kafka partition is decided based on the field named in Key Field (‘message’ in the settings below). Additional producer properties, such as the number of acknowledgments and the client ID, can also be provided.

| Property | Value |
|---|---|
| Reference Name | Kafka |
| Kafka Brokers | host1.example.com:9092,host2.example.com:9092 |
| Kafka Topic | alarm |
| Async | Yes |
| Compression Type | gzip |
| Key Field | message |
| Additional Kafka Producer Properties | acks:1,client.id:myclient |
| Cluster API Key | "" |
| Cluster API Secret | "" |
| Message Format | CSV |
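
The broker list and the additional producer properties are both plain delimited strings, so it can help to check them before deploying a pipeline. The sketch below parses values in the style of this example. The helper names `parse_brokers` and `parse_properties` are hypothetical, and the comma-separated `key:value` format for additional properties is an assumption inferred from the example value, not a statement of how the plugin parses it.

```python
def parse_brokers(brokers):
    """Split 'host1:port1,host2:port2' into a list of (host, port) tuples."""
    result = []
    for entry in brokers.split(","):
        # rpartition splits on the last colon, so the port is always the tail.
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"Expected host:port, got {entry!r}")
        result.append((host, int(port)))
    return result


def parse_properties(raw):
    """Split 'key1:value1,key2:value2' into a dict of producer properties.

    Assumes values contain no commas; anything after the first colon in a
    pair is treated as the value.
    """
    props = {}
    for pair in raw.split(","):
        key, sep, value = pair.strip().partition(":")
        if not sep or not key:
            raise ValueError(f"Expected key:value, got {pair!r}")
        props[key] = value
    return props


brokers = parse_brokers("host1.example.com:9092,host2.example.com:9092")
extra = parse_properties("acks:1,client.id:myclient")
print(brokers)
print(extra)
```

A malformed entry such as a broker without a port raises `ValueError`, which surfaces the mistake before the pipeline is deployed.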



Created in 2020 by Google Inc.