How to wait for all threads to finish, using ExecutorService?

Tags: Java, Multithreading, Concurrency, Parallel Processing, ExecutorService

Java Problem Overview


I need to execute a number of tasks, four at a time, something like this:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
	taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

How can I get notified once all of them are complete? So far I can't think of anything better than keeping a global task counter, decrementing it at the end of every task, and polling it in an infinite loop until it reaches 0; or collecting a list of Futures and polling isDone() on all of them in an infinite loop. What are better solutions that don't involve infinite loops?

Thanks.

Java Solutions


Solution 1 - Java

Basically on an ExecutorService you call shutdown() and then awaitTermination():

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}

Solution 2 - Java

Use a CountDownLatch (http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html):

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException e) {
   // handle
}

and within your task (enclosed in try / finally, so that the count goes down even if the task throws):

latch.countDown();

Solution 3 - Java

ExecutorService.invokeAll() does it for you.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<Object>> tasks; // your tasks (all Callables must share one result type)
// invokeAll() returns when all tasks are complete
List<Future<Object>> futures = taskExecutor.invokeAll(tasks);
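As a minimal, self-contained sketch of the invokeAll() approach: the class name InvokeAllExample, the method sumOfSquares and the squaring tasks are made up for illustration, and the error handling is only one possible choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllExample {
    // Squares 1..n on a 4-thread pool and returns the sum of the results.
    static int sumOfSquares(int n) {
        ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int value = i;
                tasks.add(() -> value * value); // hypothetical stand-in for real work
            }
            int sum = 0;
            // invokeAll() blocks until every task has completed
            for (Future<Integer> future : taskExecutor.invokeAll(tasks)) {
                sum += future.get(); // the task is already done, so get() does not wait
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            taskExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(4));
    }
}
```

Because invokeAll() takes the whole collection up front, it fits best when all tasks are known before any of them is submitted.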

Solution 4 - Java

You can use Lists of Futures, as well:

List<Future<?>> futures = new ArrayList<>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

Then, when you want to join on all of them, it's essentially the equivalent of joining on each (with the added benefit that it re-raises exceptions from the child threads in the main thread):

for (Future<?> f : this.futures) { f.get(); }

The trick is to call .get() on each Future in turn, instead of looping forever and calling isDone() on all or each of them. You are guaranteed to move past this block as soon as the last task finishes. One caveat: because .get() re-raises exceptions (wrapped in an ExecutionException), if one of the tasks dies you may leave this loop before the other tasks have run to completion; to avoid this, catch ExecutionException around the get() call. The other caveat is that the list keeps a reference to every Future (and the result it holds), so those won't be garbage collected until you get past this block (if that becomes a problem, you might get around it by removing Futures from the ArrayList as you go). If you want to know which Future finishes first, you could use something like https://stackoverflow.com/a/31885029/32453
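A sketch of the "catch ExecutionException around the get call" variant, so that one failed task does not cut the wait short. The class JoinFutures, the method countFailures and the rule that odd-numbered tasks fail are assumptions made purely for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class JoinFutures {
    // Submits taskCount tasks (odd-numbered ones fail) and returns the number of failures.
    static int countFailures(int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<Future<Void>> futures = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            Callable<Void> task = () -> {
                if (id % 2 == 1) {
                    throw new IOException("simulated failure in task " + id);
                }
                return null;
            };
            futures.add(executor.submit(task));
        }
        executor.shutdown(); // no more submissions; queued tasks still run

        int failures = 0;
        for (Future<Void> future : futures) {
            try {
                future.get(); // blocks until this particular task is done
            } catch (ExecutionException e) {
                failures++; // e.getCause() is the exception thrown inside the task
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break; // stop waiting if the waiting thread itself is interrupted
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        System.out.println(countFailures(5) + " tasks failed");
    }
}
```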

Solution 5 - Java

In Java 8 you can do it with CompletableFuture:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();
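One detail worth knowing: the future returned by allOf() completes only after all the given futures have completed, and join() then throws a CompletionException if any of them failed. The sketch below illustrates this; the class AllOfExample, the method countCompleted and the deliberately failing first task are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class AllOfExample {
    // Runs taskCount tasks, the first of which fails, and returns how many succeeded.
    static int countCompleted(int taskCount) {
        ExecutorService es = Executors.newFixedThreadPool(4);
        AtomicInteger completed = new AtomicInteger();
        CompletableFuture<?>[] futures = new CompletableFuture<?>[taskCount];
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            futures[i] = CompletableFuture.runAsync(() -> {
                if (id == 0) {
                    throw new IllegalStateException("simulated failure");
                }
                completed.incrementAndGet();
            }, es);
        }
        try {
            CompletableFuture.allOf(futures).join(); // waits for every future
        } catch (CompletionException e) {
            // At least one task failed; the others have still run to completion.
        } finally {
            es.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(countCompleted(5) + " tasks succeeded");
    }
}
```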

Solution 6 - Java

Just my two cents. To overcome CountDownLatch's requirement to know the number of tasks beforehand, you could do it the old-fashioned way with a simple Semaphore.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.acquire(numberOfTasks);
...

In your task just call s.release() as you would latch.countDown();
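A complete version of the idea might look like the following sketch. The class SemaphoreExample, the method runAndWait and the counter standing in for the real work are assumptions; the release() is placed in a finally block so that a failing task still hands back its permit.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreExample {
    // Submits tasksToSubmit tasks and returns how many had finished when the wait ended.
    static int runAndWait(int tasksToSubmit) {
        ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
        Semaphore s = new Semaphore(0); // starts with no permits
        AtomicInteger finished = new AtomicInteger();
        int numberOfTasks = 0;
        for (int i = 0; i < tasksToSubmit; i++) {
            taskExecutor.execute(() -> {
                try {
                    finished.incrementAndGet(); // stand-in for the real work
                } finally {
                    s.release(); // one permit per finished task
                }
            });
            numberOfTasks++; // counted while submitting, not known up front
        }
        try {
            s.acquire(numberOfTasks); // blocks until every task has released a permit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            taskExecutor.shutdown();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(8) + " tasks finished");
    }
}
```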

Solution 7 - Java

A bit late to the game, but for the sake of completeness...

Instead of 'waiting' for all tasks to finish, you can think in terms of the Hollywood principle, "don't call me, I'll call you" - when I'm finished. I think the resulting code is more elegant...

Guava offers some interesting tools to accomplish this.

An example:

Wrap an ExecutorService into a ListeningExecutorService:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

Submit a collection of callables for execution:

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf);
}

Now the essential part:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

Attach a callback to the ListenableFuture, that you can use to be notified when all futures complete:

Futures.addCallback(lf, new FutureCallback<List<Integer>> () {
    @Override
    public void onSuccess(List<Integer> result) {
	    // do something with all the results
    }

    @Override
    public void onFailure(Throwable t) {
	    // log failure
    }
});

This also offers the advantage that you can collect all the results in one place once the processing is finished...


Solution 8 - Java

The CyclicBarrier class (http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CyclicBarrier.html) in Java 5 and later is designed for this sort of thing.
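A rough sketch of how a CyclicBarrier could be used here. Note that every party blocks in await() until all parties have arrived, so the pool needs at least as many threads as there are workers; this makes it a less natural fit for "many tasks, four at a time". The names BarrierExample, runWorkers and awaitQuietly are hypothetical.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierExample {
    // Starts one task per worker thread and waits at the barrier until all have arrived.
    static int runWorkers(int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        // Parties: every worker plus the thread that waits for them.
        CyclicBarrier barrier = new CyclicBarrier(workers + 1);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                done.incrementAndGet(); // stand-in for the real work
                awaitQuietly(barrier);  // signal "finished" and wait for the others
            });
        }
        awaitQuietly(barrier); // returns once all workers have reached the barrier
        pool.shutdown();
        return done.get();
    }

    private static void awaitQuietly(CyclicBarrier barrier) {
        try {
            barrier.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            throw new IllegalStateException("barrier was broken", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(4) + " workers reached the barrier");
    }
}
```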

Solution 9 - Java

Here are two options; I am a bit confused about which one is better.

Option 1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

Option 2:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
	futures.add(es.submit(task));
}
	
for(Future<?> future : futures) {
	try {
		future.get();
	}catch(Exception e){
		// do logging and nothing else
	}
}
es.shutdown();

Is it a good idea to wrap future.get() in a try/catch here?

Solution 10 - Java

Use one of the approaches below.

  1. Iterate through all the Futures returned by submit() on the ExecutorService and check each one with the blocking get() call, as suggested by Kiran
  2. Use invokeAll() on ExecutorService
  3. CountDownLatch
  4. ForkJoinPool or Executors.newWorkStealingPool()
  5. Use the shutdown(), awaitTermination() and shutdownNow() APIs of ThreadPoolExecutor in the proper sequence
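For the last item, one plausible "proper sequence" is the two-phase shutdown pattern described in the ExecutorService documentation: shutdown(), a bounded awaitTermination(), then shutdownNow() as a fallback. The sketch below assumes hypothetical names (GracefulShutdown, shutdownAndAwait, runTasks) and an arbitrary 10-second timeout.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GracefulShutdown {
    // Returns true if the pool terminated, false if tasks were still running at the end.
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // 1. reject new tasks, let submitted ones finish
        try {
            if (!pool.awaitTermination(timeout, unit)) { // 2. wait for them
                pool.shutdownNow(); // 3. interrupt tasks that are still running
                return pool.awaitTermination(timeout, unit); // 4. wait for them to react
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Demo: runs taskCount trivial tasks and returns how many finished.
    static int runTasks(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(finished::incrementAndGet); // stand-in for new MyTask()
        }
        shutdownAndAwait(pool, 10, TimeUnit.SECONDS);
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(10) + " tasks finished");
    }
}
```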

Related SE questions:

https://stackoverflow.com/questions/17827022/how-is-countdownlatch-used-in-java-multithreading/37163361#37163361

https://stackoverflow.com/questions/36644043/how-to-forcefully-shutdown-java-executorservice/36644320#36644320

Solution 11 - Java

You could wrap your tasks in another runnable, that will send notifications:

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});

Solution 12 - Java

I've just written a sample program that solves your problem. There was no concise implementation given, so I'll add one. While you can use executor.shutdown() and executor.awaitTermination(), it is not the best practice as the time taken by different threads would be unpredictable.

ExecutorService es = Executors.newCachedThreadPool();
	List<Callable<Integer>> tasks = new ArrayList<>();

	for (int j = 1; j <= 10; j++) {
		tasks.add(new Callable<Integer>() {

			@Override
			public Integer call() throws Exception {
				int sum = 0;
				System.out.println("Starting Thread "
						+ Thread.currentThread().getId());

				for (int i = 0; i < 1000000; i++) {
					sum += i;
				}

				System.out.println("Stopping Thread "
						+ Thread.currentThread().getId());
				return sum;
			}

		});
	}

	try {
		List<Future<Integer>> futures = es.invokeAll(tasks);
		int flag = 0;

		for (Future<Integer> f : futures) {
			Integer res = f.get();
			System.out.println("Sum: " + res);
			if (!f.isDone()) 
				flag = 1;
		}

		if (flag == 0)
			System.out.println("SUCCESS");
		else
			System.out.println("FAILED");

	} catch (InterruptedException | ExecutionException e) {
		e.printStackTrace();
	}

Solution 13 - Java

Just to provide another alternative to latches and barriers: with a CompletionService you can also retrieve partial results as they become available, until all tasks have finished.

From Java Concurrency in practice: "If you have a batch of computations to submit to an Executor and you want to retrieve their results as they become available, you could retain the Future associated with each task and repeatedly poll for completion by calling get with a timeout of zero. This is possible, but tedious. Fortunately there is a better way: a completion service."

Here is the implementation:

public class TaskSubmiter {
	private final ExecutorService executor;

	TaskSubmiter(ExecutorService executor) { this.executor = executor; }

	void doSomethingLarge(AnySourceClass source) {
		final List<InterestedResult> info = doPartialAsyncProcess(source);
		CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
		for (final InterestedResult interestedResultItem : info) {
			completionService.submit(new Callable<PartialResult>() {
				public PartialResult call() {
					return interestedResultItem.doAnOperationToGetPartialResult();
				}
			});
		}

		try {
			// take() blocks until the next task completes, in completion order
			for (int t = 0, n = info.size(); t < n; t++) {
				Future<PartialResult> f = completionService.take();
				PartialResult partialResult = f.get();
				processThisSegment(partialResult);
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		} catch (ExecutionException e) {
			// somethingThrowable is a helper (not shown) that converts the cause into something throwable here
			throw somethingThrowable(e.getCause());
		}
	}
}
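Since the class above depends on types that are not shown, here is a small runnable sketch of the same CompletionService pattern using only JDK types. The class CompletionServiceExample, the method sumDoubled and the doubling tasks are hypothetical.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CompletionServiceExample {
    // Submits n tasks that double 0..n-1 and sums the results as they complete.
    static int sumDoubled(int n) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
        for (int i = 0; i < n; i++) {
            final int value = i;
            completionService.submit(() -> value * 2);
        }
        int sum = 0;
        try {
            for (int t = 0; t < n; t++) {
                // take() blocks until some task has finished, whichever comes first
                Future<Integer> f = completionService.take();
                sum += f.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumDoubled(5));
    }
}
```

Calling take() exactly as many times as tasks were submitted is what turns "results as they become available" into "wait for all of them".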

Solution 14 - Java

This is my solution, based on AdamSkywalker's tip, and it works:

package frss.main;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestHilos {

	void procesar() {
		ExecutorService es = Executors.newFixedThreadPool(4);
		List<Runnable> tasks = getTasks();
		CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
		CompletableFuture.allOf(futures).join();
		es.shutdown();
		
		System.out.println("FIN DEL PROCESO DE HILOS"); // Spanish: "thread processing finished"
	}

	private List<Runnable> getTasks() {
		List<Runnable> tasks = new ArrayList<Runnable>();
		
		Hilo01 task1 = new Hilo01();
		tasks.add(task1);
		
		Hilo02 task2 = new Hilo02();
		tasks.add(task2);
		return tasks;
	}
	
	private class Hilo01 extends Thread {

		@Override
		public void run() {
			System.out.println("HILO 1");
		}
		
	}
	
	private class Hilo02 extends Thread {

		@Override
		public void run() {
			try {
				sleep(2000);
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("HILO 2");
		}
		
	}
	

	public static void main(String[] args) {
		TestHilos test = new TestHilos();
		test.procesar();
	}
}

Solution 15 - Java

Clean way with ExecutorService

 List<Future<Void>> results = null;
 try {
     List<Callable<Void>> tasks = new ArrayList<>();
     ExecutorService executorService = Executors.newFixedThreadPool(4);
     results = executorService.invokeAll(tasks);
 } catch (InterruptedException ex) {
     ...
 } catch (Exception ex) {
     ...
 }

Solution 16 - Java

You could use this code:

public class MyTask implements Runnable {

    private final CountDownLatch countDownLatch;

    public MyTask(CountDownLatch countDownLatch) {
         this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
         try {
             // Do something.
             // If the work blocks, catch InterruptedException here and
             // call Thread.currentThread().interrupt().
         } finally {
             // Important: in finally, so the latch is released even if the work throws
             this.countDownLatch.countDown();
         }
     }
}

CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
     taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");

Solution 17 - Java

I am posting my answer from the linked question here, in case someone wants a simpler way to do this:

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
    futures[i++] =  CompletableFuture.runAsync(runner, executor);
}

CompletableFuture.allOf(futures).join(); // This waits until all futures are complete.

Solution 18 - Java

I created the following working example. The idea is to have a way to process a pool of tasks (I am using a queue as example) with many Threads (determined programmatically by the numberOfTasks/threshold), and wait until all Threads are completed to continue with some other processing.

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Testing CountDownLatch and ExecutorService to manage scenario where
 * multiple Threads work together to complete tasks from a single
 * resource provider, so the processing can be faster. */
public class ThreadCountDown {

private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();

public static void main(String[] args) {
    // Create a queue with "Tasks"
    int numberOfTasks = 2000;
    while(numberOfTasks-- > 0) {
        tasks.add(numberOfTasks);
    }

    // Initiate Processing of Tasks
    ThreadCountDown main = new ThreadCountDown();
    main.process(tasks);
}

/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
    int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
    threadsCountdown = new CountDownLatch(numberOfThreads);
    ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);

    //Initialize each Thread
    while(numberOfThreads-- > 0) {
        System.out.println("Initializing Thread: "+numberOfThreads);
        threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
    }

    try {
        //Shutdown the Executor, so it cannot receive more Threads.
        threadExecutor.shutdown();
        threadsCountdown.await();
        System.out.println("ALL THREADS COMPLETED!");
        //continue With Some Other Process Here
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
    int threshold = 100;
    int threads = size / threshold;
    if( size > (threads*threshold) ){
        threads++;
    }
    return threads;
}

/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
    return tasks.poll();
}

/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {

    private String threadName;

    protected MyThread(String threadName) {
        super();
        this.threadName = threadName;
    }

    @Override
    public void run() {
        Integer task;
        try{
            //Check in the Task pool if anything pending to process
            while( (task = getTask()) != null ){
                processTask(task);
            }
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            /*Reduce count when no more tasks to process. Eventually all
            Threads will end-up here, reducing the count to 0, allowing
            the flow to continue after threadsCountdown.await(); */
            threadsCountdown.countDown();
        }
    }

    private void processTask(Integer task){
        try{
            System.out.println(this.threadName+" is Working on Task: "+ task);
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
}
}

Hope it helps!

Solution 19 - Java

You could use your own subclass of ExecutorCompletionService to wrap taskExecutor, and your own implementation of BlockingQueue to get informed when each task completes and perform whatever callback or other action you desire when the number of completed tasks reaches your desired goal.

Solution 20 - Java

You should use the executorService.shutdown() and executorService.awaitTermination() methods.

An example follows:

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

}

Solution 21 - Java

If you use several ExecutorServices sequentially and want to wait for each one to finish before starting the next, you can do it like this:

ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
for (<loop>) {
   executer1.execute(new Runnable() {
            @Override
            public void run() {
                ...
            }
        });
} 
executer1.shutdown();

try{
   executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

   ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
   for (<loop>) {
      executer2.execute(new Runnable() {
            @Override
            public void run() {
                 ...
            }
        });
   } 
   executer2.shutdown();
} catch (Exception e){
 ...
}

Solution 22 - Java

Java 8: we can use the Stream API to run the tasks in parallel; forEach() returns only when all of them are done. See the snippet below:

final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run); // Uses default pool
    
//alternatively to specify parallelism 
new ForkJoinPool(15).submit(
          () -> tasks.stream().parallel().forEach(Runnable::run) 
    ).get();

Solution 23 - Java


ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // doSomething();
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}
 
// wait for the latch to be decremented by the two remaining threads
latch.await();

If doSomething() throws some other exception, it seems latch.countDown() will not be executed, so what should I do?
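One common way to handle this, in line with the try / finally advice in Solution 2, is to move countDown() into a finally block so it runs whether the work returns or throws. In the sketch below the class LatchWithFinally, the method awaitFailingTasks, the always-failing doSomething() and the 5-second timeout are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchWithFinally {
    // Returns true if the latch reached zero even though every task threw.
    static boolean awaitFailingTasks(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        CountDownLatch latch = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            pool.submit(() -> {
                try {
                    doSomething();
                } finally {
                    latch.countDown(); // runs whether doSomething() returns or throws
                }
            });
        }
        try {
            // Bounded wait, so a bug in a task cannot block the caller forever
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    // Hypothetical work that always fails with an unchecked exception.
    private static void doSomething() {
        throw new IllegalStateException("simulated failure");
    }

    public static void main(String[] args) {
        System.out.println("latch released: " + awaitFailingTasks(2));
    }
}
```

With submit(), the exception is captured in the returned Future rather than printed, so keep the Future and call get() on it if the failure needs to be seen.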

Solution 24 - Java

Try-with-Resources syntax on AutoCloseable executor service with Project Loom

Project Loom seeks to add new features to the concurrency abilities in Java.

One of those features is making the ExecutorService AutoCloseable. This means every ExecutorService implementation will offer a close method. And it means we can use try-with-resources syntax to automatically close an ExecutorService object.

The ExecutorService#close method blocks until all submitted tasks are completed. Using close takes the place of calling shutdown & awaitTermination.

Being AutoCloseable contributes to Project Loom’s attempt to bring “structured concurrency” to Java.

try (
    ExecutorService executorService = Executors.… ;
) {
    // Submit your `Runnable`/`Callable` tasks to the executor service.
    …
}
// At this point, flow-of-control blocks until all submitted tasks are done/canceled/failed.
// After this point, the executor service will have been automatically shut down, via the `close` method called by try-with-resources syntax.

For more information on Project Loom, search for talks and interviews given by Ron Pressler and others on the Project Loom team. Focus on the more recent, as Project Loom has evolved.

Experimental builds of Project Loom technology are available now, based on early-access Java 18.

Solution 25 - Java

This might help

Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
    try {
        Log.i(LOG_TAG, "Waiting for executor to terminate...");
        if (executor.isTerminated())
            break;
        if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
            break;
        }
    } catch (InterruptedException ignored) {}
}

Solution 26 - Java

You could call waitTillDone() on this Runner class:

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool

while(...) {
    runner.run(new MyTask()); // here you submit your task
}


runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)


runner.shutdown(); // once you are done you can shut down the runner

You can reuse this class and call waitTillDone() as many times as you want before calling shutdown(), plus your code is extremely simple. Also, you don't have to know the number of tasks up front.

To use it just add this gradle/maven compile 'com.github.matejtymes:javafixes:1.3.1' dependency to your project.

More details can be found here:

https://github.com/MatejTymes/JavaFixes

Solution 27 - Java

ThreadPoolExecutor has a getActiveCount() method that returns the approximate number of threads that are actively executing tasks.

After submitting the tasks, we can check whether getActiveCount() is 0. Once it is zero, no threads are currently running tasks, which means the work is finished:

while (true) {
	if (executor.getActiveCount() == 0) {
		// your own piece of code
		break;
	}
}

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | serg | View Question on Stackoverflow
Solution 1 - Java | cletus | View Answer on Stackoverflow
Solution 2 - Java | ChssPly76 | View Answer on Stackoverflow
Solution 3 - Java | sjlee | View Answer on Stackoverflow
Solution 4 - Java | rogerdpack | View Answer on Stackoverflow
Solution 5 - Java | AdamSkywalker | View Answer on Stackoverflow
Solution 6 - Java | stryba | View Answer on Stackoverflow
Solution 7 - Java | Răzvan Petruescu | View Answer on Stackoverflow
Solution 8 - Java | Pekka Enberg | View Answer on Stackoverflow
Solution 9 - Java | user2862544 | View Answer on Stackoverflow
Solution 10 - Java | Ravindra babu | View Answer on Stackoverflow
Solution 11 - Java | Zed | View Answer on Stackoverflow
Solution 12 - Java | Kiran | View Answer on Stackoverflow
Solution 13 - Java | Alberto Gurrion | View Answer on Stackoverflow
Solution 14 - Java | frss-soft.com | View Answer on Stackoverflow
Solution 15 - Java | shubham | View Answer on Stackoverflow
Solution 16 - Java | Tuan Pham | View Answer on Stackoverflow
Solution 17 - Java | Mạnh Quyết Nguyễn | View Answer on Stackoverflow
Solution 18 - Java | Fernando Gil | View Answer on Stackoverflow
Solution 19 - Java | Alex Martelli | View Answer on Stackoverflow
Solution 20 - Java | Rollen Holt | View Answer on Stackoverflow
Solution 21 - Java | Dr. X | View Answer on Stackoverflow
Solution 22 - Java | Vlad | View Answer on Stackoverflow
Solution 23 - Java | Pengfei Zhan | View Answer on Stackoverflow
Solution 24 - Java | Basil Bourque | View Answer on Stackoverflow
Solution 25 - Java | Amol Desai | View Answer on Stackoverflow
Solution 26 - Java | Matej Tymes | View Answer on Stackoverflow
Solution 27 - Java | user | View Answer on Stackoverflow