Waiting on a list of Future
Java Problem Overview
I have a method which returns a List of futures:
List<Future<O>> futures = getFutures();
Now I want to wait until either all futures are done processing successfully or any of the tasks whose output is returned by a future throws an exception. Even if one task throws an exception, there is no point in waiting for the other futures.
A simple approach would be:
void waitForAll() {
    for (Future<O> f : futures) {
        try {
            f.get();
        } catch (Exception e) {
            // TODO: catch the specific exception
            // This future threw an exception, meaning some task could not do its work
            return;
        }
    }
}
But the problem here is if, for example, the 4th future throws an exception, then I will wait unnecessarily for the first 3 futures to be available.
How can I solve this? Would a CountDownLatch help in any way? I can't rely on Future.isDone() because the Javadoc says:
boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
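To make the Javadoc point concrete: once isDone() returns true, the outcome can still be told apart by checking isCancelled() and then calling get(), which no longer blocks. The following is only a minimal sketch; the class name FutureOutcome and the method describe are made up for illustration and are not part of any library.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureOutcome {

    /** Hypothetical helper: classifies a future as pending, cancelled, failed or succeeded. */
    public static String describe(Future<?> future) {
        if (!future.isDone()) {
            return "pending";
        }
        if (future.isCancelled()) {
            return "cancelled";
        }
        try {
            future.get(); // the future is done, so this does not wait for the task
            return "succeeded";
        } catch (ExecutionException e) {
            return "failed"; // the task threw; e.getCause() holds the original exception
        } catch (CancellationException e) {
            return "cancelled";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return "unknown";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(describe(failing));
        System.out.println(describe(CompletableFuture.completedFuture("ok")));
    }
}
```

This only classifies a future that has already finished; it does not by itself solve the problem of noticing the first failure without polling.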
Java Solutions
Solution 1 - Java
You can use a CompletionService to receive the futures as soon as they are ready and if one of them throws an exception cancel the processing. Something like this:
Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService =
new ExecutorCompletionService<SomeResult>(executor);
//4 tasks
for(int i = 0; i < 4; i++) {
completionService.submit(new Callable<SomeResult>() {
public SomeResult call() {
...
return result;
}
});
}
int received = 0;
boolean errors = false;
while (received < 4 && !errors) {
    try {
        // take() blocks if none is available and can throw InterruptedException,
        // so it belongs inside the try block
        Future<SomeResult> resultFuture = completionService.take();
        SomeResult result = resultFuture.get();
        received++;
        ... // do something with the result
    } catch (Exception e) {
        // log
        errors = true;
    }
}
You can improve this further by cancelling any tasks that are still executing once one of them throws an error.
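One possible way to add that cancellation is to keep the futures returned by submit() and cancel them all in a finally block. This is only a sketch under assumptions: the class name FailFastWait and the method runAll are invented for this example, and cancel(true) only helps if the tasks respond to interruption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FailFastWait {

    /**
     * Hypothetical helper: runs all tasks and returns their results in completion order.
     * On the first failure it cancels the remaining tasks and rethrows the exception.
     */
    public static <T> List<T> runAll(ExecutorService executor, List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        CompletionService<T> completionService = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>();
        for (Callable<T> task : tasks) {
            futures.add(completionService.submit(task));
        }
        List<T> results = new ArrayList<>();
        try {
            for (int i = 0; i < tasks.size(); i++) {
                // take() blocks until the next task finishes, whichever one that is
                results.add(completionService.take().get());
            }
            return results;
        } finally {
            // No effect on futures that already finished; interrupts those still running
            for (Future<T> f : futures) {
                f.cancel(true);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> { Thread.sleep(5000); return "slow"; });
        tasks.add(() -> { throw new IllegalStateException("boom"); });
        tasks.add(() -> "fast");
        try {
            runAll(executor, tasks);
        } catch (ExecutionException e) {
            System.out.println("Failed early: " + e.getCause().getMessage());
        } finally {
            executor.shutdown();
        }
    }
}
```

In the main method the failing task ends the wait long before the five-second task would have finished, and the slow task is interrupted by the cancel call.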
Solution 2 - Java
If you are using Java 8, you can do this more easily with CompletableFuture and CompletableFuture.allOf, which applies the callback only after all supplied CompletableFutures are done.
// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.
public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);
return CompletableFuture.allOf(cfs)
.thenApply(ignored -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
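Note that allOf waits for every input before the combined future completes, even when one input has already failed. If the goal is to stop waiting on the first failure, one possible variant forwards each input's exception to the combined future directly. This is a sketch, not part of the answer above; the names FailFastAll and allOrFirstFailure are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

public class FailFastAll {

    /**
     * Hypothetical variant of all(...): the returned future completes exceptionally
     * as soon as any input fails, instead of waiting for the remaining inputs.
     */
    public static <T> CompletableFuture<List<T>> allOrFirstFailure(List<CompletableFuture<T>> futures) {
        CompletableFuture<List<T>> result = CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
        for (CompletableFuture<T> f : futures) {
            f.whenComplete((value, error) -> {
                if (error != null) {
                    // Only the first completion of result takes effect; later calls are ignored
                    result.completeExceptionally(error);
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed in this demo
        CompletableFuture<String> failing = new CompletableFuture<>();
        CompletableFuture<List<String>> combined = allOrFirstFailure(Arrays.asList(slow, failing));
        failing.completeExceptionally(new IllegalStateException("boom"));
        try {
            combined.join();
        } catch (CompletionException e) {
            System.out.println("Failed without waiting for slow: " + e.getCause().getMessage());
        }
    }
}
```

The sketch does not cancel the other inputs; whether that is needed depends on how the underlying tasks were started.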
Solution 3 - Java
Use a CompletableFuture in Java 8:
// Kick off multiple asynchronous lookups
CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");
// Wait until they are all done
CompletableFuture.allOf(page1,page2,page3).join();
logger.info("--> " + page1.get());
Solution 4 - Java
You can use an ExecutorCompletionService. The documentation even has an example for your exact use-case:
> Suppose instead that you would like to use the first non-null result of the set of tasks, ignoring any that encounter exceptions, and cancelling all other tasks when the first one is ready:
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
int n = solvers.size();
List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
Result result = null;
try {
for (Callable<Result> s : solvers)
futures.add(ecs.submit(s));
for (int i = 0; i < n; ++i) {
try {
Result r = ecs.take().get();
if (r != null) {
result = r;
break;
}
} catch (ExecutionException ignore) {
}
}
} finally {
for (Future<Result> f : futures)
f.cancel(true);
}
if (result != null)
use(result);
}
The important thing to notice here is that ecs.take() will get the first completed task, not just the first submitted one. Thus you should get them in the order of finishing the execution (or throwing an exception).
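The completion-order behaviour of take() can be seen with a small experiment. This is a sketch only: the class name CompletionOrderDemo is made up, and the sleep durations are arbitrary values chosen so that the tasks finish in the reverse of their submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderDemo {

    /** Submits three tasks slowest-first and records the order in which take() returns them. */
    public static List<String> completionOrder() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletionService<String> ecs = new ExecutorCompletionService<>(executor);
        try {
            ecs.submit(() -> { Thread.sleep(600); return "slow"; });
            ecs.submit(() -> { Thread.sleep(300); return "medium"; });
            ecs.submit(() -> { Thread.sleep(50); return "fast"; });
            List<String> order = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                order.add(ecs.take().get()); // next finished task, not next submitted task
            }
            return order;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(completionOrder());
    }
}
```

With three worker threads and these delays, the results would normally come back as fast, medium, slow, even though the tasks were submitted in the opposite order.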
Solution 5 - Java
If you are using Java 8 and don't want to manipulate CompletableFutures, I have written a tool to retrieve results for a List<Future<T>> using streaming. The key is that you cannot simply map(Future::get), because get() throws checked exceptions.
public final class Futures
{
private Futures()
{}
public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
{
return new FutureCollector<>();
}
private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
{
private final List<Throwable> exceptions = new LinkedList<>();
@Override
public Supplier<Collection<T>> supplier()
{
return LinkedList::new;
}
@Override
public BiConsumer<Collection<T>, Future<T>> accumulator()
{
return (r, f) -> {
try
{
r.add(f.get());
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt(); // preserve the interrupt status instead of swallowing it
}
catch (ExecutionException e)
{
exceptions.add(e.getCause());
}
};
}
@Override
public BinaryOperator<Collection<T>> combiner()
{
return (l1, l2) -> {
l1.addAll(l2);
return l1;
};
}
@Override
public Function<Collection<T>, List<T>> finisher()
{
return l -> {
List<T> ret = new ArrayList<>(l);
if (!exceptions.isEmpty())
throw new AggregateException(exceptions, ret);
return ret;
};
}
@Override
public Set<java.util.stream.Collector.Characteristics> characteristics()
{
return java.util.Collections.emptySet();
}
}
}
This needs an AggregateException that works like C#'s:
public class AggregateException extends RuntimeException
{
/**
*
*/
private static final long serialVersionUID = -4477649337710077094L;
private final List<Throwable> causes;
private List<?> successfulElements;
public AggregateException(List<Throwable> causes, List<?> l)
{
this.causes = causes;
successfulElements = l;
}
public AggregateException(List<Throwable> causes)
{
this.causes = causes;
}
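@Override
public synchronized Throwable getCause()
{
// Returning this would make the cause chain self-referential; expose the first cause instead
return causes.isEmpty() ? null : causes.get(0);
}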
public List<Throwable> getCauses()
{
return causes;
}
public List<?> getSuccessfulElements()
{
return successfulElements;
}
public void setSuccessfulElements(List<?> successfulElements)
{
this.successfulElements = successfulElements;
}
}
This component acts exactly like C#'s Task.WaitAll. I am working on a variant that does the same as CompletableFuture.allOf (equivalent to Task.WhenAll).
I did this because I am using Spring's ListenableFuture and don't want to port to CompletableFuture, even though it is the more standard way.
Solution 6 - Java
If you want to combine a List of CompletableFutures, you can do this:
List<CompletableFuture<Void>> futures = new ArrayList<>();
// ... Add futures to this ArrayList of CompletableFutures
// CompletableFuture.allOf() takes variadic arguments,
// so convert the List to an array in order to pass it
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[futures.size()]));
// Wait for all individual CompletableFuture to complete
// All individual CompletableFutures are executed in parallel
allFutures.get();
For more details on Future & CompletableFuture, useful links:
- Future: https://www.baeldung.com/java-future
- CompletableFuture: https://www.baeldung.com/java-completablefuture
- CompletableFuture: https://www.callicoder.com/java-8-completablefuture-tutorial/
Solution 7 - Java
I've got a utility class that contains these:
@FunctionalInterface
public interface CheckedSupplier<X> {
X get() throws Throwable;
}
public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) {
return () -> {
try {
return supplier.get();
} catch (final Throwable checkedException) {
throw new IllegalStateException(checkedException);
}
};
}
Once you have that, using a static import, you can simply wait for all futures like this:
futures.stream().forEach(future -> uncheckedSupplier(future::get).get());
You can also collect all their results like this:
List<MyResultType> results = futures.stream()
.map(future -> uncheckedSupplier(future::get).get())
.collect(Collectors.toList());
Just revisiting my old post and noticing that you had another concern:
> But the problem here is if, for example, the 4th future throws an exception, then I will wait unnecessarily for the first 3 futures to be available.
In this case, the simple solution is to do this in parallel:
futures.stream().parallel()
.forEach(future -> uncheckedSupplier(future::get).get());
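This way the first exception, although it will not stop the other futures, will break out of the forEach statement, as in the serial example. Because the waits happen in parallel, you won't have to wait for the first 3 to complete. Keep in mind that a parallel stream runs on the common ForkJoinPool, so only a limited number of futures are waited on at the same time.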
Solution 8 - Java
Maybe this will help (nothing can replace raw threads, yeah!).
I suggest waiting on each Future in a separate thread so that the waits run in parallel. Whenever one of them gets an error, it signals the manager (the Handler class).
class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
thisThread=Thread.currentThread();
List<Future<Object>> futures = getFutures();
trds=new Thread[futures.size()];
for (int i = 0; i < trds.length; i++) {
RunTask rt=new RunTask(futures.get(i), this);
trds[i]=new Thread(rt);
}
synchronized (this) {
for(Thread tx:trds){
tx.start();
}
}
for(Thread tx:trds){
try {tx.join();
} catch (InterruptedException e) {
System.out.println("Job failed!");break;
}
}if(!failed){System.out.println("Job Done");}
}
private List<Future<Object>> getFutures() {
return null;
}
    public synchronized void cancelOther() {
        if (failed) { return; }
        failed = true;
        for (Thread tx : trds) {
            // Thread.stop() is deprecated and unsafe, and newer JDKs reject it
            // with UnsupportedOperationException, so interrupt the waiting threads instead
            tx.interrupt();
        }
        thisThread.interrupt();
    }
    //...
}
class RunTask implements Runnable {
    private Future f; private Handler h;
    public RunTask(Future f, Handler h) { this.f = f; this.h = h; }
    public void run() {
        try {
            f.get(); // blocks until the future completes or this thread is interrupted
        } catch (InterruptedException e) {
            System.out.println("Oops, some other guy has stopped working...");
        } catch (Exception e) {
            System.out.println("Error, stopping other guys...");
            h.cancelOther();
        }
}
}
I have to admit that the code above may contain errors (I didn't test it), but I hope it explains the solution. Please give it a try.
Solution 9 - Java
The CompletionService will take your Callables with the .submit() method and you can retrieve the computed futures with the .take() method.
One thing you must not forget is to terminate the ExecutorService by calling its .shutdown() method. You can only do that if you have kept a reference to the executor service, so make sure to keep one.
Example code - For a fixed number of work items to be worked on in parallel:
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CompletionService<YourCallableImplementor> completionService =
new ExecutorCompletionService<YourCallableImplementor>(service);
ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();
for (String computeMe : elementsToCompute) {
futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
//now retrieve the futures after computation (auto wait for it)
int received = 0;
while(received < elementsToCompute.size()) {
Future<YourCallableImplementor> resultFuture = completionService.take();
YourCallableImplementor result = resultFuture.get();
received ++;
}
//important: shutdown your ExecutorService
service.shutdown();
Example code - For a dynamic number of work items to be worked on in parallel:
public void runIt(){
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service);
ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>();
//Initial workload is 9 tasks (the loop below runs for i = 0..8)
for (int i = 0; i < 9; i++) {
futures.add(completionService.submit(write.new CallableImplementor()));
}
boolean finished = false;
while (!finished) {
try {
Future<CallableImplementor> resultFuture;
resultFuture = completionService.take();
CallableImplementor result = resultFuture.get();
finished = doSomethingWith(result.getResult());
result.setResult(null);
result = null;
resultFuture = null;
//After work package has been finished create new work package and add it to futures
futures.add(completionService.submit(write.new CallableImplementor()));
} catch (InterruptedException | ExecutionException e) {
//handle interrupted and assert correct thread / work packet count
}
}
//important: shutdown your ExecutorService
service.shutdown();
}
public class CallableImplementor implements Callable<CallableImplementor> {
boolean result;
@Override
public CallableImplementor call() throws Exception {
//business logic goes here
return this;
}
public boolean getResult() {
return result;
}
public void setResult(boolean result) {
this.result = result;
}
}
Solution 10 - Java
/**
* execute suppliers as future tasks then wait / join for getting results
* @param functors a supplier(s) to execute
* @return a list of results
*/
private List getResultsInFuture(Supplier<?>... functors) {
CompletableFuture[] futures = stream(functors)
.map(CompletableFuture::supplyAsync)
.collect(Collectors.toList())
.toArray(new CompletableFuture[functors.length]);
CompletableFuture.allOf(futures).join();
return stream(futures).map(a-> {
try {
return a.get();
} catch (InterruptedException | ExecutionException e) {
//logger.error("an error occurred during runtime execution a function",e);
return null;
}
}).collect(Collectors.toList());
}
Solution 11 - Java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class Stack2 {
public static void waitFor(List<Future<?>> futures) {
List<Future<?>> futureCopies = new ArrayList<Future<?>>(futures); // futures that have not completed yet
while (!futureCopies.isEmpty()) { // worst case: every task succeeds, so this method waits for all of them
Iterator<Future<?>> futureCopiesIterator = futureCopies.iterator();
while (futureCopiesIterator.hasNext()) {
Future<?> future = futureCopiesIterator.next();
if (future.isDone()) {//already done
futureCopiesIterator.remove();
try {
future.get();// no longer waiting
} catch (InterruptedException e) {
//ignore
//only happen when current Thread interrupted
} catch (ExecutionException e) {
Throwable throwable = e.getCause();// real cause of exception
futureCopies.forEach(f -> f.cancel(true));//cancel other tasks that not completed
return;
}
}
}
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
Runnable runnable1 = new Runnable (){
public void run(){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
}
};
Runnable runnable2 = new Runnable (){
public void run(){
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
}
}
};
Runnable fail = new Runnable (){
public void run(){
try {
Thread.sleep(1000);
throw new RuntimeException("bla bla bla");
} catch (InterruptedException e) {
}
}
};
List<Future<?>> futures = Stream.of(runnable1,fail,runnable2)
.map(executorService::submit)
.collect(Collectors.toList());
double start = System.nanoTime();
waitFor(futures);
double end = (System.nanoTime()-start)/1e9;
System.out.println(end +" seconds");
}
}