Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Checklist

  •  User Stories Documented
  •  User Stories Reviewed
  •  Design Reviewed
  •  APIs reviewed
  •  Release priorities assigned
  •  Test cases reviewed
  •  Blog post

Introduction 

When CDAP system services are down, or when dependent services like HBase, HDFS, YARN, Kafka, ZooKeeper, Kerberos, etc are down, user programs and CDAP services must remain running, though they may be in a degraded mode where some operations will fail.

Goals

Define the expected behavior of CDAP programmatic APIs during underlying service failures

Define the expected behavior of CDAP RESTful APIs during underlying service failures

User Stories 

  • As an application developer, I want my Service, Flow, Worker, and SparkStreaming programs to remain running when underlying services experience downtime
  • As an application developer, I want my Service, Flow, Worker, and SparkStreaming programs to recover to a normal state when underlying service downtime ends
  • As an application developer, I want to be able to configure how long my batch programs (Workflow, MapReduce, Spark batch) should retry before eventually failing when underlying services experience downtime
  • As a system administrator, I want CDAP system services to remain running when underlying services experience downtime
  • As a system administrator, I want CDAP RESTful APIs to return with http code 503 when required underlying services are unavailable
  • As a system administrator, I want CDAP system services to recover to a normal state when underlying service downtime ends
  • As a system administrator, I want CDAP program run history recording to be unaffected by failover
  • As a system administrator, I want all CDAP system services to be highly available

Design

(Work in Progress)

Here we will list out different types of service unavailabilities that may occur, and describe the expected behavior for different types of programs in those situations.

RS-002: Client Resiliency

From a user program's perspective, any method that involves a remote call can throw a ServiceUnavailableException, which extends RetryableException. The program can catch and handle the exception. If it doesn't, CDAP will handle it depending on the context.

CDAP APIs that involve a remote call are:

  • DatasetContext.getDataset() (if the cache is not hit)
  • ServiceDiscoverer.getServiceURL()
  • SecureStore.listSecureData()
  • SecureStore.getSecureData()
  • Transactional.execute()
  • Admin.datasetExists()
  • Admin.getDatasetType()
  • Admin.getDatasetProperties()
  • Admin.createDataset()
  • Admin.updateDataset()
  • Admin.dropDataset()
  • Admin.truncateDataset()
  • StreamWriter.write() (only in Workers)
  • StreamWriter.writeFile() (only in Workers)
  • StreamBatchWriter (only in Workers)
  • Table methods
  • Location methods
  • PartitionedFileSet methods

If uncaught by the program, CDAP will handle the exception depending on the context it occurs in:

  • CDAP APIs that involve a remote call and that are called in a long transaction (inside a Mapper, Reducer, or Spark), or not called inside any transaction (methods where explicit transactions are used and the code is not in a TxRunnable), will be retried a configurable amount of time before throwing a ServiceUnavailableException.
  • Programs running Transactional.execute(TxRunnable) will throw a RetryableTransactionFailureException. Transactional can be used in:
    • initialize methods using explicit transactions.
    • destroy methods using explicit transactions.
    • Service endpoint methods using explicit transactions.
    • CustomAction.run().
    • Worker.run().
    • Spark client.
  • Programs running their initialize() method will retry a configurable amount of time before failing the program run
  • Programs running their destroy() method will retry a configurable amount of time before failing the program run
  • Service programs executing an endpoint method with an implicit transaction will return a 503 error code
  • Flowlet programs executing their processEvent method with an implicit transaction will indefinitely retry the process method with its event
  • Programs that are starting up and that use the `@UseDataSet` annotation will try to get and inject the Dataset for a configurable amount of time before failing the program run

This documents how these APIs should behave. It does not document how they currently behave. JIRAs will be opened for any differences in behavior and fixed.

Configurable Retries

Lifecycle Methods

Consider the following code for a lifecycle method that run at the end of a mapreduce job and writes to a Table dataset some metadata for each new output partition:

Code Block
public class MyMapReduce extends AbstractMapReduce {

  @TransactionPolicy(TransactionControl.IMPLICIT)
  @Override
  public void destroy() {
    MapReduceContext context = getContext();
    String programName = context.getSpecification().getName();
    PartitionedFileSetTable outputoutputMeta = context.getDataset("output");
    Set<PartitionKey> newOutputPartitions = getOutputPartitions();
    for (PartitionKey key : newOutputPartitions) {
      outputoutputMeta.addMetadataput(getRowKey(key), getPartitionMetadata(key));
    }
  }

This method can be retried by CDAP because it is idempotent. For methods like this, it would be desirable to have the platform retry the method, especially since a service outage can easily last longer than a short transaction timeout. Since there is no way for CDAP to know what methods are idempotent, it is up to the user to configure the amount of time to retry. One option would be to add a RetryPolicy annotation for the method. For example:

Code Block
@TransactionPolicy(control = TransactionControl.IMPLICIT)
@RetryPolicy(backoffstrategy = RetryBackoffRetryPolicy.Strategy.FIXED_DELAY, timeout = 180000, baseDelay = 10000)
@Override
public void destroy() {
  MapReduceContext context = getContext();
  String programName = context.getSpecification().getName();
  PartitionedFileSetTable outputoutputMeta = context.getDataset("output");
  Set<PartitionKey> newOutputPartitions = getOutputPartitions();
  for (PartitionKey key : newOutputPartitions) {
    PartitionDetail detail = output.getPartitionoutputMeta.put(getRowKey(key), getPartitionMetadata(key));
    output.addMetadata(key, getPartitionMetadata(key));
  }
}}
}

