CompletableFuture can't be interrupted
When you call Future.cancel(), not only will the given Future terminate a pending get(), but it will also try to interrupt the underlying thread. This is a pretty important feature that enables better thread pool utilization. I also wrote to always prefer CompletableFuture over the standard Future. It turns out the more powerful younger brother of Future doesn't handle cancel()
so elegantly. Consider the following task, which we'll use later throughout the tests:

    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.TimeUnit

    class InterruptibleTask implements Runnable {

        // Released as soon as run() begins executing on a pool thread
        private final CountDownLatch started = new CountDownLatch(1)
        // Released only if sleep() is cut short by an interrupt
        private final CountDownLatch interrupted = new CountDownLatch(1)

        @Override
        void run() {
            started.countDown()
            try {
                Thread.sleep(10_000)
            } catch (InterruptedException ignored) {
                interrupted.countDown()
            }
        }

        void blockUntilStarted() {
            started.await()
        }

        // Fails the assertion if no interrupt arrives within one second
        void blockUntilInterrupted() {
            assert interrupted.await(1, TimeUnit.SECONDS)
        }

    }

Client threads can examine InterruptibleTask
to see whether it has started or was interrupted. First let's see how InterruptibleTask
reacts to cancel()
from outside:

    def "Future is cancelled without exception"() {
        given:
            def task = new InterruptibleTask()
            def future = myThreadPool.submit(task)
            task.blockUntilStarted()
        and:
            future.cancel(true)
        when:
            future.get()
        then:
            thrown(CancellationException)
    }

    def "CompletableFuture is cancelled via CancellationException"() {
        given:
            def task = new InterruptibleTask()
            def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)
            task.blockUntilStarted()
        and:
            future.cancel(true)
        when:
            future.get()
        then:
            thrown(CancellationException)
    }

So far so good. Clearly both Future
and CompletableFuture work pretty much the same way: retrieving the result after cancellation throws CancellationException. But what about the thread in myThreadPool
? I thought it would be interrupted and thus recycled by the pool. How wrong I was!

    def "should cancel Future"() {
        given:
            def task = new InterruptibleTask()
            def future = myThreadPool.submit(task)
            task.blockUntilStarted()
        when:
            future.cancel(true)
        then:
            task.blockUntilInterrupted()
    }

    @Ignore("Fails with CompletableFuture")
    def "should cancel CompletableFuture"() {
        given:
            def task = new InterruptibleTask()
            def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)
            task.blockUntilStarted()
        when:
            future.cancel(true)
        then:
            task.blockUntilInterrupted()
    }

The first test submits an ordinary Runnable
to ExecutorService and waits until it's started. Later we cancel the Future and wait until InterruptedException is observed. blockUntilInterrupted() will return when the underlying thread is interrupted. The second test, however, fails. CompletableFuture.cancel() will never interrupt the underlying thread, so despite the Future looking as if it was cancelled, the backing thread is still running and no InterruptedException is thrown from sleep(). Bug or a feature? It's documented, so unfortunately a feature:

    Parameters:
    mayInterruptIfRunning - this value has no effect in this implementation because interrupts are not used to control processing.

RTFM, you say, but why does CompletableFuture work this way? First let's examine how "old" Future
implementations differ from CompletableFuture. FutureTask, returned from ExecutorService.submit(), has the following cancel() implementation (I replaced Unsafe with similar non-thread-safe Java code, so treat it as pseudo-code only):

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (state != NEW)
            return false;
        state = mayInterruptIfRunning ? INTERRUPTING : CANCELLED;
        try {
            if (mayInterruptIfRunning) {
                try {
                    // runner is the thread currently executing the task, if any
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    state = INTERRUPTED;
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
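Before walking through that code, the interrupt it delivers can be observed from plain Java, without Spock. What follows is only a minimal sketch: the class name CancelDemo, the helper interruptedAfterCancel() and the single-thread pool are my own assumptions for illustration, and the task simply mirrors InterruptibleTask from above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelDemo {

    // Hypothetical helper: starts a sleeping task, calls cancel(true) on its future
    // and reports whether the worker thread saw an InterruptedException within 1 second.
    public static boolean interruptedAfterCancel(boolean useCompletableFuture) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        Runnable task = () -> {
            started.countDown();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                interrupted.countDown();
            }
        };
        try {
            Future<?> future;
            if (useCompletableFuture) {
                future = CompletableFuture.runAsync(task, pool);
            } else {
                future = pool.submit(task); // a FutureTask under the hood
            }
            started.await();            // make sure the task is already running
            future.cancel(true);        // ask for interruption
            return interrupted.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException("calling thread was interrupted", e);
        } finally {
            pool.shutdownNow();         // release the worker so the JVM can exit
        }
    }

    public static void main(String[] args) {
        System.out.println("FutureTask interrupted: " + interruptedAfterCancel(false));
        System.out.println("CompletableFuture interrupted: " + interruptedAfterCancel(true));
    }
}
```

Under these assumptions the first line should report true and the second false, which matches the two Spock tests: only the FutureTask path reaches t.interrupt().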
FutureTask has a state variable that follows this state diagram:

[Figure: FutureTask state diagram]

In case of cancel() we can either enter the CANCELLED state or go to INTERRUPTED through INTERRUPTING. The core part is where we take the runner thread (if it exists, i.e. if the task is currently being executed) and try to interrupt it. This branch takes care of eager and forced interruption of an already running thread. In the end we must notify all threads blocked on Future.get() in finishCompletion() (irrelevant here). So it's pretty obvious how the old Future cancels already running tasks. What about CompletableFuture? Pseudo-code of cancel()
:

    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = false;
        if (result == null) {
            result = new AltResult(new CancellationException());
            cancelled = true;
        }
        postComplete();
        return cancelled || isCancelled();
    }

Quite disappointing, we barely set result
to CancellationException, ignoring the mayInterruptIfRunning flag. postComplete() has a similar role to finishCompletion(): it notifies all pending callbacks registered on that future. Its implementation is rather unpleasant (using a non-blocking Treiber stack) but it definitely doesn't interrupt any underlying thread.

Reasons and implications

Limited cancel()
in case of CompletableFuture is not a bug, but a design decision. CompletableFuture is not inherently bound to any thread, while Future almost always represents a background task. It's perfectly fine to create a CompletableFuture from scratch (new CompletableFuture<>()) where there is simply no underlying thread to cancel. Still, I can't help feeling that the majority of CompletableFutures will have an associated task and a background thread. In that case the malfunctioning cancel() is a potential problem. I no longer advise blindly replacing Future with CompletableFuture, as it might change the behavior of applications relying on cancel(). This means CompletableFuture intentionally breaks the Liskov substitution principle, and this is a serious implication to consider.
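To make the "no underlying thread" case concrete, here is a small sketch of a CompletableFuture created from scratch and then cancelled. The class name PromiseCancelDemo and the helper cancelStandalone() are hypothetical names of mine; the only library calls used are whenComplete() and cancel().

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class PromiseCancelDemo {

    // Hypothetical helper: cancels a future that no thread is working on
    // and returns the error its callback was notified with.
    public static Throwable cancelStandalone() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        AtomicReference<Throwable> seen = new AtomicReference<>();

        // Registered before completion, so it runs when cancel() completes the future
        promise.whenComplete((value, error) -> seen.set(error));

        // The flag is ignored: there is nothing to interrupt here anyway
        promise.cancel(true);
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable error = cancelStandalone();
        System.out.println("Callback saw CancellationException: "
                + (error instanceof CancellationException));
    }
}
```

Here cancel() is all that can reasonably happen: the future is completed with a CancellationException and its callbacks are notified. The trouble described above only starts once some thread is still busy computing a value that nobody will ever read.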