Java: ExecutorService that blocks on submission after a certain queue size

Java, Concurrency, Threadpool, ExecutorService

Java Problem Overview


I am trying to code a solution in which a single thread produces I/O-intensive tasks that can be performed in parallel. Each task has significant in-memory data, so I want to be able to limit the number of tasks that are pending at any given moment.

If I create a ThreadPoolExecutor like this:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

Then executor.submit(callable) throws a RejectedExecutionException when the queue fills up and all the threads are already busy.

What can I do to make executor.submit(callable) block when the queue is full and all threads are busy?

EDIT: I tried this:

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

And it somewhat achieves the effect I want, but in an inelegant way: rejected tasks are run in the calling thread, which blocks the calling thread from submitting more.

EDIT: (5 years after asking the question)

To anyone reading this question and its answers, please don't take the accepted answer as the one correct solution. Please read through all the answers and comments.

Java Solutions


Solution 1 - Java

I have done this same thing. The trick is to create a BlockingQueue where the offer() method is really a put(). (You can use whatever base BlockingQueue implementation you want.)

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }
    
}

Note that this only works for a thread pool where corePoolSize == maxPoolSize, so be careful there (see comments).
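
For illustration, here is a minimal sketch of plugging this queue into a pool; numWorkerThreads, maxQueue and callable are the placeholders from the question, and the corePoolSize == maxPoolSize requirement from the note above is respected:

// corePoolSize == maximumPoolSize, as required by the caveat above
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        numWorkerThreads, numWorkerThreads,
        0L, TimeUnit.MILLISECONDS,
        new LimitedQueue<Runnable>(maxQueue));

// submit() now blocks the calling thread once maxQueue tasks are pending
executor.submit(callable);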

Solution 2 - Java

Here is how I solved this on my end:

(Note: this solution blocks the thread that submits the Callable, so it prevents RejectedExecutionException from being thrown.)

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedExecutor extends ThreadPoolExecutor {

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**
     * Submits a task to the pool, blocking while the number of running tasks
     * has reached the bound.
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException {
        semaphore.acquire();
        try {
            return submit(task);
        } catch (RejectedExecutionException e) {
            // the task never reached a worker, so afterExecute will not release this permit
            semaphore.release();
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        semaphore.release();
    }
}
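
A possible usage sketch (the bound of 4 is arbitrary, and the submitting code must be prepared to handle InterruptedException):

BoundedExecutor executor = new BoundedExecutor(4);                   // at most 4 tasks in flight
Future<String> result = executor.submitButBlockIfFull(() -> "done"); // blocks once all 4 permits are taken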

Solution 3 - Java

The currently accepted answer has a potentially significant problem - it changes the behavior of ThreadPoolExecutor.execute such that if you have a corePoolSize < maxPoolSize, the ThreadPoolExecutor logic will never add additional workers beyond the core.

From ThreadPoolExecutor.execute(Runnable):

    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);

Specifically, that last 'else' block will never be hit.

A better alternative is to do something similar to what the OP is already doing: use a RejectedExecutionHandler to do the same put logic:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
        if (!executor.isShutdown()) {
            executor.getQueue().put(r);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
    }
}
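
For reference, a sketch of wiring this up, assuming the method above lives in a (hypothetical) BlockingRejectedExecutionHandler that implements RejectedExecutionHandler:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        2, 8,                                     // corePoolSize < maxPoolSize still grows the pool here
        60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(100),   // bounded queue; put() above blocks when it is full
        new BlockingRejectedExecutionHandler());  // hypothetical wrapper around the method above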

There are some things to watch out for with this approach, as pointed out in the comments on this answer:

  1. If corePoolSize == 0, there is a race condition where all threads in the pool may die before the task is visible
  2. If the executor implementation wraps queued tasks (which ThreadPoolExecutor itself does not), this approach will break unless the handler wraps them the same way

Keeping those gotchas in mind, this solution will work for most typical ThreadPoolExecutors, and will properly handle the case where corePoolSize < maxPoolSize.

Solution 4 - Java

I know this is an old question, but I had a similar issue: creating new tasks was very fast, and if too many piled up an OutOfMemoryError occurred because the existing tasks were not completed fast enough.

In my case, Callables are submitted and I need their results, so I have to keep all the Futures returned by executor.submit(). My solution was to put the Futures into a BlockingQueue with a maximum size. Once that queue is full, no more tasks are generated until some are completed (elements removed from the queue). In pseudo-code:

final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads);
final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize);
try {
    Thread taskGenerator = new Thread() {
        @Override
        public void run() {
            while (reader.hasNext()) {
                Callable task = generateTask(reader.next());
                Future future = executor.submit(task);
                try {
                    // if the queue is full this blocks until a future is
                    // taken out, so no further tasks are submitted
                    futures.put(future);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
            executor.shutdown();
        }
    };
    taskGenerator.start();

    // consume from the queue as long as tasks are being generated
    // or the queue still has elements in it
    while (taskGenerator.isAlive() || !futures.isEmpty()) {
        Future future = futures.take();
        // do something with the result, e.g. future.get()
    }
} catch (InterruptedException ex) {
    Thread.currentThread().interrupt();
} catch (ExecutionException ex) {
    throw new MyException(ex);
} finally {
    executor.shutdownNow();
}

Solution 5 - Java

How about using the CallerBlocksPolicy class if you are using spring-integration?

This class implements the RejectedExecutionHandler interface, which is a handler for tasks that cannot be executed by a ThreadPoolExecutor.

You can use this policy like this (note that CallerBlocksPolicy takes the maximum time, in milliseconds, to block the caller):

executor.setRejectedExecutionHandler(new CallerBlocksPolicy(maxWait));

The main difference between CallerBlocksPolicy and CallerRunsPolicy is that CallerBlocksPolicy blocks the caller thread until there is room in the queue, whereas CallerRunsPolicy runs the rejected task in the caller thread.

For details, see the CallerBlocksPolicy source code in Spring Integration.
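
As a rough sketch, assuming spring-integration is on the classpath (the pool sizes, queue capacity and the 5-second wait below are arbitrary), it can be combined with Spring's ThreadPoolTaskExecutor like this:

// CallerBlocksPolicy lives in org.springframework.integration.util
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(100);
// block the submitting thread for up to 5 seconds when the queue is full
executor.setRejectedExecutionHandler(new CallerBlocksPolicy(5000));
executor.initialize();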

Solution 6 - Java

I had a similar problem and solved it using the beforeExecute/afterExecute hooks of ThreadPoolExecutor:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Blocks current task execution if there are not enough resources for it.
 * Maximum task count usage controlled by maxTaskCount property.
 */
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {

	private final ReentrantLock taskLock = new ReentrantLock();
	private final Condition unpaused = taskLock.newCondition();
	private final int maxTaskCount;

	private volatile int currentTaskCount;

	public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, int maxTaskCount) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
		this.maxTaskCount = maxTaskCount;
	}

	/**
	 * Executes the task if there are enough system resources for it;
	 * otherwise waits.
	 */
	@Override
	protected void beforeExecute(Thread t, Runnable r) {
		super.beforeExecute(t, r);
		taskLock.lock();
		try {
			// Wait until there is enough capacity for this task
			while (currentTaskCount >= maxTaskCount) {
				try {
					unpaused.await();
				} catch (InterruptedException e) {
					t.interrupt();
				}
			}
			currentTaskCount++;
		} finally {
			taskLock.unlock();
		}
	}

	/**
	 * Signalling that one more task is welcome
	 */
	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		super.afterExecute(r, t);
		taskLock.lock();
		try {
			currentTaskCount--;
			unpaused.signalAll();
		} finally {
			taskLock.unlock();
		}
	}
}

This should be good enough for you. By the way, the original implementation was task-size based, because one task could be 100 times larger than another and submitting two huge tasks was killing the box, while running one big task plus plenty of small ones was okay. If your I/O-intensive tasks are roughly the same size you could use this class; otherwise just let me know and I'll post the size-based implementation.

P.S. You will want to check the ThreadPoolExecutor Javadoc. It's a really nice user guide from Doug Lea on how it can easily be customized.

Solution 7 - Java

I have implemented a solution following the decorator pattern, using a semaphore to control the number of tasks being executed. You can use it with any Executor to:

  • Specify the maximum number of ongoing tasks
  • Specify the maximum timeout to wait for a task execution permit (if the timeout passes and no permit is acquired, a RejectedExecutionException is thrown)

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

import javax.annotation.Nonnull;

public class BlockingOnFullQueueExecutorDecorator implements Executor {

	private static final class PermitReleasingDecorator implements Runnable {

		@Nonnull
		private final Runnable delegate;

		@Nonnull
		private final Semaphore semaphore;

		private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) {
			this.delegate = task;
			this.semaphore = semaphoreToRelease;
		}

		@Override
		public void run() {
			try {
				this.delegate.run();
			}
			finally {
				// however execution goes, release permit for next task
				this.semaphore.release();
			}
		}

		@Override
		public final String toString() {
			return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate);
		}
	}

	@Nonnull
	private final Semaphore taskLimit;

	@Nonnull
	private final Duration timeout;

	@Nonnull
	private final Executor delegate;

	public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) {
		this.delegate = Objects.requireNonNull(executor, "'executor' must not be null");
		if (maximumTaskNumber < 1) {
			throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber));
		}
		this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null");
		if (this.timeout.isNegative()) {
			throw new IllegalArgumentException("'maximumTimeout' must not be negative");
		}
		this.taskLimit = new Semaphore(maximumTaskNumber);
	}

	@Override
	public final void execute(final Runnable command) {
		Objects.requireNonNull(command, "'command' must not be null");
		try {
			// attempt to acquire permit for task execution
			if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) {
				throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate));
			}
		}
		catch (final InterruptedException e) {
			// restore interrupt status
			Thread.currentThread().interrupt();
			throw new IllegalStateException(e);
		}

		this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit));
	}

	@Override
	public final String toString() {
		return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(),
				this.timeout, this.delegate);
	}
}
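
A possible usage sketch (the fixed pool of 4, the limit of 10 and the 30-second timeout are arbitrary):

Executor bounded = new BlockingOnFullQueueExecutorDecorator(
        Executors.newFixedThreadPool(4),    // any delegate Executor works
        10,                                 // at most 10 tasks in flight
        Duration.ofSeconds(30));            // wait up to 30s for a permit before rejecting

bounded.execute(() -> System.out.println("task"));  // blocks here once 10 permits are taken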

Solution 8 - Java

I think it is as simple as using an ArrayBlockingQueue instead of a LinkedBlockingQueue.

Ignore me... that's totally wrong. ThreadPoolExecutor calls Queue#offer, not put, which would have the effect you require.

You could extend ThreadPoolExecutor and provide an implementation of execute(Runnable) that calls put in place of offer.
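
A minimal sketch of that idea (a hypothetical class; it bypasses ThreadPoolExecutor's rejection and pool-growth logic entirely, so core threads must be pre-started and the pool size stays fixed):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PutBlockingThreadPoolExecutor extends ThreadPoolExecutor {

    public PutBlockingThreadPoolExecutor(int poolSize, int queueCapacity) {
        super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>(queueCapacity));
        // workers are normally created lazily by execute(); since we bypass it, start them up front
        prestartAllCoreThreads();
    }

    @Override
    public void execute(Runnable command) {
        try {
            // block until the bounded queue has room, instead of offer() + reject
            getQueue().put(command);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while enqueuing task", e);
        }
    }
}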

That doesn't seem like a completely satisfactory answer I'm afraid.

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Question: Tahir Akhtar (View Question on Stackoverflow)
Solution 1 - Java: jtahlborn (View Answer on Stackoverflow)
Solution 2 - Java: cvacca (View Answer on Stackoverflow)
Solution 3 - Java: Krease (View Answer on Stackoverflow)
Solution 4 - Java: beginner_ (View Answer on Stackoverflow)
Solution 5 - Java: progresivoJS (View Answer on Stackoverflow)
Solution 6 - Java: Petro Semeniuk (View Answer on Stackoverflow)
Solution 7 - Java: Grzegorz Lehmann (View Answer on Stackoverflow)
Solution 8 - Java: Gareth Davis (View Answer on Stackoverflow)