HTTP Streaming Source

The HTTP Streaming source plugin is available in the Hub.

Plugin version: 1.3.0

This plugin reads data from HTTP/HTTPS pages periodically. Paginated APIs are supported. For paginated APIs plugin reads available data and then waits for new pages to appear. Data in JSON, XML, CSV, TSV, TEXT and BLOB formats is supported.

Configuration

Property

Macro Enabled?

Description

Property

Macro Enabled?

Description

General

 

 

Reference Name

No

Required. Name used to uniquely identify this source for lineage, annotating metadata, etc.

URL

Yes

Required. Url to fetch to the first page. The url must start with a protocol (e.g. http://).

HTTP Method

Yes

Required. HTTP request method.

Headers

Yes

Optional. Headers to send with each HTTP request.

Request Body

Yes

Optional. Body to send with each HTTP request.

Max Pages Per Fetch

Yes

Optional. Maximum number of pages put to RDD in one blocking reading. Empty value means that the maximum is not enforced.

Format

 

 

Format

Yes

Required. Format of the HTTP response. This determines how the response is converted into output records. Possible values are:

  • JSON. Retrieves all records from the given json path and transforms them into records according to the mapping.

  • XML. Retrieves all records from the given XPath and transforms them into records according to the mapping.

  • TSV. Tab separated values. Columns are mapped to record fields in the order they are listed in schema.

  • CSV. Comma separated values. Columns are mapped to record fields in the order they are listed in schema.

  • Text. Transforms a single line of text into a single record with a string field body containing the result.

  • BLOB. Transforms the entire response into a single record with a byte array field body containing the result.

Default is json.

JSON/XML Result Path

Yes

Optional. Path to the results. When the format is XML, this is an XPath. When the format is JSON, this is a JSON path.

For examples, see below.

JSON/XML Fields Mapping

Yes

Optional. Mapping of fields in a record to fields in retrieved element. The left column contains the name of schema field. The right column contains path to it within a relative to an element. It can be either XPath or JSON path.

For an example, see below.

CSV Skip First Row

Yes

Optional. Whether to skip the first row of the HTTP response. This is usually set if the first row is a header row.

Default is false.

OAuth2

 

 

OAuth2 Enabled

No

Required. If true, plugin will perform OAuth2 authentication.

Default is False.

Auth URL

Yes

Optional. Endpoint for the authorization server used to retrieve the authorization code.

Token URL

Yes

Optional. Endpoint for the resource server, which exchanges the authorization code for an access token.

Client ID

Yes

Optional. Client identifier obtained during the Application registration process.

Client Secret

Yes

Optional. Client secret obtained during the Application registration process.

Scopes

Yes

Optional. Scope of the access request, which might have multiple space-separated values.

Refresh Token

Yes

Optional. Token used to receive accessToken, which is end product of OAuth2.

HTTP Proxy

 

 

Proxy URL

Yes

Optional.

Error Handling

 

 

HTTP Errors Handling

No

Optional. Defines the error handling strategy to use for certain HTTP response codes. The left column contains a regular expression for HTTP status code. The right column contains an action which is done in case of match. If HTTP status code matches multiple regular expressions, the first specified in mapping is matched.

For an example, see below.

Non-HTTP Error Handling

No

Required. Error handling strategy to use when the HTTP response cannot be transformed to an output record. Possible values are:

  • Stop on error. Fails pipeline due to erroneous record.

  • Skip on error. Ignores erroneous records.

Default is Stop on error.

Retry Policy

No

Required. Policy used to calculate delay between retries.

Default is Exponential.

Linear Retry Interval

Yes

Optional. Interval between retries. Is only used if retry policy is “linear”.

Default is 30.

Max Retry Duration

Yes

Required. Maximum time in seconds retries can take.

Default is 600.

Connect Timeout

Yes

Required. Maximum time in seconds connection initialization is allowed to take.

Default is 120.

Read Timeout

Yes

Required. Maximum time in seconds fetching data from the server is allowed to take.

Default is 120.

Pagination

 

When there is pagination, the plugin reads pages in order, keeping track of which pages have already been read. It will not reprocess any page it has already read. When there is no pagination, it periodically polls the same page.

Pagination Type

No

Required. Strategy used to determine how to get next page. You can select one of the following pagination types:

  • None.

  • Link in response header.

  • Link in response body.

  • Token in response body.

  • Increment an Index.

  • Custom.

Default is None.

Pagination Type: None

No

Only single page is loaded.

Wait Time Between Pages (milliseconds)

Yes

Optional. Time in milliseconds to wait between HTTP requests for the next page.

Default is 0.

Pagination Type: Link in response header

No

In response there is a “Link” header, which contains a url marked as “next”. Example:

; rel="first", ; rel="next", ; rel="last"`

Pagination Type: Link in response body

No

Every page contains a next page url. This pagination type is only supported for JSON and XML formats. Pagination happens until no next page field is present or until page contains no elements.

Next Page JSON/XML Field Path

Yes

Optional. A JSON path or an XPath to a field which contains next page url. It can be either relative or absolute url.

Example page response:

{ "results": [ ... ] "_links": { "self": "https://confluence.atlassian.com/rest/api/space/ADMINJIRASERVER0710/content/page", "next": "/rest/api/space/ADMINJIRASERVER0710/content/page?limit=100&start=100", "base": "https://confluence.atlassian.com", "context": "" } }

Next page field path is _links/next.

Pagination Type: Token in response body

No

Every page contains a token, which is appended as a url parameter to obtain next page. This type of pagination is only supported for JSON and XML formats. Pagination happens until no next page token is present on the page or until page contains no elements.

Next Page Token Path

Yes

A JSON path or an XPath to a field which contains next page token.

Next Page Url Parameter

Yes

A parameter which is appended to url in order to specify next page token.

Example plugin config:

{ "url": "https://www.googleapis.com/youtube/v3/search?part=snippet&maxResults=20&q=cask+cdap", "resultPath": "/items" "paginationType": "Token in response body", "nextPageTokenPath": "/nextPageToken", "nextPageUrlParameter": "pageToken" }

First page response:

Next page fetched by plugin will be url with &pageToken=CAEQAA appended.

Pagination Type: Increment an Index

No

Pagination by incrementing a {pagination.index} placeholder value in url. For this pagination type url is required to contain above placeholder.

Start Index

Yes

Start value of {pagination.index} placeholder.

Max Index

Yes

Maximum value of {pagination.index} placeholder. If empty, pagination will happen until the page with no elements.

Index Increment

Yes

A value which the {pagination.index} placeholder is incremented by. Increment can be negative.

Pagination Type: Custom

No

Pagination using user provided code. The code decides how to retrieve a next page url based on previous page contents and headers and when to finish pagination.

Custom Pagination Python Code

No

A code which implements retrieving a next page url based on previous page contents and headers.

Example code:

The above code iterates over first five pages of searchcode.com results. When ‘None’ is returned the iteration is stopped.

SSL/TSL

 

 

Verify HTTPS Trust Certificates

Yes

Required. If false, untrusted trust certificates (e.g. self signed), will not lead to an error. Do not disable this in production environment on a network you do not entirely trust. Especially public internet.

Default is True.

Keystore File

Yes

Optional. A path to a file which contains keystore.

Keystore Type

Yes

Optional. Format of a keystore.

Default is Java KeyStore (JKS).

Keystore Password

Yes

Optional. Password for a keystore. If a keystore is not password protected leave it empty.

Keystore Key Algorithm

Yes

Optional. An algorithm used for keystore.

Default is SUNX509.

TrustStore File

Yes

Optional. A path to a file which contains truststore.

TrustStore Type

Yes

Optional. Format of a truststore.

Default is Java KeyStore (JKS).

TrustStore Password

Yes

Optional. Password for a truststore. If a truststore is not password protected leave it empty.

TrustStore Key Algorithm

Yes

Optional. An algorithm used for truststore.

Default is SUNX509.

Transport Protocols

Yes

Optional. Transport protocols which are allowed for connection.

Default is TLSv1.2.

Cipher Suites

Yes

Optional. Cipher suites which are allowed for connection. Colons, commas or spaces are also acceptable separators.

JSON/XML Result Path Examples

JSON path example:

The JSON path to fetch books is /response/books. However, if we need to fetch only printInfo, we can specify /response/books/printInfo as well.

XPath example:

XPath to fetch all books is /bookstores/bookstore/book. However a more precise selections can be done. E.g. /bookstores/bookstore/book[@category='web'].

XPath to fetch all books is /bookstores/bookstore/book. However a more precise selections can be done. E.g. /bookstores/bookstore/book[@category='web'].

JSON/XML Fields Mapping Example

Example response:

Assume the result path is /issues.

The mapping is:

Field Name

Field Path

Field Name

Field Path

type

/fields/issuetype/name

description

/fields/description

projectCategory

/fields/project/projectCategory/name

isSubtask

/fields/issuetype/subtask

fixVersions

/fields/fixVersions

The result records are:

key

type

isSubtask

description

projectCategory

fixVersions

key

type

isSubtask

description

projectCategory

fixVersions

NETTY-14

Bug

false

Test description for NETTY-14

Infrastructure

[“4.1.37”]

NETTY-13

Improvement

false

Test description for NETTY-13

Infrastructure

[]

Note that field key was mapped without being included into the mapping. Mapping entries like key: /key can be omitted as long as the field is present in schema.

HTTP Errors Handling Example

HTTP Code Regexp

Error Handling

HTTP Code Regexp

Error Handling

2..

Success

401

Retry and fail

4..

Fail

5..

Retry and skip

.*

Fail

Note: Pagination types “Link in response header”, “Link in response body”, and “Token in response body” do not support “Skip” and “Retry and skip” options.

Created in 2020 by Google Inc.