Event Stream API
CDAP provides a set of REST APIs to query the status of pipelines. However, polling these APIs frequently adds load to the CDAP services and can degrade the performance of the system.
As an alternative, CDAP users who are interested in monitoring details of data pipelines deployed in various instances can subscribe to the Event Stream API and receive events containing the details of data pipelines. This lets CDAP users monitor pipelines using an external tool.
CDAP uses a Transactional Messaging System (TMS) to provide a publish-subscribe mechanism for its internal events. The Event Stream API builds on TMS to expose the relevant internal events as public events.
What information does the API provide?
The Event Stream API provides the following details of a data pipeline run:
General details of a pipeline run, such as application name, program name, run ID, status of the run, and runtime arguments.
Spark execution metrics.
Errors, if any.
Start metadata (introduced in CDAP 6.8.0). Describes how the pipeline was started: manually, by a time schedule, or by a trigger. It also contains the trigger and time schedule details.
How to consume the API?
Currently, CDAP only supports consumption of events through Google Pub/Sub.
To use a different service, you can implement the `EventWriter` interface and publish these events to the publish-subscribe service of your choice.
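On the consuming side, a subscriber attached to the Pub/Sub topic could look roughly like the sketch below. It assumes the google-cloud-pubsub client library is installed, that a subscription already exists on the topic, and that each message payload is UTF-8 encoded JSON; the payload format is an assumption, not something this page confirms. The names decode_event and listen are hypothetical helpers chosen for illustration.

```python
import json


def decode_event(data):
    """Decode a Pub/Sub message payload, assuming it is UTF-8 encoded JSON."""
    return json.loads(data.decode("utf-8"))


def listen(project_id, subscription_id, handle_event):
    """Block and pass every decoded event to handle_event."""
    # Imported here so decode_event stays usable without the client library.
    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    path = subscriber.subscription_path(project_id, subscription_id)

    def callback(message):
        try:
            event = decode_event(message.data)
        except ValueError:
            # The JSON assumption did not hold for this message; skip it.
            print("Skipping message that is not JSON:", message.message_id)
        else:
            handle_event(event)
        message.ack()

    future = subscriber.subscribe(path, callback=callback)
    with subscriber:
        future.result()  # blocks until the stream fails or is cancelled
```

Calling listen("<gcp-project-id>", "<subscription-name>", print) would print each event as it arrives. The subscription name is a placeholder for one you create yourself.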
How to enable the API in CDAP?
Prerequisites
For the Event Writer extension to work, you need the following:
An installed CDAP instance supporting the event writer extensions (version 6.7.0 or greater).
The pubsub event writer extension. You can download it from https://github.com/cdapio/events-writer-extension.
A GCP project with a Pub/Sub topic created.
A service account with permission to publish to the topic.
If you need to compile the extension, install Java 8 and Maven.
Preparing and loading the extension
Clone the events-writer-extension repository (https://github.com/cdapio/events-writer-extension).
Inside a terminal, navigate to the folder of the downloaded repository and compile it by running `mvn clean package -Pdist`. This generates the compiled JARs inside the folders target/libexec and cdap-event-writer-ext-gcp-pubsub. You will install them in the following steps.
Navigate to your CDAP installation folder and inside the folder ext, create a new folder named eventwriters. Inside it, create another folder called pubsub.
Copy all the files inside the folders from step 2 into the pubsub folder that you created in step 3.
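Steps 3 and 4 can also be scripted. The following is a minimal sketch, not part of the extension: install_extension is a hypothetical helper, and the build folders and CDAP installation folder are passed in by the caller because their exact locations depend on your setup.

```python
import shutil
from pathlib import Path


def install_extension(build_dirs, cdap_home):
    """Copy every file found in build_dirs into <cdap_home>/ext/eventwriters/pubsub."""
    target = Path(cdap_home) / "ext" / "eventwriters" / "pubsub"
    # Step 3: create ext/eventwriters/pubsub if it does not exist yet.
    target.mkdir(parents=True, exist_ok=True)
    copied = []
    for build_dir in build_dirs:
        for item in sorted(Path(build_dir).iterdir()):
            if item.is_file():
                # Step 4: copy each generated file into the pubsub folder.
                shutil.copy2(item, target / item.name)
                copied.append(item.name)
    return target, copied
```

Pass the folders produced by the Maven build in step 2 as build_dirs and your CDAP installation folder as cdap_home. Subfolders inside the build folders are not copied.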
Configuration
To configure the extension, edit your CDAP installation settings. The properties below apply to both CDAP Sandbox and Distributed CDAP on GKE.
Required properties:
feature.event.publish.enabled : true
events.writer.extensions.dir : ext/eventwriters
event.writer.pub-sub-event-writer.project : <gcp-project-id>
event.writer.pub-sub-event-writer.serviceAccountPath : <path-to-service-account-file>
event.writer.pub-sub-event-writer.topic : <gcp-pub-sub-topic-name>
events.writer.extensions.enabled.list : pub-sub-event-writer
Optional properties if your CDAP instance runs behind a network proxy:
event.writer.pub-sub-event-writer.proxy_host : <proxy-ip>
event.writer.pub-sub-event-writer.proxy_port : <proxy-port>
Optional properties for Spark execution metrics:
spark.metrics.host: http://<spark-host>:<port>
spark.metrics.strategy.retry.policy.base.delay.ms: "100"
spark.metrics.strategy.retry.policy.max.delay.ms: "2000"
spark.metrics.strategy.retry.policy.max.retries: "10"
spark.metrics.strategy.retry.policy.max.time.secs: "30"
spark.metrics.strategy.retry.policy.type: exponential.backoff
spark.metrics.max.termination.minutes: "15"
CDAP Sandbox
Navigate to the CDAP sandbox folder, open the file ./conf/cdap-site.xml and add the above properties.
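Because cdap-site.xml stores each setting as a property element with a name and a value, the entries can be generated rather than typed by hand. The sketch below is one way to do that; render_properties is a hypothetical helper, and the project, path, and topic values are placeholders to replace with your own.

```python
from xml.sax.saxutils import escape


def render_properties(properties):
    """Render a dict of settings as cdap-site.xml <property> elements."""
    blocks = []
    for name, value in properties.items():
        blocks.append(
            "  <property>\n"
            f"    <name>{escape(name)}</name>\n"
            f"    <value>{escape(str(value))}</value>\n"
            "  </property>"
        )
    return "\n".join(blocks)


# Placeholder values: replace the project, path, and topic with your own.
settings = {
    "feature.event.publish.enabled": "true",
    "events.writer.extensions.dir": "ext/eventwriters",
    "event.writer.pub-sub-event-writer.project": "my-gcp-project",
    "event.writer.pub-sub-event-writer.serviceAccountPath": "/path/to/service-account.json",
    "event.writer.pub-sub-event-writer.topic": "my-topic",
    "events.writer.extensions.enabled.list": "pub-sub-event-writer",
}

print(render_properties(settings))
```

Paste the printed elements inside the existing configuration element of ./conf/cdap-site.xml.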
Distributed CDAP on GKE
Set up permissions and service accounts:
Enable Workload Identity Federation for GKE: https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity
Installing CDAP on GKE creates a service account inside the Kubernetes cluster.
Create a service account in the Google Cloud console and grant it the Pub/Sub Publisher role.
Link the Kubernetes service account to the GCP service account so that CDAP can publish with the GCP service account's permissions.
Edit settings:
To edit the settings, log in to the Kubernetes cluster and run:
kubectl edit cdapmasters.cdap.cdap.io/cdap
Starting and debugging the application
After completing all the previous steps, start or restart CDAP.
Starting CDAP Sandbox
Run `cdap sandbox start`, or `cdap sandbox restart` if the sandbox is already running.
Starting Distributed CDAP on GKE
If your CDAP instance is running, it should restart automatically when you edit the settings.
If you need to start the CDAP instance, follow the steps in Installing CDAP on Kubernetes.
Debugging
Recommended: Use IntelliJ for development and debugging. To create the debugger configuration for the event writer extension project in IntelliJ, follow these steps:
Logging
pubsub Event Writer extension
Every time a message is published successfully, the extension writes a trace-level log containing the Pub/Sub message ID.
Error logs are printed if:
The Pub/Sub publisher doesn’t start.
An error happens while publishing the message.
An error occurs while destroying the publisher.
FAQs
Q: What are the available extensions for Event Writer?
A: For now, CDAP only supports sending events to Google Cloud Pub/Sub.
Q: How can I enable/disable the Event Writer?
A: By setting the configuration property feature.event.publish.enabled to true or false.
Q: Can I store the extensions in a different folder?
A: Yes, as long as you set the property events.writer.extensions.dir in the CDAP settings to that directory (this guide uses ext/eventwriters).
Q: How can I check that the extension is loaded correctly?
A: If the pubsub event writer extension is loaded correctly, it prints an info-level log on startup with the message "Publisher created successfully".
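This check can be automated by searching a CDAP log file for that message. The sketch below assumes you know where your installation writes its logs; the log path is supplied by the caller, and extension_loaded is a hypothetical helper.

```python
def extension_loaded(log_path, marker="Publisher created successfully"):
    """Return True if any line of the log file contains the startup message."""
    with open(log_path, encoding="utf-8", errors="replace") as log:
        return any(marker in line for line in log)
```

The function reads the file line by line, so it also works on large log files.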
Created in 2020 by Google Inc.