HTTP Batch Source
General
Apache HttpComponents HttpClient, the successor of Commons HttpClient, is used as the HTTP framework.
It appears to be the most widely used and community-supported HTTP framework for Java. Solutions and workarounds for all kinds of problems are easy to find, which makes plugin development and maintenance easier. The framework has built-in support for compression, HTTPS tunneling, digest authentication and many other features.
Properties:
| Section | Name | Description | Default | Widget | Validations |
|---|---|---|---|---|---|
| General | URL | The URL to request. "{pagination.index}" can be included in the URL to represent the changing part needed by some pagination strategies. E.g.: https://my.api.com/api/v1/user?maxResults=10&name=John&pageNumber={pagination.index} | | Text Box | Validate that it contains a protocol. |
| | HTTP Method | Possible values: ... | GET | Radio group | |
| | Headers | Key-value map of headers. | | KeyValue | |
| | Request Body | | | Text Area | No validation [1] |
| Error Handling | HTTP Errors Handling | A map in which the user defines which result each error status code produces. Possible values are RETRY, FAIL, SKIP, SEND_TO_ERROR and ALERT. Example: 500: RETRY, 404: SEND_TO_ERROR, *: FAIL. The wildcard (*) means "otherwise", i.e. "for all other codes do ...". If the field is empty, any status code >= 400 fails the pipeline. | | KeyValue Dropdown | If SEND_TO_ERROR, SKIP or ALERT is used and the current pagination type does not support it, throw a validation error. [2] |
| | Non-HTTP Error Handling | Handling of type-casting errors and any other unhandled exceptions thrown while transforming a record. Possible values are: ... | Stop on error | Dropdown list | If "Send to error" or "Skip on error" is used and the current pagination type does not support it, throw a validation error. [2] |
| | Retry Policy | Possible values are: ... | Exponential | Radio group | |
| | Linear Retry Interval | The interval between retries (seconds). | 30 | Number | |
| | Max retry duration | Maximum number of seconds spent on retries. | 600 | Number | |
| | Connect Timeout | Maximum number of seconds to wait for a connection to the server. 0 means wait forever. | 120 | Number | |
| | Read Timeout | Maximum number of seconds to wait for data. 0 means wait forever. | 120 | Number | |
| Basic authentication | Username | Used for basic authentication. | | Text Box | |
| | Password | Used for basic authentication. | | Password | |
| HTTP Proxy | Proxy URL | Example: http://proxy.com:8080 (Note: test this with HTTPS proxies.) | | Text Box | |
| | Username | | | Text Box | |
| | Password | | | Password | |
[1] Unfortunately, this field cannot be validated. Although the body of an API request is most commonly JSON (for JSON APIs) or XML (for XML SOAP APIs), in theory it can be anything.
[2] Pagination types in which the next page URL is taken from the previous page do not support SEND_TO_ERROR or SKIP.
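A minimal Python sketch of how the "HTTP Errors Handling" map and the retry properties could be evaluated. The plugin itself is Java; the function names, the "OK" result for unmatched codes below 400, failing on unmatched codes when no wildcard is present, and the doubling schedule for the exponential policy (starting at 1 second) are all assumptions for illustration.

```python
from typing import Dict, List

def resolve_action(status_code: int, rules: Dict[str, str]) -> str:
    """Return the action for a status code from an "HTTP Errors Handling" map.

    rules maps status codes (as strings) or "*" to RETRY, FAIL, SKIP,
    SEND_TO_ERROR or ALERT.
    """
    if str(status_code) in rules:      # an explicit rule always wins
        return rules[str(status_code)]
    if status_code < 400:              # assumption: unmatched non-error codes succeed
        return "OK"
    if "*" in rules:                   # wildcard: "for all other codes"
        return rules["*"]
    # Empty map: any code >= 400 fails the pipeline (assumed also for maps without "*").
    return "FAIL"


def retry_delays(policy: str, max_duration: float,
                 linear_interval: float = 30.0, initial: float = 1.0) -> List[float]:
    """Seconds to wait before each retry, stopping before max_duration is exceeded.

    Assumption: EXPONENTIAL doubles the delay starting from `initial`;
    LINEAR always waits `linear_interval` (30 s by default, as in the table).
    """
    delays: List[float] = []
    total = 0.0
    while True:
        if policy == "LINEAR":
            delay = linear_interval
        elif policy == "EXPONENTIAL":
            delay = initial * 2 ** len(delays)
        else:
            raise ValueError("unknown retry policy: " + policy)
        if total + delay > max_duration:
            return delays
        delays.append(delay)
        total += delay
```

With the example map {"500": "RETRY", "404": "SEND_TO_ERROR", "*": "FAIL"}, a 404 resolves to SEND_TO_ERROR and a 403 falls through to the wildcard. With the default 600 s budget, the linear policy allows 20 retries of 30 s.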
Parallelization
There are two reasons why we should not parallelize the requests:
We don't want to DDoS the API we are using and get an IP ban for flooding.
With paginated APIs, the URL of the next page is often returned in the previous page or in a response header. The process is sequential and cannot be parallelized.
This is achieved by creating a single InputSplit for the RecordReader. In addition, to avoid DDoSing the API, a configurable interval between requests is added for paginated requests.
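A minimal Python sketch of the sequential read loop that a single split implies: each page is requested only after the previous one has been processed and the configured wait has elapsed. read_pages, fetch and next_url are hypothetical names; the real reader is a Java RecordReader.

```python
import time
from typing import Callable, Iterator, Optional

def read_pages(first_url: str,
               fetch: Callable[[str], dict],
               next_url: Callable[[str, dict], Optional[str]],
               wait_ms: int = 1000,
               sleep: Callable[[float], None] = time.sleep) -> Iterator[dict]:
    """Yield pages strictly one after another; None from next_url ends iteration."""
    url: Optional[str] = first_url
    first = True
    while url is not None:
        if not first:
            sleep(wait_ms / 1000.0)   # "Wait time between pages" is in milliseconds
        first = False
        page = fetch(url)
        yield page
        url = next_url(url, page)     # depends on the page just read, hence no parallelism
```

Passing `sleep` in as a parameter keeps the loop testable without real waiting.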
Pagination
| Name | Description | Default | Widget | Validations |
|---|---|---|---|---|
| Pagination type | Possible values are: ... | None | Select | |
| Start Index | Initial value of the index that replaces {pagination.index} in the URL. See "Pagination by page number or offset". | | Text Box | |
| Max Index | Maximum value of the index that replaces {pagination.index} in the URL. If empty, the plugin loads pages until a page with no results or a 404 is returned. The plugin may also stop before the max index is reached if there are no more records. | | Text Box | |
| Index Increment | Increment for the index that replaces {pagination.index} in the URL. | | Text Box | |
| Next Page JSON/XML Field Path | Path to a field in the JSON or XML response containing the next page URL. See "Pagination via next page url in response body". | | Text Box | |
| Next Page Token Path | Path to a field in the JSON or XML response containing the next page token. | | | |
| Next Page Url Parameter | For the "Token in Response Body" type, the name of the URL parameter used to pass the next page token. | | Text Box | |
| Custom Pagination Python Code | A code fragment that determines how the next page URL is generated and when to finish iteration. For more info, see "Custom pagination". | | Python code | If set and the pagination type is not "Custom", fail. |
| Wait time between pages | The number of milliseconds to wait before requesting the next page. | 1000 | Number | |
The above is a bit messy because we cannot dynamically change the widgets depending on the pagination type, so the list mixes properties for different pagination types. This is not very user-friendly for the end user. For now, I will add a placeholder to each property saying which pagination type it corresponds to.
Pagination type is none
Plugin will request a single page.
Pagination via url from response header
When a page is accessed, the response header contains a link to the next page:
Link: <http://helloworld.voog.co/admin/api/pages?page=1&q.language.id=1>; rel="first",
<http://helloworld.voog.co/admin/api/pages?page=2&q.language.id=1>; rel="next", # <-------- HERE IT IS
<http://helloworld.voog.co/admin/api/pages?page=2&q.language.id=1>; rel="last"
When there is no next page link in the header, iteration is finished.
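A minimal Python sketch of extracting the rel="next" target from a Link header like the one above (the header format is standardized in RFC 8288). The parser is deliberately simple: it assumes no commas or semicolons inside quoted parameter values. next_link is a hypothetical name.

```python
import re
from typing import Optional

# One link-value of a Link header: <URI> followed by ;-separated parameters.
_LINK_VALUE = re.compile(r'<([^>]*)>\s*((?:;\s*[^;,]+)*)')

def next_link(link_header: Optional[str]) -> Optional[str]:
    """Return the URI whose rel parameter contains "next", or None (iteration ends)."""
    if not link_header:
        return None
    for match in _LINK_VALUE.finditer(link_header):
        uri, params = match.group(1), match.group(2)
        for param in params.split(";"):
            name, _, value = param.strip().partition("=")
            # rel may hold several space-separated relation types, e.g. rel="next last".
            if name.strip().lower() == "rel" and "next" in value.strip().strip('"').split():
                return uri
    return None
```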
Pagination via next page url in response body
This example is from the Confluence API. Querying https://wiki.cask.co/rest/api/space/TEPHRA/content/page?limit=5&start=5 returns:
{
"_links": {
"base": "https://wiki.cask.co",
"context": "",
"next": "/rest/api/space/TEPHRA/content/page?limit=5&start=10",
"prev": "/rest/api/space/TEPHRA/content/page?limit=5&start=0",
"self": "https://wiki.cask.co/rest/api/space/TEPHRA/content/page?limit=5&start=5"
},
... actual data ...
}
The user needs to set "Next Page JSON/XML Field Path" to /_links/next to get pagination working.
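A minimal Python sketch of following /_links/next in a response like the Confluence one above. The link there is relative, so the sketch assumes it is resolved against the current page URL with urljoin; using the _links/base field instead would be an equally plausible design. Function names are hypothetical.

```python
from typing import Any, Optional
from urllib.parse import urljoin

def get_by_path(document: Any, path: str) -> Any:
    """Follow a slash-separated path such as /_links/next; None if any key is missing."""
    node = document
    for key in [k for k in path.split("/") if k]:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node

def next_page_url(current_url: str, page: dict, path: str) -> Optional[str]:
    link = get_by_path(page, path)
    if not link:
        return None                  # no next link: iteration is finished
    # Assumption: a relative link is resolved against the URL of the current page.
    return urljoin(current_url, link)
```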
Pagination by page number or offset
The user must add the placeholder {pagination.index} to the URL. How it is replaced is controlled by three properties:
Start Index
Max Index
Index Increment
The WordPress API uses this kind of pagination. Here is an example widget configuration for it:
url = "http://localhost/wp/v2/posts?page={pagination.index}"
start_index = "1"
max_index = ""
index_increment = "1"
Another example:
url = "http://localhost:4532/api/v3/transactions?start_time={pagination.index}"
start_index = "1389075585"
max_index = ""
index_increment = "10000000"
The plugin stops reading when a page returns no records or a 404, or when max_index is reached (if it is not empty).
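A minimal Python sketch of index-based pagination with the three properties above. It assumes a hypothetical fetch function that returns a page's records, or None for a 404, and treats max_index as inclusive; the document does not state either detail.

```python
from typing import Callable, Iterator, List, Optional

PLACEHOLDER = "{pagination.index}"

def paginate_by_index(url_template: str, start_index: int, index_increment: int,
                      max_index: Optional[int],
                      fetch: Callable[[str], Optional[List[dict]]]) -> Iterator[dict]:
    """Yield records page by page; max_index=None means "no limit"."""
    index = start_index
    while max_index is None or index <= max_index:   # assumption: inclusive max
        records = fetch(url_template.replace(PLACEHOLDER, str(index)))
        if not records:          # 404 (None) or a page without records ends iteration
            return
        yield from records
        index += index_increment
```

For the WordPress configuration this requests page=1, page=2, ... until an empty page or a 404 comes back.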
Pagination by next page token
Here is an example of pagination from the YouTube API. The nextPageToken field contains a token that must be included in the URL to get the next page, e.g. "&page_token=CAEQAA" (the parameter name is set via "Next Page Url Parameter").
{
...
"nextPageToken": "CAEQAA", # <---------- HERE IT IS
"pageInfo": {
"totalResults": 208,
"resultsPerPage": 1
},
"items": [
{
...
}
]
}
Here is what the chain of links looks like:
${url}
${url}&nextPageToken=${nextPageToken1}
${url}&nextPageToken=${nextPageToken2}
...
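A minimal Python sketch of the token chain above: every next URL is the original ${url} plus the latest token. It assumes the token is a top-level field and the records sit under "items"; in the plugin these would come from "Next Page Token Path" and the result path. The defaults and function name are hypothetical.

```python
from typing import Callable, Iterator
from urllib.parse import urlencode

def paginate_by_token(base_url: str, fetch: Callable[[str], dict],
                      token_field: str = "nextPageToken",
                      url_parameter: str = "nextPageToken",
                      items_field: str = "items") -> Iterator:
    """Yield items page by page until a response carries no token."""
    url = base_url
    while True:
        page = fetch(url)
        yield from page.get(items_field, [])
        token = page.get(token_field)
        if not token:
            return                   # no token in the response: last page
        separator = "&" if "?" in base_url else "?"
        url = base_url + separator + urlencode({url_parameter: token})
```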
Custom pagination
Different APIs use very different styles of pagination. In simple cases, they return a link in a header or in some field of the response JSON.
For example, consider an API where the user wants to paginate by time as follows: &start_time={something}&end_time={something+10000}. Two dependent variables are involved here, and making something like this configurable via widgets would be very problematic.
Let's imagine another case: the user wants to download a web server directory, so the "pages" are files on the web server, for instance to analyse or back up a whole site. Here we need to paginate based on the results of parsing HTML.
Another example: the user wants to skip certain pages of an API. Say the API pagination is time-based, meaning something like "&start_time=1389075585" is appended to the URL, but the user only wants pages for weekends.
I suggest adding a property with Python code that defines how pagination is done and when iteration should finish.
context.pages_processed = 0

def get_next_page_url(url, page, headers):
    token = page.get('nextPageToken')
    context.pages_processed += 1
    # Finish when the API returns no token or after one million pages.
    context.should_finish = token is None or context.pages_processed > 1000000
    if not context.should_finish:
        context.next_page = url + '&nextPageToken=' + token

or

context.start_time = 1389075585

def get_next_page_url(url, page, headers):
    # Move the time window forward by 10000 on every page.
    context.start_time += 10000
    end_time = context.start_time + 10000
    context.next_page = url + '&start_time=' + str(context.start_time) + '&end_time=' + str(end_time)

Jython is used for this, so the user does not need to have Python installed. The context object is a Java object exposed to the Python code.
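To try a custom pagination snippet outside the plugin, the context object can be imitated with a plain Python object. This is a hypothetical test harness that runs under CPython 3 rather than Jython, and load_pagination_code is an illustrative name; it is not how the plugin invokes the code. The query string "?limit=100" is likewise made up for the example.

```python
from types import SimpleNamespace

# The time-window example from above, as a user would enter it.
USER_CODE = """
context.start_time = 1389075585

def get_next_page_url(url, page, headers):
    context.start_time += 10000
    end_time = context.start_time + 10000
    context.next_page = url + '&start_time=' + str(context.start_time) + '&end_time=' + str(end_time)
"""

def load_pagination_code(code):
    """Run user code against a stand-in for the Java context object."""
    context = SimpleNamespace(next_page=None, should_finish=False)
    namespace = {"context": context}
    exec(code, namespace)        # initialises context and defines get_next_page_url
    return context, namespace["get_next_page_url"]

context, get_next_page_url = load_pagination_code(USER_CODE)
get_next_page_url("http://localhost:4532/api/v3/transactions?limit=100", {}, {})
print(context.next_page)
# prints http://localhost:4532/api/v3/transactions?limit=100&start_time=1389085585&end_time=1389095585
```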
Transforming API responses into Records
No automatic schema generation is implemented, since the value types are not known in advance.
Properties:
| Section | Name | Description | Default | Widget | Validations |
|---|---|---|---|---|---|
| Format | Format | Possible values: ... | | Dropdown list | |
| | JSON/XML Result Path | For JSON, a simple slash-separated path is used, e.g. /library/books/items. For XML, an XPath is used. | | Text Box | Fail if used with a non-JSON/XML format. |
| | JSON/XML Fields Mapping | Mapping of schema field names to JSON paths (relative to the result path). Example (Jira API): ... Schema fields that are not in the map use the fieldName:/fieldName mapping. | | | |
1 JSON format
JSON entries are converted into StructuredRecords using StructuredRecordStringConverter.java.
To specify where records are taken from, the user sets the JSON Result Path.
Example:
{
"pageInfo": {
"totalResults": 208,
"resultsPerPage": 2
},
"items": [
{
"kind": "youtube#searchResult",
"etag": "\"Bdx4f4ps3xCOOo1WZ91nTLkRZ_c/yrJNwvacPS7tA7BQCQmeIZr7fg8\"",
"id": {
"kind": "youtube#channel",
"channelId": "UCfkRcekMTa5GA2DdNKba7Jg"
},
"snippet": {
"publishedAt": "2015-02-12T22:12:43.000Z",
"channelId": "UCfkRcekMTa5GA2DdNKba7Jg",
"title": "Cask",
"description": "Founded by developers for developers, Cask is an open source big data software company focused on developers. Cask's flagship offering, the Cask Data ...",
"thumbnails": {
...
},
"channelTitle": "Cask",
"liveBroadcastContent": "upcoming"
}
},
{
"kind": "youtube#searchResult",
"etag": "\"Bdx4f4ps3xCOOo1WZ91nTLkRZ_c/uv6u8PSG0DsOqN9m77o06Jl4LnA\"",
"id": {
"kind": "youtube#video",
"videoId": "ntOXeYecj7o"
},
"snippet": {
"publishedAt": "2016-12-21T19:32:03.000Z",
"channelId": "UCfkRcekMTa5GA2DdNKba7Jg",
"title": "Cask Product Tour",
"description": "In this video, we take you on a product tour of CDAP, CDAP extensions and the Cask Market. Cask Data Application Platform (CDAP) is the first Unified ...",
"thumbnails": {
...
},
"channelTitle": "Cask",
"liveBroadcastContent": "none"
}
}
]
}
For the API response above, the user can set the Result Path to /items/snippet. In this case CDAP takes all elements of the closest list, /items, then takes the snippet dictionary out of each of them and generates a record with the following fields: "publishedAt": "2015-02-12T22:12:43.000Z", "channelId": "UCfkRcekMTa5GA2DdNKba7Jg", "title": "Cask", etc.
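A minimal Python sketch of resolving a slash-separated result path as described: whenever a list is met, the rest of the path is applied to every element. extract_records is a hypothetical name; the plugin's Java implementation may differ in edge cases such as nested lists.

```python
from typing import Any, List

def extract_records(document: Any, result_path: str) -> List[Any]:
    """Return every node reached by result_path, fanning out over lists."""
    keys = [k for k in result_path.split("/") if k]

    def walk(node: Any, remaining: List[str]) -> List[Any]:
        if isinstance(node, list):
            # A list: apply the remaining path to every element.
            return [rec for item in node for rec in walk(item, remaining)]
        if not remaining:
            return [node]
        if isinstance(node, dict) and remaining[0] in node:
            return walk(node[remaining[0]], remaining[1:])
        return []                    # path not present: no records

    return walk(document, keys)
```

The same function covers the /store/book example that follows: the path ends at a list, so each book becomes one record.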
Another example:
{
"store": {
"book": [
{
"category": "reference",
"author": "Nigel Rees",
"title": "Sayings of the Century",
"price": 8.95
},
{
"category": "fiction",
"author": "Evelyn Waugh",
"title": "Sword of Honour",
"price": 12.99
},
...
],
"bicycle": {
"color": "red",
"price": 19.95
}
}
}
Here the user can set the result path to /store/book, which produces records with fields such as "category": "reference", "author": "Nigel Rees".
Handling unknown fields:
Fields which are not present in schema get skipped.
Schema fields, which are not present in JSON response, are set to null.
If none of the schema fields are found, an exception is thrown, which is handled according to the "Non-HTTP Error Handling" property value.
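A minimal Python sketch of the three rules above, combined with the default fieldName:/fieldName mapping from the "JSON/XML Fields Mapping" property. to_record and the Jira-like field mapping in the usage note are hypothetical; in the plugin the raised exception would be routed through "Non-HTTP Error Handling".

```python
from typing import Any, Dict, List, Optional

_MISSING = object()   # marker for "path not present in the entry"

def _lookup(entry: Any, path: str) -> Any:
    node = entry
    for key in [k for k in path.split("/") if k]:
        if not isinstance(node, dict) or key not in node:
            return _MISSING
        node = node[key]
    return node

def to_record(entry: Dict[str, Any], schema_fields: List[str],
              field_mapping: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """Build one record; keys not in the schema are never read, i.e. skipped."""
    mapping = field_mapping or {}
    record: Dict[str, Any] = {}
    found_any = False
    for field in schema_fields:
        value = _lookup(entry, mapping.get(field, "/" + field))  # default fieldName:/fieldName
        if value is _MISSING:
            record[field] = None         # schema field absent from the response
        else:
            record[field] = value
            found_any = True
    if not found_any:
        raise ValueError("none of the schema fields were found in the entry")
    return record
```

For example, a mapping {"summary": "/fields/summary"} reads a nested value while other schema fields keep the default top-level lookup.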
2 XML format
The XML parsing functionality may be moved to a separate project so that other projects can reuse it.
The XML below is used as the basis for the examples in this section.
<?xml version="1.0" encoding="UTF-8"?>
<bookstores>
<bookstore id="1">
<book category="cooking">
<title lang="en">Everyday Italian</title>
<author>Giada De Laurentiis</author>
<year>2005</year>
<price>
<value>15.0</value>
<policy>Discount up to 50%</policy>
</price>
</book>
<book category="web">
<title lang="en">XQuery Kick Start</title>
<author>James McGovern</author>
<author>Per Bothner</author>
<year>2003</year>
<price>
<value>49.99</value>
<policy>No discount</policy>
</price>
</book>
...
</bookstore>
<bookstore id="2">
...
</bookstore>
</bookstores>
Parsing XML consists of the following steps:
Get nodes from the XML using the result XPath provided by the user, for example /bookstores/bookstore/book[@category='web'].
Convert the resulting nodes to JSON (using the org.json library).
Generate StructuredRecords from the JSON.
2.1 STEP 1 - Get XML by XPath
XML is parsed with the default Java DOM parser, and nodes are selected with the user-specified XPath. XPath is very flexible: it lets the user select nodes by attribute value, group nodes from different parents into a single result, choose nodes conditionally, and so on.
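A minimal Python sketch of step 1 on a trimmed copy of the sample XML. It uses the standard library's xml.etree.ElementTree, which supports only a subset of XPath (paths, attribute predicates); the plugin uses the Java DOM parser, which supports full XPath. Numeric predicates are not available in ElementTree, so that comparison is done in Python here.

```python
import xml.etree.ElementTree as ET

SAMPLE = """<bookstores>
  <bookstore id="1">
    <book category="cooking">
      <title lang="en">Everyday Italian</title>
      <author>Giada De Laurentiis</author>
      <year>2005</year>
      <price><value>15.0</value><policy>Discount up to 50%</policy></price>
    </book>
    <book category="web">
      <title lang="en">XQuery Kick Start</title>
      <author>James McGovern</author>
      <author>Per Bothner</author>
      <year>2003</year>
      <price><value>49.99</value><policy>No discount</policy></price>
    </book>
  </bookstore>
</bookstores>"""

root = ET.fromstring(SAMPLE)   # root is the <bookstores> element

# /bookstores/bookstore/book[@category='web']
web_titles = [b.findtext("title") for b in root.findall("./bookstore/book[@category='web']")]

# //title[@lang='en']
en_titles = [t.text for t in root.findall(".//title[@lang='en']")]

# /bookstores/bookstore/book[price/value>35.00]/title, filtered in Python.
expensive_titles = [b.findtext("title") for b in root.iter("book")
                    if float(b.findtext("price/value")) > 35.0]
```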
Some XPath examples:
/bookstores/bookstore/book[position()<3]
//title[@lang]
//title[@lang='en']
/bookstores/bookstore/book/price[text()] # convert all subelements to string
/bookstores/bookstore/book[price/value>35.00]/title
2.2 STEP 2 - Convert XML to JSON
Converting XML to StructuredRecord is non-trivial. We should:
not lose information such as attributes (id, category and lang in the example);
handle name collisions between tags, since XML can have multiple tags with the same name in the same node;
handle name collisions between tags and attributes when both are converted to record fields.
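A minimal Python sketch of how these three requirements can be met, imitating the conventions visible in the org.json output in this section: attributes become ordinary keys, repeated names are collected into a list, and text next to attributes goes under "content". This is an approximation written for illustration, not the org.json library; its number guessing in particular is simplified, and mixed content (text between child elements) is dropped.

```python
import re
import xml.etree.ElementTree as ET
from typing import Any, Dict

_INT = re.compile(r"-?\d+")
_FLOAT = re.compile(r"-?\d+\.\d+")

def _coerce(text: str) -> Any:
    """Rough imitation of value guessing: numeric strings become numbers."""
    if _INT.fullmatch(text):
        return int(text)
    if _FLOAT.fullmatch(text):
        return float(text)
    return text

def element_to_json(elem: ET.Element) -> Any:
    # Attributes become keys, e.g. <book category="web"> -> {"category": "web"}.
    result: Dict[str, Any] = {name: _coerce(value) for name, value in elem.attrib.items()}
    for child in elem:
        value = element_to_json(child)
        if child.tag in result:
            # Name seen before (repeated tag or tag/attribute clash): keep all values.
            if not isinstance(result[child.tag], list):
                result[child.tag] = [result[child.tag]]
            result[child.tag].append(value)
        else:
            result[child.tag] = value
    text = (elem.text or "").strip()
    if text:
        if not result:
            return _coerce(text)            # text-only element -> scalar value
        result["content"] = _coerce(text)   # text alongside attributes/children
    return result if result else ""

def xml_to_json(xml_text: str) -> Dict[str, Any]:
    root = ET.fromstring(xml_text)
    return {root.tag: element_to_json(root)}

SAMPLE = """<bookstores><bookstore id="1">
  <book category="cooking"><title lang="en">Everyday Italian</title>
    <author>Giada De Laurentiis</author><year>2005</year>
    <price><value>15.0</value><policy>Discount up to 50%</policy></price></book>
  <book category="web"><title lang="en">XQuery Kick Start</title>
    <author>James McGovern</author><author>Per Bothner</author><year>2003</year>
    <price><value>49.99</value><policy>No discount</policy></price></book>
</bookstore></bookstores>"""

doc = xml_to_json(SAMPLE)
```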
The org.json library can convert XML to JSON, which is exactly what we need. Below is the conversion result produced by the org.json library:
{"bookstores": {"bookstore": [
{
"book": [
{
"year": 2005,
"author": "Giada De Laurentiis",
"price": {
"value": 15,
"policy": "Discount up to 50%"
},
"category": "cooking",
"title": {
"lang": "en",
"content": "Everyday Italian"
}
},
{
"year": 2003,
"author": [
"James McGovern",
"Per Bothner"
],
"price": {
"value": 49.99,
"policy": "No discount"
},
"category": "web",
"title": {
"lang": "en",
"content": "XQuery Kick Start"
}
},
...
],
"id": 1
},
{
...
"id": 2
}
]}}
On a side note: the author fields have different types in the JSON above (a string in the first book, a list in the second). This is handled: if the schema field type is a union and the value in place is not a list, it is treated as a list with a single element.
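The single-element-list rule above can be sketched as part of a schema-driven conversion. In this minimal Python sketch the schema notation and convert are hypothetical, and the list-typed field is modelled as a plain array rather than a union for simplicity; the plugin relies on StructuredRecordStringConverter and CDAP's Schema, which may coerce types differently.

```python
from typing import Any, Dict

# Hypothetical schema notation: "string", "double", "long",
# ("array", item_type) for lists, or a dict of field name -> type for nested records.
BOOK_SCHEMA: Dict[str, Any] = {
    "year": "string",
    "author": ("array", "string"),
    "price": {"value": "double", "policy": "string"},
    "category": "string",
    "title": {"lang": "string", "content": "string"},
}

def convert(value: Any, schema: Any) -> Any:
    if value is None:
        return None
    if isinstance(schema, dict):                               # nested record
        if not isinstance(value, dict):
            raise TypeError("expected an object, got %r" % (value,))
        return {name: convert(value.get(name), sub) for name, sub in schema.items()}
    if isinstance(schema, tuple) and schema[0] == "array":
        items = value if isinstance(value, list) else [value]  # single value -> one-element list
        return [convert(item, schema[1]) for item in items]
    if schema == "string":
        return str(value)
    if schema == "double":
        return float(value)
    if schema == "long":
        return int(value)
    raise ValueError("unsupported schema type: %r" % (schema,))
```

With this, both books yield an author list: ["James McGovern", "Per Bothner"] and ["Giada De Laurentiis"].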
2.3 STEP 3 - Generate StructuredRecord from JSON
Converting JSON into StructuredRecord is a simple task. We do this via StructuredRecordStringConverter.java. Example:
{
"year": 2003,
"author": [
"James McGovern",
"Per Bothner"
],
"price": {
"value": 49.99,
"policy": "No discount"
},
"category": "web",
"title": {
"lang": "en",
"content": "XQuery Kick Start"
}
}
will yield records compatible with the schema below:
year: string
author: array
price: record
- value:double
- policy:string
category: string
title: record
- lang:string
- content:string
Created in 2020 by Google Inc.