Operations Dashboard Backend Design

Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction

The Operations Dashboard provides users with two views: Dashboard and Reporting. The Dashboard view is a real-time interactive interface that gives users a quick overview of program run statistics and resource usage for any 24-hour window within the last week. A user can also select any hour within the 24-hour view to get details of the program runs in that hour. The Reporting view provides a comprehensive report of program runs, resource usage, and other statistics selected by the user over a given time range, such as 24 hours or one week. However, once the user has selected the types of data to include in the report, the Reporting view does not support interaction, and the user may have to wait for the report to be generated.

Goal

Provide the APIs and data required to generate an interactive Dashboard view over a shorter time range and comprehensive reports over a longer time range.

User Stories

  1. As a cluster administrator, I want to see how many programs were running at 8pm yesterday and who started those programs.
  2. As a cluster administrator, I want to see how memory and core usage change across time in the last 24 hours.
  3. As a cluster administrator, I want to see what programs are scheduled to run in the next 24 hours.
  4. As a pipeline developer, I want to see how many programs failed in the last 24 hours and be able to investigate their logs.


More use cases: https://confluence.cask.co/pages/viewpage.action?title=Operational+Dashboard&spaceKey=PROD

Design

As shown in the diagram above, each program run currently publishes messages containing program run meta information, such as namespace, program name, and status, to a TMS topic. Additional fields needed for report generation, such as launching method and user, will be added to these messages; these two fields will also be added to RunRecordMeta. ProgramNotificationSubscriberService currently processes these messages and persists them in the HBase table RunRecordMeta. DashboardHandler handles requests for the dashboard view by scanning RunRecordMeta.

If the user chooses to enable the report generation feature, a Spark program will be launched. The Spark program contains ProgramMetaSubscriberService, which subscribes to the TMS topic and writes the program run meta information into time-partitioned files. Requests for generating or getting reports will be handled by ReportHandler. Both ProgramMetaSubscriberService and ReportHandler run in the Spark driver. ReportHandler will forward each request to ReportGenerator in a Spark executor, and ReportGenerator will start a report generation job and return a unique report ID. ReportGenerator will first read the time-partitioned program run meta records and then write report files under directories named by report ID.

Open questions: How will the operational metadata be collected from the cloud? How will resource usage be measured and displayed?

Approach

End-to-End Report Generation

The client first gets a list of all the reports owned by the user by calling GET /reports. With the returned report IDs, the user can get further information about each report, such as the request used to generate it and the status of its generation, by calling GET /reports/<report-id>. If no existing report contains the desired information, the user can call POST /reports to generate a report. A report ID will be returned, which the user can use to query the status of the requested report. When the report is completed, the user can call GET /reports/<report-id>/summary and GET /reports/<report-id>/runs to get the report summary and details.

The diagram above is generated by https://www.websequencediagrams.com/ with the following lines:

title Report Query Sequence
Client->ProgramOperationsHTTPHandler: GET /reports
ProgramOperationsHTTPHandler->Client: Existing report ids owned by the user
Client->ProgramOperationsHTTPHandler: GET /reports/<report-id>
ProgramOperationsHTTPHandler -> Client: Report generation status
Client->ProgramOperationsHTTPHandler: POST /reports with query in body to generate report
ProgramOperationsHTTPHandler -> ReportGenerator: Generate report if the report doesn't exist and is not being generated
ReportGenerator -> ProgramOperationsHTTPHandler: Job status and Report ID
ProgramOperationsHTTPHandler->Client: Report ID
Client->ProgramOperationsHTTPHandler: GET /reports/<report-id>
ProgramOperationsHTTPHandler -> Client: Report generation status
Client->ProgramOperationsHTTPHandler: GET /reports/<report-id>/summary
ProgramOperationsHTTPHandler->Client: Report summary
Client->ProgramOperationsHTTPHandler: GET /reports/<report-id>/runs
ProgramOperationsHTTPHandler->Client: Report details


Report Generator State Diagram

Initially, the report generator is in the Idle state, waiting for a report generation request. When a report generation request arrives, the report generator transitions into the Started state and generates the report ID. The report ID is returned in the response to the request, and a new directory named after the report ID is created. In that directory, a Job Creation File is created, which includes the report generation request, the time when the job started, and the RunId of the job. After writing the file, the report generator transitions into the Running state, and it ends in either the Completed or the Failed state. If the job completes successfully, a _SUCCESS file and the report are written.
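The transitions described above can be sketched as a small Java enum. The enum name and the canTransitionTo method are hypothetical, and the sketch encodes only the transitions stated in the paragraph; what happens after Completed or Failed (for example, returning to Idle) is left open, as the questions below note.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the report generator states; not an existing CDAP type. */
enum ReportGeneratorState {
  IDLE, STARTED, RUNNING, COMPLETED, FAILED;

  // Idle -> Started -> Running -> Completed | Failed; terminal states have no outgoing edges here.
  private static final Map<ReportGeneratorState, Set<ReportGeneratorState>> NEXT = Map.of(
      IDLE, EnumSet.of(STARTED),
      STARTED, EnumSet.of(RUNNING),
      RUNNING, EnumSet.of(COMPLETED, FAILED),
      COMPLETED, EnumSet.noneOf(ReportGeneratorState.class),
      FAILED, EnumSet.noneOf(ReportGeneratorState.class));

  /** Returns true if the design allows moving from this state to the target state. */
  boolean canTransitionTo(ReportGeneratorState target) {
    return NEXT.get(this).contains(target);
  }
}
```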

What should happen when a new request is received while the report generator is not in the Idle state? How can the report generator be scaled? Should an endpoint be exposed to stop a report generation job?

Data model

Message format

Program run status | Existing fields | Additional fields
STARTING | programRunId (JSON), startTime, status (STARTING), programStatus, userOverrides (JSON map), systemOverrides (JSON map), TwillRunId (optional) | parentArtifactId (JSON), principal (JSON)
RUNNING | programRunId (JSON), startRunningTime, status (RUNNING) | (none)
COMPLETED | programRunId (JSON), endTime, status (COMPLETED) | (none)
FAILED | programRunId (JSON), endTime, status (FAILED), failureCause (JSON) | (none)
KILLED | programRunId (JSON), endTime, status (KILLED) | (none)


ProgramRunMetaRecord Schema

[
  {
    "namespace": "co.cask.report",
    "type": "record",
    "name": "ProgramStartingInfo",
    "fields": [
      {"name": "artifactName",
       "type": "string"},
      {"name": "artifactVersion",
       "type": "string"},
      {"name": "user",
       "type": "string"},
      {"name": "runtimeArgs",
       "doc": "A map of runtime arguments",
       "type": {"type": "map", "values": "string"}},
      {"name": "startMethod",
       "doc": "Method by which the program was launched. \"method\" is either \"MANUAL\" or \"SCHEDULED\". \"triggerInfo\" is null if the program was started manually, or contains the TriggerInfo in JSON format if it was started by a schedule.",
       "type": {
         "type": "record",
         "name": "StartMethod",
         "fields": [
           {"name": "method", "type": "string"},
           {"name": "triggerInfo", "type": ["null", "string"]}
         ]
       }}
    ]
  },
  {
    "namespace": "co.cask.report",
    "type": "record",
    "name": "ReportRecord",
    "fields": [
      {"name": "sourceId",
       "doc": "ID of the source of this ProgramRunMetaRecord; in the implementation, the TMS message ID",
       "type": "string"},
      {"name": "application",
       "type": "string"},
      {"name": "version",
       "type": "string"},
      {"name": "type",
       "type": "string"},
      {"name": "program",
       "type": "string"},
      {"name": "run",
       "type": "string"},
      {"name": "time",
       "type": "long"},
      {"name": "status",
       "type": "string"},
      {"name": "statusInfo",
       "type": ["null", "co.cask.report.ProgramStartingInfo"]}
    ]
  }
]

Report Schema

Report schema will be a subset of the ProgramRunMetaRecord schema depending on the query. sourceId will always be excluded from the report.

Mechanism for storage 

ProgramRunMetaRecord File format 

Parquet
  • Pros: Fast read and query; more space efficient; can use row group statistics and page statistics to filter records. Most importantly, given a query time range [BEGIN, END), records with startTime < END && endTime > BEGIN can be selected efficiently with predicate pushdown, and program runs started by certain users or with certain statuses can be selected with dictionary pushdown.
  • Cons: Slower write (less relevant since writes are infrequent); cannot fully utilize column chunks if new program runs are generated infrequently.
Avro
  • Pros: Faster write; records can be appended row by row with little delay, which suits the real-time dashboard view.
  • Cons: Slower filtering and random access; more space consumption.
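The time-range condition from the Parquet entry can be written as a plain Java predicate. This is a hypothetical helper (TimeRangeFilter.overlaps is not an existing API) that treats a run with no end time as still running; with Parquet, the same condition would be expressed as a pushed-down filter rather than evaluated record by record.

```java
/** Hypothetical helper: does a program run overlap the query window [begin, end)? */
final class TimeRangeFilter {
  private TimeRangeFilter() {}

  /**
   * @param startTime run start time in epoch seconds
   * @param endTime run end time in epoch seconds, or null if the run has not ended
   * @param begin inclusive lower bound of the query window, in epoch seconds
   * @param end exclusive upper bound of the query window, in epoch seconds
   */
  static boolean overlaps(long startTime, Long endTime, long begin, long end) {
    // The run must start before the window closes...
    boolean startsBeforeEnd = startTime < end;
    // ...and either still be running or end after the window opens.
    boolean endsAfterBegin = endTime == null || endTime > begin;
    return startsBeforeEnd && endsAfterBegin;
  }
}
```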

Suppose records have an average size of 400 B and there are 100 new records every minute. A file with all records generated in 24 hours is then roughly 58 MB, which easily fits into one HDFS block.
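Spelling out the estimate above under the same assumptions (400 B per record, 100 records per minute, 1,440 minutes per day):

```latex
400\,\tfrac{\text{B}}{\text{record}} \times 100\,\tfrac{\text{records}}{\text{min}} \times 1440\,\tfrac{\text{min}}{\text{day}}
  = 57{,}600{,}000\,\tfrac{\text{B}}{\text{day}} \approx 57.6\,\tfrac{\text{MB}}{\text{day}}
```

This is well under the default HDFS block size of 128 MB in Hadoop 2 and later.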

Since there are only around 20 fields and most of them will be included in a report, Parquet's columnar layout offers little query advantage. However, if individual files are large and a query needs only a small fraction of their records, Parquet still reads faster.

Source: https://db-blog.web.cern.ch/blog/zbigniew-baranowski/2017-01-performance-comparison-different-file-formats-and-storage-engines


ProgramRunMetaRecord Storage Layout

/<meta-file-base-directory>/<namespace>/<date>/<file-creation-time>.avro

Files are stored in HDFS directories according to namespace, as specified above, and each file contains only program runs within a certain time range, as specified by its path.

For instance, a file with path "/<meta-file-base-directory>/default/2017-12-02/1519095562.avro" contains the meta information of all program runs in namespace default from 12am UTC on 2017-12-02 up to, but not including, 12am UTC on 2017-12-03.
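Building a meta record file path according to the layout above can be sketched with java.time. MetaFilePaths and its parameters are hypothetical; the sketch assumes the date directory is the UTC calendar day of the runs the file covers and that the file name is the file creation time in epoch seconds.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that builds a meta record file path following the storage layout. */
final class MetaFilePaths {
  // Date directories are named by the UTC calendar day, e.g. 2017-12-02.
  private static final DateTimeFormatter DATE_DIR =
      DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

  private MetaFilePaths() {}

  /**
   * @param baseDir the meta file base directory
   * @param namespace the namespace of the program runs in the file
   * @param runDaySeconds any timestamp (epoch seconds) within the UTC day the file covers
   * @param fileCreationSeconds the file creation time (epoch seconds), used as the file name
   */
  static String path(String baseDir, String namespace, long runDaySeconds, long fileCreationSeconds) {
    String date = DATE_DIR.format(Instant.ofEpochSecond(runDaySeconds));
    return String.format("%s/%s/%s/%d.avro", baseDir, namespace, date, fileCreationSeconds);
  }
}
```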

TTL?

Report Files Layout

/<reports-base-directory>/<owner>/<report-id>/

Reports will be stored in an HDFS directory with the path shown above. There are three possible combinations of files in the directory:

_STARTED

_STARTED, _SUCCESS, report and summary

_STARTED and _FAILED

The _STARTED file is created when the report generation job first starts, and it includes the report generation request, the time when the job started, and the RunId of the job. If the RunId in _STARTED doesn't match the RunId of the current Spark program run, the report generation has failed and a _FAILED file will be created. A _SUCCESS file indicates that the report generation succeeded. A _FAILED file indicates that the report generation failed and contains the reason for the failure. The report and summary files are both JSON text files following the same format as specified in the REST API response.
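Deriving a report status from the three file combinations above can be sketched as follows. The class, the Status values, and the parameter names are hypothetical; the sketch assumes the caller has already listed the report directory and read the RunId stored in _STARTED.

```java
import java.util.Set;

/** Hypothetical sketch: infer a report's status from the marker files in its directory. */
final class ReportDirectoryStatus {
  enum Status { RUNNING, COMPLETED, FAILED, NOT_FOUND }

  private ReportDirectoryStatus() {}

  /**
   * @param files names of the files present in /<reports-base-directory>/<owner>/<report-id>/
   * @param startedRunId the RunId recorded in the _STARTED file, or null if there is none
   * @param currentRunId the RunId of the currently running Spark program
   */
  static Status infer(Set<String> files, String startedRunId, String currentRunId) {
    if (!files.contains("_STARTED")) {
      return Status.NOT_FOUND;
    }
    if (files.contains("_SUCCESS")) {
      return Status.COMPLETED;
    }
    if (files.contains("_FAILED")) {
      return Status.FAILED;
    }
    // Only _STARTED is present: the job is still running only if the current Spark run started it.
    // A mismatched RunId means the job died with an earlier run, so the design writes _FAILED.
    return startedRunId != null && startedRunId.equals(currentRunId) ? Status.RUNNING : Status.FAILED;
  }
}
```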

Runtime Infrastructure

Report ID Generation

Report IDs are generated by the report generator.

Report Generation Job

Approach #1: A CDAP app which contains a service and a Spark program. The service is the endpoint for accepting new jobs and polling job status. If a report already exists or is being generated, the service will return the existing report link or the runId of the running job. The Spark program performs report generation and will store the result as a file. Pro: easy to develop. Con: overhead of launching Spark program.

Approach #2: A long-running Spark program that contains an HTTP service handler. Pro: less overhead of launching the Spark program. Con: occupies resources even when no report is being generated.

Scalability

Scalability of the ProgramOperationHandler: TODO

Scalability of the ReportGenerator: TODO

Report Pagination

Approach #1: Do not accept an offset in the request; instead, return a body producer that can be used to fetch the next page.

Approach #2: Save the filtered and sorted results in a file and read the offset from the request.
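Approach #2 amounts to slicing an already filtered and sorted result by offset and limit. The sketch below is hypothetical: ReportPage is not an existing class, an in-memory List stands in for the saved result file, and the fields mirror the offset, limit, total, and details fields of the download response.

```java
import java.util.List;

/** Hypothetical page of report records, mirroring the offset/limit/total/details response fields. */
final class ReportPage<T> {
  final int offset;
  final int limit;
  final int total;
  final List<T> details;

  private ReportPage(int offset, int limit, int total, List<T> details) {
    this.offset = offset;
    this.limit = limit;
    this.total = total;
    this.details = details;
  }

  /** Slices a saved, already filtered and sorted result (here an in-memory list) by offset and limit. */
  static <T> ReportPage<T> of(List<T> sortedRecords, int offset, int limit) {
    if (offset < 0 || limit <= 0) {
      // The download endpoint would answer 400 for an invalid offset or limit.
      throw new IllegalArgumentException("Invalid offset or limit");
    }
    int total = sortedRecords.size();
    int from = Math.min(offset, total);
    // Widen to long so that offset + limit cannot overflow an int.
    int to = (int) Math.min((long) from + limit, total);
    return new ReportPage<>(offset, limit, total, List.copyOf(sortedRecords.subList(from, to)));
  }
}
```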

Report Cleanup

When a report is created, its files are assigned a TTL (for instance, 60 days). Before ReportHTTPHandler returns a report, it first checks whether the files have existed longer than the TTL; expired files are not returned. A background thread periodically cleans up files that have existed for more than TTL + delay (for example, 1 hour), to reduce conflicts between ReportHTTPHandler reading the files and their deletion.
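The two TTL checks above can be sketched with java.time. ReportTtl is a hypothetical class; the 60-day TTL and 1-hour delay used in the test values are the illustrative figures from the text.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of the report TTL checks; not an existing CDAP class. */
final class ReportTtl {
  private final Duration ttl;
  private final Duration cleanupDelay;

  ReportTtl(Duration ttl, Duration cleanupDelay) {
    this.ttl = ttl;
    this.cleanupDelay = cleanupDelay;
  }

  /** Handler check: a report that has existed longer than the TTL is expired and not returned. */
  boolean isExpired(Instant created, Instant now) {
    return Duration.between(created, now).compareTo(ttl) > 0;
  }

  /** Cleanup check: delete only after TTL + delay, leaving a window for in-flight reads. */
  boolean isEligibleForDeletion(Instant created, Instant now) {
    return Duration.between(created, now).compareTo(ttl.plus(cleanupDelay)) > 0;
  }
}
```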

Share ID Generation

A share ID is generated by encrypting the report ID, the owner of the report, a random seed, and possibly the share ID creation time with a private key. A share ID can be decrypted to recover the information needed to locate the report.
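A minimal sketch of share ID encoding and decoding, assuming AES-GCM with a server-side secret key in place of the unspecified "private key" scheme. ShareIdCodec and the newline-separated payload layout are hypothetical. The random IV plays the role of the random seed, so the same report yields a different share ID each time, and GCM's authentication tag makes decoding fail if a share ID has been tampered with. The sketch assumes owner names and report IDs contain no newline characters.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

/** Hypothetical share ID codec using AES-GCM with a server-side secret key. */
final class ShareIdCodec {
  private static final int IV_BYTES = 12;
  private static final int TAG_BITS = 128;
  private final SecretKey key;
  private final SecureRandom random = new SecureRandom();

  ShareIdCodec(SecretKey key) {
    this.key = key;
  }

  /** Encrypts "owner\nreportId\ncreationTime" into a URL-safe share ID prefixed with its IV. */
  String encode(String owner, String reportId, long creationTimeSeconds) throws GeneralSecurityException {
    byte[] iv = new byte[IV_BYTES];
    random.nextBytes(iv); // fresh random nonce for every share ID
    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
    cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
    String plain = owner + "\n" + reportId + "\n" + creationTimeSeconds;
    byte[] cipherText = cipher.doFinal(plain.getBytes(StandardCharsets.UTF_8));
    byte[] out = ByteBuffer.allocate(iv.length + cipherText.length).put(iv).put(cipherText).array();
    return Base64.getUrlEncoder().withoutPadding().encodeToString(out);
  }

  /** Decrypts a share ID into {owner, reportId, creationTime}; throws if it was tampered with. */
  String[] decode(String shareId) throws GeneralSecurityException {
    byte[] in = Base64.getUrlDecoder().decode(shareId);
    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
    cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, in, 0, IV_BYTES));
    byte[] plain = cipher.doFinal(in, IV_BYTES, in.length - IV_BYTES);
    return new String(plain, StandardCharsets.UTF_8).split("\n", 3);
  }
}
```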

Failure Handling

Failure during writing ProgramRunMetaRecord file: after a restart, the latest meta record file under each namespace will be read to find the latest sourceId. The message subscriber will then read messages after that sourceId. The unfinished runs from the latest complete row block will also be read into memory so that they can be carried over to the next row block.

Failure during generating report: since the report status won't be updated after a failure, any existing partial report will be rewritten when the report generation reruns. Continuing from the existing partial report is not feasible: if the report is sorted by some fields, the point from which to resume reprocessing meta records cannot be determined.

Query & Reporting structure 

Sample Dashboard Query Scenarios

  1. Select all records from namespaces A, B, and C from 2017-12-10/13-00 to 2017-12-11/14-00 with fields application, version, program, run, start time, end time, principal name, parent artifact type, launching method, memory usage, and core usage.

Sample Reporting Scenarios 

  1. Select all records from namespace A from 2017-12-10/13-00 to 2017-12-17/14-00 with fields application, version, program, run and principal name, ranked by highest memory usage
  2. Select all records from namespace A from 2017-12-10/13-00 to 2017-12-17/14-00 with fields application, version, program, run and memory usage, filtered by a given principal name 
  3. Select all records from namespaces A, B, C from 2017-12-10/13-00 to 2017-12-17/14-00 with fields application, version, program, run and principal name, ranked by highest memory usage and filtered by a given principal name
  4. Select all records from namespaces A, B, C from 2017-12-10/13-00 to 2017-12-17/14-00 with fields application, version, program, run and principal name filtered by memory usage larger than 500MB

Security


Authentication Design:

  1. On an authentication-enabled cluster, the Spark service handler can get the user ID from the HTTP header “CDAP-UserId”.

  2. We will use the user ID to create a directory under the reports structure and store the reports generated by this user under this directory.

  3. On authentication-disabled clusters, this header won’t be present, and we will use the “system” directory to store the reports.

  4. For listing reports, we construct the path based on the user ID and list the reports under this directory.

  5. For sharing reports, we will create an encrypted ID (link) based on the user name and file name and send it in the response. The user can share this share-id with colleagues to share the report.

  6. Users with a share-id use it to retrieve a report. On the backend, after receiving the share-id, we decrypt the user name and file name, construct the correct path to the report, and use that path to read and return the report.
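Steps 1 to 4 can be sketched as a small helper that picks the owner directory for a request. ReportOwnerResolver is hypothetical and uses a plain Map for headers; a real handler should use its HTTP framework's case-insensitive header lookup and validate the user ID before using it in a path.

```java
import java.util.Map;

/** Hypothetical sketch: choose the reports directory for a request from the CDAP-UserId header. */
final class ReportOwnerResolver {
  static final String USER_ID_HEADER = "CDAP-UserId";
  static final String SYSTEM_OWNER = "system";

  private ReportOwnerResolver() {}

  /** Returns <reports-base-directory>/<owner>, falling back to "system" when the header is absent. */
  static String ownerDirectory(String reportsBaseDir, Map<String, String> headers) {
    String user = headers.get(USER_ID_HEADER);
    // Authentication disabled: no header, so reports go under the shared "system" directory.
    String owner = (user == null || user.isEmpty()) ? SYSTEM_OWNER : user;
    return reportsBaseDir + "/" + owner;
  }
}
```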


Dashboard View

Program Runs Statistics:

  1. The average duration of the completed program runs in each time bucket within a time range.
  2. Future scheduled program runs with namespace, program name, parent app type, and user (?)
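Item 1 can be computed by grouping completed runs into fixed-size time buckets. This hypothetical sketch assumes a run belongs to the bucket containing its end time and that bucket boundaries are aligned to the start of the queried range; the design does not fix either choice.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: average duration of completed program runs per time bucket. */
final class DurationBuckets {
  /** A completed run, with start and end times in epoch seconds. */
  static final class Run {
    final long start;
    final long end;

    Run(long start, long end) {
      this.start = start;
      this.end = end;
    }
  }

  private DurationBuckets() {}

  /** Keys are bucket start times (epoch seconds); buckets without completed runs are omitted. */
  static Map<Long, Double> averageDurations(List<Run> runs, long rangeStart, long rangeEnd, long bucketSeconds) {
    Map<Long, long[]> sums = new TreeMap<>(); // bucket start -> {total duration, run count}
    for (Run run : runs) {
      if (run.end < rangeStart || run.end >= rangeEnd) {
        continue; // only runs that completed inside [rangeStart, rangeEnd)
      }
      long bucket = rangeStart + ((run.end - rangeStart) / bucketSeconds) * bucketSeconds;
      long[] acc = sums.computeIfAbsent(bucket, b -> new long[2]);
      acc[0] += run.end - run.start;
      acc[1]++;
    }
    Map<Long, Double> averages = new TreeMap<>();
    sums.forEach((bucket, acc) -> averages.put(bucket, (double) acc[0] / acc[1]));
    return averages;
  }
}
```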

Resource Information:

  1. Memory usage of namespace(s) (?), clusters
  2. Max memory available
  3. Core usage of namespace(s) (?), clusters
  4. Max number of cores available

Report View

Additional Program Runs information besides those in Dashboard view:

  1. Memory Usage
  2. Number of CPUs
  3. Number of Containers
  4. Number of Log Warnings
  5. Number of Log Errors
  6. Number of records out

Summary Counts:

  1. Runs per namespace
  2. Time range
  3. Pipelines (Realtime vs. Batch), custom apps
  4. Durations: min, max, average
  5. Last Started: Oldest and Newest
  6. List of users & count per user
  7. List of start methods & count per method


API changes

RunRecord API Change

/**
 * This class records information for a particular run.
 */
public class RunRecord {
...
  @SerializedName("artifactname")
  private final String artifactName;
 
  @SerializedName("artifactversion")
  private final String artifactVersion;
 
  @SerializedName("principal")
  private final String principalName;

  @SerializedName("principaltype")
  private final String principalType;
 
...
}


New REST APIs

Each endpoint below is described by its method and path, description, request, response codes, and response.

GET /v3/reports?offset=<offset>&limit=<limit>

List all reports owned by the given user.

Query Parameters:

offset - the offset from which to start reading the list of reports

limit - the max number of reports returned

200 - On successfully returning the list of reports

Return a JSON object

{ 
  "offset" : <offset>,
  "limit" : <limit>,
  "total" : <total>,
  "reports": [
    {
      "id" : "<id>",
      "name" : "<name>",
      "description":"<description>",
      "expiry" : <expiry>,
      "created" : <time created>,
      "status" : "<status>"
    },
    ...
  ]
}

Each entry in "reports" contains the ID and name of the report, its description (optional), whether the report is saved and, if it is not saved, its expiry time as epoch time in seconds, the time when the report was created, and its status. The "request" contains the request object used to create the report through the "POST /v3/reports" endpoint.

POST /v3/reports

Request to generate program runs report with required fields, filtering, and sorting conditions specified in the request body.


Request body:

Specify the name of the report and the time range for generating it as "start" and "end" times in seconds. "start" is inclusive: every run in the report has no end time or an end time >= start. "end" is exclusive: every run in the report has a start time < end.

Specify required fields in an array in "fields" property.

Use "sort" to indicate whether to sort records by a field in ascending or descending order.

Specify the filtering conditions in "filters". To get records with certain values in a field, add the values to be matched in "whitelist". To exclude records with certain values in a field, add the values to be excluded in "blacklist". To get records with values within a certain range in a field, specify the "min" (inclusive) and "max" (exclusive) in "range". One of "min" or "max" can be omitted in "range" to indicate an open-ended range. 

{
  "name" : "<name>",
  "start" : <start>,
  "end" : <end>,
  "fields" : ["field1", "field2", "field3", ...],
  "sort" : [
    {
       "fieldName" : "<fieldName>",
       "order" : "<ASCENDING or DESCENDING>"
    },
   ...
   ],
  "filters" : [
    {
      "fieldName" : "<fieldName>",
      "whitelist" : ["value1", "value2", ...]
    },
    {
      "fieldName" : "<fieldName>",
      "blacklist" : ["value1", "value2", ...]
    {
      "fieldName" : "<fieldName>",
      "range" : {
        "min" : <min>,
        "max" : <max>
      }
    },
    ...
  ]
}

A complete list of all the possible fields in the report and eligible properties for each field are listed below:

namespace: values
artifactScope: values
artifactName: values
artifactVersion: values
applicationName: values
applicationVersion: values
type: values
program: values
run: values
status: values
start: range, sortBy
running: range, sortBy
end: range, sortBy
duration: range, sortBy
user: values
startMethod: values
runtimeArgs: ""
numLogWarnings: range, sortBy
numLogErrors: range, sortBy
numRecordsOut: range, sortBy

 

Example:

{
  "start" : 1518211210,
  "end" : 1518214810,
  "fields" : ["namespace", "duration", "user"],
  "sort" : [
    {
      "fieldName" : "duration",
      "order" : "DESCENDING"
    }
  ],
  "filters" : [
    {
      "fieldName" : "namespace",
      "whitelist" : ["mao", "default"]
    },
    {
      "fieldName" : "user",
      "whitelist" : ["Lea", "Ajai"]
    },
    {
      "fieldName" : "status",
      "blacklist" : ["STARTING", "SUSPENDED"]
    },
    {
      "fieldName" : "duration",
      "range" : {
        "max" : 30
      }
    }
  ]
}
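The filter semantics described above (whitelist, blacklist, and a range with inclusive "min" and exclusive "max", either of which may be omitted) can be sketched as three Java predicates. ReportFilters is hypothetical and evaluates one field value at a time; how the report generator actually applies filters is not specified here.

```java
import java.util.List;

/** Hypothetical in-memory evaluation of the "filters" semantics of POST /v3/reports. */
final class ReportFilters {
  private ReportFilters() {}

  /** "whitelist": keep the record only if the field value is one of the listed values. */
  static boolean whitelist(Object value, List<?> allowed) {
    return allowed.contains(value);
  }

  /** "blacklist": drop the record if the field value is one of the listed values. */
  static boolean blacklist(Object value, List<?> excluded) {
    return !excluded.contains(value);
  }

  /** "range": min is inclusive, max is exclusive; a null bound leaves that side open. */
  static boolean range(long value, Long min, Long max) {
    return (min == null || value >= min) && (max == null || value < max);
  }
}
```

For the example request above, a run with status STARTING fails the blacklist check, and a run with a duration of exactly 30 fails the range check because "max" is exclusive.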


200 - On successfully starting a new report generation job or a report generation is in progress, or the requested report already exists

400 - Invalid time range, invalid query, or nonexistent namespace in the request

Return a JSON object

{ 
  "id":"<id>"
}


The "id" is a unique and deterministic identifier to check the status of the report generation job and to retrieve the report result.
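One way to make the report ID deterministic is to derive it from the owner and a canonical form of the request, for example with a name-based UUID. This is an assumption rather than the specified mechanism: ReportIds is hypothetical, and the sketch relies on the caller serializing equal requests to identical strings.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/** Hypothetical sketch of a deterministic report ID: same owner and same request give the same ID. */
final class ReportIds {
  private ReportIds() {}

  /**
   * @param owner the user who requested the report
   * @param canonicalRequestJson the request body serialized in a canonical form
   *     (for example, with keys in a fixed order) so that equal requests produce equal strings
   */
  static String reportId(String owner, String canonicalRequestJson) {
    // Name-based (version 3, MD5) UUID derived from the owner and the canonical request.
    byte[] name = (owner + "\n" + canonicalRequestJson).getBytes(StandardCharsets.UTF_8);
    return UUID.nameUUIDFromBytes(name).toString();
  }
}
```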

GET /v3/reports/info?reportId=<report-id> (or) share-id=<share-id>

Request to get the metadata and status of a report.

Query Parameters:

reportId - ID of the report

share-id - the share ID granted by the owner of this report, if one exists

Exactly one of reportId and shareId must be specified. If neither or both are specified, a BadRequestException will be thrown.

reportId will be used in conjunction with the user information header to identify the report, while shareId will be decoded to find the owner and reportId, and the decoded information will be used to identify the report.
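The rule that exactly one of reportId and shareId is given can be sketched as follows. ReportLocatorParams is hypothetical, and it throws IllegalArgumentException to stay self-contained where the handler would throw a BadRequestException.

```java
/** Hypothetical validation of the "exactly one of reportId and shareId" rule. */
final class ReportLocatorParams {
  private ReportLocatorParams() {}

  /** Returns true if the report should be located by shareId, false if by reportId. */
  static boolean useShareId(String reportId, String shareId) {
    boolean hasReportId = reportId != null && !reportId.isEmpty();
    boolean hasShareId = shareId != null && !shareId.isEmpty();
    if (hasReportId == hasShareId) {
      // Neither or both were given; the handler would turn this into a 400 Bad Request.
      throw new IllegalArgumentException("Exactly one of reportId and shareId must be specified");
    }
    return hasShareId;
  }
}
```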

200 - On successfully returning the result

404 - When the given report does not exist

400 - Bad Request - neither or both of reportId and shareId are specified.

A JSON object including the creation time of the report, status, name, description (optional), the error that caused the report generation failure (optional, present only when the report generation failed), the report generation request, and the summary of the report (optional, present only if the report generation succeeded):

{
  "created" : <time created>,
  "status" : "<RUNNING, COMPLETED, or FAILED>",
  "name" : "<name>",
  "description" : "<description>",
  "expiry" : "<expiry>",
  "error" : "<error>",
  "request" : {
    "start" : <start>,
    "end" : <end>,
    "fields" : [...],
    ...
  },
  "summary" : {
    "start" : <start>,
    "end" : <end>,
    "namespaces" : [
      {
        "namespace" : "<namespace>",
        "runs" : <runs>
      },
      ...
    ],
    "artifacts" : [
      {
        "scope" : "<artifactScope>",
        "name" : "<artifactName>",
        "version" : "<artifactVersion>",
        "runs" : <runs>
      },
      ...
    ],
    "durations" : {
      "min" : <min program run duration>,
      "max" : <max program run duration>,
      "average" : <average program run duration>
    },
    "starts" : {
      "newest" : <time in secs of the newest started program run>,
      "oldest" : <time in secs of the oldest started program run>
    },
    "owners" : [
      {
        "user" : "<user>",
        "runs" : <runs>
      },
      ...
    ],
    "startMethods" : [
      {
        "method" : "<MANUAL, TIME, or PROGRAM_STATUS>",
        "runs" : <runs>
      },
      ...
    ]
  }
}
POST /v3/reports/<report-id>/save

Request to save the report with a given name and description.
{
  "name":"<name>",
  "description":"<description>"
}

200 - On successfully updating the report

400 - Attempting to make a saved report expire, an invalid name or timeout, or other errors in the request

404 - When the given report does not exist



GET /v3/reports/download?offset=<offset>&limit=<limit>&report-id=<reportId> (or) share-id=<share-id>

Request to read a program runs report starting at the given offset and with at most the number of records specified by the limit

Query Parameters:

offset - the offset from which to start reading the report

limit - the max number of runs returned

report-id - id of the report

share-id - the share-id granted by the owner of this report if it exists

Exactly one of report-id and share-id must be specified. If neither or both are specified, a BadRequestException will be thrown.

report-id will be used in conjunction with the user information header to identify the report, while share-id will be decoded to find the owner and report ID, and the decoded information will be used to identify the report.

200 - On successfully returning the details of a completed report

202 - When the report is still being generated

400 - Invalid offset, limit in the request, or the report generation has failed for the given report ID

404 - When the given report does not exist

A JSON format of the records:

{ 
  "offset" : <offset>,
  "limit" : <limit>,
  "total" : <total>,
  "details" : [{
    "namespace" : "default",
    "artifact": {
      "scope" : "<scope>",
      "name" : "<name>",
      "version" : "<version>"
    },
    "application" : "<application>",
    "program" : "<program>",
    "duration" : <duration>,
    "user" : "<user>"
   },
  ...
  ]
}
DELETE /v3/reports/<report-id>

Delete the given report.

200 - On successfully deleting the report

404 - When the given report is not available


POST /v3/reports/<report-id>/share

Request a permalink that can be shared with other users so that they can read the report.


200 - On successfully generating the permalink

404 - When the given report is not available

A JSON object containing the permalink

{ 
  "shareid": "<shareid>"
}
GET /v3/dashboard?start=<start>&duration=<duration>&namespace=<namespace1>&namespace=<namespace2>

Request for the program runs within the given time range.

Query Parameters:

start - the start time in seconds to get the dashboard view

duration - the duration in seconds of the dashboard view (either 3600 or 86400)

namespace - the namespace to be included in the dashboard view (there can be more than one)


200 - On successfully returning the result

400 - Invalid time range in the request

A JSON object containing a list of the program runs details within the querying time range:

[{
    "namespace":"<namespace>", 
    "artifact": {
      "scope" : "<scope>",
      "name" : "<name>",
      "version" : "<version>"
    },
    "application": {
      "name" : "<name>",
      "version" : "<version>"
    },
    "type":"<type>",
    "program":"<program>",
    "run" : "<run>",
    "user": "<user>",
    "startMethod" : "<manual, scheduled or triggered>",
    "start": "<start>",
    "running": "<running>",
    "end":"<end>",
    "status":"<status>"
  },
  ...]

For program runs scheduled in the future, there will be no "start", "running", "end", and "status" fields.

Test Scenarios

Test ID | Test Description | Expected Results

Releases

Release 5.0


Next Steps

  1. Alternative to adding parentArtifact to the message: look up the artifactId per application
  2. Spark vs local process
  3. Conduct experiments with various query scenarios to determine query time with Parquet and Avro.
  4. Depending on the experiment result, determine the design:
    1. If Parquet is significantly faster than Avro (say, 1 minute vs. 10 minutes), we need to write meta record files in batches to leverage Parquet's performance advantage. One possible implementation is to keep a long-running Spark program with an HTTP handler and let it periodically process and persist records in batches. When a report generation request arrives, if the record files do not yet contain the most recent records in the query time range, Spark will read those records directly from the TMS HBase table.
    2. If Avro doesn't have performance issues, we can simply keep appending to Avro files without worrying about batch writing.
  5. Need to determine the contract for report existence and report generation status based solely on files. For instance, the presence of a _SUCCESS file means a job succeeded.


Future Work

#1 GET /v3/namespaces/<namespace-id>/runs?statuses=<statuses>&start=<startTs>&end=<endTs>&limit=<limit>

Returns the RunRecords under a given namespace within the given time range. Statuses are comma-separated ProgramRunStatus values and default to ALL if left empty.

200 - On success

500 - Any internal errors

Response: RunRecords

Report sharing: The report owner can choose to share the report with others or revoke others' privilege to view the report. A user who only has the READ privilege on the report, granted by the original owner, cannot share the report with others. The REST APIs are listed below.

POST /v3/namespaces/<namespace-id>/report/<report-id>/privileges/grant

Grant another user the privilege to read the report


200 - On success

403 - Current user is not authorized to grant privilege for this report

404 - When the given report is not available

409 - Privilege is already granted

 
{
  "entity": {
    "namespace": "<namespace>",
    "entity": "REPORT",
    "id" : "<report-id>"
  },
  "principal": {
    "name": "admin",
    "type": "ROLE"
  },
  "actions": [
    "READ"
  ]
}
POST /v3/namespaces/<namespace-id>/report/<report-id>/privileges/revoke

Revoke another user's privilege to read the report


200 - On success

403 - Current user is not authorized to revoke privilege for this report

404 - When the given report is not available

409 - Privilege is already revoked

 
{
  "entity": {
    "namespace": "<namespace>",
    "entity": "REPORT",
    "id" : "<report-id>"
  },
  "principal": {
    "name": "admin",
    "type": "ROLE"
  },
  "actions": [
    "READ"
  ]
}




Created in 2020 by Google Inc.