This would tell CDAP to retry the destroy() method for 3 minutes if it throws a RetryableException.

Now consider some similar code that has been changed to use a transaction per partition adds several output partitions at the end of the job, but adds each partition in its own transaction in order to avoid transaction timeouts when there are many output partitions:

Code Block
@TransactionPolicy(TransactionControl.EXPLICIT)
@Override
public void destroy() {
  MapReduceContext context = getContext();
  final String programName = context.getSpecification().getName();
  PartitionedFileSet output;
  long startTime = System.currentTimeMillis();
  while (System.currentTimeMillis() - startTime < 180000) {
    try {
      output = context.getDataset("output");  // remote call
      break;
    } catch (RetryableException e) {
      TimeUnit.SECONDS.sleep(1);
    }
  }
  if (output == null) {
    throw new RuntimeException("Unable to get output dataset after 3 minutes");
  }

  Set<PartitionKey> newOutputPartitions = getOutputPartitions();
  for (final PartitionKey key : newOutputPartitions) {
    long startTime = System.currentTimeMillis();
    boolean succeeded = false;
    while (System.currentTimeMillis() - startTime < 180000) {
      try {
        context.execute(new TxRunnable() { // remote call
          @Override
          public void run(DatasetContext context) throws Exception {
            output.addPartition(key, getPartitionPath(key)); // remote call
          }
        });
        succeeded = true;
        break;
      } catch (RetryableTransactionFailureException e) {
        TimeUnit.SECONDS.sleep(1);
      } catch (TransactionFailureException e) {
        throw new RuntimeException(e);
      }
    }
    if (!succeeded) {
      throw new RuntimeException("Unable to add metadata after 3 minutes");
    }
  }
}

The destroy() method is no longer idempotent because it starts multiple transactions, which means you would not want to configure CDAP to retry this method. Instead, the developer is forced to add their own retry logic around every remote call. This, however, makes the code very messy, and requires similar logic to be present in every single user program. In these situations, it would be desirable for CDAP to handle the retries for the getDataset() call, and also for the execute() call.

Option 1 (not chosen)

We could extend the annotation idea to apply to all remote calls within a method. Then the code would look like:

Code Block
@TransactionPolicy(TransactionControl.EXPLICIT)
@RetryPolicy(backoffstrategy = RetryBackoffRetryPolicy.Strategy.FIXED_DELAY, timeout = 180000, baseDelay = 10000)
@Override
public void destroy() {
  MapReduceContext context = getContext();
  final PartitionedFileSet output = context.getDataset("output");  // remote call

  Set<PartitionKey> newOutputPartitions = getOutputPartitions();
  for (final PartitionKey key : newOutputPartitions) {
    try {
      context.execute(new TxRunnable() { // remote call
        @Override
        public void run(DatasetContext context) throws Exception {
          output.addPartition(key, getPartitionPath(key)); // remote call
        }
      }
    } catch (TransactionFailureException e) {
      throw new RuntimeException(e);
    }
  }
}

The retry policy would then apply to all remote calls within the method, which are the getDataset() call and to the execute() call. It would not, however, apply to the addPartition() call, though TxRunnable's run() method could also be annotated if desired. This does make things confusing, since it is hard to tell what the retry policy applies to and seems to suggest that the entire destroy() method will be retried instead of the getDataset() and execute() methods.

Option 2 (not chosen)

Another option would be to add methods for each remote call to take a RetryPolicy as an argument. For example:

Code Block
@TransactionPolicy(TransactionControl.EXPLICIT)
@Override
public void destroy() {
  RetryPolicy retryPolicy = RetryPoliciesRetryPolicy.timeLimitfixedDelay(3, TimeUnit.MINUTES, RetryPolicies.fixedBackoff(110, TimeUnit.SECONDS));
  final PartitionedFileSet output = getContext().getDataset("output", retryPolicy);  // remote call
  Set<PartitionKey> newOutputPartitions = getOutputPartitions();
  for (final PartitionKey key : newOutputPartitions) {
    try {
      context.execute( // remote call
        new TxRunnable() {
          @Override
          public void run(DatasetContext context) throws Exception {
            output.addPartition(key, getPartitionPath(key)); // remote call
          }
        },
        30, retryPolicy);
    } catch (TransactionFailureException e) {
      throw new RuntimeException(e);
    }
  }
}

This would remove the ambiguity, and would also indicate to users which methods involve remote calls and which ones do not. However, this is a little more verbose and also adds a lot of clutter to the CDAP APIs, especially as APIs evolve and more parameters are added to various methods.

Option 3 (chosen)

An alternative to this would be to let the program cluster admin set a retry policy that will apply to all remote calls in the program:

Code Block
@Override
public void configure() {
  setRemoteRetryPolicy(RetryPolicies.timeLimit(3, TimeUnit.MINUTES, RetryPolicies.fixedBackoff(1, TimeUnit.SECONDS)));
}
 
@TransactionPolicy(TransactionControl.EXPLICIT)
@Override
public void destroy() {
  PartitionedFileSet output = getContext().getDataset("output");
  ...
}

This would also extend naturally to users being able to dynamically set the remote retry policy of particular program using preferences and runtime arguments.

In situations where a developer program wide retry policy by setting preferences and runtime arguments. In the future, we may add an ability to configure a custom policy programmatically, but the first release will only support runtime arguments. This parallels what is done for setting short transaction timeouts through runtime arguments.  If none is set, a default will be used, depending on the program type. These defaults will be configurable through cdap-site.xml.

 

In situations where a developer really wants to control over retries for specific calls, they can set the retry policy to no retries, then use a utility class that CDAP provides that executes a Callable, retrying it if it throws certain types of exceptions:

Code Blockpublic static class Retries { public static <T> T call(Callable<T> callable, RetryPolicy retryPolicy)

none, and handle the retry logic in their code.

MapReduce/Spark

Consider the following MapReduce code:

Code Block
@Override
public void initialize() throws Exception {
  MapReduceContext context ...
= getContext();
 } }Job  job @Override
public void initialize= context.getHadoopJob();
{   getContext().setRemoteRetryPolicy(RetryPolicies.NO_RETRIESjob.setReducerClass(PerUserReducer.class);
}   @TransactionPolicycontext.addInput(TransactionControl.EXPLICIT)
@Override
public void destroy() {
  final MapReduceContext context = getContext(Input.ofDataset("purchases"), PurchaseMapper.class);
  final PartitionedFileSet output = Retries.call(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      return context.getDataset("output");
    }
  }, RetryPolicies.timeLimit(3, TimeUnit.MINUTES, RetryPolicies.fixedBackoff(1, TimeUnit.SECONDS)));
  ...
}

 

