Scala Futures - built in timeout?

ScalaConcurrency

Scala Problem Overview


there is an aspect of futures that I do not exactly understand from the official tutorial ref. http://docs.scala-lang.org/overviews/core/futures.html

Do futures in scala have a built in time-out mechanism of some kind? Let's say the example below was a 5 gigabyte text file... does the implied scope of "Implicits.global" eventually cause onFailure to fire in a non-blocking way or can that be defined? And without a default time-out of some kind, wouldn't that imply it's possible neither success nor failure would ever fire?

import scala.concurrent._
import ExecutionContext.Implicits.global

val firstOccurence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}
firstOccurence onSuccess {
  case idx => println("The keyword first appears at position: " + idx)
}
firstOccurence onFailure {
  case t => println("Could not process file: " + t.getMessage)
}

Scala Solutions


Solution 1 - Scala

You only get timeout behavior when you use blocking to get the results of the Future. If you want to use the non-blocking callbacks onComplete, onSuccess or onFailure, then you would have to roll your own timeout handling. Akka has built in timeout handling for request/response (?) messaging between actors, but not sure if you want to start using Akka. FWIW, in Akka, for timeout handling, they compose two Futures together via Future.firstCompletedOf, one which represents the actual async task and one that represents the timeout. If the timeout timer (via a HashedWheelTimer) pops first, you get a failure on the async callback.

A very simplified example of rolling your own might go something like this. First, an object for scheduling timeouts:

import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException

object TimeoutScheduler{
  val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
  def scheduleTimeout(promise:Promise[_], after:Duration) = {
    timer.newTimeout(new TimerTask{
      def run(timeout:Timeout){              
        promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))        
      }
    }, after.toNanos, TimeUnit.NANOSECONDS)
  }
}

Then a function to take a Future and add timeout behavior to it:

import scala.concurrent.{Future, ExecutionContext, Promise}
import scala.concurrent.duration.Duration

def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
  val prom = Promise[T]()
  val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
  val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
  fut onComplete{case result => timeout.cancel()}
  combinedFut
}

Note that the HashedWheelTimer I am using here is from Netty.

Solution 2 - Scala

All of these answers require additional dependencies. I decided to write a version using java.util.Timer which is an efficient way to run a function in the future, in this case to trigger a timeout.

Blog post with more details here

Using this with Scala's Promise, we can make a Future with timeout as follows:

package justinhj.concurrency

import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps

object FutureUtil {

  // All Future's that use futureWithTimeout will use the same Timer object
  // it is thread safe and scales to thousands of active timers
  // The true parameter ensures that timeout timers are daemon threads and do not stop
  // the program from shutting down

  val timer: Timer = new Timer(true)

  /**
    * Returns the result of the provided future within the given time or a timeout exception, whichever is first
    * This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
    * Thread.sleep would
    * @param future Caller passes a future to execute
    * @param timeout Time before we return a Timeout exception instead of future's outcome
    * @return Future[T]
    */
  def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {

    // Promise will be fulfilled with either the callers Future or the timer task if it times out
    val p = Promise[T]

    // and a Timer task to handle timing out

    val timerTask = new TimerTask() {
      def run() : Unit = {
            p.tryFailure(new TimeoutException())
        }
      }

    // Set the timeout to check in the future
    timer.schedule(timerTask, timeout.toMillis)

    future.map {
      a =>
        if(p.trySuccess(a)) {
          timerTask.cancel()
        }
    }
    .recover {
      case e: Exception =>
        if(p.tryFailure(e)) {
          timerTask.cancel()
        }
    }

    p.future
  }

}

Solution 3 - Scala

I've just created a TimeoutFuture class for a coworker:

TimeoutFuture

package model
 
import scala.concurrent._
import scala.concurrent.duration._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits._

object TimeoutFuture {
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {
    
    val prom = promise[A]
 
    // timeout logic
    Akka.system.scheduler.scheduleOnce(timeout) {
      prom tryFailure new java.util.concurrent.TimeoutException
    }
    
    // business logic
    Future { 
      prom success block
    }
 
    prom.future
  } 
}

Usage

val future = TimeoutFuture(10 seconds) { 
  // do stuff here
}
 
future onComplete {
  case Success(stuff) => // use "stuff"
  case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block)
}

Notes:

  • Assumes Play! framework (but it's easy enough to adapt)
  • Every piece of code runs in the same ExecutionContext which may not be ideal.

Solution 4 - Scala

Play framework contains Promise.timeout so you can write code like following

private def get(): Future[Option[Boolean]] = {
  val timeoutFuture = Promise.timeout(None, Duration("1s"))
  val mayBeHaveData = Future{
    // do something
    Some(true)
  }

  // if timeout occurred then None will be result of method
  Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture))
}

Solution 5 - Scala

I'm quite surprise this is not standard in Scala. My versions is short and has no dependencies

import scala.concurrent.Future

sealed class TimeoutException extends RuntimeException

object FutureTimeout {

  import scala.concurrent.ExecutionContext.Implicits.global

  implicit class FutureTimeoutLike[T](f: Future[T]) {
    def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future {
      Thread.sleep(ms)
      throw new TimeoutException
    }))

    lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout
  }

}

Usage example

import FutureTimeout._
Future { /* do smth */ } withTimeout

Solution 6 - Scala

If you want the writer (promise holder) to be the one who controls the timeout logic, use akka.pattern.after, in the following way:

val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during...")))
Future.firstCompletedOf(Seq(promiseRef.future, timeout))

This way, if your promise completion logic never takes place, your caller's future will still be completed at some point with a failure.

Solution 7 - Scala

You can specify the timeout when you wait on the future:

For scala.concurrent.Future, the result method lets you specify a timeout.

