Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

As the data pipeline experience has become more sophisticated, it has become clear that a good number of capabilities are required during the pipeline creation process. Some of these capabilities are provided by the CDAP platform if they are generic enough, such as the listing of plugins and their properties. However, there is a class of capabilities that are specific to pipelines, such as pipeline validation, schema propagation, plugin templates, and pipeline drafts. These have all been implemented with non-trivial logic in the UI. This type of pipeline-specific logic does not belong in the UI, but in a backend service that the UI can call. Features of this kind were explored using the plugin endpoint feature (for 'Get Schema' buttons), but that approach makes it difficult to handle errors correctly and ends up running user code in the CDAP master. The lack of this pipeline backend layer has slowed UI development, contributed to bugs in the product, and added a lot of technical debt. By moving much of this logic to a system application, overall development speed will increase and the door will be opened to a richer set of features in the future.

Goals

Design the pipeline system application to remove tech debt from the UI and build an architecture that supports a richer set of pipeline-specific product requirements in the future.

User Stories 

  1. As a pipeline developer, I want to be able to validate a pipeline before deploying it and know exactly which stages and which fields are invalid and why
  2. As a pipeline developer, I want to be able to validate a pipeline stage and know exactly which fields are invalid and why
  3. As a pipeline developer, I want to be able to debug a single pipeline stage by specifying input and examining output and/or errors
  4. As a pipeline developer, I want the schema displayed by the UI to always match what is used during execution
  5. As a pipeline developer, I want to be able to import a pipeline spec with missing artifacts and have the option to automatically update those versions
  6. As a plugin developer, I want schema propagation to be defined in a single place
  7. As a CDAP administrator, I don't want user code to run in the CDAP master

Design

A new system application will be introduced to provide much of the more complex logic that is currently handled by the UI. The data pipeline code will be enhanced with a Service program to expose a few APIs. Where possible, APIs will be stateless. The application will be deployed at startup to the system namespace through a bootstrap action. All REST endpoints described are prefixed by 'v3/namespaces/system/apps/pipeline/versions/6/services/studio/methods'. The app will be versioned with the major CDAP version number. This is to support environments that have legacy pipelines across major version boundaries, which may introduce backward incompatibilities.
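As a rough illustration of how the prefix combines with the method paths listed in the sections below, a client could assemble endpoint paths along these lines. The class and method names are hypothetical; only the prefix string and the 'v1/validations/...' paths come from this page.

```java
/** Hypothetical helper; only the prefix and the method paths come from this design. */
final class StudioPaths {
  // Prefix shared by all REST endpoints described in this design.
  static final String PREFIX =
      "v3/namespaces/system/apps/pipeline/versions/6/services/studio/methods";

  private StudioPaths() { }

  /** Joins the prefix and a method path such as "v1/validations/pipeline". */
  static String resolve(String methodPath) {
    // Tolerate a leading slash so callers cannot produce a double slash.
    String trimmed = methodPath.startsWith("/") ? methodPath.substring(1) : methodPath;
    return PREFIX + "/" + trimmed;
  }
}
```

For example, StudioPaths.resolve("v1/validations/stage") yields the full path of the stage validation endpoint relative to the CDAP router.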

Validate Pipeline

Pipeline validation is already done by the application when a pipeline is deployed. The logic just needs to be refactored so that it can be run during application configure time and in a service method call. The request body contains the namespace of the pipeline, its type, and the same config that would be used to deploy the pipeline.

POST v1/validations/pipeline

Request Body:
{
  "namespace": "default",
  "type": "batch" | "streaming" ,
  "config": {
    "stages": [ ... ],
    "connections": [ ... ],
    ...
  }
}

Response:
{
  "type": "batch" | "streaming",
  "isValid": true | false,
  "errors": [
    {
      "type": "Invalid Field",
      "message": "...",
      // type specific fields
    }
  ],
  "spec": {
    "stages": [
      {
        "name": "stageName",
        "plugin": {
          "type": "pluginType",
          "name": "pluginName",
          "properties": {
            "name": "value"
          },
          "artifact": {
            "name": "core-plugins",
            "scope": "SYSTEM" | "USER",
            "version": "1.0.0"
          },
          "inputSchemas": {
            "inputStageName": { schema object }
          },
          "outputPorts": {
            "outputStageName": {
              "port": "portName",
              "schema": "portSchema"
            }
          },
          "outputSchema": { schema object },
          "errorSchema": { schema object }
        }
      },
      ...
    ],
    "connections": [
      {
        "from": "inputStageName",
        "to": "outputStageName",
        "port": "outputPort",
        "condition": true | false
      },
      ...
    ],
    "resources": {
      "virtualCores": 1,
      "memoryMB": 1024
    },
    "driverResources": { ... },
    "clientResources": { ... },
    "stageLoggingEnabled": true | false,
    "processTimingEnabled": true | false,
    "properties": {
      "name": "val",
      ...
    },
    "endingActions": [
      {
        "name": "postaction name",
        "plugin": {
          "type": "postaction",
          "name": "email",
          "properties": { ... },
          "artifact": { ... }
        }
      }
    ]
  }
}

Namespace is required in the request in order for the app to pick up the correct plugin, since plugin artifacts can be in user scope.

The response states whether the pipeline is valid, lists the errors if it is invalid, and contains the spec if it is valid. This is the same spec that can be found in the program properties of a deployed pipeline. It is similar to the pipeline config, except that it contains the exact plugin artifact that will be used (which differs from the input if the input contains a version range or a missing field), and it is enhanced with more information about schema.
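A minimal sketch of how a Java client might build the validation request is shown below. It assumes Java 11 or later (java.net.http) and a router reachable at http://localhost:11015; the base URL and the class name are assumptions, not part of this design. The request is only built, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Sketch of a client-side request builder; the base URL is an assumption. */
final class ValidatePipelineRequests {
  // Assumed router address; substitute the address of the actual CDAP instance.
  static final String BASE_URL = "http://localhost:11015/";
  // Prefix taken from the Design section of this page.
  static final String PREFIX =
      "v3/namespaces/system/apps/pipeline/versions/6/services/studio/methods/";

  private ValidatePipelineRequests() { }

  /** Builds (but does not send) a POST to the pipeline validation endpoint. */
  static HttpRequest build(String requestBodyJson) {
    return HttpRequest.newBuilder(URI.create(BASE_URL + PREFIX + "v1/validations/pipeline"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(requestBodyJson))
        .build();
  }
}
```

The resulting request could then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), and the JSON response inspected for "isValid", "errors", and "spec".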

If the error is not specific to a stage (for example, the pipeline structure is invalid), the stage name and field name will be missing. In order to capture this information, the plugin APIs will be extended to include a validate method that can throw a specific type of exception. This method will be called at configure time when the pipeline is deployed, as well as when the validate service endpoint is called.

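public class InvalidStageException extends PipelineException {
  private final List<Exception> errors;

  public InvalidStageException(Collection<Exception> errors) {
    // Assumes PipelineException offers the (message, cause) constructor used by InvalidFieldException.
    super("Stage is invalid: " + errors.size() + " error(s).", null);
    this.errors = new ArrayList<>(errors);
  }

  public List<Exception> getErrors() {
    return errors;
  }
}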

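public class InvalidFieldException extends PipelineException {
  private final String fieldName;

  // Convenience constructor for errors that have no underlying cause.
  public InvalidFieldException(String fieldName, String message) {
    this(fieldName, message, null);
  }

  public InvalidFieldException(String fieldName, String message, Throwable cause) {
    super(message, cause);
    this.fieldName = fieldName;
  }

  public String getFieldName() {
    return fieldName;
  }
}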


public interface PipelineConfigurable {
  
  void configurePipeline(PipelineConfigurer pipelineConfigurer) throws InvalidStageException;

}

This allows plugins to write validation code that looks something like:

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws InvalidStageException {
  List<Exception> configErrors = new ArrayList<>();
  if (config.getNumPartitions() < 1) {
    configErrors.add(new InvalidFieldException("numPartitions", "The number of partitions must be at least 1."));
  }
  if (!configErrors.isEmpty()) {
    throw new InvalidStageException(configErrors);
  }
}

This would result in errors in the REST response like:

"errors": [
  {
    "type": "Invalid Field",
    "message": "The number of partitions must be at least 1.",
    "stage": "stage-4",
    "field": "numPartitions"
  }
]
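One way the service might turn an InvalidStageException into entries of the "errors" array is sketched below. The exception classes are redefined locally as simplified stand-ins so that the example compiles on its own, and the "Invalid Stage" type used for non-field errors is a hypothetical name that does not appear in this design.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class StageErrors {
  private StageErrors() { }

  // Simplified local stand-ins for the exception classes described above.
  static class InvalidFieldException extends Exception {
    private final String fieldName;

    InvalidFieldException(String fieldName, String message) {
      super(message);
      this.fieldName = fieldName;
    }

    String getFieldName() {
      return fieldName;
    }
  }

  static class InvalidStageException extends Exception {
    private final List<Exception> errors;

    InvalidStageException(Collection<Exception> errors) {
      super("Stage has " + errors.size() + " error(s)");
      this.errors = new ArrayList<>(errors);
    }

    List<Exception> getErrors() {
      return errors;
    }
  }

  /** Flattens the failures of one stage into entries shaped like the "errors" array. */
  static List<Map<String, String>> toErrors(String stageName, InvalidStageException e) {
    List<Map<String, String>> result = new ArrayList<>();
    for (Exception cause : e.getErrors()) {
      Map<String, String> entry = new LinkedHashMap<>();
      if (cause instanceof InvalidFieldException) {
        entry.put("type", "Invalid Field");
        entry.put("message", cause.getMessage());
        entry.put("stage", stageName);
        entry.put("field", ((InvalidFieldException) cause).getFieldName());
      } else {
        // Hypothetical type name for stage errors that are not tied to a field.
        entry.put("type", "Invalid Stage");
        entry.put("message", String.valueOf(cause.getMessage()));
        entry.put("stage", stageName);
      }
      result.add(entry);
    }
    return result;
  }
}
```

Because the stage name is supplied by the caller rather than by the plugin, plugin code only needs to know about its own field names.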

Another type of error could be due to a plugin artifact that is missing. This specific type of error is fairly common during pipeline import, when a user imports a pipeline from a different version of CDAP, and the artifact versions do not match.  The error would look like:

"errors": [
  {
    "stage": "stage-5",
    "type": "Artifact Not Found",
    "message": "No plugin named 'table' could be found.",
    "suggestedArtifact": {
      "scope": "SYSTEM",
      "name": "core-plugins",
      "version": "6.0.0"
    }
  }
]
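A client that wants to offer the automatic version update from the user stories could apply "suggestedArtifact" to the plugin config roughly as follows. This is a sketch that assumes the JSON has already been parsed into nested maps; the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ArtifactFixer {
  private ArtifactFixer() { }

  /**
   * Returns a copy of the plugin config whose "artifact" is replaced by the
   * "suggestedArtifact" of the error. Returns the config unchanged if the error
   * is of another type or carries no suggestion.
   */
  static Map<String, Object> applySuggestion(Map<String, Object> plugin,
                                             Map<String, Object> error) {
    Object suggested = error.get("suggestedArtifact");
    if (!"Artifact Not Found".equals(error.get("type")) || !(suggested instanceof Map)) {
      return plugin;
    }
    // Copy instead of mutating, so the imported config stays available for comparison.
    Map<String, Object> updated = new LinkedHashMap<>(plugin);
    updated.put("artifact", new LinkedHashMap<Object, Object>((Map<?, ?>) suggested));
    return updated;
  }
}
```

Returning a copy keeps the original imported config intact, which lets the UI show the user what would change before the update is accepted.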

Validate Stage

Validation for a single stage is very similar to that of an entire pipeline. Instead of passing the entire pipeline config, the client passes just the input schemas and the stage config. Instead of responding with the entire pipeline spec, the service returns just the stage spec.

POST v1/validations/stage

Request Body:
{
  "namespace": "default",
  "stage": {
    "name": "stageName",
    "plugin": {
      "type": "pluginType",
      "name": "pluginName",
      "artifact": {
        "scope": "SYSTEM" | "USER",
        "name": "core-plugins",
        "version": "1.0.0"
      },
      "properties": {
        "name": "val",
        ...
      }
    },
    "inputSchemas": {
      "inputStageName": { schema object }
    }
  }       
}

Response:
{
  "isValid": true | false,
  "errors": [
    {
      "type": "Invalid Field",
      "message": "..."
    }
  ],
  "spec": {
    "name": "stageName",
    "plugin": {
      "type": "pluginType",
      "name": "pluginName",
      "properties": {
        "name": "value"
      },
      "artifact": {
        "name": "core-plugins",
        "scope": "SYSTEM" | "USER",
        "version": "1.0.0"
      },
      "inputSchemas": {
        "inputStageName": { schema object }
      },
      "outputPorts": {
        "outputStageName": {
          "port": "portName",
          "schema": "portSchema"
        }
      },
      "outputSchema": { schema object },
      "errorSchema": { schema object }
    }
  }
}

This endpoint can be used to automatically update the output schema of a stage after a user changes the configuration of the stage. This would replace the 'Get Schema' button that is present in several plugins, removing the possibility of an invalid pipeline because somebody forgot to click the button.
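The schema refresh described above could be sketched on the client side as follows, assuming the response JSON has been parsed into nested maps. The class and method names are hypothetical; the field names follow the response body shown earlier in this section.

```java
import java.util.Map;
import java.util.Optional;

final class SchemaRefresh {
  private SchemaRefresh() { }

  /** Returns the output schema of a parsed validate-stage response, if the stage is valid. */
  static Optional<Object> outputSchema(Map<String, Object> response) {
    if (!Boolean.TRUE.equals(response.get("isValid"))) {
      // Invalid stage: keep whatever schema is currently displayed and show the errors.
      return Optional.empty();
    }
    Object spec = response.get("spec");
    if (!(spec instanceof Map)) {
      return Optional.empty();
    }
    Object plugin = ((Map<?, ?>) spec).get("plugin");
    if (!(plugin instanceof Map)) {
      return Optional.empty();
    }
    return Optional.ofNullable(((Map<?, ?>) plugin).get("outputSchema"));
  }
}
```

Calling this after every configuration change, rather than on a button click, is what keeps the displayed schema in step with the one used at execution time.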

Stage Debugging

It is often useful to be able to send input into a single pipeline stage and see what it would output. This helps when debugging a pipeline stage, and also when simply trying to understand exactly how a pipeline behaves. The request body is similar to the validate stage body, except that it contains an additional section defining the input stages, the schema of the records from each input stage, and the records themselves. The response contains the output records, error records, or any exception that was generated while attempting to generate the output. This will be implemented initially for transform plugin types, and can be expanded as needed.

POST v1/validations/output


Request Body:
{
  "namespace": "default",
  "stage": {
    "name": "stageName",
    "plugin": {
      "type": "pluginType",
      "name": "pluginName",
      "artifact": {
        "scope": "SYSTEM" | "USER",
        "name": "core-plugins",
        "version": "1.0.0"
      },
      "properties": {
        "name": "val",
        ...
      }
    }
  },
  "input": [
    {
      "stage": "input stage name",
      "schema": { schema of data coming from that stage },
      "records": [
        {
          "fieldName": type appropriate value
        },
        ...
      ]
    },
    ...
  ]       
}


Response:
{
  "errors": [
    {
      "message": "exception message",
      "trace": "exception stack trace"
    },
    ...
  ],
  "output": {
    "schema": { output schema },
    "records": [
      {
        "fieldName": type appropriate value
      },
      ...
    ] 
  }
}
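The behaviour of this endpoint for transform stages might be sketched as below: each input record is passed through the transform, emitted records are collected as output, and any exception is recorded with its message and stack trace. RecordTransform is a hypothetical stand-in for the real plugin API, and records are modelled as plain maps rather than schema-aware objects.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class StageDebugger {
  private StageDebugger() { }

  /** Hypothetical stand-in for a transform plugin: one record in, zero or more out. */
  interface RecordTransform {
    void transform(Map<String, Object> input, List<Map<String, Object>> emitter) throws Exception;
  }

  /** Mirrors the "errors" and "output.records" parts of the response body. */
  static final class Result {
    final List<Map<String, String>> errors = new ArrayList<>();
    final List<Map<String, Object>> records = new ArrayList<>();
  }

  static Result run(RecordTransform transform, List<Map<String, Object>> inputRecords) {
    Result result = new Result();
    for (Map<String, Object> record : inputRecords) {
      // Buffer per record so a failing record contributes no partial output.
      List<Map<String, Object>> emitted = new ArrayList<>();
      try {
        transform.transform(record, emitted);
        result.records.addAll(emitted);
      } catch (Exception e) {
        StringWriter trace = new StringWriter();
        e.printStackTrace(new PrintWriter(trace, true));
        Map<String, String> error = new LinkedHashMap<>();
        error.put("message", String.valueOf(e.getMessage()));
        error.put("trace", trace.toString());
        result.errors.add(error);
      }
    }
    return result;
  }
}
```

Continuing after a failed record, instead of stopping at the first exception, lets a single request report every problematic input record; whether the real service should behave this way is an open design choice.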

API changes

New Programmatic APIs

None

Deprecated Programmatic APIs

None

New REST APIs

Path | Method | Description | Response Code | Response

Deprecated REST API

None

CLI Impact or Changes

None

UI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

Security Impact 

What's the impact on Authorization and how does the design take care of this aspect

Impact on Infrastructure Outages 

Users will not be able to use the pipeline studio if the new pipeline service is down.

Test Scenarios

Test ID | Test Description | Expected Results

Releases

Release X.Y.Z

Release X.Y.Z

Related Work

  • Work #1
  • Work #2
  • Work #3


Future work
