How to cancel Future in Scala?

Multithreading, Scala, Future

Multithreading Problem Overview


Java's Future has a cancel method that can interrupt the thread running the Future's task. For example, if I wrap an interruptible blocking call in a Java Future, I can interrupt it later.
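A minimal sketch of the Java behaviour described above, written in Scala. The object name JavaFutureCancelDemo and the Thread.sleep standing in for an interruptible blocking call are assumptions made for illustration.

```scala
import java.util.concurrent.{Callable, CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object JavaFutureCancelDemo {
  // Returns true if the blocking call was interrupted by cancel(true).
  def run(): Boolean = {
    val pool = Executors.newSingleThreadExecutor()
    val started = new CountDownLatch(1)
    val finished = new CountDownLatch(1)
    val interrupted = new AtomicBoolean(false)
    try {
      val jf = pool.submit(new Callable[Unit] {
        override def call(): Unit =
          try {
            started.countDown()
            Thread.sleep(60000) // stands in for an interruptible blocking call
          } catch {
            case _: InterruptedException => interrupted.set(true)
          } finally finished.countDown()
      })
      started.await()                     // make sure the task is running
      jf.cancel(true)                     // true = interrupt the running thread
      finished.await(5, TimeUnit.SECONDS) // wait for the task to react
      interrupted.get()
    } finally pool.shutdownNow()
  }
}
```

Calling JavaFutureCancelDemo.run() should return true, because the sleeping task receives an InterruptedException once cancel(true) is called.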

Scala's Future provides no cancel method. Suppose I wrap an interruptible blocking call in a Scala Future. How can I interrupt it?

Multithreading Solutions


Solution 1 - Multithreading

Cancellation is not yet part of the Futures API, but it may be added as an extension in the future.

As a workaround, you could use Future.firstCompletedOf to combine two futures: the future you want to cancel and a future that comes from a custom Promise. You can then cancel the resulting future by failing the promise. Note that this only completes the combined future early; the original computation keeps running unless your custom code stops it:

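import java.util.concurrent.CancellationException
import scala.concurrent.{ExecutionContext, Future, Promise}

def cancellable[T](f: Future[T])(customCode: => Unit)(implicit ec: ExecutionContext): (() => Unit, Future[T]) = {
  val p = Promise[T]()
  val first = Future.firstCompletedOf(Seq(p.future, f))
  val cancellation: () => Unit = { () =>
    // Run the custom code once the combined future has failed.
    // (onFailure was removed in Scala 2.13; failed.foreach does the same job.)
    first.failed.foreach(_ => customCode)
    // tryFailure does nothing if the promise is already completed,
    // so calling the cancellation twice does not throw.
    p.tryFailure(new CancellationException("cancelled"))
    ()
  }
  (cancellation, first)
}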

Now you can call this on any future to obtain a "cancellable wrapper". Example use-case:

val f = callReturningAFuture()
val (cancel, f1) = cancellable(f) {
  cancelTheCallReturningAFuture()
}

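// somewhere else in code; Await.result needs a timeout
// (scala.concurrent.Await, scala.concurrent.duration.Duration)
if (condition) cancel() else println(Await.result(f1, Duration.Inf))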
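To make the behaviour of this workaround concrete, here is a self-contained sketch. It repeats the wrapper (written with failed.foreach, since onFailure was removed in Scala 2.13) so that it compiles on its own. The names CancellableWrapperDemo, original and cleanedUp are hypothetical: a never-completed Promise stands in for the long-running call, and a latch stands in for cancelTheCallReturningAFuture().

```scala
import java.util.concurrent.{CancellationException, CountDownLatch, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object CancellableWrapperDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  def cancellable[T](f: Future[T])(customCode: => Unit): (() => Unit, Future[T]) = {
    val p = Promise[T]()
    val first = Future.firstCompletedOf(Seq(p.future, f))
    val cancellation: () => Unit = { () =>
      first.failed.foreach(_ => customCode)
      p.tryFailure(new CancellationException("cancelled"))
      ()
    }
    (cancellation, first)
  }

  // Returns (wrapper result, whether the cleanup ran, whether the original completed).
  def run(): (Try[Int], Boolean, Boolean) = {
    val original = Promise[Int]()         // stands in for a long-running call
    val cleanedUp = new CountDownLatch(1) // stands in for the custom cancel code
    val (cancel, f1) = cancellable(original.future) { cleanedUp.countDown() }
    cancel()
    val result = Try(Await.result(f1, 5.seconds))
    (result, cleanedUp.await(5, TimeUnit.SECONDS), original.isCompleted)
  }
}
```

In this sketch the wrapper future fails with a CancellationException and the cleanup code runs, while the original future stays incomplete. Actually stopping the underlying work is left entirely to the custom code.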

EDIT:

For a detailed discussion of cancellation, see Chapter 4 of the book Learning Concurrent Programming in Scala.

Solution 2 - Multithreading

I haven't tested this, but it expands on the answer by Pablo Francisco Pérez Hidalgo (Solution 4 below). Instead of blocking a thread while waiting for the Java Future, it completes an intermediate Promise when the Java task is done.

import java.util.concurrent.{Callable, FutureTask}
import scala.concurrent.{ExecutionContext, Promise}
import scala.util.Try

class Cancellable[T](executionContext: ExecutionContext, todo: => T) {
  private val promise = Promise[T]()

  def future = promise.future

  private val jf: FutureTask[T] = new FutureTask[T](
    new Callable[T] {
      override def call(): T = todo
    }
  ) {
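    // done() is called once the task has finished, failed or been cancelled.
    // The explicit Unit result type is needed to override the Java void method.
    override def done(): Unit = promise.complete(Try(get()))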
  }

  def cancel(): Unit = jf.cancel(true)

  executionContext.execute(jf)
}

object Cancellable {
  def apply[T](todo: => T)(implicit executionContext: ExecutionContext): Cancellable[T] =
    new Cancellable[T](executionContext, todo)
}
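As a rough illustration of what a caller observes, the sketch below cancels a sleeping task. The class is repeated in compact form only so that the example compiles on its own; CancellableUsageDemo, started and interrupted are hypothetical names, and Thread.sleep stands in for an interruptible blocking call.

```scala
import java.util.concurrent.{Callable, CountDownLatch, FutureTask, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object CancellableUsageDemo {
  // Same shape as the class above, repeated so the sketch is self-contained.
  class Cancellable[T](executionContext: ExecutionContext, todo: => T) {
    private val promise = Promise[T]()
    def future: Future[T] = promise.future
    private val jf: FutureTask[T] = new FutureTask[T](
      new Callable[T] { override def call(): T = todo }
    ) {
      override def done(): Unit = promise.complete(Try(get()))
    }
    def cancel(): Unit = jf.cancel(true)
    executionContext.execute(jf)
  }

  // Returns (result seen through the Scala future, whether the task was interrupted).
  def run(): (Try[Int], Boolean) = {
    val started = new CountDownLatch(1)
    val interrupted = new CountDownLatch(1)
    val c = new Cancellable[Int](ExecutionContext.global, {
      started.countDown()
      try { Thread.sleep(60000); 1 }
      catch { case e: InterruptedException => interrupted.countDown(); throw e }
    })
    started.await() // wait until the task is actually running
    c.cancel()      // interrupts the thread and completes the promise
    val result = Try(Await.result(c.future, 5.seconds))
    (result, interrupted.await(5, TimeUnit.SECONDS))
  }
}
```

Here the Scala future fails with the CancellationException thrown by get(), and the sleeping task sees an InterruptedException.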

Solution 3 - Multithreading

By cancelling, I guess you mean forcibly interrupting the thread that runs the future.

I found this snippet: https://gist.github.com/viktorklang/5409467

I ran a few tests and it seems to work fine!

Enjoy :)
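Since this answer only links to the code, here is a minimal sketch of the general idea of interrupting the thread that runs a Scala Future. It is an illustration written for this page, not a copy of the linked gist, and the helper name InterruptibleFuture is hypothetical. The sketch records the running thread in an AtomicReference so that a cancel function can interrupt it, and uses a separate Promise so the returned future fails on cancel even if the interrupted body never produces a result.

```scala
import java.util.concurrent.CancellationException
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future, Promise}

object InterruptibleFuture {
  // Hypothetical helper: returns the future plus a function that cancels it.
  // The cancel function returns true if this call caused the cancellation.
  def apply[T](body: => T)(implicit ec: ExecutionContext): (Future[T], () => Boolean) = {
    val promise = Promise[T]()
    val runner = new AtomicReference[Thread](null)
    promise.completeWith(Future {
      val thread = Thread.currentThread()
      runner.synchronized { runner.set(thread) }
      try body
      finally {
        // If cancel already took the reference, an interrupt was sent; clear a
        // still-pending flag so it does not leak into the pool's next task.
        val taken = runner.synchronized { runner.getAndSet(null) }
        if (taken ne thread) Thread.interrupted()
      }
    })
    val cancel = () => {
      // Interrupt only while the body is running. If the body has not started
      // yet, it will still run later; only its result is discarded.
      runner.synchronized { Option(runner.getAndSet(null)).foreach(_.interrupt()) }
      promise.tryFailure(new CancellationException("cancelled"))
    }
    (promise.future, cancel)
  }
}
```

Usage would look like val (f, cancel) = InterruptibleFuture { blockingCall() } with an implicit ExecutionContext in scope, where blockingCall is a placeholder for your own code. After cancel(), f fails with a CancellationException.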

Solution 4 - Multithreading

I think the complexity of the other implementations can be reduced by using Java's Future interface (java.util.concurrent.Future) and its implementations, such as FutureTask.

Cancellable builds a Java future, which is what its cancel method cancels. A second, Scala future waits for the Java future to complete and serves as the observable interface; its state cannot be changed from outside:

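import java.util.concurrent.{Callable, FutureTask}
import scala.concurrent.{ExecutionContext, Future}

class Cancellable[T](executionContext: ExecutionContext, todo: => T) {

  // The Java future that actually runs the task and can be cancelled.
  private val jf: FutureTask[T] = new FutureTask[T](
    new Callable[T] {
      override def call(): T = todo
    }
  )

  executionContext.execute(jf)

  // The observable Scala future. It blocks a thread of the execution
  // context in jf.get() until the Java future completes or is cancelled.
  val future: Future[T] = Future {
    jf.get()
  }(executionContext)

  def cancel(): Unit = jf.cancel(true)

}

object Cancellable {
  def apply[T](todo: => T)(implicit executionContext: ExecutionContext): Cancellable[T] =
    new Cancellable[T](executionContext, todo)
}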

Attributions

All content for this solution is sourced from the original question on Stack Overflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stack Overflow
Question | Michael | View Question on Stack Overflow
Solution 1 - Multithreading | axel22 | View Answer on Stack Overflow
Solution 2 - Multithreading | nightingale | View Answer on Stack Overflow
Solution 3 - Multithreading | George Pligoropoulos | View Answer on Stack Overflow
Solution 4 - Multithreading | Pablo Francisco Pérez Hidalgo | View Answer on Stack Overflow