MapReduce/Spark

Consider the following MapReduce code taken from the Purchase example:

Code Block
@Override
public void initialize() throws Exception {
  MapReduceContext context = getContext();
  Job job = context.getHadoopJob();
  job.setReducerClass(PerUserReducer.class);
  context.addInput(Input.ofDataset("purchases"), PurchaseMapper.class);
  context.addOutput(Output.ofDataset("history"));
}

In this example, the MapReduce job reads from an ObjectMappedTable called "purchases" and writes to a custom dataset called "history". This means that all the code handling retries and timeouts is located in the datasets and not in this program. Today, with these particular datasets, read operations eventually boil down to HBase scan calls, with any exception simply being propagated up as a runtime exception. This means the resiliency is actually dependent on the hbase client settings in hbase-site.xml. In order to configure retries and timeouts from job to job, the developer would have to know that HBase is being used behind the scenes, and to set various hbase settings in the job Configuration.

Code Block
@Override
public void initialize() throws Exception {
  ...
  Configuration config = job.getConfiguration();
  config.setInt("hbase.client.operation.timeout", 3 * 60 * 60 * 1000);
}

Requiring this breaks abstraction barriers and also requires strong hbase knowledge. One way to address this is to introduce retry policy as a dataset property.

Code Block
@Override
public void initialize() throws Exception {
  ...
  Map<String, String> args = new HashMap<>();
  DatasetArguments.setRetryPolicy(args, RetryPolicies.timeLimit(3, TimeUnit.HOURS, RetryPolicies.fixedBackoff(1, TimeUnit.MINUTES)));
  context.addInput(Input.ofDataset("purchases", args), PurchaseMapper.class);
}

This will tell the "purchases" dataset to retry any method that throws a RetryableException for 3 hours. It would be up to the dataset to implement the correct resiliency behavior. If a retry policy is not given as a dataset argument, the retry policy for the program will be used. This would only apply to that dataset and not to remote calls that happen inside the Mapper or Reducer.

Spark would work in a similar way, with the .fromDataset() and saveAsDataset() being analogous to the addInput() and setOutput() methods in MapReduce. 

Services

Service endpoints that use implicit transaction control can be annotated with a retry policy. If the method throws a RetryableException, the method will be retried according to the policy. If it still fails, a 503 will be returned.

Code Block
@Path("history/{customer}")
@GET
@TransactionPolicy(TransactionControl.IMPLICIT)
@RetryPolicy(backoff = RetryBackoff.FIXED_DELAY, timeout = 5000)
public void history(HttpServiceRequest request, HttpServiceResponder responder,
                    @PathParam("customer") String customer) {
  PurchaseHistory history = store.read(customer);
  if (history == null) {
    responder.sendString(HttpURLConnection.HTTP_NO_CONTENT,
                         String.format("No purchase history found for %s", customer), Charsets.UTF_8);
  } else {
    responder.sendJson(history);
  }
}

In this example, the store.read() method may throw a RetryableException. If so, CDAP will retry the method for 5 seconds before returning a 503.

Flowlets

All flowlet process methods use implicit transaction control. The RetryPolicy annotation can also be for the process method. On retryable exceptions, the method will be retried according to the policy before moving on. The default policy will be configurable.

Workers

Workers use explicit transaction control for their run method. If a RetryableException is not handled by the user code, the run will fail. The default policy will be configurable.

UseDataSet

Fields annotated with the UseDataSet annotation will be injected by the CDAP platform when the class is being instantiated. This amounts to a getDataset() call made by the CDAP framework during instantiation.  If the dataset service is not available when this happens, it would be desirable to retry some amount of time before failing. To support this, we can allow the RetryPolicy annotation for dataset fields, similar to lifecycle methods:

