ThreadPoolExecutor Block When Queue Is Full?


Java Problem Overview


I am trying to execute lots of tasks using a ThreadPoolExecutor. Below is a hypothetical example:

def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
    threadPoolExecutor.execute(runnable)

The problem is that I quickly get a java.util.concurrent.RejectedExecutionException since the number of tasks exceeds the size of the work queue. However, the desired behavior I am looking for is to have the main thread block until there is room in the queue. What is the best way to accomplish this?

Java Solutions


Solution 1 - Java

In some very narrow circumstances, you can implement a java.util.concurrent.RejectedExecutionHandler that does what you need.

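RejectedExecutionHandler block = new RejectedExecutionHandler() {
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
      executor.getQueue().put(r); // blocks the caller until the queue has room
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RejectedExecutionException(e);
    }
  }
};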

ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);

Now, this is a very bad idea for the following reasons:

  • It's prone to deadlock because all the threads in the pool may die before the thing you put in the queue is visible. Mitigate this by setting a reasonable keep alive time.
  • The task is not wrapped the way your Executor may expect. Lots of executor implementations wrap their tasks in some sort of tracking object before execution. Look at the source of yours.
  • Adding via getQueue() is strongly discouraged by the API, and may be prohibited at some point.

An almost-always-better strategy is to install ThreadPoolExecutor.CallerRunsPolicy, which throttles your app by running the rejected task on the thread that is calling execute().
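A minimal sketch of the caller-runs setup, assuming the same 3-thread pool and 3-slot queue as in the question. The class name CallerRunsDemo and the trivial counting task are made up for illustration:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original answer.
class CallerRunsDemo {

    // Submits taskCount trivial tasks and returns how many actually ran.
    static int runAll(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 1L, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(3),
                // When the pool and queue are full, execute() runs the task
                // on the calling thread instead of throwing.
                new ThreadPoolExecutor.CallerRunsPolicy());

        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet);
        }

        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(1000) + " tasks completed");
    }
}
```

The submitting loop never sees a RejectedExecutionException here; while the main thread is busy running a rejected task it simply cannot submit more, which is the throttling effect.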

However, sometimes a blocking strategy, with all its inherent risks, is really what you want. I'd say under these conditions:

  • You only have one thread calling execute()
  • You have to (or want to) have a very small queue length
  • You absolutely need to limit the number of threads running this work (usually for external reasons), and a caller-runs strategy would break that.
  • Your tasks are of unpredictable size, so caller-runs could introduce starvation if the pool was momentarily busy with 4 short tasks and your one thread calling execute got stuck with a big one.

So, as I say, it's rarely needed and can be dangerous, but there you go.

Good Luck.

Solution 2 - Java

What you need to do is wrap your ThreadPoolExecutor in an Executor that explicitly limits the number of operations executing concurrently inside it:

 private static class BlockingExecutor implements Executor {

    final Semaphore semaphore;
    final Executor delegate;

    private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
        semaphore = new Semaphore(concurrentTasksLimit);
        this.delegate = delegate;
    }

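    @Override
    public void execute(final Runnable command) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and refuse the task instead of dropping it silently.
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for a permit", e);
        }

        final Runnable wrapped = () -> {
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        };

        try {
            delegate.execute(wrapped);
        } catch (RejectedExecutionException e) {
            // The task will never run, so give the permit back.
            semaphore.release();
            throw e;
        }
    }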
}

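You can set concurrentTasksLimit to the threadPoolSize + queueSize of your delegate executor and it will pretty much solve your problem. A rejection is still possible in a narrow window: a task releases its permit just before its worker thread goes back to the queue, so a new task can arrive while the queue is still full. Setting the limit to the queue size alone closes that window.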

Solution 3 - Java

You could use a semaphore to block threads from going into the pool.

ExecutorService service = new ThreadPoolExecutor(
    3, 
    3, 
    1, 
    TimeUnit.HOURS, 
    new ArrayBlockingQueue<>(6, false)
);

Semaphore lock = new Semaphore(6); // equal to queue capacity

for (int i = 0; i < 100000; i++ ) {
    try {
        lock.acquire();
        service.submit(() -> {
            try {
              task.run();
            } finally {
              lock.release();
            }
        });
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

Some gotchas:

  • Only use this pattern with a fixed thread pool. The queue is unlikely to be full often, so threads beyond the core pool size won't be created. See the ThreadPoolExecutor Javadoc for more details: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html There is a way around this, but it is out of scope of this answer.
  • Queue size should be higher than the number of core threads. If we were to make the queue size 3, what would end up happening is:
    • T0: all three threads are doing work, the queue is empty, no permits are available.
    • T1: Thread 1 finishes, releases a permit.
    • T2: Thread 1 polls the queue for new work, finds none, and waits.
    • T3: Main thread submits work into the pool, thread 1 starts work.

In the example above, thread 1 sits idle until the main thread submits more work, so the main thread is effectively blocking thread 1. It may seem like a small period, but now multiply the frequency by days and months. All of a sudden, short periods of time add up to a large amount of time wasted.

Solution 4 - Java

This is what I ended up doing:

int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();

for (int i = 0; i < 100000; i++) {
    try {
        lock.acquire();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    pool.execute(() -> {
        try {
            // Task logic
        } finally {
            lock.release();
        }
    });
}

Solution 5 - Java

A fairly straightforward option is to wrap your BlockingQueue with an implementation that calls put(..) when offer(..) is being invoked:

public class BlockOnOfferAdapter<T> implements BlockingQueue<T> {

(..)

  public boolean offer(T o) {
        try {
            delegate.put(o);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
  }

(.. implement all other methods simply by delegating ..)

}

This works because by default put(..) waits until there is capacity in the queue when it is full, see:

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;

No catching of RejectedExecutionException or complicated locking necessary.
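A self-contained variant of the same idea can be sketched by subclassing LinkedBlockingQueue rather than writing a full delegating adapter. The names BlockingOfferQueue and BlockingOfferDemo are made up for this sketch, and it assumes a fixed-size pool (core size equal to maximum size), because a blocking offer() never reports a full queue and the executor therefore never grows past its core size:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical name: a bounded queue whose offer() blocks like put().
class BlockingOfferQueue<E> extends LinkedBlockingQueue<E> {

    BlockingOfferQueue(int capacity) {
        super(capacity);
    }

    @Override
    public boolean offer(E e) {
        try {
            put(e); // waits for space instead of returning false
            return true;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false; // the executor then hands the task to its rejection handler
        }
    }
}

// Hypothetical demo class showing the queue in use.
class BlockingOfferDemo {

    // Submits taskCount trivial tasks and returns how many actually ran.
    static int runAll(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 1L, TimeUnit.HOURS, new BlockingOfferQueue<>(2));
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet); // blocks while the queue is full
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(500) + " tasks completed");
    }
}
```

Only the untimed offer(E) is overridden, since that is the method execute() uses to enqueue work; the worker threads still remove tasks through the inherited take() and poll().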

Solution 6 - Java

Here is my code snippet in this case:

public void executeBlocking( Runnable command ) {
    if ( threadPool == null ) {
        logger.error( "Thread pool '{}' not initialized.", threadPoolName );
        return;
    }
    ThreadPool threadPoolMonitor = this;
    boolean accepted = false;
    do {
        try {
            threadPool.execute( new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    }
                    // to make sure that the monitor is freed on exit
                    finally {
                        // Notify all the threads waiting for the resource, if any.
                        synchronized ( threadPoolMonitor ) {
                            threadPoolMonitor.notifyAll();
                        }
                    }
                }
            } );
            accepted = true;
        }
        catch ( RejectedExecutionException e ) {
            // Thread pool is full
            try {
                // Block until one of the threads finishes its job and exits.
                synchronized ( threadPoolMonitor ) {
                    threadPoolMonitor.wait();
                }
            }
            catch ( InterruptedException interrupted ) {
                // restore the interrupt flag and return immediately
                Thread.currentThread().interrupt();
                break;
            }
        }
    } while ( !accepted );
}

threadPool is a local instance of java.util.concurrent.ExecutorService which has been initialized already.

Solution 7 - Java

I solved this problem using a custom RejectedExecutionHandler, which simply blocks the calling thread for a little while and then tries to submit the task again:

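public class BlockWhenQueueFull implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // A shut-down pool rejects everything; retrying would recurse forever.
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down");
        }

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {
            // Restore the interrupt flag and give up instead of retrying in a tight loop.
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting", interruptedException);
        }

        executor.execute(r);
    }
}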

This class can just be used in the thread-pool executor as a RejectedExecutionHandler like any other. In this example:

def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())

The only downside I see is that the calling thread might stay blocked slightly longer than strictly necessary (up to 250 ms). For many short-running tasks, perhaps decrease the wait time to 10 ms or so. Moreover, since the handler effectively calls the executor recursively, very long waits for a thread to become available (hours) might result in a stack overflow.

Nevertheless, I personally like this method. It's compact, easy to understand, and works well. Am I missing anything important?
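The recursion concern can be sidestepped by retrying in a loop on the caller's side instead of inside the handler. The following is only a sketch of that variation, not the original answer's code: the names RetryingSubmitter and executeBlocking are invented here, and it assumes the pool keeps the default AbortPolicy so that a full pool surfaces as a RejectedExecutionException:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, named for illustration only.
class RetryingSubmitter {

    // Retries execute() in a loop (no recursion) until the pool accepts the task.
    static void executeBlocking(ThreadPoolExecutor pool, Runnable task, long waitMs) {
        while (true) {
            if (pool.isShutdown()) {
                throw new RejectedExecutionException("Executor has been shut down");
            }
            try {
                pool.execute(task); // the default AbortPolicy throws when the pool is full
                return;
            } catch (RejectedExecutionException full) {
                try {
                    Thread.sleep(waitMs); // back off, then try again
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException("Interrupted while waiting", e);
                }
            }
        }
    }

    // Submits taskCount trivial tasks and returns how many actually ran.
    static int runAll(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 1L, TimeUnit.HOURS, new ArrayBlockingQueue<>(3));
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            executeBlocking(pool, completed::incrementAndGet, 1L);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(200) + " tasks completed");
    }
}
```

The trade-off is the same polling delay as the handler version, but the stack depth stays constant no matter how long the pool remains full.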

Solution 8 - Java

OK, this is an old thread, but it is what I found when searching for a blocking thread executor. My code tries to acquire a semaphore permit when the task is submitted to the task queue. This blocks if there are no permits left. As soon as a task is done, the permit is released by the decorator. The scary part is that a permit can be lost, but that could be solved with, for example, a timed job that resets the permits periodically.

So here is my solution:

class BlockingThreadPoolTaskExecutor(concurrency: Int) : ThreadPoolTaskExecutor() {
    companion object {
        lateinit var semaphore: Semaphore
    }

    init {
        semaphore = Semaphore(concurrency)
        val semaphoreTaskDecorator = SemaphoreTaskDecorator()
        this.setTaskDecorator(semaphoreTaskDecorator)
    }

    override fun <T> submit(task: Callable<T>): Future<T> {
        log.debug("submit")
        semaphore.acquire()
        return super.submit(task)
    }
}

private class SemaphoreTaskDecorator : TaskDecorator {
    override fun decorate(runnable: Runnable): Runnable {
        log.debug("decorate")
        return Runnable {
            try {
                runnable.run()
            } finally {
                log.debug("decorate done")
                semaphore.release()
            }
        }
    }
}

Solution 9 - Java

One could use a LinkedBlockingQueue with ThreadPoolExecutor:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 
            keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(YOUR_QUEUE_SIZE));

Be aware, though, that a bounded queue alone does not make execute() block. ThreadPoolExecutor enqueues tasks with offer(e), not put(e), so once the queue is full and maximumPoolSize threads are running, the task is handed to the RejectedExecutionHandler. An unbounded LinkedBlockingQueue never rejects, but it never blocks either and can grow without limit. To get blocking behavior, the bounded queue has to be combined with one of the approaches above, such as a semaphore or a blocking rejection handler.
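How execute() behaves with a full bounded LinkedBlockingQueue can be checked directly. The sketch below is an illustration only (the class name BoundedQueueCheck is made up): it uses a single worker thread and a single queue slot, keeps both occupied with a latch, and then submits one more task:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical check class, named for illustration only.
class BoundedQueueCheck {

    // Returns true if execute() rejected the third task instead of blocking.
    static boolean rejectsWhenFull() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 1L, TimeUnit.HOURS, new LinkedBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // occupies the single worker thread
            pool.execute(blocker); // fills the single queue slot
            pool.execute(blocker); // no thread and no queue slot left
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + rejectsWhenFull());
    }
}
```

With the default AbortPolicy the third execute() call throws rather than waits, which is the same RejectedExecutionException described in the question.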

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | ghempton | View Question on Stackoverflow
Solution 1 - Java | Darren Gilroy | View Answer on Stackoverflow
Solution 2 - Java | Konstantin Abakumov | View Answer on Stackoverflow
Solution 3 - Java | devkaoru | View Answer on Stackoverflow
Solution 4 - Java | disrupt | View Answer on Stackoverflow
Solution 5 - Java | Thomm | View Answer on Stackoverflow
Solution 6 - Java | MoeHoss | View Answer on Stackoverflow
Solution 7 - Java | TinkerTank | View Answer on Stackoverflow
Solution 8 - Java | Milo van der Zee | View Answer on Stackoverflow
Solution 9 - Java | Adrian | View Answer on Stackoverflow