Asynchronous timeouts with Completable Future in Java 8 and 9

Java 8 introduced CompletableFuture<T> as an enhancement to Future<T>. It is a new class which lets you express the flow of information from different tasks using a callback driven style. A CompletableFuture is used for defining computations on singular events, which is a different use case than computations on streams of events (e.g. Observable using RxJava). In this article, you will learn about the problem with timeouts in Java 8’s CompletableFuture and the improvements that Java 9 brings.

Combining two services

For the purpose of this article, let’s say you’d like to combine the result of two services over the network:

  1. A best price finder for a flight route
  2. An exchange service that converts USD to GBP

Both of these services will introduce a certain delay before responding back with a result. This is due to the costs of network communication with the service.

You could solve this problem by making use of CompletableFuture as follows (by default a CompletableFuture uses the common thread pool but this can be parametrised with an Executor using an overload of supplyAsync):

  1. BigDecimal amount =
  2. CompletableFuture.supplyAsync(() -> findBestPrice("LDN - NYC"))
  3. .thenCombine(CompletableFuture.supplyAsync(() -> queryExchangeRateFor("GBP")),
  4. this::convert)
  5. .get();

In the code above, the method convert takes the two BigDecimal results from findBestPrice and queryExchangeRateFor and calculates the final amount. You can refer to the javadoc for a more detailed description of the methods available using CompletableFuture.

Timeout mechanism

However, there are a few problems associated with this code. First, get() is a blocking call. This means that the main thread will have to wait until the result is ready before it can progress. Ideally, you’d like the main thread to do other useful work while the result is calculated in the background. Second, the main thread could be blocking indefinitely because there isn’t a timeout specified. What if one of the service is overloaded and doesn’t respond? To add a timeout mechanism, you can use the other version of get inherited from Future which throws a TimeoutException when the overall pipeline takes longer than a certain amount of time to return the result:

  1. BigDecimal result =
  2. CompletableFuture.supplyAsync(() -> findBestPrice("LDN - NYC"), executorService)
  3. .thenCombine(CompletableFuture.supplyAsync(() -> queryExchangeRateFor("GBP")),
  4. this::convert)
  5. .get(1, TimeUnit.SECONDS);

Unfortunately this code is still blocking and prevents the main thread from doing useful work in the meantime! To tackle this issue, you can refactor the above code to use thenAccept and provide a callback which is executed when the result is finally available:

  1. CompletableFuture.supplyAsync(() -> findBestPrice("LDN - NYC"), executorService)
  2. .thenCombine(CompletableFuture.supplyAsync(() -> queryExchangeRateFor("GBP")),
  3. this::convert)
  4. .thenAccept(amount -> System.out.println("The price is: " + amount + "GBP"));

However, using this approach we lost the timeout functionality! Ideally we’d like to specify a timeout using a non-blocking method. Unfortunately there isn’t a built-in elegant support to solve this problem in Java 8. Solutions available include using acceptEither() or applyToEither together with the CompletableFuture you are waiting the result for and another CompletableFuture which wraps up a ScheduledThreadpoolExecutor that throws a TimeoutException after a certain time:

  1. CompletableFuture.supplyAsync(() -> findBestPrice("LDN - NYC"), executorService)
  2. .thenCombine(CompletableFuture.supplyAsync(() -> queryExchangeRateFor("GBP")),
  3. this::convert)
  4. .acceptEither(timeoutAfter(1, TimeUnit.SECONDS),
  5. amount -> System.out.println("The price is: " + amount + "GBP"));

A simple implementation of timeoutAfter is as follows where delayer is an instance of a ScheduledThreadPoolExecutor:

  1. public <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
  2. CompletableFuture<T> result = new CompletableFuture<T>();
  3. delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
  4. return result;
  5. }

Java 9 improvements

Java 9’s CompletableFuture introduces several new methods amongst which are orTimeout and completeOnTimeOut that provide built-in support for dealing with timeouts.

The method orTimeout has the following signature:

  1. public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)

It internally uses a ScheduledThreadExecutor and completes the CompletableFuture with a TimeoutException after the specified timeout has elapsed. It also returns another CompletableFuture, meaning you can further chain your computation pipeline and deal with the TimeoutException by providing a friendly message back. Note that whenComplete in the code below could report other exceptional completions that might occur before the timeout occurs.

For example:

  1. CompletableFuture.supplyAsync(() -> findBestPrice("LDN - NYC"), executorService)
  2. .thenCombine(CompletableFuture.supplyAsync(() -> queryExchangeRateFor("GBP")),
  3. this::convert)
  4. .orTimeout(1, TimeUnit.SECONDS)
  5. .whenComplete((amount, error) -> {
  6. if (error == null) {
  7. System.out.println("The price is: " + amount + "GBP");
  8. } else {
  9. System.out.println("Sorry, we could not return you a result");
  10. }
  11. });

The method completeOnTimeout has the following signature:

  1. public CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)

It also uses a ScheduledThreadExecutor internally but in contrast to orTimeout, it provides a default value in the case that the CompletableFuture pipeline times out. You can conceptually relate this method to orElse() using java.util.Optional.

  1. CompletableFuture.supplyAsync(() -> findBestPrice("LDN - NYC"), executorService)
  2. .thenCombine(CompletableFuture.supplyAsync(() -> queryExchangeRateFor("GBP")),
  3. this::convert)
  4. .completeOnTimeout(DEFAULT_PRICE, 1, TimeUnit.SECONDS)
  5. .thenAccept(amount -> {
  6. System.out.println("The price is: " + amount + "GBP");
  7. });

In the code above, if the services are slow to respond, a default price is provided otherwise the result of combining the two services is returned.

If you are curious about the internal implementation details of these methods, you can view the source of CompletableFuture in JDK9 here.

If you interested to learn more about this asynchronous style of programming check out our intensive Asynchronous and Reactive Java in-house course by using the button below.

Learn about Asynchronous and Reactive Java course

For everything Java 8 related, we also teach a popular two-day Modern Development using Java 8 course, which you can see using the button below.

Learn about Java 8 course