Code Block
  @UseDataSet("frequentCustomers")
  @RetryPolicy(backoff = RetryBackoff.FIXED_DELAY, timeout = 180000)
  private KeyValueTable frequentCustomers;

This would tell CDAP to retry the getDataset() call for 3 minutes if it throws a RetryableException while instantiating the class.

Summary

Add a configure time method to set the remote retry policy of a program.

The remote retry policy can be dynamically configured using preferences and runtime arguments.

A Retries utility class will be introduced to help users retry arbitrary code in case they need really fine grained control.

For methods using implicit transaction control, a RetryPolicy annotation will be allowed. This essentially amounts to syntactic sugar for a Transactional.execute(TxRunnable, RetryPolicy) call.

For dataset fields injected by the UseDataSet annotation, a RetryPolicy annotation will be allowed. This essentially amounts to syntactic sugar for a getDataset(name, retryPolicy) call.

CDAP System Service Failures

These are various services that CDAP runs, both inside YARN and on edge nodes. This section documents what service degradations are expected if various CDAP system services are not available.

Transaction

  • Transactions cannot be started or committed.
  • In contexts where a long transaction is used (MapReduce, Spark), 
    • transaction start will be retried for a configurable amount of time before the run is failed
    • transaction commit will be retried for a configurable amount of time before the run is failed
  • In methods where a short transaction is used:
    • any transaction failure in a Service method will cause a 503 to be returned
    • transaction start failures in a flowlet will be retried indefinitely
    • transaction commit failures in a flowlet will cause the process method to be retried
    • transaction start failures before an initialize() or destroy() method will be retried for a configurable amount of time before causing the program to fail
    • transaction commit failures after an initialize() or destroy() method will cause the method to throw a ServiceUnavailableException. If uncaught, the program will fail
    • transaction start failures in Transactional.execute() will retry transaction start a configurable amount of time before throwing a TransactionFailureException with the ServiceUnavailableException as its cause
    • transaction commit failures in Transactional.execute() will throw a TransactionFailureException with the ServiceUnavailableException as its cause
  • CDAP REST APIs will return a 503
  • Metrics Processor will be unable to process metrics
  • Log Saver will be unable to save logs

Dataset

  • Programs that use the @UseDataSet annotation and that are starting up and that will fail their YARN container. The container will be retried up to a configurable number of times.
  • Dataset REST APIs will return 503
  • Deploy application REST API will fail if the application creates any datasets and return a 503
  • TODO: check if other REST APIs will fail
  • Explore queries on datasets will fail
  • See Programmatic APIs section. Applies to:
    • DatasetContext.getDataset() (if the cache is not hit)
    • ServiceDiscoverer.getServiceURL()
    • SecureStore.listSecureData()
    • SecureStore.getSecureData()
    • Admin.datasetExists()
    • Admin.getDatasetType()
    • Admin.getDatasetProperties()
    • Admin.createDataset()
    • Admin.updateDataset()
    • Admin.dropDataset()
    • Admin.truncateDataset()

