Handling exceptions from Java ExecutorService tasks

JavaMultithreadingExceptionExecutorserviceThreadpoolexecutor

Java Problem Overview


I'm trying to use Java's ThreadPoolExecutor class to run a large number of heavy weight tasks with a fixed number of threads. Each of the tasks has many places during which it may fail due to exceptions.

I've subclassed ThreadPoolExecutor and I've overridden the afterExecute method which is supposed to provide any uncaught exceptions encountered while running a task. However, I can't seem to make it work.

For example:

public class ThreadPoolErrors extends ThreadPoolExecutor {
	public ThreadPoolErrors() {
		super(	1, // core threads
				1, // max threads
				1, // timeout
				TimeUnit.MINUTES, // timeout units
				new LinkedBlockingQueue<Runnable>() // work queue
		);
	}
	
	protected void afterExecute(Runnable r, Throwable t) {
		super.afterExecute(r, t);
		if(t != null) {
			System.out.println("Got an error: " + t);
		} else {
			System.out.println("Everything's fine--situation normal!");
		}
	}
	
	public static void main( String [] args) {
		ThreadPoolErrors threadPool = new ThreadPoolErrors();
		threadPool.submit( 
				new Runnable() {
					public void run() {
						throw new RuntimeException("Ouch! Got an error.");
					}
				}
		);
		threadPool.shutdown();
	}
}

The output from this program is "Everything's fine--situation normal!" even though the only Runnable submitted to the thread pool throws an exception. Any clue to what's going on here?

Thanks!

Java Solutions


Solution 1 - Java

WARNING: It should be noted that this solution will block the calling thread.


If you want to process exceptions thrown by the task, then it is generally better to use Callable rather than Runnable.

Callable.call() is permitted to throw checked exceptions, and these get propagated back to the calling thread:

Callable task = ...
Future future = executor.submit(task);
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

If Callable.call() throws an exception, this will be wrapped in an ExecutionException and thrown by Future.get().

This is likely to be much preferable to subclassing ThreadPoolExecutor. It also gives you the opportunity to re-submit the task if the exception is a recoverable one.

Solution 2 - Java

From the docs:

> Note: When actions are enclosed in > tasks (such as FutureTask) either > explicitly or via methods such as > submit, these task objects catch and > maintain computational exceptions, and > so they do not cause abrupt > termination, and the internal > exceptions are not passed to this > method.

When you submit a Runnable, it'll get wrapped in a Future.

Your afterExecute should be something like this:

public final class ExtendedExecutor extends ThreadPoolExecutor {
    
    // ...
    
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}

Solution 3 - Java

The explanation for this behavior is right in the javadoc for afterExecute:

> Note: When actions are enclosed in > tasks (such as FutureTask) either > explicitly or via methods such as > submit, these task objects catch and > maintain computational exceptions, and > so they do not cause abrupt > termination, and the internal > exceptions are not passed to this > method.

Solution 4 - Java

I got around it by wrapping the supplied runnable submitted to the executor.

CompletableFuture.runAsync(() -> {
        try {
              runnable.run();
        } catch (Throwable e) {
              Log.info(Concurrency.class, "runAsync", e);
        }
}, executorService);

Solution 5 - Java

I'm using VerboseRunnable class from jcabi-log, which swallows all exceptions and logs them. Very convenient, for example:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);

Solution 6 - Java

Another solution would be to use the ManagedTask and ManagedTaskListener.

You need a Callable or Runnable which implements the interface ManagedTask.

The method getManagedTaskListener returns the instance you want.

public ManagedTaskListener getManagedTaskListener() {

And you implement in ManagedTaskListener the taskDone method:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

More details about managed task lifecycle and listener.

Solution 7 - Java

This works

  • It is derived from SingleThreadExecutor, but you can adapt it easily
  • Java 8 lamdas code, but easy to fix

It will create a Executor with a single thread, that can get a lot of tasks; and will wait for the current one to end execution to begin with the next

In case of uncaugth error or exception the uncaughtExceptionHandler will catch it

public final class SingleThreadExecutorWithExceptions {

public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

    ThreadFactory factory = (Runnable runnable)  -> {
        final Thread newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions");
        newThread.setUncaughtExceptionHandler( (final Thread caugthThread,final Throwable throwable) -> {
            uncaughtExceptionHandler.uncaughtException(caugthThread, throwable);
        });
        return newThread;
    };
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(),
                    factory){


                protected void afterExecute(Runnable runnable, Throwable throwable) {
                    super.afterExecute(runnable, throwable);
                    if (throwable == null && runnable instanceof Future<?>) {
                        try {
                            Future<?> future = (Future<?>) runnable;
                            if (future.isDone()) {
                                future.get();
                            }
                        } catch (CancellationException ce) {
                            throwable = ce;
                        } catch (ExecutionException ee) {
                            throwable = ee.getCause();
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt(); // ignore/reset
                        }
                    }
                    if (throwable != null) {
                        uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable);
                    }
                }
            });
}



private static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }
    protected void finalize() {
        super.shutdown();
    }
}

