CDAP Cloud - Program Launcher

Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

The Program Launcher component will be responsible for preparing and launching the CDAP runtime on a specified target cluster. Depending on the program's profile, this may mean running the program against a YARN cluster that is local to the CDAP master or against a Google DataProc cluster running remotely in the cloud.

Goals

The goal of this document is to describe, in detail, what the Program Launcher's responsibilities are and to propose a Program Launcher SPI. The process of creating this document will also help identify missing pieces of the integration with other CDAP components.

User Stories

  1. As a developer, I want to run my program against an on-premises Hadoop cluster.
  2. As a developer, I want to run my program against a Google DataProc cluster.
  3. As a cloud provider/vendor, I want to be able to develop a Program Launcher extension that will enable CDAP to launch programs against that specific cloud. 

Design

Responsibilities

File Localization - The Program Launcher is responsible for aggregating the CDAP runtime jars and configurations and localizing them to the target cluster, so that they are available to the program runtime.

If running against a local YARN cluster, this step may be a no-op. On the other hand, if the implementation is for Amazon EMR, the extension may push the files to an S3 bucket.
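
The contrast between these two cases can be sketched in Java. This is only an illustration under stated assumptions: FileLocalizer is a hypothetical, trimmed-down view of the SPI that covers just the localization step, and StagingDirLocalizer copies files into a local staging directory as a stand-in for pushing them to remote storage such as an S3 bucket. No real cloud API is called.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down view of the SPI: only the localization step.
interface FileLocalizer {
  void localizeFiles(List<File> filesToLocalize);
}

// Local YARN cluster: the files are already reachable, so there is nothing to do.
class LocalYarnLocalizer implements FileLocalizer {
  @Override
  public void localizeFiles(List<File> filesToLocalize) {
    // intentionally a no-op
  }
}

// Stand-in for a remote target: copies each file into a staging directory.
// A cloud extension would upload to its own storage here instead (assumption).
class StagingDirLocalizer implements FileLocalizer {
  private final Path stagingDir;
  private final List<Path> localized = new ArrayList<>();

  StagingDirLocalizer(Path stagingDir) {
    this.stagingDir = stagingDir;
  }

  @Override
  public void localizeFiles(List<File> filesToLocalize) {
    try {
      Files.createDirectories(stagingDir);
      for (File file : filesToLocalize) {
        Path target = stagingDir.resolve(file.getName());
        Files.copy(file.toPath(), target, StandardCopyOption.REPLACE_EXISTING);
        localized.add(target);
      }
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to localize files to " + stagingDir, e);
    }
  }

  // Paths of the files copied so far, in the order they were localized.
  List<Path> getLocalized() {
    return localized;
  }
}
```

Keeping both variants behind the same interface means the caller does not need to know whether localization is a no-op or a copy.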

Launching the CDAP Runtime on Target Cluster - Bootstraps the CDAP runtime on the target cluster. This will launch a Java process on a node of the target cluster, which will be responsible for launching and running the user's application.

Lightweight messaging between CDAP Master and Target Cluster - The Program Launcher SPI will be responsible for implementing a (lightweight) messaging system, so that information can be passed between the two components. This is necessary so that when the CDAP runtime on the target cluster is started successfully, it can pass back information to the Program Launcher component, conveying how best to monitor the program.

Program Launcher - Lifecycle

The following describes the high-level steps of the Program Launcher's lifecycle:

  1. Receive a request to launch a specific CDAP program
  2. Gather necessary files (JARs, data files, configuration files)
  3. Localize these files to the target cluster
  4. Launch a JAR on the target cluster
  5. This Java program will use the localized files to bootstrap a CDAP runtime with the proper classpath
  6. A DistributedProgramRunner can then be used in the bootstrapped CDAP runtime to launch the program against the now "local" YARN cluster
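
Steps 1 through 4 of this lifecycle could be driven by a small orchestrator on the CDAP master side. The sketch below is an assumption-laden illustration, not the proposed implementation: LaunchRequest and LaunchSequence are hypothetical names, and the ProgramLauncher interface is repeated from the SPI proposed in this document only so that the example is self-contained. Steps 5 and 6 run inside the process started on the target cluster and are therefore not shown.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// The SPI as proposed in this document, repeated so the sketch compiles on its own.
interface ProgramLauncher {
  void localizeFiles(List<File> filesToLocalize);
  void launchCdapRuntime(File jarFile);
}

// Hypothetical description of a launch request (step 1).
class LaunchRequest {
  final String programName;
  final File runtimeJar;
  final List<File> extraFiles;

  LaunchRequest(String programName, File runtimeJar, List<File> extraFiles) {
    this.programName = programName;
    this.runtimeJar = runtimeJar;
    this.extraFiles = extraFiles;
  }
}

// Hypothetical orchestrator for lifecycle steps 2 through 4.
class LaunchSequence {
  private final ProgramLauncher launcher;

  LaunchSequence(ProgramLauncher launcher) {
    this.launcher = launcher;
  }

  void launch(LaunchRequest request) {
    // Step 2: gather the JARs, data files, and configuration files.
    List<File> files = new ArrayList<>();
    files.add(request.runtimeJar);
    files.addAll(request.extraFiles);

    // Step 3: localize the files to the target cluster.
    launcher.localizeFiles(files);

    // Step 4: launch the JAR on the target cluster.
    launcher.launchCdapRuntime(request.runtimeJar);
  }
}
```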

Open Questions

  1. Integrations with other components:
    1. What API will Program Launcher expose to the rest of CDAP?
    2. How will the handoff work from the Program Launcher to the Runtime Monitor? For example, at what point in the launch process will we start monitoring the program?
  2. How will impersonation work, when it comes to launching programs on remote target clusters?
  3. How will authentication against the target cluster happen? Is it the responsibility of the system or of the extension?
  4. The current design assumes there is a YARN cluster running in the target cluster. What if there is some other execution engine, such as Kubernetes? What will the responsibility of the ProgramLauncher extension look like then?


Target Cluster Implementations to Consider

  1. Local Hadoop/YARN cluster
  2. Google DataProc
  3. EMR
  4. Kubernetes

Handling Failure Scenarios

Some failures that the Program Launcher needs to consider:

  1. The Program Launcher component goes down after it launches the bootstrapping process on the target cluster, but before the handoff to the Runtime Monitor that will monitor it.
  2. There is a failure in localizing the files (in the ProgramLauncher extension). In this case, the system can retry the step.


To handle these kinds of failure scenarios, the Program Launcher will keep an action log that records intent as well as state. This will enable it to recover execution state after a failure.

Proposed states:

REQUESTING_LAUNCH
LAUNCHING
RUNNING

These states will not be exposed to the user, but instead used internally to keep track of what actions are possible in any given situation.

The launch process will look like:

  1. The ProgramLauncher is requested to run a particular program. At this point, the launch state will be REQUESTING_LAUNCH, and it will start working on gathering the relevant files for the application and copying the files over to the target cluster.
  2. Once the files are copied over to the target cluster, a bash script will be run to set up the classpath and execute a Java main class. Before this bash script is executed, the state will be changed to LAUNCHING.
  3. Once that bash script has completed execution and the Java runtime is initialized (including the runtime monitor server being in place), we can set the state to RUNNING.
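
The ordering of state changes relative to the work in each step can be sketched as follows. This is a minimal sketch under assumptions: ActionLog and TrackedLaunch are hypothetical names, the log is kept in memory here whereas a real one would need to be durable, and the three Runnable arguments stand in for copying files, running the bash script, and waiting for the runtime monitor server.

```java
import java.util.ArrayList;
import java.util.List;

// The proposed internal launch states.
enum LaunchState { REQUESTING_LAUNCH, LAUNCHING, RUNNING }

// Hypothetical action log. In-memory for illustration only.
class ActionLog {
  private final List<LaunchState> entries = new ArrayList<>();

  void record(LaunchState state) {
    entries.add(state);
  }

  // Most recently recorded state, or null if nothing has been recorded.
  LaunchState latest() {
    return entries.isEmpty() ? null : entries.get(entries.size() - 1);
  }

  List<LaunchState> history() {
    return new ArrayList<>(entries);
  }
}

// Hypothetical wrapper that records each state before the work it guards.
class TrackedLaunch {
  private final ActionLog log;

  TrackedLaunch(ActionLog log) {
    this.log = log;
  }

  void run(Runnable copyFiles, Runnable runBootstrapScript, Runnable awaitRuntimeMonitor) {
    // Step 1: record intent, then gather and copy the files.
    log.record(LaunchState.REQUESTING_LAUNCH);
    copyFiles.run();

    // Step 2: the state changes to LAUNCHING before the script is executed.
    log.record(LaunchState.LAUNCHING);
    runBootstrapScript.run();

    // Step 3: only once the runtime (and its monitor server) is up do we record RUNNING.
    awaitRuntimeMonitor.run();
    log.record(LaunchState.RUNNING);
  }
}
```

Because each state is recorded before the corresponding work begins, a failure partway through leaves the log pointing at the step that was in progress.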

Then, in the case of a system failure, we can recover by inspecting this launch state. If the state is REQUESTING_LAUNCH, we have not yet started step 2 above. It is safe to restart step 1, although we first need to clean up any temporary files that may already have been copied over to the target cluster.

If the state is LAUNCHING, it is not safe to restart step 2, because the Java main class that runs the program on the target cluster may already have been launched. Instead, we can check whether the runtime monitor server is running and, if so, start monitoring the program.

If the state is RUNNING, then there are no steps for the ProgramLauncher to complete, and we can simply recover by reconnecting to the runtime monitor.
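
The recovery rules above amount to a mapping from the last recorded state to a recovery action. A minimal sketch, assuming hypothetical names RecoveryAction and LaunchRecovery (neither is part of the proposed SPI), might look like this. What to do when the runtime monitor probe fails in the LAUNCHING case is left open, as it is in the text.

```java
import java.util.Objects;

// The proposed internal launch states.
enum LaunchState { REQUESTING_LAUNCH, LAUNCHING, RUNNING }

// Hypothetical names for the recovery paths described above.
enum RecoveryAction {
  CLEAN_UP_AND_RESTART,   // remove temporary files on the target cluster, then redo step 1
  PROBE_RUNTIME_MONITOR,  // do not relaunch; check whether the runtime monitor server is up
  RECONNECT_TO_MONITOR    // nothing left to launch; reconnect to the runtime monitor
}

class LaunchRecovery {
  // Maps the last state found in the action log to the recovery path.
  static RecoveryAction decide(LaunchState lastRecorded) {
    Objects.requireNonNull(lastRecorded, "lastRecorded");
    switch (lastRecorded) {
      case REQUESTING_LAUNCH:
        return RecoveryAction.CLEAN_UP_AND_RESTART;
      case LAUNCHING:
        return RecoveryAction.PROBE_RUNTIME_MONITOR;
      case RUNNING:
        return RecoveryAction.RECONNECT_TO_MONITOR;
      default:
        throw new IllegalArgumentException("Unknown launch state: " + lastRecorded);
    }
  }
}
```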

API changes

New Programmatic APIs/SPIs

TargetClusterProgramRunner

/**
 * Implementation of ProgramRunner, which is responsible for launching the program against a (remote) target cluster.
 */
public class TargetClusterProgramRunner implements ProgramRunner {

  /**
   * Runs the given program against a (remote) target cluster. Internally, it will use a ProgramLauncher SPI implementation in order to facilitate the steps specific to the target cluster.
   */
  @Override
  public ProgramController run(Program program, ProgramOptions options) {
    // run the program against the target cluster
    throw new UnsupportedOperationException("Not yet implemented");
  }
}

Program Launcher SPI

/**
 * Responsible for facilitating the launching of a program, particularly the steps that are specific to a particular target cluster type.
 */
public interface ProgramLauncher {
 
  /**
   * Responsible for localizing the given files, such that they are available on the target cluster.
   */
  void localizeFiles(List<File> filesToLocalize);
 
  /**
   * Responsible for bootstrapping the CDAP runtime on the target cluster.
   * The extension will be responsible for providing the path to the localized files as the first argument.
   */
  void launchCdapRuntime(File jarFile);
  
}

Deprecated Programmatic APIs

N/A

New REST APIs

Path | Method | Description | Request Params | Request Body | Response Code | Response







Deprecated REST API

N/A

CLI Impact or Changes

N/A

UI Impact or Changes

N/A

Security Impact 

There are still several unanswered questions around how we will handle security with respect to remote clusters. For instance, how will we launch against a remote YARN cluster? Are the credentials and the authentication the responsibility of the extensions? Also see Open Questions #2 and #3.

Impact on Infrastructure Outages 

Adds an additional dependency on external cloud services.

Test Scenarios

Test ID | Test Description | Expected Results












Releases

Release 5.0.0


Release 5.1.0

Related Work

  • Work #1
  • Work #2
  • Work #3


Future work

Created in 2020 by Google Inc.