Performance improvement for getting active program runs within a time range for Ops Dashboard

Overview

This design document discusses possible performance improvements for getting program runs with RUNNING status ("active runs" for short) within a given time range, which the Ops Dashboard requires. There are two types of program runs that can be active within a given time range [begin, end):

  1. Runs that are still in RUNNING status when the query is made, with (program_start_time < end)

  2. Runs that have stopped with COMPLETED, FAILED, or KILLED status, with (program_start_time < end && program_stop_time >= begin). Relative to the query range, a stopped run falls into one of the following four cases:

    a. 
           query_begin_time        query_end_time
                  |                       |
    ------------------------------------------------------------> t
         |                    |
    program_start_time   program_stop_time
    
    
    b.
          query_begin_time        query_end_time
                  |                       |
    ------------------------------------------------------------> t
             |                                 |
       program_start_time             program_stop_time
    
    
    c.
          query_begin_time        query_end_time
                  |                       |
    ------------------------------------------------------------> t
                      |                |
              program_start_time  program_stop_time
    
    
    d.
          query_begin_time        query_end_time
                  |                       |
    ------------------------------------------------------------> t
                      |                            |
              program_start_time            program_stop_time

For the first case, since the number of runs with RUNNING status is limited, and run records in AppMetadataStore are prefixed with their statuses, it is trivial to get the RUNNING runs and keep only those with (program_start_time < end). However, the number of stopped runs is much larger, so it is not feasible to read all stopped runs and filter them. In cases 2a, 2c, and 2d, either program_start_time or program_stop_time falls within [begin, end), so these cases are easy to handle if stopped runs can be scanned by a range of program_start_time or program_stop_time. For instance, if stopped run records in AppMetadataStore were keyed by program_start_time, cases 2c and 2d could be handled with a single range scan.

In case 2b, however, neither program_start_time nor program_stop_time falls within the query range [begin, end). With the existing run records table, all stopped runs must therefore be scanned to find those with (program_start_time < begin && program_stop_time >= end). This design focuses on improving the performance of getting active runs that have already stopped at query time, in particular for case 2b.
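The case analysis above can be summarized as a small classification function. This is a minimal sketch, assuming times are epoch seconds and that a null stop time marks a run that is still RUNNING; ActiveRunCases and its Case enum are hypothetical names, not part of AppMetadataStore.

```java
// Hypothetical helper: classifies a program run relative to a query range [begin, end).
// Times are epoch seconds; stopTime == null means the run is still RUNNING.
final class ActiveRunCases {

  enum Case { RUNNING, A, B, C, D, NOT_ACTIVE }

  static Case classify(long startTime, Long stopTime, long begin, long end) {
    if (startTime >= end) {
      return Case.NOT_ACTIVE;                 // started at or after the end of the range
    }
    if (stopTime == null) {
      return Case.RUNNING;                    // case 1: still running, started before end
    }
    if (stopTime < begin) {
      return Case.NOT_ACTIVE;                 // stopped before the range began
    }
    boolean startsInRange = startTime >= begin;   // startTime < end already holds here
    boolean stopsInRange = stopTime < end;        // stopTime >= begin already holds here
    if (!startsInRange && stopsInRange) {
      return Case.A;                          // 2a: reachable by a stop-time range scan
    }
    if (!startsInRange) {
      return Case.B;                          // 2b: neither timestamp lies in [begin, end)
    }
    return stopsInRange ? Case.C : Case.D;    // 2c / 2d: reachable by a start-time range scan
  }
}
```

Only case B has no timestamp inside [begin, end), which is why a scan bounded by the query range alone cannot find it.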

Problem with the existing approach:

Currently, stopped program run records are persisted in AppMetadataStore with row keys consisting of program id parts and inverted start time:

runRecordCompleted|namespace|app|version|programtype|program|inverted start time|runid

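Because the inverted start time comes after the program id parts, records are sorted by program first and by start time only within each program, so a time range cannot be expressed as one contiguous key range. Therefore, to get all the active runs within a given time range [begin, end), a full-table scan must be performed on AppMetadataStore.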
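The effect of this key layout on ordering can be illustrated with a small sketch. It assumes, purely for illustration, that the inverted start time is Long.MAX_VALUE - startTime and that key parts are joined as strings with a zero-padded timestamp (the real store encodes keys as bytes); ExistingRunKeyOrder and completedKey are hypothetical names.

```java
// Hypothetical illustration of the existing completed-run key order (not the real encoding).
final class ExistingRunKeyOrder {

  /** Assumes inverted start time = Long.MAX_VALUE - startTime, zero-padded to 19 digits. */
  static String completedKey(String namespace, String app, String version, String programType,
                             String program, long startTime, String runId) {
    String invertedStart = String.format("%019d", Long.MAX_VALUE - startTime);
    return String.join("|", "runRecordCompleted", namespace, app, version, programType,
        program, invertedStart, runId);
  }

  public static void main(String[] args) {
    String a500 = completedKey("ns1", "appA", "v1", "workflow", "wf", 500L, "r1");
    String a100 = completedKey("ns1", "appA", "v1", "workflow", "wf", 100L, "r2");
    String b500 = completedKey("ns1", "appB", "v1", "workflow", "wf", 500L, "r3");
    // Key order is a500 < a100 < b500. a500 and b500 both started in [400, 600),
    // but a100 sorts between them, so no single contiguous key range selects
    // exactly the runs that started in that window.
    System.out.println(a500.compareTo(a100) < 0 && a100.compareTo(b500) < 0);  // prints true
  }
}
```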

User Stories

In no specific order...

Id | Description | Requirements fulfilled
U1 | As a user, I should be able to get active runs from a certain namespace within a time range [start, end) | R1

Requirements

Id | Requirement
R1 | When looking up active runs within a given time range [begin, end) in a given namespace, only the run records in the given namespace with (program_start_time < end && program_stop_time >= begin) will be returned.

Design

Approach 1: Stopped runs indexed by start time and stop time

      query_begin_time        query_end_time
              |                       |
------------------------------------------------------------> t
|---->                        <-------|
start row key                   stop row key

Instead of scanning all stopped runs, one possible optimization is to index stopped runs by program_start_time and scan only the range of program_start_time that contains the start times of all active runs within the query time range, as shown in the graph above. Because of case 2b described in the overview, the lower bound of this range cannot be determined directly. A naive approach is to use [0, end) as the program_start_time range, which includes all stopped runs with program_start_time < end. An optimization that finds the earliest program_start_time of the active runs within the query time range, and uses it instead of 0 as the lower bound, is discussed later in this section. The runs within the program_start_time range are then further filtered by the condition program_stop_time >= begin.

To achieve this goal, a new table will be added with the following row key structure:

Row key:

<namespace-id>
<start-time>
<stop-time>
<program-run-id>

where:

<namespace-id>: the namespace of the program run

<start-time>: start time in seconds

<stop-time>: stop time of the program run

<program-run-id>: namespace+app+program type+program+run

Value

program run status
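A possible byte encoding for this row key is sketched below. It is an assumption, not the store's actual codec: the namespace is followed by a 0x00 separator so that a namespace such as "ns1" is not a prefix of "ns10", and both times are 8-byte big-endian longs, so unsigned lexicographic byte order matches numeric start-time order for non-negative epoch seconds. RunTimeIndexKey and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical encoder for the index row key:
//   <namespace-id> 0x00 <start-time:8 bytes> <stop-time:8 bytes> <program-run-id>
final class RunTimeIndexKey {

  static byte[] namespacePrefix(String namespace) {
    byte[] ns = namespace.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(ns.length + 1).put(ns).put((byte) 0).array();
  }

  static byte[] rowKey(String namespace, long startTime, long stopTime, String programRunId) {
    byte[] prefix = namespacePrefix(namespace);
    byte[] run = programRunId.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(prefix.length + 16 + run.length)
        .put(prefix).putLong(startTime).putLong(stopTime).put(run).array();
  }

  /** Sorts after every key of the namespace with start time < startTime, before the rest. */
  static byte[] startTimeBound(String namespace, long startTime) {
    byte[] prefix = namespacePrefix(namespace);
    return ByteBuffer.allocate(prefix.length + 8).put(prefix).putLong(startTime).array();
  }

  /** Decodes the stop time from a key built by rowKey (what a row key filter would read). */
  static long stopTime(byte[] rowKey, String namespace) {
    int offset = namespacePrefix(namespace).length + 8;
    return ByteBuffer.wrap(rowKey, offset, 8).getLong();
  }
}
```

With this layout, the start row <namespace>:0 corresponds to startTimeBound(namespace, 0) and the stop row <namespace>:<end> to startTimeBound(namespace, end).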


Write path:

In DefaultStore, each time a program run stops, its start time and stop time are recorded in the index table under the row key <namespace-id>:<start-time>:<stop-time>:<program-run-id>.

Read path:

Given a query for active runs in namespace <query-namespace> within the time range [begin, end), the scan starts from the row key <query-namespace>:0, because the earliest start time of the active runs within [begin, end) is not known, and stops at the row key <query-namespace>:<end>. A possible optimization for obtaining a larger start row key from the earliest start time of the runs in a given time range is discussed in the next section. After getting the runs with program_start_time < end, a row key filter is applied to keep only the runs with program_stop_time >= begin. The corresponding run records are then read from AppMetadataStore using these run IDs.
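The read path can be sketched with an in-memory sorted set standing in for the index table. This is a hypothetical illustration, not CDAP code: Entry, KEY_ORDER, and activeStoppedRuns are made-up names, entries are ordered the way the row keys <namespace-id>:<start-time>:<stop-time>:<program-run-id> would be, and lowerBound is 0 in the naive scheme (or the earliest start time from the optimization in the next section, when available).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableSet;

// In-memory stand-in for the index table scan plus stop-time row key filter.
final class ActiveRunIndexScan {

  static final class Entry {
    final String namespace;
    final long startTime;
    final long stopTime;
    final String runId;

    Entry(String namespace, long startTime, long stopTime, String runId) {
      this.namespace = namespace;
      this.startTime = startTime;
      this.stopTime = stopTime;
      this.runId = runId;
    }
  }

  // Same order as <namespace-id>:<start-time>:<stop-time>:<program-run-id>.
  static final Comparator<Entry> KEY_ORDER = Comparator
      .comparing((Entry e) -> e.namespace)
      .thenComparingLong(e -> e.startTime)
      .thenComparingLong(e -> e.stopTime)
      .thenComparing(e -> e.runId);

  /** Stopped runs in namespace that were active within [begin, end). */
  static List<String> activeStoppedRuns(NavigableSet<Entry> index, String namespace,
                                        long lowerBound, long begin, long end) {
    List<String> result = new ArrayList<>();
    if (lowerBound >= end) {
      return result;                                    // empty start-time range
    }
    // Start row <namespace>:<lowerBound> (inclusive), stop row <namespace>:<end> (exclusive).
    Entry startRow = new Entry(namespace, lowerBound, Long.MIN_VALUE, "");
    Entry stopRow = new Entry(namespace, end, Long.MIN_VALUE, "");
    for (Entry e : index.subSet(startRow, true, stopRow, false)) {
      if (e.stopTime >= begin) {                        // row key filter on the stop-time part
        result.add(e.runId);
      }
    }
    return result;
  }
}
```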

Optimization for getting earliest start time of the active runs within a time range

In order to get a start row key for the scanning mentioned above, for a given query time range [begin, end), we need to find out the earliest start time among the active runs within this time range.

Existing approach

The LineageAdmin#doComputeLineage method currently uses a similar approach: it first finds all the runs active within a given time range by calling Store#getRunningInRange, and then finds the earliest start time among these runs. However, the current implementation of AppMetadataStore#getRunningInRange scans all runs under each status prefix and then filters them by the start time and stop time stored in their values. This is very inefficient, and computing the earliest start time is redundant, since the active runs within the time range have already been retrieved.
Furthermore, even if we could get the earliest start time of the active runs within the query time range efficiently, we could not use it to narrow the scan. In AppMetadataStore, the row keys of stopped runs are formatted as follows:
runRecordCompleted|namespace|app|version|programtype|program|inverted start time|runid
With the programId parts prefixed before the inverted start time, we cannot form a start key and stop key with the earliest start time and end time of the query time range.

Optimization

A table to lookup earliest start time (EarliestStartTimeTable)
Row key design:
<namespace>:<timestamp-rounded-to-hour>
Value:
earliest start time of the active runs at the given time in the row key.

Write path:
-----------
A service is scheduled to run at the beginning of every hour. For each namespace, it gets all active runs, finds the earliest start time among them, and writes a row keyed by the namespace and the current hour, with the earliest start time as the value. If no program is running at that time, the timestamp of the hour itself is written as the earliest start time. This guarantees that runs which start after the beginning of that hour, including those that stop before the next hour begins, still have a valid lower bound on their start time, so they fall within the scan range.
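A sketch of the hourly job's computation, under stated assumptions: times are epoch seconds rounded down to UTC hours, the active runs of each namespace are supplied as lists of start times, and the rows are returned as a map from the row key "<namespace>:<hour>" to the value. EarliestStartTimeJob and rowsForHour are hypothetical names. Taking the minimum with the hour timestamp is a safeguard added here, so the value stays a valid lower bound even if the job fires slightly after the hour.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the hourly EarliestStartTimeTable writer (not CDAP code).
final class EarliestStartTimeJob {

  static final long HOUR_SECONDS = 3600L;

  /**
   * Rows to write for the hour containing now: "<namespace>:<hour>" -> earliest start time.
   * activeStartTimes maps a namespace to the start times (epoch seconds) of its active runs.
   */
  static Map<String, Long> rowsForHour(long now, Map<String, List<Long>> activeStartTimes,
                                       Collection<String> namespaces) {
    long hour = now - Math.floorMod(now, HOUR_SECONDS);   // beginning of the current UTC hour
    Map<String, Long> rows = new HashMap<>();
    for (String namespace : namespaces) {
      long earliest = hour;                                // nothing running: use the hour itself
      for (long start : activeStartTimes.getOrDefault(namespace, List.of())) {
        earliest = Math.min(earliest, start);
      }
      rows.put(namespace + ":" + hour, earliest);
    }
    return rows;
  }
}
```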
Read path:
-----------
Given a query time range [begin, end), look up the row for the begin time rounded down to the hour to get the earliest start time for the query time range. For instance, if the query time range is 03/17/2017 7:30am - 8pm in namespace "ns2", the earliest start time is read from the row
ns2:<03/17/2017 7:00am in epoch time>

Limitation of the improvement: this approach adds two extra tables (the index table and EarliestStartTimeTable) plus an hourly service, which makes the system more complex and harder to maintain. In addition, if at least one long-running program started a long time ago, the earliest start time stays close to that old start time, so the scan range, and therefore the performance, barely improves.

Approach 2: Stopped runs indexed by their active time (proposed by Shankar)

  query_begin_time                    query_end_time
        |                                    |
------------------------------------------------------------> t
  |       |       |       |       |       |       |       |
  t1      t2      t3      t4      t5      t6      t7      t8
run1    run1    run1
                run2    run2    run2   run3 run3
Another option for getting the active runs within a time range is to index stopped runs by the times at which they were RUNNING. Then, given a time range [begin, end), we can scan the table from begin to end to get all the active runs within that time range. As shown in the graph above, each stopped run is written under every timestamp at which it was RUNNING: for instance, run1 is RUNNING at t1, t2, and t3, so it is written to the table as three records with timestamps t1, t2, and t3. When querying with the time range [query_begin_time, query_end_time), the scan covers the closest timestamps, from t1 to t6, and returns the active runs run1 and run2.
Row-key design (ActiveProgramRunHistoryTable)

<namespace>:<timestamp-rounded-to-hour(alternatively, rounded to date)>:<invertedStartTime>:<stopTime>:<program-run>
Write Path:
-----------
When a program stops, we write multiple records to this table. Since Ops Dashboard only needs information from at most the last 7 days, the maximum number of entries written per run is 7 if the time is rounded to days, or 24*7 = 168 if it is rounded to hours.
Example: say program1 started at 7:05AM and stopped at 9:02AM. With the time rounded to hours, we write entries for the timestamps 7AM, 8AM, and 9AM, since the program was running during part of each of those hour buckets:
ns1:7am:<...>, ns1:8am:<...>, ns1:9am:<...>
If a program ran for a month, starting on 03/17/2017 at 7AM and stopping on 04/17/2017 at 5PM, then, since user queries only cover the last week, we only need to write 7 entries to the table (with the time rounded to days), covering 04/11/2017 5PM till 04/17/2017 5PM.
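The set of buckets a stopped run is written under can be computed as sketched below. Assumptions: times are epoch seconds, buckets are aligned to UTC hour or day boundaries, and the 7-day limit is applied as a cap on the number of most recent buckets (maxBuckets = 168 for hours, 7 for days); ActiveRunBuckets and buckets are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical write-path helper for ActiveProgramRunHistoryTable (not CDAP code).
final class ActiveRunBuckets {

  static long floorTo(long t, long bucketSeconds) {
    return t - Math.floorMod(t, bucketSeconds);
  }

  /**
   * Bucket timestamps (epoch seconds) overlapping [startTime, stopTime],
   * keeping at most maxBuckets of the most recent ones.
   */
  static List<Long> buckets(long startTime, long stopTime, long bucketSeconds, int maxBuckets) {
    long last = floorTo(stopTime, bucketSeconds);
    long first = Math.max(floorTo(startTime, bucketSeconds),
        last - (maxBuckets - 1L) * bucketSeconds);   // cap for long-running programs
    List<Long> result = new ArrayList<>();
    for (long bucket = first; bucket <= last; bucket += bucketSeconds) {
      result.add(bucket);
    }
    return result;
  }
}
```

For the first example, a run from 7:05AM to 9:02AM yields the 7AM, 8AM, and 9AM buckets; for the month-long run, day buckets with maxBuckets = 7 yield one bucket per day from 04/11/2017 through 04/17/2017.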
Read Path:
-----------
Suppose a user queries for "Active runs" over the last 12 hours in namespace "query-namespace".
We construct the start key:
<query-namespace>:<user-query-start-ts-rounded-to-hour>
and stop key:
<query-namespace>:<user-query-stop-ts-rounded-to-hour>
For instance, with query time range 03/17/2017 10:35am - 10:35pm and namespace "ns2", the start key will be:
ns2:<03/17/2017 10:00am in epoch time>
and stop key:
ns2:<03/17/2017 10:00pm in epoch time>
For the first and last buckets, further filtering is needed: in the first bucket, we skip records that stopped before the query start timestamp, and in the last bucket, we skip records that started at or after the query stop timestamp. Records in any middle buckets can safely be returned as is. After obtaining the run IDs that meet these requirements, the corresponding run records are read from AppMetadataStore.
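The bucket filtering described above can be sketched as follows. In this hypothetical illustration, the rows already scanned for one namespace are passed in as a list, each carrying its bucket timestamp and the start and stop times encoded in its row key. The last bucket is treated as inclusive, since it can hold runs that were active between the rounded stop key and the query end, and a run written under several buckets is collected into a set so it is returned once. Row and activeRuns are made-up names.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical read-path filter for ActiveProgramRunHistoryTable (not CDAP code).
final class ActiveRunHistoryScan {

  /** One scanned row: bucket timestamp plus the start/stop times encoded in the row key. */
  static final class Row {
    final long bucket;
    final long startTime;
    final long stopTime;
    final String runId;

    Row(long bucket, long startTime, long stopTime, String runId) {
      this.bucket = bucket;
      this.startTime = startTime;
      this.stopTime = stopTime;
      this.runId = runId;
    }
  }

  static long floorTo(long t, long bucketSeconds) {
    return t - Math.floorMod(t, bucketSeconds);
  }

  /** Run IDs active within [begin, end), given the scanned rows of one namespace. */
  static Set<String> activeRuns(List<Row> rows, long begin, long end, long bucketSeconds) {
    long firstBucket = floorTo(begin, bucketSeconds);
    long lastBucket = floorTo(end, bucketSeconds);     // scanned inclusively
    Set<String> result = new LinkedHashSet<>();        // de-duplicates multi-bucket runs
    for (Row row : rows) {
      if (row.bucket < firstBucket || row.bucket > lastBucket) {
        continue;                                       // outside the start/stop keys
      }
      if (row.bucket == firstBucket && row.stopTime < begin) {
        continue;                                       // stopped before the query began
      }
      if (row.bucket == lastBucket && row.startTime >= end) {
        continue;                                       // started at or after the query end
      }
      result.add(row.runId);                            // middle-bucket rows need no filtering
    }
    return result;
  }
}
```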

Approach 3: Periodically emit metrics for running programs (proposed by Terence)

In the cloud environment, there is a requirement to show node hours. This requirement can be satisfied by periodically emitting heartbeats for active runs to TMS. A subscriber persists these messages to a table. Since the longest query time range for Ops Dashboard is 7 days, the table can have a TTL somewhat longer than 7 days, such as one month. The table can store either run IDs or copies of program run records. Storing copies of program run records eliminates the separate lookup of run records from AppMetadataStore, but for long-running programs this can take up more space.

Row key design:

<timestamp-rounded-to-hour(alternatively, rounded to date)>:<invertedStartTime>:<stopTime>:<namespace>:<program-run>

The table will be salted to avoid hotspotting on a single region server.

The read path of this approach is almost the same as in Approach 2, except that how far the scan extends around the query time range depends on the period at which the metrics are emitted. For instance, if the metric is emitted every 30 minutes and the query time range is 7:30am - 9pm, then the scan range will be 7:00am - 9pm.
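Salting can be sketched as prefixing each key with a small bucket number derived from a hash of the unsalted key; a reader then issues one scan per salt bucket and merges the results. Everything here is an assumption for illustration: SALT_BUCKETS = 8, the use of String.hashCode, the ':' separator, and zero-padded decimal timestamps are not specified by the design, and the remaining key parts (<invertedStartTime>:<stopTime>:<namespace>:<program-run>) are passed in pre-encoded as rest. SaltedHeartbeatKeys is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical salted-key sketch for the Approach 3 heartbeat table (illustrative only).
final class SaltedHeartbeatKeys {

  static final int SALT_BUCKETS = 8;   // assumed; must stay fixed once data is written

  /** Fixed-width decimal so that string order matches numeric order for non-negative values. */
  static String pad(long epochSeconds) {
    return String.format("%019d", epochSeconds);
  }

  /** "<salt>:<rounded-timestamp>:<rest>", salt derived from the unsalted key. */
  static String rowKey(long heartbeatTime, long roundingSeconds, String rest) {
    long rounded = heartbeatTime - Math.floorMod(heartbeatTime, roundingSeconds);
    String unsalted = pad(rounded) + ":" + rest;
    int salt = Math.floorMod(unsalted.hashCode(), SALT_BUCKETS);
    return salt + ":" + unsalted;
  }

  /** One [startKey, stopKey) pair per salt bucket; the scans' results are merged. */
  static List<String[]> scanRanges(long fromRounded, long toRounded) {
    List<String[]> ranges = new ArrayList<>();
    for (int salt = 0; salt < SALT_BUCKETS; salt++) {
      ranges.add(new String[] {salt + ":" + pad(fromRounded), salt + ":" + pad(toRounded)});
    }
    return ranges;
  }
}
```

The trade-off is that writes spread evenly across regions while every read fans out into SALT_BUCKETS scans; in the 30-minute example above, fromRounded would be the 7:00am timestamp.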


Performance Experiment with Approach 1 optimization VS. no optimization

Experiments were run to evaluate the performance of indexing run records compared to the existing brute force method of scanning the whole run record table:

Dataset: 400k run records alternating among 6 namespaces, with start times ranging from 0 to 400k and stop times ranging from 1 to 401k. Each run record's stop time is 1 second later than its start time.

Hypothesis: With this dataset, the existing full-table scan on the store should have the same performance regardless of the number of namespaces and the time range queried. With the index table, on the other hand, we expect faster performance when fewer namespaces are queried. Also, for a time range [start, end), the smaller "end" is, the faster the query should be. In the worst-case scenario, when all 6 namespaces are queried and the time range has "end" = 400k, querying with the index table will perform worse than the full-table scan of AppMetaStore. However, if "end" and the number of namespaces are small enough, querying with the index table will outperform the full-table scan.

Result:

Performance of querying with full-table scan in AppMetaStore:

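start  | end    | number of namespaces | time (ms)
-------+--------+----------------------+----------
0      | 5      | 1                    | 4996
0      | 5      | 3                    | 7456
0      | 5      | 6                    | 10457
200000 | 200005 | 1                    | 5060
200000 | 200005 | 3                    | 7246
200000 | 200005 | 6                    | 10720
399995 | 400000 | 1                    | 5006
399995 | 400000 | 3                    | 7214
399995 | 400000 | 6                    | 10577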

Performance of querying with index table:

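start  | end    | number of namespaces | time (ms)
-------+--------+----------------------+----------
0      | 5      | 1                    | 5792
0      | 5      | 3                    | 7351
0      | 5      | 6                    | 10576
200000 | 200005 | 1                    | 5626
200000 | 200005 | 3                    | 8760
200000 | 200005 | 6                    | 13218
399995 | 400000 | 1                    | 5700
399995 | 400000 | 3                    | 9371
399995 | 400000 | 6                    | 14079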

Performance of querying with index table and improved fuzzy row key filter in AppMetadataStore:

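start  | end    | number of namespaces | time (ms)
-------+--------+----------------------+----------
0      | 5      | 1                    | 3692
0      | 5      | 3                    | 4071
0      | 5      | 6                    | 4942
200000 | 200005 | 1                    | 4875
200000 | 200005 | 3                    | 5421
200000 | 200005 | 6                    | 7800
399995 | 400000 | 1                    | 4537
399995 | 400000 | 3                    | 6000
399995 | 400000 | 6                    | 8096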

Performance of querying only ProgramRunId's with index table:

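start  | end    | number of namespaces | time (ms)
-------+--------+----------------------+----------
0      | 5      | 1                    | 3325
0      | 5      | 3                    | 3355
0      | 5      | 6                    | 3406
200000 | 200005 | 1                    | 3758
200000 | 200005 | 3                    | 4362
200000 | 200005 | 6                    | 5043
399995 | 400000 | 1                    | 4200
399995 | 400000 | 3                    | 5081
399995 | 400000 | 6                    | 6226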


Analysis:

The performance of querying with the index table improves as the number of namespaces to be queried decreases (almost linearly), as predicted by the hypothesis.

However, querying with the index table alone does not improve on the full-table scan of AppMetaStore; it is slower in 8 of the 9 configurations. This is because it requires two lookups: it first gets the ProgramRunIds that were running in the given time range from the index table, and then gets the run records for these ProgramRunIds from AppMetaStore. When only the time to get ProgramRunIds from the index table is measured, the results align with the hypothesis: lookup time decreases as the "end" of the query time range decreases.

Created in 2020 by Google Inc.