Amazon Kinesis Spark Streaming Source

The Amazon Kinesis Spark Streaming source plugin is available in the Hub.

Plugin version: 2.0.0

Apache Spark Streaming source that reads from Amazon Kinesis streams. Use this source when you want to read data from a Kinesis stream in real time. For example, you might read data from a Kinesis stream and write it to Google BigQuery.

This source requires Scala 2.12 to run (Dataproc 1.5, or Dataproc 2.0 and later).

Note: Before you run a pipeline with this source, make sure that the AWS IAM user and its attached policy grant the necessary permissions.
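As a rough starting point for that policy, the following Python sketch builds an IAM policy document for a consumer like this one. It assumes, without confirmation from this page, that the source follows the Kinesis Client Library (KCL) conventions used by Spark's Kinesis integration: it reads shard metadata and records from the stream, keeps its checkpoints in a DynamoDB table named after the Application Name, and publishes CloudWatch metrics. The function name `kinesis_source_policy` and the account ID `123456789012` are hypothetical placeholders; check the plugin and KCL documentation before granting access.

```python
import json
from typing import Any, Dict


def kinesis_source_policy(region: str, account_id: str,
                          stream_name: str, app_name: str) -> Dict[str, Any]:
    """Build a hypothetical least-privilege IAM policy for a KCL-style Kinesis consumer."""
    stream_arn = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"
    # Assumption: the checkpoint table in DynamoDB is named after the Application Name.
    table_arn = f"arn:aws:dynamodb:{region}:{account_id}:table/{app_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # Read shard metadata and records from the stream.
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:ListShards",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                ],
                "Resource": stream_arn,
            },
            {   # Create, read, and update the table that stores checkpoints.
                "Effect": "Allow",
                "Action": [
                    "dynamodb:CreateTable",
                    "dynamodb:DescribeTable",
                    "dynamodb:Scan",
                    "dynamodb:GetItem",
                    "dynamodb:PutItem",
                    "dynamodb:UpdateItem",
                    "dynamodb:DeleteItem",
                ],
                "Resource": table_arn,
            },
            {   # Assumption: the KCL publishes metrics to CloudWatch (its default behavior).
                "Effect": "Allow",
                "Action": ["cloudwatch:PutMetricData"],
                "Resource": "*",
            },
        ],
    }


if __name__ == "__main__":
    # 123456789012 is a placeholder account ID.
    policy = kinesis_source_policy("us-east-1", "123456789012",
                                   "MyKinesisStream", "myKinesisApp")
    print(json.dumps(policy, indent=2))
```

Scoping the Kinesis and DynamoDB statements to specific ARNs, rather than `*`, keeps the user from reading other streams or tables in the account.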

Configuration

Property

Macro Enabled?

Description

Reference Name

No

Required. Name used to uniquely identify this source for lineage, annotating metadata, etc.

Application Name

Yes

Required. The name of the Kinesis application. This name is used to checkpoint the Kinesis sequence numbers in a DynamoDB table.

Stream Name

Yes

Required. The name of the Kinesis stream to read data from. The stream should be active.

Kinesis Endpoint url

Yes

Required. A valid Kinesis endpoint URL, for example, kinesis.us-east-1.amazonaws.com.

Region

Yes

Required. A valid AWS region, for example, ap-south-1.

Default is us-east-1.

Checkpoint Interval Duration

Yes

Required. The interval, in milliseconds, at which the Kinesis Client Library saves (checkpoints) its position in the stream.

Initial Position in Stream

Yes

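Required. The position in the stream at which to start reading when no checkpoint exists for the application. Can be either TRIM_HORIZON (the oldest record still retained in the stream) or LATEST (only records added after the source starts).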

Default is LATEST.

AWS Access Key ID

Yes

Required. The AWS access key ID used to access the Kinesis stream. You can store the ID in the CDAP secure store and provide it through a macro.

AWS Access Secret

Yes

Required. The AWS secret access key that grants access to the Kinesis stream. You can store the key in the CDAP secure store and provide it through a macro.

Format

No

Optional. The format of the Kinesis record payloads. Any format that CDAP supports can be used. For example, a value of ‘csv’ parses Kinesis payloads as comma-separated values. If no format is given, payloads are treated as bytes.
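Initial Position in Stream and Checkpoint Interval Duration interact. Assuming the source follows standard Kinesis Client Library semantics, a saved checkpoint takes precedence over the initial position, and because the position is saved only once per interval, records read after the most recent checkpoint may be read again after a restart. The following Python sketch is a simplified, hypothetical model of that behavior for a single shard with integer sequence numbers; `records_read_on_start` is an illustrative name, not part of the plugin.

```python
from typing import List, Optional, Sequence

POSITIONS = ("TRIM_HORIZON", "LATEST")


def records_read_on_start(retained: Sequence[int], initial_position: str,
                          checkpoint: Optional[int]) -> List[int]:
    """Return the already-retained sequence numbers a consumer reads when it starts.

    Simplified model: `retained` holds the sequence numbers still in the shard,
    oldest first and increasing; `checkpoint` is the last sequence number saved
    in DynamoDB for this application, or None if no checkpoint exists yet.
    """
    if initial_position not in POSITIONS:
        raise ValueError(f"unsupported initial position: {initial_position}")
    if checkpoint is not None:
        # An existing checkpoint wins: resume right after it, ignoring initial_position.
        return [seq for seq in retained if seq > checkpoint]
    if initial_position == "TRIM_HORIZON":
        # Start at the oldest record that has not yet aged out of the stream.
        return list(retained)
    # LATEST: start after the newest record, so only records written later are read.
    return []


retained = [100, 101, 102, 103, 104, 105]
print(records_read_on_start(retained, "LATEST", None))        # → []
print(records_read_on_start(retained, "TRIM_HORIZON", None))  # → [100, 101, 102, 103, 104, 105]
# Records up to 105 were processed before a crash, but the last checkpoint was 102,
# so 103 to 105 are read again on restart.
print(records_read_on_start(retained, "LATEST", 102))         # → [103, 104, 105]
```

A shorter checkpoint interval narrows the window of records that can be replayed, at the cost of more frequent writes to DynamoDB.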

Example

This example reads from the Kinesis stream named ‘MyKinesisStream’. It starts one Kinesis receiver per shard of the stream and begins reading from the last checkpointed sequence number of the stream (or, if no checkpoint exists yet, from the LATEST position).

Property

Value

Reference Name

myKinesisApp

Application Name

myKinesisApp

Stream Name

MyKinesisStream

Kinesis Endpoint url

kinesis.us-east-1.amazonaws.com

Checkpoint Interval Duration

2000

Initial Position in Stream

LATEST

AWS Access Key ID

my_aws_access_key

AWS Access Secret

my_aws_access_secret

Format

csv
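Before deploying, it can help to sanity-check values like those in the example. The following Python sketch is a hypothetical pre-flight check, not part of CDAP. It uses the property display names from the Configuration table as dictionary keys, treats Region as defaulting to us-east-1, skips macro values (which are resolved at run time), and assumes the endpoint should be the standard kinesis.<region>.amazonaws.com host. That last check is a simplification that ignores other AWS partitions (such as China) and FIPS endpoints.

```python
from typing import Dict, List
from urllib.parse import urlparse

# Display names from the Configuration table; Region is omitted because it has a default.
REQUIRED = [
    "Reference Name", "Application Name", "Stream Name", "Kinesis Endpoint url",
    "Checkpoint Interval Duration", "Initial Position in Stream",
    "AWS Access Key ID", "AWS Access Secret",
]
POSITIONS = {"TRIM_HORIZON", "LATEST"}


def is_macro(value: str) -> bool:
    # Macro values such as ${secure(name)} are resolved at run time, so they are not checked.
    return value.startswith("${")


def positive_int(text: str) -> bool:
    try:
        return int(text) > 0
    except ValueError:
        return False


def validate(config: Dict[str, str]) -> List[str]:
    """Return human-readable problems; an empty list means the config looks sane."""
    problems = [f"missing: {name}" for name in REQUIRED if not config.get(name)]
    region = config.get("Region") or "us-east-1"

    interval = config.get("Checkpoint Interval Duration", "")
    if interval and not is_macro(interval) and not positive_int(interval):
        problems.append("Checkpoint Interval Duration must be a positive number of milliseconds")

    position = config.get("Initial Position in Stream", "")
    if position and not is_macro(position) and position not in POSITIONS:
        problems.append("Initial Position in Stream must be TRIM_HORIZON or LATEST")

    endpoint = config.get("Kinesis Endpoint url", "")
    if endpoint and not is_macro(endpoint) and not is_macro(region):
        # Accept the endpoint with or without a scheme, then compare its host to the region.
        host = urlparse(endpoint if "://" in endpoint else "https://" + endpoint).hostname or ""
        if host != f"kinesis.{region}.amazonaws.com":
            problems.append(f"endpoint {endpoint!r} does not match region {region!r}")
    return problems


example = {
    "Reference Name": "myKinesisApp",
    "Application Name": "myKinesisApp",
    "Stream Name": "MyKinesisStream",
    "Kinesis Endpoint url": "kinesis.us-east-1.amazonaws.com",
    "Checkpoint Interval Duration": "2000",
    "Initial Position in Stream": "LATEST",
    "AWS Access Key ID": "my_aws_access_key",
    "AWS Access Secret": "my_aws_access_secret",
    "Format": "csv",
}
print(validate(example))  # → []
```

In a real pipeline the access key ID and secret would more likely be macros that read from the CDAP secure store, which this check deliberately leaves alone.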

For each Kinesis record that it reads, the source outputs a record with the following schema:

field name

type

user

string

item

string

count

int

price

double
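To illustrate how a csv payload could map onto this schema, the following Python sketch decodes one record's bytes as a single UTF-8 CSV line and converts each column to the schema type. It only approximates CDAP's csv format (for example, it does not handle null fields or other encodings), and the function name `parse_csv_payload` is hypothetical.

```python
import csv
from typing import Dict, Union

Value = Union[str, int, float]

# Schema from the table above: (field name, converter), in column order.
SCHEMA = [("user", str), ("item", str), ("count", int), ("price", float)]


def parse_csv_payload(payload: bytes) -> Dict[str, Value]:
    """Decode one Kinesis record payload as a single CSV row and apply the schema."""
    # Assumption: payloads are UTF-8 and contain exactly one CSV row.
    row = next(csv.reader([payload.decode("utf-8")]))
    if len(row) != len(SCHEMA):
        raise ValueError(f"expected {len(SCHEMA)} columns, got {len(row)}")
    return {name: convert(cell) for (name, convert), cell in zip(SCHEMA, row)}


print(parse_csv_payload(b"alice,widget,3,9.99"))
# → {'user': 'alice', 'item': 'widget', 'count': 3, 'price': 9.99}
```

Using the `csv` module rather than splitting on commas means quoted fields that contain commas, such as `"gadget, large"`, stay in one column.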

Created in 2020 by Google Inc.