Salesforce Batch Source


Introduction

Salesforce exposes an expressive query language, SOQL (Salesforce Object Query Language), through the Salesforce HTTP REST API. Extending Data Pipelines to read Salesforce data will improve usability and enable additional use cases in the CRM and Customer 360 space.

 

Use-case

A user can retrieve opportunities and customer information through SOQL in a data pipeline and combine them with other customer data in that pipeline, to better understand customer interactions and the sales process.

User Stories

  • As a data pipeline user, I would like to be able to pull data from Salesforce using a SOQL query in a batch pipeline, transform it and load it into my analytics data warehouse, so that I can use it for analytics.

  • As a data pipeline user, I would like to be able to optionally specify the batch size for Salesforce API calls, so that I can guard against API limits.

  • As a data pipeline user, I would like to be able to pull data for all attributes of a given object in my batch pipeline without specifying a SOQL query, so that I do not have to manually list all the columns I want to pull.

  • As a data pipeline user, I would like to pull only data that has changed since the last time an object was read, so that I can do incremental ingestion of my data and reduce load on the APIs.

Requirements

  • For authentication, the user provides a client ID, client secret, username and password. The Salesforce instance should be derived using the login API; the user should not be required to specify the instance.

  • Once the instance is determined, a user should be able to select an object from a list of objects available in the instance. After selecting the object, the user can choose one or more attributes from a list of attributes available for that object. By default, all attributes should be selected.

  • A user can skip the above step, and instead choose to specify the data to pull as a SOQL query.

  • Users should be able to optionally specify the batch size for all Salesforce queries. If unspecified, a reasonable default should be documented and used.

User Stories - discussion results

Browsing objects and their properties

Browsing will not be supported by CDAP until the connection/source/sink consolidation work is done. For now, the ability to fetch the schema via the UI will be implemented. Methods to list objects and their fields will also be created, so that browsing can be implemented easily once the consolidation work is done.

Defaulting to the latest version of the API. Allow user to optionally override it.

The Salesforce API is not backward compatible: a given version of the Salesforce Java library supports only certain versions of the Salesforce API. Defaulting to the latest version would break existing customers whenever a new Salesforce API version is released, and they would have to download the latest Salesforce jar to fix it.

Specifying batch size

Batches are created internally by the Salesforce processing server and their size cannot be configured, so the plugin has to work with the batches it receives. See the "Salesforce Mapreduce parallelization" section for more information.

Specifying time period

Based on the discussion with @Albert Shau, we decided that automatically inserting conditions like "LastModifiedDate=TODAY AND HOUR_IN_DAY(LastModifiedDate) 1" into the query is not reliable. User queries can be complex (e.g. nested queries, queries without a WHERE clause, or queries with multiple WHERE clauses), so inserting conditions into them would be very error-prone. Instead, the documentation will describe how to specify this interval so that users can easily incorporate it into their own queries.
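As a sketch of what such documentation could show, the helper below builds a query whose LastModifiedDate filter is written explicitly, rather than injected into an arbitrary user query. SOQL accepts unquoted ISO 8601 dateTime literals such as 2019-01-01T00:00:00Z. The class and method names (IncrementalQuery, modifiedSince) are hypothetical and not part of the plugin.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;

// Hypothetical documentation helper, not part of the plugin.
final class IncrementalQuery {
    // Formats an Instant as an unquoted SOQL dateTime literal such as
    // 2019-01-01T00:00:00Z; fractional seconds are dropped.
    static String soqlDateTime(Instant instant) {
        return instant.truncatedTo(ChronoUnit.SECONDS).toString();
    }

    // Builds a simple query that selects only records modified after 'since'.
    // The WHERE clause is part of the query text the user supplies; the
    // plugin itself does not rewrite queries.
    static String modifiedSince(String sObject, List<String> fields, Instant since) {
        return "SELECT " + String.join(", ", fields)
            + " FROM " + sObject
            + " WHERE LastModifiedDate > " + soqlDateTime(since);
    }
}
```

For relative windows, users can write a SOQL date literal directly in the query instead, e.g. WHERE LastModifiedDate = LAST_N_DAYS:7.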

Mapping types from Salesforce to CDAP

CDAP Type | Salesforce Type | Notes
Boolean | _bool |
Long | _int, _long |
Double | _double, currency, percent |
Date | date | Logical type
Timestamp_millis | datetime | Logical type
Time_millis | time | Logical type
String | picklist, multipicklist, combobox, reference, base64, textarea, phone, id, url, email, encryptedstring, datacategorygroupreference, location, address, anyType, json, complexvalue | Custom user types are also treated as strings
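The mapping table can be expressed as a simple lookup, sketched below. CdapType is a hypothetical stand-in for the CDAP schema types; the real plugin would build CDAP Schema objects instead. The Salesforce type names are taken verbatim from the table.

```java
import java.util.Locale;

final class SalesforceTypeMapper {
    // Hypothetical stand-in for the CDAP types listed in the mapping table.
    enum CdapType { BOOLEAN, LONG, DOUBLE, DATE, TIMESTAMP_MILLIS, TIME_MILLIS, STRING }

    // Maps a Salesforce field type name from the table to a CDAP type.
    static CdapType toCdapType(String salesforceType) {
        switch (salesforceType.toLowerCase(Locale.ROOT)) {
            case "_bool":
                return CdapType.BOOLEAN;
            case "_int":
            case "_long":
                return CdapType.LONG;
            case "_double":
            case "currency":
            case "percent":
                return CdapType.DOUBLE;
            case "date":
                return CdapType.DATE;
            case "datetime":
                return CdapType.TIMESTAMP_MILLIS;
            case "time":
                return CdapType.TIME_MILLIS;
            default:
                // picklist, id, url, email, ... and custom user types are read as strings
                return CdapType.STRING;
        }
    }
}
```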

 

Design

Using Bulk API to retrieve objects from SOQL

We use the Bulk API because the SOAP API can only process hundreds of records. Note that the Bulk API has its own limitations, caused by the need to process large amounts of data. Bulk API queries do not support the following SOQL constructs:

  • COUNT

  • ROLLUP

  • SUM

  • GROUP BY CUBE

  • OFFSET

  • Nested SOQL queries
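To fail fast with a clear message, the plugin could reject such queries before submitting a job. The sketch below is a hypothetical pre-flight check based on naive keyword matching, not a SOQL parser: it can report false positives (e.g. keywords inside string literals), and it treats any second SELECT as a nested query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical validator for the unsupported constructs listed above.
final class BulkQueryValidator {
    private static final Pattern COUNT = Pattern.compile("(?i)\\bCOUNT\\s*\\(");
    private static final Pattern ROLLUP = Pattern.compile("(?i)\\bROLLUP\\b");
    private static final Pattern SUM = Pattern.compile("(?i)\\bSUM\\s*\\(");
    private static final Pattern CUBE = Pattern.compile("(?i)\\bGROUP\\s+BY\\s+CUBE\\b");
    private static final Pattern OFFSET = Pattern.compile("(?i)\\bOFFSET\\b");
    private static final Pattern SELECT = Pattern.compile("(?i)\\bSELECT\\b");

    // Returns the unsupported constructs found in the query (empty if none).
    static List<String> unsupportedConstructs(String soql) {
        List<String> problems = new ArrayList<>();
        if (COUNT.matcher(soql).find()) problems.add("COUNT");
        if (ROLLUP.matcher(soql).find()) problems.add("ROLLUP");
        if (SUM.matcher(soql).find()) problems.add("SUM");
        if (CUBE.matcher(soql).find()) problems.add("GROUP BY CUBE");
        if (OFFSET.matcher(soql).find()) problems.add("OFFSET");
        // More than one SELECT keyword means the query contains a subquery.
        Matcher m = SELECT.matcher(soql);
        int selects = 0;
        while (m.find()) {
            selects++;
        }
        if (selects > 1) problems.add("Nested SOQL queries");
        return problems;
    }
}
```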

 

Automatically choosing API version

As per @Bhooshan Mogal:

Let's use the latest one available right now for development, and not make it configurable. If there's ever an instance where we require a higher version, we'll release a new plugin.

Salesforce API v45 will be the default, non-configurable version.

Automatically retrieving sObject from query

The Salesforce API requires an sObject when starting a job. This object is the same as the table the query selects from, and asking the user to specify the table name twice would be a poor user experience. Therefore, an ANTLR4 parser will be used to parse the SOQL query and extract the table name from it.
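The core rule the parser must implement is "take the object after the top-level FROM". The sketch below illustrates that rule with a hand-written scanner instead of the planned ANTLR4 grammar: it skips string literals and anything inside parentheses, so FROM clauses of subqueries are ignored. SObjectExtractor is a hypothetical name, and the scanner is not a substitute for testing against the full set of SOQL examples.

```java
// Hypothetical scanner illustrating sObject extraction; the design uses ANTLR4.
final class SObjectExtractor {
    // Returns the identifier after the first FROM keyword that is not inside
    // parentheses (subqueries) or a string literal.
    static String extractSObject(String soql) {
        int depth = 0;
        int n = soql.length();
        for (int i = 0; i < n; i++) {
            char c = soql.charAt(i);
            if (c == '\'') {
                // Skip a string literal; SOQL escapes characters with a backslash.
                i++;
                while (i < n && soql.charAt(i) != '\'') {
                    if (soql.charAt(i) == '\\') {
                        i++;
                    }
                    i++;
                }
            } else if (c == '(') {
                depth++;
            } else if (c == ')') {
                depth--;
            } else if (depth == 0 && isKeywordAt(soql, i, "FROM")) {
                int j = i + 4;
                while (j < n && Character.isWhitespace(soql.charAt(j))) {
                    j++;
                }
                int start = j;
                while (j < n && isIdentifierChar(soql.charAt(j))) {
                    j++;
                }
                if (j > start) {
                    return soql.substring(start, j);
                }
                break;
            }
        }
        throw new IllegalArgumentException("No top-level FROM clause in: " + soql);
    }

    // True if 'keyword' (case-insensitive) starts at i and is a whole word.
    private static boolean isKeywordAt(String s, int i, String keyword) {
        int end = i + keyword.length();
        if (end > s.length() || !s.regionMatches(true, i, keyword, 0, keyword.length())) {
            return false;
        }
        boolean startsWord = i == 0 || !isIdentifierChar(s.charAt(i - 1));
        boolean endsWord = end == s.length() || !isIdentifierChar(s.charAt(end));
        return startsWord && endsWord;
    }

    private static boolean isIdentifierChar(char c) {
        return Character.isLetterOrDigit(c) || c == '_';
    }
}
```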

Parser should be tested against all queries from:

https://developer.salesforce.com/docs/atlas.en-us.soql_sosl.meta/soql_sosl/sforce_api_calls_soql_select_examples.htm

The page contains an example query for every syntax construct that SOQL supports.

Properties

Input JSON

{
  "name": "HTTPSalesforce",
  "plugin": {
    "name": "HTTPSalesforce",
    "type": "batchsource",
    "label": "HTTPSalesforce",
    "properties": {
      "clientId": "XXXXXXX",
      "clientSecret": "XXXXXXXXX",
      "username": "XXXXX",
      "password": "XXXXX",
      "query": "SELECT Name FROM Opportunity",
      "outputSchema": "Name:String",
      "loginUrl": "https://login.salesforce.com/services/oauth2/token"
    }
  }
}


Example

clientId:XXXX

clientSecret:XXXXXX

username:XXX

password:XXX

query:SELECT Name FROM Opportunity

outputSchema:Name:String

Output (if outputSchema is configured as in the example above)

Name


United Oil Plant Standby Generators

Edge Emergency Generator

Grand Hotels Emergency Generators

Output (if outputSchema is not configured)

Records

{"Name":"United Oil Plant Standby Generators","attributes":{"type":"Opportunity","url":"/services/data/v37.0/sobjects/Opportunity/0062800000CQWgJAAX"}}

{"Name":"Edge Emergency Generator","attributes":{"type":"Opportunity","url":"/services/data/v37.0/sobjects/Opportunity/0062800000CQWgKAAX"}}

{"Name":"Grand Hotels Emergency Generators","attributes":{"type":"Opportunity","url":"/services/data/v37.0/sobjects/Opportunity/0062800000CQWgIAAX"}}


Salesforce Batch API Examples

OAuth2 

Using the provided clientId, clientSecret, username and password, the access token and instance URL are fetched with the OAuth2 username-password flow.

For example, the following form-encoded body is sent in a POST request to loginUrl:

grant_type=password&client_id=<your_client_id>&client_secret=<your_client_secret>&username=<your_username>&password=<your_password>

The following parameters are required:

grant_type

Set this to password.

client_id

Application's client identifier.

client_secret

Application's client secret.

username

The API user's Salesforce.com username, of the form user@example.com.

password

The API user's Salesforce.com password.
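A minimal sketch of this login request using the JDK's built-in HTTP client (Java 11+). The class and method names (SalesforceOAuth, formBody, requestToken) are hypothetical; loginUrl is the plugin property shown in the input JSON, and parsing the JSON response is left out to keep the sketch dependency-free.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical login helper for the OAuth2 username-password flow.
final class SalesforceOAuth {
    // Builds the application/x-www-form-urlencoded body with the required parameters.
    static String formBody(String clientId, String clientSecret, String username, String password) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("grant_type", "password");
        params.put("client_id", clientId);
        params.put("client_secret", clientSecret);
        params.put("username", username);
        params.put("password", password);
        return params.entrySet().stream()
            .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
            .collect(Collectors.joining("&"));
    }

    // POSTs the body to loginUrl and returns the raw JSON response
    // (containing access_token and instance_url).
    static String requestToken(String loginUrl, String body) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(loginUrl))
            .header("Content-Type", "application/x-www-form-urlencoded")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IllegalStateException("Login failed: HTTP " + response.statusCode() + " " + response.body());
        }
        return response.body();
    }
}
```

URL-encoding each value matters because passwords and usernames often contain characters such as "&" or "@" that would otherwise corrupt the form body.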

The response looks like this:

{
"id":"https://login.salesforce.com/id/00D50000000IZ3ZEAW/00550000001fg5OAAQ",
"issued_at":"1296509381665",
"instance_url":"https://na1.salesforce.com",
"signature":"+Nbl5EOl/DlsvUZ4NbGDno6vn935XsWGVbwoKyXHayo=",
"access_token":"00D50000000IZ3Z!AQgAQH0Yd9M51BU_rayzAdmZ6NmT3pXZBgzkc3JTwDOGBl8BP2AREOiZzL_A2zg7etH81kTuuQPljJVsX4CPt3naL7qustlb"
}

The access token and instance URL are then used to execute queries via the Bulk and SOAP APIs.
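For the Bulk API, the instance URL is turned into the asynchronous endpoint <instance_url>/services/async/<apiVersion>. The sketch below shows that derivation; the class name and the "45.0" constant (taken from the fixed v45 decision) are assumptions for illustration.

```java
// Hypothetical helper deriving the Bulk API endpoint from the login response.
final class BulkEndpoint {
    // Assumption: the plugin pins API version 45.0 (see "Automatically choosing API version").
    static final String DEFAULT_API_VERSION = "45.0";

    // Builds <instance_url>/services/async/<apiVersion>, tolerating a trailing slash.
    static String bulkEndpoint(String instanceUrl, String apiVersion) {
        String base = instanceUrl.endsWith("/")
            ? instanceUrl.substring(0, instanceUrl.length() - 1)
            : instanceUrl;
        return base + "/services/async/" + apiVersion;
    }
}
```

With the Salesforce WSC library, this string and the access token would be passed to ConnectorConfig.setRestEndpoint and ConnectorConfig.setSessionId before constructing a BulkConnection.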

Submitting a job to batch API

Below is an example of submitting a job to the Salesforce Bulk API and reading all the returned batches using the Salesforce Java library. Note that in the actual implementation, batches will be read in parallel using MapReduce.

import com.sforce.async.*; // BulkConnection, JobInfo, BatchInfo, AsyncApiException, ...
import com.sforce.ws.ConnectorConfig;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Create the connection. oauth2(...) performs the login request from the OAuth2 section;
// AuthResult is a hypothetical holder for its access_token and instance_url.
AuthResult auth = oauth2(config.getClientId(), config.getClientSecret(), config.getUsername(), config.getPassword());
ConnectorConfig connectorConfig = new ConnectorConfig();
connectorConfig.setSessionId(auth.getAccessToken());
connectorConfig.setRestEndpoint(auth.getInstanceUrl() + "/services/async/" + config.getApiVersion());
BulkConnection bulkConnection = new BulkConnection(connectorConfig);

// Create a query job
JobInfo job = new JobInfo();
job.setObject("Opportunity");
job.setOperation(OperationEnum.query);
job.setConcurrencyMode(ConcurrencyMode.Parallel);
job.setContentType(ContentType.CSV);
job = bulkConnection.createJob(job);
if (job.getId() == null) {
  throw new IllegalStateException("Salesforce did not return a job ID");
}

// Submit the query as a batch
String query = "SELECT Name, Id, IsWon FROM Opportunity";
bulkConnection.createBatchFromStream(job, new ByteArrayInputStream(query.getBytes(StandardCharsets.UTF_8)));

// Read every batch; a completed batch may have several result sets
List<String> results = new ArrayList<>();
for (BatchInfo batch : bulkConnection.getBatchInfoList(job.getId()).getBatchInfo()) {
  BatchInfo info = waitForBatchComplete(bulkConnection, job.getId(), batch.getId());
  if (info.getState() != BatchStateEnum.Completed) {
    throw new IllegalStateException("Batch " + info.getId() + " ended in state " + info.getState() + ": " + info.getStateMessage());
  }
  QueryResultList resultList = bulkConnection.getQueryResultList(job.getId(), info.getId());
  for (String resultId : resultList.getResult()) {
    try (InputStream in = bulkConnection.getQueryResultStream(job.getId(), info.getId(), resultId)) {
      results.add(new String(in.readAllBytes(), StandardCharsets.UTF_8));
    }
  }
}

// Helper (declared at class level): polls the batch until it leaves the Queued/InProgress states
static BatchInfo waitForBatchComplete(BulkConnection connection, String jobId, String batchId) throws AsyncApiException, InterruptedException {
  BatchInfo info = connection.getBatchInfo(jobId, batchId);
  while (info.getState() == BatchStateEnum.Queued || info.getState() == BatchStateEnum.InProgress) {
    Thread.sleep(1000);
    info = connection.getBatchInfo(jobId, batchId);
  }
  return info;
}

 

 

Implementation

Plugin Methods

prepareRun:

  • Set up the MapReduce SalesforceInputFormat class.

  • Set up the lineage recorder.

configurePipeline:

Check that authentication succeeds and that the Salesforce API version and object are valid.

transform:

Build a StructuredRecord from a record in Salesforce format.
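Part of this step is converting each raw field value into the Java value for its CDAP type. The sketch below assumes CSV results (as in the Bulk API example), that an empty cell means null, and that datetime values arrive as ISO 8601 instants such as 2019-05-01T12:00:00.000Z; date and timestamp values are encoded as days and milliseconds since the Unix epoch, the usual representation for these logical types. The names are hypothetical, and time fields are left out of this sketch.

```java
import java.time.Instant;
import java.time.LocalDate;

// Hypothetical converter used while building a StructuredRecord.
final class SalesforceValueConverter {
    // Hypothetical stand-in for the CDAP types in the mapping table.
    enum CdapType { BOOLEAN, LONG, DOUBLE, DATE, TIMESTAMP_MILLIS, STRING }

    // Converts one raw CSV cell into the Java value stored for the given type.
    static Object convert(String raw, CdapType type) {
        if (raw == null || raw.isEmpty()) {
            return null; // assumption: an empty cell represents a null field
        }
        switch (type) {
            case BOOLEAN:
                return Boolean.parseBoolean(raw);
            case LONG:
                return Long.parseLong(raw);
            case DOUBLE:
                return Double.parseDouble(raw);
            case DATE:
                // Days since the Unix epoch, e.g. from "2019-05-01"
                return (int) LocalDate.parse(raw).toEpochDay();
            case TIMESTAMP_MILLIS:
                // Milliseconds since the Unix epoch, e.g. from "2019-05-01T12:00:00.000Z"
                return Instant.parse(raw).toEpochMilli();
            default:
                return raw;
        }
    }
}
```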

onRunFinish:

Get the job status from the API and make sure the job completed successfully. If it did not, throw an exception.

Salesforce Mapreduce parallelization

Each MapReduce split corresponds to a single batch returned by the Salesforce Bulk API. Batches are created internally by the Salesforce processing server and their size cannot be configured, so the plugin has to work with the batches it receives.

Salesforce will split data into batches according to the following criteria:

  • A batch can contain a maximum of 10,000 records.

  • A batch can contain a maximum of 10,000,000 characters for all the data in a batch.

  • A field can contain a maximum of 32,000 characters.

  • A record can contain a maximum of 5,000 fields.

  • A record can contain a maximum of 400,000 characters for all its fields.

  • A batch must contain some content or an error occurs.

 

Most significant methods from InputFormat and RecordReader:

SalesforceInputFormat.getSplits():

Create a Salesforce bulk job and return each batch it creates as a separate split.
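A split only needs enough information for a mapper to locate its batch: the job ID and the batch ID. The sketch below is a hypothetical split class whose write/readFields methods mirror Hadoop's Writable contract without depending on Hadoop; a real InputSplit would also implement getLength and getLocations.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical split describing one Salesforce batch.
final class SalesforceSplit {
    private String jobId;
    private String batchId;

    // No-arg constructor, needed when the framework deserializes the split.
    SalesforceSplit() {
    }

    SalesforceSplit(String jobId, String batchId) {
        this.jobId = jobId;
        this.batchId = batchId;
    }

    // Serializes the split so it can be shipped to a mapper.
    void write(DataOutput out) throws IOException {
        out.writeUTF(jobId);
        out.writeUTF(batchId);
    }

    // Restores the fields written by write().
    void readFields(DataInput in) throws IOException {
        jobId = in.readUTF();
        batchId = in.readUTF();
    }

    String getJobId() {
        return jobId;
    }

    String getBatchId() {
        return batchId;
    }
}
```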

SalesforceRecordReader.initialize():

Wait until the current batch status is Completed, then get the records from the batch. If the batch status is Failed, throw an exception.

SalesforceRecordReader.nextKeyValue():

Iterate over the records of the current batch. Each value is a single record from the batch. The key is NullWritable, since no records need to be reduced by MapReduce in our case.
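The iteration can be sketched without Hadoop types. The hypothetical reader below assumes the batch result is CSV with a header row (the content type used in the Bulk API example) and exposes each record as a column-to-value map; the key is omitted because it would always be NullWritable. Its parser handles quoted fields containing commas, line breaks and doubled quotes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reader over one batch's CSV result, mirroring the
// RecordReader nextKeyValue()/getCurrentValue() contract.
final class BatchRecordReader {
    private final List<String> header;
    private final Iterator<List<String>> rows;
    private Map<String, String> current;

    BatchRecordReader(String csv) {
        List<List<String>> all = parseCsv(csv);
        header = all.isEmpty() ? Collections.emptyList() : all.get(0);
        rows = all.subList(Math.min(1, all.size()), all.size()).iterator();
    }

    // Advances to the next record; returns false when the batch is exhausted.
    boolean nextKeyValue() {
        if (!rows.hasNext()) {
            return false;
        }
        List<String> row = rows.next();
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < header.size(); i++) {
            record.put(header.get(i), i < row.size() ? row.get(i) : "");
        }
        current = record;
        return true;
    }

    Map<String, String> getCurrentValue() {
        return current;
    }

    // Minimal CSV parser: quoted fields may contain commas, line breaks and "" escapes.
    static List<List<String>> parseCsv(String text) {
        List<List<String>> result = new ArrayList<>();
        List<String> row = new ArrayList<>();
        StringBuilder field = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (inQuotes) {
                if (c == '"' && i + 1 < text.length() && text.charAt(i + 1) == '"') {
                    field.append('"');
                    i++;
                } else if (c == '"') {
                    inQuotes = false;
                } else {
                    field.append(c);
                }
            } else if (c == '"') {
                inQuotes = true;
            } else if (c == ',') {
                row.add(field.toString());
                field.setLength(0);
            } else if (c == '\n') {
                row.add(field.toString());
                field.setLength(0);
                result.add(row);
                row = new ArrayList<>();
            } else if (c != '\r') {
                field.append(c);
            }
        }
        if (field.length() > 0 || !row.isEmpty()) {
            row.add(field.toString());
            result.add(row);
        }
        return result;
    }
}
```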


Checklist

User stories documented 
User stories reviewed 
Design documented 
Design reviewed 
Feature merged 
Examples and guides 
Integration tests 
Documentation for feature 
Short video demonstrating the feature

 

Created in 2020 by Google Inc.