What is the easiest way to parallelize a task in Java?

Java, Multithreading, Parallel Processing

Java Problem Overview


Say I have a task like:

for(Object object: objects) {
    Result result = compute(object);
    list.add(result);
}

What is the easiest way to parallelize each compute() (assuming they are already parallelizable)?

I do not need an answer that strictly matches the code above, just a general one. But if you need more info: my tasks are IO-bound, this is for a Spring web application, and the tasks will be executed within an HTTP request.

Java Solutions


Solution 1 - Java

I would recommend taking a look at ExecutorService.

In particular, something like this:

ExecutorService EXEC = Executors.newCachedThreadPool();
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (final Object object: objects) {
    Callable<Result> c = new Callable<Result>() {
		@Override
		public Result call() throws Exception {
			return compute(object);
		}
    };
    tasks.add(c);
}
List<Future<Result>> results = EXEC.invokeAll(tasks);

Note that using newCachedThreadPool could be bad if objects is a big list. A cached thread pool could create a thread per task! You may want to use newFixedThreadPool(n) where n is something reasonable (like the number of cores you have, assuming compute() is CPU bound).

Here's full code that actually runs:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceExample {
    private static final Random PRNG = new Random();

    private static class Result {
        private final int wait;
        public Result(int code) {
            this.wait = code;
        }
    }

    public static Result compute(Object obj) throws InterruptedException {
        int wait = PRNG.nextInt(3000);
        Thread.sleep(wait);
        return new Result(wait);
    }

    public static void main(String[] args) throws InterruptedException,
        ExecutionException {
        List<Object> objects = new ArrayList<Object>();
        for (int i = 0; i < 100; i++) {
            objects.add(new Object());
        }

        List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
        for (final Object object : objects) {
            Callable<Result> c = new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    return compute(object);
                }
            };
            tasks.add(c);
        }

        ExecutorService exec = Executors.newCachedThreadPool();
        // some other executors you could try to see the different behaviours
        // ExecutorService exec = Executors.newFixedThreadPool(3);
        // ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            long start = System.currentTimeMillis();
            List<Future<Result>> results = exec.invokeAll(tasks);
            int sum = 0;
            for (Future<Result> fr : results) {
                sum += fr.get().wait;
                System.out.println(String.format("Task waited %d ms",
                    fr.get().wait));
            }
            long elapsed = System.currentTimeMillis() - start;
            System.out.println(String.format("Elapsed time: %d ms", elapsed));
            System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d)));
        } finally {
            exec.shutdown();
        }
    }
}
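
On Java 8 and later, the anonymous Callable classes above can be written as lambdas. The following is a minimal sketch, not part of the original answer: the class name ParallelMapSketch, the helper parallelMap, and the poolSize parameter are assumptions made for illustration. It uses a fixed-size pool, as the note above suggests for large lists, and relies on invokeAll returning its futures in the same order as the submitted tasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

class ParallelMapSketch {

    // Applies fn to every input on a fixed-size pool and returns the results in input order.
    static <T, R> List<R> parallelMap(List<T> inputs, Function<T, R> fn, int poolSize) {
        ExecutorService exec = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<R>> tasks = new ArrayList<>();
            for (T input : inputs) {
                tasks.add(() -> fn.apply(input));
            }
            List<R> results = new ArrayList<>();
            // invokeAll blocks until every task has finished
            for (Future<R> future : exec.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> lengths = parallelMap(Arrays.asList("a", "bb", "ccc"), String::length, 2);
        System.out.println(lengths); // prints [1, 2, 3]
    }
}
```

For IO-bound work such as the question describes, poolSize can reasonably be larger than the number of cores, since most threads spend their time waiting.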

Solution 2 - Java

With Java 8 and later you can use a parallelStream on the collection to achieve this:

List<T> objects = ...;

List<Result> result = objects.parallelStream().map(object -> {
            return compute(object);
        }).collect(Collectors.toList());

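Note: the individual compute() calls may run in any order and on different threads. The result list itself, however, follows the order of the objects list, because a List source has a defined encounter order and collect(Collectors.toList()) preserves it.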

Details on how to set the right number of threads are available in the Stack Overflow question how-many-threads-are-spawned-in-parallelstream-in-java-8.
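
As a small self-contained sketch of the parallelStream approach: the class name and the squaring compute() below are hypothetical stand-ins, not the asker's code. It also prints the parallelism of the common fork/join pool, which parallel streams use by default.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelStreamSketch {

    // Hypothetical stand-in for compute(): squares its input.
    static int compute(int n) {
        return n * n;
    }

    static List<Integer> computeAll(List<Integer> objects) {
        // map() may run on several threads in any order,
        // but collecting to a List keeps the encounter order of the source
        return objects.parallelStream()
                .map(ParallelStreamSketch::compute)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> objects = IntStream.rangeClosed(1, 5).boxed().collect(Collectors.toList());
        System.out.println(computeAll(objects)); // prints [1, 4, 9, 16, 25]
        // Parallel streams run on the common fork/join pool by default
        System.out.println("Common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
    }
}
```

Because the common pool is sized for CPU-bound work and shared across the JVM, blocking IO inside a parallel stream can starve other users of that pool; an explicit ExecutorService may be the safer choice for IO-bound tasks.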

Solution 3 - Java

One can simply create a few threads and collect the results.

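Mythread t = new Mythread(object); // Mythread: your own Thread subclass that stores its result
t.start();                         // the thread does nothing until it is started
t.join();                          // wait for it to finish (throws InterruptedException)

// get result
// add result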
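
A fuller sketch of the plain-thread approach might look like this. ComputeThread, its getResult() method, and the use of String.length() as a stand-in for compute() are all assumptions made for illustration. The important detail is to start every thread before joining any of them; otherwise the work runs one task at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PlainThreadSketch {

    // Hypothetical worker thread that remembers the result of its computation.
    static class ComputeThread extends Thread {
        private final String input;
        private int result;

        ComputeThread(String input) {
            this.input = input;
        }

        @Override
        public void run() {
            result = input.length(); // stand-in for compute(object)
        }

        int getResult() {
            return result; // safe to read once join() has returned
        }
    }

    static List<Integer> computeAll(List<String> objects) {
        List<ComputeThread> threads = new ArrayList<>();
        for (String object : objects) {
            ComputeThread t = new ComputeThread(object);
            t.start(); // start every thread before waiting on any of them
            threads.add(t);
        }
        List<Integer> results = new ArrayList<>();
        for (ComputeThread t : threads) {
            try {
                t.join(); // wait for this thread to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while joining", e);
            }
            results.add(t.getResult());
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(computeAll(Arrays.asList("a", "bb", "ccc"))); // prints [1, 2, 3]
    }
}
```

This creates one thread per object, so it only suits small lists; a thread pool bounds that cost.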

EDIT: I think the other solutions are cooler.

Solution 4 - Java

For a more detailed answer, read Java Concurrency in Practice and use java.util.concurrent.
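
One java.util.concurrent tool not shown elsewhere on this page is CompletableFuture (Java 8 and later). The sketch below is only an illustration of that class, not something taken from the book: the class name, the upper-casing compute(), and the poolSize parameter are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class CompletableFutureSketch {

    // Hypothetical stand-in for an IO-bound compute().
    static String compute(String object) {
        return object.toUpperCase();
    }

    static List<String> computeAll(List<String> objects, int poolSize) {
        ExecutorService exec = Executors.newFixedThreadPool(poolSize);
        try {
            // Start every computation first ...
            List<CompletableFuture<String>> futures = objects.stream()
                    .map(o -> CompletableFuture.supplyAsync(() -> compute(o), exec))
                    .collect(Collectors.toList());
            // ... then wait for each one; join() rethrows failures as an unchecked CompletionException
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeAll(Arrays.asList("a", "b", "c"), 3)); // prints [A, B, C]
    }
}
```

Collecting the futures into a list before joining matters: joining inside the same stream pipeline that creates them would wait for each task before starting the next.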

Solution 5 - Java

Here's something I use in my own projects:

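import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelTasks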
{
	private final Collection<Runnable> tasks = new ArrayList<Runnable>();

	public ParallelTasks()
	{
	}

	public void add(final Runnable task)
	{
		tasks.add(task);
	}

	public void go() throws InterruptedException
	{
		final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime()
				.availableProcessors());
		try
		{
			final CountDownLatch latch = new CountDownLatch(tasks.size());
			for (final Runnable task : tasks)
				threads.execute(new Runnable() {
					public void run()
					{
						try
						{
							task.run();
						}
						finally
						{
							latch.countDown();
						}
					}
				});
			latch.await();
		}
		finally
		{
			threads.shutdown();
		}
	}
}

// ...

public static void main(final String[] args) throws Exception
{
    ParallelTasks tasks = new ParallelTasks();
    final Runnable waitOneSecond = new Runnable() {
        public void run()
        {
            try
            {
                Thread.sleep(1000);
            }
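            catch (InterruptedException e)
            {
                // restore the interrupt flag instead of silently swallowing it
                Thread.currentThread().interrupt();
            }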
        }
    };
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    final long start = System.currentTimeMillis();
    tasks.go();
    System.err.println(System.currentTimeMillis() - start);
}

Which prints a bit over 2000 on my dual-core box.

Solution 6 - Java

You can use ThreadPoolExecutor. Here is sample code: http://programmingexamples.wikidot.com/threadpoolexecutor (too long to reproduce here)
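
Since the linked sample is not reproduced here, a minimal sketch of constructing a ThreadPoolExecutor directly might look like the following. It is not the code from the linked page; the class name, the squareAll helper, and every pool parameter are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadPoolExecutorSketch {

    static List<Integer> squareAll(List<Integer> inputs) {
        // 2 core threads; up to 4 threads once the queue is full; idle extra threads
        // are reclaimed after 30 s; the queue holds at most 100 waiting tasks.
        // CallerRunsPolicy runs a task in the submitting thread when pool and queue are both full.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (Integer n : inputs) {
                futures.add(pool.submit(() -> n * n)); // stand-in for compute(object)
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get()); // blocks until that task is done
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squareAll(Arrays.asList(1, 2, 3))); // prints [1, 4, 9]
    }
}
```

The Executors factory methods used in the other answers build the same class with preset parameters; constructing it by hand is mainly useful when you want a bounded queue or a specific rejection policy.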

Solution 7 - Java

I too was going to mention an executor class. Here is some example code that you would place in the executor class.

    private static ExecutorService threadLauncher = Executors.newFixedThreadPool(4);
    
    private List<Callable<Object>> callableList = new ArrayList<Callable<Object>>();

    public void addCallable(Callable<Object> callable) {
		this.callableList.add(callable);
	}
	
	public void clearCallables(){
		this.callableList.clear();
	}

	public void executeThreads(){
		try {
			threadLauncher.invokeAll(this.callableList);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

    public Object[] getResult() {

		List<Future<Object>> resultList = null;
		Object[] resultArray = null;
		try {

			resultList = threadLauncher.invokeAll(this.callableList);

			resultArray = new Object[resultList.size()];

			for (int i = 0; i < resultList.size(); i++) {
				resultArray[i] = resultList.get(i).get();
			}

		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		return resultArray;
	}

Then to use it you would make calls to the executor class to populate and execute it.

executor.addCallable(someCallable); // someCallable is your own Callable<Object>; do this once for each task
Object[] results = executor.getResult();

Solution 8 - Java

Fork/Join's parallel array is one option.
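
As a rough illustration, here is a sketch using the Fork/Join classes that ship with the JDK (ForkJoinPool and RecursiveTask) rather than a parallel array class. The class names, the THRESHOLD value, and the use of String.length() as a stand-in for compute() are assumptions. Fork/Join is generally aimed at CPU-bound work, so it may not be the best fit for the IO-bound tasks in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkJoinSketch {

    // Splits the input list in half until a slice is small enough to process directly.
    static class ComputeTask extends RecursiveTask<List<Integer>> {
        private static final int THRESHOLD = 4; // illustrative cut-off
        private final List<String> objects;

        ComputeTask(List<String> objects) {
            this.objects = objects;
        }

        @Override
        protected List<Integer> compute() {
            if (objects.size() <= THRESHOLD) {
                List<Integer> results = new ArrayList<>();
                for (String object : objects) {
                    results.add(object.length()); // stand-in for compute(object)
                }
                return results;
            }
            int mid = objects.size() / 2;
            ComputeTask left = new ComputeTask(objects.subList(0, mid));
            ComputeTask right = new ComputeTask(objects.subList(mid, objects.size()));
            left.fork(); // schedule the left half asynchronously
            List<Integer> rightResult = right.compute(); // process the right half in this thread
            List<Integer> results = new ArrayList<>(left.join()); // left results come first
            results.addAll(rightResult);
            return results;
        }
    }

    static List<Integer> computeAll(List<String> objects) {
        return ForkJoinPool.commonPool().invoke(new ComputeTask(objects));
    }

    public static void main(String[] args) {
        System.out.println(computeAll(Arrays.asList("a", "bb", "ccc", "dddd", "eeeee")));
        // prints [1, 2, 3, 4, 5]
    }
}
```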

Solution 9 - Java

A neat way is to utilize ExecutorCompletionService.

Say you have the following code (as in your example):

 public static void main(String[] args) {
    List<Character> letters = IntStream.range(65, 91).mapToObj(i -> (char) i).collect(Collectors.toList());
    List<List<Character>> list = new ArrayList<>();

    for (char letter : letters) {
      List<Character> result = computeLettersBefore(letter);
      list.add(result);
    }

    System.out.println(list);
  }

  private static List<Character> computeLettersBefore(char letter) {
    return IntStream.range(65, 1 + letter).mapToObj(i -> (char) i).collect(Collectors.toList());
  }

Now, to execute the tasks in parallel, all you need to do is create an ExecutorCompletionService backed by a thread pool, then submit the tasks and read the results. Since ExecutorCompletionService uses a LinkedBlockingQueue under the hood, each result can be picked up as soon as its task completes (if you run the code you will notice that the results arrive in completion order, not submission order):

public static void main(String[] args) throws InterruptedException, ExecutionException {
    final ExecutorService threadPool = Executors.newFixedThreadPool(3);
    final ExecutorCompletionService<List<Character>> completionService = new ExecutorCompletionService<>(threadPool);

    final List<Character> letters = IntStream.range(65, 91).mapToObj(i -> (char) i).collect(Collectors.toList());
    List<List<Character>> list = new ArrayList<>();

    for (char letter : letters) {
      completionService.submit(() -> computeLettersBefore(letter));
    }

    // NOTE: instead of iterating over letters again, the number of submitted tasks can be used to bound the loop
    for (char letter : letters) {
      final List<Character> result = completionService.take().get();
      list.add(result);
    }

    threadPool.shutdownNow(); // NOTE: for safety place it inside finally block 

    System.out.println(list);
  }

  private static List<Character> computeLettersBefore(char letter) {
    return IntStream.range(65, 1 + letter).mapToObj(i -> (char) i).collect(Collectors.toList());
  }
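
The two NOTE comments above (counting submitted tasks and shutting down in a finally block) can be sketched as follows. The class name, the sleepAll helper, and the sleep-based tasks are assumptions chosen only to make the completion order visible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderSketch {

    // Each task sleeps for its delay and returns it; results are collected as tasks complete.
    static List<Integer> sleepAll(List<Integer> delaysMillis) {
        ExecutorService threadPool = Executors.newFixedThreadPool(Math.max(1, delaysMillis.size()));
        try {
            ExecutorCompletionService<Integer> completionService =
                    new ExecutorCompletionService<>(threadPool);
            int submitted = 0;
            for (Integer delay : delaysMillis) {
                completionService.submit(() -> {
                    Thread.sleep(delay);
                    return delay;
                });
                submitted++;
            }
            List<Integer> results = new ArrayList<>();
            for (int i = 0; i < submitted; i++) {
                // take() blocks until the next task, in completion order, has finished
                results.add(completionService.take().get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            threadPool.shutdownNow(); // always release the pool's threads
        }
    }

    public static void main(String[] args) {
        // The shortest delay is expected to come out first, regardless of submission order
        System.out.println(sleepAll(Arrays.asList(600, 50, 300)));
    }
}
```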

Solution 10 - Java

I know it's an old thread, but since RxJava (now at v3) came out, my favorite way to do parallel programming is through its flatMap operator, as in the following few lines (it may not look very intuitive at first sight).

// Assume we're in main thread at the moment
Flowable.create(...) // upstream data provider, main thread
  .map(...) // some transformers?? main thread
  .filter(...) // some filter, main thread
  .flatMap(data -> Flowable.just(data)
               .subscribeOn(Schedulers.from(...your executorservice for the sub worker.....))
               .doOnNext(this::process)
           , true, MAX_CONCURRENT) // true delays errors; MAX_CONCURRENT is the max number of concurrent workers
  .subscribe();

You can check its Javadoc (RxJava 3, Flowable) to understand the operators. A simple example:

Flowable.range(1, 100)
				.map(Object::toString)
				.flatMap (i -> Flowable.just(i)
						.doOnNext(j -> {
							System.out.println("Current thread is " + Thread.currentThread().getName());
							Thread.sleep(100);
						}).subscribeOn(Schedulers.io()), true, 10)
		
				.subscribe(
						integer -> log.info("We consumed {}", integer),
						throwable -> log.error("We met errors", throwable),
						() -> log.info("The stream completed!!!"));

And for your case:

for(Object object: objects) {
    Result result = compute(object);
    list.add(result);
}

We could try:

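Flowable.fromIterable(objects)
        .flatMap(obj ->
                    // fromCallable defers compute() until subscription, so it runs on the io() scheduler;
                    // Flowable.just(compute(obj)) would call compute() eagerly on the calling thread
                    Flowable.fromCallable(() -> compute(obj)).subscribeOn(Schedulers.io()), true, YOUR_CONCURRENCY_NUMBER)
        .doOnNext(res -> list.add(res))
        .subscribe();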

Bonus points: if you need some routing, say all odd numbers go to worker 1 and all even numbers to worker 2, RxJava can achieve that easily by combining the groupBy and flatMap operators. I won't go into too much detail about them here. Enjoy playing :)))

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | Eduardo | View Question on Stackoverflow
Solution 1 - Java | overthink | View Answer on Stackoverflow
Solution 2 - Java | i000174 | View Answer on Stackoverflow
Solution 3 - Java | fastcodejava | View Answer on Stackoverflow
Solution 4 - Java | Adam Goode | View Answer on Stackoverflow
Solution 5 - Java | Jonathan Feinberg | View Answer on Stackoverflow
Solution 6 - Java | David Rabinowitz | View Answer on Stackoverflow
Solution 7 - Java | mkamowski | View Answer on Stackoverflow
Solution 8 - Java | Michael Barker | View Answer on Stackoverflow
Solution 9 - Java | walkeros | View Answer on Stackoverflow
Solution 10 - Java | Ying Cherry | View Answer on Stackoverflow