For scala.actors.Future, Futures.awaitAll lets you specify a timeout.

I do not think there is a timeout built-in the execution of a Future.

Solution 8 - Scala

Nobody's mentioned akka-streams, yet. The flows have an easy completionTimeout method, and applying that on a single-source stream works like a Future.

But, akka-streams also does cancellation so it can actually end the source from running, i.e. it signals the timeout to the source.

Solution 9 - Scala

Monix Task has timeout support

import monix.execution.Scheduler.Implicits.global
import monix.eval._
import scala.concurrent.duration._
import scala.concurrent.TimeoutException

val source = Task("Hello!").delayExecution(10.seconds)

// Triggers error if the source does not complete in 3 seconds after runOnComplete
val timedOut = source.timeout(3.seconds)

timedOut.runOnComplete(r => println(r))
//=> Failure(TimeoutException)

Solution 10 - Scala

This version works without using any external timer (just Await.result)

import scala.concurrent._
import scala.concurrent.duration.FiniteDuration

object TimeoutFuture {
    def apply[A](
        timeout: FiniteDuration
    )(block: => A)(implicit executor: ExecutionContext): Future[A] =
        try {
            Future { Await.result(Future { block }, timeout) }
        } catch {
            case _: TimeoutException => Future.failed(new TimeoutException(s"Timed out after ${timeout.toString}"))
        }
}

Solution 11 - Scala

I'm using this version (based on Play example above) which uses Akka system dispatcher:

object TimeoutFuture {
  def apply[A](system: ActorSystem, timeout: FiniteDuration)(block: => A): Future[A] = {
    implicit val executionContext = system.dispatcher

    val prom = Promise[A]

    // timeout logic
    system.scheduler.scheduleOnce(timeout) {
      prom tryFailure new java.util.concurrent.TimeoutException
    }

    // business logic
    Future {
      try {
        prom success block
      } catch {
        case t: Throwable => prom tryFailure t
      }
    }

    prom.future
  }
}

Solution 12 - Scala

The simplest way to specify timeout on Future IMO is scala's built in mechanism using scala.concurrent.Await.ready This will throw a TimeoutException if the Future takes longer than the specified timeout. Otherwise, it will return the Future itself. Here is a simple contrived example

import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.duration._
val f1: Future[Int] = Future {
  Thread.sleep(1100)
  5
}

val fDoesntTimeout: Future[Int] = Await.ready(f1, 2000 milliseconds)

val f: Future[Int] = Future {
  Thread.sleep(1100)
  5
}
val fTimesOut: Future[Int] = Await.ready(f, 100 milliseconds)

Solution 13 - Scala

You can wait for a future to finish by making use of Await.

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

val meaningOfLife: Int = Await.result(Future(42), 1.nano)
println (meaningOfLife)

The above prints 42

You may need an implicit ExecutionContext available in which case, just add:

import scala.concurrent.ExecutionContext.Implicits.global

Another way to do it is to use Coeval from monix. This method does not work in all situations, and you can read all about it here. The basic idea is that sometimes a future does not really take any time and is returning the result of a synchronous function call or value, so this future can be evaluated on the current thread. This is also useful for testing and mocking futures. Also you don't have to specify a timeout which is expected, but still nice to not have to worry about that.

You start by transforming the future into a Task and wrap that task in a Coeval then cross your fingers as you wait to see what you get. This is a very simple example to show how it works:

You need an implicit Scheduler to be able to use it:

import monix.execution.Scheduler.Implicits.global


Coeval(Task.fromFuture(Future (42)).runSyncStep).value() match {
   case Right(v) => println(v)
   case Left(task) => println("Task did not finish")
}

The above completes and prints 42 to the console.

Coeval(Task.fromFuture(Future {
   scala.concurrent.blocking {
      42
   }
}).runSyncStep).value() match {
   case Right(v) => println(v)
   case Left(task) => println("Task did not finish")
}

This example prints Task did not finish:

Solution 14 - Scala

You can simply run the future to completion without giving any timeout interval by setting the timeout to infinite as below:

**import scala.concurrent.duration._  
Await.result(run(executionContext), Duration.Inf)**

run function can be as below :

def run(implicit ec: ExecutionContext) = {  
      val list = Seq(  
          Future { println("start 1"); Thread.sleep(1000); println("stop 1")},  
          Future { println("start 2"); Thread.sleep(2000); println("stop 2")},  
          Future { println("start 3"); Thread.sleep(3000); println("stop 3")},  
          Future { println("start 4"); Thread.sleep(4000); println("stop 4")},  
          Future { println("start 5"); Thread.sleep(5000); println("stop 5")}  
      )  
      Future.sequence(list)  
    }  

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
QuestionLaloInDublinView Question on Stackoverflow
Solution 1 - ScalacmbaxterView Answer on Stackoverflow
Solution 2 - ScalajustinhjView Answer on Stackoverflow
Solution 3 - ScalaPablo FernandezView Answer on Stackoverflow
Solution 4 - ScalaKirView Answer on Stackoverflow
Solution 5 - ScalaRaulView Answer on Stackoverflow
Solution 6 - ScalagalbarmView Answer on Stackoverflow
Solution 7 - Scalagzm0View Answer on Stackoverflow
Solution 8 - ScalaakauppiView Answer on Stackoverflow
Solution 9 - ScalaWeiChing 林煒清View Answer on Stackoverflow
Solution 10 - ScalaunveloperView Answer on Stackoverflow
Solution 11 - ScalaPJ FanningView Answer on Stackoverflow
Solution 12 - ScalasparkerView Answer on Stackoverflow
Solution 13 - Scalasmac89View Answer on Stackoverflow
Solution 14 - ScalaNikunj KakadiyaView Answer on Stackoverflow