How to interrupt a BlockingQueue which is blocking on take()?

Java, Concurrency, BlockingQueue

Java Problem Overview


I have a class that takes objects from a BlockingQueue and processes them by calling take() in a continuous loop. At some point I know that no more objects will be added to the queue. How do I interrupt the take() method so that it stops blocking?

Here's the class that processes the objects:

import java.util.concurrent.BlockingQueue;

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public MyObjHandler(BlockingQueue<MyObj> queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();  // blocks until an element is available
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      // restore the interrupt status and let the thread exit
      Thread.currentThread().interrupt();
    }
  }
}

And here's the method that uses this class to process objects:

public void testHandler() throws InterruptedException {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);

  MyObjHandler handler = new MyObjHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());  // put() blocks while the queue is full
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}

Java Solutions


Solution 1 - Java

If interrupting the thread is not an option, another approach is to place a "marker" or "command" object on the queue that MyObjHandler recognizes as a signal to break out of the loop.
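The marker-object idea is often called a "poison pill". Below is a minimal sketch of it, not the answerer's own code: the class PoisonPillDemo, the nested MyObj stand-in, the POISON_PILL constant, and the helper methods are all hypothetical names introduced for illustration. The consumer compares each element against the sentinel by identity and stops when it sees it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; none of these names come from the question.
class PoisonPillDemo {

    // Stand-in for the question's MyObj type.
    static final class MyObj { }

    // Hypothetical sentinel instance. It is compared by identity,
    // so it cannot be confused with a real work item.
    static final MyObj POISON_PILL = new MyObj();

    // Takes items until the sentinel arrives, then returns how many
    // real items were processed.
    static int consume(BlockingQueue<MyObj> queue) throws InterruptedException {
        int processed = 0;
        while (true) {
            MyObj obj = queue.take();
            if (obj == POISON_PILL) {
                return processed; // producer signalled that no more items will come
            }
            processed++; // "process" the item
        }
    }

    // Puts n items followed by the sentinel and returns the consumer's count.
    static int runDemo(int n) {
        BlockingQueue<MyObj> queue = new LinkedBlockingQueue<>();
        int[] result = new int[1];
        Thread consumer = new Thread(() -> {
            try {
                result[0] = consume(queue);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < n; i++) {
                queue.put(new MyObj());
            }
            queue.put(POISON_PILL); // enqueued last, so earlier items are processed first
            consumer.join();        // join() makes result[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println("processed " + runDemo(5));
    }
}
```

Because the sentinel travels through the queue in FIFO order, every item put before it is processed before the consumer stops. With several consumers, one sentinel per consumer would be needed.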

Solution 2 - Java

BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjHandler handler = new MyObjHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
// makes a blocked take() throw InterruptedException
thread.interrupt();

However, if you do this, the thread might be interrupted while there are still items in the queue, waiting to be processed. You might want to consider using poll instead of take, which will allow the processing thread to time out and terminate when it has waited for a while with no new input.
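As a rough illustration of the poll suggestion, the sketch below stops the consumer once the queue has been idle for a given time. The class PollTimeoutDemo, its methods, and the 200 ms idle limit are hypothetical choices for this example, and String items stand in for MyObj.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names and the timeout value are illustrative.
class PollTimeoutDemo {

    // Processes items until the queue stays empty for idleMillis,
    // then returns how many items were processed.
    static int consumeUntilIdle(BlockingQueue<String> queue, long idleMillis)
            throws InterruptedException {
        int processed = 0;
        while (true) {
            // poll(timeout, unit) returns null if nothing arrives in time
            String item = queue.poll(idleMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                return processed; // timed out with no new input
            }
            processed++; // "process" the item
        }
    }

    // Pre-fills a queue with n items and drains it with the idle timeout.
    static int runDemo(int n) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < n; i++) {
            queue.add("item-" + i);
        }
        try {
            return consumeUntilIdle(queue, 200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("processed " + runDemo(3));
    }
}
```

The trade-off is that a slow producer can look like a finished one: if the gap between two items exceeds the timeout, the consumer exits early.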

Solution 3 - Java

Very late, but I hope this helps others too. I faced a similar problem and used the poll approach suggested by erickson above, with some minor changes:

class MyObjHandler implements Runnable
{
	private final BlockingQueue<MyObj> queue;
	public volatile boolean Finished;  // volatile makes the producer's update visible to this thread
	public MyObjHandler(BlockingQueue<MyObj> queue)
	{
		this.queue = queue;
		Finished = false;
	}
	@Override
	public void run()
	{
		while (true)
		{
			try
			{
				MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
				if (obj != null) // process a pending job first, then check whether to return
				{
					// process obj here
					// ...
				}
				if (Finished && queue.isEmpty())
					return;
			}
			catch (InterruptedException e)
			{
				Thread.currentThread().interrupt(); // restore the interrupt status
				return;
			}
		}
	}
}

public void testHandler() throws InterruptedException
{
	BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);

	MyObjHandler handler = new MyObjHandler(queue);
	Thread myThread = new Thread(handler);
	myThread.start();

	// get objects for handler to process
	for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
	{
		queue.put(i.next());
	}

	// tell the handler to stop waiting for more objects
	handler.Finished = true;
	// wait for the handler to terminate; remove the join if you don't need to wait
	myThread.join();
}

This solved both problems:

  1. It flags the handler so that it knows it does not have to wait for more elements.
  2. It does not interrupt in the middle of processing, so the handler terminates only when all the items in the queue have been processed and no more items remain to be added.

Solution 4 - Java

Or don't interrupt; it's nasty.

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

    	private static final long serialVersionUID = 1L;
    	// volatile so that isDone() sees the producer's update
    	private volatile boolean done = false;

    	public MyQueue(int capacity) {	super(capacity); }

    	// wakes up any consumer waiting in take()
    	public synchronized void done() { done = true; notifyAll(); }

    	public boolean isDone() { return done; }

    	/**
    	 * May return null if producer ends the production after consumer 
    	 * has entered the element-await state.
    	 */
    	@Override
    	public T take() throws InterruptedException {
    		// poll and wait under the same monitor so a notify cannot be missed
    		synchronized (this) {
    			T el;
    			while ((el = super.poll()) == null && !done) {
    				wait();
    			}
    			return el;
    		}
    	}
    }

  1. When the producer puts an object on the queue, it calls queue.notify() inside synchronized (queue) { ... }; when it has finished, it calls queue.done().
  2. The consumer loops while (!queue.isDone() || !queue.isEmpty()).
  3. The consumer tests the take() return value for null.

Solution 5 - Java

Interrupt the thread:

thread.interrupt()
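To show what the interrupt does to a consumer blocked in take(), here is a minimal sketch. The class InterruptTakeDemo and its method are hypothetical names, and a String queue stands in for the question's MyObj queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; the names are illustrative.
class InterruptTakeDemo {

    // Starts a consumer that calls take() on an empty queue, interrupts it,
    // and reports whether take() threw InterruptedException.
    static boolean interruptBlockedTake() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        boolean[] sawInterrupt = new boolean[1];
        Thread consumer = new Thread(() -> {
            try {
                queue.take(); // nothing is ever put, so this cannot return normally
            } catch (InterruptedException e) {
                sawInterrupt[0] = true; // a real handler would exit its loop here
            }
        });
        consumer.start();
        // take() throws whether the interrupt arrives while it is blocked
        // or is already pending when take() is called.
        consumer.interrupt();
        try {
            consumer.join(); // join() makes sawInterrupt[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt[0];
    }

    public static void main(String[] args) {
        System.out.println("take() was interrupted: " + interruptBlockedTake());
    }
}
```

As Solution 2 notes, the interrupt can arrive while items are still queued, so this approach fits best when unprocessed items may be dropped or when the producer interrupts only after the queue has drained.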

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | MCS | View Question on Stackoverflow
Solution 1 - Java | Chris Thornhill | View Answer on Stackoverflow
Solution 2 - Java | erickson | View Answer on Stackoverflow
Solution 3 - Java | dbw | View Answer on Stackoverflow
Solution 4 - Java | tomasb | View Answer on Stackoverflow
Solution 5 - Java | stepancheg | View Answer on Stackoverflow