Guava services tutorial with examples
Introduction
This article aims at introducing commonly used implementation of Guava Service interface in version 13.0.1 with concrete examples from CDAP. This article contains material adapted or quoted from https://github.com/google/guava/wiki/ServiceExplained and Java docs in Guava source code, but citation is omitted for readability and convenience.
Overview
Guava Service interface represents an object with an operational state, with start() and stop() methods that return a ListenableFuture that represents the result of an asynchronous transition to a desired state. Synchronous startAndWait() and stopAndWait() methods waits for the transition to a desired state to complete. For example, webservers, RPC servers, and timers, can implement the Service interface.Managing the state of services like these, which require proper startup and shutdown management, can be nontrivial, especially if multiple threads or scheduling is involved. Guava provides some skeletons to manage the state logic and synchronization details for you.
Principles of Using a Service
1. A Service implementation must implement only the valid state transitions:
The normal lifecycle of a Service is
Service.State.NEWtoService.State.STARTINGtoService.State.RUNNINGtoService.State.STOPPINGtoService.State.TERMINATED
Any exceptions thrown at states Service.State.NEW, Service.State.STARTING, Service.State.RUNNING or Service.State.STOPPING will lead to Service.State.FAILED. Service.State.FAILED will also be reached if there are failures or if Service#stop() is called before the Service reaches the Service.State.RUNNING state. Service#stop()
Service#addListener(Listener listener, Executor executor) method can add an Service.Listener that listens to every state transition of the Service and execute certain actions with the Executor. The set of legal transitions form a DAG (http://en.wikipedia.org/wiki/Directed_acyclic_graph), therefore every method of the listener will be called at most once. An implementation of the Service interface must implement all the state transitions as defined here.
2. All state transitions must complete within finite amount of time and not be blocked:
A service may make use of other services. However, services should perform state transitions independently, without waiting for certain conditions of other services. Otherwise, state transitions may be blocked indefinitely when services waiting each other forms a situation like dead lock.
3. A Service implementation must respond to start() or startAndWait() by transitioning to Service.State.RUNNING
When start() or startAndWait() is called for the first time in a Service, the service will respond by by transitioning form Service.State.NEW to Service.State.RUNNING. Any exceptions happening during the state transition will cause the service to transition into Service.State.FAILED. start() returns a ListenableFuture representing an asynchronous computation of this state transition. On the other hand, startAndWait() waits for the state transition to complete. Therefore, after startAndWait() returns, the service should be guaranteed to be in Service.State.RUNNING if no exception occurs.
4. A Service implementation must respond to stop() or stopAndWait() by transitioning to a terminal state, and close all the threads and objects initialized by it:
Unless already at a terminal or Service.State.NEW state, no matter what is the current state of a service, it must respond to stop() asynchronously and stopAndWait() synchronously by transitioning to one of the terminal states: Service.State.TERMINATED, or if any exception occurs or the service hasn't reached Service.State.FAILED . Since, a service cannot ever leave a terminal state, when a service reaches the Service.State.RUNNINGService.State.FAILED or Service.State.TERMINATED state, all the threads and objects initialized when the service starts must be properly closed.
To correctly follow all the requirements listed above can be challenging. Therefore, it's strongly recommended to use the following abstract classes in Guava which implement this interface and make the threading and state management easier.
Implementations
AbstractService
When you need to do your own manual thread management, override AbstractService directly. Typically, you should be well served by one of the below implementations, but implementing AbstractService is recommended when, for example, you are modeling something that provides its own threading semantics as a Service , you have your own specific threading requirements.
To implement AbstractService you must implement 2 methods.
doStart():doStart()is called directly by the first call tostart(). The service is inService.State.STARTINGstate whendoStart()method is called.doStart()should perform all initialization and eventually MUST callnotifyStarted()to transition the service intoService.State.RUNNINGstate if start up succeeded.startAndWait()will only return afternotifyStarted()returns successfully. Any Throwable thrown bydoStart()will incurnotifyFailed()to transition the service intoService.State.FAILEDstate.doStop():doStop()is called directly by the first call tostop()only if the service inService.State.RUNNINGorService.State.STARTINGstate, which meansdoStart()must have completed successfully. YourdoStop()method should shut down your service and then eventually MUST callnotifyStopped()if shutdown succeeded.startAndWait()will only return afternotifyStarted()returns successfully. Any Throwable thrown bydoStop()will incurnotifyFailed()to transition the service intoService.State.FAILEDstate.
start() and stop() methods of AbstractService run in the same thread in which they are called. Your doStart() and doStop() methods should be fast, because the service object is locked when doStart() or doStop() is running, and no other state transition can happen when the service object is locked. If you need to do expensive initialization, such as reading files, opening network connections, or any operation that might block, you should consider moving that work to another thread.
An example implementation of AbstractService in CDAP is RetryOnStartFailureService. It wraps around another Service such that, if the wrapped service failed to start, it will get restarted based on the RetryStrategy.
Notice that RetryOnStartFailureService#doStart() creates a new thread startupThread and immediately calls notifyStarted() after the asynchronous start of the startupThread, regardless of whether the wrapped service currentDelegate has successfully started. Therefore, calling RetryOnStartFailureService#startAndWait() can only guarantee that this RetryOnStartFailureService is in Service.State.RUNNING but makes no guarantee that the wrapped service in RetryOnStartFailureService has finished startup.
public class RetryOnStartFailureService extends AbstractService {
...
private volatile Service currentDelegate;
...
/**
* Creates a new instance.
*
* @param delegate a {@link Supplier} that gives new instance of the delegating Service.
* @param retryStrategy strategy to use for retrying
*/
public RetryOnStartFailureService(Supplier<Service> delegate, RetryStrategy retryStrategy) {
this.delegate = delegate;
this.currentDelegate = delegate.get();
this.delegateServiceName = currentDelegate.getClass().getSimpleName();
this.retryStrategy = retryStrategy;
}
@Override
protected void doStart() {
startupThread = new Thread("Endure-Service-" + delegateServiceName) {
@Override
public void run() {
int failures = 0;
long startTime = System.currentTimeMillis();
long delay = 0L;
while (delay >= 0 && !stopped) {
try {
currentDelegate.start().get();
// Only assigned the delegate if and only if the delegate service started successfully
startedService = currentDelegate;
break;
} catch (InterruptedException e) {
// This thread will be interrupted from the doStop() method. Don't reset the interrupt flag.
} catch (Throwable t) {
...
}
}
}
};
startupThread.start();
notifyStarted();
}
@Override
protected void doStop() {
// doStop() won't be called until doStart() returns, hence the startupThread would never be null
stopped = true;
startupThread.interrupt();
Uninterruptibles.joinUninterruptibly(startupThread);
// Stop the started service if it exists and propagate the stop state
// There could be a small race between the delegate service started successfully and
// the setting of the startedService field. When that happens, the stop failure state is not propagated.
// Nevertheless, there won't be any service left behind without stopping.
if (startedService != null) {
Futures.addCallback(startedService.stop(), new FutureCallback<State>() {
@Override
public void onSuccess(State result) {
notifyStopped();
}
@Override
public void onFailure(Throwable t) {
notifyFailed(t);
}
}, Threads.SAME_THREAD_EXECUTOR);
return;
}
// If there is no started service, stop the current delete, but no need to propagate the stop state
// because if the underlying service is not yet started due to failure, it shouldn't affect the stop state
// of this retrying service.
if (currentDelegate != null) {
currentDelegate.stop().addListener(new Runnable() {
@Override
public void run() {
notifyStopped();
}
}, Threads.SAME_THREAD_EXECUTOR);
return;
}
// Otherwise, if nothing has been started yet, just notify this service is stopped
notifyStopped();
}
...
}
An example implementation of AbstractService in CDAP is RetryOnStartFailureService. It wraps around another Service such that, if the wrapped service failed to start, it will get restarted based on the RetryStrategy.
AbstractIdleService
The AbstractIdleService skeleton implements a Service which does not need to perform any action while in the "running" state -- and therefore does not need a thread while running -- but has startup and shutdown actions to perform. Implementing such a service is as easy as extending AbstractIdleService and implementing the startUp() and shutDown() methods.
When AbstractIdleService#start() or AbstractIdleService#stop() is called, a new thread will be created to call startUp() or shutDown() respectively. Call AbstractIdleService#startAndWait() or AbstractIdleService#stopAndWait() to make sure startUp() or shutDown() completes. shutDown() will only be called if startUp() completed successfully.
ATTENTION: Even though AbstractIdleService runs startUp() or shutDown() in a new thread, notifyStarted() or notifyStopped() is called after these methods respectively. AbstractIdleService#startAndWait() or AbstractIdleService#stopAndWait() will not return until notifyStarted() or notifyStopped()is called.
startUp() has completed, but all other methods in AbstractNotificationService can only run after startUp() completed. This is guaranteed by AbstractNotificationService#startAndWait() being called before other methods in AbstractNotificationService are called. public abstract class AbstractNotificationService extends AbstractIdleService implements NotificationService {
...
@Override
protected void startUp() throws Exception {
transactionSystemClient.startAndWait();
}
@Override
protected void shutDown() throws Exception {
transactionSystemClient.stopAndWait();
}
...
AbstractExecutionThreadService
An AbstractExecutionThreadService performs startup, running, and shutdown actions in a single thread. You must override the run() method, and it must respond to stop requests. For example, you might perform actions in a work loop:
public void run() {
while (isRunning()) {
// perform a unit of work
}
}
Alternately, you may override triggerShutdown() in any way that causes run() to return.
Overriding startUp() and shutDown() is optional, but the service state will be managed for you. startUp() is guaranteed to be called before run() and shutDown() is guaranteed to be called after run() returns or throws any exception.
Note that start() calls your startUp() method, creates a thread for you, and invokes run() in that thread. stop() calls triggerShutdown() , so that shutDown() will be executed in that thread after run() returns. Then eventually the thread dies.
An example implementation of AbstractExecutionThreadService in CDAP is AggregatedMetricsCollectionService. triggerShutdown() calls runThread.interrupt() in order to cause run() to return. run() responds to stop() by catching InterruptedException raised by the interrupt in triggerShutdown(), and exiting the while loop when isRunning() returns false. isRunning() will return false, when stop() is called, since stop() transitions the service to Service.State.STOPPING. shutDown() performs a final action that flushes the metrics since there will be no more metrics to be flushed after run() returns,
public abstract class AggregatedMetricsCollectionService extends AbstractExecutionThreadService
implements MetricsCollectionService {
...
@Override
protected void startUp() throws Exception {
runThread = Thread.currentThread();
}
@Override
protected final void run() {
long sleepMillis = getInitialDelayMillis();
while (isRunning()) {
try {
TimeUnit.MILLISECONDS.sleep(sleepMillis);
...
} catch (InterruptedException e) {
// Expected when stop is called.
break;
}
}
}
...
@Override
protected void shutDown() throws Exception {
// Flush the metrics when shutting down.
publishMetrics(System.currentTimeMillis());
}
@Override
protected void triggerShutdown() {
if (runThread != null) {
runThread.interrupt();
}
}
...
}
AbstractScheduledService
An AbstractScheduledService performs some periodic task while running. Subclasses implement runOneIteration() to specify one iteration of the task, as well as the familiar startUp() and shutDown() methods.
executor() returns the Executor that will be used to run this service. The default implementation returns a new Executor that sets the name of its threads to the string returned by getServiceName() method. Subclass may override this method to use a custom Executor.with a specific name, thread group or priority. startUp() and shutDown() methods will be run in the Executor returned by executor()
Unlike in AbstractExecutionThreadService, it's not guaranteed that startUp() has been called when shutDown() is called.
To describe the execution schedule, you must implement the scheduler() method. Typically, you will use one of the provided schedules from AbstractScheduledService.Scheduler , either newFixedRateSchedule(initialDelay, delay, TimeUnit) or newFixedDelaySchedule(initialDelay, delay, TimeUnit) , corresponding to the familiar methods in ScheduledExecutorService . Custom schedules can be implemented using CustomScheduler ; see the Javadoc for details.
An example implementation of AbstractExecutionThreadService in CDAP is AbstractResourceReporter. It writes out resource metrics every reportInterval seconds which is defined in scheduler().
public abstract class AbstractResourceReporter extends AbstractScheduledService implements ProgramResourceReporter {
...
protected void runOneIteration() throws Exception {
reportResources();
}
@Override
protected void shutDown() throws Exception {
if (executor != null) {
executor.shutdownNow();
}
}
protected Scheduler scheduler() {
return Scheduler.newFixedRateSchedule(0, reportInterval, TimeUnit.SECONDS);
}
@Override
protected final ScheduledExecutorService executor() {
executor = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("reporter-scheduler"));
return executor;
}
}
Created in 2020 by Google Inc.