...

  • WorkflowToken interface changes

    Code Block
    /**
     * Interface to represent the data that is transferred from one node to the next nodes in the {@link Workflow}.
     */
    public interface WorkflowToken {

      /**
       * Put the specified key-value entry into the {@link WorkflowToken}.
       * The token may store additional information about the context in which
       * this key is being set, for example, the unique name of the workflow node.
       * @param key   the key representing the entry
       * @param value the value for the key
       */
      void put(String key, String value);

      /**
       * Get the most recent value for the specified key.
       * @param key the key to be searched
       * @return the value for the key
       */
      @Nullable
      String get(String key);

      /**
       * Get the value set for the specified key by the specified node.
       * @param key the key to be searched
       * @param nodeName the name of the node
       * @return the value set for the key by nodeName
       */
      @Nullable
      String get(String key, String nodeName);

      /**
       * Same key can be added to the WorkflowToken by multiple nodes.
       * This method returns the {@link List} of {@link NodeValueEntry}, where
       * each entry represents the unique node name and the value that it set
       * for the specified key.
       * <p>
       * The list maintains the order in which the values were
       * inserted in the WorkflowToken for a specific key except in the case of fork
       * and join. In case of fork in the Workflow, copies of the WorkflowToken are made
       * and passed along each branch. At the join, all copies of the
       * WorkflowToken are merged together. While merging, the order in which the values were
       * inserted for a specific key is guaranteed within the same branch, but not across
       * different branches.
       * @param key the key to be searched
       * @return the list of {@link NodeValueEntry} from node name to the value that node
       * added for the input key
       */
      List<NodeValueEntry> getAll(String key);

      /**
       * Get the {@link Map} of key-values that were added to the {@link WorkflowToken}
       * by specific node.
       * <p>
       * This method also accepts the optional prefix parameter. When
       * supplied, the returned map is filtered by the keys prefixed by the input prefix.
       * Prefix is matched along the "." boundaries.
       * <p>
       * Example: Prefix "a.b" will match with the key "a.b" or any key starting
       * with "a.b.", however it will not match with the key "a.bc". MapReduce counters
       * from the particular node can be retrieved using prefix "mr.counters".
       * @param nodeName the unique name of the node
       * @param prefix optional prefix to filter the keys
       * @return the map of key-values that were added by the specified node
       */
      Map<String, String> getAllFromNode(String nodeName, @Nullable String prefix);

      /**
       * This method is deprecated as of release 3.1. Instead, to get the
       * MapReduce counters from the WorkflowToken, use the flattened key prefixed
       * by 'mr.counters.'.
       * <p>
       * Example:
       * <p>
       * <ul>
       * <li>
       *  To get the most recent value of the counter with group name
       *  'org.apache.hadoop.mapreduce.TaskCounter' and counter name 'MAP_INPUT_RECORDS':
       *  <pre>
       *    <code>
       *      String flattenCounterKey = "mr.counters.org.apache.hadoop.mapreduce.TaskCounter.MAP_INPUT_RECORDS";
       *      workflowToken.get(flattenCounterKey);
       *    </code>
       *  </pre>
       * </li>
       * <li>
       *  To get the value of the counter with group name 'org.apache.hadoop.mapreduce.TaskCounter'
       *  and counter name 'MAP_INPUT_RECORDS' as set by the MapReduce program with unique name 'PurchaseHistoryBuilder':
       *  <pre>
       *    <code>
       *      String flattenCounterKey = "mr.counters.org.apache.hadoop.mapreduce.TaskCounter.MAP_INPUT_RECORDS";
       *      workflowToken.get(flattenCounterKey, "PurchaseHistoryBuilder");
       *    </code>
       *  </pre>
       * </li>
       * </ul>
       * <p>
       * Get the Hadoop counters from the previous MapReduce program in the Workflow.
       * The method returns null if the counters are not set.
       * @return the Hadoop MapReduce counters set by the previous MapReduce program
       */
      @Deprecated
      @Nullable
      Map<String, Map<String, Long>> getMapReduceCounters();
    
      /**
       * Return true if the {@link WorkflowToken} contains the specified key.
       * @param key the key to be tested for the presence in the {@link WorkflowToken}
       * @return the result of the test
       */
      boolean containsKey(String key);
    }
    
    


    The getAll(String key) method in the above interface returns a List of NodeValueEntry objects. The NodeValueEntry class holds a node name and the value that the node put for the specific key.

    Code Block
    /**
     * Multiple nodes in the Workflow can add the same key to the {@link WorkflowToken}.
     * This class provides a mapping from node name to the value which was set for the
     * specific key.
     */
    public final class NodeValueEntry {
      private final String nodeName;
      private final String value;
    
      public NodeValueEntry(String nodeName, String value) {
        this.nodeName = nodeName;
        this.value = value;
      }
    
      public String getNodeName() {
        return nodeName;
      }
    
      public String getValue() {
        return value;
      }
      
      ...
      // other methods like toString(), equals() and hashCode()		 
      ...
    }
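
    To make the lookup rules of the interface above concrete, here is a minimal in-memory sketch. It is not the actual implementation: the class InMemoryToken, its nested Entry class (a stand-in for NodeValueEntry), and the explicit nodeName argument to put are assumptions made for illustration. In the proposed interface the node name would come from the execution context instead. The sketch also assumes that when one node puts the same key more than once, its latest value wins.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory token used only to illustrate the lookup rules. */
final class InMemoryToken {

  /** Stand-in for NodeValueEntry: a node name and the value it set. */
  static final class Entry {
    final String nodeName;
    final String value;

    Entry(String nodeName, String value) {
      this.nodeName = nodeName;
      this.value = value;
    }
  }

  // For each key, the entries in the order in which they were put.
  private final Map<String, List<Entry>> entriesByKey = new LinkedHashMap<>();

  /** Assumption: the caller passes the node name explicitly. */
  void put(String nodeName, String key, String value) {
    entriesByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(new Entry(nodeName, value));
  }

  /** Most recent value for the key, or null if no node set it. */
  String get(String key) {
    List<Entry> entries = getAll(key);
    return entries.isEmpty() ? null : entries.get(entries.size() - 1).value;
  }

  /** Last value set for the key by the given node, or null. */
  String get(String key, String nodeName) {
    String result = null;
    for (Entry entry : getAll(key)) {
      if (entry.nodeName.equals(nodeName)) {
        result = entry.value;
      }
    }
    return result;
  }

  /** All entries for the key in insertion order (empty if the key is absent). */
  List<Entry> getAll(String key) {
    List<Entry> entries = entriesByKey.get(key);
    return entries == null ? Collections.<Entry>emptyList() : Collections.unmodifiableList(entries);
  }

  /** Key-values added by the node, optionally filtered by a prefix. */
  Map<String, String> getAllFromNode(String nodeName, String prefix) {
    Map<String, String> result = new LinkedHashMap<>();
    for (Map.Entry<String, List<Entry>> perKey : entriesByKey.entrySet()) {
      if (!matchesPrefix(perKey.getKey(), prefix)) {
        continue;
      }
      for (Entry entry : perKey.getValue()) {
        if (entry.nodeName.equals(nodeName)) {
          result.put(perKey.getKey(), entry.value); // a later put by the same node wins
        }
      }
    }
    return result;
  }

  /** A null prefix matches everything; otherwise match the key itself or a "." continuation. */
  static boolean matchesPrefix(String key, String prefix) {
    return prefix == null || key.equals(prefix) || key.startsWith(prefix + ".");
  }

  boolean containsKey(String key) {
    return entriesByKey.containsKey(key);
  }
}
```

    With this sketch, a prefix of "a.b" selects the keys "a.b" and "a.b.c" but not "a.bc", which mirrors the "." boundary rule described in the Javadoc of getAllFromNode.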
  • WorkflowConfigurer interface changes

    We generate unique numeric node IDs for each node when the application is deployed. However, while writing the Workflow, users will not be aware of the node ID associated with each node in the Workflow. Since WorkflowToken stores the MapReduce counters and other information at the node level, users should be able to get the value of a particular key from the token as set by a particular program in the Workflow.

    If the program is used only once in a Workflow, then the user can use its name to query for the token information. However, we allow the same program to occur multiple times in a Workflow. In that case, the program name will not be sufficient to access the token.

    The WorkflowConfigurer API can be updated to allow a user to set a unique name for a program that occurs multiple times in a Workflow, and to use that unique name to retrieve values from the token.

    Code Block
    /**
     * Add MapReduce program to the {@link Workflow}.
     * @param uniqueName    the unique name for the MapReduce program which will be used 
     *                      to identify particular occurrence of the program in the Workflow 
     * @param mapReduceName the name of the MapReduce program
     */
    void addMapReduce(String uniqueName, String mapReduceName);


    WorkflowToken can also be updated from a predicate on the condition node. In the presence of multiple condition nodes in a Workflow, we will need the ability to specify unique names for the conditions as well so that token values from specific condition nodes can be fetched. 

    Code Block
    /**
     * Adds a condition with the unique name to the {@link Workflow}.
     * @param conditionName the unique name to be assigned to the condition
     * @param condition     the {@link Predicate} to be evaluated for the condition
     * @return the configurer for the condition
     */
    WorkflowConditionConfigurer<? extends WorkflowConfigurer> condition(String conditionName, Predicate<WorkflowContext> condition);
  • Provide ability to set and get information in the WorkflowToken

    1. MapReduce program: Users should be able to access and modify WorkflowToken from "beforeSubmit" and "onFinish" methods of the MapReduce program. Since these methods get the MapReduceContext, we will need to update the MapReduceContext interface to get the WorkflowToken.

    Code Block
    /**
     * If {@link MapReduce} program is executed as a part of the {@link Workflow} 
     * then get the {@link WorkflowToken} associated with the current run, otherwise return null.  
     * @return the {@link WorkflowToken} if available
     */
    @Nullable
    WorkflowToken getWorkflowToken();


    Consider the following code sample to update the WorkflowToken in the MapReduce program:

    Code Block
    @Override
    public void beforeSubmit(MapReduceContext context) throws Exception {
      ...
      WorkflowToken workflowToken = context.getWorkflowToken();
      if (workflowToken != null) {
        // Put the action type in the WorkflowToken
        workflowToken.put("action_type", "MAPREDUCE");
        // Put the start time for the action
        workflowToken.put("startTime", String.valueOf(System.currentTimeMillis()));
      }
      ...
    }
     
    @Override
    public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
      ...
      WorkflowToken workflowToken = context.getWorkflowToken();
      if (workflowToken != null) {
        // Put the end time for the action 
        workflowToken.put("endTime", String.valueOf(System.currentTimeMillis()));
      }
      ...
    }
    

    2. Spark program: Users should be able to access and modify WorkflowToken from "beforeSubmit" and "onFinish" methods of the Spark program. Since these methods get the SparkContext, we will need to update the SparkContext interface to get the WorkflowToken.

     

    Code Block
    /**
     * If {@link Spark} program is executed as a part of the {@link Workflow}
     * then get the {@link WorkflowToken} associated with the current run, otherwise return null.
     * @return the {@link WorkflowToken} if available
     */
    @Nullable
    WorkflowToken getWorkflowToken();

    3. Custom Workflow action: Since custom workflow actions already receive WorkflowContext, no changes are anticipated in the interface.

    The following sample code gets values from the WorkflowToken in a custom action:

    Code Block
    @Override
    public void run() {
      ...
      WorkflowToken token = getContext().getToken();
      // set the type of the action of the current node	
      token.put("action_type", "CUSTOM_ACTION");
     
      // Assume that we have the following Workflow:
      //
      //                                                |------->PurchaseByCustomer------->|
      //                                        True    |                                  |
      // Start---->RecordVerifier---->Predicate-------->|                                  |------------->StatusReporter----->End
      //                                  |             |                                  |        |
      //                                  | False       |------->PurchaseByProduct-------->|        |
      //                                  |                                                         |
      //                                  |--------------------->ProblemLogger--------------------->|
     
      
      // Use case 1: The predicate can add the key "branch" to the WorkflowToken with the value "true" if the true branch
      // will be executed, or "false" otherwise. "StatusReporter" can then read the key to find out which branch was executed.
      boolean bTrueBranch = Boolean.parseBoolean(token.get("branch"));
     
      // Use case 2: User may want to compare the counters emitted by "PurchaseByCustomer" and "PurchaseByProduct", in order to find which job
      // is processing more records. 
      String purchaseByCustomerCounterValue = token.get("MapReduceCounterName", "PurchaseByCustomer");
      String purchaseByProductCounterValue = token.get("MapReduceCounterName", "PurchaseByProduct");
      
      // Use case 3: Since a Workflow can have multiple complex conditions and forks in its structure, in "StatusReporter"
      // the user may want to know how many actions were executed as a part of this run, and send an alert if the number
      // of executed nodes is below a certain threshold. Assuming that every node in the Workflow adds the key "action_type"
      // to the WorkflowToken with its action type as the value, the user can also break the count down by action type.
      List<NodeValueEntry> nodeValues = token.getAll("action_type");
      int totalNodeExecuted = nodeValues.size();
      int mapReduceNodes = 0;
      int sparkNodes = 0;
      int customActionNodes = 0;
      int conditions = 0;
      for (NodeValueEntry entry : nodeValues) {
        if (entry.getValue().equals("MAPREDUCE")) {
          mapReduceNodes++;
        } 
        if (entry.getValue().equals("SPARK")) {
          sparkNodes++;
        } 
        if (entry.getValue().equals("CUSTOM_ACTION")) {
          customActionNodes++;
        }
        if (entry.getValue().equals("CONDITION")) {
          conditions++;
        }
      }
    
      // Use case 4: To get the name of the last node which set the "ERROR" flag in the WorkflowToken
      List<NodeValueEntry> errorNodeValueList = token.getAll("ERROR");
      // Guard against an empty list, in which case no node name is available
      String nodeNameWhoSetTheErrorFlagLast = errorNodeValueList.isEmpty()
        ? null : errorNodeValueList.get(errorNodeValueList.size() - 1).getNodeName();
     
      // To get the start time of the MapReduce program with unique name "PurchaseHistoryBuilder"
      String startTime = token.get("startTime", "PurchaseHistoryBuilder");
     
      // To get the most recent value of counter with group name
      // 'org.apache.hadoop.mapreduce.TaskCounter' and counter name 'MAP_INPUT_RECORDS'
      String flattenCounterKey = "mr.counters.org.apache.hadoop.mapreduce.TaskCounter.MAP_INPUT_RECORDS";
      String mostRecentCounterValue = token.get(flattenCounterKey);
    
      // To get the value of counter with group name 'org.apache.hadoop.mapreduce.TaskCounter'
      // and counter name 'MAP_INPUT_RECORDS' as set by MapReduce program with unique name 'PurchaseHistoryBuilder'
      String counterValueFromNode = token.get(flattenCounterKey, "PurchaseHistoryBuilder");
     ...
    }
  • WorkflowToken in presence of Fork and Join
    When a fork is encountered in the Workflow, we make a copy of the WorkflowToken and pass it along to each branch. At the join, we create a new WorkflowToken, which will be a merge of the WorkflowTokens associated with each of the branches of the fork. Since we are storing the information in the token at the node level, there will not be any conflicts during the merge process.
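
    The merge at the join can be sketched as follows. This is only an illustration under stated assumptions, not the actual merge code: the TokenMerge class, its Entry stand-in for NodeValueEntry, and the representation of a token as a map from key to an ordered list of entries are all hypothetical. The sketch also assumes that each branch copy only appends to the entries that existed before the fork, so the new entries of a branch are whatever follows the shared pre-fork entries.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of merging the branch copies of a token at a join. */
final class TokenMerge {

  /** Stand-in for NodeValueEntry: a node name and the value it set. */
  static final class Entry {
    final String nodeName;
    final String value;

    Entry(String nodeName, String value) {
      this.nodeName = nodeName;
      this.value = value;
    }
  }

  /**
   * Builds a new token from the pre-fork token and the copies returned by each branch.
   * Entries added within one branch keep their relative order. The order between
   * branches simply follows the order of the branches list and carries no meaning.
   */
  static Map<String, List<Entry>> merge(Map<String, List<Entry>> beforeFork,
                                        List<Map<String, List<Entry>>> branches) {
    // Start with a copy of everything that was set before the fork.
    Map<String, List<Entry>> merged = new LinkedHashMap<>();
    for (Map.Entry<String, List<Entry>> perKey : beforeFork.entrySet()) {
      merged.put(perKey.getKey(), new ArrayList<>(perKey.getValue()));
    }
    // Append only what each branch added on top of the shared pre-fork entries.
    for (Map<String, List<Entry>> branch : branches) {
      for (Map.Entry<String, List<Entry>> perKey : branch.entrySet()) {
        List<Entry> shared = beforeFork.get(perKey.getKey());
        int sharedCount = shared == null ? 0 : shared.size();
        List<Entry> all = perKey.getValue();
        merged.computeIfAbsent(perKey.getKey(), k -> new ArrayList<>())
              .addAll(all.subList(sharedCount, all.size()));
      }
    }
    return merged;
  }
}
```

    Because every entry carries the name of the node that set it, two branches that set the same key produce two separate entries rather than a conflict, which is the property the paragraph above relies on.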

  • Persisting the WorkflowToken
    The RunRecord for the Workflow will contain the WorkflowToken as its property. This token will be updated before the execution of the action in the Workflow. We will add a version field to the RunRecord itself which will help in the upgrade process.

  • RESTful endpoint to access the value of the WorkflowToken that was received by an individual node in the Workflow
    We will expose a RESTful endpoint to retrieve the token values that were set by a particular node, as identified by its unique name.

     

    Code Block
    /apps/{app-id}/workflows/{workflow-name}/runs/{run-id}/nodes/{unique-node-name}/token
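
    As a small illustration of how a client might fill in this path template, consider the sketch below. The TokenEndpoint class and its method name are hypothetical, and the sketch assumes that all identifiers are already URL-safe, so it does no escaping. The host, port, and any API version prefix are deliberately left out because they are not specified here.

```java
/** Hypothetical helper that fills in the path template of the token endpoint. */
final class TokenEndpoint {

  private static final String TEMPLATE = "/apps/%s/workflows/%s/runs/%s/nodes/%s/token";

  /** Assumes every argument is URL-safe; no escaping is performed. */
  static String nodeTokenPath(String appId, String workflowName, String runId, String uniqueNodeName) {
    return String.format(TEMPLATE, appId, workflowName, runId, uniqueNodeName);
  }
}
```

    For example, with placeholder application, workflow, and run identifiers, the token values set by the node named PurchaseHistoryBuilder would be requested from the path returned by nodeTokenPath(appId, workflowName, runId, "PurchaseHistoryBuilder").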

     

     

...