When should I use a CompletionService over an ExecutorService?


Java Problem Overview


I just found CompletionService in this blog post. However, it doesn't really showcase the advantages of CompletionService over a standard ExecutorService. The same code can be written with either. So, when is a CompletionService useful?

Can you give a short code sample to make it crystal clear? For example, this code sample just shows a case where a CompletionService is not needed (it is equivalent to using an ExecutorService):

    ExecutorService taskExecutor = Executors.newCachedThreadPool();
    //        CompletionService<Long> taskCompletionService =
    //                new ExecutorCompletionService<Long>(taskExecutor);
    Callable<Long> callable = new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            return 1L;
        }
    };

    Future<Long> future = // taskCompletionService.submit(callable);
        taskExecutor.submit(callable);

    while (!future.isDone()) {
        // Do some work...
        System.out.println("Working on something...");
    }
    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

Java Solutions


Solution 1 - Java

Omitting many details:

  • ExecutorService = incoming queue + worker threads
  • CompletionService = incoming queue + worker threads + output queue
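
To make the "output queue" idea concrete, here is a minimal sketch that bolts a hand-made output queue onto a plain ExecutorService. The class name OutputQueueSketch and the squares method are made up for illustration; this only mimics the model above and is not how ExecutorCompletionService is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: worker threads plus a hand-made output queue.
class OutputQueueSketch {

    static List<Integer> squares(int count) throws InterruptedException {
        ExecutorService workers = Executors.newFixedThreadPool(3);
        BlockingQueue<Integer> output = new LinkedBlockingQueue<>();
        try {
            for (int i = 1; i <= count; i++) {
                final int n = i;
                // Each task pushes its own result onto the output queue when it finishes.
                workers.execute(() -> output.add(n * n));
            }
            List<Integer> results = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                // Blocks until some task (whichever one) has finished.
                results.add(output.take());
            }
            return results;
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The order of the values depends on thread scheduling.
        System.out.println(squares(5));
    }
}
```

Unlike this sketch, a CompletionService hands back Future objects, so a failure inside a task still reaches the consumer as an ExecutionException.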

Solution 2 - Java

With ExecutorService, once you have submitted the tasks, you have to write your own code to retrieve the results of completed tasks efficiently.

With CompletionService, this is pretty much automated. The difference is not very evident in the code you have presented because you are submitting just one task. However, imagine you have a list of tasks to be submitted. In the example below, multiple tasks are submitted to the CompletionService. Then, instead of trying to find out which task has completed (to get the results), it just asks the CompletionService instance to return the results as they become available.

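import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CompletionServiceTest {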
    	
    	class CalcResult {
    		 long result ;
    
    		 CalcResult(long l) {
    			 result = l;
    		 }
    	}
    	
    	class CallableTask implements Callable<CalcResult> {
    		String taskName ;
    		long  input1 ;
    		int input2 ;
    		
    		CallableTask(String name , long v1 , int v2 ) {
    			taskName = name;
    			input1 = v1;
    			input2 = v2 ;
    		}
    		
    		public CalcResult call() throws Exception {
    			System.out.println(" Task " + taskName + " Started -----");
    			for(int i=0;i<input2 ;i++) {
    				try {
    					Thread.sleep(200);
    				} catch (InterruptedException e) {
    					System.out.println(" Task " + taskName + " Interrupted !! ");
    					e.printStackTrace();
    				}
    				input1 += i;
    			}
    			System.out.println(" Task " + taskName + " Completed @@@@@@");
    			return new CalcResult(input1) ;
    		}
    		
    	}
    	
    	public void test(){
    		ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
            CompletionService<CalcResult> taskCompletionService = new ExecutorCompletionService<CalcResult>(taskExecutor);
    		
    	    int submittedTasks = 5;
    	    for (int i=0;i< submittedTasks;i++) {
    	    	taskCompletionService.submit(new CallableTask (
    					String.valueOf(i), 
        					(i * 10), 
        					((i * 10) + 10  )
    					));
    	       System.out.println("Task " + String.valueOf(i) + " submitted");
    	    }
    	    for (int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++) {
    	    	try {
    	    		System.out.println("trying to take from Completion service");
    				// take() blocks until at least one task has completed and its result is available,
    				// but we don't have to worry about which one
    				Future<CalcResult> result = taskCompletionService.take();
    				System.out.println("result for a task available in queue. Trying to get()");
    				
    				// process the result here by calling result.get()
    				CalcResult l = result.get();
    				System.out.println("Result " + String.valueOf(tasksHandled) + " obtained : " + String.valueOf(l.result));
    				
    			} catch (InterruptedException e) {
    				// This thread was interrupted while waiting for a result
    				System.out.println("Error Interrupted exception");
    				e.printStackTrace();
    			} catch (ExecutionException e) {
    				// The task itself threw an exception
    				e.printStackTrace();
    				System.out.println("Error get() threw exception");
    			}
    	    }
    	    // release the worker threads so the JVM can exit
    	    taskExecutor.shutdown();
    	}
    }

Solution 3 - Java

Basically you use a CompletionService if you want to execute multiple tasks in parallel and then work with them in their completion order. So, if I execute 5 jobs, the CompletionService will give me the first one that finishes. The example where there is only a single task confers no extra value over an Executor apart from the ability to submit a Callable.
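
As a small illustration of "completion order", the sketch below submits a slow task first and a fast task second, then takes the results. The class name CompletionOrderSketch and the CountDownLatch used to hold the slow task back are assumptions made so that the outcome does not depend on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class showing that take() follows completion order.
class CompletionOrderSketch {

    static List<String> run() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> completion = new ExecutorCompletionService<>(pool);
        CountDownLatch releaseSlow = new CountDownLatch(1);
        try {
            // Submitted first, but cannot finish until the latch is released.
            completion.submit(() -> { releaseSlow.await(); return "slow"; });
            // Submitted second, finishes right away.
            completion.submit(() -> "fast");

            List<String> order = new ArrayList<>();
            // Only the fast task can be complete at this point.
            order.add(completion.take().get());
            releaseSlow.countDown();
            order.add(completion.take().get());
            return order;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints [fast, slow]
    }
}
```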

Solution 4 - Java

I think the javadoc best answers the question of when the CompletionService is useful in a way an ExecutorService isn't.

> A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks.

Basically, this interface allows a program to have producers which create and submit tasks (and even examine the results of those submissions) without knowing about any other consumers of the results of those tasks. Meanwhile, consumers which are aware of the CompletionService could poll for or take results without being aware of the producers submitting the tasks.
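
A rough sketch of that decoupling follows: one thread only submits, another only takes, and the CompletionService is the only thing they share. The class name DecoupledSketch, the trivial tasks, and the fact that the consumer is told how many results to expect are all assumptions for the sake of a short example.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: producer and consumer share only the CompletionService.
class DecoupledSketch {

    static int sumOfResults(int taskCount) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<Integer> completion = new ExecutorCompletionService<>(pool);
        AtomicInteger sum = new AtomicInteger();

        // Producer: submits tasks and never looks at their results.
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= taskCount; i++) {
                final int n = i;
                completion.submit(() -> n * 10);
            }
        });

        // Consumer: takes results without knowing who submitted the tasks.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < taskCount; i++) {
                    sum.addAndGet(completion.take().get());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        pool.shutdown();
        return sum.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumOfResults(3)); // prints 60
    }
}
```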

For the record, and I could be wrong because it is rather late, but I am fairly certain that the sample code in that blog post causes a memory leak. Without an active consumer taking results out of the ExecutorCompletionService's internal queue, I'm not sure how the blogger expected that queue to drain.

Solution 5 - Java

First of all, if we do not want to waste processor time, we should not busy-wait like this:

while (!future.isDone()) {
        // Do some work...
}

Instead, we can block until all tasks have finished:

service.shutdown();
service.awaitTermination(14, TimeUnit.DAYS);

The drawback of this code is that it shuts down the ExecutorService. If we want to keep working with it (e.g. we have recursive task creation), we have two alternatives: invokeAll or CompletionService.

invokeAll waits until all tasks are complete. CompletionService lets us take or poll results one by one.

And, finally, a recursive example:

// Tasks (a collection of Task), Result, DoTask and THREAD_NUMBER are assumed to be defined elsewhere.
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER);
ExecutorCompletionService<Result> completionService = new ExecutorCompletionService<Result>(executorService);

while (Tasks.size() > 0) {
	// Submit the current batch of tasks.
	for (final Task task : Tasks) {
		completionService.submit(new Callable<Result>() {
			@Override
			public Result call() throws Exception {
				return DoTask(task);
			}
		});
	}

	try {
		int taskNum = Tasks.size();
		Tasks.clear();
		// Take results in completion order; each result may produce a follow-up task.
		for (int i = 0; i < taskNum; ++i) {
			Result result = completionService.take().get();
			if (result != null)
				Tasks.add(result.toTask());
		}
	} catch (InterruptedException e) {
	//	error :(
	} catch (ExecutionException e) {
	//	error :(
	}
}
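
For a version that can be run as-is, here is a self-contained sketch of the same idea. It is a variation rather than a transcription: instead of working in batches it keeps a counter of outstanding tasks. The class name RecursiveTasksSketch and the toy visit task (which reports the depths of a node's two children in a binary tree) are invented for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: results of finished tasks spawn new tasks.
class RecursiveTasksSketch {

    // Toy task: "visit" a node at the given depth and report its children's depths.
    static List<Integer> visit(int depth) {
        if (depth == 0) {
            return Collections.<Integer>emptyList();
        }
        return Arrays.asList(depth - 1, depth - 1);
    }

    static int countNodes(int rootDepth) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<List<Integer>> completion = new ExecutorCompletionService<>(pool);
        try {
            completion.submit(() -> visit(rootDepth));
            int pending = 1; // tasks submitted but not yet taken
            int visited = 0;
            while (pending > 0) {
                // Take whichever task finishes next, without shutting the pool down.
                List<Integer> children = completion.take().get();
                pending--;
                visited++;
                for (int child : children) {
                    completion.submit(() -> visit(child));
                    pending++;
                }
            }
            return visited;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(countNodes(3)); // prints 15 (1 + 2 + 4 + 8 nodes)
    }
}
```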

Solution 6 - Java

See it for yourself at run time: try implementing both solutions (ExecutorService and CompletionService) and you'll see how differently they behave, which makes it clearer when to use one or the other. There is an example here if you want: http://rdafbn.blogspot.co.uk/2013/01/executorservice-vs-completionservice-vs.html

Solution 7 - Java

Let's say you have 5 long-running tasks (Callable tasks) and you have submitted them to an executor service. Now imagine you don't want to wait for all 5 tasks to complete; instead, you want to process each task's result as soon as it completes. This can be done either by writing polling logic over the Future objects or by using this API.
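
For comparison, this is roughly what the hand-written polling logic over Future objects might look like. The class name ManualPollingSketch, the trivial squaring tasks and the 10 ms sleep are arbitrary choices for the sketch; a CompletionService replaces the whole polling loop with take().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: manual polling over a list of futures.
class ManualPollingSketch {

    static List<Integer> collectByPolling(int taskCount)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> pending = new ArrayList<>();
            for (int i = 1; i <= taskCount; i++) {
                final int n = i;
                pending.add(pool.submit(() -> n * n));
            }

            List<Integer> results = new ArrayList<>();
            while (!pending.isEmpty()) {
                // Scan every outstanding future and collect the ones that are done.
                Iterator<Future<Integer>> it = pending.iterator();
                while (it.hasNext()) {
                    Future<Integer> future = it.next();
                    if (future.isDone()) {
                        results.add(future.get());
                        it.remove();
                    }
                }
                // Pause between scans so the loop does not burn CPU.
                Thread.sleep(10);
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // The order of the values depends on which tasks finish first.
        System.out.println(collectByPolling(5));
    }
}
```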

Solution 8 - Java

package com.barcap.test.test00;

import java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class ExecutorCompletest00 {

    public static void main(String[] args) {

        ExecutorService exc= Executors.newFixedThreadPool( 10 );
        ExecutorCompletionService<Integer> executorCompletionService= new ExecutorCompletionService<>( exc );

        for (int i=1;i<10;i++){
            Task00 task00= new Task00( i );
            executorCompletionService.submit( task00 );
        }
        for (int i=1;i<20;i++){
            try {
                // take() returns the first task to finish, whichever one that is
                Future<Integer> future= executorCompletionService.take();
                Integer inttest=future.get();
                System.out.println(" the result of completion service is "+inttest);

               // stop after the first result; that is enough to show the difference
               break;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

=======================================================

package com.barcap.test.test00;

import java.util.*;
import java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class ExecutorServ00 {

    public static void main(String[] args) {
        ExecutorService executorService=Executors.newFixedThreadPool( 9 );
        List<Future<Integer>> futList= new ArrayList <>(  );
        for (int i=1;i<10;i++) {
           Future<Integer> result= executorService.submit( new Task00( i ) );
           futList.add( result );
        }

         for (Future<Integer> futureEach :futList ){
             try {
              // get() blocks on the first submitted task, even if later ones finish sooner
              Integer inm=   futureEach.get();

                 System.out.println("the result of future executorservice is "+inm);
                 break;
             } catch (InterruptedException e) {
                 e.printStackTrace();
             } catch (ExecutionException e) {
                 e.printStackTrace();
             }
         }
    }
}

===========================================================

package com.barcap.test.test00;

import java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class Task00 implements Callable<Integer> {

    int i;

    public Task00(int i) {
        this.i = i;
    }

    @Override
    public Integer call() throws Exception {
        System.out.println(" the current thread is "+Thread.currentThread().getName()  +" the result should be "+i);
        int sleepforsec=100000/i;
         Thread.sleep( sleepforsec );
        System.out.println(" the task complted for "+Thread.currentThread().getName()  +" the result should be "+i);



        return i;
    }
}

======================================================================

Logs for the ExecutorCompletionService version:

the current thread is pool-1-thread-1 the result should be 1
the current thread is pool-1-thread-2 the result should be 2
the current thread is pool-1-thread-3 the result should be 3
the current thread is pool-1-thread-4 the result should be 4
the current thread is pool-1-thread-6 the result should be 6
the current thread is pool-1-thread-5 the result should be 5
the current thread is pool-1-thread-7 the result should be 7
the current thread is pool-1-thread-9 the result should be 9
the current thread is pool-1-thread-8 the result should be 8
the task complted for pool-1-thread-9 the result should be 9
teh result is 9
the task complted for pool-1-thread-8 the result should be 8
the task complted for pool-1-thread-7 the result should be 7
the task complted for pool-1-thread-6 the result should be 6
the task complted for pool-1-thread-5 the result should be 5
the task complted for pool-1-thread-4 the result should be 4
the task complted for pool-1-thread-3 the result should be 3
the task complted for pool-1-thread-2 the result should be 2

Logs for the ExecutorService version:

the current thread is pool-1-thread-1 the result should be 1
the current thread is pool-1-thread-3 the result should be 3
the current thread is pool-1-thread-2 the result should be 2
the current thread is pool-1-thread-5 the result should be 5
the current thread is pool-1-thread-4 the result should be 4
the current thread is pool-1-thread-6 the result should be 6
the current thread is pool-1-thread-7 the result should be 7
the current thread is pool-1-thread-8 the result should be 8
the current thread is pool-1-thread-9 the result should be 9
the task complted for pool-1-thread-9 the result should be 9
the task complted for pool-1-thread-8 the result should be 8
the task complted for pool-1-thread-7 the result should be 7
the task complted for pool-1-thread-6 the result should be 6
the task complted for pool-1-thread-5 the result should be 5
the task complted for pool-1-thread-4 the result should be 4
the task complted for pool-1-thread-3 the result should be 3
the task complted for pool-1-thread-2 the result should be 2
the task complted for pool-1-thread-1 the result should be 1
the result of future is 1

=======================================================

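With ExecutorService, the futures are read in submission order, so the first result (task 1, the slowest one here) only becomes available after all the other tasks have completed.

With ExecutorCompletionService, whichever result becomes available first is returned.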

Solution 9 - Java

If the task producer is not interested in the results and it is another component's responsibility to process the results of asynchronous tasks executed by the executor service, then you should use CompletionService. It helps you separate the task result processor from the task producer. See the example at http://www.zoftino.com/java-concurrency-executors-framework-tutorial

Solution 10 - Java

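Another advantage sometimes attributed to CompletionService is performance.

The argument is that calling future.get() spin-waits, as in this excerpt

from java.util.concurrent.CompletableFuture

  private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0)
                spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                    1 << 8 : 0; // Use brief spin-wait on multiprocessors
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }

Two caveats apply. The excerpt comes from CompletableFuture, not from the FutureTask that the standard ExecutorService implementations return from submit(). And, as its own comment says, the spin is brief (at most 1 << 8 iterations on multiprocessors), so for a long-running task the waiting thread is not spinning the whole time. The practical cost of waiting on individual futures is that you block on one task while others may already be finished.

With CompletionService, once a task is done its result is enqueued, and you can take or poll the queue with little overhead.

CompletionService achieves this by wrapping each task in a future with a done() hook.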

java.util.concurrent.ExecutorCompletionService

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
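
To show what polling the completion queue looks like from the caller's side, here is a small sketch. The class name PollSketch, the latch that holds the task back and the 5-second timeout are assumptions for the example; poll() and poll(timeout, unit) are the non-blocking and time-limited counterparts of take().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: polling a CompletionService instead of blocking on a Future.
class PollSketch {

    static List<String> run() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletionService<String> completion = new ExecutorCompletionService<>(pool);
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // The task cannot finish until the gate is opened.
            completion.submit(() -> { gate.await(); return "done"; });

            List<String> observed = new ArrayList<>();
            // Non-blocking poll: returns null because nothing has completed yet.
            Future<String> early = completion.poll();
            observed.add(early == null ? "nothing yet" : early.get());

            gate.countDown();
            // Timed poll: waits up to 5 seconds for the next completed task.
            Future<String> late = completion.poll(5, TimeUnit.SECONDS);
            observed.add(late == null ? "timed out" : late.get());
            return observed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```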

Solution 11 - Java

Assuming you execute tasks in parallel and save the Future results in a list,

the main practical difference between ExecutorService and CompletionService is:

With ExecutorService, calling get() on each Future in the list retrieves the results in submission order, waiting for each task to complete in turn.

With CompletionService, take() + get() retrieves the results in completion order, regardless of submission order.

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | ripper234 | View Question on Stackoverflow
Solution 1 - Java | Alex Miller | View Answer on Stackoverflow
Solution 2 - Java | Bhaskar | View Answer on Stackoverflow
Solution 3 - Java | Jed Wesley-Smith | View Answer on Stackoverflow
Solution 4 - Java | Tim Bender | View Answer on Stackoverflow
Solution 5 - Java | Sklavit | View Answer on Stackoverflow
Solution 6 - Java | marcocast | View Answer on Stackoverflow
Solution 7 - Java | Bikas Katwal | View Answer on Stackoverflow
Solution 8 - Java | Prasad | View Answer on Stackoverflow
Solution 9 - Java | Arnav Rao | View Answer on Stackoverflow
Solution 10 - Java | 宏杰李 | View Answer on Stackoverflow
Solution 11 - Java | user3657103 | View Answer on Stackoverflow