Description
Spark streaming source to ingest real-time streaming data into Hydrator pipelines.
User Story
A user has a continuous stream of data, generated by click streams, which is being written into Amazon Kinesis streams from various nodes. The user has live dashboards powered by Hydrator pipelines and wants to feed the generated data into those dashboards. The user can leverage the Kinesis Spark streaming source to connect the incoming stream to a Hydrator pipeline.
This will also help users leverage AWS sources that are not currently supported by Hydrator or that don't have existing plugins. Kinesis has out-of-the-box support for all of the AWS products and can act as a connector between AWS services and Hydrator pipelines.
Properties
Kinesis App Name: The application name that is used to checkpoint the Kinesis sequence numbers in a DynamoDB table by the Kinesis Spark streaming library. The application name must be unique for a given account and region.
Kinesis Stream name: The Kinesis stream that this streaming application will pull data from.
Endpoint URL: A valid Kinesis endpoint URL. Valid endpoints can be found here.
Region name: A valid Kinesis region name. Valid region names can be found here.
Checkpoint interval: The interval (for example, Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. To start, set it to the same value as the batch interval of the streaming application.
Initial position: Can be either InitialPositionInStream.TRIM_HORIZON or InitialPositionInStream.LATEST (see the Kinesis Checkpointing section and the Amazon Kinesis API documentation for more details).
AWS access key ID: API access key ID for Amazon Web Services
AWS access secret: API access secret for Amazon Web Services
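The properties above could be collected and validated before the stream is created. The following is a minimal sketch under stated assumptions, not the plugin's actual config class: the class name `KinesisSourceConfig`, its field names, and the validation rules are hypothetical, and the checkpoint interval is assumed to be supplied in milliseconds.

```java
import java.util.Locale;

/**
 * Hypothetical holder for the source properties listed above.
 * Names and validation rules are illustrative, not taken from the plugin's code.
 */
final class KinesisSourceConfig {
    /** Mirrors the two initial positions named in the property description. */
    enum InitialPosition { TRIM_HORIZON, LATEST }

    final String appName;
    final String streamName;
    final String endpointUrl;
    final String region;
    final long checkpointIntervalMs;   // assumed to be given in milliseconds
    final InitialPosition initialPosition;
    final String awsAccessKeyId;
    final String awsAccessSecret;

    KinesisSourceConfig(String appName, String streamName, String endpointUrl, String region,
                        long checkpointIntervalMs, String initialPosition,
                        String awsAccessKeyId, String awsAccessSecret) {
        this.appName = requireNonEmpty(appName, "Kinesis App Name");
        this.streamName = requireNonEmpty(streamName, "Kinesis Stream name");
        this.endpointUrl = requireNonEmpty(endpointUrl, "Endpoint URL");
        this.region = requireNonEmpty(region, "Region name");
        if (checkpointIntervalMs <= 0) {
            throw new IllegalArgumentException("Checkpoint interval must be positive");
        }
        this.checkpointIntervalMs = checkpointIntervalMs;
        this.initialPosition = parseInitialPosition(initialPosition);
        this.awsAccessKeyId = requireNonEmpty(awsAccessKeyId, "AWS access key ID");
        this.awsAccessSecret = requireNonEmpty(awsAccessSecret, "AWS access secret");
    }

    /** Accepts "LATEST" as well as the qualified form "InitialPositionInStream.LATEST". */
    static InitialPosition parseInitialPosition(String value) {
        String name = requireNonEmpty(value, "Initial position").trim().toUpperCase(Locale.ROOT);
        int dot = name.lastIndexOf('.');
        if (dot >= 0) {
            name = name.substring(dot + 1);
        }
        try {
            return InitialPosition.valueOf(name);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Initial position must be TRIM_HORIZON or LATEST, got: " + value);
        }
    }

    private static String requireNonEmpty(String value, String property) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(property + " must be set");
        }
        return value;
    }
}
```

Failing fast in the constructor means a misconfigured pipeline is rejected before any receiver is started. In a real plugin the parsed values would then be passed on to the Kinesis Spark streaming library.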
Design
Implementation
    public JavaDStream<StructuredRecord> getStream(StreamingContext streamingContext) throws Exception {
      registerUsage(streamingContext);
      JavaStreamingContext javaStreamingContext = streamingContext.getSparkStreamingContext();
      JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(javaStreamingContext, config);
      return kinesisStream.map(new MyObjectToStructuredRecordFunction());
...
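The final step of getStream maps each raw `byte[]` payload to a record. The source does not show what `MyObjectToStructuredRecordFunction` does, so the following is only a sketch of what such a mapping might look like. It uses a plain `Map` in place of `StructuredRecord` and `java.util.function.Function` in place of Spark's function interface so that it compiles without CDAP or Spark on the classpath. The class name `BytesToRecordFunction` and the field names `body` and `length` are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Stand-in for the byte[]-to-record mapping step.
 * A Map replaces StructuredRecord; the field names are hypothetical.
 */
final class BytesToRecordFunction implements Function<byte[], Map<String, Object>> {
    @Override
    public Map<String, Object> apply(byte[] payload) {
        Map<String, Object> record = new LinkedHashMap<>();
        // Assumes the Kinesis payload is UTF-8 text, such as a click-stream event.
        record.put("body", new String(payload, StandardCharsets.UTF_8));
        // Keep the raw payload size alongside the decoded text.
        record.put("length", payload.length);
        return record;
    }
}
```

Keeping the mapping in its own small class, rather than inline in getStream, lets the decoding be tested without starting a streaming context.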