Java executors: how to be notified, without blocking, when a task completes?

JavaCallbackNotifyExecutor

Java Problem Overview


Say I have a queue full of tasks which I need to submit to an executor service. I want them processed one at a time. The simplest way I can think of is to:

  1. Take a task from the queue
  2. Submit it to the executor
  3. Call .get on the returned Future and block until a result is available
  4. Take another task from the queue...

However, I am trying to avoid blocking completely. If I have 10,000 such queues, which need their tasks processed one at a time, I'll run out of stack space because most of them will be holding on to blocked threads.

What I would like is to submit a task and provide a call-back which is called when the task is complete. I'll use that call-back notification as a flag to send the next task. (functionaljava and jetlang apparently use such non-blocking algorithms, but I can't understand their code)

How can I do that using JDK's java.util.concurrent, short of writing my own executor service?

(the queue which feeds me these tasks may itself block, but that is an issue to be tackled later)

Java Solutions


Solution 1 - Java

Define a callback interface to receive whatever parameters you want to pass along in the completion notification. Then invoke it at the end of the task.

You could even write a general wrapper for Runnable tasks, and submit these to ExecutorService. Or, see below for a mechanism built into Java 8.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

With CompletableFuture, Java 8 included a more elaborate means to compose pipelines where processes can be completed asynchronously and conditionally. Here's a contrived but complete example of notification.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {
    
  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

Solution 2 - Java

In Java 8 you can use CompletableFuture. Here's an example I had in my code where I'm using it to fetch users from my user service, map them to my view objects and then update my view or show an error dialog (this is a GUI application):

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

It executes asynchronously. I'm using two private methods: mapUsersToUserViews and updateView.

Solution 3 - Java

Use Guava's listenable future API and add a callback. Cf. from the website :

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});

Solution 4 - Java

You could extend FutureTask class, and override the done() method, then add the FutureTask object to the ExecutorService, so the done() method will invoke when the FutureTask completed immediately.

Solution 5 - Java

ThreadPoolExecutor also has beforeExecute and afterExecute hook methods that you can override and make use of. Here is the description from ThreadPoolExecutor's Javadocs.

>###Hook methods This class provides protected overridable beforeExecute(java.lang.Thread, java.lang.Runnable) and afterExecute(java.lang.Runnable, java.lang.Throwable) methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method terminated() can be overridden to perform any special processing that needs to be done once the Executor has fully terminated. If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate.

Solution 6 - Java

Use a CountDownLatch.

It's from java.util.concurrent and it's exactly the way to wait for several threads to complete execution before continuing.

In order to achieve the callback effect you're looking after, that does require a little additional extra work. Namely, handling this by yourself in a separate thread which uses the CountDownLatch and does wait on it, then goes on about notifying whatever it is you need to notify. There is no native support for callback, or anything similar to that effect.


EDIT: now that I further understand your question, I think you are reaching too far, unnecessarily. If you take a regular SingleThreadExecutor, give it all the tasks, and it will do the queueing natively.

Solution 7 - Java

If you want to make sure that no tasks will run at the same time then use a SingleThreadedExecutor. The tasks will be processed in the order the are submitted. You don't even need to hold the tasks, just submit them to the exec.

Solution 8 - Java

This is an extension to Pache's answer using Guava's ListenableFuture.

In particular, Futures.transform() returns ListenableFuture so can be used to chain async calls. Futures.addCallback() returns void, so cannot be used for chaining, but is good for handling success/failure on an async completion.

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

NOTE: In addition to chaining async tasks, Futures.transform() also allows you to schedule each task on a separate executor (Not shown in this example).

Solution 9 - Java

Simple code to implement Callback mechanism using ExecutorService

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
	public CallBackDemo(){
		System.out.println("creating service");
		ExecutorService service = Executors.newFixedThreadPool(5);
	
		try{
			for ( int i=0; i<5; i++){
				Callback callback = new Callback(i+1);
				MyCallable myCallable = new MyCallable((long)i+1,callback);
				Future<Long> future = service.submit(myCallable);
				//System.out.println("future status:"+future.get()+":"+future.isDone());
			}
		}catch(Exception err){
			err.printStackTrace();
		}
		service.shutdown();
	}
	public static void main(String args[]){
		CallBackDemo demo = new CallBackDemo();
	}
}
class MyCallable implements Callable<Long>{
	Long id = 0L;
	Callback callback;
	public MyCallable(Long val,Callback obj){
		this.id = val;
		this.callback = obj;
	}
	public Long call(){
		//Add your business logic
		System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
		callback.callbackMethod();
		return id;
	}
}
class Callback {
	private int i;
	public Callback(int i){
		this.i = i;
	}
	public void callbackMethod(){
		System.out.println("Call back:"+i);
		// Add your business logic
	}
}

output:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

Key notes:

  1. If you want process tasks in sequence in FIFO order, replace newFixedThreadPool(5) with newFixedThreadPool(1)

  2. If you want to process next task after analysing the result from callback of previous task,just un-comment below line

     //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. You can replace newFixedThreadPool() with one of

     Executors.newCachedThreadPool()
     Executors.newWorkStealingPool()
     ThreadPoolExecutor
    

    depending on your use case.

  4. If you want to handle callback method asynchronously

    a. Pass a shared `ExecutorService or  ThreadPoolExecutor` to Callable task
    
    b. Convert your `Callable` method to `Callable/Runnable` task
    
    c. Push callback task to  `ExecutorService or  ThreadPoolExecutor`
    

Solution 10 - Java

Just to add to Matt's answer, which helped, here is a more fleshed-out example to show the use of a callback.

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

The output is:

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito

Solution 11 - Java

You may use a implementation of Callable such that

public class MyAsyncCallable<V> implements Callable<V> {
	
	CallbackInterface ci;
	
	public MyAsyncCallable(CallbackInterface ci) {
		this.ci = ci;
	}

	public V call() throws Exception {
		
		System.out.println("Call of MyCallable invoked");
		System.out.println("Result = " + this.ci.doSomething(10, 20));
		return (V) "Good job";
	}
}

where CallbackInterface is something very basic like

public interface CallbackInterface {
	public int doSomething(int a, int b);
}

and now the main class will look like this

ExecutorService ex = Executors.newFixedThreadPool(2);
		
MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
QuestionShahbazView Question on Stackoverflow
Solution 1 - JavaericksonView Answer on Stackoverflow
Solution 2 - JavaMattView Answer on Stackoverflow
Solution 3 - JavaPierre-HenriView Answer on Stackoverflow
Solution 4 - JavaAugusteView Answer on Stackoverflow
Solution 5 - JavaCem CatikkasView Answer on Stackoverflow
Solution 6 - JavaYuval AdamView Answer on Stackoverflow
Solution 7 - JavabasszeroView Answer on Stackoverflow
Solution 8 - JavabcorsoView Answer on Stackoverflow
Solution 9 - JavaRavindra babuView Answer on Stackoverflow
Solution 10 - JavaOld JackView Answer on Stackoverflow
Solution 11 - JavaDeepika AnandView Answer on Stackoverflow