Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
As the data pipeline experience has become more sophisticated, it has become clear that a number of capabilities are required during the pipeline creation process. Some of these capabilities are generic enough to be provided by the CDAP platform itself, such as listing plugins and their properties. However, there is a class of capabilities that are specific to pipelines, such as pipeline validation, schema propagation, plugin templates, and pipeline drafts. These have all been implemented with non-trivial logic in the UI. This type of pipeline-specific logic does not belong in the UI, but in a backend service that the UI can call. Some of these features were explored using the plugin endpoint feature (for 'get schema' buttons), but that approach makes it difficult to handle errors correctly and ends up running user code in the CDAP master. The lack of this pipeline backend layer has slowed UI development, contributed to bugs in the product, and added a lot of technical debt. Moving much of this logic to a system application will increase overall development speed and open the door to a richer set of features in the future.
Goals
Design the pipeline system application to remove tech debt from the UI and build an architecture that supports a richer set of pipeline-specific product requirements in the future.
User Stories
- As a pipeline developer, I want to be able to validate a pipeline before deploying it and know exactly which stages and which fields are invalid and why
- As a pipeline developer, I want to be able to validate a pipeline stage and know exactly which fields are invalid and why
- As a pipeline developer, I want to be able to debug a single pipeline stage by specifying input and examining output and/or errors
- As a pipeline developer, I want the schema displayed by the UI to always match what is used during execution
- As a pipeline developer, I want to be able to import a pipeline spec with missing artifacts and have the option to automatically update those versions
- As a plugin developer, I want schema propagation to be defined in a single place
- As a CDAP administrator, I don't want user code to run in the CDAP master
Design
A new system application will be introduced to provide much of the more complex logic that is currently handled by the UI. The data pipeline code will be enhanced with a Service program that exposes a few APIs. Where possible, APIs will be stateless. The application will be deployed at startup to the system namespace through a bootstrap action. All REST endpoints described below are prefixed by 'v3/namespaces/system/apps/pipeline/versions/6/services/studio/methods'. The app will be versioned with the major CDAP version number. This supports environments whose legacy pipelines cross major version boundaries, which may introduce backward incompatibilities.
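For illustration, registering the service could look something like the following minimal sketch (StudioApp and StudioService are placeholder names, not part of this design):

import io.cdap.cdap.api.app.AbstractApplication;

public class StudioApp extends AbstractApplication {
  @Override
  public void configure() {
    // 'pipeline' matches the app name in the REST prefix above.
    setName("pipeline");
    setDescription("Backend service for the pipeline studio.");
    // StudioService is a placeholder for the Service program that
    // exposes the 'studio' service named in the REST prefix.
    addService(new StudioService());
  }
}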
Validate Pipeline
Pipeline validation is already done by the application when a pipeline is deployed. The logic just needs to be refactored so that it can be run during application configure time and in a service method call. The request body contains the namespace of the pipeline, its type, and the same config that would be used to deploy the pipeline.
POST v1/validations/pipeline

Request Body:

{
  "namespace": "default",
  "type": "batch" | "streaming",
  "config": {
    "stages": [ ... ],
    "connections": [ ... ],
    ...
  }
}

Response:

{
  "type": "batch" | "streaming",
  "isValid": true | false,
  "errors": [
    {
      "type": "Invalid Field",
      "message": "...",
      // type specific fields
    }
  ],
  "spec": {
    "stages": [
      {
        "name": "stageName",
        "plugin": {
          "type": "pluginType",
          "name": "pluginName",
          "properties": { "name": "value" },
          "artifact": {
            "name": "core-plugins",
            "scope": "SYSTEM" | "USER",
            "version": "1.0.0"
          },
          "inputSchemas": { "inputStageName": { schema object } },
          "outputPorts": {
            "outputStageName": { "port": "portName", "schema": "portSchema" }
          },
          "outputSchema": { schema object },
          "errorSchema": { schema object }
        }
      },
      ...
    ],
    "connections": [
      { "from": "inputStageName", "to": "outputStageName", "port": "outputPort", "condition": true | false },
      ...
    ],
    "resources": { "virtualCores": 1, "memoryMB": 1024 },
    "driverResources": { ... },
    "clientResources": { ... },
    "stageLoggingEnabled": true | false,
    "processTimingEnabled": true | false,
    "properties": { "name": "val", ... },
    "endingActions": [
      {
        "name": "postaction name",
        "plugin": {
          "type": "postaction",
          "name": "email",
          "properties": { ... },
          "artifact": { ... }
        }
      }
    ]
  }
}
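Combined with the prefix above, the full path for this call is:

POST v3/namespaces/system/apps/pipeline/versions/6/services/studio/methods/v1/validations/pipeline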
The namespace is required in the request so that the app can resolve the correct plugin, since plugin artifacts can be in user scope.
The response contains whether the pipeline is valid, errors if it is invalid, and the spec if it is valid. This is the same spec that can be found in the program properties of a deployed pipeline. It is similar to the pipeline config, except that it contains the exact plugin artifact that will be used (which can differ from the input if the input contains a version range or a missing field), and it is enhanced with more information about schemas.
If the error is not specific to a stage (for example, if the pipeline structure is invalid), the stage name and field name will be missing. To capture stage- and field-level errors, the plugin APIs will be extended with a validate method that can throw a specific type of exception. This method will be called at configure time when the pipeline is deployed, as well as when the validate service endpoint is called.
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class InvalidStageException extends PipelineException {
  private final List<Exception> errors;

  public InvalidStageException(Collection<Exception> errors) {
    this.errors = new ArrayList<>(errors);
  }

  public List<Exception> getErrors() {
    return errors;
  }
}

public class InvalidFieldException extends PipelineException {
  private final String fieldName;

  public InvalidFieldException(String fieldName, String message) {
    this(fieldName, message, null);
  }

  public InvalidFieldException(String fieldName, String message, Throwable cause) {
    super(message, cause);
    this.fieldName = fieldName;
  }

  public String getFieldName() {
    return fieldName;
  }
}

public interface PipelineConfigurable {
  void configurePipeline(PipelineConfigurer pipelineConfigurer) throws InvalidStageException;
}
This allows plugins to write validation code that looks something like:
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws InvalidStageException {
  List<Exception> configErrors = new ArrayList<>();
  if (config.getNumPartitions() < 1) {
    configErrors.add(new InvalidFieldException("numPartitions", "The number of partitions must be at least 1."));
  }
  if (!configErrors.isEmpty()) {
    throw new InvalidStageException(configErrors);
  }
}
Which would result in errors in the REST response like:
"errors": [ { "type": "Invalid Field", "message": "The number of partitions must be at least 1.", "stage": "stage-4", "field": "numPartitions" } ]
Another type of error is a missing plugin artifact. This type of error is fairly common during pipeline import, when a user imports a pipeline from a different version of CDAP and the artifact versions do not match. The error would look like:
"errors": [ { "stage": "stage-5", "type": "Artifact Not Found", "message": "No plugin named 'table' could be found.", "suggestedArtifact": { "scope": "SYSTEM", "name": "core-plugins", "version": "6.0.0" } } ]
Validate Stage
Validation for a single stage is very similar to validation for an entire pipeline. Instead of passing the entire pipeline config, the client passes just the input schemas and the stage config. Instead of responding with the entire pipeline spec, just the stage spec is returned.
POST v1/validations/stage

Request Body:

{
  "namespace": "default",
  "stage": {
    "name": "stageName",
    "plugin": {
      "type": "pluginType",
      "name": "pluginName",
      "artifact": {
        "scope": "SYSTEM" | "USER",
        "name": "core-plugins",
        "version": "1.0.0"
      },
      "properties": { "name": "val", ... }
    },
    "inputSchemas": { "inputStageName": { schema object } }
  }
}

Response:

{
  "isValid": true | false,
  "errors": [
    {
      "type": "Invalid Field",
      "message": "..."
    }
  ],
  "spec": {
    "name": "stageName",
    "plugin": {
      "type": "pluginType",
      "name": "pluginName",
      "properties": { "name": "value" },
      "artifact": {
        "name": "core-plugins",
        "scope": "SYSTEM" | "USER",
        "version": "1.0.0"
      },
      "inputSchemas": { "inputStageName": { schema object } },
      "outputPorts": {
        "outputStageName": { "port": "portName", "schema": "portSchema" }
      },
      "outputSchema": { schema object },
      "errorSchema": { schema object }
    }
  }
}
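A minimal sketch of what the corresponding handler method might look like using the CDAP http service APIs (StageValidationRequest, StageValidationResponse, and StageValidator are hypothetical names for the request/response types and the shared validation logic):

import com.google.gson.Gson;
import io.cdap.cdap.api.service.http.AbstractHttpServiceHandler;
import io.cdap.cdap.api.service.http.HttpServiceRequest;
import io.cdap.cdap.api.service.http.HttpServiceResponder;
import java.nio.charset.StandardCharsets;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

public class StudioServiceHandler extends AbstractHttpServiceHandler {
  private static final Gson GSON = new Gson();

  @POST
  @Path("v1/validations/stage")
  public void validateStage(HttpServiceRequest request, HttpServiceResponder responder) {
    // Deserialize the request body described above.
    String body = StandardCharsets.UTF_8.decode(request.getContent()).toString();
    StageValidationRequest validationRequest = GSON.fromJson(body, StageValidationRequest.class);
    // Delegate to the same validation logic that runs at configure time
    // when a pipeline is deployed.
    StageValidationResponse result = StageValidator.validate(validationRequest);
    responder.sendJson(200, result);
  }
}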
Stage Debugging
Approach
Approach #1
Approach #2
API changes
New Programmatic APIs
- InvalidStageException and InvalidFieldException, which plugins throw to report validation failures (see Validate Pipeline above)
- PipelineConfigurable.configurePipeline is declared to throw InvalidStageException
Deprecated Programmatic APIs
None
New REST APIs
Path | Method | Description | Response Code | Response |
---|---|---|---|---|
v1/validations/pipeline | POST | Validates a pipeline config | 200 | Whether the pipeline is valid, errors if invalid, and the pipeline spec if valid |
v1/validations/stage | POST | Validates a single pipeline stage | 200 | Whether the stage is valid, errors if invalid, and the stage spec if valid |
Deprecated REST API
None
CLI Impact or Changes
None
UI Impact or Changes
- Pipeline and stage validation logic moves out of the UI and into calls to the new validation endpoints
- Schemas displayed in the studio come from the spec returned by the backend, so they always match what is used during execution
- Pipeline import can surface 'Artifact Not Found' errors and offer to apply the suggested artifact versions
Security Impact
What's the impact on Authorization, and how does the design take care of this aspect? One noted impact: validation executes plugin (user) code, so it runs inside the app's service program rather than in the CDAP master, per the administrator user story above.
Impact on Infrastructure Outages
Users will not be able to use the pipeline studio if the new pipeline service is down.
Test Scenarios
Test ID | Test Description | Expected Results |
---|---|---|
1 | Validate a structurally valid pipeline | isValid is true and the response contains the pipeline spec |
2 | Validate a pipeline with an invalid plugin property | isValid is false with an 'Invalid Field' error identifying the stage and field |
3 | Validate a pipeline that references a missing artifact | 'Artifact Not Found' error with a suggestedArtifact when one is available |
4 | Validate a single stage with input schemas | Response contains the stage spec, including output and error schemas |
Releases
Release X.Y.Z
Release X.Y.Z
Related Work
- Work #1
- Work #2
- Work #3