

Description

A Spark Streaming source that ingests real-time streaming data from Amazon Kinesis into Hydrator pipelines.

User Story

A user has continuous clickstream data that various nodes write to Amazon Kinesis streams. The user also runs live dashboards powered by Hydrator pipelines and wants to feed this data into them. The user can use the Kinesis Spark Streaming source to connect the incoming stream to a Hydrator pipeline.

This also lets users draw on AWS sources that Hydrator does not currently support or that have no existing plugin. Kinesis integrates out of the box with a wide range of AWS products, so it can act as a connector between AWS services and Hydrator pipelines.

Properties

Kinesis App Name: The application name that the Kinesis Spark Streaming library uses to checkpoint Kinesis sequence numbers in a DynamoDB table. The application name must be unique for a given account and region.

Kinesis Stream name: The Kinesis stream that this streaming application will pull data from.

Endpoint URL: Valid Kinesis endpoint URLs can be found here.

Region name: Valid Kinesis region names can be found here.

Checkpoint interval: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. As a starting point, set it equal to the batch interval of the streaming application.

Initial position: Can be either InitialPositionInStream.TRIM_HORIZON or InitialPositionInStream.LATEST (see Kinesis Checkpointing section and Amazon Kinesis API documentation for more details).

AWS access key ID: The API access key ID for Amazon Web Services.

AWS access secret: The API secret access key for Amazon Web Services.
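As a rough illustration of how the properties above might be gathered and checked before the stream is created, here is a minimal sketch. The class `KinesisSourceConfig`, its field names, and the validation rules are all assumptions made for this example and are not part of the plugin or of any Hydrator or Spark API.

```java
import java.net.URI;
import java.util.Locale;

// Hypothetical holder for the source properties; every name here is illustrative.
final class KinesisSourceConfig {
    // Mirrors the two initial positions named in the property description.
    enum InitialPosition { TRIM_HORIZON, LATEST }

    final String appName;
    final String streamName;
    final URI endpointUrl;
    final String regionName;
    final long checkpointIntervalMs;
    final InitialPosition initialPosition;
    final String accessKeyId;
    final String accessSecret;

    KinesisSourceConfig(String appName, String streamName, String endpointUrl,
                        String regionName, long checkpointIntervalMs,
                        String initialPosition, String accessKeyId, String accessSecret) {
        this.appName = requireNonBlank("appName", appName);
        this.streamName = requireNonBlank("streamName", streamName);
        // URI.create throws IllegalArgumentException on malformed input.
        this.endpointUrl = URI.create(requireNonBlank("endpointUrl", endpointUrl));
        if (this.endpointUrl.getScheme() == null) {
            throw new IllegalArgumentException("endpointUrl needs a scheme such as https");
        }
        this.regionName = requireNonBlank("regionName", regionName);
        if (checkpointIntervalMs <= 0) {
            throw new IllegalArgumentException("checkpointIntervalMs must be positive");
        }
        this.checkpointIntervalMs = checkpointIntervalMs;
        this.initialPosition = parseInitialPosition(initialPosition);
        this.accessKeyId = requireNonBlank("accessKeyId", accessKeyId);
        this.accessSecret = requireNonBlank("accessSecret", accessSecret);
    }

    // Accepts "LATEST" as well as the qualified form "InitialPositionInStream.LATEST".
    static InitialPosition parseInitialPosition(String raw) {
        String name = requireNonBlank("initialPosition", raw).trim().toUpperCase(Locale.ROOT);
        int dot = name.lastIndexOf('.');
        if (dot >= 0) {
            name = name.substring(dot + 1);
        }
        return InitialPosition.valueOf(name); // IllegalArgumentException if unknown
    }

    private static String requireNonBlank(String property, String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(property + " must not be empty");
        }
        return value;
    }
}
```

Failing fast in the constructor is only one possible design; it surfaces a missing or malformed property when the pipeline is configured rather than when the receiver starts.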

Design

 

Implementation

 

getStream
public JavaDStream<StructuredRecord> getStream(StreamingContext streamingContext) throws Exception {
  registerUsage(streamingContext);
  JavaStreamingContext javaStreamingContext = streamingContext.getSparkStreamingContext();
  // Create the Kinesis receiver stream from the plugin properties held in config.
  JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(javaStreamingContext, config);
  // Convert each raw Kinesis payload into a StructuredRecord.
  return kinesisStream.map(new MyObjectToStructuredRecordFunction());
}
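The mapping step in getStream turns each raw Kinesis payload (a byte[]) into a record. The page does not define MyObjectToStructuredRecordFunction, so the following is only a sketch of the idea using standard-library types: the class name `BytesToRecord`, the single `body` field, and the UTF-8 decoding are assumptions, and a `Map` stands in for StructuredRecord. A real version would presumably implement Spark's `org.apache.spark.api.java.function.Function` and build a StructuredRecord against a schema.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the mapping function; a Map replaces StructuredRecord here.
final class BytesToRecord implements Function<byte[], Map<String, Object>> {
    @Override
    public Map<String, Object> apply(byte[] payload) {
        // Assumption: payloads are UTF-8 text; binary payloads would need a different decoder.
        String body = new String(payload, StandardCharsets.UTF_8);
        // Emit a single-field record named "body" (field name chosen for illustration).
        return Collections.singletonMap("body", body);
    }
}
```

Keeping the conversion in its own class means the decoding rule can be changed or tested without touching the stream setup.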

 

 
