...

  • I want the ability to pass custom data (such as metrics, status, error codes, etc.) from one program in the Workflow to subsequent programs in the form of a token.
  • At any node in the Workflow, I want the ability to query the data from the token.
  • I want the ability to fetch the data from the token which was set by a specific node.
  • I want the ability to find the name of the node which most recently set the token value for a specific key; e.g., the node that last set the ERROR flag in the token, so that I can take appropriate action (such as logging or improving its code) on it.
  • I want conditional execution in the Workflow based on the information contained in the token.
  • I want to terminate the execution if a node in the Workflow produces unexpected results.

As an admin/support person/developer of the Workflow application -

  • I want the ability to query the WorkflowToken from past runs to run analyses, such as which nodes are executed more frequently and why.
  • I want the ability to query the token values which were added by a specific node in the Workflow to debug the flow of execution.

...

  • WorkflowToken interface changes

    Code Block
    /**
     * Interface to represent the data that is transferred from one node to the next nodes in the {@link Workflow}.
     */
    @Beta
    public interface WorkflowToken {

      /**
       * Keys in the {@link WorkflowToken} can be added by the user, using the
       * {@link WorkflowToken#put} method. These keys are added under the {@link Scope#USER} scope.
       * CDAP also adds some keys to the {@link WorkflowToken}, for e.g. MapReduce counters.
       * The keys added by CDAP get added under the {@link Scope#SYSTEM} scope.
       */
      public enum Scope {
        USER,
        SYSTEM
      }

      /**
       * Put the specified key and value into the {@link WorkflowToken}.
       * The token may store additional information about the context in which
       * this key is being set, for example, the unique name of the workflow node.
       * @param key the key representing the entry
       * @param value the value for the key
       */
      void put(String key, String value);

      /**
       * Put the specified key and {@link Value} into the {@link WorkflowToken}.
       * The token may store additional information about the context in which
       * this key is being set, for example, the unique name of the workflow node.
       * @param key the key representing the entry
       * @param value the {@link Value} for the key
       */
      void put(String key, Value value);

      /**
       * Get the most recent value added for the specified key in the {@link Scope#USER} scope.
       * @param key the key to be searched
       * @return the {@link Value} for the key or <code>null</code> if the key does not
       * exist in the {@link Scope#USER} scope
       */
      @Nullable
      Value get(String key);

      /**
       * Get the most recent value for the specified key for a given scope.
       * @param key the key to be searched
       * @param scope the {@link WorkflowToken.Scope} for the key
       * @return the {@link Value} for the key from the specified scope or <code>null</code> if the key
       * does not exist in the given scope
       */
      @Nullable
      Value get(String key, Scope scope);

      /**
       * Get the value set for the specified key by the specified node in the {@link Scope#USER} scope.
       * @param key the key to be searched
       * @param nodeName the unique name of the node
       * @return the {@link Value} set for the key by nodeName or <code>null</code> if the key was not
       * added by the specified node in the {@link Scope#USER} scope
       */
      @Nullable
      Value get(String key, String nodeName);

      /**
       * Get the value set for the specified key by the specified node for a given scope.
       * @param key the key to be searched
       * @param nodeName the unique name of the node
       * @param scope the {@link WorkflowToken.Scope} for the key
       * @return the {@link Value} set for the key by nodeName for a given scope or <code>null</code>
       * if the key was not added by the specified node in the given scope
       */
      @Nullable
      Value get(String key, String nodeName, Scope scope);

      /**
       * Same key can be added to the {@link WorkflowToken} by multiple nodes.
       * This method returns the {@link List} of {@link NodeValue}, where
       * each entry represents the unique node name and the {@link Value} that it set
       * for the specified key in the {@link Scope#USER} scope.
       * <p>
       * The list maintains the order in which the values were
       * inserted in the WorkflowToken for a specific key except in the case of fork
       * and join. In case of fork in the Workflow, copies of the WorkflowToken are made
       * and passed along each branch. At the join, all copies of the
       * WorkflowToken are merged together. While merging, the order in which the values were
       * inserted for a specific key is guaranteed within the same branch, but not across
       * different branches.
       * @param key the key to be searched
       * @return the list of {@link NodeValue} from node name to the value that the node
       * added for the input key
       */
      List<NodeValue> getAll(String key);

      /**
       * Same key can be added to the WorkflowToken by multiple nodes.
       * This method returns the {@link List} of {@link NodeValue}, where
       * each entry represents the unique node name and the {@link Value} that it set
       * for the specified key for a given scope.
       * <p>
       * The list maintains the order in which the values were
       * inserted in the WorkflowToken for a specific key except in the case of fork
       * and join. In case of fork in the Workflow, copies of the WorkflowToken are made
       * and passed along each branch. At the join, all copies of the
       * WorkflowToken are merged together. While merging, the order in which the values were
       * inserted for a specific key is guaranteed within the same branch, but not across
       * different branches.
       * @param key the key to be searched
       * @param scope the {@link WorkflowToken.Scope} for the key
       * @return the list of {@link NodeValue} from node name to the value that the node
       * added for the input key for a given scope
       */
      List<NodeValue> getAll(String key, Scope scope);

      /**
       * Get the {@link Map} of key to {@link Value}s that were added to the {@link WorkflowToken}
       * by a specific node in the {@link Scope#USER} scope.
       * @param nodeName the unique name of the node
       * @return the map of key to values that were added by the specified node
       */
      Map<String, Value> getAllFromNode(String nodeName);

      /**
       * Get the {@link Map} of key to {@link Value}s that were added to the {@link WorkflowToken}
       * by a specific node for a given scope.
       * @param nodeName the unique name of the node
       * @param scope the {@link WorkflowToken.Scope} for the key
       * @return the map of key to values that were added by the specified node for a given scope
       */
      Map<String, Value> getAllFromNode(String nodeName, Scope scope);

      /**
       * Same key can be added to the WorkflowToken by multiple nodes.
       * This method returns the map of key to {@link List} of {@link NodeValue}
       * added in the {@link Scope#USER} scope.
       * @return the {@link Map} of key to {@link List} of {@link NodeValue} added in
       * the {@link Scope#USER} scope
       */
      Map<String, List<NodeValue>> getAll();

      /**
       * Same key can be added to the WorkflowToken by multiple nodes.
       * This method returns the map of key to {@link List} of {@link NodeValue}
       * added in the {@link WorkflowToken.Scope} provided.
       * @param scope the scope for the key
       * @return the {@link Map} of key to {@link List} of {@link NodeValue} added for
       * the given scope
       */
      Map<String, List<NodeValue>> getAll(Scope scope);

      /**
       * This method is deprecated as of release 3.1. Instead, to get the
       * MapReduce counters from the WorkflowToken, use the flatten key prefixed
       * by 'mr.counters'.
       * <p>
       * Example:
       * <ul>
       *   <li>
       *     To get the most recent value of the counter with group name
       *     'org.apache.hadoop.mapreduce.TaskCounter' and counter name 'MAP_INPUT_RECORDS':
       *     <pre>
       *       <code>
       *         String flattenCounterKey = "mr.counters.org.apache.hadoop.mapreduce.TaskCounter.MAP_INPUT_RECORDS";
       *         workflowToken.get(flattenCounterKey);
       *       </code>
       *     </pre>
       *   </li>
       *   <li>
       *     To get the value of the counter with group name 'org.apache.hadoop.mapreduce.TaskCounter'
       *     and counter name 'MAP_INPUT_RECORDS' as set by the MapReduce program with unique name 'PurchaseHistoryBuilder':
       *     <pre>
       *       <code>
       *         String flattenCounterKey = "mr.counters.org.apache.hadoop.mapreduce.TaskCounter.MAP_INPUT_RECORDS";
       *         workflowToken.get(flattenCounterKey, "PurchaseHistoryBuilder");
       *       </code>
       *     </pre>
       *   </li>
       * </ul>
       * Get the Hadoop counters from the previous MapReduce program in the Workflow.
       * The method returns null if the counters are not set.
       * @return the Hadoop MapReduce counters set by the previous MapReduce program
       */
      @Deprecated
      @Nullable
      Map<String, Map<String, Long>> getMapReduceCounters();
    }
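
    As a quick illustration of the three lookup dimensions (key, node name, and scope), a node with access to the token might do the following. This is a sketch; the key names and the node name 'PurchaseHistoryBuilder' are illustrative:

    Code Block
    languagejava
    // Sketch: scoped lookups against the WorkflowToken.
    public void inspectToken(WorkflowToken token) {
      // Most recent value for "status" in the default USER scope
      Value status = token.get("status");

      // Value for "status" as set by the node named "PurchaseHistoryBuilder"
      Value builderStatus = token.get("status", "PurchaseHistoryBuilder");

      // Keys added by CDAP, such as MapReduce counters, live in the SYSTEM scope
      Value mapInputRecords = token.get(
          "mr.counters.org.apache.hadoop.mapreduce.TaskCounter.MAP_INPUT_RECORDS",
          WorkflowToken.Scope.SYSTEM);
    }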

     


    The method getAll(String key) in the above interface returns a List of NodeValue objects. The NodeValue class represents the nodeName and the Value that the node put for a specific key.

    Code Block
    /**
     * Multiple nodes in the Workflow can add the same key to the {@link WorkflowToken}.
     * This class provides a mapping from node name to the {@link Value} which was set for the
     * specific key.
     */
    public final class NodeValue implements Serializable {
    
      private static final long serialVersionUID = 6157808964174399650L;
    
      private final String nodeName;
      private final Value value;
    
      public NodeValue(String nodeName, Value value) {
        this.nodeName = nodeName;
        this.value = value;
      }
    
      public String getNodeName() {
        return nodeName;
      }
    
      public Value getValue() {
        return value;
      }
    
      ...
      // other methods like toString(), equals() and hashCode()		 
      ...
    }
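
    For example, the "find the node that last set the ERROR flag" user story above can be answered by walking this list in insertion order. A sketch:

    Code Block
    languagejava
    // Sketch: the last entry in the list is the most recent writer of the key.
    public String lastNodeToSet(WorkflowToken token, String key) {
      List<NodeValue> history = token.getAll(key);
      if (history.isEmpty()) {
        return null;
      }
      return history.get(history.size() - 1).getNodeName();
    }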
     

    The details of the Value class are as follows:

    Code Block
    /**
     * Class representing the value of the key in the {@link WorkflowToken}.
     */
    public class Value implements Serializable {
    
      private static final long serialVersionUID = -3420759818008526875L;
    
      private final String value;
    
      private Value(String value) {
        this.value = value;
      }
    
      /**
       * @return the boolean value
       */
      public boolean getAsBoolean() {
        return Boolean.parseBoolean(value);
      }
    
      /**
       * @return the int value
       */
      public int getAsInt() {
        return Integer.parseInt(value);
      }
    
      /**
       * @return the long value
       */
      public long getAsLong() {
        return Long.parseLong(value);
      }
    
      /**
       * @return the String value
       */
      @Override
      public String toString() {
        return value;
      }
    
    }
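
    Since the token stores values as strings internally, callers pick the conversion they need. A sketch (the key 'retry.count' is illustrative):

    Code Block
    languagejava
    // Sketch: typed reads of the same underlying string value.
    Value retries = token.get("retry.count");
    if (retries != null) {
      int asInt = retries.getAsInt();        // via Integer.parseInt
      long asLong = retries.getAsLong();     // via Long.parseLong
      String asString = retries.toString();  // the raw string form
    }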
  • Ability to include the same program multiple times in the Workflow

    This can be achieved without making any changes to the API. Consider the following use case:

    Use Case: An email campaign generates two categories of events - send events (SUCCESS, FAIL) and tracking events (OPEN, CLICK, etc.). Records representing send events and tracking events have different schemas. These two categories of events are sent to CDAP using the streams "send" and "tracking".

    Tracking event format:

    audience_id,event_type,ip_address,device_type,event_time,link

    Example records:

    bob,CLICK,192.168.29.10,android,1436311150092,http://www.somedomain.com

    adam,CLICK,192.168.29.18,ipad,1436311232276,http://www.anotherdomain.com

    Send event format:

    audience_id::event_sub_type::ip_address::deliveryCode::event_time

    Example records:

    bob::SEND::192.168.29.10::SUCCESS::1436311232276

    adam::SEND::192.168.29.9::SUCCESS::1436311434476

    The same MapReduce program, "EventParser", can be used in the Workflow to parse these two categories of events in parallel and create a list of Event objects per audience id.

    EventParser application:

    Code Block
    languagejava
    public class EventParserApp extends AbstractApplication {
      @Override
      public void configure() {
        // Stream to receive send events
        addStream(new Stream("send"));
        // Stream to receive tracking events
        addStream(new Stream("tracking"));

        // Add the EventParser MapReduce program multiple times in the application with different properties
        Map<String, String> properties = Maps.newHashMap();
        properties.put("input.stream", "tracking");
        // 'trackingParser' is an instance of EventParser which will read the 'tracking' stream
        addMapReduce(new EventParser("trackingParser", properties));

        properties = Maps.newHashMap();
        properties.put("input.stream", "send");
        // 'sendParser' is an instance of EventParser which will read the 'send' stream
        addMapReduce(new EventParser("sendParser", properties));

        // Add the Workflow which will process the tracking and send events in parallel
        addWorkflow(new EventParserWorkflow());
      }
    }

    EventParser MapReduce program:

    Code Block
    languagejava
    public class EventParser extends AbstractMapReduce {
      private final String name;
      private final Map<String, String> properties;

      public EventParser(String name, Map<String, String> properties) {
        this.name = name;
        this.properties = properties;
      }

      @Override
      public void configure() {
        setName(name);
        setDescription("MapReduce program for parsing the email events and storing them in the dataset.");
        // Serialize the properties
        setProperties(properties);
        setOutputDataset("events");
      }

      @Override
      public void beforeSubmit(MapReduceContext context) throws Exception {
        Job job = context.getHadoopJob();
        job.setMapperClass(EventParserMapper.class);
        job.setReducerClass(EventParserReducer.class);

        job.setMapOutputKeyClass(Text.class);
        // The mapper emits Event values, so set the map output value class accordingly
        job.setMapOutputValueClass(Event.class);

        job.setNumReduceTasks(1);
        String inputStream = context.getSpecification().getProperties().get("input.stream");
        job.getConfiguration().set("input.stream", inputStream);

        // Read the events from the last 60 minutes as input to the mapper
        final long endTime = context.getLogicalStartTime();
        final long startTime = endTime - TimeUnit.MINUTES.toMillis(60);
        StreamBatchReadable.useStreamInput(context, inputStream, startTime, endTime);
      }
    }
     
    // EventParserMapper
    public static class EventParserMapper extends Mapper<LongWritable, Text, Text, Event> {

      @Override
      public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String logEvent = value.toString();
        if (logEvent.isEmpty()) {
          return;
        }

        String inputStream = context.getConfiguration().get("input.stream");
        Event event;
        if (inputStream.equals("send")) {
          event = getSendEvent(logEvent);
        } else {
          event = getTrackingEvent(logEvent);
        }
        if (event != null) {
          context.write(new Text(event.getAudienceId()), event);
        }
      }

      private Event getSendEvent(String logEvent) {
        // Send events use '::' as the field separator
        String separator = "::";
        int fieldLength = 5;
        String[] fields = logEvent.split(separator);
        if (fields.length != fieldLength) {
          return null;
        }
        String audienceId = fields[0];
        String eventType = fields[1];
        String ipAddress = fields[2];
        String deliveryCode = fields[3];
        String eventTime = fields[4];

        return new Event(audienceId, eventType, ipAddress, eventTime, deliveryCode);
      }

      private Event getTrackingEvent(String logEvent) {
        // Tracking events use ',' as the field separator
        String separator = ",";
        int fieldLength = 6;
        String[] fields = logEvent.split(separator);
        if (fields.length != fieldLength) {
          return null;
        }
        String audienceId = fields[0];
        String eventType = fields[1];
        String ipAddress = fields[2];
        String deviceType = fields[3];
        String eventTime = fields[4];
        String link = fields[5];
        return new Event(audienceId, eventType, ipAddress, eventTime, deviceType + "&&" + link);
      }
    }
     

    EventParserWorkflow:

    Code Block
    languagejava
    public class EventParserWorkflow extends AbstractWorkflow {
      @Override
      protected void configure() {
        fork()
          .addMapReduce("trackingParser")
        .also()
          .addMapReduce("sendParser")
        .join();
      }
    }
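
    Because "trackingParser" and "sendParser" are distinct node names in the Workflow, a node that runs after the join can compare the two runs through the WorkflowToken. A sketch (the counter key uses the flattened 'mr.counters' convention described earlier):

    Code Block
    languagejava
    // Sketch: compare the two EventParser runs by their unique node names.
    public void compareParsers(WorkflowToken token) {
      String counterKey = "mr.counters.org.apache.hadoop.mapreduce.TaskCounter.MAP_INPUT_RECORDS";
      Value trackingRecords = token.get(counterKey, "trackingParser", WorkflowToken.Scope.SYSTEM);
      Value sendRecords = token.get(counterKey, "sendParser", WorkflowToken.Scope.SYSTEM);
      if (trackingRecords != null && sendRecords != null
          && trackingRecords.getAsLong() > sendRecords.getAsLong()) {
        System.out.println("More tracking events than send events were parsed in this run.");
      }
    }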
  • Provide ability to set and get information in the WorkflowToken

    1. MapReduce program: Users should be able to access and modify the WorkflowToken from the "beforeSubmit" and "onFinish" methods of the MapReduce program. Since these methods receive the MapReduceContext, we will need to update the MapReduceContext interface to expose the WorkflowToken.

    Code Block
    /**
     * If {@link MapReduce} program is executed as a part of the {@link Workflow} 
     * then get the {@link WorkflowToken} associated with the current run, otherwise return null.  
     * @return the {@link WorkflowToken} if available
     */
    @Nullable
    WorkflowToken getWorkflowToken();


    Consider the following code sample to update the WorkflowToken in the MapReduce program:

    Code Block
    @Override
    public void beforeSubmit(MapReduceContext context) throws Exception {
      ...
      WorkflowToken workflowToken = context.getWorkflowToken();
      if (workflowToken != null) {
        // Put the action type in the WorkflowToken
        workflowToken.put("action.type", "MAPREDUCE");
        // Put the start time for the action
        workflowToken.put("start.time", String.valueOf(System.currentTimeMillis()));
      }
      ...
    }
     
    @Override
    public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
      ...
      WorkflowToken workflowToken = context.getWorkflowToken();
      if (workflowToken != null) {
        // Put the end time for the action 
        workflowToken.put("end.time", String.valueOf(System.currentTimeMillis()));
      }
      ...
    }
    

    2. Spark program: Users should be able to access and modify the WorkflowToken from the "beforeSubmit" and "onFinish" methods of the Spark program. Since these methods receive the SparkContext, we will need to update the SparkContext interface to expose the WorkflowToken.

     

    Code Block
    /**
     * If {@link Spark} program is executed as a part of the {@link Workflow}
     * then get the {@link WorkflowToken} associated with the current run, otherwise return null.
     * @return the {@link WorkflowToken} if available
     */
    @Nullable
    WorkflowToken getWorkflowToken();
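
    Analogous to the MapReduce sample above, a Spark program could then record its node metadata in the token. The following is a sketch using the same illustrative keys:

    Code Block
    @Override
    public void beforeSubmit(SparkContext context) throws Exception {
      WorkflowToken workflowToken = context.getWorkflowToken();
      if (workflowToken != null) {
        // Record the action type and the start time for this Spark node
        workflowToken.put("action.type", "SPARK");
        workflowToken.put("start.time", String.valueOf(System.currentTimeMillis()));
      }
    }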

    3. Custom Workflow action: Since custom workflow actions already receive the WorkflowContext, no changes are anticipated in the interface.

    The following sample code gets values from the WorkflowToken in a custom action:

    Code Block
    @Override
    public void run() {
      ...
      WorkflowToken token = getContext().getToken();
    // Set the type of the action for the current node
      token.put("action.type", "CUSTOM_ACTION");
     
      // Assume that we have the following Workflow 
     
  //                                             |------->PurchaseByCustomer------->|
  //                                      True   |                                  |
  // Start---->RecordVerifier---->Predicate----->|                                  |------------->StatusReporter----->End
  //                                 |           |                                  |      |
  //                                 |  False    |------->PurchaseByProduct-------->|      |
  //                                 |                                                     |
  //                                 |-------------------->ProblemLogger------------------->|
     
      
  // Use case 1: The predicate can add the key "branch" to the WorkflowToken with the value "true"
  // if the true branch will be executed, or "false" otherwise. "StatusReporter" can then find out
  // which branch in the Workflow was executed:
  boolean bTrueBranch = token.get("branch").getAsBoolean();

  // Use case 2: The user may want to compare the records emitted by "PurchaseByCustomer" and
  // "PurchaseByProduct", in order to find which job is generating more records.
  String flattenReduceOutputRecordsCounterName = "mr.counters.org.apache.hadoop.mapreduce.TaskCounter.REDUCE_OUTPUT_RECORDS";
  Value purchaseByCustomerCounterValue = token.get(flattenReduceOutputRecordsCounterName, "PurchaseByCustomer", WorkflowToken.Scope.SYSTEM);
  Value purchaseByProductCounterValue = token.get(flattenReduceOutputRecordsCounterName, "PurchaseByProduct", WorkflowToken.Scope.SYSTEM);

  // Use case 3: Since the Workflow can have multiple complex conditions and forks in its structure,
  // in "StatusReporter" the user may want to know how many actions were executed as a part of this run.
  // If the number of nodes executed was below a certain threshold, send an alert. Assuming that every
  // node in the Workflow adds the key "action.type" with its action type as the value, the user can
  // further figure out the breakdown by action type in the particular Workflow run.
  List<NodeValue> nodeValues = token.getAll("action.type");
  int totalNodesExecuted = nodeValues.size();
  int mapReduceNodes = 0;
  int sparkNodes = 0;
  int customActionNodes = 0;
  int conditions = 0;
  for (NodeValue entry : nodeValues) {
    String actionType = entry.getValue().toString();
    if (actionType.equals("MAPREDUCE")) {
      mapReduceNodes++;
    } else if (actionType.equals("SPARK")) {
      sparkNodes++;
    } else if (actionType.equals("CUSTOM_ACTION")) {
      customActionNodes++;
    } else if (actionType.equals("CONDITION")) {
      conditions++;
    }
  }

  // Use case 4: Get the name of the last node which set the "ERROR" flag in the WorkflowToken
  List<NodeValue> errorNodeValueList = token.getAll("ERROR");
  String nodeNameWhoSetTheErrorFlagLast = errorNodeValueList.get(errorNodeValueList.size() - 1).getNodeName();

  // Get the start time of the MapReduce program with the unique name "PurchaseHistoryBuilder"
  Value startTime = token.get("start.time", "PurchaseHistoryBuilder");

  // Get the most recent value of the counter with group name
  // 'org.apache.hadoop.mapreduce.TaskCounter' and counter name 'MAP_INPUT_RECORDS'
  String flattenCounterKey = "mr.counters.org.apache.hadoop.mapreduce.TaskCounter.MAP_INPUT_RECORDS";
  token.get(flattenCounterKey, WorkflowToken.Scope.SYSTEM);

  // Get the value of the counter with group name 'org.apache.hadoop.mapreduce.TaskCounter' and counter
  // name 'MAP_INPUT_RECORDS' as set by the MapReduce program with the unique name 'PurchaseHistoryBuilder'
  token.get(flattenCounterKey, "PurchaseHistoryBuilder", WorkflowToken.Scope.SYSTEM);
     ...
    }
  • WorkflowToken in the presence of Fork and Join
    When a fork is encountered in the Workflow, we make a copy of the WorkflowToken and pass it along to each branch. At the join, we create a new WorkflowToken, which is a merge of the WorkflowTokens from each branch of the fork. Since the information in the token is stored at the node level, there will not be any conflicts during the merge process.
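
    For example, if the nodes on the two branches of the fork above both put the key "action.type", the merged token at the join still retains each branch's entries under the originating node names. A sketch (node names are illustrative):

    Code Block
    languagejava
    // Sketch: after the join, getAll sees the entries written on both branches.
    // Ordering is guaranteed within a branch, but not across branches.
    public void inspectMergedToken(WorkflowToken token) {
      for (NodeValue nodeValue : token.getAll("action.type")) {
        // e.g. PurchaseByCustomer -> MAPREDUCE, PurchaseByProduct -> MAPREDUCE
        System.out.println(nodeValue.getNodeName() + " -> " + nodeValue.getValue());
      }
    }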

  • Persisting the WorkflowToken
    The RunRecord for the Workflow will contain the WorkflowToken as its property. This token will be updated before the execution of each action in the Workflow. We will add a version field to the RunRecord itself, which will help in the upgrade process.

  • RESTful end-points to access the value of the WorkflowToken that was received by an individual node in the Workflow

    1. To get the values that the user put in the WorkflowToken for a particular run

    Code Block
    /namespaces/{namespace-id}/apps/{app-id}/workflows/{workflow-id}/runs/{run-id}/token

    2. To get the values that CDAP put (e.g., MapReduce counters for MapReduce nodes) in the WorkflowToken for a particular run

    Code Block
    /namespaces/{namespace-id}/apps/{app-id}/workflows/{workflow-id}/runs/{run-id}/token?scope=system

    3. To get the key-value pairs in the USER scope that a particular node added to the WorkflowToken

    Code Block
    /namespaces/{namespace-id}/apps/{app-id}/workflows/{workflow-id}/runs/{run-id}/nodes/{node-id}/token

    4. To get the key-value pairs in the SYSTEM scope that a particular node added to the WorkflowToken

    Code Block
    /namespaces/{namespace-id}/apps/{app-id}/workflows/{workflow-id}/runs/{run-id}/nodes/{node-id}/token?scope=system

     

     

    REST API | Response | Comments | Reviewed?

    /namespaces/{namespace-id}/apps/{app-id}/workflows/{workflow-name}/runs/{run-id}/token

    Response: JSON containing the entire workflow token for a particular workflow run, e.g.

    Code Block
    {
      "tokenValueMap": {
        "key1": [
          {
            "nodeName": "node1",
            "value": "value1"
          },
          {
            "nodeName": "node2",
            "value": "value2"
          }
        ],
        "key2": [
          {
            "nodeName": "node2",
            "value": "v2"
          }
        ]
      }
    }

    Response Codes: 

    200 if successful
    404 if app/workflow not found
    500 if there is an internal error

      
    /namespaces/{namespace-id}/apps/{app-id}/workflows/{workflow-name}/runs/{run-id}/nodes/{unique-node-name}/token
    Response: JSON containing the key-value pairs that the specified node added to the workflow token, e.g.

    Code Block
    {
      "key1": "value1",
      "key2": "value2"
    }

    Response Codes: 

    200 if successful
    404 if app/workflow not found
    500 if there is an internal error