Salt Metrics Tables
Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
Metrics are written to the metrics table with a row key composed of all the dimensions in the metrics context plus the metric name. The metrics context contains the instance ID of the container, which, in a MapReduce job, is the task ID of the mapper or reducer. A large MapReduce job that runs 1000 mappers and emits 10 different metrics would therefore write to 10,000 rows every second. Because the metrics table is not salted or otherwise spread out, all these writes (which share the same prefix of namespace, app, program ID, and run ID) go to the same region. That region comes under very heavy load, which can degrade the performance of other regions served by the same region server.
Goals
The goal is to salt the following metrics tables to avoid hotspotting:
- cdap_system:metrics.v2.table.ts.1
- cdap_system:metrics.v2.table.ts.2147483647
- cdap_system:metrics.v2.table.ts.3600
- cdap_system:metrics.v2.table.ts.60
User Stories
- The CDAP system should be able to salt metrics tables so that writes do not all go to the same region.
Design
To salt the HBase metrics tables, we can create new salted metrics tables:
- cdap_system:metrics.v3.table.ts.1
- cdap_system:metrics.v3.table.ts.2147483647
- cdap_system:metrics.v3.table.ts.3600
- cdap_system:metrics.v3.table.ts.60
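As a rough illustration of what salting means for the row key, the sketch below prepends a one-byte bucket prefix derived from a hash of the key. The class name `SaltedRowKeys`, the bucket count of 16, the hash function, and the dotted key layout are all assumptions made for this example; they are not taken from the CDAP code base.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustrative sketch of row-key salting; not the CDAP implementation. */
final class SaltedRowKeys {
  // Hypothetical number of salt buckets. The salt is one byte, so at most 256.
  static final int BUCKETS = 16;

  /** Prepends a one-byte salt computed from the row key itself. */
  static byte[] salt(byte[] rowKey) {
    int bucket = Math.floorMod(Arrays.hashCode(rowKey), BUCKETS);
    byte[] salted = new byte[rowKey.length + 1];
    salted[0] = (byte) bucket;
    System.arraycopy(rowKey, 0, salted, 1, rowKey.length);
    return salted;
  }

  /** Strips the leading salt byte to recover the original row key. */
  static byte[] unsalt(byte[] saltedKey) {
    return Arrays.copyOfRange(saltedKey, 1, saltedKey.length);
  }

  public static void main(String[] args) {
    // Keys that share a long common prefix (hypothetical layout) and differ
    // only in the task ID, as in the MapReduce example from the introduction.
    int[] perBucket = new int[BUCKETS];
    for (int task = 0; task < 1000; task++) {
      byte[] key = ("ns.app.mr.run1.task" + task + ".records.in")
          .getBytes(StandardCharsets.UTF_8);
      perBucket[salt(key)[0]]++;
    }
    // Prints how many of the 1000 keys landed in each bucket.
    System.out.println(Arrays.toString(perBucket));
  }
}
```

Because the salt is derived from the key, a point read can recompute it. A range scan, however, has to run once per bucket and merge the results, which is part of why queries become more involved.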
Tables that do not require salting:
- cdap_system:metrics.kafka.meta
- cdap_system:metrics.v2.entity
Once the new salted tables are created, all writes go to the salted tables only. However, implementing queries becomes a little tricky because:
- Some data may still be in flight in the transport after the new tables are created. This means the same row can end up split between the old and new tables, so a scan needs to merge data from both.
- Because data for a scan can be present in both the old and new tables, we have to determine the type of the metric (gauge or counter) and either sum the cell values (counter) or return the latest cell value (gauge).
For queries, we propose reading from both the old and new tables. When scanning both tables, we need to differentiate between metric types (gauge or counter) in order to merge and return the results: for gauge metrics we return the latest time-series value, and for counter metrics we aggregate the results. To distinguish the type, we can use HBase cell tags. However, tags are only supported from HBase 0.98 onwards, so with this approach metrics may not be accurate after the upgrade on lower HBase versions.
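The merge rule for a single metric can be sketched as follows. This is a minimal in-memory illustration, not the proposed implementation: `MetricMerge`, `MetricType`, and the timestamp-to-value maps are hypothetical stand-ins for HBase cells. It also assumes that when both tables hold a gauge value for the same timestamp, the value in the new table is the later write.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative merge of cells read from the old and new metrics tables. */
final class MetricMerge {
  enum MetricType { GAUGE, COUNTER }

  /**
   * Merges timestamp -> value cells of one metric.
   * Counters are summed; for gauges the new table's value replaces the old one
   * (assumption: the new table always holds the more recent write).
   */
  static TreeMap<Long, Long> merge(MetricType type,
                                   Map<Long, Long> oldCells,
                                   Map<Long, Long> newCells) {
    TreeMap<Long, Long> merged = new TreeMap<>(oldCells);
    for (Map.Entry<Long, Long> cell : newCells.entrySet()) {
      if (type == MetricType.COUNTER) {
        merged.merge(cell.getKey(), cell.getValue(), Long::sum);
      } else {
        merged.put(cell.getKey(), cell.getValue());
      }
    }
    return merged;
  }
}
```

Without a reliable way to know the metric type (for example on HBase versions that lack cell tags), the caller cannot choose between the two branches, which is the source of the possible inaccuracy noted above.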
Migration
Metrics tables with resolutions of 1s, 1m, and 1h have a retention duration that can be configured in cdap-site.xml. Once the data in the old tables has expired, a background thread running in the metrics processor can delete those tables. The totals resolution table, however, retains metrics for a duration of Integer.MAX_VALUE, so its data effectively never expires. For this table, we can copy each row from the unsalted table to the new table with an additional column `u`, which is used to determine whether the row has already been copied.
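One possible reading of the totals-table copy is sketched below with in-memory maps standing in for the HBase tables. It assumes the `u` marker is written on the row in the new table, that all totals cells can be treated as counters and summed with values already written to the new table, and that salting of the row key is handled elsewhere. The class and method names are hypothetical.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Illustrative, idempotent copy of totals rows from the old to the new table. */
final class TotalsMigration {
  // Marker column from the design; its presence means the row was already copied.
  static final String COPIED_MARKER = "u";

  /** Copies every row that is not yet marked and returns how many rows were copied. */
  static int migrate(NavigableMap<String, NavigableMap<String, Long>> unsalted,
                     NavigableMap<String, NavigableMap<String, Long>> salted) {
    int copied = 0;
    for (Map.Entry<String, NavigableMap<String, Long>> row : unsalted.entrySet()) {
      NavigableMap<String, Long> target =
          salted.computeIfAbsent(row.getKey(), k -> new TreeMap<>());
      if (target.containsKey(COPIED_MARKER)) {
        continue; // already migrated; skipping avoids double counting on a retry
      }
      for (Map.Entry<String, Long> cell : row.getValue().entrySet()) {
        // Assumption: totals are counters, so old and new values are added.
        target.merge(cell.getKey(), cell.getValue(), Long::sum);
      }
      target.put(COPIED_MARKER, 1L);
      copied++;
    }
    return copied;
  }
}
```

In a real table the cell updates and the marker write would need to be applied atomically per row; otherwise a failure between the two steps could still lead to a double copy.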
Implementation
Implement a new CombinedHBaseMetricsTable that merges rows from the unsalted and salted tables.
    public class CombinedHBaseMetricsTable implements MetricsTable {
      private final MetricsTable unsaltedHBaseTable;
      private final MetricsTable saltedHBaseTable;

      public CombinedHBaseMetricsTable(MetricsTable unsaltedHBaseTable, MetricsTable saltedHBaseTable) {
        this.unsaltedHBaseTable = unsaltedHBaseTable;
        this.saltedHBaseTable = saltedHBaseTable;
      }

      @Nullable
      @Override
      public byte[] get(byte[] row, byte[] column) {
        // read from old and new tables
        return new byte[0];
      }

      @Override
      public void put(SortedMap<byte[], ? extends SortedMap<byte[], Long>> updates) {
        // write only to saltedHBaseTable
      }

      @Override
      public void putBytes(SortedMap<byte[], ? extends SortedMap<byte[], byte[]>> updates) {
        // write only to saltedHBaseTable
      }

      @Override
      public boolean swap(byte[] row, byte[] column, byte[] oldValue, byte[] newValue) {
        return false;
      }

      @Override
      public void increment(byte[] row, Map<byte[], Long> increments) {
      }

      @Override
      public void increment(NavigableMap<byte[], NavigableMap<byte[], Long>> updates) {
        // write only to saltedHBaseTable
      }

      @Override
      public long incrementAndGet(byte[] row, byte[] column, long delta) {
        return 0;
      }

      @Override
      public void delete(byte[] row, byte[][] columns) {
        // delete records from both the tables
      }

      @Override
      public Scanner rowScan(@Nullable byte[] start, @Nullable byte[] stop, @Nullable FuzzyRowFilter filter) {
        // scan and merge records from both the tables
        return null;
      }

      @Override
      public void close() throws IOException {
      }
    }
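The "scan and merge records from both the tables" step in `rowScan` could look roughly like the streaming merge below. It is a sketch only: rows are reduced to a key and a single counter value, row keys are plain strings, and both inputs are assumed to arrive sorted by unsalted row key (so the salted side is assumed to have had its salt stripped and its per-bucket results re-sorted already). `MergingScan` is a hypothetical name.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Illustrative merge of two row streams that are each sorted by row key. */
final class MergingScan {
  /** Returns rows in key order; rows present in both inputs are summed. */
  static List<Map.Entry<String, Long>> merge(Iterator<Map.Entry<String, Long>> oldRows,
                                             Iterator<Map.Entry<String, Long>> newRows) {
    List<Map.Entry<String, Long>> out = new ArrayList<>();
    Map.Entry<String, Long> x = oldRows.hasNext() ? oldRows.next() : null;
    Map.Entry<String, Long> y = newRows.hasNext() ? newRows.next() : null;
    while (x != null || y != null) {
      // An exhausted side compares as "greater" so the other side drains.
      int cmp = (x == null) ? 1 : (y == null) ? -1 : x.getKey().compareTo(y.getKey());
      if (cmp < 0) {
        out.add(x);
        x = oldRows.hasNext() ? oldRows.next() : null;
      } else if (cmp > 0) {
        out.add(y);
        y = newRows.hasNext() ? newRows.next() : null;
      } else {
        // Same row in both tables: combine (counter semantics assumed here).
        long sum = x.getValue() + y.getValue();
        out.add(new SimpleImmutableEntry<String, Long>(x.getKey(), sum));
        x = oldRows.hasNext() ? oldRows.next() : null;
        y = newRows.hasNext() ? newRows.next() : null;
      }
    }
    return out;
  }
}
```

A real `Scanner` would return rows lazily instead of collecting them into a list, and would apply the gauge rule instead of summing where the metric type calls for it.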
Approach
Approach #1
Approach #2
API changes
New Programmatic APIs
New Java APIs introduced (both user facing and internal)
Deprecated Programmatic APIs
New REST APIs
Path | Method | Description | Response Code | Response |
---|---|---|---|---|
/v3/apps/<app-id> | GET | Returns the application spec for a given application | 200 - On success; 404 - When application is not available; 500 - Any internal errors | |
Deprecated REST API
Path | Method | Description |
---|---|---|
/v3/apps/<app-id> | GET | Returns the application spec for a given application |
CLI Impact or Changes
- Impact #1
- Impact #2
- Impact #3
UI Impact or Changes
- Impact #1
- Impact #2
- Impact #3
Security Impact
What is the impact on authorization, and how does the design take care of this aspect?
Impact on Infrastructure Outages
System behavior (if applicable, document the impact of failures in downstream components such as YARN and HBase) and how the design takes care of this aspect
Test Scenarios
Test ID | Test Description | Expected Results |
---|---|---|
Releases
Release X.Y.Z
Release X.Y.Z
Related Work
- Work #1
- Work #2
- Work #3
Future work
Created in 2020 by Google Inc.