Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
When CDAP system services are down, or when dependent services such as HBase, HDFS, YARN, Kafka, ZooKeeper, and Kerberos are down, user programs and CDAP services must remain running, though they may be in a degraded mode where some operations fail.
Goals
Define the expected behavior of CDAP programmatic APIs during underlying service failures
Define the expected behavior of CDAP RESTful APIs during underlying service failures
User Stories
- As an application developer, I want my Service, Flow, Worker, and SparkStreaming programs to remain running when underlying services experience downtime
- As an application developer, I want my Service, Flow, Worker, and SparkStreaming programs to recover to a normal state when underlying service downtime ends
- As an application developer, I want to be able to configure how long my batch programs (Workflow, MapReduce, Spark batch) should retry before eventually failing when underlying services experience downtime
- As a system administrator, I want CDAP system services to remain running when underlying services experience downtime
- As a system administrator, I want CDAP RESTful APIs to return HTTP code 503 when required underlying services are unavailable
- As a system administrator, I want CDAP system services to recover to a normal state when underlying service downtime ends
- As a system administrator, I want CDAP program run history recording to be unaffected by failover
- As a system administrator, I want all CDAP system services to be highly available
Design (Work in Progress)
Here we will list out different types of service unavailabilities that may occur, and describe the expected behavior for different types of programs in those situations. In general, unavailability errors that occur when a program is starting up will cause the program to continuously retry startup. Errors that occur in always-running programs will be handled in a way that keeps the program running. Errors that occur in batch programs will be retried for some time before failing the program run.
Note that this documents how these APIs should behave, not how they currently behave. JIRAs will be opened for any differences in behavior, and those differences will be fixed.
CDAP System Services
These are various services that CDAP runs, both inside YARN and on edge nodes.
Transaction
- In implicit transactional contexts, transaction start, commit, and rollback operations will fail and be handled by the platform:
- Programs executing their initialize() method will fail their YARN container. The container will be retried up to a configurable number of times
- Programs executing their destroy() method will fail the program run
- Service programs executing an endpoint method will return a 503 error code
- Flow programs will indefinitely retry the processEvent method
- Long transactions (in MapReduce, Spark) will retry failed commits for 10 minutes (configurable) before failing the program run
- In explicit transactional contexts (places where the Transactional interface is used), a ServiceUnavailableException will be thrown. The developer can catch and handle the exception. If it is not caught and handled, the platform will handle the error in the same way it does in implicit transactional contexts.
- CDAP REST APIs will return a 503
- Metrics Processor will be unable to process metrics
- Log Saver will be unable to save logs
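The explicit-context behavior above can be sketched as follows. This is a hedged illustration only: Transactional is stubbed here with a simplified signature (the real CDAP interface takes a TxRunnable and can also throw TransactionFailureException), and ServiceUnavailableException is a stand-in for the new exception proposed in the API changes section.

```java
// Stub of the proposed ServiceUnavailableException (a RuntimeException per the API changes section).
class ServiceUnavailableException extends RuntimeException {
    ServiceUnavailableException(String message) { super(message); }
}

// Simplified stand-in for the Transactional interface.
interface Transactional {
    void execute(Runnable work);
}

class TxErrorHandling {
    // Runs the work, returning false instead of propagating when the
    // transaction service is unavailable, so the caller can back off and retry
    // rather than letting the platform fail the container or program run.
    static boolean tryExecute(Transactional transactional, Runnable work) {
        try {
            transactional.execute(work);
            return true;
        } catch (ServiceUnavailableException e) {
            // Transaction service is down: degrade instead of failing.
            return false;
        }
    }
}
```

If the developer does not catch the exception as shown, the platform falls back to the implicit-context handling described above.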
Dataset
- Programs that use the @UseDataSet annotation and are starting up will fail their YARN container. The container will be retried up to a configurable number of times.
- Dataset REST APIs will return 503
- TODO: check if other REST APIs will fail
- The deploy application REST API will fail with a 503 if the application creates any datasets
- A DatasetContext.getDataset() call that doesn't hit the client cache will throw a ServiceUnavailableException. The dataset methods in Admin (exists, getDatasetType, getDatasetProperties, create, update, drop, truncate) will also throw a ServiceUnavailableException. The developer can catch and handle the exception. If it is not caught and handled, the platform will handle it.
- Programs executing their initialize() method will fail their YARN container. The container will be retried up to a configurable number of times
- Programs executing their destroy() method will fail the program run
- Executing a TxRunnable using Transactional.execute() will throw a TransactionFailureException with the ServiceUnavailableException as its cause. If the TransactionFailureException is not caught and handled, the program will fail
- Service programs executing an endpoint method will return a 503 error code
- Flowlet programs executing their processEvent method will indefinitely retry the method with its event
- Mapper, Reducer, CustomAction, Worker, and Spark programs will fail their run
- Explore queries on datasets will fail
- Dataset operations to create, delete, or modify explore partitions will fail and throw a ServiceUnavailableException. If the developer catches and handles the exception, the platform will not do anything extra. Otherwise:
- If this occurs in a program initialize method, the YARN container will fail and will be retried up to a configurable number of times.
- If this occurs in a program destroy method, the YARN container will fail and the program will transition to the FAILED state
- If this occurs in a Flowlet processEvent method, the method will be retried indefinitely
- If this occurs in a Service endpoint method, a 503 error code will be returned
- If this occurs in a running Mapper, Reducer, CustomAction, Worker, or Spark program, the exception will fail the program run
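When Transactional.execute() wraps the unavailability in a TransactionFailureException (as described above), a program can inspect the cause to decide whether a retry might succeed. This is a sketch with stubbed types; the real TransactionFailureException and ServiceUnavailableException come from the CDAP/Tephra APIs.

```java
// Stubs mirroring the CDAP types referenced in this section.
class ServiceUnavailableException extends RuntimeException {
    ServiceUnavailableException(String message) { super(message); }
}

class TransactionFailureException extends Exception {
    TransactionFailureException(String message, Throwable cause) { super(message, cause); }
}

class FailureCause {
    // True when the transaction failed because a required service was
    // unavailable, meaning a later retry may succeed; false for failures
    // such as write conflicts, which the program should handle differently.
    static boolean isRetriable(TransactionFailureException e) {
        return e.getCause() instanceof ServiceUnavailableException;
    }
}
```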
Streams
- Stream REST APIs will return 503
- The deploy application REST API will fail with a 503 if the application creates any streams
- Notifications on stream size will not be sent (shouldn't be triggered anyway since there is no way to write to a stream when this happens)
- StreamWriter methods will throw a ServiceUnavailableException. The developer can catch and handle the exception. If it is not caught and handled, the platform will handle it.
- StreamWriter is only available in a Worker. Uncaught exceptions will fail the run
- Old Stream files on HDFS will not be cleaned up until the service is restored
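Since StreamWriter is only available in a Worker and uncaught exceptions fail the run, a Worker that wants to survive stream downtime must catch the exception itself. The sketch below buffers events locally and flushes the backlog once writes succeed again. StreamWriter is stubbed with a simplified signature (the real CDAP interface offers several write variants), and the in-memory backlog is an illustrative choice, not part of the design.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Stub of the proposed ServiceUnavailableException.
class ServiceUnavailableException extends RuntimeException {
    ServiceUnavailableException(String message) { super(message); }
}

// Simplified stand-in for the StreamWriter available in Workers.
interface StreamWriter {
    void write(String stream, String body);
}

// Worker-side sketch: buffer events locally while the stream service is down,
// and flush the backlog once writes succeed again.
class BufferingStreamClient {
    private final StreamWriter writer;
    private final Queue<String> backlog = new ArrayDeque<>();

    BufferingStreamClient(StreamWriter writer) {
        this.writer = writer;
    }

    // Attempts to write the event plus any backlog; returns the number of
    // events still buffered after this attempt.
    int write(String stream, String body) {
        backlog.add(body);
        try {
            while (!backlog.isEmpty()) {
                writer.write(stream, backlog.peek());
                backlog.remove();
            }
        } catch (ServiceUnavailableException e) {
            // Stream service unavailable: keep the backlog for the next attempt.
        }
        return backlog.size();
    }
}
```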
Explore
- Explore REST APIs will return a 503
- Queries that are in progress will be lost
- The results of queries that are writing data when the outage occurs are undefined
- Create operations for explorable datasets will fail and throw a ServiceUnavailableException. The dataset will not be deleted. If the developer catches and handles the exception, the platform will not do anything extra. Otherwise:
- If this occurs during application deployment, the application will fail to be deployed
- If this occurs in a program initialize method, the YARN container will fail and will be retried up to a configurable number of times.
- If this occurs in a program destroy method, the YARN container will fail and the program will transition to the FAILED state
- If this occurs in a Flowlet processEvent method, the method will be retried indefinitely
- If this occurs in a Service endpoint method, a 503 error code will be returned
- If this occurs in a running Mapper, Reducer, CustomAction, Worker, or Spark program, the exception will fail the program run
- Dataset operations to create, delete, or modify explore partitions will fail and throw a ServiceUnavailableException. If the developer catches and handles the exception, the platform will not do anything extra. Otherwise:
- If this occurs in a program initialize method, the YARN container will fail and will be retried up to a configurable number of times.
- If this occurs in a program destroy method, the YARN container will fail and the program will transition to the FAILED state
- If this occurs in a Flowlet processEvent method, the method will be retried indefinitely
- If this occurs in a Service endpoint method, a 503 error code will be returned
- If this occurs in a running Mapper, Reducer, CustomAction, Worker, or Spark program, the exception will fail the program run
Metrics
- REST APIs to fetch metrics and logs will fail with 503
Metrics Processor
- Metrics emitted after the outage begins will not be returned by the REST APIs to fetch metrics until the service recovers and catches up
- If the outage lasts longer than Kafka retention, any unprocessed metrics older than the retention will be lost
Log Saver
- Logs emitted after the outage begins will not be returned by the REST APIs to fetch logs until the service recovers and catches up
- If the outage lasts longer than Kafka retention, any log messages older than the retention will be lost
Master
- Namespace, Metadata, Preference, Configuration, Security, Artifact, and Lifecycle CDAP REST APIs will fail with 503
- Scheduled workflow runs will be missed
- Program state changes will not be recorded. Program state may be incorrect right after service is restored, but will eventually be corrected
- Program run records will not be updated. Run records may be incorrect right after service is restored, but will eventually be corrected
- Programs that are starting but have not yet received their YARN containers will fail
- Cluster metrics, such as available and used memory, will not be collected during the downtime
Auth
- CDAP users will be unable to get a security token
Router
- All CDAP REST APIs will fail
TODO: What service does SecureStore use?
Hadoop Infrastructure
These are various Hadoop systems. Note that an outage in any one of these services will likely have an impact on the CDAP system services as well.
HBase
- The Dataset service should return 503, so all of the Dataset service degradations described above also apply here
- Program state will not be recorded. Once service is restored, state will eventually be corrected
- This should not cause user programs that finish during the outage to fail
- Program run records will not be updated. Once service is restored, state will eventually be corrected
- CDAP REST APIs that create, modify, or delete a CDAP entity will fail with a 503
- Metrics still in Kafka will not be processed. If the outage lasts longer than Kafka retention, unprocessed metrics will be lost
- Log messages still in Kafka will not be saved. If the outage lasts longer than Kafka retention, log messages will be lost
- Programs will be unable to start. Scheduled program runs will fail
- Table read operations should throw a ServiceUnavailableException. The user can catch the exception and handle it. Otherwise, the platform will handle it.
- Programs executing their initialize() method will fail their YARN container. The container will be retried up to a configurable number of times
- Programs executing their destroy() method will fail the program run
- Executing a TxRunnable using Transactional.execute() will throw a TransactionFailureException with the ServiceUnavailableException as its cause. If the TransactionFailureException is not caught and handled, the program will fail
- Service programs executing an endpoint method will return a 503 error code
- Flowlet programs executing their processEvent method will indefinitely retry the method with its event
- Mapper, Reducer, CustomAction, Worker, and Spark programs will fail their run
- Table write operations should throw a ServiceUnavailableException. The user can catch the exception and handle it. Otherwise, the platform will handle it.
- Programs executing their initialize() method will fail their YARN container. The container will be retried up to a configurable number of times
- Programs executing their destroy() method will fail the program run
- Executing a TxRunnable using Transactional.execute() will throw a TransactionFailureException with the ServiceUnavailableException as its cause. If the TransactionFailureException is not caught and handled, the program will fail
- Service programs executing an endpoint method will return a 503 error code
- Flowlet programs executing their processEvent method will indefinitely retry the method with its event
- Mapper, Reducer, CustomAction, Worker, and Spark programs will fail their run
- Note: because BufferingTable is the current Table implementation, writes will only fail when the transaction is committed, not on the actual method call
- TODO: investigate effect on HBase replication
Notes:
'Normal' HBase failures can take a while to recover from. For example, if an HBase regionserver dies, it can take minutes for the master to notice and reassign its regions.
Timeouts and retries for Tables may be difficult to control, as HTable and HBaseAdmin have their own retry and timeout logic and settings based on hbase-site.xml. HTable has a setOperationTimeout() method that it seems to use for most of the RPC calls it makes, which defaults to the value of 'hbase.client.operation.timeout' (which in turn defaults to the maximum long value in some versions of HBase). HBaseAdmin also uses this timeout for some operations, such as creating a namespace, but for other operations it uses the 'hbase.client.retries.number' setting along with 'hbase.client.pause', sometimes multiplied by 'hbase.client.retries.longer.multiplier'. The defaults and logic differ between HBase versions.
In the CDAP system services, it may make sense to use smaller timeouts and retries so that operations always return within roughly 10 seconds instead of 20 minutes. For CDAP programs, it may make sense to let users set various HBase settings through preferences and runtime arguments to control the behavior of each program run. For example, a MapReduce job that takes a week to complete might use a one-hour timeout, while a job that completes in five minutes might use a one-minute timeout.
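The "retry for a configurable window, then fail" behavior referenced throughout this document can be sketched as a small helper. This is an illustrative sketch, not the platform implementation; the fixed pause between attempts is an assumption (a real implementation might use exponential backoff).

```java
import java.util.concurrent.Callable;

class BoundedRetry {
    // Retries the call until it succeeds or timeoutMillis elapses, sleeping
    // pauseMillis between attempts. When time runs out, the last failure is
    // rethrown, which would fail the program run.
    static <T> T callWithRetry(Callable<T> call, long timeoutMillis, long pauseMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            try {
                return call.call();
            } catch (Exception e) {
                if (System.currentTimeMillis() + pauseMillis >= deadline) {
                    // Retry window exhausted: surface the last failure.
                    throw new RuntimeException("retries exhausted", e);
                }
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("interrupted while retrying", ie);
                }
            }
        }
    }
}
```

The timeout and pause would come from cdap-site.xml settings or runtime arguments, per the discussion above.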
HDFS
- FileSet operations will throw a ServiceUnavailableException. The user can catch the exception and handle it. Otherwise the platform will handle it.
- Transaction snapshots will not be saved. If the transaction server dies, transaction state will be lost
YARN
- Most functionality is affected: new program runs cannot be launched, and failed containers cannot be restarted
- TODO: fill this out
Kafka
- Programs that emit logs and metrics will buffer their data for some time. After a configurable timeout, metrics and logs will be dropped
- stream size notifications will not be sent
ZooKeeper
- service discovery will fail
- all CDAP REST APIs will return 503
- all CDAP Master instances will become followers, making all Master services unavailable (see CDAP Master section)
- Any HBase regionserver failures will go unnoticed. If this happens, HBase requests to regions belonging to dead regionservers will fail. See the HBase section for expected behavior in these situations
RESTful APIs
The CDAP REST APIs should always respond, even if that response is a 503.
Many internal changes will be required to ensure that error handling is in place in case any remote call fails. For example, many internal services create Tables to store metadata. These services must not assume that table creation was successful. Instead, if it failed, they must keep trying to create the table, and in the meantime throw ServiceUnavailableException from any method that tries to use it.
Internal services should also not require underlying infrastructure in order to start up or continue running. For example, the CDAP master today tries to create the default HBase namespace during startup, and will hang for a long time if HBase is down.
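The guarded-startup pattern described above can be sketched as follows. The class and method names here are hypothetical (the real services would call the Dataset APIs); the point is the shape: creation is retried until it succeeds, and every method that depends on the table throws ServiceUnavailableException until then.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Stub of the proposed ServiceUnavailableException.
class ServiceUnavailableException extends RuntimeException {
    ServiceUnavailableException(String message) { super(message); }
}

// Sketch of an internal service that must not assume its metadata table exists.
class GuardedMetadataService {
    private final AtomicBoolean tableReady = new AtomicBoolean(false);

    // Called at startup and retried periodically until creation succeeds.
    // createTable is a hypothetical hook standing in for the real Dataset call.
    void ensureTable(Runnable createTable) {
        if (tableReady.get()) {
            return;
        }
        try {
            createTable.run();
            tableReady.set(true);
        } catch (RuntimeException e) {
            // Creation failed (e.g. HBase down): stay unavailable and let the
            // next periodic attempt try again.
        }
    }

    // Every method that touches the table checks readiness first, so the REST
    // layer can translate the exception into a 503.
    String readMetadata(String key) {
        if (!tableReady.get()) {
            throw new ServiceUnavailableException("metadata table not yet created");
        }
        return "value-for-" + key;  // placeholder for the real table read
    }
}
```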
HA
Most system services can already scale to multiple instances (Explore cannot). We need to verify that if one instance dies, it is re-launched correctly. Making the Explore service HA is a significant amount of work.
Program state transitions are also handled in memory in app fabric (a Listener is added to the program controller), which means state transition logic will not run if a program is started by one master and the master fails over before the run finishes (CDAP-2013).
API changes
New Programmatic APIs
There will be a new ServiceUnavailableException that relevant methods will throw. It will extend RuntimeException.
Deprecated Programmatic APIs
No programmatic APIs will be deprecated
New REST APIs
No new REST APIs will be introduced
Deprecated REST APIs
No REST APIs will be deprecated
CLI Impact or Changes
- None
UI Impact or Changes
- None
Security Impact
CDAP should make sure tokens are handled correctly if Kerberos ever goes down
Impact on Infrastructure Outages
This work should make CDAP resilient to infrastructure outages
Test Scenarios
It is difficult to test all possible scenarios; a few are listed below and more will be added. It would be good if we could run the long-running tests in combination with a chaos monkey.
Test ID | Test Description | Expected Results
---|---|---
 | Perform failover of all dependent infrastructure (HDFS, YARN, etc.) | CDAP should recover to normal status
 | Restart all dependent infrastructure | CDAP should recover to normal status
 | Kill infrastructure slaves (datanode, regionserver, nodemanager) | CDAP should recover to normal status
 | Create a flow that generates the ordered sequence of integers up to some number, and writes them to a Table. Start the flow, then restart cdap-master. | The output table should contain all generated numbers, and run history should be recorded correctly.
 | Same as above, except with a worker | Same as above
Releases
Release 4.1.0
Release 4.2.0
Related Work
Future work