...

In this pipeline, a condition node decides, based on the number of records read from the stream, whether to store the result in a Table sink or a Database sink. If the number of records processed by the Python transform is less than 1M, the Table sink is used; otherwise, the Database sink is used. A transform stage placed after the Python transform is responsible for writing to a local dataset and adding num.records to the workflow token, which the condition node then uses to make its decision.
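The role of the stage after the Python transform can be sketched as below. This is an illustrative sketch, not the pipeline's actual API: the class name `CountingTransformStage` and its `transform`/`finish` methods are hypothetical, the "local dataset" is modeled as an in-memory `List`, and the workflow token is modeled as a `Map<String, String>`. Only the key `num.records` comes from the text above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the stage that runs after the Python transform.
// The local dataset is a List and the workflow token is a Map here;
// the real pipeline would use the platform's dataset and token APIs.
class CountingTransformStage<T> {
  // Key read later by the condition node (from the design text).
  static final String NUM_RECORDS_KEY = "num.records";

  private final List<T> localDataset = new ArrayList<>();
  private long numRecords = 0;

  // Called once per record emitted by the Python transform.
  void transform(T record) {
    localDataset.add(record); // write the record to the local dataset
    numRecords++;             // keep a running count for the condition node
  }

  // Called when the stage finishes: publish the count to the workflow token.
  void finish(Map<String, String> workflowToken) {
    workflowToken.put(NUM_RECORDS_KEY, Long.toString(numRecords));
  }

  List<T> getLocalDataset() {
    return localDataset;
  }
}
```

The count is published once at the end rather than per record, so the condition node sees a single final value when it runs.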

               |--- sink1
condition1 ----|
               |                    |--- transform2 --- sink2
               |--- condition2 -----|
                                    |--- sink3

Each condition node then reads the record count from the workflow token and decides which branch to take: either continue to the next condition node (which can also use the workflow token for further decisions) or add the program that executes the next phase. Here, the next phase starts from sink1, or from one of transform2 or sink3, depending on condition2.
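The branching in the diagram can be sketched as a small planner. This is an illustrative sketch only: conditions are modeled as `Predicate`s over a `Map` standing in for the workflow token, `NestedConditionPlanner` is a hypothetical name, and it assumes sink1 is the if-branch of condition1 and transform2 is the if-branch of condition2, which the diagram does not state explicitly.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch: returns the stage where the next phase starts.
class NestedConditionPlanner {
  static String nextPhaseStart(Map<String, String> workflowToken,
                               Predicate<Map<String, String>> condition1,
                               Predicate<Map<String, String>> condition2) {
    if (condition1.test(workflowToken)) {
      // Assumed if-branch of condition1.
      return "sink1";
    }
    // Else-branch of condition1: evaluation continues at condition2,
    // which reads the same workflow token.
    // Assumed: if-branch is transform2 (followed by sink2), else-branch is sink3.
    return condition2.test(workflowToken) ? "transform2" : "sink3";
  }
}
```

For example, condition2 could be `token -> Long.parseLong(token.get("num.records")) < 1_000_000L`, matching the 1M threshold described above.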

 

API

The Connection class should indicate whether a connection whose source is a condition stage belongs to the if-branch or the else-branch.

...

Code Block
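public class ConditionConnection extends Connection {
  private final boolean isIfBranch; // true for the if-branch, false for the else-branch
  public ConditionConnection(String from, String to, boolean isIfBranch) {
    super(from, to); // assumes Connection exposes a (from, to) constructor
    this.isIfBranch = isIfBranch;
  }
}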
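To illustrate how a planner might consume such a field, here is a minimal sketch. `Connection` is a hypothetical stand-in carrying only `from`/`to`, `isIfBranch()` is an assumed getter, and `BranchSelector` is a hypothetical helper; none of these is the actual pipeline API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal Connection; the real class may carry more fields.
class Connection {
  private final String from;
  private final String to;

  Connection(String from, String to) {
    this.from = from;
    this.to = to;
  }

  String getFrom() { return from; }
  String getTo() { return to; }
}

class ConditionConnection extends Connection {
  private final boolean isIfBranch; // true for the if-branch, false for the else-branch

  ConditionConnection(String from, String to, boolean isIfBranch) {
    super(from, to);
    this.isIfBranch = isIfBranch;
  }

  boolean isIfBranch() { return isIfBranch; }
}

// Hypothetical helper: selects the stages on the branch chosen by a condition.
class BranchSelector {
  static List<String> nextStages(List<Connection> connections,
                                 String conditionStage, boolean conditionResult) {
    List<String> next = new ArrayList<>();
    for (Connection c : connections) {
      // Only connections leaving the condition stage carry branch information.
      if (c.getFrom().equals(conditionStage) && c instanceof ConditionConnection) {
        ConditionConnection cc = (ConditionConnection) c;
        if (cc.isIfBranch() == conditionResult) {
          next.add(cc.getTo());
        }
      }
    }
    return next;
  }
}
```

Keeping the flag on the connection means a condition stage with several outgoing edges needs no extra bookkeeping: each edge states which branch it belongs to.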

 

Option 3:

Should this information be stored as part of ETLStage rather than Connection?

 

Code Block
Title: Condition
public abstract class Condition implements PipelineConfigurable {
  public static final String PLUGIN_TYPE = "condition";

  /**
   * Implement this method to provide the condition logic. If it returns true, the if-branch
   * is executed; if it returns false, the else-branch is executed.
   *
   * @param context the condition context, containing information about the pipeline run
   * @return true to execute the if-branch, false to execute the else-branch
   * @throws Exception when there is a failure in method execution
   */
  public abstract boolean apply(ConditionContext context) throws Exception;
}
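A condition plugin for the pipeline described at the top of this page might look like the sketch below. To keep it self-contained, `Condition` and `ConditionContext` are trimmed-down stand-ins (the `PipelineConfigurable` part is omitted), `getTokenValue` is an assumed accessor for the workflow token, and `RecordCountCondition` is a hypothetical plugin name; the real context may expose the workflow token differently.

```java
// Hypothetical, trimmed-down stand-ins so the sketch compiles on its own.
interface ConditionContext {
  // Assumed accessor: returns the workflow token value for the key, or null if absent.
  String getTokenValue(String key);
}

abstract class Condition {
  static final String PLUGIN_TYPE = "condition";

  abstract boolean apply(ConditionContext context) throws Exception;
}

// Example plugin: returns true (if-branch, Table sink) when fewer records than the
// threshold were processed, false (else-branch, Database sink) otherwise.
class RecordCountCondition extends Condition {
  private final long threshold;

  RecordCountCondition(long threshold) {
    this.threshold = threshold;
  }

  @Override
  boolean apply(ConditionContext context) {
    String value = context.getTokenValue("num.records");
    if (value == null) {
      // Fail loudly rather than silently picking a branch.
      throw new IllegalStateException("num.records not found in workflow token");
    }
    return Long.parseLong(value) < threshold;
  }
}
```

With a threshold of `1_000_000L`, this matches the "less than 1M records goes to the Table sink" rule.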

...