...

  • WorkflowToken interface changes

    Code Block
    /**
     * Interface to represent the data that is transferred from one node to the next nodes in the {@link Workflow}.
     */
    @Beta
    public interface WorkflowToken {
    
      /**
       * Keys in the {@link WorkflowToken} can be added by the user, using the
       * {@link WorkflowToken#put} method. These keys are added under the {@link Scope#USER} scope.
       * CDAP also adds some keys to the {@link WorkflowToken}, for example, MapReduce counters.
       * The keys added by CDAP are added under the {@link Scope#SYSTEM} scope.
       */
      public enum Scope {
        USER,
        SYSTEM
      }
    
      /**
       * Put the specified key and value into the {@link WorkflowToken}.
       * The token may store additional information about the context in which
       * this key is being set, for example, the unique name of the workflow node.
       * @param key the key representing the entry
       * @param value the value for the key
       */
      // TODO [CDAP-2895] put operation should throw certain exceptions
      void put(String key, String value);
    
      /**
       * Put the specified key and {@link Value} into the {@link WorkflowToken}.
       * The token may store additional information about the context in which
       * this key is being set, for example, the unique name of the workflow node.
       * @param key the key representing the entry
       * @param value the {@link Value} for the key
       */
      void put(String key, Value value);
    
      /**
       * Get the most recent value added for the specified key for a {@link Scope#USER} scope.
       * @param key the key to be searched
       * @return the {@link Value} for the key
       */
      @Nullable
      Value get(String key);

      /**
       * Get the most recent value for the specified key for a given scope.
       * @param key the key to be searched
       * @param scope the {@link WorkflowToken.Scope} for the key
       * @return the {@link Value} for the key from the specified scope
       */
      @Nullable
      Value get(String key, Scope scope);
    
      /**
       * Get the value set for the specified key by the specified node for a {@link Scope#USER} scope.
       * @param key the key to be searched
       * @param nodeName the name of the node
       * @return the {@link Value} set for the key by nodeName
       */
      @Nullable
      Value get(String key, String nodeName);
    
      /**
       * Get the value set for the specified key by the specified node for a given scope.
       * @param key the key to be searched
       * @param nodeName the name of the node
       * @param scope the {@link WorkflowToken.Scope} for the key
       * @return the {@link Value} set for the key by nodeName for a given scope
       */
      @Nullable
      Value get(String key, String nodeName, Scope scope);
    
      /**
       * Same key can be added to the {@link WorkflowToken} by multiple nodes.
       * This method returns the {@link List} of {@link NodeValueEntry}, where
       * each entry represents the unique node name and the {@link Value} that it set
       * for the specified key for a {@link Scope#USER} scope.
       * <p>
       * The list maintains the order in which the values were
       * inserted in the WorkflowToken for a specific key except in the case of fork
       * and join. In case of fork in the Workflow, copies of the WorkflowToken are made
       * and passed along each branch. At the join, all copies of the
       * WorkflowToken are merged together. While merging, the order in which the values were
       * inserted for a specific key is guaranteed within the same branch, but not across
       * different branches.
       * @param key the key to be searched
       * @return the list of {@link NodeValueEntry} from node name to the value that node
       * added for the input key
       */
      List<NodeValueEntry> getAll(String key);
    
      /**
       * Same key can be added to the {@link WorkflowToken} by multiple nodes.
       * This method returns the {@link List} of {@link NodeValueEntry}, where
       * each entry represents the unique node name and the {@link Value} that it set
       * for the specified key for a given scope.
       * <p>
       * The list maintains the order in which the values were
       * inserted in the WorkflowToken for a specific key except in the case of fork
       * and join. In case of fork in the Workflow, copies of the WorkflowToken are made
       * and passed along each branch. At the join, all copies of the
       * WorkflowToken are merged together. While merging, the order in which the values were
       * inserted for a specific key is guaranteed within the same branch, but not across
       * different branches.
       * @param key the key to be searched
       * @param scope the {@link WorkflowToken.Scope} for the key
       * @return the list of {@link NodeValueEntry} from node name to the value that node
       * added for the input key for a given scope
       */
      List<NodeValueEntry> getAll(String key, Scope scope);
    
      /**
       * Get the {@link Map} of key to {@link Value}s that were added to the {@link WorkflowToken}
       * by a specific node for a {@link Scope#USER} scope.
       * @param nodeName the unique name of the node
       * @return the map of key to values that were added by the specified node
       */
      Map<String, Value> getAllFromNode(String nodeName);
    
      /**
       * Get the {@link Map} of key to {@link Value}s that were added to the {@link WorkflowToken}
       * by a specific node for a given scope.
       * @param nodeName the unique name of the node
       * @param scope the {@link WorkflowToken.Scope} for the key
       * @return the map of key to values that were added by the specified node for a given scope
       */
      Map<String, Value> getAllFromNode(String nodeName, Scope scope);
    
      /**
       * Same key can be added to the WorkflowToken by multiple nodes.
       * This method returns the key to {@link List} of {@link NodeValueEntry}
       * added in the {@link WorkflowToken.Scope} provided.
       * @param scope the scope for the key
       * @return the {@link Map} of key to {@link List} of {@link NodeValueEntry} added for
       * the given scope
       */
      Map<String, List<NodeValueEntry>> getAll(Scope scope);
    
      /**
       * This method is deprecated as of release 3.1.
       * Get the Hadoop counters from the previous MapReduce program in the Workflow.
       * The method returns null if the counters are not set.
       * @return the Hadoop MapReduce counters set by the previous MapReduce program
       */
      @Deprecated
      @Nullable
      Map<String, Map<String, Long>> getMapReduceCounters();
    
      /**
       * Return true if the {@link WorkflowToken} contains the specified key
       * for a {@link Scope#USER} scope.
       * @param key the key to be tested for the presence in the {@link WorkflowToken}
       * @return the result of the test
       */
      boolean containsKey(String key);
    
      /**
       * Return true if the {@link WorkflowToken} contains the specified key for a given scope.
       * @param key the key to be tested for the presence in the {@link WorkflowToken}
       * @param scope the {@link WorkflowToken.Scope} for the key
       * @return the result of the test
       */
      boolean containsKey(String key, Scope scope);
    }
     


    The method getAll(String key) in the above interface returns a List of NodeValueEntry objects. The NodeValueEntry class represents the node name and the value that the node set for a specific key.

    Code Block
    /**
     * Multiple nodes in the Workflow can add the same key to the {@link WorkflowToken}.
     * This class provides a mapping from node name to the value which was set for the
     * specific key.
     */
    public final class NodeValueEntry {
      private final String nodeName;
      private final Value value;
    
      public NodeValueEntry(String nodeName, Value value) {
        this.nodeName = nodeName;
        this.value = value;
      }
    
      public String getNodeName() {
        return nodeName;
      }
    
      public Value getValue() {
        return value;
      }
      
      ...
      // other methods like toString(), equals() and hashCode()		 
      ...
    }


    The details of the Value class are as follows:

    Code Block
    /**
     * Represents the value for the specific key in the {@link WorkflowToken}
     */
    public final class Value {
      private final String value;
    
      public Value(String value) {
        this.value = value;
      }
    
      public String toString() {
        return value;
      }
    
      public long getAsLong() {
        return Long.parseLong(value);
      }
    }
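
    For illustration, a minimal sketch (assuming a WorkflowToken instance is available, for example from a custom action's WorkflowContext; the key name "records.processed" is an arbitrary example) of how a value put into the token can be read back and converted:

    Code Block
    languagejava
    WorkflowToken token = getContext().getToken();
    token.put("records.processed", String.valueOf(5000L));

    // Later, possibly in a different node, read the most recent value back
    Value processed = token.get("records.processed");
    if (processed != null) {
      // Convert the stored string value to a long
      long count = processed.getAsLong();
    }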
  • Ability to include the same program multiple times in the Workflow (most of this is now not required, since the user can specify unique names in the existing API)

    Use Case: An email campaign generates two categories of events: send events (SUCCESS, FAIL) and tracking events (OPEN, CLICK, etc.). Records representing send events and tracking events have a different number of fields. These two categories of events are sent to CDAP using the streams "send" and "tracking".

    Tracking event format:

    audience_id,event_type,ip_address,device_type,event_time,link

    Example records:

    bob,CLICK,192.168.29.10,android,1436311150092,http://www.somedomain.com

    adam,CLICK,192.168.29.18,ipad,1436311232276,http://www.anotherdomain.com

    Send event format:

    audience_id::event_sub_type::ip_address::deliveryCode::event_time

    Example records:

    bob::SEND::192.168.29.10::SUCCESS::1436311232276

    adam::SEND::192.168.29.9::SUCCESS::1436311434476

    Solution: 

    The same MapReduce program, "EventParser", can be used in the Workflow to parse these two categories of events in parallel and create a list of Event objects per audience id.

    Step 1: Add the same MapReduce/Spark program multiple times in the Application.

    API changes to the ApplicationConfigurer to allow adding a MapReduce/Spark program to the Application with an explicit name:

    Code Block
    languagejava
    /**
     * Adds a {@link MapReduce} to the Application.
     * @param name the name to be given to the {@link MapReduce} program
     * @param mapReduce the {@link MapReduce} program to be included in the Application
     */
    void addMapReduce(String name, MapReduce mapReduce);
     
    /**
     * Adds a {@link Spark} to the Application.
     * @param name the name to be given to the {@link Spark} program
     * @param spark the {@link Spark} program to be included in the Application
     */
    void addSpark(String name, Spark spark);
    

    EventParser application:

    Code Block
    languagejava
    public class EventParserApp extends AbstractApplication {
      @Override
      public void configure() {
        // Stream to receive send events
        addStream(new Stream("send"));
        // Stream to receive tracking events
        addStream(new Stream("tracking"));

        // Add the EventParser MapReduce program multiple times in the application with different properties
        Map<String, String> properties = Maps.newHashMap();
        properties.put("input.stream", "tracking");
        // 'trackingParser' is an instance of EventParser which will read the 'tracking' stream
        addMapReduce("trackingParser", new EventParser(properties));

        properties = Maps.newHashMap();
        properties.put("input.stream", "send");
        // 'sendParser' is an instance of EventParser which will read the 'send' stream
        addMapReduce("sendParser", new EventParser(properties));

        // Add the Workflow which will process the tracking and send events in parallel
        addWorkflow(new EventParserWorkflow());
      }
    }

    EventParser MapReduce program:

    Code Block
    languagejava
    public class EventParser extends AbstractMapReduce {
    
      private final Map<String, String> properties;
      public EventParser(Map<String, String> properties) {
        this.properties = properties;
      }
     
      @Override
      public void configure() {
        setDescription("MapReduce program for parsing the email events and storing them in the dataset.");
        // Serialize the properties
        setProperties(properties);
        setOutputDataset("events");
      }
     
      @Override
      public void beforeSubmit(MapReduceContext context) throws Exception {
        Job job = context.getHadoopJob();
        job.setMapperClass(EventParserMapper.class);
        job.setReducerClass(EventParserReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setOutputValueClass(Event.class);

        job.setNumReduceTasks(1);
        String streamToVerify = context.getSpecification().getProperties().get("input.stream");
        job.getConfiguration().set("input.stream", streamToVerify);

        // Read the events from the last 60 minutes of the stream as input to the mapper.
        final long endTime = context.getLogicalStartTime();
        final long startTime = endTime - TimeUnit.MINUTES.toMillis(60);
        StreamBatchReadable.useStreamInput(context, streamToVerify, startTime, endTime);
      }
    }
     
    // EventParserMapper
    public static class EventParserMapper extends Mapper<LongWritable, Text, Text, Event>  {
    
      @Override
      public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String logEvent = value.toString();
        if (logEvent.isEmpty()) {
          return;
        }
        
        String inputStream = context.getConfiguration().get("input.stream");
        Event event;
        if (inputStream.equals("send")) {
          event = getSendEvent(logEvent);
        } else {
          event = getTrackingEvent(logEvent);
        }
        if (event != null) {
          context.write(new Text(event.getAudienceId()), event);
        }
      }
      
      private Event getSendEvent(String logEvent) {
        String seperator = "::";
        int fieldLength = 5;
        String[] fields = logEvent.split(seperator);
        if (fields.length != fieldLength) {
          return null;
        }
        String audienceId = fields[0];
        String eventType = fields[1];
        String ipAddress = fields[2];
        String deliveryCode = fields[3];
        String eventTime = fields[4];
        
        return new Event(audienceId, eventType, ipAddress, eventTime, deliveryCode);
      }
      
      private Event getTrackingEvent(String logEvent) {
        String seperator = ",";
        int fieldLength = 6;
        String[] fields = logEvent.split(seperator);
        if (fields.length != fieldLength) {
          return null;
        }
        String audienceId = fields[0];
        String eventType = fields[1];
        String ipAddress = fields[2];
        String deviceType = fields[3];
        String eventTime = fields[4];
        String link = fields[5];
        return new Event(audienceId, eventType, ipAddress, eventTime, deviceType + "&&" + link);
      }
    }
     

    EventParserWorkflow:

    Code Block
    languagejava
    public class EventParserWorkflow extends AbstractWorkflow {
      @Override
      protected void configure() {
        fork()
          .addMapReduce("trackingParser")
        .also()
          .addMapReduce("sendParser")
        .join();
      }
    }

     

    Step 2: WorkflowConfigurer interface changes

    With the API changes mentioned in the above section, the same program (MapReduce/Spark) can be added multiple times to the Application with different names and parameters. These programs can then be referred to in the Workflow. However, in order to add the same action multiple times in the Workflow, we still need to specify a unique name.

    The WorkflowConfigurer API can be updated to allow a user to set a unique name for a Workflow action if it occurs multiple times in a Workflow, and to use that unique name to retrieve values from the token.

    Code Block
     /**
       * {@link Workflow} consists of multiple {@link WorkflowNode}s.
       * The same Workflow action can be added multiple times in the {@link Workflow} at
       * different {@link WorkflowNode}s.
       * <p>
       * This method allows associating a uniqueName with the {@link WorkflowNode}
       * which represents the Workflow action. The uniqueName helps in querying for the
       * values that were added to the {@link WorkflowToken} by a particular node.
       * <p>
       * The uniqueName must be unique across all {@link WorkflowNode}s in the Workflow,
       * otherwise the Application deployment will fail.
       * @param uniqueName the uniqueName to be assigned to the {@link WorkflowNode}
       *                   which represents the Workflow action
       * @param action the action to be added to the {@link Workflow}
       * @return the configurer for the current fork
       */
      WorkflowForkConfigurer<T> addAction(String uniqueName, WorkflowAction action);
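
    For example, a minimal sketch (assuming WorkflowConfigurer itself gains a matching addAction(uniqueName, action) overload; 'EmailNotifier' is a hypothetical custom action) of adding the same action at two nodes with unique names:

    Code Block
    languagejava
    public class NotifyingWorkflow extends AbstractWorkflow {
      @Override
      protected void configure() {
        addMapReduce("trackingParser");
        // The same custom action is added twice, distinguished by unique names,
        // so that token values can later be queried per node
        addAction("notifyAfterTracking", new EmailNotifier());
        addMapReduce("sendParser");
        addAction("notifyAfterSend", new EmailNotifier());
      }
    }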


    The WorkflowToken can also be updated from a predicate on a condition node. In the presence of multiple condition nodes in a Workflow, we will need the ability to specify unique names for the conditions as well, so that token values from specific condition nodes can be fetched.

    Code Block
    /**
     * Adds a condition with the unique name to the {@link Workflow}.
     * @param conditionName the unique name to be assigned to the condition
     * @param condition     the {@link Predicate} to be evaluated for the condition
     * @return the configurer for the condition
     */
    WorkflowConditionConfigurer<? extends WorkflowConfigurer> condition(String conditionName, Predicate<WorkflowContext> condition);
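
    For example, a minimal sketch (assuming the condition configurer supports addMapReduce() and otherwise()/end() chaining; 'EventVerifier' is a hypothetical Predicate<WorkflowContext> implementation):

    Code Block
    languagejava
    public class ParserWorkflow extends AbstractWorkflow {
      @Override
      protected void configure() {
        // 'recordVerifier' is the unique name of this condition node; token values
        // set by its predicate can later be fetched using this name
        condition("recordVerifier", new EventVerifier())
          .addMapReduce("trackingParser")
        .otherwise()
          .addMapReduce("sendParser")
        .end();
      }
    }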
  • Provide ability to set and get information in the WorkflowToken

    1. MapReduce program: Users should be able to access and modify WorkflowToken from "beforeSubmit" and "onFinish" methods of the MapReduce program. Since these methods get the MapReduceContext, we will need to update the MapReduceContext interface to get the WorkflowToken.

    Code Block
    /**
     * If the {@link MapReduce} program is executed as a part of the {@link Workflow},
     * then get the {@link WorkflowToken} associated with the current run; otherwise return null.
     * @return the {@link WorkflowToken} if available
     */
    @Nullable
    WorkflowToken getWorkflowToken();


    Consider the following code sample to update the WorkflowToken in the MapReduce program:

    Code Block
    @Override
    public void beforeSubmit(MapReduceContext context) throws Exception {
      ...
      WorkflowToken workflowToken = context.getWorkflowToken();
      if (workflowToken != null) {
        // Put the action type in the WorkflowToken
        workflowToken.put("action_type", "MAPREDUCE");
        // Put the start time for the action
        workflowToken.put("startTime", String.valueOf(System.currentTimeMillis()));
      }
      ...
    }
     
    @Override
    public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
      ...
      WorkflowToken workflowToken = context.getWorkflowToken();
      if (workflowToken != null) {
        // Put the end time for the action 
        workflowToken.put("endTime", String.valueOf(System.currentTimeMillis()));
      }
      ...
    }
    

    2. Spark program: Users should be able to access and modify WorkflowToken from "beforeSubmit" and "onFinish" methods of the Spark program. Since these methods get the SparkContext, we will need to update the SparkContext interface to get the WorkflowToken.

     

    Code Block
    /**
     * If the {@link Spark} program is executed as a part of the {@link Workflow},
     * then get the {@link WorkflowToken} associated with the current run; otherwise return null.
     * @return the {@link WorkflowToken} if available
     */
    @Nullable
    WorkflowToken getWorkflowToken();
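
    Consider the following sketch (mirroring the MapReduce example above; illustrative only) of updating the WorkflowToken from the Spark program's beforeSubmit method:

    Code Block
    languagejava
    @Override
    public void beforeSubmit(SparkContext context) throws Exception {
      WorkflowToken workflowToken = context.getWorkflowToken();
      if (workflowToken != null) {
        // Record the action type and the start time for this Spark node
        workflowToken.put("action_type", "SPARK");
        workflowToken.put("startTime", String.valueOf(System.currentTimeMillis()));
      }
    }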

    3. Custom Workflow action: Since custom workflow actions already receive WorkflowContext, no changes are anticipated in the interface.

    Following is sample code to set and get values in the WorkflowToken in a custom action:

    Code Block
    @Override
    public void run() {
      ...
      WorkflowToken token = getContext().getToken();
      // set the type of the action of the current node	
      token.put("action_type", "CUSTOM_ACTION");
     
      // Assume that we have the following Workflow:
      //
      //                                              |---> PurchaseByCustomer --->|
      //                                       True   |                            |
      // Start --> RecordVerifier --> Predicate ----->|                            |---> StatusReporter ---> End
      //                                   |          |                            |             ^
      //                                   |  False   |---> PurchaseByProduct ---->|             |
      //                                   |                                                     |
      //                                   |------------------> ProblemLogger ------------------>|
     
      
      // Use case 1: The Predicate can add the key "branch" to the WorkflowToken with the value "true" if the true branch
      // will be executed, or "false" otherwise. In "StatusReporter", to find out which branch of the Workflow was executed:
      boolean bTrueBranch = Boolean.parseBoolean(token.get("branch").toString());
     
      // Use case 2: User may want to compare the records emitted by "PurchaseByCustomer" and "PurchaseByProduct", in order to find which job
      // is generating more records. 
      String flattenReduceOutputRecordsCounterName = "org.apache.hadoop.mapreduce.TaskCounter.REDUCE_OUTPUT_RECORDS";
      Value purchaseByCustomerCounterValue = token.get(flattenReduceOutputRecordsCounterName, "PurchaseByCustomer", WorkflowToken.Scope.SYSTEM);
      Value purchaseByProductCounterValue = token.get(flattenReduceOutputRecordsCounterName, "PurchaseByProduct", WorkflowToken.Scope.SYSTEM);
      
      // Use case 3: Since a Workflow can have multiple complex conditions and forks in its structure, in the "StatusReporter"
      // the user may want to know how many actions were executed as a part of this run. If the number of nodes executed was below a
      // certain threshold, send an alert. Assuming that every node in the Workflow adds the key "action_type" with the value as the action
      // type for the node in the WorkflowToken, the user can further figure out the breakdown by action type in the particular Workflow run.
      List<NodeValueEntry> nodeValues = token.getAll("action_type");
      int totalNodeExecuted = nodeValues.size();
      int mapReduceNodes = 0;
      int sparkNodes = 0;
      int customActionNodes = 0;
      int conditions = 0;
      for (NodeValueEntry entry : nodeValues) {
        String actionType = entry.getValue().toString();
        if (actionType.equals("MAPREDUCE")) {
          mapReduceNodes++;
        } else if (actionType.equals("SPARK")) {
          sparkNodes++;
        } else if (actionType.equals("CUSTOM_ACTION")) {
          customActionNodes++;
        } else if (actionType.equals("CONDITION")) {
          conditions++;
        }
      }
    
      // Use case 4: To get the name of the last node which set the "ERROR" flag in the WorkflowToken
      List<NodeValueEntry> errorNodeValueList = token.getAll("ERROR");
      String nodeNameWhoSetTheErrorFlagLast = errorNodeValueList.get(errorNodeValueList.size() - 1).getNodeName();
     
      // To get the start time of the MapReduce program with unique name "PurchaseHistoryBuilder"
      String startTime = token.get("startTime", "PurchaseHistoryBuilder").toString();
     
      // To get the most recent value of counter with group name
      // 'org.apache.hadoop.mapreduce.TaskCounter' and counter name 'MAP_INPUT_RECORDS'
      
      String flattenCounterKey = "mr.counters.org.apache.hadoop.mapreduce.TaskCounter.MAP_INPUT_RECORDS";
      token.get(flattenCounterKey, WorkflowToken.Scope.SYSTEM);
    
      // To get the value of counter with group name 'org.apache.hadoop.mapreduce.TaskCounter'
      // and counter name 'MAP_INPUT_RECORDS' as set by MapReduce program with unique name 'PurchaseHistoryBuilder'
      workflowToken.get(flattenCounterKey, "PurchaseHistoryBuilder", WorkflowToken.Scope.SYSTEM);
     ...
    }
  • WorkflowToken in presence of Fork and Join
    When a fork is encountered in the Workflow, we make a copy of the WorkflowToken and pass it along to each branch. At the join, we create a new WorkflowToken, which will be a merge of the WorkflowTokens associated with each of the branches of the fork. Since we are storing the information in the token at the node level, there will not be any conflicts during the merge process.
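
    For example, in the EventParserWorkflow above, a node running after the join could read the merged token as follows (illustrative only); entries from both branches are present, but their relative order across branches is not guaranteed:

    Code Block
    languagejava
    // 'token' obtained from the context, as in the examples above
    List<NodeValueEntry> actionTypes = token.getAll("action_type");
    for (NodeValueEntry entry : actionTypes) {
      // e.g. one entry each from 'trackingParser' and 'sendParser', in either branch order
      System.out.println(entry.getNodeName() + " -> " + entry.getValue());
    }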

  • Persisting the WorkflowToken
    The RunRecord for the Workflow will contain the WorkflowToken as a property. This token will be updated before the execution of each action in the Workflow. We will also add a version field to the RunRecord itself, which will help in the upgrade process.

  • RESTful end-points to access the value of the WorkflowToken that was received by an individual node in the Workflow
    We will expose a RESTful endpoint to retrieve the token values that were set by a particular node, as identified by its unique name.

     

    Code Block
    /apps/{app-id}/workflows/{workflow-name}/runs/{run-id}/nodes/{unique-node-name}/token
    REST API | Response | Comments | Reviewed?
    /namespaces/{namespace-id}/apps/{app-id}/workflows/{workflow-name}/runs/{run-id}/token

    JSON containing the entire workflow token for a particular workflow run, e.g.:

    Code Block
    {
      "tokenValueMap": {
        "key1": [
          {
            "nodeName": "node1",
            "value": "value1"
          },
          {
            "nodeName": "node2",
            "value": "value2"
          }
        ],
        "key2": [
          {
            "nodeName": "node2",
            "value": "v2"
          }
        ]
      }
    }

    Response Codes: 

    200 if successful
    404 if app/workflow not found
    500 if there is an internal error

      
    /namespaces/{namespace-id}/apps/{app-id}/workflows/{workflow-name}/runs/{run-id}/nodes/{unique-node-name}/token
    Code Block
    {
      "key1": "value1",
      "key2": "value2
    }

    Response Codes: 

    200 if successful
    404 if app/workflow not found
    500 if there is an internal error

      

...