Streams

  • Stream REST APIs will return 503
  • Deploy application REST API will fail if the application creates any streams and return a 503
  • Notifications on stream size will not be sent (shouldn't be triggered anyway since there is no way to write to a stream when this happens)
  • StreamWriter methods will throw a ServiceUnavailableException. The developer can catch and handle the exception. If it is not caught and handled, the platform will handle it.
    • StreamWriter is only available in a Worker. Uncaught exceptions will fail the run
  • Old Stream files on HDFS will not be cleaned up until the service is restored
  • See Programmatic APIs section. Applies to:
    • StreamWriter.write() (only in Workers)
    • StreamWriter.writeFile() (only in Workers)
    • StreamBatchWriter (only in Workers)

Explore

  • Explore REST APIs will return with 503
  • Queries that are in progress will be lost
  • Queries that are in progress of writing data are undefined
  • See Programmatic APIs section. Applies to the following methods if the dataset referenced is explorable:
    • Admin.createDataset()
    • Admin.updateDataset()
    • Admin.dropDataset()
    • Admin.truncateDataset()
    • PartitionedFileSet.addPartition()
    • PartitionedFileSet.dropPartition()

Metrics

  • REST APIs to fetch metrics and logs will fail with 503

Metrics Processor

  • Metrics emitted after the outage will not be returned by the REST APIs to fetch metrics
  • If the outage lasts longer than Kafka retention, any unprocessed metrics older than the retention will be lost

Log Saver

  • Logs emitted after the outage will not be returned by the REST APIs to fetch logs
  • If the outage lasts longer than Kafka retention, any log messages older than the retention will be lost

Master

  • Namespace, Metadata, Preference, Configuration, Security, Artifact, and Lifecycle CDAP REST APIs will fail with 503 
  • Scheduled workflow runs will be missed
  • Program state changes will not be recorded. Program state may be incorrect right after service is restored, but will eventually be corrected
  • Program run records will not be updated. Run records may be incorrect right after service is restored, but will eventually be corrected
  • Programs that are starting but have not yet received their YARN containers will fail
  • cluster metrics like memory available and used will not be collected for the time period of the downtime

Auth

  • CDAP users will be unable to get a security token

Router

All CDAP REST APIs will fail

TODO: What service does SecureStore use?

 

Hadoop Infrastructure

This section documents expected degradation when various Hadoop systems are down. Note that an outage in any one of these services will likely have an impact on the CDAP system services as well.

HBase

  • DatasetService should return 503, so all degradations in DatasetService will apply here
  • Program state will not be recorded. Once service is restored, state will eventually be corrected
    • This should not cause user programs that finish during the outage to fail
  • Program run records will not be updated. Once service is restored, state will eventually be corrected
  • CDAP REST APIs that lookup, create, modify, or delete a CDAP entity will fail with a 503
    • TODO: are there any APIs that don't fall under this category? fetch logs maybe?
  • Metrics still in Kafka will not be processed. If the outage lasts longer than Kafka retention, unprocessed metrics will be lost
  • Log messages still in Kafka will not be saved. If the outage lasts longer than Kafka retention, log messages will be lost
  • Programs will be unable to start. Scheduled program runs will fail
  • See Programmatic APIs section. Applies to:
    • DatasetContext.getDataset() (if the cache is not hit)
    • Admin.datasetExists()
    • Admin.getDatasetType()
    • Admin.getDatasetProperties()
    • Admin.createDataset()
    • Admin.updateDataset()
    • Admin.dropDataset()
    • Admin.truncateDataset()
    • PartitionedFileSet methods
    • Table get and scan methods
    • Due to the usage of BufferingTable, Table put and increment methods won't throw exceptions. HBase unavailability will only be seen by the CDAP platform when a transaction is committed.
  • TODO: investigate effect on HBase replication 

 

Notes:

'normal' HBase failures can take a while to recover from. For example, if an HBase regionserver it can take minutes for the master to notice and reassign the regions. This suggest that the default retry timeout should be pretty high.

Timeouts and retries for Tables may be difficult to control, as HTable and HBaseAdmin have their own logic and settings around retries and timeouts based on hbase-site.xml. HTable has a .setOperationTimeout() that it seems to use for most RPC calls it makes, and which defaults to the value of 'hbase.client.operation.timeout' (which in turn defaults to max long value in some versions of hbase). HBaseAdmin also uses this timeout for some operations, like creating a namespace, but for other operations it uses the 'hbase.client.retries.number' setting along with 'hbase.client.pause', and sometimes multiplied by 'hbase.client.retries.longer.multiplier'. The defaults and logic differ between hbase versions.

In the CDAP system services, it may make sense to use smaller timeouts and retries so that operations always return within 10 seconds or so instead of 20 minutes.

HDFS

  • Transaction snapshots will not be saved. If the transaction server dies, some transaction state will be lost
  • HBase relies on HDFS and will be unavailable. Everything under HBase applies here.
  • All REST APIs will return 503
  • See programmatic APIs section. Applies to all APIs.

YARN

  • All REST APIs will return 503
  • CDAP system services should be re-launched once YARN returns
  • User programs that were running when YARN died should be re-launched once YARN returns
  • TODO: what happens to running java processes? Do they keep running as zombies?
  • If YARN is down longer than the Kafka retention, unprocessed metrics and unsaved log messages will be lost

Kafka

  • Programs that emit logs and metrics will buffer their data for some time. After some configurable timeout, metrics and logs will be dropped
  • Stream size notifications will not be sent
  • Recent, unsaved log messages will not be return by the CDAP REST log endpoints
  • If the outage lasts longer than the Kafka retention, log messages and metrics will be lost

ZooKeeper

  • All CDAP REST APIs will return 503, since CDAP services will be undiscoverable
  • See programmatic APIs section. All APIs apply, since CDAP services will be undiscoverable
  • Any HBase regionserver failures will go unnoticed. If this happens, HBase requests to regions belonging to dead regionservers will fail. See the HBase section for expected behavior in these situations

Kerberos

  • TODO: investigate what happens here. Probably won't be able to start programs and some might expire and die, in which case we should continuously relaunch?

RESTful APIs

The CDAP REST APIs should always respond, even if that response is a 503.

 

Many internal changes will be required to ensure that there is error handling in place in case any remote call fails. For example, many internal services create Tables to store metadata. These services must not assume that table creation was successful. Instead, if it failed, they must keep trying to create that table, and in the meantime throw ServiceUnavailableException in their methods that try and use that Table. 

 

Internal services should also not require underlying infrastructure in order to start up or continue running. For example, the CDAP master today will try and create the default HBase namespace during startup, and will hang for a long time if HBase is down. 

RS-003: System Services HA

Most system services are already scalable to multiple instances (explore is not). Need to verify that if one instance dies, it is re-launched correctly. At a high level, making the Explore service HA amounts to moving any in-memory state to a dataset. Like any explore change, it will probably end up being a significant amount of work. 

Program state transition is also handled in memory in app fabric, (we add a Listener to the program controller), which means state transition logic will not be run if programs are started with one master, then the master fails over before the run finishes. The run record corrector will eventually set the state to 'failed', but that may not match the state it would normally transition to ('completed', 'killed', etc.). CDAP-7496

API changes

New Programmatic APIs

There will be a new ServiceUnavailable exception that relevant methods will throw. It will be a RuntimeException.

Deprecated Programmatic APIs

No programmatic APIs will be deprecated

New REST APIs

No new REST APIs will be introduced

Deprecated REST API

No REST APIs will be deprecated

CLI Impact or Changes

  • None

UI Impact or Changes

  • None

Security Impact 

CDAP should make sure tokens are handled correctly if Kerberos ever does down

Impact on Infrastructure Outages 

Should make CDAP resilient to infrastructure outages

Test Scenarios

It is difficult to test all possible scenarios. A few are listed below and more will be added. It would be good if we could run the long running tests in combination with a chaos monkey.

Test IDTest DescriptionExpected Results Perform failover for of all dependent infrastructure (hdfs, yarn, etc.)CDAP and user programs should recover to normal status Restart all dependent infrastructureCDAP and user programs should recover to normal status Kill infrastructure slaves (datanode, regionserver, nodemanager)CDAP and user programs should recover to normal status
context.addOutput(Output.ofDataset("history"));
}

