Checklist

  •  User Stories Documented
  •  User Stories Reviewed
  •  Design Reviewed
  •  APIs reviewed
  •  Release priorities assigned
  •  Test cases reviewed
  •  Blog post

Introduction 

As the data pipeline experience has become more sophisticated, it has become clear that a good number of capabilities are required during the pipeline creation process. Some of these capabilities are provided by the CDAP platform when they are generic enough, such as listing plugins and their properties. However, there is a class of capabilities that are specific to pipelines, such as pipeline validation, schema propagation, plugin templates, and pipeline drafts. These have all been implemented with non-trivial logic in the UI. This type of pipeline-specific logic does not belong in the UI, but in a backend service that the UI can call. These types of features were explored using the plugin endpoint feature (for 'Get Schema' buttons), but that approach makes it difficult to handle errors correctly and ends up running user code in the CDAP master. The lack of this pipeline backend layer has slowed UI development, contributed to bugs in the product, and added a lot of technical debt. By moving much of this logic to a system application, overall development speed will increase and the door will be opened to a richer set of features in the future. 

Goals

Design the pipeline system application to remove tech debt from the UI and build an architecture that supports a richer set of pipeline specific product requirements in the future.

User Stories 

  1. As a pipeline developer, I want to be able to validate a pipeline before deploying it and know exactly which stages and which fields are invalid and why
  2. As a pipeline developer, I want to be able to validate a pipeline stage and know exactly which fields are invalid and why
  3. As a pipeline developer, I want to be able to debug a single pipeline stage by specifying input and examining output and/or errors
  4. As a pipeline developer, I want the schema everywhere in the pipeline to automatically update without manually propagating it
  5. As a pipeline developer, I want to be able to create templates from a plugin that have fields pre-filled
  6. As a pipeline developer, I want the schema displayed by the UI to always match what is used during execution
  7. As a pipeline developer, I want to be able to import a pipeline spec with missing artifacts and have the option to automatically update those versions
  8. As a plugin developer, I want schema propagation to be defined in a single place
  9. As a CDAP administrator, I don't want user code to run in the CDAP master

    Design

    A new system application will be introduced to provide much of the more complex logic that is currently handled by the UI. The data pipeline code will be enhanced with a Service program to expose a few APIs. Where possible, APIs will be stateless. The application will be deployed at startup to the system namespace through a bootstrap action. All REST endpoints described are prefixed by 'v3/namespaces/system/apps/pipeline/services/studio/methods/v1'. 
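
    The following is a minimal sketch of what the Service program's handler could look like, assuming it extends CDAP's AbstractHttpServiceHandler and simply delegates to the refactored pipeline configuration logic. The class and method names used here (StudioServiceHandler, PipelineValidationRequest, PipelineValidationResponse, validate) are illustrative only and not part of the design.

    Code Block
    // Illustrative sketch only; class names and packages are not final
    public class StudioServiceHandler extends AbstractHttpServiceHandler {
      private static final Gson GSON = new Gson();

      @POST
      @Path("v1/contexts/{context}/validations/pipeline")
      public void validatePipeline(HttpServiceRequest request, HttpServiceResponder responder,
                                   @PathParam("context") String namespace) {
        // The request body is the same JSON used to deploy a pipeline
        String body = StandardCharsets.UTF_8.decode(request.getContent()).toString();
        PipelineValidationRequest validationRequest = GSON.fromJson(body, PipelineValidationRequest.class);

        // Re-use the existing configure-time logic to produce either a spec or a list of errors
        PipelineValidationResponse validationResponse = validate(namespace, validationRequest);
        responder.sendString(200, GSON.toJson(validationResponse), StandardCharsets.UTF_8);
      }

      private PipelineValidationResponse validate(String namespace, PipelineValidationRequest request) {
        // refactored pipeline validation logic
        ...
      }
    }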

    Validate Pipeline

    Pipeline validation is already done by the application when a pipeline is deployed. The logic just needs to be refactored so that it can run both at application configure time and in a service method call. The request body is exactly the same as the request used to deploy a pipeline.

    No Format
    POST v1/contexts/<namespace-id>/validations/pipeline
    
    Request Body:
    {
      "artifact": {
        "scope": "SYSTEM",
        "name": "cdap-data-pipeline",
        "version": "6.0.0"
      },
      "config": {
        "stages": [ ... ],
        "connections": [ ... ],
        ...
      }
    }
    
    Response:
    {
      "errors": [
        {
          "type": "Invalid Field",
          "message": "...",
          // type specific fields
        }
      ],
      "spec": {
        "stages": [
          {
            "name": "stageName",
            "plugin": {
              "type": "pluginType",
              "name": "pluginName",
              "properties": {
                "name": "value"
              },
              "artifact": {
                "name": "core-plugins",
                "scope": "SYSTEM" | "USER",
                "version": "1.0.0"
              },
              "inputSchemas": {
                "[inputStageName]": { schema object }
              },
              "outputPorts": {
                "[outputStageName]": {
                  "port": "portName",
                  "schema": "portSchema"
                }
              },
              "outputSchema": { schema object },
              "errorSchema": { schema object }
            }
          },
          ...
        ],
        "connections": [
          {
            "from": "inputStageName",
            "to": "outputStageName",
            "port": "outputPort",
            "condition": true | false
          },
          ...
        ],
        "resources": {
          "virtualCores": 1,
          "memoryMB": 1024
        },
        "driverResources": { ... },
        "clientResources": { ... },
        "stageLoggingEnabled": true | false,
        "processTimingEnabled": true | false,
        "properties": {
          "name": "val",
          ...
        },
        "endingActions": [
          {
            "name": "postaction name",
            "plugin": {
              "type": "postaction",
              "name": "email",
              "properties": { ... },
              "artifact": { ... }
            }
          }
        ]
      }
    }

    Namespace is required in the request in order for the app to pick up the correct plugin, since plugin artifacts can be in user scope.

    The response contains whether the pipeline is valid, errors if it is invalid, and the spec if it is valid. This is the same spec that can be found in the program properties of a deployed pipeline. It is similar to the pipeline config, except it contains the exact plugin artifact that will be used (which can differ from the input if the input contains a version range or leaves the artifact field out), and it is enhanced with more information about schema.

    If the error is not specific to a stage (for example, the pipeline structure is invalid), the stage name and field name will be missing. In order to capture this information, the plugin APIs will be extended to include a validate method that can throw a specific type of exception. This method will be called at configure time when the pipeline is deployed, as well as when the validate service endpoint is called.

    Code Block
    public class InvalidStageException extends RuntimeException {
      private final List<Exception> errors;
    
      public InvalidStageException(Collection<Exception> errors);
    
      public List<Exception> getErrors();
    }
    
    public class InvalidFieldException extends RuntimeException {
      private final String fieldName;
    
      public InvalidFieldException(String fieldName, String message) {
        this(fieldName, message, null);
      }
    
      public InvalidFieldException(String fieldName, String message, Throwable cause) {
        super(message, cause);
        this.fieldName = fieldName;
      }
    
      public String getFieldName() {
        return fieldName;
      }
    }
    
    
    public interface PipelineConfigurable {
      
      void configurePipeline(PipelineConfigurer pipelineConfigurer) throws InvalidStageException;
    
    }

    This allows plugins to write validation code that looks something like:

    Code Block
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws InvalidStageException {
      List<Exception> configErrors = new ArrayList<>();
      if (config.getNumPartitions() < 1) {
        configErrors.add(new InvalidFieldException("numPartitions", "The number of partitions must be at least 1."));
      }
      if (!configErrors.isEmpty()) {
        throw new InvalidStageException(configErrors);
      }
    }

    Which would result in errors in the REST response like:

    No Format
    "errors": [
      {
        "type": "Invalid Field",
        "message": "The number of partitions must be at least 1.",
        "stage": "stage-4",
        "field": "numPartitions"
      }
    ]

    Another type of error could be due to a plugin artifact that is missing. This specific type of error is fairly common during pipeline import, when a user imports a pipeline from a different version of CDAP, and the artifact versions do not match.  The error would look like:

    No Format
    "errors": [
      {
        "stage": "stage-5",
        "type": "Artifact Not Found",
        "message": "No plugin named 'table' could be found.",
        "suggestedArtifact": {
          "scope": "SYSTEM",
          "name": "core-plugins",
          "version": "6.0.0"
        }
      }
    ]
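
    A client such as the UI import flow could use the suggestedArtifact to repair the config and then re-validate. Below is a rough sketch of that client-side handling, treating the config and errors as Gson JSON trees; the method shown is illustrative only and not part of the API.

    Code Block
    // Illustrative client-side handling of 'Artifact Not Found' errors during import
    void applySuggestedArtifacts(JsonObject pipelineConfig, JsonArray errors) {
      for (JsonElement element : errors) {
        JsonObject error = element.getAsJsonObject();
        if (!"Artifact Not Found".equals(error.get("type").getAsString()) || !error.has("suggestedArtifact")) {
          continue;
        }
        String stageName = error.get("stage").getAsString();
        JsonObject suggested = error.getAsJsonObject("suggestedArtifact");

        // Replace the missing artifact on the offending stage with the suggested one
        for (JsonElement s : pipelineConfig.getAsJsonArray("stages")) {
          JsonObject stage = s.getAsJsonObject();
          if (stageName.equals(stage.get("name").getAsString())) {
            stage.getAsJsonObject("plugin").add("artifact", suggested);
          }
        }
      }
      // The repaired config can then be sent back to the pipeline validation endpoint
    }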

    Validate Stage

    Validation for a single stage is very similar to validation of an entire pipeline. Instead of passing the entire pipeline config, the client passes just the input schemas and the stage config. Instead of responding with the entire pipeline spec, just the stage spec is returned.

    No Format
    POST v1/contexts/<namespace-id>/validations/stage
    
    Request Body:
    {
      "stage": {
        "name": "stageName",
        "plugin": {
          "type": "pluginType",
          "name": "pluginName",
          "artifact": {
            "scope": "SYSTEM" | "USER",
            "name": "core-plugins",
            "version": "1.0.0"
          },
          "properties": {
            "name": "val",
            ...
          }
        }
      },
      "inputSchemas": [
        {
          "stage": "abc",
          "schema": { schema object}
        },
        ...
      ]       
    }
    
    Response:
    {
      "errors": [
        {
          "type": "Invalid Field",
          "message": "..."
        }
      ],
      "spec": {
        "name": "stageName",
        "plugin": {
          "type": "pluginType",
          "name": "pluginName",
          "properties": {
            "name": "value"
          },
          "artifact": {
            "name": "core-plugins",
            "scope": "SYSTEM" | "USER",
            "version": "1.0.0"
          },
          "inputSchemas": {
            "[inputStageName]": { schema object }
          },
          "portSchemas": {
            "[portName]": { schema object } 
          },
          "outputPorts": {
            "[outputStageName]": {
              "port": "portName",
              "schema": "portSchema"
            }
          },
          "outputSchema": { schema object },
          "errorSchema": { schema object }
        }
      }
    }

    This endpoint can be used to automatically update the output schema of a stage after a user changes the configuration of the stage. This would replace the 'Get Schema' button that is present in several plugins, removing the possibility of an invalid pipeline because somebody forgot to click the button.
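
    For example, a client could call the endpoint after every configuration change and read the output schema from the returned spec. A rough sketch of such a call follows; the host, port, and namespace are placeholders.

    Code Block
    // Illustrative client call to the stage validation endpoint; host, port, and namespace are placeholders
    URL url = new URL("http://localhost:11015/v3/namespaces/system/apps/pipeline/services/studio/methods/"
                        + "v1/contexts/default/validations/stage");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);

    // requestBody is the JSON described above: the stage config plus its inputSchemas
    try (OutputStream os = conn.getOutputStream()) {
      os.write(requestBody.getBytes(StandardCharsets.UTF_8));
    }

    // On success, read spec.plugin.outputSchema from the response and update the studio display,
    // instead of waiting for the user to click a 'Get Schema' button
    try (Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)) {
      JsonObject response = new Gson().fromJson(reader, JsonObject.class);
      JsonObject outputSchema = response.getAsJsonObject("spec").getAsJsonObject("plugin").getAsJsonObject("outputSchema");
      ...
    }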

    Stage Debugging (Future Work)

    It is often useful to be able to send input into a single pipeline stage and see what it would output. This is useful to debug a pipeline stage, and can be useful just to figure out how exactly a pipeline behaves. The request body is similar to the validate stage body, except it contains an additional section defining what the input stages are, the schema of the records from that stage, and the records from that stage. The response contains the output records, error records, or any exception that was generated while attempting to generate the output. This will be implemented initially for transform plugin types, and can be expanded as needed.

    No Format
    POST v1/contexts/<namespace-id>/validations/output
    
    
    Request Body:
    {
      "namespace": "default",
      "stage": {
        "name": "stageName",
        "plugin": {
          "type": "pluginType",
          "name": "pluginName",
          "artifact": {
            "scope": "SYSTEM" | "USER",
            "name": "core-plugins",
            "version": "1.0.0"
          },
          "properties": {
            "name": "val",
            ...
          }
        }
      },
      "input": [
        {
          "stage": "input stage name",
          "schema": { schema of data coming from that stage },
          "records": [
            {
              "fieldName": type appropriate value
            },
            ...
          ]
        },
        ...
      ]       
    }
    
    
    Response:
    {
      "errors": [
        {
          "message": "exception message",
          "trace": "exception stack trace"
        },
        ...
      ],
      "output": {
        "schema": { output schema },
        "records": [
          {
            "fieldName": type appropriate value
          },
          ...
        ] 
      }
    }

    Plugin Endpoints (Future Work)

    Plugins have used the plugin endpoints feature to run custom code in the CDAP master to calculate output schema. The plugin code looks something like:

    Code Block
    public class MyPlugin extends Transform {
      private final MyConfig conf;
      
      // called when 'Get Schema' button is pressed
      @Path("getSchema")
      public Schema getSchema(SchemaRequest request, EndpointPluginContext context) throws Exception {
        ...
      }
    
    
      static class SchemaRequest extends MyConfig {
        private final Schema inputSchema;
      }
    }

    The current implementation of this feature is going to be removed, as it executes user code in the CDAP master and does a poor job of error handling.

    The new validation endpoint now removes the need for the 'get schema' use case, but there still might be times when it makes sense for a plugin to provide additional functionality that is specific to that plugin and not generic across all plugins. For example, the predictor plugin from MMDS might want to expose an endpoint that lists the available experiments, or lists the available models within an experiment. 

    Now that these plugins are actually instantiated within a CDAP Service program, these endpoints can be added as a system service API:

    Code Block
    POST v1/contexts/<namespace-id>/methods/{method-name}
    
    
    Request Body:
    {
      "plugin": {
        "type": "pluginType",
        "name": "pluginName",
        "artifact": {
          "scope": "SYSTEM" | "USER",
          "name": "core-plugins",
          "version": "1.0.0"
        }
      },
      "request": { // json representation of the object passed into the plugin endpoint method
        "inputSchema": { ... },
        "x": "y",
        ...
      }     
    }

    This means there is a restriction that the @Path must be a single element; it cannot contain a '/'.
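
    For example, an endpoint method declared with a single-element path would be reachable through the new service route, while a multi-element path would be rejected. The plugin and method below are hypothetical and only illustrate the restriction.

    Code Block
    public class Predictor extends Transform {
      // Valid: a single path element, reachable as POST v1/contexts/<namespace-id>/methods/listExperiments
      @Path("listExperiments")
      public List<String> listExperiments(ExperimentRequest request, EndpointPluginContext context) throws Exception {
        ...
      }

      // Invalid under the new restriction: the path contains a '/'
      // @Path("experiments/list")
    }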


    Plugin Changes

    Currently, many plugins perform validation and schema propagation in multiple places. They will often look something like:

    Code Block
    public class MyPlugin extends Transform {
      private final MyConfig conf;
      
      @Override
      public void configurePipeline(PipelineConfigurer configurer) {
        // validate config properties that were not macros
        Schema inputSchema = configurer.getStageConfigurer().getInputSchema();
        validate(inputSchema);
        configurer.getStageConfigurer().setOutputSchema(getOutputSchema(inputSchema));
        // setup datasets
        ...
      }
    
      @Override
      public void prepareRun(BatchSinkContext context) {
        // validate again to validate any config properties that were macros
        validate(context.getInputSchema());
        // setup run
        ...
      }
       
      // called when 'Get Schema' button is pressed
      @Path("getSchema")
      public Schema getSchema(SchemaRequest request) throws Exception {
        validate(request.inputSchema);
        return getOutputSchema(request.inputSchema);
      }
    
      private void validate(@Nullable Schema inputSchema) {
        if (!conf.containsMacro("f1") && conf.getF1() >= 100) {
          throw new IllegalArgumentException("f1 must be less than 100");
        }
        ...
      }
    
      private Schema getOutputSchema(@Nullable Schema inputSchema) {
        // generate output schema based on config properties
      }
    
    
      static class SchemaRequest extends MyConfig {
        private final Schema inputSchema;
      }
    }
    
    

    Validation is done at configure time to catch errors as early as possible. It is also done when preparing a run, since many properties could have been macros. It is done yet again when computing the output schema for the 'Get Schema' button. Similarly, schema propagation is done at configure time when possible, and again for the 'Get Schema' button. It should also be done at prepare time (this was never added due to time constraints), since schema can be a macro as well.

    There is a lot of duplication. The contract for configurePipeline() can be extended so that it can be called before pipeline deployment for validation and schema propagation. In addition, it can be called when preparing a run so that full validation can take place now that macros have been evaluated, and schema that couldn't have been determined at deployment can now be propagated. This removes the need for the plugin developer to explicitly validate in two places, and removes the need to explicitly get schema in two places. The code is then simplified to just be:

    Code Block
    public class MyPlugin extends Transform {
      private final MyConfig conf;
      
      @Override
      public void configurePipeline(PipelineConfigurer configurer) {
        // validate config properties that were not macros
        Schema inputSchema = configurer.getStageConfigurer().getInputSchema();
        validate(inputSchema);
        configurer.getStageConfigurer().setOutputSchema(getOutputSchema(inputSchema));
        // setup datasets
        ...
      }
    
      @Override
      public void prepareRun(BatchSinkContext context) {
        // setup run
        ...
      }
    
      private void validate(@Nullable Schema inputSchema) {
        if (!conf.containsMacro("f1") && conf.getF1() >= 100) {
          throw new IllegalArgumentException("f1 must be less than 100");
        }
        ...
      }
    
    }


    API changes

    New Programmatic APIs

    New exceptions, InvalidStageException and InvalidFieldException, are added to the plugin APIs.

    Deprecated Programmatic APIs

    None

    New REST APIs

    Path | Method | Description | Response
    v3/namespaces/system/apps/pipelines/services/studio/methods/v1/contexts/<namespace-id>/validations/pipeline | POST | Validates a pipeline config | Pipeline spec or errors
    v3/namespaces/system/apps/pipelines/services/studio/methods/v1/contexts/<namespace-id>/validations/stage | POST | Validates a single pipeline stage | Stage spec or errors
    v3/namespaces/system/apps/pipelines/services/studio/methods/v1/contexts/<namespace-id>/validations/output | POST | Returns what the stage would output given its configuration and some provided input | Output records or errors

    Response codes: 200 - On success, 404 - When the application is not available, 500 - Any internal errors

    Deprecated REST API

    None

    CLI Impact or Changes


    None

    UI Impact or Changes

    The UI will need to change how the 'Get Schema' buttons are handled so that they call the new stage validation endpoint.

    When the UI wants to remove its own schema propagation logic and add validation, it will need to use the new validation endpoints.

    Security Impact 

    What's the impact on Authorization, and how does the design take care of this aspect?

    Impact on Infrastructure Outages 

    Users will not be able to use the pipeline studio if the new pipeline service is down.

    Test Scenarios

    Test ID | Test Description | Expected Results

    Releases

    Release 6.0.0

    Implement validate endpoints in order to ensure user code no longer runs in the CDAP master

    Related Work



    Future work

    Stage Debugging