Schedules automate the execution of workflows on a recurring time basis, on data availability, on certain statuses reached by another program, or on a logical combination of these conditions. The schedules of an application can be added as part of the application configuration. They can also be managed through Microservices endpoints that allow listing, creating, modifying, and deleting schedules, as well as enabling and disabling them.

A schedule must have a unique name within its application (the same name can be used in different applications), and additionally consists of:

To add a schedule to an application that extends AbstractApplication, use the schedule() method. For example, the Purchase example creates a schedule for the PurchaseHistoryWorkflow as follows:

schedule(
  buildSchedule("DailySchedule", ProgramType.WORKFLOW, "PurchaseHistoryWorkflow")
    .withConcurrency(1).abortIfNotMet()
    .triggerByTime("0 4 * * *"));

This schedule uses a time trigger that fires every day at 4:00 am, and it specifies a concurrency run constraint that prevents the workflow from executing if another run of the same workflow is in progress at that time. Note that:

If more than one condition is required to construct the trigger for a schedule, call the getTriggerFactory() method of an application extended from AbstractApplication to obtain a TriggerFactory and create a composite Trigger as follows:

schedule(
  buildSchedule("Workflow1AndWorkflow2CompletedSchedule", ProgramType.WORKFLOW, "TriggeredWorkflow")
    .triggerOn(getTriggerFactory().and(getTriggerFactory().onProgramStatus(ProgramType.WORKFLOW, "Workflow1",
                                                                           ProgramStatus.COMPLETED),
                                       getTriggerFactory().onProgramStatus(ProgramType.WORKFLOW, "Workflow2",
                                                                           ProgramStatus.COMPLETED))));

This schedule uses a trigger that is only satisfied when both Workflow1 and Workflow2 have completed.
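
Other boolean combinations can be constructed in the same way. For example, assuming the TriggerFactory also provides an or() combinator analogous to the and() used above (an assumption in this sketch; check the TriggerFactory Javadoc), a schedule that fires when either of the two workflows completes could look like this:

schedule(
  buildSchedule("Workflow1OrWorkflow2CompletedSchedule", ProgramType.WORKFLOW, "TriggeredWorkflow")
    .triggerOn(getTriggerFactory().or(getTriggerFactory().onProgramStatus(ProgramType.WORKFLOW, "Workflow1",
                                                                          ProgramStatus.COMPLETED),
                                      getTriggerFactory().onProgramStatus(ProgramType.WORKFLOW, "Workflow2",
                                                                          ProgramStatus.COMPLETED))));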

Schedules can be added and controlled by the CDAP CLI and the Lifecycle Microservices. The status of a schedule can be retrieved, and individual schedules can be added, enabled, or disabled.

When a schedule is initially deployed, it is in a disabled state. It needs to be enabled before it starts executing workflows.

Jobs and their Lifecycle

Execution of workflows is initiated when the trigger of a schedule fires. This creates a Job for the workflow in the scheduler's Job Queue. This job will not necessarily execute immediately. Instead, each job goes through a life cycle:

Events and Notifications

Triggers are fired by events such as the creation of a new partition in a dataset, the fulfillment of a time trigger's cron expression, or a change in the status of a program. Events reach the scheduling system as notifications on the Transactional Messaging System (TMS). A single notification can contain multiple events, for example, two new partitions of a dataset. For a time trigger, the event contains the logical start time, that is, the time when the cron expression fired; this logical start time is passed to the workflow as a runtime argument. For a program status trigger, the event contains the triggering program status and the triggering program run ID.

Run Constraints

A run constraint can either delay or prevent the execution of a schedule's workflow, based on a condition represented by the constraint. Whether the execution is delayed or aborted can be configured explicitly by specifying either .waitUntilMet() or .abortIfNotMet() when adding the constraint to the schedule builder; each type of run constraint also has its own default for this behavior. The following constraints are available:

Triggers

A trigger can be based on time, data availability, or program status. These are the available trigger types:

Examples

To schedule a workflow whenever a new partition is added to a dataset, but delay execution to the time window between 10pm and 6am:

schedule(buildSchedule("runOnlyAtNight", ProgramType.WORKFLOW, "cleanupWorkflow")
           .withTimeWindow("22:00", "06:00").waitUntilMet()
           .triggerOnPartitions("myDataset", 1));

The same as before, but ensure that it runs only once in that time window:

schedule(buildSchedule("runOnlyAtNight", ProgramType.WORKFLOW, "cleanupWorkflow")
           .withTimeWindow("22:00", "06:00").waitUntilMet()
           .withDurationSinceLastRun(6, TimeUnit.HOURS).abortIfNotMet()
           .triggerOnPartitions("myDataset", 1));

To schedule a workflow whenever there are four new partitions, with 15 minutes delay to allow additional data to arrive:

schedule(buildSchedule("onPartitionWithDelay", ProgramType.WORKFLOW, "myWorkflow")
           .withDelay(15, TimeUnit.MINUTES)
           .triggerOnPartitions("myDataset", 4));

To schedule a workflow named "cleanupWorkflow" to run whenever "dataProcessingWorkflow" (in the same namespace, application, and application version as "cleanupWorkflow") fails:

schedule(buildSchedule("onDataProcessingFail", ProgramType.WORKFLOW, "cleanupWorkflow")
            .triggerOnProgramStatus(ProgramType.WORKFLOW, "dataProcessingWorkflow",
                                    ProgramStatus.FAILED));

To ensure that the workflow runs at least once per hour:

schedule(buildSchedule("onPartitionWithDelay", ProgramType.WORKFLOW, "myWorkflow")
           .setProperties(ImmutableMap.of("triggered.by", "data"))
           .withDelay(15, TimeUnit.MINUTES)
           .withConcurrency(1).abortIfNotMet()
           .triggerOnPartitions("myDataset", 4));
schedule(buildSchedule("onceHourly", ProgramType.WORKFLOW, "myWorkflow")
           .setProperties(ImmutableMap.of("triggered.by", "time"))
           .withConcurrency(1).abortIfNotMet()
           .withDurationSinceLastRun(1, TimeUnit.HOURS).abortIfNotMet()
           .triggerByTime("0 * * * *"));

Here we add another schedule that runs once hourly, but only if no run of the workflow succeeded within the last hour and no concurrent run is in progress. We also add the concurrency constraint to the first schedule to make sure it does not start a run while the second schedule is executing a job.

Note that through the properties we can indicate to the workflow which schedule triggered it. We could also pass arbitrary other properties.
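
Since the schedule properties are handed to the triggered program, the workflow itself can inspect them. Below is a minimal sketch, assuming the properties arrive as runtime arguments of the triggered run and that the workflow overrides the initialize(WorkflowContext) lifecycle method:

@Override
public void initialize(WorkflowContext context) throws Exception {
  super.initialize(context);
  // "triggered.by" is the property set on the two schedules above;
  // it is assumed here to be available as a runtime argument.
  String triggeredBy = context.getRuntimeArguments().get("triggered.by");
  if ("time".equals(triggeredBy)) {
    // this run was started by the hourly fallback schedule
  }
}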

Managing Schedules

Schedules Lifecycle

These actions can be performed on a schedule:

Application Deployment and Schedules

Schedules can be defined as part of application deployment, by calling schedule() in the application's configure() method; or they can be managed separately from application deployment through the CDAP Microservices. This can create a dilemma: after schedules have been modified through the CDAP Microservices, redeploying the application (which may happen for reasons unrelated to the schedules) would undo those changes and reinstate the schedules defined by the configure() method. Because that is usually undesired, CDAP provides an option to control whether schedules are updated by the configure() method on deployment. This option is called app.deploy.update.schedules and is given as a field of the application deployment request.
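
For example, a deployment request that leaves the existing schedules untouched might look roughly like this; the artifact name and version are placeholders, and the exact request format should be checked against the Lifecycle Microservices reference:

{
  "artifact": {
    "name": "Purchase",
    "version": "1.0.0",
    "scope": "user"
  },
  "config": {},
  "app.deploy.update.schedules": "false"
}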

Special Runtime Arguments

When a schedule with a time trigger executes a workflow, it passes in the logicalStartTime runtime argument: this is the timestamp, in milliseconds, at which the schedule's cron expression was fulfilled and triggered the job.
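
For example, a workflow could read this argument in its initialize method. The sketch below is illustrative only: it assumes CDAP 6.x package names (io.cdap.cdap.api.workflow), a hypothetical workflow class, and that the argument is available through getRuntimeArguments():

import io.cdap.cdap.api.workflow.AbstractWorkflow;
import io.cdap.cdap.api.workflow.WorkflowContext;

public class MyScheduledWorkflow extends AbstractWorkflow {

  @Override
  protected void configure() {
    setName("MyScheduledWorkflow");
    setDescription("Workflow started by a time-triggered schedule");
    // addMapReduce/addSpark/addAction calls omitted in this sketch
  }

  @Override
  public void initialize(WorkflowContext context) throws Exception {
    super.initialize(context);
    // logicalStartTime is the time, in epoch milliseconds, at which the
    // schedule's cron expression fired and created the job.
    long logicalStartTime =
      Long.parseLong(context.getRuntimeArguments().get("logicalStartTime"));
  }
}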