In this example, the MapReduce job reads from an ObjectMappedTable called "purchases" and writes to a custom dataset called "history". This means that all the code handling retries and timeouts is located in the datasets and not in this program. This means all system datasets must be aware of the retry policy for the program, and apply it to the relevant methods. Setting a 3 hour timeout at configure time will tell the ObjectMappedTable to retry its scan operations for 3 hours while reading data in the map reduce job.

Spark would work in a similar way, with the .fromDataset() and saveAsDataset() being analogous to the addInput() and setOutput() methods in MapReduce. 

Workers

Workers use explicit transaction control for their run method. If a RetryableException is not handled by the user code, the run will fail.

UseDataSet

Fields annotated with the UseDataSet annotation will be injected by the CDAP platform when the class is being instantiated. This amounts to a getDataset() call made by the CDAP framework during instantiation.  If the dataset service is not available when this happens, it would be desirable to retry some amount of time before failing.  The program remote retry policy will be used for this timeout.

Summary

A retry policy for CDAP internal remote calls can be configured per program run using preferences and runtime arguments.

CDAP System Service Failures

These are various services that CDAP runs, both inside YARN and on edge nodes. This section documents what service degradations are expected if various CDAP system services are not available.

Transaction

  • Transactions cannot be started or committed.
  • In contexts where a long transaction is used (MapReduce, Spark), 
    • transaction start will be retried for a configurable amount of time before the run is failed
    • transaction commit will be retried for a configurable amount of time before the run is failed
  • In methods where a short transaction is used:
    • any transaction failure in a Service method will cause a 503 to be returned
    • transaction start failures in a flowlet will be retried indefinitely
    • transaction commit failures in a flowlet will cause the process method to be retried
    • transaction start failures before an initialize() or destroy() method will be retried for a configurable amount of time before causing the program to fail
    • transaction commit failures after an initialize() or destroy() method will cause the method to throw a ServiceUnavailableException. If uncaught, the program will fail
    • transaction start failures in Transactional.execute() will retry transaction start a configurable amount of time before throwing a TransactionFailureException with the ServiceUnavailableException as its cause
    • transaction commit failures in Transactional.execute() will throw a TransactionFailureException with the ServiceUnavailableException as its cause
  • CDAP REST APIs will return a 503
  • Metrics Processor will be unable to process metrics
  • Log Saver will be unable to save logs

Dataset

  • Programs that use the @UseDataSet annotation and that are starting up and that will fail their YARN container. The container will be retried up to a configurable number of times.
  • Dataset REST APIs will return 503
  • Deploy application REST API will fail if the application creates any datasets and return a 503
  • TODO: check if other REST APIs will fail
  • Explore queries on datasets will fail
  • See Programmatic APIs section. Applies to:
    • DatasetContext.getDataset() (if the cache is not hit)
    • ServiceDiscoverer.getServiceURL()
    • SecureStore.listSecureData()
    • SecureStore.getSecureData()
    • Admin.datasetExists()
    • Admin.getDatasetType()
    • Admin.getDatasetProperties()
    • Admin.createDataset()
    • Admin.updateDataset()
    • Admin.dropDataset()
    • Admin.truncateDataset()

