HTTPSink

Introduction

In some cases, the results of a pipeline need to be posted to an external web service. For example, a processing pipeline might send messages to Slack through its REST endpoint, or send notifications to a third-party website. This sink sends the messages from the pipeline to an external HTTP endpoint.

Use case(s)

  • I would like to post a notification to Slack every time a user of my website sees a 500 error. I would like to set up a real-time Spark Streaming pipeline that reads my weblog data, filters for messages with a 500 error, and posts a custom message to Slack with details from the message, such as the URL.
  • I use a third-party reporting tool to update metrics in a dashboard. I would like to create a real-time Spark Streaming pipeline that generates those metrics, configure windows for the aggregations, and then send the aggregated stats to the third-party service using this HTTP Sink.

User Stories

  • As a pipeline developer, I would like to post data to an external web service by providing the request method (GET, POST, PUT, DELETE), URL, payload (for POST or PUT), request headers, and timeouts.
  • As a pipeline developer, I would like to be able to define a custom POST payload using fields from the message.
  • As a pipeline developer, I would like to batch my updates if required, so that the plugin posts to the external service only after n messages have accumulated.
  • As a pipeline developer, I would like the plugin to retry a configurable number of times before failing the pipeline.
  • As a pipeline developer, I would like to be able to send basic auth credentials by providing a username and password in the config.
  • As a pipeline developer, I would like to be able to send to http and https endpoints.

Plugin Type

  • BatchSink

Configurables

This section defines properties that are configurable for this plugin. 

User Facing Name | Type | Description | Constraints | Macro Enabled?
URL | String | Required. The URL to post data to. | | yes
Request Method | Select | The HTTP request method. | GET, POST, PUT, DELETE |
Batch Size | String | The number of messages to batch before sending. | > 0; default 1 (no batching) | yes
Format | Select | The format to send the message in. JSON serializes the entire input record to JSON and sends it as the payload. Form converts the input record to a URL-encoded query string and sends it as the payload. Custom sends the contents of the Request Body field. | JSON, Form, Custom |
Request Body | String | Optional request body. Required only if the Custom format is selected. | | yes
Content Type | String | Value for the Content-Type header. | | yes
Request Headers | KeyValue | Optional header values to send with each request. Keys and values are delimited by a colon (":") and each pair is delimited by a newline ("\n"). | | yes
Should Follow Redirects? | Select | Whether to automatically follow redirects. Defaults to true. | true, false |
Number of Retries | Select | The number of times a failed request is retried. Defaults to 3. | 0 to 10 |
Connect Timeout | String | Time in milliseconds to wait for a connection. Set to 0 for infinite. Defaults to 60000 (1 minute). | |
Read Timeout | String | Time in milliseconds to wait for a read. Set to 0 for infinite. Defaults to 60000 (1 minute). | |
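The Request Headers property is a single string of key:value pairs, one pair per line. A minimal Java sketch of turning it into a header map follows. It assumes that only the first colon separates a key from its value (so values such as URLs may contain colons) and that blank lines are ignored; HeaderParser and parseHeaders are hypothetical names, not part of an existing plugin API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: parses the Request Headers property ("key:value" pairs, one per line).
final class HeaderParser {
    private HeaderParser() {}

    static Map<String, String> parseHeaders(String raw) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (raw == null || raw.isEmpty()) {
            return headers; // the property is optional
        }
        for (String line : raw.split("\n")) {
            if (line.trim().isEmpty()) {
                continue; // assumption: blank lines are tolerated
            }
            // Assumption: split on the FIRST colon only, so values may contain colons.
            int idx = line.indexOf(':');
            if (idx <= 0) {
                throw new IllegalArgumentException("Invalid header, expected key:value but got: " + line);
            }
            headers.put(line.substring(0, idx).trim(), line.substring(idx + 1).trim());
        }
        return headers;
    }
}
```

A LinkedHashMap keeps the headers in the order the user wrote them, which makes requests easier to compare when debugging.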
  

Design / Implementation Tips

  • Use the HTTPPoller and HTTPCallback plugins in Hydrator Plugins as a reference.
  • If the user selects JSON, the Content-Type header should be set to application/json; if the user selects Form, it should be set to application/x-www-form-urlencoded.
  • When formatting the message as a query string, URL-encode the values.
  • We need some form of macro syntax so that users can reference message fields in the POST payload. For example, a payload defined as { "messageType" : "update", "name" : "%{firstName}" } would have %{firstName} replaced with the value of the firstName field in the incoming message.
  • When batching, messages are separated by a newline (\n) character.
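The Content-Type and URL-encoding tips can be sketched in Java as follows. This is a sketch under stated assumptions: the input record is modeled as a plain Map rather than the pipeline's record type, the Custom format is assumed to have no default Content-Type, null field values are encoded as empty strings, and FormatHelper, defaultContentType and toFormBody are hypothetical names. It relies only on the JDK's java.net.URLEncoder, which encodes spaces as "+".

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for the JSON and Form message formats.
final class FormatHelper {
    private FormatHelper() {}

    // Default Content-Type per message format.
    static String defaultContentType(String messageFormat) {
        switch (messageFormat) {
            case "JSON":
                return "application/json";
            case "Form":
                return "application/x-www-form-urlencoded";
            default:
                return null; // assumption: Custom relies on the Content Type property or request headers
        }
    }

    // Encodes record fields as key1=value1&key2=value2, URL-encoding both keys and values.
    static String toFormBody(Map<String, Object> fields, String charset) {
        StringJoiner joiner = new StringJoiner("&");
        try {
            for (Map.Entry<String, Object> e : fields.entrySet()) {
                // Assumption: null values are sent as empty strings.
                String value = e.getValue() == null ? "" : String.valueOf(e.getValue());
                joiner.add(URLEncoder.encode(e.getKey(), charset) + "=" + URLEncoder.encode(value, charset));
            }
        } catch (UnsupportedEncodingException ex) {
            throw new IllegalArgumentException("Unsupported charset: " + charset, ex);
        }
        return joiner.toString();
    }
}
```

For example, a record with url = "/a b?x=1" and status = 500 would be encoded as url=%2Fa+b%3Fx%3D1&status=500 with UTF-8.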

Design

{
  "name": "HTTP Sink",
  "plugin": {
    "name": "HTTP",
    "type": "batchsink",
    "label": "HTTP Sink",
    "artifact": {
      "name": "http-sink-plugin",
      "version": "1.6.0",
      "scope": "SYSTEM"
    },
    "properties": {
      "referenceName": "HTTP Sink Plugin",
      "url": "http://example.com/data",
      "method": "POST",
      "batchSize": "1",
      "messageFormat": "JSON",
      "body": "{\"text\" : \"Hello Slack\"}",
      "delimiterForMessages": "\n",
      "charset": "UTF-8",
      "followRedirects": "true",
      "disableSSLValidation": "true",
      "numRetries": "3",
      "connectTimeout": "60000",
      "readTimeout": "60000",
      "failOnNon200Response": "true"
    }
  }
}

 

Approach(es)

1. JSON is the default message format.

2. If batchSize > 1, delimiterForMessages is used to join the messages into a single batch message.

3. If the request headers do not contain "Content-Type", the header is set according to the configured message format.

4. To build a Custom message that uses input record fields as variables, the user writes each variable in the message with a # prefix; each variable is then replaced with the corresponding value from the input record.

5. All messages built for a batch are sent to the HTTP endpoint as a single payload, in one HTTP request per batch.
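Approach items 2, 4 and 5 together describe how a Custom payload is built and batched. A minimal Java sketch follows, assuming that a variable is # followed by an identifier-like field name, that unknown variables are left untouched, and that substituted values are not JSON-escaped. CustomBatcher is a hypothetical class, the record is modeled as a plain Map, and the Consumer stands in for the HTTP call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: build Custom messages from a template and group them into batches.
final class CustomBatcher {
    // Assumption: a variable is '#' followed by an identifier-like field name.
    private static final Pattern VARIABLE = Pattern.compile("#([A-Za-z_][A-Za-z0-9_]*)");

    private final String template;
    private final int batchSize;
    private final String delimiter;
    private final Consumer<String> sender; // stands in for one HTTP request per batch
    private final List<String> pending = new ArrayList<>();

    CustomBatcher(String template, int batchSize, String delimiter, Consumer<String> sender) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be > 0");
        }
        this.template = template;
        this.batchSize = batchSize;
        this.delimiter = delimiter;
        this.sender = sender;
    }

    // Replaces each #field with the record's value; unknown fields stay as written (assumption).
    static String render(String template, Map<String, Object> record) {
        Matcher m = VARIABLE.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String field = m.group(1);
            String replacement = record.containsKey(field) ? String.valueOf(record.get(field)) : m.group();
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Adds one record and sends a single joined payload once batchSize messages are pending.
    void add(Map<String, Object> record) {
        pending.add(render(template, record));
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Sends any remaining messages, e.g. at the end of the pipeline run.
    void flush() {
        if (!pending.isEmpty()) {
            sender.accept(String.join(delimiter, pending));
            pending.clear();
        }
    }
}
```

With a template of {"text" : "500 on #url"}, a batch size of 2 and "\n" as the delimiter, every two records produce one request whose body holds two rendered messages separated by a newline; calling flush() at the end sends any partial batch.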

Properties

url: The URL to post data to.
method: The HTTP request method. Defaults to POST.
batchSize: Batch size. Defaults to 1.
delimiterForMessages: Delimiter placed between messages when batchSize > 1. Defaults to "\n".
messageFormat: Format to send the message in. Defaults to JSON.
body: Custom message body. Used only when messageFormat is Custom.
requestHeaders: An optional string of header values to send in each request where the keys and values are
delimited by a colon (":") and each pair is delimited by a newline ("\n").
charset: Character set of the request payload. Defaults to UTF-8.
followRedirects: Whether to automatically follow redirects. Defaults to true.
disableSSLValidation: Whether to skip SSL certificate validation. Defaults to true. If SSL validation is enabled, the user is expected to add the endpoint's certificate to the truststore on each machine.
numRetries: The number of times the request should be retried if the request fails. Defaults to 3.
connectTimeout: The time in milliseconds to wait for a connection. Set to 0 for infinite. Defaults to 60000 (1 minute).
readTimeout: The time in milliseconds to wait for a read. Set to 0 for infinite. Defaults to 60000 (1 minute).
failOnNon200Response: Whether to fail the pipeline on a non-200 response from the HTTP endpoint. Defaults to true.
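The numRetries, connectTimeout, readTimeout, followRedirects and failOnNon200Response properties can be combined as in the Java sketch below, which uses the JDK's HttpURLConnection. Assumptions: both I/O errors and non-200 responses are retried, only status 200 counts as success (following the property name), there is no backoff between attempts, and HttpSender, sendOnce and executeWithRetries are hypothetical names.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.Map;

// Hypothetical sketch: one HTTP request per batch with retries, timeouts and redirects applied.
final class HttpSender {
    interface Attempt {
        int run() throws IOException; // returns the HTTP status code
    }

    // Sends the payload once and returns the response status code.
    static int sendOnce(String url, String method, byte[] payload, Map<String, String> headers,
                        int connectTimeout, int readTimeout, boolean followRedirects) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        try {
            conn.setRequestMethod(method);
            conn.setConnectTimeout(connectTimeout); // 0 is interpreted as infinite
            conn.setReadTimeout(readTimeout);       // 0 is interpreted as infinite
            conn.setInstanceFollowRedirects(followRedirects);
            for (Map.Entry<String, String> h : headers.entrySet()) {
                conn.setRequestProperty(h.getKey(), h.getValue());
            }
            if (payload != null && ("POST".equals(method) || "PUT".equals(method))) {
                conn.setDoOutput(true);
                try (OutputStream out = conn.getOutputStream()) {
                    out.write(payload);
                }
            }
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    // Makes up to numRetries + 1 attempts; assumption: I/O errors and non-200 responses are retried.
    static int executeWithRetries(int numRetries, boolean failOnNon200Response, Attempt attempt)
            throws IOException {
        IOException lastError = null;
        int status = -1;
        for (int i = 0; i <= numRetries; i++) {
            try {
                status = attempt.run();
                lastError = null;
                if (status == 200) {
                    return status;
                }
            } catch (IOException e) {
                lastError = e;
            }
        }
        if (lastError != null) {
            throw lastError; // the final attempt failed with a connection or read error
        }
        if (failOnNon200Response) {
            throw new IOException("Received HTTP " + status + " after " + (numRetries + 1) + " attempts");
        }
        return status; // non-200 tolerated when failOnNon200Response is false
    }
}
```

A caller would wrap each batch as executeWithRetries(numRetries, failOnNon200Response, () -> sendOnce(...)). Note that HttpURLConnection does not follow redirects that change protocol (for example, from HTTP to HTTPS) even when instance redirects are enabled, so followRedirects may need extra handling for such endpoints.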

 

NFR

1. If the user enables SSL validation, they are expected to add the certificate to the truststore of each machine.

Limitation(s)

Future Work

  • Some future work – HYDRATOR-99999
  • Another future work – HYDRATOR-99999

Test Case(s)

1. Send a Slack message in Custom format
2. Send a Slack message in JSON format
3. Batched PUT request (1*10000)
4. Batched PUT request (2*5000)
5. POST a Custom message
6. PUT a batched Form message
7. Non-200 response

Sample Pipeline

Slack_Custom_Multi_batch1.json

Slack_JSON.json

PUT_10000Batch_Custom.json

PUT_2_5000_batch_Custom.json

POST_Custom.json

PUT_Form_batch.json

Non-200-reponse.json


Checklist

  • User stories documented 
  • User stories reviewed 
  • Design documented 
  • Design reviewed 
  • Feature merged 
  • Examples and guides 
  • Integration tests 
  • Documentation for feature 
  • Short video demonstrating the feature

Created in 2020 by Google Inc.