Akka Actor not terminating if an exception is thrown
ScalaRoutingActorAkkaFault ToleranceScala Problem Overview
I am currently trying to get started with Akka and I am facing a weird problem. I've got the following code for my Actor:
class AkkaWorkerFT extends Actor {
def receive = {
case Work(n, c) if n < 0 => throw new Exception("Negative number")
case Work(n, c) => self reply n.isProbablePrime(c);
}
}
And this is how I start my workers:
val workers = Vector.fill(nrOfWorkers)(actorOf[AkkaWorkerFT].start());
val router = Routing.loadBalancerActor(SmallestMailboxFirstIterator(workers)).start()
And this is how I shut everything down:
futures.foreach( _.await )
router ! Broadcast(PoisonPill)
router ! PoisonPill
Now what happens is if I send the workers messages with n > 0 (no exception is thrown), everything works fine and the application shuts down properly. However, as soon as I send it a single message which results in an exception, the application does not terminate because there is still an actor running, but I can't figure out where it comes from.
In case it helps, this is the stack of the thread in question:
Thread [akka:event-driven:dispatcher:event:handler-6] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: 158
AbstractQueuedSynchronizer$ConditionObject.await() line: 1987
LinkedBlockingQueue<E>.take() line: 399
ThreadPoolExecutor.getTask() line: 947
ThreadPoolExecutor$Worker.run() line: 907
MonitorableThread(Thread).run() line: 680
MonitorableThread.run() line: 182
PS: The thread which is not terminating isn't any of the worker threads, because I've added a postStop callback, every one of them stops properly.
PPS: Actors.registry.shutdownAll
workarounds the problem, but I think shutdownAll should only be used as a last resort, shouldn't it?
Scala Solutions
Solution 1 - Scala
The proper way to handle problems inside akka actors is not to throw an exception but rather to set supervisor hierarchies
> "Throwing an exception in concurrent code (let’s assume we are using > non-linked actors), will just simply blow up the thread that currently > executes the actor. > > There is no way to find out that things went wrong (apart from > inspecting the stack trace). > There is nothing you can do about it."
see Fault Tolerance Through Supervisor Hierarchies (1.2)
*** note *** the above is true for old versions of Akka (1.2) In newer versions (e.g. 2.2) you'd still set a supervisor hierarchy but it will trap Exceptions thrown by child processes. e.g.
class Child extends Actor {
var state = 0
def receive = {
case ex: Exception ⇒ throw ex
case x: Int ⇒ state = x
case "get" ⇒ sender ! state
}
}
and in the supervisor:
class Supervisor extends Actor {
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case _: ArithmeticException ⇒ Resume
case _: NullPointerException ⇒ Restart
case _: IllegalArgumentException ⇒ Stop
case _: Exception ⇒ Escalate
}
def receive = {
case p: Props ⇒ sender ! context.actorOf(p)
}
}
Solution 2 - Scala
Turning off the logging to make sure things terminate, as proposed by Viktor, is a bit strange. What you can do instead is:
EventHandler.shutdown()
that cleanly shuts down all the (logger) listeners that keep the world running after the exception:
def shutdown() {
foreachListener(_.stop())
EventHandlerDispatcher.shutdown()
}
Solution 3 - Scala
Turn of the logger in the akka.conf