Streams

  • Stream REST APIs will return 503
  • Deploy application REST API will fail if the application creates any streams and return a 503
  • Notifications on stream size will not be sent (shouldn't be triggered anyway since there is no way to write to a stream when this happens)
  • StreamWriter methods will throw a ServiceUnavailableException. The developer can catch and handle the exception. If it is not caught and handled, the platform will handle it.
    • StreamWriter is only available in a Worker. Uncaught exceptions will fail the run
  • Old Stream files on HDFS will not be cleaned up until the service is restored
  • See Programmatic APIs section. Applies to:
    • StreamWriter.write() (only in Workers)
    • StreamWriter.writeFile() (only in Workers)
    • StreamBatchWriter (only in Workers)

Explore

  • Explore REST APIs will return with 503
  • Queries that are in progress will be lost
  • Queries that are in progress of writing data are undefined
  • See Programmatic APIs section. Applies to the following methods if the dataset referenced is explorable:
    • Admin.createDataset()
    • Admin.updateDataset()
    • Admin.dropDataset()
    • Admin.truncateDataset()
    • PartitionedFileSet.addPartition()
    • PartitionedFileSet.dropPartition()

Metrics

  • REST APIs to fetch metrics and logs will fail with 503

Metrics Processor

  • Metrics emitted after the outage will not be returned by the REST APIs to fetch metrics
  • If the outage lasts longer than Kafka retention, any unprocessed metrics older than the retention will be lost

Log Saver

  • Logs emitted after the outage will not be returned by the REST APIs to fetch logs
  • If the outage lasts longer than Kafka retention, any log messages older than the retention will be lost

Master

  • Namespace, Metadata, Preference, Configuration, Security, Artifact, and Lifecycle CDAP REST APIs will fail with 503 
  • Scheduled workflow runs will be missed
  • Program state changes will not be recorded. Program state may be incorrect right after service is restored, but will eventually be corrected
  • Program run records will not be updated. Run records may be incorrect right after service is restored, but will eventually be corrected
  • Programs that are starting but have not yet received their YARN containers will fail
  • cluster metrics like memory available and used will not be collected for the time period of the downtime

Auth

  • CDAP users will be unable to get a security token

Router

  • All CDAP REST APIs will fail

TODO: What service does SecureStore use?

 

Hadoop Infrastructure

This section documents expected degradation when various Hadoop systems are down. Note that an outage in any one of these services will likely have an impact on the CDAP system services as well.

HBase

  • DatasetService should return 503, so all degradations in DatasetService will apply here
  • Program state will not be recorded. Once service is restored, state will eventually be corrected
    • This should not cause user programs that finish during the outage to fail
  • Program run records will not be updated. Once service is restored, state will eventually be corrected
  • CDAP REST APIs that lookup, create, modify, or delete a CDAP entity will fail with a 503
    • TODO: are there any APIs that don't fall under this category? fetch logs maybe?
  • Metrics still in Kafka will not be processed. If the outage lasts longer than Kafka retention, unprocessed metrics will be lost
  • Log messages still in Kafka will not be saved. If the outage lasts longer than Kafka retention, log messages will be lost
  • Programs will be unable to start. Scheduled program runs will fail
  • See Programmatic APIs section. Applies to:
    • DatasetContext.getDataset() (if the cache is not hit)
    • Admin.datasetExists()
    • Admin.getDatasetType()
    • Admin.getDatasetProperties()
    • Admin.createDataset()
    • Admin.updateDataset()
    • Admin.dropDataset()
    • Admin.truncateDataset()
    • PartitionedFileSet methods
    • Table get and scan methods
    • Due to the usage of BufferingTable, Table put and increment methods won't throw exceptions. HBase unavailability will only be seen by the CDAP platform when a transaction is committed.
  • TODO: investigate effect on HBase replication 

 

Notes:

'normal' HBase failures can take a while to recover from. For example, if an HBase regionserver it can take minutes for the master to notice and reassign the regions. This suggest that the default retry timeout should be pretty high.

Timeouts and retries for Tables may be difficult to control, as HTable and HBaseAdmin have their own logic and settings around retries and timeouts based on hbase-site.xml. HTable has a .setOperationTimeout() that it seems to use for most RPC calls it makes, and which defaults to the value of 'hbase.client.operation.timeout' (which in turn defaults to max long value in some versions of hbase). HBaseAdmin also uses this timeout for some operations, like creating a namespace, but for other operations it uses the 'hbase.client.retries.number' setting along with 'hbase.client.pause', and sometimes multiplied by 'hbase.client.retries.longer.multiplier'. The defaults and logic differ between hbase versions.

In the CDAP system services, it may make sense to use smaller timeouts and retries so that operations always return within 10 seconds or so instead of 20 minutes.

HDFS

  • Transaction snapshots will not be saved. If the transaction server dies, some transaction state will be lost
  • HBase relies on HDFS and will be unavailable. Everything under HBase applies here.
  • All REST APIs will return 503
  • See programmatic APIs section. Applies to all APIs.

YARN

  • All REST APIs will return 503
  • CDAP system services should be re-launched once YARN returns
  • User programs that were running when YARN died should be re-launched once YARN returns
  • TODO: what happens to running java processes? Do they keep running as zombies?
  • If YARN is down longer than the Kafka retention, unprocessed metrics and unsaved log messages will be lost

Kafka

  • Programs that emit logs and metrics will buffer their data for some time. After some configurable timeout, metrics and logs will be dropped
  • Stream size notifications will not be sent
  • Recent, unsaved log messages will not be return by the CDAP REST log endpoints
  • If the outage lasts longer than the Kafka retention, log messages and metrics will be lost

ZooKeeper

  • All CDAP REST APIs will return 503, since CDAP services will be undiscoverable
  • See programmatic APIs section. All APIs apply, since CDAP services will be undiscoverable
  • Any HBase regionserver failures will go unnoticed. If this happens, HBase requests to regions belonging to dead regionservers will fail. See the HBase section for expected behavior in these situations

Kerberos

  • TODO: investigate what happens here. Probably won't be able to start programs and some might expire and die, in which case we should continuously relaunch?

RESTful APIs

The CDAP REST APIs should always respond, even if that response is a 503.

 

Many internal changes will be required to ensure that there is error handling in place in case any remote call fails. For example, many internal services create Tables to store metadata. These services must not assume that table creation was successful. Instead, if it failed, they must keep trying to create that table, and in the meantime throw ServiceUnavailableException in their methods that try and use that Table. 

 

Internal services should also not require underlying infrastructure in order to start up or continue running. For example, the CDAP master today will try and create the default HBase namespace during startup, and will hang for a long time if HBase is down. 

API changes

New Programmatic APIs

There will be two new exceptions:

Code Block
public class RetryableException extends RuntimeException {
  ...
}
 
public class ServiceUnavailableException extends RetryableException {
  ...
}

 

There will also be preferences and runtime arguments that can be set, which correspond directly to the retry policy properties that are set programmatically.

namedescription
retry.policy.strategy'none', 'fixed_delay', or 'exponential_backoff'
retry.policy.timeouttimeout in millis before CDAP gives up retrying the remote operation
retry.policy.max.retriesmaximum number of retries before CDAP gives up
retry.policy.delay.basestarting delay in millis between retries.
retry.policy.delay.maxmax delay in millis between retries. Only used if the strategy is exponential backoff.

 

Deprecated Programmatic APIs

No programmatic APIs will be deprecated

New REST APIs

No new REST APIs will be introduced

Deprecated REST API

No REST APIs will be deprecated

CLI Impact or Changes

  • None

UI Impact or Changes

  • None

Security Impact 

CDAP should make sure tokens are handled correctly if Kerberos ever does down

Impact on Infrastructure Outages 

Should make CDAP resilient to infrastructure outages

Test Scenarios

It is difficult to test all possible scenarios. A few are listed below and more will be added. It would be good if we could run the long running tests in combination with a chaos monkey.

 

 

ReleaseTest IDComponent or
Sub-System or
Feature Category
DescriptionDetailed StepsExpected OutcomeReal Outcome
 RES - 1 MapReduce execution when CDAP master down.Run a MapReduce program that reads from and writes to a Table. While it is starting, shut down CDAP master. Before the timeout is reached, start the CDAP master again.The program should retry TxManager, DatasetService, and AppFabric calls until master comes back up. 
 RES - 2 Spark execution when CDAP master down.Run a Spark program that reads from and writes to a Table. While it is starting, shut down CDAP master. Before the timeout is reached, start up CDAP master again.The program should retry TxManager, DatasetService, and AppFabric calls until master comes back up. 
 RES -3 Flow processing while CDAP restarts.Run a Flow that reads from and writes to a Table. While it is running shut down CDAP master. After some time, start up CDAP master again.The flow should retry calls until CDAP services come back up. 
 RES - 4 Service behavior when the CDAP master is down.Run a Service that reads from and writes to a Table. While it is running, shut down CDAP master. After some time, start up CDAP master again.Service endpoints should return a 503 within 10 seconds until the CDAP services come back up 
 RES - 5 Workflow custom action execution when CDAP master restarts.Run a Workflow with a custom action that reads from and writes to a Table. While it is running, shut down CDAP master. Before the timeout is reached, start up CDAP master again.The program should retry TxManager, DatasetService, and AppFabric calls until master comes back up. 
 RES - 6 Repeat RES-1 to RES-5, except failover from one CDAP master to another instead of stopping CDAP master services.   
 RES - 7 Repeat RES-1 to RES-5, except shut down HBase instead of CDAP master services.   
 RES - 8 Repeat RES-1 to RES-5, except restart regionservers in a rolling fashion instead of restarting CDAP master services   
 RES - 9 Repeat RES-1 to RES-5, except shut down HDFS instead of CDAP master services.   
 RES - 10 Repeat RES-1 to RES-5, except restart datanodes in a rolling fashion instead of restarting CDAP master services   
 RES - 11 Repeat RES-1 to RES-5, exception failover from one namenode to another instead of restarting CDAP master services   
 RES - 12 Repeat RES-1 to RES-5, except shut down Zookeeper instead of CDAP master services.   
 RES - 13 Shut down HBase. Relevant CDAP REST endpoints should return with 503 within 10 seconds until HBase is back up 
 RES - 14 Shut down HDFS. Relevant CDAP REST endpoints should return with 503 within 10 seconds until HDFS is back up 
 RES - 15 Shut down ZooKeeper. Relevant CDAP REST endpoints should return with 503 within 10 seconds until ZooKeeper is back up 
 RES - 16 Kill TransactionService. Relevant CDAP REST endpoints should return with 503 within 10 seconds. Service should be restarted automatically 
 RES - 17 Kill Dataset Op Service. Relevant CDAP REST endpoints should return with 503 within 10 seconds. Service should be restarted automatically 
 RES - 18CoprocessorsTest rolling CDAP coprocessor upgrade1. set 'master.manage.hbase.coprocessors' to 'false' in cdap-site.xml.
2. follow hbase setup steps on https://wiki.cask.co/display/CE/CDAP+Replication+Deployment to place coprocessor fat jar on regionserver classpath
3. manually modify a coprocessor to contain an extra log message
4. deploy modified coprocessor to regionserver1 and restart regionserver
5. verify new log message shows up in hbase log
6. perform steps 4-5 on other regionservers
CDAP services and user programs should not have been affected during rolling upgrade 
 RES-19Rollling upgradesPerform rolling upgrade of HBase Relevant CDAP REST endpoints should return with 503 within 10 seconds until HBase is back up 
 RES-20Rollling upgradesPerform rolling upgrade of HDFS Relevant CDAP REST endpoints should return with 503 within 10 seconds until HDFS is back up 
 RES-21Rolling upgradesPerform rolling upgrade of Zookeeper Relevant CDAP REST endpoints should return with 503 within 10 seconds until Zookeeper is back up 

Releases

Release 4.1.0

Release 4.2.0

Related Work

 

Future work