Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
There are a few use cases that we want to support with this feature. The first is to support a use case where the pipeline is a geofencing use case, where the pipeline is processing user locations, and wants to send alerts when a user enters or leaves a geo fence. Similar use cases include pipelines that are reading user biometric data that need to send alerts if user heart rate exceeds a certain value, or pipelines that read machine cpu usage that need to send alerts if cpu usage exceeds a certain value. In these use cases, some uncommon event occurs that must be acted on by a separate system. The events that trigger these alerts still need to be processed by the rest of the pipeline, there is just some side effect that is triggered once they are observed.
Goals
The goal is to allow stages in a pipeline to emit alerts, which can be configured to be published to TMS or to Kafka.
User Stories
- As a pipeline developer, I want to be able to create a pipeline where alerts are published to Kafka or TMS when some condition is met
- As a pipeline developer, I want to be able to create some pipelines that publish alerts and some that do not, even when the conditions are met
- As a pipeline developer, I want to be able to configure which topic alerts are published to
- As a pipeline developer, I want to be able to tell which plugins can emit alerts and which plugins cannot
- As a cluster administrator, I want to be able to see how many alerts were published for a pipeline run
- As a plugin developer, I want to be able to write arbitrary logic to control when to publish alerts
- As a plugin developer, I want to be able to indicate which plugins can emit alerts
- As a plugin developer, I want to be able to set a payload for any alert emitted
- As a plugin developer, I want to be able to write a plugin that publishes alerts
- As an alert consumer, I want the alert to contain the namespace, pipeline name, stage name, and payload
Design
At a high level, we would like each existing plugin type (except sinks) to be able to emit alerts. An alert is not an arbitrary record, but must conform to a specific schema. Each plugin will indicate whether it can emit notifications or not, which can be reflected in the UI by an additional 'port'. When a pipeline stage is connected to a new 'AlertPublisher' plugin type, any notifications emitted by that stage will be sent to the AlertPublisher for actual publishing.
When the actual alerts are published is left up to the pipeline. If a AlertPublisher plugin is not attached to a stage, any alerts emitted by the stage will be dropped.
Approach
Pipeline Config
There will be no changes to the structure of the pipeline config. If a AlertPublisher plugin type is connected to a stage, any alerts emitted by the stage will be sent to the publisher. The pipeline depicted in the image above would look something like:
{ "stages": [ { "name": "source", "plugin": { "type": "batchsource", ... } }, { "name": "transform", "plugin": { "type": "transform", ... } }, { "name": "sink", "plugin": { "type": "batchsink", ... } }, { "name": "notificationemitter", "plugin": { "type": "alertpublisher", ... } } ], "connections": [ { "from": "source", "to": "transform" }, { "from": "transform", "to": "sink" }, { "from": "transform", "to": "notificationemitter" } ] }
Publishing Alerts
Alerts can be published using a new AlertPublisher plugin type:
public abstract class AlertPublisher extends PipelineConfigurable, implements StageLifecycle<AlertPublisherContext> { private MessagingContext context; @Override public void initialize(AlertPublisherContext context) throws Exception { this.context = context; } @Override public void publish(Iterator<Alert> payloads) throws Exception { } @Override public void destroy() { // no-op } } public interface AlertPublisherContext extends MessagingContext, StageContext { } public interface Alert { // get the stage the alert was emitted from String getStageName(); // get the payload of the alert String getPayload(); }
The publish method receives an Iterator of Alerts in case it wants to do some dedup or aggregation logic before actually publishing anything. We'll also need to add a getNamespace() and getPipelineName() methods to StageContext, as MessagePublisher requires the namespace, and publishers need access to the pipeline name.
This approach means that it is the plugin determines the structure of the alert that is published, meaning whatever consumes the alert must be aware of the format.
Emitting Alerts
The Emitter interface will be enhanced to allow emitting an alert:
public interface Emitter<T> { // existing method void emit(T value); // existing method void emitError(InvalidEntry<T> invalidEntry); void emitAlert(String payload); }
Plugin Alert Port Indication
Approach 1
Though every plugin will be able to emit notifications through the programmatic API, most plugins will not make use of this functionality. Users should be given some indication of which plugins make use of it and which do not. One way to do this is to annotate the plugin:
@Plugin(type = Transform.PLUGIN_TYPE) @Name("Alert") @EmitsAlerts public class AlertTransform extends Transform { }
The EmitsAlerts will be an annotation only in the cdap-etl-api, as it is specific to pipelines. It is not a CDAP annotation. In order for the UI to see it, it must be somehow exposed through the RESTful API in a generic way. One way to do this is to add an 'annotations' field to the PluginClass object:
GET /v3/namespaces/<namespace-id>/artifacts/<artifact-name>/versions/<artifact-version>/extensions/<plugin-type>/plugins/<plugin-name> [ { "name": "JavaScript", "type": "transform", "annotations": [ "EmitsAlerts" ], "description": "Executes user-provided JavaScript that transforms one record into zero or more records.", "className": "co.cask.hydrator.plugin.transform.JavaScriptTransform", "artifact": { ... }, "properties": { ... } } ... ]
This would still make CDAP generic, as it knows nothing about what the custom annotation means, but allows the UI and data pipeline app to do special logic with the annotation. Note that this could also be done to indicate whether a plugin emits errors.
Approach 2
We could also extend (in cdap-data-pipeline) PluginConfig with class that contains a 'alertsEnabled' boolean field. Only plugins that use a conf with 'alertsEnabled' set to true would be allowed to emit alerts. The UI could also use this flag as a way to determine whether an alert port should be displayed.
public class AlertConfig extends PluginConfig { @Description("Whether this stage can emit alerts.") boolean alertsEnabled; }
Approach 3
We could also make use of the Plugin Endpoint system that is currently used for schema propagation. Plugins would have to include an endpoint that returns whether it emits alerts or not.
@Path("emitsAlerts") public boolean getSchema(Conf conf) { return conf.threshold != null; }
This has a benefit of not being static, which means a plugin could decide whether it might emit alerts depending on its configuration. However, this may also cause additional complication, as the alert port could potentially appear and disappear as the plugin is being configured. Downside are that it is more complicated, more work for the Plugin developer, and the Plugin Endpoint system does not support good error handling capabilities.
Implementation
Alerts will be collected in the Spark Driver in memory (through the .collect() method), before being sent to an AlertPublisher. This is because we are not able to publish within a Spark closure, and in case the publisher wants to do some dedup logic or aggregation logic. This also means that users should be educated to avoid emitting a large number of alerts, or alerts with large payloads, as the driver could run out of memory. In MapReduce, tasks are also unable to publish to TMS. The program will need to write all alerts to a local FileSet, then read from the FileSet and publish everything at the end.
API changes
New Programmatic APIs
Emitter.emitAlert and AlertPublisher are new APIs.
Deprecated Programmatic APIs
Modified REST APIs
Path | Method | Description | Response Code | Response |
---|---|---|---|---|
/v3/namespaces/<namespace-id>/artifacts/<artifact-name>/versions/<artifact-version>/extensions/<plugin-type>/plugins/<plugin-name> | GET | Returns the plugin class details |
| annotations field added |
Deprecated REST API
None
CLI Impact or Changes
- None
UI Impact or Changes
- UI must be able to detect which plugins can emit notifications, and display a corresponding port
- UI must display metrics for notifications emitted
Security Impact
This feature will use TMS, so any authorization added to TMS will affect this feature.
Impact on Infrastructure Outages
None
Test Scenarios
Test ID | Test Description | Expected Results |
---|---|---|
Releases
Release X.Y.Z
Release X.Y.Z
Related Work
- Work #1
- Work #2
- Work #3