HTTP Batch Source

General 

The plugin uses Apache HttpComponents HttpClient, the successor of Commons HttpClient, as its HTTP framework.

It appears to be the HTTP framework most widely used and supported by the community. Existing solutions and workarounds are easy to find, which simplifies plugin development and maintenance. The framework has built-in support for compression, HTTPS tunneling, digest authentication and many other functions.

Properties:

Section | Name | Description | Default | Widget | Validations
General




URL

The URL to request. The placeholder "{pagination.index}" can be included in the URL to represent the changing part needed by some pagination strategies.

E.g.:

https://my.api.com/api/v1/user?maxResults=10&name=John&pageNumber={pagination.index}


Widget: Text Box
Validations: Validate that the URL contains a protocol.
HTTP Method

Possible values:

  • GET
  • PUT
  • POST
  • DELETE
  • HEAD
Default: GET
Widget: Radio group
Headers

Key-value map of headers.

Widget: KeyValue
Request Body

Widget: Text Area
Validations: No validation [1]
Error Handling

HTTP Errors Handling

A map in which the user defines what happens for each error status code. Possible values are: RETRY, FAIL, SKIP, SEND_TO_ERROR, ALERT.

Example:

500: RETRY

404: SEND_TO_ERROR

*: FAIL

The wildcard (*) means "otherwise", that is, "for all other codes do ...".

If the field is empty, any status code >= 400 fails the pipeline.

Widget: KeyValue Dropdown

Validations: If SEND_TO_ERROR, SKIP or ALERT is used and the current pagination type does not support it, throw a validation error. [2]

Non-HTTP Error Handling

Handling of type-casting errors and any other unhandled exceptions thrown while transforming a record.

Possible values are:

  • "Skip on error" - ignores any errors
  • "Stop on error" - fails the pipeline
  • "Send to error" - sends the record to the error handler
Default: Stop on error
Widget: Dropdown list
Validations: If "Send to error" or "Skip on error" is used and the current pagination type does not support it, throw a validation error. [2]
Retry Policy

Possible values are:

    • Exponential
    • Linear

Default: Exponential
Widget: Radio group
Linear Retry Interval

The interval between retries, in seconds.

Default: 30
Widget: Number
Validations:
  • If not set and the retry policy is Linear, fail.
Max Retry Duration

Maximum total time to spend on retries, in seconds.

Default: 600
Widget: Number
Connect Timeout

Maximum time to wait for a connection to the server, in seconds.

0 - wait forever

Default: 120
Widget: Number
Read Timeout

Maximum time to wait for data, in seconds.

0 - wait forever

Default: 120
Widget: Number
Basic authentication
Username

Used for basic authentication.

Widget: Text Box
Password

Used for basic authentication.

Widget: Password
HTTP Proxy:

Proxy URL

Example: http://proxy.com:8080

Note: this still needs to be tested with HTTPS proxies.

Widget: Text Box
Username

Widget: Text Box
Password

Widget: Password


[1] Unfortunately, we cannot validate the request body. Although the body is most commonly JSON for JSON APIs or XML for SOAP APIs, in theory it can be anything.

[2] Pagination types in which the next page URL is taken from the previous page do not support SEND_TO_ERROR or SKIP.
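
The "HTTP Errors Handling" map and the retry policy from the table above can be sketched as follows. This is only an illustration, not the plugin implementation: the function names are hypothetical, the fallback to FAIL for a code that matches neither an entry nor the wildcard is an assumption, and so is the exponential schedule (1 second, doubling on each attempt).

```python
def resolve_error_action(status_code, error_map):
    """Return the configured action for an error status code (>= 400)."""
    if not error_map:
        return "FAIL"  # empty map: any error status code fails the pipeline
    # An exact match wins; the wildcard "*" covers all other codes.
    action = error_map.get(str(status_code), error_map.get("*"))
    # Assumption: a code with no entry and no wildcard also fails.
    return action if action is not None else "FAIL"


def retry_delays(policy, max_duration, linear_interval=30, base=1):
    """Yield wait times (seconds) while their sum stays within max_duration."""
    elapsed = 0
    attempt = 0
    while True:
        if policy == "Linear":
            delay = linear_interval
        else:
            delay = base * 2 ** attempt  # assumed exponential schedule
        if elapsed + delay > max_duration:
            return
        yield delay
        elapsed += delay
        attempt += 1


error_map = {"500": "RETRY", "404": "SEND_TO_ERROR", "*": "FAIL"}
print(resolve_error_action(500, error_map))
print(list(retry_delays("Linear", 100, linear_interval=30)))
```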

Parallelization

There are two reasons why we should not parallelize the requests:

  1. We don't want to flood the API we are using (effectively a DDoS) and get an IP ban.
  2. Paginated APIs often return the URL of the next page in the body or in a response header of the previous page. The process is sequential and cannot be parallelized.

Requests are kept sequential by creating a single InputSplit for the RecordReader. To avoid flooding the API, we should also add a configurable interval between requests for paginated requests.

Pagination

Name | Description | Default | Widget | Validations

Pagination type

Possible values are:

  • None
  • Link in response header
  • Link in response body
  • Increment an index
  • Token in Response Body
  • Custom
Default: None
Widget: Select
Validations:
  • "Link in response body": "Next Page JSON/XML Field Path" is set.
  • "Token in Response Body": "Next Page Token Path" and "Next Page Url Parameter" are set.
  • "Increment an index": {pagination.index} is in the URL; Start Index and Index Increment are set.
  • "Custom": Python code is set.
Start Index

Initial value of the index that replaces {pagination.index} in the URL. See the example in "Pagination by page number or offset".

Widget: Text Box
Validations:
  • If set and pagination type is not "Increment an index", fail.
  • If set and there is no {pagination.index} in the URL, fail.
  • Assert that the value is a number.
Max Index

Maximum value of the index that replaces {pagination.index} in the URL.

If this is empty, the plugin loads pages until a page returns no results or a 404.

The plugin may also stop iterating before the max index is reached if there are no more records.


Widget: Text Box
Validations:
  • If set and pagination type is not "Increment an index", fail.
  • If set and there is no {pagination.index} in the URL, fail.
  • Assert that the value is a number.
Index Increment

Increment value of the index that replaces {pagination.index} in the URL.

Widget: Text Box
Validations:
  • If set and pagination type is not "Increment an index", fail.
  • If set and there is no {pagination.index} in the URL, fail.
  • Assert that the value is a number.
Next Page JSON/XML Field Path

Path to the field in the JSON or XML response that contains the next page URL. See the example in "Pagination via next page url in response body".


Widget: Text Box
Validations:
  • If set and pagination type is not "Link in response body", fail.
  • If the content type is not XML or JSON, fail.
Next Page Token Path

Path to the field in the JSON or XML response that contains the next page token.

Validations:
  • If set and pagination type is not "Token in Response Body", fail.
  • If the content type is not XML or JSON, fail.
  • Validate that the path has at least one element.
Next Page Url Parameter

For type "Token in Response Body", the name of the URL parameter under which the next page token is added to the URL.

Widget: Text Box
Validations:
  • If set and pagination type is not "Token in Response Body", fail.
Custom Pagination Python Code

A code fragment that determines how the next page URL is generated and when to finish the iteration. For more information see "Custom pagination".

Widget: Python code
Validations: If set and pagination type is not "Custom", fail.
Wait time between pages

The number of milliseconds to wait before requesting the next page.

Default: 1000
Widget: Number
Validations:
  • Assert that the value is a number and > 0.
  • If not set and pagination type is not "None", fail.

The above is a bit messy because we cannot dynamically change the content of a widget depending on the pagination type, which makes it a mix of properties for different pagination types. This is not very user-friendly. For now I will add a placeholder to each property that says which pagination type it corresponds to.

Pagination type is none

Plugin will request a single page.

Pagination via url from response header

When a page is requested, the response header contains a link to the next page:

Link: <http://helloworld.voog.co/admin/api/pages?page=1&q.language.id=1>; rel="first",
<http://helloworld.voog.co/admin/api/pages?page=2&q.language.id=1>; rel="next", # <-------- HERE IT IS
<http://helloworld.voog.co/admin/api/pages?page=2&q.language.id=1>; rel="last"

When the header contains no link to a next page, the iteration is finished.
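
A minimal sketch of how the next page URL could be picked out of such a Link header. The function name is hypothetical and the regular expression only covers the simple `<url>; rel="name"` form shown above, not every variant a server may send.

```python
import re


def next_page_from_link_header(link_header):
    """Return the URL whose rel contains "next", or None if there is none."""
    if not link_header:
        return None
    # Each entry looks like: <url>; rel="name"
    entries = re.findall(r'<([^>]*)>\s*;\s*rel="([^"]*)"', link_header)
    for url, rel in entries:
        if "next" in rel.split():
            return url
    return None


header = (
    '<http://helloworld.voog.co/admin/api/pages?page=1&q.language.id=1>; rel="first",\n'
    '<http://helloworld.voog.co/admin/api/pages?page=2&q.language.id=1>; rel="next",\n'
    '<http://helloworld.voog.co/admin/api/pages?page=2&q.language.id=1>; rel="last"'
)
print(next_page_from_link_header(header))
```

When the function returns None, the iteration would stop.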

Pagination via next page url in response body

This example is from the Confluence API. Querying https://wiki.cask.co/rest/api/space/TEPHRA/content/page?limit=5&start=5 returns:

{
    "_links": {
        "base": "https://wiki.cask.co",
        "context": "",
        "next": "/rest/api/space/TEPHRA/content/page?limit=5&start=10",
        "prev": "/rest/api/space/TEPHRA/content/page?limit=5&start=0",
        "self": "https://wiki.cask.co/rest/api/space/TEPHRA/content/page?limit=5&start=5"
    },
    ... actual data ...
}

The user needs to set "Next Page JSON/XML Field Path" to /_links/next to get the pagination working.
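
The lookup of /_links/next can be sketched like this. The helper names are hypothetical, and resolving a relative link against the URL of the current page is an assumption about how the plugin would treat values such as the "next" field above.

```python
from urllib.parse import urljoin


def get_by_path(document, path):
    """Follow a slash-separated path such as /_links/next; None if missing."""
    node = document
    for key in path.strip("/").split("/"):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def next_page_url(current_url, response_body, field_path):
    """Return the absolute URL of the next page, or None when there is none."""
    link = get_by_path(response_body, field_path)
    # urljoin resolves a relative link against the URL of the current page.
    return urljoin(current_url, link) if link else None


current = "https://wiki.cask.co/rest/api/space/TEPHRA/content/page?limit=5&start=5"
body = {"_links": {"next": "/rest/api/space/TEPHRA/content/page?limit=5&start=10"}}
print(next_page_url(current, body, "/_links/next"))
```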

Pagination by page number or offset

The user must add the placeholder {pagination.index} to the URL. How it is replaced is controlled by three properties:

  • Start Index
  • Max Index
  • Index Increment

The WordPress API has this kind of pagination. Here is an example widget configuration for it:

url = "http://localhost/wp/v2/posts?page={pagination.index}"
start_index = "1"
max_index = ""
index_increment = "1"

Another example:

url = "http://localhost:4532/api/v3/transactions?start_time={pagination.index}"
start_index = "1389075585"
max_index = ""
index_increment = "10000000"

The plugin stops reading when a page returns no records or a 404, or when max_index is reached (if it is not empty).
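
As a rough sketch, the sequence of URLs produced by these three properties could be generated as below. The function name is hypothetical, and treating max_index as inclusive is an assumption; stopping on an empty page or a 404 would be handled by the caller.

```python
from itertools import islice


def index_urls(url, start_index, index_increment, max_index=None):
    """Yield one URL per index value, replacing {pagination.index}."""
    index = start_index
    # With max_index unset the generator is unbounded; the caller stops
    # when a page returns no records or a 404.
    while max_index is None or index <= max_index:
        yield url.replace("{pagination.index}", str(index))
        index += index_increment


wordpress = "http://localhost/wp/v2/posts?page={pagination.index}"
first_three = list(islice(index_urls(wordpress, 1, 1), 3))
print(first_three)
```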

Pagination by next page token

Here is an example of pagination from the YouTube API. The nextPageToken field contains a token that must be included in the URL to get the next page, for example "&nextPageToken=CAEQAA".

{
 ...
 "nextPageToken": "CAEQAA", # <---------- HERE IT IS
 "pageInfo": {
  "totalResults": 208,
  "resultsPerPage": 1
 },
 "items": [
  {
   ...
  }
 ]
}

Here is what the chain of links looks like:

${url}
${url}&nextPageToken=${nextPageToken1}
${url}&nextPageToken=${nextPageToken2}
...
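
A minimal sketch of how one link of this chain could be built from "Next Page Token Path" and "Next Page Url Parameter". The function name and the example URL are hypothetical; URL-encoding the token is an assumption.

```python
from urllib.parse import quote


def next_token_url(url, response_body, token_path, url_parameter):
    """Build the next page URL, or return None when no token is present."""
    node = response_body
    for key in token_path.strip("/").split("/"):
        node = node.get(key) if isinstance(node, dict) else None
        if not node:
            return None  # missing or empty token: iteration is finished
    separator = "&" if "?" in url else "?"
    # The token is always appended to the original URL, not to the previous one.
    return url + separator + url_parameter + "=" + quote(str(node), safe="")


base = "https://api.example.invalid/search?part=snippet"  # hypothetical URL
page = {"nextPageToken": "CAEQAA", "items": []}
print(next_token_url(base, page, "/nextPageToken", "nextPageToken"))
```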


Custom pagination

Different APIs use very different styles of pagination. In the simple cases they return a link in a header or in some field of the response JSON. Other cases are harder to express through widget properties:

  • An API where the user wants to paginate by time in the following way: &start_time={something}&end_time={something+10000}. Two dependent variables are involved here, which would be very hard to configure via a widget.
  • The user wants to download a web server directory, for example to analyze or back up a whole site. The "pages" in this case are files on the web server, so we need to paginate based on the results of parsing HTML.
  • The user wants to skip certain pages. Say the API pagination is time based, meaning something like "&start_time=1389075585" is appended to the URL, but the user only wants the pages for weekends.

I would suggest adding a property with Python code that defines how the pagination is done and when the iteration should finish.

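context.pages_processed = 0

def get_next_page_url(url, page, headers):
  # Append the token from the current page to the base url.
  context.next_page = url + '&nextPageToken=' + page['nextPageToken']
  context.pages_processed += 1
  # Stop after one million pages. Jython implements Python 2, which does not accept 1_000_000.
  context.should_finish = (context.pages_processed > 1000000)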

OR

context.start_time = 1389075585

def get_next_page_url(url, page, headers):
  # Move the time window forward by 10000 for the next page.
  context.start_time += 10000
  end_time = context.start_time + 10000
  context.next_page = url + '&start_time=' + str(context.start_time) + '&end_time=' + str(end_time)

Jython is used for this, so the user does not need to have Python installed. The "context" object is a Java object exposed to Python.
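
To illustrate how the plugin might drive such a script, here is a plain Python simulation. Everything in it is an assumption for illustration: the Context class stands in for the Java object, iterate() and fetch() are hypothetical, and the real control flow is not defined in this document.

```python
class Context(object):
    """Hypothetical stand-in for the Java context object."""

    def __init__(self):
        self.next_page = None
        self.should_finish = False


context = Context()
context.pages_processed = 0


def get_next_page_url(url, page, headers):
    # User script: finish when the page carries no token.
    token = page.get('nextPageToken')
    if token is None:
        context.should_finish = True
        return
    context.next_page = url + '&nextPageToken=' + token
    context.pages_processed += 1


def iterate(url, fetch):
    """Call fetch(url) -> (page, headers) until the script asks to stop."""
    current = url
    while True:
        page, headers = fetch(current)
        yield page
        get_next_page_url(url, page, headers)
        if context.should_finish:
            return
        current = context.next_page


# Fake two-page API used only to exercise the loop.
base = "http://example.invalid/api?part=snippet"
pages = {base: {"nextPageToken": "A"}, base + "&nextPageToken=A": {}}
result = list(iterate(base, lambda u: (pages[u], {})))
print(len(result))
```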

Transforming API responses into Records

No automatic schema generation is implemented, since we don't know the value types.

Properties:

Section | Name | Description | Default | Widget | Validations
Format




Format

Possible values:

  • JSON
  • XML
  • TSV
  • CSV
  • Text
  • Blob

Widget: Dropdown list
JSON/XML Result Path

For JSON, a simple slash-separated path is used, e.g. /library/books/items.

For XML, an XPath is used.


Widget: Text Box
Validations: Fail if used with a non-JSON/XML format.
JSON/XML Fields Mapping

Mapping of schema field name to JSON path (relative to the result path).

Example (Jira API):

FieldName | FieldPath
name | /key
type | /fields/issuetype/name
description | /fields/description
projectCategory | /fields/project/projectCategory/name
isSubtask | /fields/issuetype/subtask
fixVersions | /fields/fixVersions

Schema fields that are not in the map use the default mapping fieldName: /fieldName.

Validations:
  • If a key is not present in the schema, fail.
  • If used with a non-JSON/XML format, fail.

1 JSON format

JSON entries are converted into StructuredRecord using StructuredRecordStringConverter.java.

To specify where records are taken from, the user needs to set the JSON Result Path.

Example:

{
 "pageInfo": {
  "totalResults": 208,
  "resultsPerPage": 2
 },
 "items": [
  {
   "kind": "youtube#searchResult",
   "etag": "\"Bdx4f4ps3xCOOo1WZ91nTLkRZ_c/yrJNwvacPS7tA7BQCQmeIZr7fg8\"",
   "id": {
    "kind": "youtube#channel",
    "channelId": "UCfkRcekMTa5GA2DdNKba7Jg"
   },
   "snippet": {
    "publishedAt": "2015-02-12T22:12:43.000Z",
    "channelId": "UCfkRcekMTa5GA2DdNKba7Jg",
    "title": "Cask",
    "description": "Founded by developers for developers, Cask is an open source big data software company focused on developers. Cask's flagship offering, the Cask Data ...",
    "thumbnails": {
     ...
    },
    "channelTitle": "Cask",
    "liveBroadcastContent": "upcoming"
   }
  },
  {
   "kind": "youtube#searchResult",
   "etag": "\"Bdx4f4ps3xCOOo1WZ91nTLkRZ_c/uv6u8PSG0DsOqN9m77o06Jl4LnA\"",
   "id": {
    "kind": "youtube#video",
    "videoId": "ntOXeYecj7o"
   },
   "snippet": {
    "publishedAt": "2016-12-21T19:32:03.000Z",
    "channelId": "UCfkRcekMTa5GA2DdNKba7Jg",
    "title": "Cask Product Tour",
    "description": "In this video, we take you on a product tour of CDAP, CDAP extensions and the Cask Market. Cask Data Application Platform (CDAP) is the first Unified ...",
    "thumbnails": {
     ...
    },
    "channelTitle": "Cask",
    "liveBroadcastContent": "none"
   }
  }
 ]
}


For the above API response the user can set the Result Path to /items/snippet. In this case CDAP takes all elements of the closest list, which is /items, then takes the snippet dictionary out of each of them and generates a record with the following fields: "publishedAt": "2015-02-12T22:12:43.000Z", "channelId": "UCfkRcekMTa5GA2DdNKba7Jg", "title": "Cask", etc.

Another example:

{
    "store": {
        "book": [
            {
                "category": "reference",
                "author": "Nigel Rees",
                "title": "Sayings of the Century",
                "price": 8.95
            },
            {
                "category": "fiction",
                "author": "Evelyn Waugh",
                "title": "Sword of Honour",
                "price": 12.99
            },
            ...
        ],
        "bicycle": {
            "color": "red",
            "price": 19.95
        }
    }
}

Here the user can set the result path to /store/book, which produces records with fields like: "category": "reference", "author": "Nigel Rees".
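
The way a result path fans out over lists in both examples can be sketched as follows. The function name is hypothetical and the code is a simplified model of the behaviour described above, not the plugin's parser.

```python
def extract_records(document, result_path):
    """Return the entries found at a slash-separated path, fanning out over lists."""
    nodes = [document]
    for key in result_path.strip("/").split("/"):
        next_nodes = []
        for node in nodes:
            child = node.get(key) if isinstance(node, dict) else None
            if isinstance(child, list):
                next_nodes.extend(child)  # one candidate per list element
            elif child is not None:
                next_nodes.append(child)
        nodes = next_nodes
    return nodes


youtube = {"items": [{"snippet": {"title": "Cask"}},
                     {"snippet": {"title": "Cask Product Tour"}}]}
store = {"store": {"book": [{"author": "Nigel Rees"}, {"author": "Evelyn Waugh"}]}}
print(extract_records(youtube, "/items/snippet"))
print(extract_records(store, "/store/book"))
```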

Handling unknown fields:

  • Fields that are not present in the schema are skipped.
  • Schema fields that are not present in the JSON response are set to null.
  • If no fields from the schema are found, an exception is thrown, which is handled according to the "Non-HTTP Error Handling" property value.
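
These three rules can be sketched on plain dictionaries. The function name is hypothetical, the schema is reduced to a list of field names, and ValueError merely stands in for whatever exception the plugin would throw.

```python
def to_record(entry, schema_fields):
    """Project a JSON entry onto the schema fields."""
    present = [name for name in schema_fields if name in entry]
    if not present:
        # Handled by the caller according to "Non-HTTP Error Handling".
        raise ValueError("no schema fields found in entry")
    # Fields outside the schema are skipped; missing schema fields become None.
    return {name: entry.get(name) for name in schema_fields}


book = {"category": "reference", "author": "Nigel Rees", "price": 8.95}
print(to_record(book, ["author", "title"]))
```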

2 XML format

We may move the XML parsing functionality into a separate project so that other projects can reuse it.

The XML below is used as the basis for the examples in this section.

<?xml version="1.0" encoding="UTF-8"?>
<bookstores>
   <bookstore id="1">
      <book category="cooking">
         <title lang="en">Everyday Italian</title>
         <author>Giada De Laurentiis</author>
         <year>2005</year>
         <price>
          <value>15.0</value>
          <policy>Discount up to 50%</policy>
         </price>
      </book>
      <book category="web">
         <title lang="en">XQuery Kick Start</title>
         <author>James McGovern</author>
         <author>Per Bothner</author>
         <year>2003</year>
         <price>
          <value>49.99</value>
          <policy>No discount</policy>
         </price>
      </book>
      ...
   </bookstore>
   <bookstore id="2">
      ...
   </bookstore>
</bookstores>

Parsing XML includes the steps below:

  1. Get nodes from XML by result XPath (provided by user). For example /bookstores/bookstore/book[@category='web'].
  2. Convert resulting nodes to JSON (using org.json lib).
  3. Generate StructuredRecords from JSON.

2.1 STEP 1 - Get XML by XPath

XML parsing is done by the default Java DOM parser, which can select items by a specified XPath. XPath is very flexible: it allows the user to select nodes by attribute value, to group nodes from different parents into a single result, to choose nodes conditionally, and so on.

Some XPath examples:

/bookstores/bookstore/book[position()<3]
//title[@lang]
//title[@lang='en']
/bookstores/bookstore/book/price[text()] # convert all subelements to string
/bookstores/bookstore/book[price>35.00]/title
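
For a quick feel of what such selections return, here is a small sketch using Python's standard xml.etree.ElementTree on a shortened copy of the example XML. Note that ElementTree only supports a subset of XPath (attribute predicates work, comparisons such as price>35.00 do not), so this is not equivalent to the Java DOM and XPath approach the plugin would use.

```python
import xml.etree.ElementTree as ET

XML = """<bookstores>
  <bookstore id="1">
    <book category="cooking"><title lang="en">Everyday Italian</title></book>
    <book category="web"><title lang="en">XQuery Kick Start</title></book>
  </bookstore>
</bookstores>"""

root = ET.fromstring(XML)  # root is the <bookstores> element

# Roughly /bookstores/bookstore/book[@category='web'], relative to the root.
web_books = root.findall("./bookstore/book[@category='web']")
titles = [book.findtext("title") for book in web_books]

# Roughly //title[@lang='en'].
english_titles = [title.text for title in root.findall(".//title[@lang='en']")]

print(titles)
print(english_titles)
```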

2.2 STEP 2 - Convert XML to JSON

Converting XML to StructuredRecord is non-trivial. We should:

  • not lose information such as attributes (id, category and lang in the example)
  • handle name collisions between tags: XML can have multiple tags with the same name in the same node
  • handle name collisions between tags and attributes when tags are converted to record fields

The org.json library can convert XML to JSON, which is just what we need. Below is the conversion result obtained with the org.json library.

{"bookstores": {"bookstore": [
    {
        "book": [
            {
                "year": 2005,
                "author": "Giada De Laurentiis",
                "price": {
                    "value": 15,
                    "policy": "Discount up to 50%"
                },
                "category": "cooking",
                "title": {
                    "lang": "en",
                    "content": "Everyday Italian"
                }
            },
            {
                "year": 2003,
                "author": [
                    "James McGovern",
                    "Per Bothner"
                ],
                "price": {
                    "value": 49.99,
                    "policy": "No discount"
                },
                "category": "web",
                "title": {
                    "lang": "en",
                    "content": "XQuery Kick Start"
                }
            },
            ...
        ],
        "id": 1
    },
    {
        ...
        "id": 2
    }
]}}

On a side note: the author fields have different types in the above JSON (a string in one book, a list in the other). This will be handled: if the schema field type is a union and a non-list value is found in its place, we treat it as a list with a single element.
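
The normalization of such a field can be sketched in a few lines. The function name is hypothetical, and mapping a missing value to an empty list is an assumption.

```python
def as_list(value):
    """Wrap a single value in a list; leave lists unchanged."""
    if value is None:
        return []  # assumption: a missing value becomes an empty list
    return value if isinstance(value, list) else [value]


print(as_list("Giada De Laurentiis"))
print(as_list(["James McGovern", "Per Bothner"]))
```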

2.3 STEP 3 - Generate StructuredRecord from JSON

Converting JSON into StructuredRecord is a simple task. We do this via StructuredRecordStringConverter.java. Example:

{
    "year": 2003,
    "author": [
        "James McGovern",
        "Per Bothner"
    ],
    "price": {
        "value": 49.99,
        "policy": "No discount"
    },
    "category": "web",
    "title": {
        "lang": "en",
        "content": "XQuery Kick Start"
    }
}

will yield records compatible with the schema below:

year: string
author: array
price: record
   - value:double
   - policy:string
category: string
title: record
   - lang:string
   - content:string

3 Delimited format

We will use the functionality of cdap-formats/DelimitedStringsRecordFormat.java to validate the schema and convert TSV/CSV input to StructuredRecord. The class supports a column mapping as the last field of the schema.

4 Text format

We will use the functionality of cdap-formats/TextRecordFormat.java to validate the schema and convert text input to StructuredRecord.

OAuth2

The design information has been moved into a separate doc: Plugin OAuth2 Common Module

Properties:

Name | Description | Default | Widget | Validations
OAuth2 Enabled

True or false.

Default: false
Widget: Radio group
Auth URL

A page to which the user is directed to enter their credentials.

Example: https://www.facebook.com/dialog/oauth



Widget: Text Box
Validations: Assert that it is empty if OAuth2 is disabled and not empty if enabled.
Token URL

A page where CDAP can exchange the authCode for an accessToken and a refreshToken, or refresh the accessToken.

Example: https://graph.facebook.com/v3.3/oauth/access_token


Widget: Text Box
Validations: Assert that it is empty if OAuth2 is disabled and not empty if enabled.
Client ID

The user obtains this when registering the OAuth2 application with the service (e.g. Twitter).

Widget: Text Box
Validations: Assert that it is empty if OAuth2 is disabled and not empty if enabled.
Client Secret

The user obtains this when registering the OAuth2 application with the service (e.g. Twitter).


Widget: Password
Validations: Assert that it is empty if OAuth2 is disabled and not empty if enabled.
Scope

This is optional.

Scope is a mechanism in OAuth 2.0 to limit an application's access to a user's account. An application can request one or more scopes; this information is then presented to the user on the consent screen, and the access token issued to the application is limited to the scopes granted.


Widget: Text Box
Validations: Assert that it is empty if OAuth2 is disabled.
Refresh Token

This is populated by the button "Login via OAuth 2.0". Since we save the refresh token (not the access token, which is short lived), this needs to be done only once, during initial pipeline deployment. For more information click here.

The UI should store the actual value in the secure store and set the macro function ${secure(key)} as the property value for extra safety.



Validations: Fail if empty and OAuth2 is enabled.
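
For context, the request that exchanges the saved refresh token for a new access token at the Token URL could be built roughly as below. This is a sketch under assumptions: it follows the standard OAuth 2.0 refresh grant with the client credentials sent in the form body (some services expect them in a Basic Authorization header instead), the function name is hypothetical, and the request is only built, not sent.

```python
from urllib.parse import urlencode


def refresh_request_body(client_id, client_secret, refresh_token, scope=None):
    """Build the form-encoded body of a refresh_token grant request."""
    params = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "client_id": client_id,
        "client_secret": client_secret,
    }
    if scope:
        params["scope"] = scope  # optional, as in the Scope property
    return urlencode(params)


# Placeholder values; the body would be POSTed to the Token URL with
# Content-Type: application/x-www-form-urlencoded.
body = refresh_request_body("my-client-id", "my-secret", "my-refresh-token")
print(body)
```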

SSL/TLS

Some general definitions for more context:

KeyStore file - contains the private key of the client (in our case), which is used for client-server SSL communication.

TrustStore file - stores the certificate chains of trusted CAs. This allows the client to validate the server's identity during the SSL handshake.

Other things that can be customized are the cipher suites and the TLS version. These are negotiated between server and client during the SSL handshake. By providing them we set what the client offers as available; the server then chooses from those.

Should we provide an option for the user to skip the identity check for HTTPS connections? This is generally not recommended, but it might be useful when the user is testing an API that is still in development.

Name | Description | Default | Widget | Validations
Verify HTTPS Trust Certificates

If false, connections to untrusted HTTPS sources are allowed.

Default: true

Keystore File

Path to a keystore file.

Widget: Text Box
Validations: Check that the file exists.
Keystore Type

According to the Oracle docs, there are 3 supported keystore types.

Possible values:

  • Java KeyStore (JKS)
  • Java Cryptography Extension KeyStore (JCEKS)
  • PKCS #12
Default: JKS
Widget: Radio Group
Keystore Password

Leave empty if the keystore is not password protected.

Widget: Password
Validations: Try to load the keystore with the given password.
Keystore Key Algorithm

SunX509 is the default in Java.

Default: SunX509
Widget: Text Box
TrustStore File

Path to a truststore file. If empty, the default Java truststores are used.

Widget: Text Box
Validations: Check that the file exists.
TrustStore Type

According to the Oracle docs, there are 3 supported truststore types.

Possible values:

  • Java KeyStore (JKS)
  • Java Cryptography Extension KeyStore (JCEKS)
  • PKCS #12
Default: JKS
Widget: Radio Group
TrustStore Password

Leave empty if the truststore is not password protected.

Widget: Password
Validations: Try to load the truststore with the given password.
TrustStore Trust Algorithm

Default: SunX509
Widget: Text Box
Transport Protocols

The user can add multiple protocols, which are offered by the client during the handshake.

Default: TLSv1.2
Widget: Array
Validations: Validate that the protocol names are correct.
Cipher Suites

The user can add multiple cipher suites. They are offered by the client during the handshake.

If empty, the default cipher suites are used.

This is a text box with a comma-separated list of ciphers. Since there can sometimes be 20, 30 or more ciphers, adding each of them manually to an array would not be practical for the user.


Widget: Text Box

Validations: Validate that the suites are supported by the current Java implementation.
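
The parsing and validation of the comma-separated "Cipher Suites" value can be sketched as below. The function name is hypothetical and the set of supported suites is passed in as a parameter; in the plugin it would come from the Java runtime, which is not modelled here.

```python
def parse_cipher_suites(text, supported):
    """Split a comma-separated list and reject unsupported suite names."""
    names = [name.strip() for name in text.split(",") if name.strip()]
    unsupported = [name for name in names if name not in supported]
    if unsupported:
        raise ValueError("unsupported cipher suites: " + ", ".join(unsupported))
    return names  # an empty list means: use the default cipher suites


# Example set only; the real list depends on the runtime.
supported = {"TLS_AES_128_GCM_SHA256", "TLS_AES_256_GCM_SHA384"}
print(parse_cipher_suites("TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384", supported))
```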



Created in 2020 by Google Inc.