Messaging

Introduction

CDAP has many use cases that call for a reliable, transactional messaging system for communication between different components. Currently we use Kafka as an interim solution for some of these use cases, but Kafka has a few shortcomings: it is not transactional (that is, we cannot ensure that a state transition and the notification about that state change happen consistently), and it is not durable (that is, if we lose a file system, we may lose messages). This document outlines use cases and requirements for the messaging system, and discusses the design of the implementation.

Use Cases

  1. The System (Dataset Framework) wants to publish events about programs accessing datasets. Consumers for these events are the lineage writer, usage registry, and audit trail. 
  2. The System wants to publish events about metadata changes for entities. These events will be consumed by the Tracker extension.
  3. The System (Scheduler) wants to trigger runs of programs (workflows) based on events; for example, availability of data, status of another program, external events, etc.
  4. The System (App Fabric) wants to emit notifications about the state of running programs (or their completion). This may include changes to the workflow token as execution of a workflow progresses.
  5. The System wants to notify programs about a change in their configuration. 
  6. The User (Developer, DevOps) wants to receive notifications about events in the system, such as metadata changes, program state transitions, data availability, etc.
  7. The User (DevOps) wants to trigger programs based on custom notifications.
  8. The User (Developer, DevOps) wants to send control messages to all containers of a program or an app. 

The following are explicitly not intended as use cases:

  • publishing logs
  • emitting metrics
  • transporting data (especially row/cell level data: the volume is too much)

Requirements

  1. Throughput should be relatively low, in the thousands of messages per second at most
  2. Throughput can scale linearly with resources available for the messaging system
  3. Latency should be near real time, ideally sub-second. However, this is impacted by event ordering (see below for discussion)
  4. Messaging system must be capable of transactional operations, configurable at the topic level
    1. In the context of a larger transaction: for state change, we need to ensure that either both or none of the state change and the corresponding notification happen
    2. In a transaction by itself: for cases such as audit logging, we need mostly the C and D from ACID: consistent and durable
    3. Non-transactionally: for some use cases, performance tops reliability, and messages would be sent asynchronously, possibly with retry, but dropped in case of failure to send
  5. Publishing must be reliable and durable: if a message is published and the messaging system responds with OK, then the message is guaranteed to be persisted and eventually available to all consumers
  6. Publishing must be possible both synchronously and asynchronously, although that can be a client-side option.
  7. Publishing must be available as a service, via an RPC call. The protocol must support the TransactionAware interface.
  8. The messaging system must preserve the order in which messages are sent for consumers
    1. Order must be preserved within a topic, but need not be guaranteed across topics
    2. All consumers must see the same message ordering, regardless of when the consumption happens
  9. The messaging system must provide visibility into its state
    1. Metrics about queue size, throughput, latency, ...
    2. Ability to inspect the content of a queue without consuming it (this is mostly for recovery from problems)
  10. The message queue must be addressable by time. That is, a consumer must be able to consume starting with messages that were sent at a given time.
  11. Messages must expire after a configurable time-to-live
    1. Configurable at the topic level
    2. Overridable for each message with a shorter life time than configured for the topic
  12. The messaging system must be as self-contained as possible
    1. It must not rely on the Dataset Service. This is to ensure that messages can be transported even in case of a DS Service outage.
    2. Non-transactional publishing must be available even when the Transaction Service is down
  13. Consumption of topics is not partitioned. That is, every consumer will see all messages in a topic. 

Discussion

Latency vs. Ordering

Our goal is to reduce the latency of message delivery to a minimum (as long as we can assume that the consumers are able to keep up with the rate of incoming messages). However, if we want to preserve the order of messages, that can impact latency. The reason is that some messages may be enqueued but not immediately visible, because their transaction has not committed yet. That means that other messages that were published later may become visible first.

In response, we can either:

  • Block those later messages from consumption until all earlier messages have become visible. That would impact the latency for the later messages (which are already available but can't be consumed).
    or
  • Let the consumer retrieve the later messages, and deliver the earlier one(s) as soon as they become visible. That, however, would impact the order of consumption.

For easy consumption and consistency between consumers, we choose ordering over latency. With a proper implementation (discussed below), latency can be minimized while maintaining proper ordering.

Design

Before going deep into the details, these are the key principles/features of the messaging system, based on the use cases and requirements described above:

  • Support both transactional and non-transactional message publishing
  • Support both transactional and non-transactional message consumption
  • Messages are ordered by publish time
    • This helps address messages by timestamp
  • Maintain the same message ordering for all consumers
    • Consistent message ordering is the key to reprocess messages safely
    • Easy to write a tool to inspect what's inside the queue
  • Each message has a unique message ID. The message IDs should be sortable in the same order as the message ordering as seen by the consumers.
  • Latency must be as low as possible.
    • For messages published non-transactionally, they should be available for consumption right after they are persisted in the messaging system
    • For messages published transactionally, they will be persisted to the message system as close as possible to the transaction commit time
      • Use either client side buffering or an auxiliary payload table to minimize the "blocking" effect of a non-committed transaction. See later sections for the detailed design.
  • Consumers poll for new messages instead of pushing from the server
    • This gives consumers more control
  • Messages are immutable once they get "accepted" by the messaging system
    • For non-transactional messages, "accepted" means the message is persisted
    • For transactional messages, "accepted" means the transaction of that message is committed
    • Ordering is immutable as well
  • A centralized messaging service will be provided and all messaging clients will only interact with the messaging service through RESTful APIs
    • Multiple instances of the messaging service can be up and running at the same time for scalability
  • The messaging system should have minimal dependencies on other CDAP services, otherwise we cannot use it as a way to publish error status in case something has failed.
    • Shouldn't depend on the Transaction Service
    • Shouldn't depend on the Dataset Service
    • Failure to emit log or metrics shouldn't be a failure for the messaging operations
      • Meaning no hard dependency on Kafka

Message Table

The Message Table provides ordering of messages, and entries in this table are immutable. Immutability is the key to providing consistent ordering across consumers. Also, immutability means entries can be cached and served to multiple consumers without consulting the backing storage until the cache runs out. This provides an easy way to boost performance.

In Standalone, the Message Table is backed by a non-transactional LevelDB table. In distributed mode, it will be backed by a non-transactional HBase table.

Row Key Format

concat(topic, generation, publish_timestamp, seq_id)

  • topic
    • String representation of the topic name
  • generation
    • 4-byte int representing the generation of this topic. The generation increases by one each time the topic is created.
  • publish_timestamp
    • 8 bytes time in milliseconds
  • seq_id
    • 2 bytes sequence ID. It is for uniquely identifying messages published in the same millisecond.
    • Two bytes allow 65,536 messages per millisecond (about 65 million per second) per topic, which is well over the limit we need
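
As a rough illustration of this layout, the following Python sketch packs the four fields into a byte string. The function name message_row_key is hypothetical, and the sketch assumes a UTF-8 topic name followed by big-endian fixed-width fields; neither encoding is specified above. Big-endian is chosen here because it makes byte-wise ordering agree with numeric ordering for non-negative values.

```python
import struct

def message_row_key(topic: str, generation: int, publish_ts_ms: int, seq_id: int) -> bytes:
    """Build concat(topic, generation, publish_timestamp, seq_id) as bytes."""
    # Assumption: UTF-8 topic, then a 4-byte int, an 8-byte long and a
    # 2-byte unsigned short, all big-endian and without padding.
    return topic.encode("utf-8") + struct.pack(">iqH", generation, publish_ts_ms, seq_id)

# Two messages in the same millisecond, then one a millisecond later.
k1 = message_row_key("audit", 1, 1_700_000_000_000, 0)
k2 = message_row_key("audit", 1, 1_700_000_000_000, 1)
k3 = message_row_key("audit", 1, 1_700_000_000_001, 0)
```

With this encoding, sorting the keys as raw bytes yields publish order within a topic and generation, which is what a scan over the backing table relies on.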

Columns Format

Each row in the Message Table can have up to two columns and should have at least one column.

Column Name | Description
t | If present, stores the transaction write pointer (8 bytes) of the transaction that was used for publishing the message. Only messages published transactionally have this column.
p | If present, stores the message payload. If it is missing, the t column should exist and the transaction write pointer will be used to scan the Payload Table (described below) to fetch messages published within that transaction.

Payload Table

The Payload Table stores the message payload for messages published transactionally. Not all transactional message payloads are stored in this table, only the ones for which the client requests it. Typically, this table is used to store the payloads of messages published from a long-running transaction, to minimize consumer latency. The details of the protocol are discussed below.

In Standalone, the Payload Table is backed by a non-transactional LevelDB table. In distributed mode, it will be backed by a non-transactional HBase table.

Row Key Format

concat(topic, generation, transaction_write_pointer, write_timestamp, p_seq_id)

  • topic
    • String representation of the topic name
  • generation
    • 4-byte int representing the generation of this topic. The generation increases by one each time the topic is created.
  • transaction_write_pointer
    • 8 bytes transaction write pointer of the transaction that the message was published from
  • write_timestamp
    • 8 bytes time in milliseconds of the time when the message was persisted
    • This is for ordering messages published in the same transaction
  • p_seq_id
    • 2 bytes sequence ID. It is for uniquely identifying messages published in the same millisecond.

Column Format

Column Name | Description
p | Stores the message payload

Message Identifier

With the row key format described above, each message in a topic is uniquely identified by a message ID in this format:

concat(publish_timestamp, seq_id, write_timestamp, p_seq_id)

For messages that don't store the payload in the Payload Table, the write_timestamp and p_seq_id are both 0.

Message IDs are also sortable in lexicographical order, which gives the ordering of messages as seen by any consumer.
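
To make the format concrete, here is a minimal Python sketch that encodes and decodes such an ID. The function names and the big-endian layout (8 + 2 + 8 + 2 = 20 bytes) are assumptions for illustration; the field order is the one given above.

```python
import struct

# publish_timestamp (8 bytes), seq_id (2), write_timestamp (8), p_seq_id (2)
MESSAGE_ID_FORMAT = ">qHqH"

def encode_message_id(publish_ts: int, seq_id: int, write_ts: int = 0, p_seq_id: int = 0) -> bytes:
    """Messages stored directly in the Message Table keep the last two fields at 0."""
    return struct.pack(MESSAGE_ID_FORMAT, publish_ts, seq_id, write_ts, p_seq_id)

def decode_message_id(raw: bytes) -> tuple:
    """Return (publish_timestamp, seq_id, write_timestamp, p_seq_id)."""
    return struct.unpack(MESSAGE_ID_FORMAT, raw)

ids = [
    encode_message_id(101, 0),          # later direct message
    encode_message_id(100, 1, 95, 1),   # second payload of a transaction entry
    encode_message_id(100, 0),          # earlier direct message
    encode_message_id(100, 1, 95, 0),   # first payload of the same entry
]
ordered = [decode_message_id(i) for i in sorted(ids)]
```

Sorting the raw bytes places the two Payload Table messages between the direct messages that surround their Message Table entry, and keeps them in write order relative to each other.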

Publishing

In this section, we will discuss the protocol for publishing messages, both transactionally and non-transactionally.

Non-transactional

Non-transactional publishing is relatively straightforward.

  1. Create the row key for the Message Table, using the topic, publish timestamp (same as current timestamp) and sequence id
    1. The sequence id can easily be generated by memorizing the last publish timestamp in the messaging service process
    2. Batching of concurrent requests to the same topic can use the same publish timestamp with an incremental sequence id
  2. Write the row with the payload stored under the payload column (the p column)
    1. If there is batching, then multiple row keys will be generated in step 1 and multiple rows will be inserted as a batch operation to the backing store
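
The sequence id generation and batching described in these steps could look roughly like the following Python sketch. The class name SequenceGenerator, the injectable clock, and the overflow handling (moving on to the next millisecond) are assumptions made for illustration, not part of the design above.

```python
import threading
import time

class SequenceGenerator:
    """Hands out (publish_timestamp, seq_id) pairs that are unique within one process."""

    MAX_SEQ_ID = 0xFFFF  # seq_id is 2 bytes

    def __init__(self, clock=lambda: int(time.time() * 1000)):
        self._clock = clock
        self._lock = threading.Lock()
        self._last_ts = -1
        self._next_seq = 0

    def next_batch(self, count: int):
        """Reserve `count` consecutive keys that share one publish timestamp."""
        if not 1 <= count <= self.MAX_SEQ_ID + 1:
            raise ValueError("batch does not fit into a 2-byte sequence id")
        with self._lock:
            now = self._clock()
            if now > self._last_ts:
                # New millisecond: restart the sequence.
                self._last_ts = now
                self._next_seq = 0
            if self._next_seq + count - 1 > self.MAX_SEQ_ID:
                # Assumption: on overflow, use the next millisecond instead.
                self._last_ts += 1
                self._next_seq = 0
            start = self._next_seq
            self._next_seq += count
            return [(self._last_ts, start + i) for i in range(count)]

# A fake clock makes the behavior easy to follow: two calls at 5 ms, one at 6 ms.
ticks = iter([5, 5, 6])
generator = SequenceGenerator(clock=lambda: next(ticks))
first = generator.next_batch(1)
second = generator.next_batch(2)
third = generator.next_batch(1)
```

Keeping the last timestamp in memory means no read from the backing store is needed to pick a unique key, which is what makes batching concurrent requests cheap.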

Transactional

Transactional publishing is broken down into two different cases in order to provide better latency and throughput under different usage situations.

Message Write Time is Close to Transaction Commit Time

This is mainly the case for messages published from a transaction used in the same JVM process, for example in a Worker, Flowlet, Service, or the lifecycle methods of MapReduce and Spark programs. All published messages are buffered on the client side and are only written to the messaging system when the transaction is ready to commit.

The mechanism on the messaging service side is pretty much the same as the non-transactional one, with the addition of the transaction write pointer being recorded in the Message Table as well.

  1. Create the row key for the Message Table, using the topic, publish timestamp (same as current timestamp) and sequence id
  2. Write the row with the payload stored under the payload column (the p column) and the transaction write pointer under the transaction column (the t column)

Message Write Time is Not Close to Transaction Commit Time

This includes messages published from MapReduce tasks and from the Spark driver and executors. It also covers the case where so many messages are published from the same JVM process that they exceed the client-side buffer (this should be very rare).

Instead of writing messages directly to the Message Table, the payloads are written to the Payload Table as the messages are published. At the transaction commit time, an entry will be inserted in the Message Table, which serves the purpose of inserting all messages published in the same transaction at that time.

  1. On message publish, create the row key for the Payload Table, using the topic, transaction write pointer, current timestamp and sequence id
    1. Sequence id is generated similar to that described above
    2. Batch inserts can also be done as described above
  2. At transaction commit time, which is notified by the client, create the row key for the Message Table, using the topic, publish timestamp (same as current timestamp) and sequence id
  3. Write the row with the transaction write pointer stored under the transaction column (the t column)
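
The following Python sketch models both transactional cases with plain dictionaries standing in for the two tables. The class InMemoryTopic, its method names, and the tuple-shaped keys are hypothetical; a real implementation would use the byte row keys described earlier and a proper sequence id generator.

```python
class InMemoryTopic:
    """Toy model of one topic; dicts stand in for the Message and Payload Tables."""

    def __init__(self):
        self.message_table = {}  # (publish_ts, seq_id) -> {"t": tx_ptr, "p": payload}
        self.payload_table = {}  # (tx_ptr, write_ts, p_seq_id) -> payload

    def store(self, tx_ptr, payloads, now_ms):
        """Write payloads to the Payload Table as the messages are published."""
        base = sum(1 for (t, ts, _) in self.payload_table if t == tx_ptr and ts == now_ms)
        for i, payload in enumerate(payloads):
            self.payload_table[(tx_ptr, now_ms, base + i)] = payload

    def publish(self, tx_ptr, now_ms, payloads=()):
        """At commit time, add the entries to the Message Table."""
        base = sum(1 for (ts, _) in self.message_table if ts == now_ms)
        if not payloads:
            # Payloads were stored earlier: one row with only the t column.
            self.message_table[(now_ms, base)] = {"t": tx_ptr}
            return
        # Client-side buffered case: payload and write pointer in the same row.
        for i, payload in enumerate(payloads):
            self.message_table[(now_ms, base + i)] = {"t": tx_ptr, "p": payload}

topic = InMemoryTopic()
topic.store(42, [b"a", b"b"], now_ms=100)   # long-running transaction 42
topic.store(42, [b"c"], now_ms=100)
topic.publish(42, now_ms=250)               # commit time of transaction 42
topic.publish(7, now_ms=250, payloads=[b"x"])  # short transaction 7, buffered
```

Note how the three payloads of transaction 42 become visible through a single Message Table row written at commit time, so the position of those messages in the topic is decided only when the transaction is about to commit.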

Rollback of Published Message

The publishing client is responsible for telling the messaging system to roll back published messages (by Message IDs). To roll back, simply mark the corresponding entries in the Message Table as invalid by inverting the sign of the transaction write pointer. Nothing needs to be done to the entries in the Payload Table; the TTL of the topic will eventually delete them. This works because a transactional consumer skips invalid entries in the Message Table and therefore never reads the corresponding Payload Table entries. This gives us a couple of benefits: (i) rollback is faster, since we don't have to delete entries in the Payload Table; and (ii) non-transactional consumption gives repeatable results as long as the entries have not been removed by TTL expiration.
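
A minimal Python sketch of the sign inversion follows. The function name rollback, the dictionary-based table, and the inclusive key range are assumptions for illustration; the range mirrors the start and end timestamp and sequence id that a publish call reports back.

```python
def rollback(message_table, tx_ptr, start_key, end_key):
    """Invert the sign of the t column for rows in [start_key, end_key] written by tx_ptr.

    Keys are (publish_ts, seq_id) tuples. Returns the number of rows marked invalid.
    """
    rolled_back = 0
    for key in sorted(message_table):
        row = message_table[key]
        if start_key <= key <= end_key and row.get("t") == tx_ptr:
            row["t"] = -tx_ptr  # negative write pointer marks the entry as invalid
            rolled_back += 1
    return rolled_back

table = {
    (100, 0): {"p": b"non-transactional"},
    (200, 0): {"t": 42, "p": b"a"},
    (200, 1): {"t": 42, "p": b"b"},
    (200, 2): {"t": 43, "p": b"c"},
}
count = rollback(table, 42, (200, 0), (200, 1))
```

Only the Message Table is touched; payloads stay where they are, so the operation costs one small write per affected row.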

Consuming

The general principle of consuming messages is to scan the Message Table in ascending order. When the scan encounters an entry whose payload is stored in the Payload Table, it scans the Payload Table using the transaction write pointer as the prefix.

Non-transactional

This is the low-latency choice, since it will never be blocked by uncommitted messages. However, the side effect is that it will see uncommitted messages. If uncommitted messages are later rolled back, a non-transactional consumer will have seen more messages than a transactional consumer, although the ordering of committed messages remains the same.

  1. Scan the Message Table, with an optional start Message ID or timestamp, which can either be inclusive or exclusive
  2. Depending on whether the row has the payload column (the p column), the handling is different:
    1. With the payload column (the p column)
      1. The row represents a message and the payload is the value in the p column
      2. Message ID is generated from the row key as concat(publish_time, seq_id, 0L, 0)
    2. Without the payload column (the p column): the transaction column (the t column) must exist
      1. Scan the Payload Table with prefix concat(topic, transaction_write_pointer), where the transaction_write_pointer is the value of the t column in the Message Table
      2. Each row encountered during the scanning is a message and the payload is the value in the p column
      3. Message ID is generated from the row key in the Message Table and the row key in the Payload Table as concat(publish_time, seq_id, write_timestamp, p_seq_id)
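
The scan described in these steps can be sketched in Python as follows, again with dictionaries standing in for the tables and tuples standing in for byte keys and message IDs. The function name is hypothetical. Using the absolute value of the t column, so that payloads of rolled-back entries are still found, is an assumption derived from the rollback description rather than something stated explicitly.

```python
def consume_non_transactional(message_table, payload_table):
    """Return [(message_id, payload)] in Message Table order, ignoring transaction state."""
    messages = []
    for key in sorted(message_table):
        publish_ts, seq_id = key
        row = message_table[key]
        if "p" in row:
            # Payload stored inline: write_timestamp and p_seq_id are 0.
            messages.append(((publish_ts, seq_id, 0, 0), row["p"]))
            continue
        # No p column: look up all payloads written under this transaction.
        tx_ptr = abs(row["t"])  # assumption: rolled-back entries carry a negated pointer
        for p_key in sorted(payload_table):
            if p_key[0] == tx_ptr:
                _, write_ts, p_seq_id = p_key
                messages.append(((publish_ts, seq_id, write_ts, p_seq_id), payload_table[p_key]))
    return messages

message_table = {
    (100, 0): {"p": b"direct"},
    (250, 0): {"t": 42},
    (250, 1): {"t": 7, "p": b"buffered"},
}
payload_table = {
    (42, 120, 0): b"first",
    (42, 130, 0): b"second",
    (99, 125, 0): b"other-tx",
}
result = consume_non_transactional(message_table, payload_table)
```

The linear pass over the Payload Table is only for brevity; with sorted byte keys this would be a prefix scan.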

Transactional

Transactional consumption follows the same procedure as the non-transactional one, with the addition that it stops at the first uncommitted message when scanning the Message Table. The transaction information comes from the client, and it is the client's responsibility to open a new transaction in order to get a new snapshot of committed messages in the messaging system. Stopping at uncommitted messages increases the latency of message consumption, but with the publishing technique described above, this latency should be minimal, in the range of less than a second.
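
The stop-at-first-uncommitted rule can be sketched in Python like this. The function name committed_prefix and the is_committed callback are hypothetical; in practice the visibility decision would come from the transaction object supplied by the client. Skipping entries with a negative write pointer follows the rollback description above.

```python
def committed_prefix(rows, is_committed):
    """Return the keys a transactional consumer may read.

    rows: list of (key, tx_ptr) in Message Table order; tx_ptr is None for
    messages that were published non-transactionally.
    """
    visible = []
    for key, tx_ptr in rows:
        if tx_ptr is None:
            visible.append(key)      # non-transactional: always visible
        elif tx_ptr < 0:
            continue                 # rolled back: skipped, does not block
        elif is_committed(tx_ptr):
            visible.append(key)
        else:
            break                    # first uncommitted message: stop here
    return visible

rows = [
    ((1, 0), None),
    ((2, 0), 10),
    ((3, 0), -12),
    ((4, 0), 13),
    ((5, 0), 11),
]
committed = {10, 11}
visible_now = committed_prefix(rows, lambda tx: tx in committed)
visible_later = committed_prefix(rows, lambda tx: tx in committed | {13})
```

The entry from transaction 11 is already committed but stays hidden until transaction 13 commits, which is exactly the ordering-over-latency trade-off chosen in the discussion section.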

Message Retention

All published messages are stored in the messaging system until they expire; the expiration time is a property defined per topic. Since we use one Message Table for all topics, we cannot use the simple TTL mechanism provided by HBase. Support for message retention can be broken down into two parts:

  1. Message consumption
    1. During message consumption, we can simply apply a lower bound on the scan start row based on the TTL setting of the topic being consumed from
  2. Message cleanup
    1. In local mode, LevelDB is used as the backing store. The messaging system will have a cleanup thread up and running and periodically scan and delete old entries based on the TTL setting of each topic.
    2. In distributed mode, since HBase is used as the backing store, the best way to do cleanup is to use a coprocessor that drops expired cells at flush/compaction time. The TTL setting can be made available to the coprocessor through the table attribute.
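
Both parts reduce to the same timestamp comparison, as in this small Python sketch. The function names are hypothetical, and treating a message whose age equals the TTL exactly as still live is an assumption.

```python
def effective_start_ms(requested_start_ms: int, ttl_seconds: int, now_ms: int) -> int:
    """Lower bound for a consumer scan: never start before the oldest live timestamp."""
    return max(requested_start_ms, now_ms - ttl_seconds * 1000)

def is_expired(publish_ts_ms: int, ttl_seconds: int, now_ms: int) -> bool:
    """Cleanup check: a row is dropped once its publish time is older than the TTL."""
    return publish_ts_ms < now_ms - ttl_seconds * 1000

now = 100_000
start_from_beginning = effective_start_ms(0, ttl_seconds=60, now_ms=now)
start_recent = effective_start_ms(90_000, ttl_seconds=60, now_ms=now)
```

Because the consumer-side bound hides expired messages immediately, the cleanup itself can run lazily without affecting what consumers see.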

Implementation Considerations

  • To avoid scanning HBase too often, the messaging service should cache recent scans in memory. This is based on the assumption that consumers won't fall too far behind, which is one of the reasons why we want a messaging system that provides a fast reaction time.
  • Batch write should be used to give higher throughput and lower latency for concurrent publishing
    • Refer to ConcurrentStreamWriter on the implementation
  • When doing transactional publishing, it is important that the messages are written to the messaging system as the last "persist" operation (i.e. after all dataset writes), meaning it has to be the last TransactionAware to be used in the transaction context.
    • This helps lower the latency of transactional consumers, since the latency is basically the time between persisting the message and committing to Apache Tephra
  • Multiple instances of the messaging service (not part of the initial scope)
    • When we reach the point where we need to scale the messaging service to multiple instances, there are a couple of choices for how it could look
      1. All instances are equal. This means all instances can handle publishing and consumption for all topics
        1. Different instances need to generate different seq_id values for the same timestamp to avoid key collisions. We could potentially use the instance ID as the first byte of the sequence id.
        2. (Pro) Easy to implement as there is no coordination needed
        3. (Con) The consumer scan cache can take up a lot of space because all instances would be caching for all topics
      2. Perform resource assignment through the ResourceCoordinator such that a given topic would only be handled by a sub-set of instances
        1. Still need to handle sequence id generation such as to avoid key collision
        2. (Pro) Most flexible, as it allows trading off scalability against resource usage (memory cache), and can be tuned per topic based on traffic
        3. (Con) Implementation can be complicated as it involves
          1. Usage of ResourceCoordinator
          2. Usage of DistributedLock for the topic transition to have a guaranteed leader set for a topic
          3. Traffic forwarding and retry are needed during topic transition from one instance to another
  • Removing entries with invalid transaction in the MessageTable is also needed. Refer to the Tephra implementation on how this should be done.

Security

For authentication, it is pretty much the same as all other intra-CDAP RESTful calls. For authorization, we can do it at the topic level.

Tables Used for Storage

  • MessageTable
  • PayloadTable
  • MetadataTable 

APIs

RESTful

Base URL: /v1/namespaces/<ns-id> (since REST API is internal)

Create Topic

  • Request method and URI
    • PUT [base_url]/topics/[topic]
  • Request body
    • Can be empty
    • If provided, it is a JSON object containing topic properties
      • e.g. {"ttl" : [ttl-in-seconds]}
  • Response
    • 200 OK if the topic was created successfully
    • 409 CONFLICT if a topic of the same name exists already
    • 400 BAD REQUEST if the given TTL is invalid

Update Topic

  • Request method and URI
    • PUT [base_url]/topics/[topic]/properties
  • Request body
    • JSON object containing topic properties. Note that this call will replace all the existing properties.
      • e.g. {"ttl" : [ttl-in-seconds]}
  • Response
    • 200 OK if the topic properties were updated successfully
    • 404 NOT FOUND if the topic is not present
    • 400 BAD REQUEST if the properties were not correct

Get Topic Properties

  • Request method and URI
    • GET [base_url]/topics/[topic]
  • Response
    • 200 OK with a JSON object containing the topic name and its properties
      • e.g. {"name" : "topic1", "properties" : { "ttl" : "123231" } }
    • 404 NOT FOUND if the topic is not present

List Topics

  • Request method and URI
    • GET [base_url]/topics
  • Response
    • 200 OK with a JSON array containing the names of the topics under the namespace
      • e.g. [ "topic1", "topic2" ]

Delete Topic

  • Request method and URI
    • DELETE [base_url]/topics/[topic]
  • Response
    • 200 OK if the topic was deleted successfully
    • 404 NOT FOUND if the topic is not present

Publish Message

There are two separate endpoints to support different publishing cases as described above. They both share the same request format.

  • Request Body Schema

    {
      "type" : "record", 
      "name" : "PublishRequest",
      "fields" : [
        { "name" : "transactionWritePointer", "type" : [ "long", "null" ] },
        { "name" : "messages", "type" : { "type" : "array", "items" : "bytes" } }
      ]
    }
  • Request body
    • Can be Avro binary or JSON, based on the request - Content-Type: application/json or avro/binary
    • Schema Fields:

      1. messages - Contains an array of byte arrays that correspond to messages

      2. transactionWritePointer - Corresponds to a transaction write pointer.
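
As an illustration, the following Python sketch builds a JSON request body for this schema. It assumes the service expects the standard Avro JSON encoding, in which a non-null union value is wrapped in an object keyed by its type name and bytes are written as a string with one character per byte; the document does not confirm this detail. The function name publish_request_json is hypothetical.

```python
import json

def publish_request_json(payloads, tx_write_pointer=None):
    """Serialize a PublishRequest using Avro's JSON encoding rules (assumed)."""
    body = {
        # Union ["long", "null"]: null stays null, a long is wrapped as {"long": n}.
        "transactionWritePointer": None if tx_write_pointer is None else {"long": tx_write_pointer},
        # Avro JSON encodes bytes as a string whose code points are the byte values.
        "messages": [payload.decode("iso-8859-1") for payload in payloads],
    }
    return json.dumps(body)

transactional = publish_request_json([b"state-changed"], tx_write_pointer=42)
non_transactional = publish_request_json([b"\xff\x00"])
```

For binary payloads the Avro binary content type is likely the more compact choice, since the JSON form escapes every non-ASCII byte.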

Store Messages to Payload Table

  • Request method and URI
    • POST [base_url]/topics/[topic]/store
  • Request body
    • Should contain transactionWritePointer and messages
  • Response
    • 200 OK if the messages are persisted
    • 404 NOT FOUND if the topic is not present

Publish Messages to Message Table

  • Request method and URI
    • POST [base_url]/topics/[topic]/publish
  • Request
    • The request can optionally contain a transactionWritePointer. If the call does not contain one, the messages are stored non-transactionally (i.e., without a transaction write pointer).
    • This call can be preceded by multiple calls to the 'store' endpoint. If store calls were made previously with the same transactionWritePointer, this endpoint should be called with an empty messages field. Otherwise, this call stores the messages directly in the MessageTable.
  • Response
    • 200 OK if the messages are persisted
    • 404 NOT FOUND if the topic is not present
    • 400 BAD REQUEST if the transactionWritePointer is null and the messages field is empty. The messages field can be empty only if the transactionWritePointer is provided
  • Response body
    • If the publish is non-transactional, the response body is empty
    • If the publish is transactional, the response body will contain the information needed for rollback, with the following schema

      {
        "type" : "record",
        "name" : "PublishResponse",
        "fields" : [
          { "name" : "transactionWritePointer", "type" : [ "long", "null" ] },
          { "name" : "startTimestamp", "type" : "long" },
          { "name" : "startSequenceId", "type" : "int" },
          { "name" : "endTimestamp", "type" : "long" },
          { "name" : "endSequenceId", "type" : "int" }
        ]
      }
    • The client shouldn't need to parse the response body, and should instead treat it as opaque bytes. On rollback, simply use it as the request body.

Rollback Transactionally Published Messages

  • Request method and URI
    • POST [base_url]/topics/[topic]/rollback
  • Request body
    • Use the response body as is from the publish call above
  • Response
    • 200 OK if the messages are rolled back
    • 404 NOT FOUND if the topic is not present

Consume Message

  • Request method and URI
    • POST [base_url]/topics/[topic]/poll
  • Request Schema

    {
      "type" : "record",
      "name" : "ConsumeRequest",
      "fields" : [
        { "name" : "startFrom", "type" : [ "bytes", "long", "null" ] },
        { "name" : "inclusive", "type" : "boolean", "default" : true },
        { "name" : "limit", "type" : [ "int", "null" ] },
        { "name" : "transaction", "type" : [ "bytes", "null" ] }
      ]
    }
  • Response Schema (will be in JSON or Avro binary based on the request Content-Type)

    {
      "type" : "array", 
      "items" : {
        "type" : "record", 
        "name": "Message", 
        "fields": [ 
          { "name" : "id", "type" : "bytes" }, 
          { "name" : "payload", "type" : "bytes" }
        ] 
      }
    }
  • Request body
    • Can be Avro binary or JSON, based on the request - Content-Type: application/json or avro/binary
    • Schema Fields:
      1. startFrom - if bytes, it is interpreted as a message ID; if long, it is interpreted as a timestamp
      2. inclusive - boolean that says whether the given message ID or timestamp is inclusive
      3. limit - maximum number of messages to return (a hard limit will be set by the TransactionServer, which will be a cConf property)
      4. transaction - serialized bytes (TransactionCodec) of the transaction object
  • Response
    • 200 OK with a response body in Avro binary or JSON, based on the request Content-Type, following the response schema
    • 404 NOT FOUND if the topic is not present

Data Cleanup In Tables

Several kinds of data cleanup need to be performed, and the cleanup strategy varies based on the underlying storage: LevelDB vs HBase. The matrix below describes the types of cleanup we need to do and how each is performed on each storage.


Cleanup Type | HBase | LevelDB
Cleanup Strategy | Coprocessors attached to tables (one for MessageTable and another for PayloadTable). No cleanup is required for MetadataTable. | Periodically scheduled thread that cleans up data in MessageTable and PayloadTable. The frequency of the schedule is configurable via cdap-site.xml.
TTL Expiration (MessageTable and PayloadTable) | Get the TableId (namespace, topic) from the row key and get the TopicMetadata from the MetadataTable (cached). From the cell row key, get the write timestamp and use the TTL to determine whether that cell needs to be skipped. | Scan the tables topic by topic and remove rows that have exceeded the TTL, based on the TopicMetadata info.
Older Generation (MessageTable and PayloadTable) | Check the generation id of the row and compare it with the one we get from MetadataTable. If it is the current generation, then do nothing. If it is an older generation (gen < abs(currentgen) || gen == -1*currentgen), then skip the cell. | Same logic as in HBase. While pruning messages, the scan should start with generation '1' of that topic.
Invalid Transactions (MessageTable only) | This requires a (periodically refreshed) tx.snapshot in the coprocessor. If the cell belongs to a TX_COL column, then get the tx id from it. If the transaction id is present in the invalid list (from the tx.snapshot), then invert the sign (-1 * tx_id) and put back the value. This way, the data is still visible to non-tx consumption and will eventually be cleared by TTL or when the topic is deleted. | Not necessary in LevelDB, since we don't support pruning invalid transactions in the SDK.
Latest min tx timestamp (for manual invalid TX pruning, required only for MessageTable) | The last used tx.snapshot info can be directly used to prune the invalid transaction list, so we need to log that info. We also need to write a TMS table debugger tool that can print this info. |
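
The older-generation check from the matrix can be written as a one-line predicate, shown here as a Python sketch. The function name is hypothetical, and reading a negative current generation as "the topic at that generation has been deleted" is an assumption inferred from the formula.

```python
def is_obsolete_generation(row_gen: int, current_gen: int) -> bool:
    """Implements: gen < abs(currentgen) || gen == -1 * currentgen."""
    return row_gen < abs(current_gen) or row_gen == -current_gen

checks = {
    "older generation": is_obsolete_generation(2, 3),
    "current generation": is_obsolete_generation(3, 3),
    "negated current generation": is_obsolete_generation(3, -3),
}
```

Under that reading, rows of the live generation are kept, while rows of earlier generations and rows of a generation whose metadata value has been negated are skipped.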


Updates: Please refer to CDAP-17939 for the new LevelDB cleanup logic.


 

Created in 2020 by Google Inc.