Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

This feature is meant to support several use cases. The first is geofencing: a pipeline processes user locations and needs to send a notification when a user enters or leaves a geofence. Similar use cases include pipelines that read user biometric data and need to send a notification if a user's heart rate exceeds a certain value, or pipelines that read machine CPU usage and need to send a notification if CPU usage exceeds a certain value. In each case, some uncommon event occurs that must be acted on by a separate system. The events that trigger these notifications still need to be processed by the rest of the pipeline; the notification is just a side effect triggered when they are observed.

Goals

The goal is to allow stages in a pipeline to emit notifications, which can be configured to be published to TMS or to Kafka.

User Stories 

  • As a pipeline developer, I want to be able to create a pipeline where notifications are published to Kafka or TMS when some condition is met
  • As a pipeline developer, I want to be able to create some pipelines that publish notifications and some that do not, even when the conditions are met
  • As a pipeline developer, I want to be able to configure which topic notifications are published to
  • As a pipeline developer, I want to be able to tell which plugins can emit notifications and which plugins cannot
  • As a cluster administrator, I want to be able to see how many notifications were published for a pipeline run
  • As a plugin developer, I want to be able to write arbitrary logic to control when to publish notifications
  • As a plugin developer, I want to be able to indicate which plugins can emit notifications

Design

At a high level, we would like each existing plugin type (except sinks) to be able to emit notifications. A notification is not an arbitrary record, but must conform to a specific schema. Each plugin will indicate whether it can emit notifications or not, which can be reflected in the UI by an additional 'port'. When a pipeline stage is connected to a new 'NotificationPublisher' plugin type, any notifications emitted by that stage will be sent to the NotificationPublisher for actual publishing.

The pipeline decides when notifications are actually published. If a NotificationPublisher plugin is not attached to a stage, any notifications emitted by that stage will be dropped.

Approach

Pipeline Config

There will be no changes to the structure of the pipeline config. If a NotificationPublisher plugin type is connected to a stage, any notifications emitted by the stage will be sent to the publisher. The pipeline depicted in the image above would look something like:

{
  "stages": [
    { "name": "source", "plugin": { "type": "batchsource", ... } },
    { "name": "transform", "plugin": { "type": "transform", ... } },
    { "name": "sink", "plugin": { "type": "batchsink", ... } },
    { "name": "notificationemitter", "plugin": { "type": "notificationpublisher", ... } }
  ],
  "connections": [
    { "from": "source", "to": "transform" },
    { "from": "transform", "to": "sink" },
    { "from": "transform", "to": "notificationemitter" }
  ]
}
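One way the pipeline app might interpret such a config is sketched below. It assumes hypothetical, simplified Stage and Connection classes (not the real CDAP config objects) and maps each stage to the notification publishers connected to it. Stages that do not appear in the resulting map have no publisher attached, so their notifications would be dropped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the pipeline config objects.
final class Stage {
  final String name;
  final String pluginType;

  Stage(String name, String pluginType) {
    this.name = name;
    this.pluginType = pluginType;
  }
}

final class Connection {
  final String from;
  final String to;

  Connection(String from, String to) {
    this.from = from;
    this.to = to;
  }
}

final class NotificationRouting {
  // Plugin type string taken from the example config above.
  static final String PUBLISHER_TYPE = "notificationpublisher";

  // Returns, for each stage name, the publisher stages that receive its notifications.
  static Map<String, List<String>> publishersByStage(List<Stage> stages,
                                                     List<Connection> connections) {
    Map<String, String> typeByName = new HashMap<>();
    for (Stage stage : stages) {
      typeByName.put(stage.name, stage.pluginType);
    }
    Map<String, List<String>> result = new HashMap<>();
    for (Connection connection : connections) {
      // Only connections that end at a publisher carry notifications.
      if (PUBLISHER_TYPE.equals(typeByName.get(connection.to))) {
        result.computeIfAbsent(connection.from, k -> new ArrayList<>()).add(connection.to);
      }
    }
    return result;
  }
}
```

For the example config, this would map "transform" to the single publisher stage "notificationemitter", while "source" would have no entry.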

Emitting Notifications

The Emitter interface will be enhanced to allow emitting a notification:

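public interface Emitter<T> {

  // existing method
  void emit(T value);

  // existing method
  void emitError(InvalidEntry<T> invalidEntry);

  // new: returns an emitter for notifications on the given topic
  NotificationEmitter getNotificationEmitter(String topic);

  // new: returns an emitter for notifications on the given topic in the given namespace
  NotificationEmitter getNotificationEmitter(String namespace, String topic);
}

public interface NotificationEmitter {

  // emit one or more string payloads, encoded with the given charset
  void emit(Charset charset, String... payloads);

  void emit(Charset charset, Iterator<String> payloads);

  // emit one or more raw byte payloads
  void emit(byte[]... payloads);

  void emit(Iterator<byte[]> payloads);
}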
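To illustrate how a plugin might use this API, the sketch below emits a notification whenever a heart rate reading exceeds a threshold, while still passing every record downstream. The interfaces are trimmed-down local copies of the proposal, with getNotificationEmitter assumed to return a NotificationEmitter. HeartRateAlerts, RecordingEmitter, and the topic name "alerts" are hypothetical names made up for this example.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Trimmed-down local copies of the proposed interfaces, for illustration only.
interface NotificationEmitter {
  void emit(Charset charset, String... payloads);
}

interface Emitter<T> {
  void emit(T value);

  NotificationEmitter getNotificationEmitter(String topic);
}

// Hypothetical transform logic: forwards every reading, notifies on high ones.
final class HeartRateAlerts {
  static final String TOPIC = "alerts"; // assumed topic name

  static void transform(String user, int heartRate, int threshold, Emitter<Integer> emitter) {
    if (heartRate > threshold) {
      emitter.getNotificationEmitter(TOPIC)
          .emit(StandardCharsets.UTF_8, user + " heart rate " + heartRate);
    }
    // The triggering record is still processed by the rest of the pipeline.
    emitter.emit(heartRate);
  }
}

// In-memory emitter used only to observe what the transform did.
final class RecordingEmitter implements Emitter<Integer> {
  final List<Integer> records = new ArrayList<>();
  final List<String> notifications = new ArrayList<>();

  @Override
  public void emit(Integer value) {
    records.add(value);
  }

  @Override
  public NotificationEmitter getNotificationEmitter(String topic) {
    // Record each payload together with the topic it was emitted to.
    return (charset, payloads) -> {
      for (String payload : payloads) {
        notifications.add(topic + ": " + payload);
      }
    };
  }
}
```

Calling HeartRateAlerts.transform with readings of 80 and 170 against a threshold of 150 would forward both records but record only one notification, for the 170 reading.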

Publishing Notifications

Notifications can be published using a new NotificationPublisher plugin type:

public abstract class NotificationPublisher implements PipelineConfigurable, StageLifecycle<MessagingContext> {
  private MessagingContext context;

  @Override
  public void initialize(MessagingContext context) throws Exception {
    this.context = context;
  }

  // implemented by each publisher plugin to send the notification to its destination
  public abstract void publish(Notification notification) throws Exception;

  @Override
  public void destroy() {
    // no-op
  }
}

public class Notification {
  String namespace;
  String topic;
  byte[] payloads;
}
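As an illustration of what a concrete publisher could look like, the sketch below collects notifications in memory and counts them, which is roughly what a per-run count of published notifications would need. Notification and NotificationPublisher here are simplified local stand-ins (no MessagingContext, no pipeline configuration, a single payload per notification), and InMemoryPublisher is a hypothetical name. A real plugin would hand each notification to TMS or Kafka instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the proposed Notification class.
final class Notification {
  final String namespace;
  final String topic;
  final byte[] payload;

  Notification(String namespace, String topic, byte[] payload) {
    this.namespace = namespace;
    this.topic = topic;
    this.payload = payload;
  }
}

// Simplified stand-in for the proposed plugin type (lifecycle methods omitted).
abstract class NotificationPublisher {
  abstract void publish(Notification notification) throws Exception;
}

// Hypothetical publisher that keeps everything in memory instead of sending it anywhere.
final class InMemoryPublisher extends NotificationPublisher {
  private final List<String> published = new ArrayList<>();

  @Override
  void publish(Notification notification) {
    // Assumes UTF-8 payloads purely so the result is readable in this example.
    String body = new String(notification.payload, StandardCharsets.UTF_8);
    published.add(notification.namespace + "/" + notification.topic + ": " + body);
  }

  int publishedCount() {
    return published.size();
  }

  List<String> published() {
    return published;
  }
}
```

The count returned by publishedCount is the kind of value that could back the metric a cluster administrator wants to see for a pipeline run.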

Plugin Notification Port Indication

Though every plugin will be able to emit notifications through the programmatic API, most plugins will not make use of this functionality. Users should be given some indication of which plugins make use of it and which do not. One way to do this is to annotate the plugin:

@Plugin(type = Transform.PLUGIN_TYPE)
@Name("Notifier")
@NotificationsEnabled
public class NotifierTransform extends Transform {
 
}

NotificationsEnabled will be an annotation defined only in cdap-etl-api, since it is specific to pipelines; it is not a CDAP annotation. For the UI to see it, it must be exposed through the RESTful API in a generic way. One way to do this is to add an 'annotations' field to the PluginClass object:

GET /v3/namespaces/<namespace-id>/artifacts/<artifact-name>/versions/<artifact-version>/extensions/<plugin-type>/plugins/<plugin-name>
[
  {
    "name": "JavaScript",
    "type": "transform",
    "annotations": [
      "NotificationsEnabled"
    ],
    "description": "Executes user-provided JavaScript that transforms one record into zero or more records.",
    "className": "co.cask.hydrator.plugin.transform.JavaScriptTransform",
    "artifact": { ... }, 
    "properties": { ... }
  }
  ...
]

This would keep CDAP generic, since it knows nothing about what the custom annotation means, while allowing the UI and the data pipeline app to apply special logic based on the annotation.
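A rough sketch of how the annotation could be defined and surfaced follows. It assumes the annotation is retained at runtime so it can be read reflectively, and that the 'annotations' field carries simple annotation names. PluginAnnotations, annotationNames, and PlainTransform are hypothetical names for this example, not existing CDAP code, and the transform classes are empty placeholders.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Assumed definition: must be RUNTIME-retained to be visible via reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface NotificationsEnabled {
}

// Placeholder plugin that declares it can emit notifications.
@NotificationsEnabled
class NotifierTransform {
}

// Placeholder plugin without the annotation.
class PlainTransform {
}

final class PluginAnnotations {
  // Returns the sorted simple names of all runtime-visible annotations on the class.
  // The caller needs no knowledge of what any particular annotation means.
  static List<String> annotationNames(Class<?> pluginClass) {
    List<String> names = new ArrayList<>();
    for (Annotation annotation : pluginClass.getAnnotations()) {
      names.add(annotation.annotationType().getSimpleName());
    }
    Collections.sort(names);
    return names;
  }
}
```

Under these assumptions, the names collected for NotifierTransform would include NotificationsEnabled, and the list for PlainTransform would be empty.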

 

API changes

New Programmatic APIs

New Java APIs introduced (both user facing and internal)

Deprecated Programmatic APIs

 

New REST APIs

Path | Method | Description | Response Code | Response
/v3/apps/<app-id> | GET | Returns the application spec for a given application | 200 - On success; 404 - When application is not available; 500 - Any internal errors |

 

     

Deprecated REST API

None

CLI Impact or Changes

  • None

UI Impact or Changes

  • UI must be able to detect which plugins can emit notifications, and display a corresponding port
  • UI must display metrics for notifications emitted

Security Impact 

This feature will use TMS, so any authorization added to TMS will affect this feature.

Impact on Infrastructure Outages 

None

Test Scenarios

Test ID | Test Description | Expected Results

Releases

Release X.Y.Z

Release X.Y.Z

Related Work

  • Work #1
  • Work #2
  • Work #3

 

Future work