/**
 * A wrapper class that exposes only the ExecutorService methods
 * of an ExecutorService implementation.
 */
private static class DelegatedExecutorService extends AbstractExecutorService {
    private final ExecutorService e;
    DelegatedExecutorService(ExecutorService executor) { e = executor; }
    public void execute(Runnable command) { e.execute(command); }
    public void shutdown() { e.shutdown(); }
    public List<Runnable> shutdownNow() { return e.shutdownNow(); }
    public boolean isShutdown() { return e.isShutdown(); }
    public boolean isTerminated() { return e.isTerminated(); }
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return e.awaitTermination(timeout, unit);
    }
    public Future<?> submit(Runnable task) {
        return e.submit(task);
    }
    public <T> Future<T> submit(Callable<T> task) {
        return e.submit(task);
    }
    public <T> Future<T> submit(Runnable task, T result) {
        return e.submit(task, result);
    }
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        return e.invokeAll(tasks);
    }
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
            throws InterruptedException {
        return e.invokeAll(tasks, timeout, unit);
    }
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        return e.invokeAny(tasks);
    }
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return e.invokeAny(tasks, timeout, unit);
    }
}



private SingleThreadExecutorWithExceptions() {}

}

Solution 8 - Java

If you want to monitor the execution of task, you could spin 1 or 2 threads (maybe more depending on the load) and use them to take tasks from an ExecutionCompletionService wrapper.

Solution 9 - Java

If your ExecutorService comes from an external source (i. e. it's not possible to subclass ThreadPoolExecutor and override afterExecute()), you can use a dynamic proxy to achieve the desired behavior:

public static ExecutorService errorAware(final ExecutorService executor) {
	return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
			new Class[] {ExecutorService.class},
			(proxy, method, args) -> {
				if (method.getName().equals("submit")) {
					final Object arg0 = args[0];
					if (arg0 instanceof Runnable) {
						args[0] = new Runnable() {
							@Override
							public void run() {
								final Runnable task = (Runnable) arg0;
								try {
									task.run();
									if (task instanceof Future<?>) {
										final Future<?> future = (Future<?>) task;

										if (future.isDone()) {
											try {
												future.get();
											} catch (final CancellationException ce) {
												// Your error-handling code here
												ce.printStackTrace();
											} catch (final ExecutionException ee) {
												// Your error-handling code here
												ee.getCause().printStackTrace();
											} catch (final InterruptedException ie) {
												Thread.currentThread().interrupt();
											}
										}
									}
								} catch (final RuntimeException re) {
									// Your error-handling code here
									re.printStackTrace();
									throw re;
								} catch (final Error e) {
									// Your error-handling code here
									e.printStackTrace();
									throw e;
								}
							}
						};
					} else if (arg0 instanceof Callable<?>) {
						args[0] = new Callable<Object>() {
							@Override
							public Object call() throws Exception {
								final Callable<?> task = (Callable<?>) arg0;
								try {
									return task.call();
								} catch (final Exception e) {
									// Your error-handling code here
									e.printStackTrace();
									throw e;
								} catch (final Error e) {
									// Your error-handling code here
									e.printStackTrace();
									throw e;
								}
							}
						};
					}
				}
				return method.invoke(executor, args);
			});
}

Solution 10 - Java

This is because of AbstractExecutorService :: submit is wrapping your runnable into RunnableFuture (nothing but FutureTask) like below

AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

Then execute will pass it to Worker and Worker.run() will call the below.

ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

> Finally task.run(); in the above code call will call > FutureTask.run(). Here is the exception handler code, because of > this you are NOT getting the expected exception.

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

Solution 11 - Java

This is similar to mmm's solution, but a bit more understandable. Have your tasks extend an abstract class that wraps the run() method.

public abstract Task implements Runnable {

    public abstract void execute();

    public void run() {
      try {
        execute();
      } catch (Throwable t) {
        // handle it  
      }
    }
}


public MySampleTask extends Task {
    public void execute() {
        // heavy, error-prone code here
    }
}

Solution 12 - Java

Instead of subclassing ThreadPoolExecutor, I would provide it with a ThreadFactory instance that creates new Threads and provides them with an UncaughtExceptionHandler

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
QuestionTomView Question on Stackoverflow
Solution 1 - JavaskaffmanView Answer on Stackoverflow
Solution 2 - JavanosView Answer on Stackoverflow
Solution 3 - JavaDrew WillsView Answer on Stackoverflow
Solution 4 - JavamjsView Answer on Stackoverflow
Solution 5 - Javayegor256View Answer on Stackoverflow
Solution 6 - JavaCSchulzView Answer on Stackoverflow
Solution 7 - Javaobesga_tirantView Answer on Stackoverflow
Solution 8 - JavaCristian BotizaView Answer on Stackoverflow
Solution 9 - JavaBassView Answer on Stackoverflow
Solution 10 - JavaKanagavelu SugumarView Answer on Stackoverflow
Solution 11 - JavaccleveView Answer on Stackoverflow
Solution 12 - JavaKevinView Answer on Stackoverflow