Control Flow support in Hydrator

About

Documenting thoughts on Control flow support in Hydrator

Status

Draft

What is this feature

Hydrator pipelines have two kinds of nodes:

  • Action nodes: represent control flow
  • Source, Transform, Sink, Spark Compute, and Aggregation nodes: represent data flow

As of 3.5, Hydrator pipelines can contain both control flow and data flow; however, control flow can be present only before the source or after the sink.

We need the capability to add control flow anywhere in a Hydrator pipeline.

How does this feature help our customers
  • Having control flow in a pipeline makes it possible to perform validations and run different branches of the pipeline
    • Example 1: Decision node
      • Ingest Twitter data collected on remote machines and perform the subsequent analytics processing (aggregation) only if the number of records ingested is above 1.0M (the average is roughly 5K tweets/sec, with some tolerance).
      • Reasoning: anything less than that could mean there is a data collection problem, and the pipeline should not proceed
      • This requires a decision point: a control node that can run two different branches of a pipeline
    • Example 2: Connector node
      • Collect customer data from Salesforce, MySQL, and legacy CRM systems, normalize the data, and perform subsequent processing only if the data size is > 1M records
      • This node is similar to the Oozie join node
Requirements
  • A new plugin type and a few plugins with the following capabilities
    • Ability to specify a condition based on
      • The return status of a command that is run
      • Workflow tokens
    • Ability to specify two different paths in the data pipeline based on the outcome
Use Case
  • A data pipeline processes Twitter mentions data on an hourly basis. If the number of records ingested is less than 1000 per hour, that could indicate a problem with data collection; in that case, the rest of the pipeline, which parses the data and computes analytics, should not be executed.
User Stories

1) A user wants a condition node in Hydrator. The condition node checks whether more than 1 million records were received from the source; the next stage (a transformation) is executed only when that threshold is reached.

 

Assumptions

Adding a condition node marks a physical split in the pipeline: the stages before and after the condition (unless the next stage is also a condition) execute in separate MR/Spark phases.

Design

A transform can be inserted before the condition; it writes to a local dataset and also adds the number of records processed to the workflow token. If a condition node comes directly after another condition node, it does not need a local dataset in between; it can use the local dataset and workflow token from the previous transform.

The local dataset stores the intermediate result, and the condition decides which phase executes next.

 

Example:

                                                            |----- Table Sink
Stream batch source -> Python Transform -> Condition node --|
                                                            |----- Database Sink

In this pipeline, the condition node decides, based on the number of records read from the stream, whether to store the result in the table sink or the database sink: if the number of records processed at the Python transform is less than 1M, we use the table sink; otherwise we use the database sink. A transform stage after the Python transform is responsible for writing to the local dataset and adding num.records to the workflow token, which the condition node uses to make its decision.
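
For illustration, the record-counting transform that the framework would insert could look roughly like the sketch below. This is only a sketch: it assumes the 3.x-era co.cask.cdap ETL Transform API (transform/destroy), and the mechanism for publishing the count as num.records in the workflow token (and for writing the pass-through records to the local dataset) is exactly what this design has to provide, so it is only indicated in comments.

import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.Transform;

// Hypothetical framework-inserted stage: passes records through unchanged while counting them.
public class RecordCountTransform extends Transform<StructuredRecord, StructuredRecord> {
  private long numRecords;

  @Override
  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) {
    numRecords++;          // count every record flowing through this stage
    emitter.emit(input);   // pass the record through; the phase output goes to the local dataset
  }

  @Override
  public void destroy() {
    // At the end of the phase, numRecords should be published as "num.records" in the
    // workflow token so the condition node in the next phase can read it.
  }
}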

             |--- sink1
condition1 --|
             |--- condition2 --|--- transform2 --- sink2
                               |--- sink3

Here the next phase starts either from sink1, or from one of transform2 or sink3 depending on condition2.

 

API

The Connection class should capture whether a connection is the if-branch or the else-branch, for connections whose source is a condition node.

Option-1:

public class Connection {
  private final String from;
  private final String to;
  // null for regular connections; set only when 'from' is a condition node
  @Nullable
  private final Boolean isIfBranch;

  public Connection(String from, String to, @Nullable Boolean isIfBranch) {
    this.from = from;
    this.to = to;
    this.isIfBranch = isIfBranch;
  }

  public boolean isIfBranch() {
    // Ideally this should be called only if isConditionBranch() is true;
    // if isIfBranch is null, this returns false.
    return isIfBranch != null && isIfBranch;
  }

  public boolean isConditionBranch() {
    // The connection originates from a condition node if the branch flag is set.
    return isIfBranch != null;
  }
}
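
For illustration, with Option-1 the first example pipeline could declare its branch connections roughly as follows. The stage names and the three-argument constructor are only placeholders matching the sketch above:

import java.util.Arrays;
import java.util.List;

public class ConnectionExample {
  public static void main(String[] args) {
    List<Connection> connections = Arrays.asList(
      // regular data-flow connections carry no branch information
      new Connection("StreamSource", "PythonTransform", null),
      new Connection("PythonTransform", "RecordCountTransform", null),
      new Connection("RecordCountTransform", "ThresholdCondition", null),
      // connections out of the condition node mark the if/else branch
      new Connection("ThresholdCondition", "DatabaseSink", true),  // condition true: >= 1M records
      new Connection("ThresholdCondition", "TableSink", false)     // condition false: < 1M records
    );
    // A planner could then use isConditionBranch()/isIfBranch() to split phases.
    System.out.println(connections.size() + " connections defined");
  }
}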

 

Option-2:

 

public class ConditionConnection extends Connection {
   private final boolean isIfBranch; // true or false to indicate whether this is the if-branch or the else-branch
}

 

Option - 3 :

Should this information be stored as part of ETLStage rather than Connection?
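
Purely for illustration, Option-3 might look like the sketch below; the field name is hypothetical and the rest of ETLStage is elided. One possible drawback of this option is that branch membership is really a property of the edge, so putting it on the stage becomes ambiguous if a stage can be reached from more than one predecessor.

public class ETLStage {
  private final String name;
  // ... existing plugin/config fields elided ...

  // Hypothetical: non-null only when this stage is the direct target of a condition node;
  // true means it is on the if-branch, false means it is on the else-branch.
  @Nullable
  private final Boolean onIfBranch;
}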

 

Condition
public abstract class Condition implements PipelineConfigurable {
  public static final String PLUGIN_TYPE = "condition";

  /**
   * Implement this method with the condition logic. If it returns true, the if-branch is executed;
   * if it returns false, the else-branch is executed.
   *
   * @param context the condition context, containing information about the pipeline run
   * @throws Exception when there is a failure in method execution
   */
  public abstract boolean apply(ConditionContext context) throws Exception;
}
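
To make the contract concrete, here is a minimal sketch of how the pipeline driver could use this interface to hand control to exactly one branch; the driver itself is not part of the proposed API, and representing the branches as Runnables is purely illustrative:

public class ConditionNodeDriver {
  // Hypothetical driver logic: evaluate the condition once, then run exactly one branch.
  public static void execute(Condition condition, ConditionContext context,
                             Runnable ifBranch, Runnable elseBranch) throws Exception {
    if (condition.apply(context)) {
      ifBranch.run();   // phases connected on the if-branch
    } else {
      elseBranch.run(); // phases connected on the else-branch
    }
  }
}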

 

 

ConditionContext
public interface ConditionContext extends StageContext, Transactional, SecureStore, SecureStoreManager {

  /**
   * Returns the logical start time of the batch job which triggers this instance of the condition.
   * The logical start time is the time when the triggering batch job is supposed to start if it is
   * started by the scheduler. Otherwise it is the current time when the condition runs.
   *
   * @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC).
   */
  long getLogicalStartTime();

  /**
   * Return the runtime arguments.
   */
  Map<String, String> getArguments();

  /**
   * @return The application namespace
   */
  String getNamespace();
}

 

Example Condition Plugin:

 

public class ThresholdCondition extends Condition {

  public static class ThresholdConditionConfig extends PluginConfig {
    @Description("Name of the property that will be looked up in workflow token")
    private String propertyName;

    @Description("Threshold condition will evaluate to true if value of property is above this threshold value")
    private Integer thresholdValue;
  }

  @Override
  public boolean apply(ConditionContext context) throws Exception {
  ...
  }
}
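
The body of apply() is elided above. One possible completion, sketched under assumptions: the config is injected via the plugin's constructor (as is typical for plugins), and the record count is read from the runtime arguments exposed by ConditionContext.getArguments(); in practice it might instead be read from the workflow token, which the interface above does not yet expose.

  // Illustrative only: a possible completion of ThresholdCondition.
  private final ThresholdConditionConfig config;

  public ThresholdCondition(ThresholdConditionConfig config) {
    this.config = config;
  }

  @Override
  public boolean apply(ConditionContext context) throws Exception {
    // Look up the configured property among the runtime arguments
    // (standing in for the workflow-token lookup described in the Requirements).
    String value = context.getArguments().get(config.propertyName);
    if (value == null) {
      // Property missing: treat as not meeting the threshold, so the else-branch runs.
      return false;
    }
    // True (if-branch) only when the observed value is above the configured threshold.
    return Long.parseLong(value) > config.thresholdValue;
  }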

 

Limitations :

1) Even if the number of records processed from the previous stage hits the threshold, the next stage cannot start until all records from the previous stage have been processed and written to the local dataset, because the next stage's input is sourced from that local dataset.

 

 
