Pipeline Payload Passing

Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction

This design provides the capability to pass information from the triggering pipeline to the triggered pipeline when scheduling is based on program status.

Goals

Provide a clear API and backend design for passing the program status change event payload between pipelines.

User Stories 

  • Pipeline B is triggered when Pipeline A completes, and Pipeline B also needs the username and password used by Pipeline A
  • Pipeline B is triggered when Pipeline A completes, and Pipeline B also needs the stream id Pipeline A reads from
  • Pipeline B is triggered when Pipeline A completes, and Pipeline B also needs the schema of a sink Pipeline A writes to.

Design

A new method will be added to RuntimeContext that returns a new API class, TriggeringScheduleInfo, which describes the schedule that launched the program. All runtime arguments, stage configurations, and user workflow tokens of the pipeline run that triggered the schedule will be included in the TriggeringScheduleInfo. To use these properties, users define a mapping from a property of the triggering program to a runtime argument of the triggered program. This mapping should be stored in the schedule properties under the key "triggering.properties.mapping". The app containing the triggered program should be able to decode the value of "triggering.properties.mapping", look up the corresponding properties in TriggeringScheduleInfo, and use their values as runtime arguments according to the mapping.

For the pipeline app, the syntax for referring to different properties in the field "triggering.properties.mapping" is defined below:

The syntax for runtime args from the triggering pipeline is a JSON representation of the TriggeringPipelinePropertyMap class (see API changes section) :

{
  "arguments": [
    {
      "source": runtime arg in triggering pipeline,
      "target": what to rename it to // optional
    },
    ...
  ],
  "pluginProperties": [
    {
      "stageName": stage name in triggering pipeline,
      "source": name of the property for that stage,
      "target": what to rename it to // optional
    },
    ...
  ]
}
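
The mapping above could be modelled in Java roughly as follows. This is a minimal sketch, not the actual class: the name TriggeringPipelinePropertyMap comes from the text, while the nested class names ArgumentMapping and PluginPropertyMapping and the effectiveTarget() helper are hypothetical. It also assumes that an omitted "target" means the property keeps its source name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of the value stored under "triggering.properties.mapping". */
final class TriggeringPipelinePropertyMap {

  /** One entry of the "arguments" list. */
  static final class ArgumentMapping {
    final String source;
    final String target; // may be null, because "target" is optional

    ArgumentMapping(String source, String target) {
      this.source = source;
      this.target = target;
    }

    /** Assumption: without a target, the argument keeps its source name. */
    String effectiveTarget() {
      return target == null ? source : target;
    }
  }

  /** One entry of the "pluginProperties" list. */
  static final class PluginPropertyMapping {
    final String stageName;
    final String source;
    final String target; // may be null, because "target" is optional

    PluginPropertyMapping(String stageName, String source, String target) {
      this.stageName = stageName;
      this.source = source;
      this.target = target;
    }

    /** Assumption: without a target, the property keeps its source name. */
    String effectiveTarget() {
      return target == null ? source : target;
    }
  }

  final List<ArgumentMapping> arguments;
  final List<PluginPropertyMapping> pluginProperties;

  TriggeringPipelinePropertyMap(List<ArgumentMapping> arguments,
                                List<PluginPropertyMapping> pluginProperties) {
    // Defensive copies so the mapping cannot change after construction.
    this.arguments = Collections.unmodifiableList(new ArrayList<>(arguments));
    this.pluginProperties = Collections.unmodifiableList(new ArrayList<>(pluginProperties));
  }
}
```

A JSON library could populate such a class from the stored string; which library the pipeline app uses is not specified here.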

 

Approach

Here's an example use case to illustrate the approach: Pipeline A in the default namespace triggers Pipeline B when Pipeline A completes. Pipeline B needs the runtime argument "hostname" from Pipeline A as the value of its own runtime argument "host". For instance, if Pipeline A has the runtime argument pair "hostname" -> "wiki.cask.co", then in Pipeline B the value of "host" should be "wiki.cask.co".

When setting up the program status schedule, the following JSON string is stored as the value of the field "triggering.properties.mapping" in the schedule properties:

{
  "arguments": [
    {
      "source": "hostname",
      "target": "host"
    }
  ],
  "pluginProperties": []
}

When the notification of Pipeline A's completion triggers the schedule to launch Pipeline B, the pipeline app of Pipeline B extracts the field "triggering.properties.mapping" from the schedule properties. From the decoded mapping, the pipeline app recognizes that the entry in the arguments list refers to the runtime argument with key "hostname" in Pipeline A in the default namespace. The pipeline app then looks up the value of this runtime argument in the TriggeringScheduleInfo obtained from the RuntimeContext, and provides it as the value of the runtime argument "host" in Pipeline B.
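
The lookup-and-rename step described above can be sketched as follows. This is an illustration under stated assumptions rather than the pipeline app's actual code: the class TriggeringArgumentResolver and its resolve method are hypothetical, the decoded "arguments" list is simplified to a source-to-target map, and a source argument that is absent from the triggering run is assumed to be skipped.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that applies an argument mapping to the triggering run's arguments. */
final class TriggeringArgumentResolver {

  /**
   * @param triggeringArgs runtime arguments of the triggering pipeline run
   * @param sourceToTarget decoded mapping: argument name in the triggering pipeline
   *                       to argument name in the triggered pipeline
   * @return runtime arguments to add to the triggered pipeline
   */
  static Map<String, String> resolve(Map<String, String> triggeringArgs,
                                     Map<String, String> sourceToTarget) {
    Map<String, String> resolved = new LinkedHashMap<>();
    for (Map.Entry<String, String> mapping : sourceToTarget.entrySet()) {
      String value = triggeringArgs.get(mapping.getKey());
      // Assumption: a mapping whose source argument is missing is ignored.
      if (value != null) {
        resolved.put(mapping.getValue(), value);
      }
    }
    return resolved;
  }

  public static void main(String[] args) {
    Map<String, String> pipelineAArgs = new LinkedHashMap<>();
    pipelineAArgs.put("hostname", "wiki.cask.co");

    Map<String, String> mapping = new LinkedHashMap<>();
    mapping.put("hostname", "host");

    // Prints {host=wiki.cask.co}
    System.out.println(resolve(pipelineAArgs, mapping));
  }
}
```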

API changes

New Programmatic APIs

 

 

RuntimeContext
/**
 * This interface represents a context for a processor or elements of a processor.
 */
public interface RuntimeContext {

...

  /**
   * @return {@link TriggeringScheduleInfo} if the program is triggered by a schedule. Otherwise, returns {@code null}.
   */
  @Nullable
  TriggeringScheduleInfo getTriggeringScheduleInfo();
 
...
 
}
TriggeringScheduleInfo
/**
 * The information of a schedule that can be used by the program launched by the schedule.
 */
public class TriggeringScheduleInfo {

  private final String name;
  private final String description;
  private final TriggerInfo triggerInfo;
  private final Map<String, String> properties;

  public TriggeringScheduleInfo(String name, String description, TriggerInfo triggerInfo,
                                Map<String, String> properties) {
    this.name = name;
    this.description = description;
    this.properties = properties;
    this.triggerInfo = triggerInfo;
  }

  /**
   * @return Schedule's name, which is unique in an application.
   */
  public String getName() {
    return name;
  }

  /**
   * @return Description of the schedule.
   */
  public String getDescription() {
    return description;
  }

  /**
   * @return Information of the trigger contained in this schedule.
   */
  public TriggerInfo getTriggerInfo() {
    return triggerInfo;
  }

  /**
   * @return Properties of the schedule.
   */
  public Map<String, String> getProperties() {
    return properties;
  }
}
TriggerInfo
/**
 * Base class for the trigger information to be passed to the triggered program.
 */
public abstract class TriggerInfo {
  private final Trigger.Type type;

  public TriggerInfo(Trigger.Type type) {
    this.type = type;
  }

  /**
   * @return The type of the trigger.
   */
  public Trigger.Type getType() {
    return type;
  }
}
ProgramStatusTriggerInfo
/**
 * The program status trigger information to be passed to the triggered program.
 */
public class ProgramStatusTriggerInfo extends TriggerInfo {

  private final String namespace;
  private final ApplicationSpecification applicationSpecification;
  private final ProgramType programType;
  private final String program;
  private final Set<ProgramStatus> triggerStatues;
  @Nullable
  private final String runId;
  @Nullable
  private final ProgramStatus programStatus;
  @Nullable
  private final WorkflowToken workflowToken;
  @Nullable
  private final Map<String, String> runtimeArguments;

  public ProgramStatusTriggerInfo(String namespace, ApplicationSpecification applicationSpecification,
                                  ProgramType programType, String program, Set<ProgramStatus> triggerStatues,
                                  @Nullable String runId, @Nullable ProgramStatus programStatus,
                                  @Nullable WorkflowToken workflowToken,
                                  @Nullable Map<String, String> runtimeArguments) {
    super(Trigger.Type.PROGRAM_STATUS);
    this.namespace = namespace;
    this.applicationSpecification = applicationSpecification;
    this.programType = programType;
    this.program = program;
    this.triggerStatues = triggerStatues;
    this.runId = runId;
    this.programStatus = programStatus;
    this.workflowToken = workflowToken;
    this.runtimeArguments = runtimeArguments;
  }

  /**
   * @return The namespace of the triggering program.
   */
  public String getNamespace() {
    return namespace;
  }

  /**
   * @return The application specification of the application that contains the triggering program.
   */
  public ApplicationSpecification getApplicationSpecification() {
    return applicationSpecification;
  }

  /**
   * @return The program type of the triggering program.
   */
  public ProgramType getProgramType() {
    return programType;
  }

  /**
   * @return The program name of the triggering program.
   */
  public String getProgram() {
    return program;
  }

  /**
   * @return All the program statuses that can satisfy the program status trigger.
   */
  public Set<ProgramStatus> getTriggerStatues() {
    return triggerStatues;
  }

  /**
   * @return The program run Id of the triggering program run that can satisfy the program status trigger,
   *         or {@code null} if there is no such run.
   */
  @Nullable
  public String getRunId() {
    return runId;
  }

  /**
   * @return The program status of the triggering program run that can satisfy the program status trigger,
   *         or {@code null} if there is no such run.
   */
  @Nullable
  public ProgramStatus getProgramStatus() {
    return programStatus;
  }

  /**
   * @return The workflow token if the program is a workflow with a run that can
   *         satisfy the program status trigger, or an empty workflow token if there's no such run.
   *         Return {@code null} if the program is not a workflow.
   */
  @Nullable
  public WorkflowToken getWorkflowToken() {
    return workflowToken;
  }

  /**
   * @return The runtime arguments of the triggering program run that can satisfy the program status trigger,
   *         or {@code null} if there is no such run.
   */
  @Nullable
  public Map<String, String> getRuntimeArguments() {
    return runtimeArguments;
  }
}
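
Because getTriggeringScheduleInfo() and several getters above may return null, a triggered program has to guard each step. The sketch below shows one way this might look. The nested types are simplified stand-ins for the classes in this document (only the members needed here), and the helper triggeringRuntimeArgs is hypothetical.

```java
import java.util.Collections;
import java.util.Map;

/** Sketch only: the nested types are simplified stand-ins for the API classes above. */
final class TriggerInfoSketch {

  /** Stand-in for TriggerInfo (the trigger type is omitted for brevity). */
  abstract static class TriggerInfo { }

  /** Stand-in for ProgramStatusTriggerInfo, reduced to the runtime arguments. */
  static final class ProgramStatusTriggerInfo extends TriggerInfo {
    private final Map<String, String> runtimeArguments; // null if there is no such run

    ProgramStatusTriggerInfo(Map<String, String> runtimeArguments) {
      this.runtimeArguments = runtimeArguments;
    }

    Map<String, String> getRuntimeArguments() {
      return runtimeArguments;
    }
  }

  /** Stand-in for TriggeringScheduleInfo, reduced to the trigger information. */
  static final class TriggeringScheduleInfo {
    private final TriggerInfo triggerInfo;

    TriggeringScheduleInfo(TriggerInfo triggerInfo) {
      this.triggerInfo = triggerInfo;
    }

    TriggerInfo getTriggerInfo() {
      return triggerInfo;
    }
  }

  /**
   * Hypothetical helper: returns the triggering run's runtime arguments, or an empty map
   * when the program was not launched by a schedule, the trigger is not a program status
   * trigger, or no triggering run is available.
   */
  static Map<String, String> triggeringRuntimeArgs(TriggeringScheduleInfo info) {
    if (info == null) {
      return Collections.<String, String>emptyMap();
    }
    TriggerInfo trigger = info.getTriggerInfo();
    if (!(trigger instanceof ProgramStatusTriggerInfo)) {
      return Collections.<String, String>emptyMap();
    }
    Map<String, String> args = ((ProgramStatusTriggerInfo) trigger).getRuntimeArguments();
    return args == null ? Collections.<String, String>emptyMap() : args;
  }
}
```

In a real program the argument would come from RuntimeContext.getTriggeringScheduleInfo(); returning an empty map instead of null is a design choice of this sketch, so that callers can iterate without further checks.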
TriggeringPipelinePropertyId
/**
 * Base class for identifiers of properties from the triggering pipeline.
 */
public abstract class TriggeringPipelinePropertyId {

  /**
   * The type of the triggering pipeline property
   */
  public enum Type {
    RUNTIME_ARG,
    PLUGIN_PROPERTY,
    TOKEN
  }

  private final Type type;
  private final String namespace;
  private final String pipelineName;

  public TriggeringPipelinePropertyId(Type type, String namespace, String pipelineName) {
    this.type = type;
    this.namespace = namespace;
    this.pipelineName = pipelineName;
  }

  /**
   * @return Type of the triggering pipeline property.
   */
  public Type getType() {
    return type;
  }

  /**
   * @return Namespace of the triggering pipeline.
   */
  public String getNamespace() {
    return namespace;
  }

  /**
   * @return Name of the triggering pipeline.
   */
  public String getPipelineName() {
    return pipelineName;
  }
}
TriggeringPipelineRuntimeArgId
/**
 * Identifier of runtime arguments from the triggering pipeline.
 */
public class TriggeringPipelineRuntimeArgId extends TriggeringPipelinePropertyId {
  private final String runtimeArgumentKey;

  public TriggeringPipelineRuntimeArgId(String namespace, String pipelineName, String runtimeArgumentKey) {
    super(Type.RUNTIME_ARG, namespace, pipelineName);
    this.runtimeArgumentKey = runtimeArgumentKey;
  }

  /**
   * @return The key of the runtime argument in the triggering pipeline.
   */
  public String getRuntimeArgumentKey() {
    return runtimeArgumentKey;
  }
}
TriggeringPipelinePluginPropertyId
/**
 * Identifier of plugin property from the triggering pipeline.
 */
public class TriggeringPipelinePluginPropertyId extends TriggeringPipelinePropertyId {
  private final String pluginName;
  private final String propertyKey;

  public TriggeringPipelinePluginPropertyId(String namespace, String pipelineName,
                                            String pluginName, String propertyKey) {
    super(Type.PLUGIN_PROPERTY, namespace, pipelineName);
    this.pluginName = pluginName;
    this.propertyKey = propertyKey;
  }

  /**
   * @return The name of the plugin in the triggering pipeline.
   */
  public String getPluginName() {
    return pluginName;
  }

  /**
   * @return The key of the plugin property in the triggering pipeline.
   */
  public String getPropertyKey() {
    return propertyKey;
  }
}
TriggeringPipelineTokenId
/**
 * Identifier of token from the triggering pipeline.
 */
public class TriggeringPipelineTokenId extends TriggeringPipelinePropertyId {
  private final String tokenKey;
  private final String nodeName;

  public TriggeringPipelineTokenId(String namespace, String pipelineName, String tokenKey, String nodeName) {
    super(Type.TOKEN, namespace, pipelineName);
    this.tokenKey = tokenKey;
    this.nodeName = nodeName;
  }

  /**
   * @return The key of the token in the triggering pipeline.
   */
  public String getTokenKey() {
    return tokenKey;
  }

  /**
   * @return The name of the node where the token is generated in the triggering pipeline.
   */
  public String getNodeName() {
    return nodeName;
  }
}

 

UI Impact or Changes

Security Impact 

What's the impact on Authorization and how does the design take care of this aspect

Impact on Infrastructure Outages 

System behavior (if applicable, document the impact of downstream component failures [YARN, HBase, etc.]) and how the design takes care of these aspects

Test Scenarios

Test ID | Test Description | Expected Results
   
   
   
   

Releases

Release X.Y.Z

Release X.Y.Z

Related Work

  • Work #1
  • Work #2
  • Work #3

 

Future work


Created in 2020 by Google Inc.