Event Stream API

CDAP provides a set of REST APIs to query the status of pipelines. However, polling these APIs frequently adds load on the CDAP services and can degrade the performance of the system.

As an alternative, CDAP users who are interested in monitoring details of data pipelines deployed in various instances can subscribe to the Event Stream API and receive events containing the details of data pipelines. This lets CDAP users monitor pipelines using an external tool.

CDAP uses a Transactional Messaging System (TMS) to provide a publish-subscribe mechanism for its internal events. The Event Stream API builds on this mechanism to expose the relevant internal events as public events.

What information does the API provide?

The Event Stream API provides the following details of a data pipeline run:

  • General details of a pipeline run, such as application name, program name, run ID, run status, runtime arguments, etc.

  • Spark execution metrics.

  • Errors, if any.

  • Start metadata (introduced in CDAP 6.8.0). Describes how the pipeline was started: manually, by a time schedule, or by a trigger. It also contains the trigger and time schedule details.

How to consume the API?

Currently, CDAP only supports consumption of events through Google Pub/Sub.

To use a different service, you can implement the `EventWriter` interface and publish these events to the publish-subscribe service of your choice.
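If events are delivered to an external tool through a Pub/Sub push subscription, the tool receives each message wrapped in a JSON envelope with base64-encoded data. The following is a minimal sketch of unwrapping that envelope. It assumes the event body is serialized as JSON, which this page does not state, and the field names in `sample_event` are hypothetical placeholders rather than the actual event schema.

```python
import base64
import json


def decode_push_envelope(body: str) -> dict:
    """Return the event payload carried by a Pub/Sub push request body."""
    envelope = json.loads(body)
    message = envelope["message"]
    # Pub/Sub base64-encodes the message data in push deliveries.
    raw = base64.b64decode(message.get("data", ""))
    return {
        "message_id": message.get("messageId"),
        "attributes": message.get("attributes", {}),
        # Assumption: the event body is JSON.
        "event": json.loads(raw) if raw else None,
    }


# Hypothetical event payload, used only to exercise the decoder.
sample_event = {"applicationName": "my_pipeline", "status": "COMPLETED"}
sample_body = json.dumps({
    "message": {
        "data": base64.b64encode(json.dumps(sample_event).encode("utf-8")).decode("ascii"),
        "messageId": "1234567890",
    },
    "subscription": "projects/my-project/subscriptions/my-subscription",
})

decoded = decode_push_envelope(sample_body)
print(decoded["event"]["status"])
```

A monitoring tool would typically call a function like this inside its HTTP handler and then route on the decoded event fields.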

How to enable the API in CDAP?

Prerequisites

For the Event Writer extension to work, you need the following:

Preparing and loading the extension

  1. Clone the events-writer-extension repository (cdapio/events-writer-extension on GitHub).

  2. In a terminal, navigate to the folder of the cloned repository and build it by running: mvn clean package -Pdist. This generates the compiled JARs inside the folders target/libexec and cdap-event-writer-ext-gcp-pubsub. You will install them in the following steps.

  3. Navigate to your CDAP installation folder and inside the folder ext, create a new folder named eventwriters. Inside it, create another folder called pubsub.

  4. Copy all the files inside the folders from step 2 into the pubsub folder that you created in step 3.
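Steps 3 and 4 can also be scripted. The following is a minimal sketch, not an official installer: the function name `install_extension` and its parameters are illustrative, and you pass in the build output folders from step 2 and your own CDAP installation folder.

```python
import shutil
from pathlib import Path


def install_extension(source_dirs, cdap_home, writer_name="pubsub"):
    """Copy every file from the build output folders into ext/eventwriters/<writer_name>."""
    target = Path(cdap_home) / "ext" / "eventwriters" / writer_name
    # Step 3: create ext/eventwriters/pubsub, including missing parents.
    target.mkdir(parents=True, exist_ok=True)
    copied = []
    for source in source_dirs:
        for item in sorted(Path(source).iterdir()):
            if item.is_file():
                # Step 4: copy each build artifact into the pubsub folder.
                shutil.copy2(item, target / item.name)
                copied.append(item.name)
    return target, copied
```

Call it with the list of folders produced by the build and the path of the CDAP installation, then restart CDAP so that the extension is picked up.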

Configuration

To configure the extension, edit your CDAP installation settings. This applies to both CDAP Sandbox and Distributed CDAP on GKE.

Set the following properties:

  • feature.event.publish.enabled : true

  • events.writer.extensions.dir : ext/eventwriters

  • event.writer.pub-sub-event-writer.project : <gcp-project-id>

  • event.writer.pub-sub-event-writer.serviceAccountPath : <path-to-service-account-file>

  • event.writer.pub-sub-event-writer.topic : <gcp-pub-sub-topic-name>

  • events.writer.extensions.enabled.list : pub-sub-event-writer

Optional properties in case your CDAP instance runs behind a network proxy:

  • event.writer.pub-sub-event-writer.proxy_host : <proxy-ip>

  • event.writer.pub-sub-event-writer.proxy_port : <proxy-port>

Optional properties for Spark execution metrics:

  • spark.metrics.host: http://<spark-host>:<port>

  • spark.metrics.strategy.retry.policy.base.delay.ms: "100"

  • spark.metrics.strategy.retry.policy.max.delay.ms: "2000"

  • spark.metrics.strategy.retry.policy.max.retries: "10"

  • spark.metrics.strategy.retry.policy.max.time.secs: "30"

  • spark.metrics.strategy.retry.policy.type: exponential.backoff

  • spark.metrics.max.termination.minutes: "15"
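To get a feel for what the retry values above imply, the following sketch computes the waits of a plain doubling backoff with the same numbers. It is an illustration under stated assumptions, not CDAP's implementation: the actual policy may add jitter or account for elapsed time differently, and `backoff_delays` is a hypothetical helper.

```python
def backoff_delays(base_ms=100, max_ms=2000, max_retries=10, max_time_s=30):
    """Return the wait before each retry under a plain doubling policy."""
    delays = []
    elapsed_ms = 0
    delay = base_ms
    for _ in range(max_retries):
        if elapsed_ms + delay > max_time_s * 1000:
            break  # the total time budget would be exceeded
        delays.append(delay)
        elapsed_ms += delay
        delay = min(delay * 2, max_ms)  # double, capped at the maximum delay
    return delays


print(backoff_delays())
# prints [100, 200, 400, 800, 1600, 2000, 2000, 2000, 2000, 2000]
```

Under these assumptions the ten retries wait about 13 seconds in total, so the retry count is reached before the 30-second time limit.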

CDAP Sandbox

Navigate to the CDAP sandbox folder, open the file ./conf/cdap-site.xml and add the above properties.
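cdap-site.xml uses the Hadoop-style layout of property elements, each with a name and a value. As a convenience, the following sketch renders a dictionary of settings into that layout so the result can be pasted inside the file's configuration element. The helper name `to_property_xml` and the project, topic, and service account values are placeholders.

```python
from xml.sax.saxutils import escape


def to_property_xml(settings):
    """Render settings as <property> blocks in the cdap-site.xml layout."""
    blocks = []
    for name, value in settings.items():
        blocks.append(
            "<property>\n"
            f"  <name>{escape(name)}</name>\n"
            f"  <value>{escape(str(value))}</value>\n"
            "</property>"
        )
    return "\n".join(blocks)


settings = {
    "feature.event.publish.enabled": "true",
    "events.writer.extensions.dir": "ext/eventwriters",
    "events.writer.extensions.enabled.list": "pub-sub-event-writer",
    # Placeholder values: replace with your own project, topic, and key file.
    "event.writer.pub-sub-event-writer.project": "my-gcp-project",
    "event.writer.pub-sub-event-writer.topic": "my-topic",
    "event.writer.pub-sub-event-writer.serviceAccountPath": "/path/to/service-account.json",
}

print(to_property_xml(settings))
```

Values are XML-escaped, so characters such as & in a proxy or host setting do not break the file.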

Distributed CDAP on GKE

Set up permissions and service accounts:

  1. Enable Workload Identity Federation for GKE. For details, see "Authenticate to Google Cloud APIs from GKE workloads" in the Google Kubernetes Engine (GKE) documentation.

When installing CDAP on GKE, a service account is created inside the Kubernetes cluster. For details, see Installing CDAP on Kubernetes.

  2. Create a service account in the GCP Console and give it the Pub/Sub publisher permissions.

  3. Link both service accounts.

Edit settings:

To edit the settings, log in to the Kubernetes cluster and run:

kubectl edit cdapmasters.cdap.cdap.io/cdap

Starting and debugging the application

After all the previous steps have been completed, start/restart CDAP.

Starting CDAP Sandbox

Run cdap sandbox start, or cdap sandbox restart if the sandbox is already running.

Starting Distributed CDAP on GKE

If your CDAP instance is running, it should restart automatically when you edit the settings.

If you need to start the CDAP instance, follow the steps in Installing CDAP on Kubernetes.

Debugging

Recommended: Use IntelliJ for development and debugging. To create the debugger configuration for the event writer extension project in IntelliJ, follow the steps in Debugging a CDAP Application.

Logging

pubsub Event Writer extension

Every time a message is successfully published, the extension writes a trace-level log entry containing the Pub/Sub message ID.

Error logs are printed if:

  • The Pub/Sub publisher doesn’t start.

  • An error occurs while publishing the message.

  • An error occurs while destroying the publisher.

FAQs

Q: What are the available extensions for Event Writer?

A: For now, CDAP only supports sending events to Google Cloud Pub/Sub.

Q: How can I enable/disable the Event Writer?

A: Set the configuration property feature.event.publish.enabled to true or false.

Q: Can I store the extensions in a different folder? 

A: Yes, as long as you set the events.writer.extensions.dir property in the CDAP settings to that directory (this guide uses ext/eventwriters).

Q: How can I check that the extension is loaded correctly?

A: If the Pub/Sub event writer extension is loaded correctly, it prints an info-level log with the message "Publisher created successfully" on startup.
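This check can be automated by scanning the CDAP log output for that message. The sketch below only looks for the message text given above; the helper name `extension_loaded` and the layout of the sample log lines are hypothetical, and where the log lines come from depends on your installation.

```python
def extension_loaded(log_lines, marker="Publisher created successfully"):
    """Return True if any log line contains the startup success message."""
    return any(marker in line for line in log_lines)


# Hypothetical log lines, used only to exercise the check.
sample_lines = [
    "2024-01-01 10:00:00,000 - INFO  [main] Starting event writer",
    "2024-01-01 10:00:01,000 - INFO  [main] Publisher created successfully",
]

print(extension_loaded(sample_lines))
```

To check a real log, pass the lines of the file, for example by iterating over an open file object.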

Created in 2020 by Google Inc.