Confluent Kafka Sink
The Confluent Kafka sink plugin is available for Preview in the Hub.
Use this sink to write data to Confluent. The sink sends one message to the specified Kafka topic per received record. It can be configured to partition the events it writes based on a configurable key field, to operate in sync or async mode, and to apply different compression types to messages. It can be used with a self-managed Confluent Platform or with Confluent Cloud, and it supports Schema Registry.
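Conceptually, the sink behaves like a standard Kafka producer: one record produced per input record, with the key determining the partition and the send mode controlling whether the call blocks on the broker's acknowledgment. The sketch below illustrates this with the plain Kafka Java client; the broker address, topic, and payload are placeholders, and this is not the plugin's actual implementation.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SinkBehaviorSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");            // Compression Type
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One message per input record; the key (if set) determines the partition.
            ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic", "record-key", "record-payload");

            producer.send(record);       // async: queue the message and continue
            producer.send(record).get(); // sync: block until the broker acknowledges
        }
    }
}
```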
Configuration
Property | Macro Enabled? | Version Introduced | Description |
---|---|---|---|
Use Connection | No | 2.8.0/6.7.0 | Whether to use a connection. If a connection is used, the brokers do not need to be provided. |
Connection | Yes | 2.8.0/6.7.0 | Name of the connection to use. Project and service account information will be provided by the connection. You can also use the macro function `${conn(connection-name)}`. |
Reference Name | No | | Required. Used to uniquely identify this sink for lineage, annotating metadata, etc. |
Kafka Brokers | Yes | | Required. Comma-separated list of Kafka brokers specified in `host1:port1,host2:port2` format. |
Kafka Topic | Yes | | Required. The Kafka topic to write to. |
Async | Yes | | Required. Specifies whether an acknowledgment is required from the broker that the message was received. Default is No. |
Compression Type | Yes | | Required. Compression type to apply to each message. Default is none. |
Time Field | No | | Optional. Name of the field containing the read time of the message. If this is not set, the message will be sent with the current timestamp. If set, this field must be present in the input schema and must be a long. |
Key Field | Yes | | Optional. Name of the field containing the message key. If this is not set, the message will be sent without a key. If set, this field must be present in the schema property and must be of type bytes. |
Partition Field | No | | Optional. Name of the field containing the partition the message should be written to. If this is not set, the default partition will be used for all messages. If set, this field must be present in the schema property and must be an int. |
Additional Kafka Producer Properties | Yes | | Optional. Additional Kafka producer properties to set. |
Cluster API Key | Yes | | Optional. The Confluent API Key used for the sink. |
Cluster API Secret | Yes | | Optional. The Confluent API Secret used for the sink. |
Schema Registry URL | Yes | | Optional. The Schema Registry endpoint URL. |
Schema Registry API Key | Yes | | Optional. The Schema Registry API Key. |
Schema Registry API Secret | Yes | | Optional. The Schema Registry API Secret. |
Message Format | Yes | | Optional. Format the structured record should be converted to. Required if used without Schema Registry. |
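When connecting to Confluent Cloud, the Cluster API Key/Secret and Schema Registry credentials in the table above correspond to standard Confluent client settings. Below is a minimal sketch assuming the conventional SASL/PLAIN and basic-auth mappings; the broker endpoint is hypothetical, and how the plugin translates its fields internally is not documented here.

```java
import java.util.Properties;

public class ConfluentCloudPropsSketch {
    // Builds the standard Confluent Cloud client settings that the plugin's
    // credential fields correspond to (an assumption, not the plugin's code).
    static Properties confluentCloudProps(String apiKey, String apiSecret,
                                          String srUrl, String srKey, String srSecret) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "pkc-xxxxx.us-central1.gcp.confluent.cloud:9092"); // hypothetical
        // Cluster API Key/Secret -> SASL/PLAIN credentials.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
        // Schema Registry URL + API Key/Secret -> settings read by Confluent's serializers.
        props.put("schema.registry.url", srUrl);
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", srKey + ":" + srSecret);
        return props;
    }
}
```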
Example
This example writes structured records to the Kafka topic ‘alarm’ asynchronously, using the ‘gzip’ compression type. Events are written in CSV format to a Kafka broker running at localhost. The Kafka partition is determined by the provided key field ‘ts’. Additional producer properties, such as the number of acknowledgements and the client ID, can also be provided.
Property | Value |
---|---|
Reference Name | |
Kafka Brokers | localhost:9092 |
Kafka Topic | alarm |
Async | Yes |
Compression Type | gzip |
Key Field | ts |
Additional Kafka Producer Properties | |
Cluster API Key | |
Cluster API Secret | |
Message Format | CSV |
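Under the same assumptions as the earlier sketch, this example configuration corresponds roughly to the following raw producer usage. The `acks` and `client.id` values, the broker port, and the CSV payload are illustrative stand-ins, not values taken from this documentation.

```java
import java.nio.ByteBuffer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class AlarmSinkExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka Brokers
        props.put("compression.type", "gzip");            // Compression Type
        props.put("acks", "1");                           // hypothetical additional producer property
        props.put("client.id", "alarm-sink");             // hypothetical additional producer property
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<byte[], String> producer = new KafkaProducer<>(props)) {
            long ts = System.currentTimeMillis();
            byte[] key = ByteBuffer.allocate(Long.BYTES).putLong(ts).array(); // Key Field 'ts' as bytes
            String csvRow = ts + ",sensor-42,HIGH";                           // illustrative CSV payload
            // No explicit partition: Kafka hashes the key, so records with the
            // same 'ts' key always land on the same partition.
            producer.send(new ProducerRecord<>("alarm", key, csvRow)); // async: fire and forget
        }
    }
}
```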
Created in 2020 by Google Inc.