
Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

As the data pipeline experience has become more sophisticated, it has become clear that a significant number of capabilities are required during pipeline creation. Some of these capabilities are generic enough that the CDAP platform provides them, such as listing plugins and their properties. However, there is a class of capabilities specific to pipelines, such as pipeline validation, schema propagation, plugin templates, and pipeline drafts. These have all been implemented with non-trivial logic in the UI. This kind of pipeline-specific logic does not belong in the UI, but in a backend service that the UI can call. Some of these features were explored using the plugin endpoint feature (for 'Get Schema' buttons), but that approach makes it difficult to handle errors correctly and ends up running user code in the CDAP master. The lack of a pipeline backend layer has slowed UI development, contributed to bugs in the product, and added a lot of technical debt. Moving much of this logic into a system application will increase overall development speed and open the door to a richer set of features in the future. 

Goals

Design the pipeline system application to remove tech debt from the UI and build an architecture that supports a richer set of pipeline-specific product requirements in the future.

User Stories 

  1. As a pipeline developer, I want to be able to validate a pipeline before deploying it and know exactly which stages and which fields are invalid and why
  2. As a pipeline developer, I want to be able to validate a pipeline stage and know exactly which fields are invalid and why
  3. As a pipeline developer, I want to be able to debug a single pipeline stage by specifying input and examining output and/or errors
  4. As a pipeline developer, I want the schema displayed by the UI to always match what is used during execution
  5. As a pipeline developer, I want to be able to import a pipeline spec with missing artifacts and have the option to automatically update those versions
  6. As a plugin developer, I want schema propagation to be defined in a single place
  7. As a CDAP administrator, I don't want user code to run in the CDAP master

Design

A new system application will be introduced to provide much of the more complex logic that is currently handled by the UI. The data pipeline code will be enhanced with a Service program that exposes a few APIs. Where possible, the APIs will be stateless. The application will be deployed at startup into the system namespace through a bootstrap action. All REST endpoints described below are prefixed by 'v3/namespaces/system/apps/pipeline/versions/6/services/studio/methods'. The app will be versioned with the major CDAP version number in order to support environments where legacy pipelines cross major version boundaries, since major versions may introduce backward incompatibilities.
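
For example, the pipeline validation endpoint described below would be invoked with the full path:

POST v3/namespaces/system/apps/pipeline/versions/6/services/studio/methods/v1/validations/pipeline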

Validate Pipeline

Pipeline validation is already performed by the application when a pipeline is deployed. The logic just needs to be refactored so that it can run both at application configure time and from a service method call. The request body contains the namespace of the pipeline, its type, and the same config that would be used to deploy the pipeline.
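
As a rough sketch, the service method might look like the following. The handler class name and the ValidationRequest and PipelineValidator helpers are illustrative assumptions, not the final API:

import java.nio.charset.StandardCharsets;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import com.google.gson.Gson;
import io.cdap.cdap.api.service.http.AbstractHttpServiceHandler;
import io.cdap.cdap.api.service.http.HttpServiceRequest;
import io.cdap.cdap.api.service.http.HttpServiceResponder;

public class ValidationHandler extends AbstractHttpServiceHandler {
  private static final Gson GSON = new Gson();

  @POST
  @Path("v1/validations/pipeline")
  public void validatePipeline(HttpServiceRequest request, HttpServiceResponder responder) {
    // Deserialize the same config that would be used to deploy the pipeline.
    String body = StandardCharsets.UTF_8.decode(request.getContent()).toString();
    ValidationRequest validationRequest = GSON.fromJson(body, ValidationRequest.class);
    // Re-use the validation logic that already runs at configure time.
    responder.sendJson(PipelineValidator.validate(validationRequest));
  }
}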

POST v1/validations/pipeline


Request Body:
{
  "namespace": "default",
  "type": "batch" | "streaming" ,
  "config": {
    "stages": [ ... ],
    "connections": [ ... ],
    ...
  }
}


Response:
{
  "type": "batch" | "streaming",
  "isValid": true | false,
  "errors": [
    {
      "stage": "stageName",
      "field": "fieldName",
      "message": "..."
    }
  ],
  "spec": {
    "stages": [
      {
        "name": "stageName",
        "plugin": {
          "type": "pluginType",
          "name": "pluginName",
          "properties": {
            "name": "value"
          },
          "artifact": {
            "name": "core-plugins",
            "scope": "SYSTEM" | "USER",
            "version": "1.0.0"
          },
          "inputSchemas": {
            "inputStageName": { schema object }
          },
          "outputPorts": {
            "outputStageName": {
              "port": "portName",
              "schema": "portSchema"
            }
          },
          "outputSchema": { schema object },
          "errorSchema": { schema object },
          "inputs": [ "input1", "input2", ... ],
          "outputs": [ "output1", "output2", ... ]
        }
      },
      ...
    ],
    "connections": [
      {
        "from": "inputStageName",
        "to": "outputStageName",
        "port": "outputPort",
        "condition": true | false
      },
      ...
    ],
    "resources": {
      "virtualCores": 1,
      "memoryMB": 1024
    },
    "driverResources": { ... },
    "clientResources": { ... },
    "stageLoggingEnabled": true | false,
    "processTimingEnabled": true | false,
    "properties": {
      "name": "val",
      ...
    },
    "endingActions": [
      {
        "name": "postaction name",
        "plugin": {
          "type": "postaction",
          "name": "email",
          "properties": { ... },
          "artifact": { ... }
        }
      }
    ]
  }
}

The response indicates whether the pipeline is valid, contains errors if it is invalid, and contains the spec if it is valid. This is the same spec that can be found in the program properties of a deployed pipeline. It is similar to the pipeline config, except that it contains the exact plugin artifact that will be used (which can differ from the input if the input contains a version range or omits the artifact) and is enhanced with additional schema information.

If an error is not specific to a stage (for example, the pipeline structure is invalid), the stage name and field name will be missing. In order to capture which field is invalid, new exceptions will be added to the pipeline API. It will be up to plugins to throw an exception identifying the invalid field:

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
 * Thrown when a pipeline stage is invalid. Carries one cause per problem,
 * typically an InvalidFieldException for each invalid field.
 */
public class InvalidStageException extends RuntimeException {
  private final List<Exception> causes;

  public InvalidStageException(Collection<Exception> causes) {
    this.causes = Collections.unmodifiableList(new ArrayList<>(causes));
  }

  public List<Exception> getCauses() {
    return causes;
  }
}

/**
 * Thrown when a specific stage field (plugin property) is invalid.
 */
public class InvalidFieldException extends RuntimeException {
  private final String fieldName;

  public InvalidFieldException(String fieldName, String message, Throwable cause) {
    super(message, cause);
    this.fieldName = fieldName;
  }

  public String getFieldName() {
    return fieldName;
  }
}
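
For example, a plugin might validate a hypothetical 'duration' property as follows; the validator class and the property format are illustrative assumptions:

import java.util.ArrayList;
import java.util.List;

public class DurationValidator {

  // Hypothetical check for a 'duration' property such as "10m".
  public static void validate(String duration) {
    List<Exception> errors = new ArrayList<>();
    if (duration == null || !duration.matches("\\d+[smh]")) {
      errors.add(new InvalidFieldException(
        "duration", "Duration must be a number followed by s, m, or h.", null));
    }
    if (!errors.isEmpty()) {
      // The framework catches this and turns each InvalidFieldException
      // into an entry in the 'errors' array of the validation response.
      throw new InvalidStageException(errors);
    }
  }
}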

Validate Stage


Stage Debugging


Approach

Approach #1

Approach #2

API changes

New Programmatic APIs

None

Deprecated Programmatic APIs

None

New REST APIs

Path | Method | Description | Response Code | Response
v1/validations/pipeline | POST | Validates a pipeline config without deploying it | 200 | Whether the pipeline is valid, a list of errors if it is invalid, and the pipeline spec if it is valid

Deprecated REST API

None

CLI Impact or Changes

None

UI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

Security Impact 

What is the impact on authorization, and how does the design address it?

Impact on Infrastructure Outages 

Users will not be able to use the pipeline studio if the new pipeline service is down.

Test Scenarios

Test ID | Test Description | Expected Results

Releases

Release X.Y.Z

Release X.Y.Z

Related Work

  • Work #1
  • Work #2
  • Work #3


Future work
