Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

The current scheduler runs in the master and is an (almost) purely cron-based Quartz scheduler. This imposes certain limitations that we wish to overcome in a redesign of the CDAP scheduler.

Goals

  • Scalability: Quartz can only schedule as many programs as it has threads. Each thread remains occupied (even though it is just waiting) while the workflow is executing.
  • Reliability: When the master fails over, it may take a few seconds to a minute. During that time schedules can be missed.
  • Efficacy: Pure time-based scheduling only works well for data that arrives reliably and periodically. For any data that arrives at unknown times, the schedule has to trigger speculatively, and the workflow will do nothing if there is no data. This can be wasteful in terms of cluster resources.
  • Expressivity: Certain scheduling constraints can hardly be expressed in terms of only time. For example, a schedule that fires whenever at least 1GB of data is available, or at least 4 hours have passed since the last run.

User Stories 

  1. In a large cluster, hundreds to thousands of workflows are scheduled to process incoming data at fixed times. The scheduler needs to be scalable enough to fulfill these schedules reliably.
  2. The CDAP master fails over. No schedules should be skipped. 
  3. New data for a feed arrives once daily at unknown times. The SLA for processing is 3 hours after arrival. With cron-style scheduling, this workflow must run every 3 hours, but 7 out of 8 times it will do nothing. Instead, the workflow should only run when the data arrives. This would reduce the number of runs and the processing latency.
  4. Workflow B always reads the output of workflow A. Ideally, B should run immediately after A has successfully completed, and B should not run if A failed. 
  5. A data lake has thousands of feeds. Many of these feeds arrive rarely, and their SLAs are generous: They need to be processed once a day at no particular time.
  6. A data lake has a high load of feeds and processing, and at times it may not have enough resources to process all incoming data. Feeds can be marked with a priority, such that high priority workflows are always scheduled on time, whereas low-priority workflows can defer processing and catch up at times of lower load. 
  7. A data maintenance workflow for a feed needs to run once a day. It should run within a time window such as 10pm-6am. In some cases, it only needs to run if there is new data.
  8. A workflow consumes (joins) multiple datasets. Ideally, it should be scheduled to run after all of its inputs have received new data. However, if some of its inputs do not receive new data, the workflow should be scheduled after a time of waiting; it will then consume the existing data in these inputs. 
  9. A workflow processes a feed that arrives at unknown times. When that data arrives, it may arrive in chunks over a time window, or it may be known that it always arrives in N chunks. The workflow should be triggered by the new data, but only after all data has arrived.
  10. A workflow processes a feed that arrives frequently (for example, every second). The workflow should be triggered when data is available. However, it should not run every second, but always wait for enough data to accumulate, where "enough" needs to be quantified in some way (megabytes, seconds elapsed, etc.)
  11. A workflow needs to run at least every 4 hours. But if the data is bursty, it should also run sooner, as soon as 100GB of data have arrived. 

Design

Here we need to design 2 aspects:

a. Improving the scalability of the scheduler so that it can support thousands of concurrent schedules.

b. Improving the expressive power of schedules by adding event-based, conditional, and composite triggers.

Scalability

  • The Quartz scheduler is limited in scale because it requires an active thread per schedule; the thread blocks as long as the program triggered by the schedule is running. That limits the number of programs that can be scheduled and executed at the same time. A re-implementation of the time-based scheduler (either without Quartz or using Quartz in a different way) would greatly increase the number of schedules supported by a single instance of the scheduler.
  • Multiple instances of the scheduler can scale out linearly. This requires that instances do not interfere with each other, that is:
    • either there is a disjoint assignment of schedules to instances
    • or all instances can share all schedules by keeping all state persisted, and by mutually ensuring only one instance works on the same schedule at the same time
  • Multiple instances of the scheduler can run
    • either in each CDAP master, which means that a secondary (standby) master must still have an active scheduler
    • or in Yarn containers. This would further increase the Yarn container footprint of CDAP's master services, but it would decouple the scheduler from the number of masters, providing more scalability.
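
The first option for avoiding interference, a disjoint assignment of schedules to instances, could be sketched as follows. This is only an illustration under assumptions: the class and method names are hypothetical, schedules are identified by a plain string id, and the assignment is a simple hash of that id modulo the number of instances.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: assigns each schedule to exactly one scheduler instance by hashing its id. */
final class ScheduleAssignmentSketch {

  /** Returns the index of the instance that owns the given schedule id. */
  static int ownerOf(String scheduleId, int instanceCount) {
    if (instanceCount < 1) {
      throw new IllegalArgumentException("instanceCount must be at least 1");
    }
    // floorMod keeps the result non-negative even when hashCode() is negative
    return Math.floorMod(scheduleId.hashCode(), instanceCount);
  }

  /** Filters the schedule ids that the given instance is responsible for. */
  static List<String> ownedBy(int instanceId, int instanceCount, List<String> scheduleIds) {
    List<String> owned = new ArrayList<>();
    for (String id : scheduleIds) {
      if (ownerOf(id, instanceCount) == instanceId) {
        owned.add(id);
      }
    }
    return owned;
  }
}
```

A plain modulo assignment reshuffles most schedules whenever the number of instances changes, so a real implementation would likely need a more stable partitioning scheme or the shared-state option instead.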

Expressiveness

Schedules can be enhanced with these features:

  • A trigger based on a notification from TMS. Other system services emit these notifications, for example:
    • A new partition in a file set
    • A program has succeeded (or failed)
    • A stream has N GB of new data
  • A repeated trigger, for example:
    • 4 new partitions in a file set
    • A program has succeeded N times
    • etc.
  • A time span has passed since the last time an event has happened
  • A composite trigger: 
    • (AND) Multiple triggers must all occur
    • (OR) One out of several triggers must occur
    • An OR could also be expressed as two separate schedules 
  • Run constraints:
    • Limit on the number of concurrent runs (existing for time schedules)
      • Limit across a group of workflows/schedules
    • Limit on the number of runs per time unit (hour, day, etc)
    • Delay a schedule after it has triggered, either by a fixed duration (for instance, run a program N minutes after the required data arrives) or until a configured time window
    • Conditionally execute based on cluster load/resource availability
    • Constraints should be able to suppress or delay the execution of a program
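
To make the AND/OR composition concrete, here is a minimal sketch. It assumes, purely for illustration, that events are identified by a string name and that a trigger is evaluated against the set of events seen so far; the `Trigger` interface and the factory methods are hypothetical and not part of any existing CDAP API.

```java
import java.util.Arrays;
import java.util.Set;

/** Hypothetical composite triggers, evaluated against the set of event names seen so far. */
final class CompositeTriggerSketch {

  /** A trigger is either satisfied or not, given the events observed so far. */
  interface Trigger {
    boolean isSatisfied(Set<String> seenEvents);
  }

  /** Simple trigger: satisfied once the named event has been seen. */
  static Trigger onEvent(String eventName) {
    return seen -> seen.contains(eventName);
  }

  /** AND: all of the given triggers must be satisfied. */
  static Trigger and(Trigger... triggers) {
    return seen -> Arrays.stream(triggers).allMatch(t -> t.isSatisfied(seen));
  }

  /** OR: at least one of the given triggers must be satisfied. */
  static Trigger or(Trigger... triggers) {
    return seen -> Arrays.stream(triggers).anyMatch(t -> t.isSatisfied(seen));
  }
}
```

With this shape, user story 8 could be written as `or(and(onEvent("inputA"), onEvent("inputB")), onEvent("timeout"))`, where the event names are again made up for the example.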

Terminology

We will consistently use the following terms when talking about schedules:

  • A schedule consists of the following:
    • The program to start, along with runtime arguments or options. 
    • A trigger
    • A set of run constraints
  • A trigger is a rule describing a condition that initiates the start of a program. A trigger can be 
    • time-based
    • event-based 
    • (possibly more in the future)
  • A run constraint is a condition that needs to be fulfilled before the program is started. 
    • A run constraint can either cancel or delay the start of the program
  • A message is an object that contains an event
    • Messages are transported via TMS (Transactional Messaging System) topics
  • A job represents a program whose trigger has been fired by an event. 
    • A job is pending if one or more of its constraints are not fulfilled yet
    • What exactly is contained in a job is an implementation detail. Most likely it contains:
      • The schedule (or its id) that was triggered
      • The trigger that caused this job to be created, and the event that fired it. 
      • Additional triggers that have fired (and their events). This is in the future when we start supporting composite triggers.
      • The program to be started and its runtime arguments (or perhaps we only compute the runtime arguments when we start the program)
      • All of the schedule's run constraints, and whether and how they have been fulfilled. 
  • The 'working set' is the set of pending jobs maintained by the scheduler.
    • TODO: find a better name. Job queue? Pending set?
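
As a rough illustration of what a job might hold, the sketch below keeps the schedule id, the events that fired, and a fulfilled/unfulfilled flag per run constraint. All names are hypothetical, and constraints are represented only by a string name; the actual contents of a job remain an implementation detail.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical shape of a job; field names are illustrative, not a CDAP API. */
final class JobSketch {
  private final String scheduleId;
  private final List<String> firedEvents = new ArrayList<>();
  // constraint name -> whether it has been fulfilled yet
  private final Map<String, Boolean> constraintStatus = new LinkedHashMap<>();

  JobSketch(String scheduleId, String firstEvent, List<String> constraintNames) {
    this.scheduleId = scheduleId;
    this.firedEvents.add(firstEvent);
    for (String name : constraintNames) {
      constraintStatus.put(name, Boolean.FALSE);
    }
  }

  String getScheduleId() {
    return scheduleId;
  }

  /** Records an additional event, as composite triggers would need. */
  void addEvent(String event) {
    firedEvents.add(event);
  }

  int eventCount() {
    return firedEvents.size();
  }

  void markFulfilled(String constraintName) {
    if (!constraintStatus.containsKey(constraintName)) {
      throw new IllegalArgumentException("unknown constraint: " + constraintName);
    }
    constraintStatus.put(constraintName, Boolean.TRUE);
  }

  /** A job is pending while at least one constraint is unfulfilled. */
  boolean isPending() {
    return constraintStatus.containsValue(Boolean.FALSE);
  }
}
```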

Approach

Event-based scheduling will rely on the Transactional Messaging System (TMS) in CDAP.

End-to-End Architecture

  • Program containers, system services, and the CDAP master will emit messages to TMS topics. These messages can be about program life cycle, data availability, or possibly other types of events. 
  • The scheduler subscribes to these messages transactionally, that is, it makes sure it never drops a message. 
  • Based on the event described by a message, the scheduler starts workflows (and in the future, other program types).
  • Some workflows have run constraints that prevent them from running or delay the start of the workflow. 
    • The scheduler maintains internal state about what workflows have been triggered and what constraints they are depending on (the 'working set' of pending jobs)
    • When all constraints of the job are fulfilled, the scheduler starts the job's program. 
    • More details below.
  • To start a program, the scheduler relies on the CDAP master. That is, none of the program start and monitoring logic needs to move into the scheduler. 
  • The scheduler could run inside the CDAP master, or it could run in a system container for greater scalability. 

Scheduler Internals

The scheduler has two main components:

  • A TMS subscriber that consumes messages. 
    • For each event found in a message, it looks up the schedule store to find all schedules triggered by this event and creates or updates a job in the working set for it. To make the schedule lookup efficient, the schedule store must be indexed or allow some other kind of reverse lookup. 
    • After all jobs are created for a message, the message is marked as read, and it will never be consumed again. To make sure that the scheduler never drops an event, the jobs must be persisted to the working set in the same transaction.
  • A constraint checker (TODO: find better name).
    • It validates the run constraints for the jobs in the working set. 
    • When all constraints for a job are fulfilled, it makes a call to the CDAP master to start the job, and removes the job from the working set. 
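
The interplay of the two components can be sketched with an in-memory stand-in. Everything here is an assumption made for illustration: events and schedules are plain strings, a run constraint is reduced to a predicate over the current time, and the list `started` replaces the call to the CDAP master. Persistence and transactions, which the real design depends on, are only indicated in comments.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical in-memory model of the subscriber and the constraint checker. */
final class SchedulerLoopSketch {
  /** Reverse index: event key -> names of the schedules that the event triggers. */
  private final Map<String, List<String>> schedulesByEvent = new HashMap<>();
  /** One run constraint per schedule, modeled as a predicate over the current time. */
  private final Map<String, Predicate<Long>> constraintBySchedule = new HashMap<>();
  /** Working set: schedules that were triggered and are waiting for their constraint. */
  private final List<String> workingSet = new ArrayList<>();
  /** Stand-in for the call to the CDAP master: records which programs were started. */
  final List<String> started = new ArrayList<>();

  void addSchedule(String schedule, String eventKey, Predicate<Long> constraint) {
    schedulesByEvent.computeIfAbsent(eventKey, k -> new ArrayList<>()).add(schedule);
    constraintBySchedule.put(schedule, constraint);
  }

  /** Subscriber side: turns one consumed event into pending jobs. */
  void onEvent(String eventKey) {
    // In the real design, adding the jobs and marking the message as read share one transaction.
    workingSet.addAll(schedulesByEvent.getOrDefault(eventKey, Collections.<String>emptyList()));
  }

  /** Constraint checker side: starts every job whose constraint holds at the given time. */
  void checkConstraints(long nowMillis) {
    Iterator<String> it = workingSet.iterator();
    while (it.hasNext()) {
      String schedule = it.next();
      if (constraintBySchedule.get(schedule).test(nowMillis)) {
        started.add(schedule);
        it.remove();
      }
    }
  }

  int pendingCount() {
    return workingSet.size();
  }
}
```

The reverse index `schedulesByEvent` plays the role of the indexed schedule store: without it, every event would require a scan over all schedules.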

TMS Topics

What topics will be used for TMS message transport?

  • We will NOT segregate TMS topics by namespace. That is, we will not have a separate topic for each namespace, but all namespaces will share the same topic. This makes topic management significantly easier: We do not need to create/delete topics every time we create/delete a namespace. Also, the scheduler does not need to subscribe/unsubscribe every time a namespace is created or deleted. Scalability should not be an issue: In the end the scheduler needs to consume all messages anyway, and we expect the volume of messages to be relatively low. 
  • We will have one topic per message type: Data availability and program lifecycle events. These topics can be statically configured in cdap-site.xml.

TMS Message Design

The message sent on TMS will have the following fields:
  • timestamp (long). Epoch timestamp of the triggering event.
  • triggerType (String).
  • payload (Json object), where the set of fields depends on the triggerType.

For PFS Partition triggers, it will contain the following fields:

  • pfsName: (String) name of the dataset
  • partitionKeys: GSON-serialized String of the list of partition keys that were added

 

Trigger Event
import java.util.Map;

public class TriggerEvent {
  private final long timestamp;
  // TODO: change to Enum
  private final String triggerType;
  // TODO: change to object?
  private final Map<String, String> properties;

  public TriggerEvent(long timestamp, String triggerType, Map<String, String> properties) {
    this.timestamp = timestamp;
    this.triggerType = triggerType;
    this.properties = properties;
  }
}

 

 

API changes 

A schedule will be represented as an object consisting of the following fields:

  • Trigger Type: This can either be a time trigger or a data trigger.
  • Program Id: Defines the program to launch when the schedule is triggered.
  • Concurrency: Defines an upper limit on the number of concurrent runs of the specified program.


Schedules that have a Data Trigger will have the following additional fields:

  • Delay: Duration to wait after the schedule is triggered before program execution is attempted.
  • Time Range: A time range that defines when the schedule is allowed to run, for instance 1-3 AM.
  • Duration Since Last Run: A duration that must have passed since the last execution of the program.

 

Trigger Types:

  • Time Trigger: Defined simply by a cron expression
  • Data Trigger: Currently only one type of data trigger will be implemented:
    • PartitionedTrigger: Defined by a dataset Id and a number of partitions 

 

New Programmatic APIs

New Java APIs introduced (both user facing and internal)

Schedule Builder
import java.util.ArrayList;
import java.util.List;

public class Schedules {

  public static Builder builder(String name) {
    return new Builder(name);
  }

  public static class Builder {
    private final String name;
    private String description;
    private List<Constraint> constraints;

    private Builder(String name) {
      this.name = name;
      this.description = "";
      this.constraints = new ArrayList<>();
    }

    public Builder setDescription(String description) {
      this.description = description;
      return this;
    }

    public Builder setMaxConcurrentRuns(int max) {
      if (max < 1) {
        throw new IllegalArgumentException("max concurrent runs must be at least 1.");
      }
      constraints.add(new ConcurrencyConstraint(max));
      return this;
    }

    // fields that pertain more to data-triggered schedules
    public Builder setDelayMillis(long delayMillis) {
      // TODO: disallow from being called multiple times?
      constraints.add(new DelayConstraint(delayMillis));
      return this;
    }

    public Builder setTimeRange(int startHour, int endHour) {
      constraints.add(new TimeRangeConstraint(startHour, endHour));
      return this;
    }

    public Builder setDurationSinceLastRun(long millisSinceLastRun) {
      constraints.add(new DurationSinceLastRunConstraint(millisSinceLastRun));
      return this;
    }

    // TODO?: Add a method that allows adding a generic Constraint object?
    // This will mean moving our Constraint interface to API, and somehow packaging user class into Scheduler system

    public Schedule build(Trigger trigger) {
      return new Schedule(name, description, trigger, constraints);
    }
  }
}

Triggers
public class TimeTrigger {
  private final String cronExpr;

  public TimeTrigger(String cronExpr) {
    this.cronExpr = cronExpr;
  }
}

...

public class PFSTrigger {
  private final String pfsName;
  private final int numPartitions;

  public PFSTrigger(String pfsName, int numPartitions) {
    this.pfsName = pfsName;
    this.numPartitions = numPartitions;
  }
}

Constraints
public class ConcurrencyConstraint implements Constraint {

  private final int maxConcurrency;

  public ConcurrencyConstraint(int maxConcurrency) {
    this.maxConcurrency = maxConcurrency;
  }

  @Override
  public Result check(Schedule schedule, Context context) {
    return null;
  }
}

...

public class DelayConstraint implements Constraint {

  private final long millisAfterTrigger;

  public DelayConstraint(long millisAfterTrigger) {
    this.millisAfterTrigger = millisAfterTrigger;
  }

  @Override
  public Result check(Schedule schedule, Context context) {
    return null;
  }
}

...

public class TimeRangeConstraint implements Constraint {

  // only is satisfied within the range [startHour, endHour)
  // TODO: Allow minute granularity
  private final int startHour;
  private final int endHour;

  public TimeRangeConstraint(int startHour, int endHour) {
    this.startHour = startHour;
    this.endHour = endHour;
  }

  @Override
  public Result check(Schedule schedule, Context context) {
    return null;
  }
}

...

public class DurationSinceLastRunConstraint implements Constraint {

  private final long millisSinceLastRun;

  public DurationSinceLastRunConstraint(long millisSinceLastRun) {
    this.millisSinceLastRun = millisSinceLastRun;
  }

  @Override
  public Result check(Schedule schedule, Context context) {
    return null;
  }
}

Constraint
public interface Constraint {

  Result check(Schedule schedule, Context context);

  enum Result {
    SATISFIED,
    RETRY,
    NEVER;

    private final Long millisBeforeNextRetry;

    Result() {
      this(null);
    }

    Result(@Nullable Long millisBeforeNextRetry) {
      this.millisBeforeNextRetry = millisBeforeNextRetry;
    }
  }
}
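
The constraint classes above leave `check` unimplemented. As a hedged illustration of what two of the checks might compute, the sketch below uses a small hypothetical `Outcome` value class instead of the `Result` enum, because a retry delay has to be computed per check. The method signatures are simplified assumptions and take plain timestamps and hours rather than a `Schedule` and `Context`.

```java
/** Hypothetical check logic for the delay and time range constraints. */
final class ConstraintCheckSketch {

  /** Outcome of a check; retryAfterMillis is only meaningful when satisfied is false. */
  static final class Outcome {
    final boolean satisfied;
    final long retryAfterMillis;

    private Outcome(boolean satisfied, long retryAfterMillis) {
      this.satisfied = satisfied;
      this.retryAfterMillis = retryAfterMillis;
    }

    static Outcome satisfiedNow() {
      return new Outcome(true, 0L);
    }

    static Outcome retryAfter(long millis) {
      return new Outcome(false, millis);
    }
  }

  /** Delay constraint: satisfied once millisAfterTrigger have elapsed since the trigger fired. */
  static Outcome checkDelay(long triggerTimeMillis, long millisAfterTrigger, long nowMillis) {
    long remaining = triggerTimeMillis + millisAfterTrigger - nowMillis;
    return remaining <= 0 ? Outcome.satisfiedNow() : Outcome.retryAfter(remaining);
  }

  /** Time range constraint over [startHour, endHour), allowing windows that wrap past midnight. */
  static boolean inTimeRange(int startHour, int endHour, int hourOfDay) {
    if (startHour <= endHour) {
      // Non-wrapping window; note that startHour == endHour yields an empty range here.
      return hourOfDay >= startHour && hourOfDay < endHour;
    }
    // Wrapping window such as 22 to 6: late evening or early morning both qualify.
    return hourOfDay >= startHour || hourOfDay < endHour;
  }
}
```

The wrapping case matters for windows such as the 10pm-6am maintenance window in user story 7, which a naive `start <= hour < end` comparison would never satisfy.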

 

 

Deprecated Programmatic APIs

New REST APIs

| Path | Method | Description | Response Code | Response |
| /v3/apps/<app-id> | GET | Returns the application spec for a given application | 200 - On success; 404 - When application is not available; 500 - Any internal errors | |

 

     

Deprecated REST API

| Path | Method | Description |
| /v3/apps/<app-id> | GET | Returns the application spec for a given application |

CLI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

UI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

Security Impact 

(TODO) Figure out how programs will write to the TMS topic (and how the scheduler system service will read from it)

Miscellaneous Open Questions

  • Topology of notification messages in TMS (for instance, timestamp, partition key(s), dataset name)
  • Topology of topics in TMS (for instance, will all notifications be in a single topic?)
  • How will the notification metadata be exposed to workflow (metadata of the trigger)?
  • How can PFS Consumer be leveraged?
  • If a notification triggers many programs, how will atomicity be handled?
  • How will scaling of the scheduler service be handled?
  • Replication impact?

 

Work Items

  • Notifications
    • Design topology of topics and schema of messages
    • Emit messages to TMS 
      • when a program completes
      • when a partition is added to a PFS
      • others?
    • Security: 
      • Need to emit from program runners/app fabric but also from dataset code, which is application code
      • Need to subscribe from CDAP system service (scheduler)
  • Specification
    • Design and implement API to configure schedules, triggers and run constraints
      • as part of app deployment
      • separately from app deployment
  • Scheduler
    • Implement as a system service running in Yarn
      • Impersonation
      • Workflows are still started by master?
    • Subscribe to TMS
    • Message-to-Schedule lookup (n-to-m mapping)
    • Atomicity of message consumption when a single event triggers multiple schedules
    • Run constraints
      • Max Concurrent runs  
        • of same workflow
        • of other workflows in same "bucket" (how to define that)
      • Only run in a specific time window (e.g. 1pm-6am)
      • Delay workflow start by N minutes after message triggered
      • Run no sooner than N minutes after last run of the workflow
      • Unfulfilled constraint can lead to 
        • skip this run of the schedule
        • wait until constraint is fulfilled
          • what if multiple runs triggered by the same schedule are waiting: do they all start when the constraint becomes true, or only one? 
      • others?
    • "working set" = all pending triggers (schedules that were triggered and are waiting for run constraints to become true)
      • Persistence to survive outage
      • Monitoring/polling of the working set and starting workflows when their constraints are fulfilled 
    • Tools to debug / triage
      • debugger to inspect the state
      • emit metrics
      • record history in some way
    • Scalability
      • multiple instances of scheduler sharing the work
  • Tech debt
    • Port existing schedulers to new framework
      • Quartz scheduler
      • Stream size scheduler
    • Migration of existing schedule store to new schema/format
  • Test
    • Long running tests for all above scenarios
  • Examples
  • Documentation

Impact on Infrastructure Outages 

System behavior (if applicable: document the impact of downstream component failures [YARN, HBase, etc.]) and how the design takes care of these aspects

Test Scenarios

| Test ID | Test Description | Expected Results |
| | Configure a schedule to run whenever there is ANY new partition, but configure a minimum interval (5 minutes?) between runs. Add five partitions, distributed evenly across a five-minute time window. | The first partition being added will trigger a workflow run, but the partitions added right after it shouldn't trigger a workflow run because sufficient time hasn't passed. |
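
The expected result of this scenario can be checked against a small simulation. The sketch assumes "skip" semantics (a partition that arrives too soon after the last run is dropped rather than delayed) and uses a hypothetical helper that works on raw timestamps only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of a minimum-interval constraint with skip semantics. */
final class MinIntervalScenarioSketch {

  /** Returns the partition arrival times (in millis) at which a run would start. */
  static List<Long> runsFor(long[] partitionTimesMillis, long minIntervalMillis) {
    List<Long> runs = new ArrayList<>();
    Long lastRun = null;
    for (long t : partitionTimesMillis) {
      // Start a run only if there was no run yet or enough time has passed since the last one.
      if (lastRun == null || t - lastRun >= minIntervalMillis) {
        runs.add(t);
        lastRun = t;
      }
    }
    return runs;
  }
}
```

With partitions at minutes 0 through 4 and a five-minute minimum interval, only the first arrival starts a run; a further partition at minute 5 would start a second one.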
   
   
   

Releases

Release 4.2.0

Scope:

  • Scheduler runs in CDAP master, only one instance is active
  • Supported event types: 
    • Availability of a new partition in a (T)PFS, 
    • Completion of a program (stretch).
  • Supported triggers:
    • Availability of N partitions
    • Successful completion of a program (stretch)
    • Failure of a program (stretch)
  • Supported run constraints:
    • Concurrent runs of the same program
    • Concurrent runs of programs in the same "category" (stretch)
    • Delay execution by N time units
    • Minimal time elapsed since last run of the same program
    • Only run in a fixed time window
    • Timeout of a pending job: discard job 
    • Timeout of a pending job: force run (stretch)  
  • Port existing features
    • Time trigger
    • Stream size (stretch)
    • Online upgrade step
  • Diagnostic tools 

Sprint 1: (demo 5/1)

  • Finish Design
  • Create TMS topic for new partition messages
  • Define a schedule with single new partition trigger
  • Scheduler to consume from TMS, find schedules, immediately start job. 
    • no run constraints
    • no working set
    • no schedule index (scan schedule store)

Sprint 2: (demo 5/8)

  • Implement working set
  • Triggers
    • based on N new partitions
    • by program completion (stretch)
      • emit messages from program runner (workflow only)
      • schedule triggered by successful run
    • time-based
      • keep quartz
      • when triggers, simply put into working set
  • Run constraints (skip only): 
    • define common API
    • concurrent runs
    • time elapsed since last run
    • fixed time window

Sprint 3: (demo 5/15)

  • Indexed schedule store
    • including online upgrade
  • Triggers
    • failed program run
  • Run constraints with delay
    • all of previous ones
    • delay execution
    • timeout and discard
  • Diagnostic tools:
    • inspect working set
    • inspect schedule store 
    • emit metrics

Sprint 4: (demo 5/22)

  • Integration test

Sprint 5: (demo 5/29)

  • Long-running test completed

Release X.Y.Z

Related Work

  • Work #1
  • Work #2
  • Work #3

 

Future work
