"Parallel.For" for Java?

Java, Parallel Processing

Java Problem Overview


I was wondering if there is a Java equivalent of the .NET Parallel.For.

If there is, could someone please supply an example? Thanks!

Java Solutions


Solution 1 - Java

I guess the closest thing would be:

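ExecutorService exec = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS);
try {
	for (final Object o : list) {
		exec.submit(new Runnable() {
			@Override
			public void run() {
				// do stuff with o.
			}
		});
	}
} finally {
	// shutdown() stops new submissions but does NOT wait for running tasks;
	// call exec.awaitTermination(...) afterwards if the caller must block.
	exec.shutdown();
}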

Based on TheLQ's comments, you would set SOME_NUM_OF_THREADS to Runtime.getRuntime().availableProcessors().
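For reference, here is a self-contained sketch of the same executor pattern written with Java 8 lambdas and sized with availableProcessors(). Unlike the snippet above, it calls awaitTermination() so the caller really waits for the loop to finish. The class and method names (ExecutorLoop, squareAll) are hypothetical, chosen only for illustration.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongArray;

final class ExecutorLoop {
    // Hypothetical helper: squares every element of `input`, one task per element.
    static long[] squareAll(List<Integer> input) throws InterruptedException {
        int threads = Runtime.getRuntime().availableProcessors();
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        // Each task writes only its own slot; AtomicLongArray makes those writes
        // visible to the thread that reads the results afterwards.
        AtomicLongArray out = new AtomicLongArray(input.size());
        try {
            for (int i = 0; i < input.size(); i++) {
                final int idx = i;
                exec.submit(() -> {
                    long v = input.get(idx);
                    out.set(idx, v * v);
                });
            }
        } finally {
            exec.shutdown(); // no new tasks; already-submitted ones keep running
        }
        // Block until every submitted task has completed.
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        long[] result = new long[input.size()];
        for (int i = 0; i < result.length; i++) {
            result[i] = out.get(i);
        }
        return result;
    }
}
```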

Edit: I decided to add a basic "Parallel.For" implementation:

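import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class Parallel {
    private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();

    // Shared pool of daemon threads named "Parallel.For", so it does not keep the JVM alive.
    private static final ExecutorService forPool = Executors.newFixedThreadPool(NUM_CORES * 2, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "Parallel.For");
            t.setDaemon(true);
            return t;
        }
    });

    public static <T> void For(final Iterable<T> elements, final Operation<T> operation) {
        try {
            // invokeAll blocks for us until all submitted tasks in the call complete.
            // Exceptions thrown by operation are captured in the returned Futures and ignored here.
            forPool.invokeAll(createCallables(elements, operation));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    public static <T> Collection<Callable<Void>> createCallables(final Iterable<T> elements, final Operation<T> operation) {
        List<Callable<Void>> callables = new ArrayList<Callable<Void>>();
        for (final T elem : elements) {
            callables.add(new Callable<Void>() {
                @Override
                public Void call() {
                    operation.perform(elem);
                    return null;
                }
            });
        }
        return callables;
    }

    public static interface Operation<T> {
        public void perform(T pParameter);
    }
}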

Example Usage of Parallel.For

// Collection of items to process in parallel
Collection<Integer> elems = new LinkedList<Integer>();
for (int i = 0; i < 40; ++i) {
    elems.add(i);
}
Parallel.For(elems,
    // The operation to perform with each item
    new Parallel.Operation<Integer>() {
        public void perform(Integer param) {
            System.out.println(param);
        }
    });

I guess this implementation is really closer to Parallel.ForEach.
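In Java 8 and later, a ForEach over an arbitrary Iterable can also be sketched with a parallel stream built from the Iterable's spliterator. This is plain JDK API (StreamSupport.stream), not part of the class above; the helper name StreamForEach.forEach is hypothetical. The default Iterable spliterator splits poorly, so for a real Collection its own parallelStream() is usually the better choice, and the processing order is not guaranteed either way.

```java
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

final class StreamForEach {
    // Hypothetical ForEach helper: runs `body` on every element of any Iterable
    // using a parallel stream; returns only after all elements have been processed.
    static <T> void forEach(Iterable<T> elements, Consumer<? super T> body) {
        StreamSupport.stream(elements.spliterator(), true) // true = parallel
                     .forEach(body);                       // order is NOT guaranteed
    }
}
```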

Edit: I put this up on GitHub if anyone is interested: Parallel For on GitHub.

Solution 2 - Java

MLaw's solution is a very practical Parallel.ForEach. I modified it slightly to make a Parallel.For.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Parallel
{
    static final int iCPU = Runtime.getRuntime().availableProcessors();

    public static <T> void ForEach(Iterable<T> parameters, final LoopBody<T> loopBody)
    {
        ExecutorService executor = Executors.newFixedThreadPool(iCPU);
        List<Future<?>> futures = new ArrayList<Future<?>>();
        try
        {
            for (final T param : parameters)
            {
                futures.add(executor.submit(new Runnable()
                {
                    public void run() { loopBody.run(param); }
                }));
            }
            waitForAll(futures);
        }
        finally
        {
            executor.shutdown();
        }
    }

    public static void For(int start, int stop, final LoopBody<Integer> loopBody)
    {
        ExecutorService executor = Executors.newFixedThreadPool(iCPU);
        List<Future<?>> futures = new ArrayList<Future<?>>();
        try
        {
            for (int i = start; i < stop; i++)
            {
                final Integer k = i;
                futures.add(executor.submit(new Runnable()
                {
                    public void run() { loopBody.run(k); }
                }));
            }
            waitForAll(futures);
        }
        finally
        {
            executor.shutdown();
        }
    }

    // Waits for every task and rethrows the first failure instead of silently ignoring it.
    private static void waitForAll(List<Future<?>> futures)
    {
        for (Future<?> f : futures)
        {
            try { f.get(); }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            catch (ExecutionException e) { throw new RuntimeException(e.getCause()); }
        }
    }
}

public interface LoopBody <T>
{
	void run(T i);
}

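import java.util.concurrent.atomic.AtomicInteger;

public class ParallelTest
{
    // The loop body runs on several threads at once, so the running sum must be atomic;
    // a plain int field updated with `k += i` can lose updates.
    final AtomicInteger k = new AtomicInteger();

    public ParallelTest()
    {
        Parallel.For(0, 10, new LoopBody<Integer>()
        {
            public void run(Integer i)
            {
                k.addAndGet(i);
                System.out.println(i);
            }
        });
        System.out.println("Sum = " + k.get());
    }

    public static void main(String[] argv)
    {
        new ParallelTest();
    }
}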
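Accumulating into a shared field from the loop body needs an atomic or a lock. If the goal is just the sum, a Java 8 stream reduction avoids shared mutable state entirely. This is a sketch using only JDK streams; the class name ParallelSum is hypothetical.

```java
import java.util.stream.IntStream;

final class ParallelSum {
    // Sums the integers in [start, stop) in parallel. The stream framework gives
    // each worker its own partial sum and combines them at the end, so no shared
    // variable is written concurrently.
    static int sum(int start, int stop) {
        return IntStream.range(start, stop).parallel().sum();
    }
}
```

For the range used in the test, ParallelSum.sum(0, 10) returns 45 no matter how the work is split.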

Solution 3 - Java

Building on mlaw's suggestion, I added a CountDownLatch and a chunk size to reduce the number of submit() calls.

When tested with a 4-million-item array, this gives a 5X speedup over a sequential for() loop on my Core i7 2630QM CPU.
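To make the chunking concrete: with 10 items and 4 CPUs, chunksize = (10 + 4 - 1) / 4 = 3, which yields the ranges [0,3), [3,6), [6,9) and [9,10), so only four tasks are submitted instead of ten. A hypothetical helper that computes those ranges with the same ceiling division:

```java
import java.util.ArrayList;
import java.util.List;

final class Chunks {
    // Hypothetical helper: splits [start, stop) into at most `workers` contiguous
    // half-open ranges {lo, hi}, using ceiling division for the chunk size.
    static List<int[]> split(int start, int stop, int workers) {
        List<int[]> ranges = new ArrayList<>();
        if (stop <= start) {
            return ranges; // empty range: no chunks (and no division by a zero chunk size)
        }
        int chunk = (stop - start + workers - 1) / workers; // ceil((stop - start) / workers)
        for (int lo = start; lo < stop; lo += chunk) {
            ranges.add(new int[] {lo, Math.min(lo + chunk, stop)});
        }
        return ranges;
    }
}
```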

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Loop {
	public interface Each {
		void run(int i);
	}

	private static final int CPUs = Runtime.getRuntime().availableProcessors();

	public static void withIndex(int start, int stop, final Each body) {
		if (stop <= start) {
			return; // empty range; also avoids dividing by a zero chunksize below
		}
		int chunksize = (stop - start + CPUs - 1) / CPUs;       // ceil(n / CPUs)
		int loops = (stop - start + chunksize - 1) / chunksize; // number of chunks
		ExecutorService executor = Executors.newFixedThreadPool(CPUs);
		final CountDownLatch latch = new CountDownLatch(loops);
		for (int i = start; i < stop;) {
			final int lo = i;
			i += chunksize;
			final int hi = (i < stop) ? i : stop;
			executor.submit(new Runnable() {
				public void run() {
					try {
						for (int j = lo; j < hi; j++)
							body.run(j);
					} finally {
						latch.countDown(); // count down even if body throws, so await() cannot hang
					}
				}
			});
		}
		try {
			latch.await();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		} finally {
			executor.shutdown();
		}
	}

	public static void main(String[] argv) {
		Loop.withIndex(0, 9, new Loop.Each() {
			public void run(int i) {
				System.out.println(i * 10);
			}
		});
	}
}

Solution 4 - Java

Here is my contribution to this topic https://github.com/pablormier/parallel-loops. The usage is very simple:

Collection<String> upperCaseWords = 
    Parallel.ForEach(words, new Parallel.F<String, String>() {
        public String apply(String s) {
            return s.toUpperCase();
        }
    });

It's also possible to change some aspects of its behaviour, such as the number of threads (by default it uses a cached thread pool):

Collection<String> upperCaseWords = 
            new Parallel.ForEach<String, String>(words)
                .withFixedThreads(4)
                .apply(new Parallel.F<String, String>() {
                    public String apply(String s) {
                        return s.toUpperCase();
                    }
                }).values();

All the code is self-contained in one Java class and has no dependencies beyond the JDK. I also encourage you to check out the new functional-style way to parallelize with Java 8.
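As an illustration of that Java 8 approach (plain JDK streams, not the parallel-loops library), the upper-casing example might look like this. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

final class UpperCase {
    // Parallel map over a collection using Java 8 streams.
    static List<String> upperAll(List<String> words) {
        return words.parallelStream()
                    .map(String::toUpperCase)
                    .collect(Collectors.toList()); // result keeps the original encounter order
    }
}
```

Unlike a parallel forEach, collecting with Collectors.toList() preserves the input order even though the mapping runs on several threads.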

Solution 5 - Java

The Fork/Join framework in Java 7 provides concurrency support, but I don't know of an exact equivalent of Parallel.For.
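A Parallel.For can be approximated on top of Fork/Join with a RecursiveAction that splits an index range in half until it is small enough to run sequentially. This is a sketch, not a JDK API: the names ForkJoinFor, RangeTask and parallelFor and the THRESHOLD value are assumptions. It uses ForkJoinPool.commonPool() and IntConsumer, which need Java 8; on Java 7 you would create your own ForkJoinPool and a custom body interface.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.function.IntConsumer;

final class ForkJoinFor {
    // Ranges at most this long run sequentially (a tuning assumption).
    private static final int THRESHOLD = 1_000;

    private static final class RangeTask extends RecursiveAction {
        private final int lo, hi;
        private final IntConsumer body;

        RangeTask(int lo, int hi, IntConsumer body) {
            this.lo = lo;
            this.hi = hi;
            this.body = body;
        }

        @Override
        protected void compute() {
            if (hi - lo <= THRESHOLD) {
                for (int i = lo; i < hi; i++) {
                    body.accept(i);
                }
            } else {
                int mid = lo + (hi - lo) / 2;
                // Fork both halves; idle workers can steal one of them.
                invokeAll(new RangeTask(lo, mid, body), new RangeTask(mid, hi, body));
            }
        }
    }

    // Hypothetical entry point: runs body(i) for every i in [start, stop) and waits.
    static void parallelFor(int start, int stop, IntConsumer body) {
        ForkJoinPool.commonPool().invoke(new RangeTask(start, stop, body));
    }
}
```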

Solution 6 - Java

A simpler option would be:

// A thread pool which runs for the life of the application.
private static final ExecutorService EXEC = 
    Executors.newFixedThreadPool(SOME_NUM_OF_THREADS); 

// later: tasks is a Collection<Callable<T>>
EXEC.invokeAll(tasks); // blocks until all tasks complete; you can optionally specify a timeout.
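A runnable sketch of that idea, using the timeout overload invokeAll(tasks, timeout, unit). The pool size, the 10-second timeout and the names InvokeAllDemo and squares are assumptions. The pool uses daemon threads here so that a long-lived static pool does not prevent the JVM from exiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class InvokeAllDemo {
    // Long-lived pool created once; daemon threads so it never blocks JVM shutdown.
    private static final ExecutorService EXEC = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors(),
        r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });

    // Computes i * i for every i in [0, n) and returns the results in index order.
    static List<Integer> squares(int n) throws InterruptedException, ExecutionException {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            final int k = i;
            tasks.add(() -> k * k);
        }
        // Blocks until all tasks finish or the timeout expires (unfinished ones are cancelled).
        List<Future<Integer>> futures = EXEC.invokeAll(tasks, 10, TimeUnit.SECONDS);
        List<Integer> results = new ArrayList<>();
        for (Future<Integer> f : futures) {
            results.add(f.get()); // rethrows a task failure, or CancellationException on timeout
        }
        return results;
    }
}
```

The returned futures are in the same order as the task list, so the results line up with the loop indices.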

Solution 7 - Java

There is an equivalent of Parallel.For available as a Java extension called Ateji PX; they have a free version you can play with: http://www.ateji.com/px/index.html

It is the exact equivalent of Parallel.For and looks similar to:

for ||

More examples and explanation on Wikipedia: http://en.wikipedia.org/wiki/Ateji_PX

It is the closest thing in Java, IMO.

Solution 8 - Java

Synchronization often kills the speedup of parallel for-loops. Therefore, parallel for-loops often need thread-private data and a reduction mechanism that combines all threads' private data into a single result.
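The private-data-plus-reduction idea can be sketched independently of any Parallel class: each chunk accumulates into a local variable, returns it through a Future, and the calling thread combines the partial results. The names PrivateReduction and sum are hypothetical; this is an illustration of the technique, not the code below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PrivateReduction {
    // Sums values[] using thread-private partial sums and a final reduction step.
    static long sum(int[] values, int workers) throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newFixedThreadPool(workers);
        try {
            int chunk = (values.length + workers - 1) / workers; // 0 only for an empty array
            List<Future<Long>> partials = new ArrayList<>();
            for (int lo = 0; lo < values.length; lo += chunk) {
                final int from = lo;
                final int to = Math.min(lo + chunk, values.length);
                // Each task accumulates into its own local variable: no locking needed.
                partials.add(exec.submit(() -> {
                    long local = 0;
                    for (int i = from; i < to; i++) {
                        local += values[i];
                    }
                    return local;
                }));
            }
            long total = 0; // reduction: combine the private results on the calling thread
            for (Future<Long> f : partials) {
                total += f.get();
            }
            return total;
        } finally {
            exec.shutdown();
        }
    }
}
```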

So I've extended Weimin Xiao's version of Parallel.For with a reduction mechanism.

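import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Parallel {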
public static interface IntLoopBody {
    void run(int i);
}

public static interface LoopBody<T> {
    void run(T i);
}

public static interface RedDataCreator<T> {
	T run();
}

public static interface RedLoopBody<T> {
    void run(int i, T data);
}

public static interface Reducer<T> {
    void run(T returnData, T addData);
}

private static class ReductionData<T> {
	Future<?> future;
	T data;
}

static final int nCPU = Runtime.getRuntime().availableProcessors();

public static <T> void ForEach(Iterable <T> parameters, final LoopBody<T> loopBody) {
    ExecutorService executor = Executors.newFixedThreadPool(nCPU);
    List<Future<?>> futures  = new LinkedList<Future<?>>();

    for (final T param : parameters) {
    	futures.add(executor.submit(() -> loopBody.run(param) ));
    }

    for (Future<?> f : futures) {
    	try { 
    		f.get();
	    } catch (InterruptedException | ExecutionException e) { 
	    	System.out.println(e); 
	    }
    }
    executor.shutdown();     
}

public static void For(int start, int stop, final IntLoopBody loopBody) {
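    if (start >= stop) {
        return; // empty range: avoids a zero chunk size and a zero-sized pool below
    }
    final int chunkSize = (stop - start + nCPU - 1)/nCPU;
    final int loops = (stop - start + chunkSize - 1)/chunkSize;
    ExecutorService executor = Executors.newFixedThreadPool(loops);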
    List<Future<?>> futures  = new LinkedList<Future<?>>();

    for (int i=start; i < stop; ) {
        final int iStart = i;
        i += chunkSize;
        final int iStop = (i < stop) ? i : stop;
        
        futures.add(executor.submit(() -> {
        	for (int j = iStart; j < iStop; j++) 
        		loopBody.run(j);
        }));     
    }

    for (Future<?> f : futures) {
    	try { 
    		f.get();
	    } catch (InterruptedException | ExecutionException e) { 
	    	System.out.println(e); 
	    }
    }
    executor.shutdown();     
}

public static <T> void For(int start, int stop, T result, final RedDataCreator<T> creator, final RedLoopBody<T> loopBody, final Reducer<T> reducer) {
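    if (start >= stop) {
        return; // empty range: result stays unchanged; avoids a zero chunk size below
    }
    final int chunkSize = (stop - start + nCPU - 1)/nCPU;
    final int loops = (stop - start + chunkSize - 1)/chunkSize;
    ExecutorService executor = Executors.newFixedThreadPool(loops);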
    List<ReductionData<T>> redData  = new LinkedList<ReductionData<T>>();

    for (int i = start; i < stop; ) {
        final int iStart = i;
        i += chunkSize;
        final int iStop = (i < stop) ? i : stop;
        final ReductionData<T> rd = new ReductionData<T>();
                    
        rd.data = creator.run();
        rd.future = executor.submit(() -> {
            for (int j = iStart; j < iStop; j++) {
            	loopBody.run(j, rd.data);
            }
        });
        redData.add(rd);
    }

    for (ReductionData<T> rd : redData) {
    	try { 
    		rd.future.get();
    		if (rd.data != null) {
    			reducer.run(result, rd.data);
    		}
	    } catch (InterruptedException | ExecutionException e) { 
			e.printStackTrace();
	    }
    }
    executor.shutdown();     
}
}

Here is a simple test example: a parallel character counter using a non-synchronized map.

import java.util.*;

public class ParallelTest {
static class Counter {
	int cnt;
	
	Counter() {
		cnt = 1;
	}
}

public static void main(String[] args) {
	String text = "More formally, if this map contains a mapping from a key k to a " + 
            "value v such that key compares equal to k according to the map's ordering, then " +
            "this method returns v; otherwise it returns null.";
	Map<Character, Counter> charCounter1 = new TreeMap<Character, Counter>();
	Map<Character, Counter> charCounter2 = new TreeMap<Character, Counter>();

	// first sequentially
	for(int i=0; i < text.length(); i++) {
		char c = text.charAt(i);
		Counter cnt = charCounter1.get(c);
		if (cnt == null) {
			charCounter1.put(c, new Counter());
		} else {
			cnt.cnt++;
		}
	}
	for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) {
		System.out.println(entry.getKey() + ": " + entry.getValue().cnt);
	}
	
	// now parallel without synchronization
	Parallel.For(0, text.length(), charCounter2,
		// Creator
		() -> new TreeMap<Character, Counter>(), 
		// Loop Body
		(i, map) -> {
			char c = text.charAt(i);
			Counter cnt = map.get(c);
			if (cnt == null) {
				map.put(c, new Counter());
			} else {
				cnt.cnt++;
			}
		}, 
		// Reducer
		(result, map) -> {
			for(Map.Entry<Character, Counter> entry: map.entrySet()) {
				Counter cntR = result.get(entry.getKey());
				if (cntR == null) {
					result.put(entry.getKey(), entry.getValue());
				} else {
					cntR.cnt += entry.getValue().cnt;
				}
			}
		}
	);
	
	// compare results
	assert charCounter1.size() == charCounter2.size() : "wrong size: " + charCounter1.size() + ", " + charCounter2.size();
	Iterator<Map.Entry<Character, Counter>> it2 = charCounter2.entrySet().iterator();
	for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) {
		Map.Entry<Character, Counter> entry2 = it2.next();
		assert entry.getKey().equals(entry2.getKey()) && entry.getValue().cnt == entry2.getValue().cnt : "wrong content";
	}
	
	System.out.println("Well done!");
}
}
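For comparison, Java 8 streams apply the same private-data-plus-reduction pattern internally: with a parallel stream, Collectors.groupingBy builds a separate map for each split and merges them afterwards. A sketch of the character counter written that way (the class name CharCount is hypothetical):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class CharCount {
    // Counts each character of `text` with a parallel stream; per-split maps are
    // merged by the collector, so no synchronization is needed in user code.
    static Map<Character, Long> count(String text) {
        return text.chars()
                   .parallel()
                   .mapToObj(c -> (char) c)
                   .collect(Collectors.groupingBy(c -> c, TreeMap::new, Collectors.counting()));
    }
}
```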

Solution 9 - Java

I have an updated Java Parallel class which can do Parallel.For, Parallel.ForEach, Parallel.Tasks, and partitioned parallel loops.

Here are examples of using those parallel loops:

public static void main(String[] argv)
{
    // sample data
    final ArrayList<String> ss = new ArrayList<String>();

    String[] s = {"a", "b", "c", "d", "e", "f", "g"};
    for (String z : s) ss.add(z);
    int m = ss.size();

    // parallel-for loop
    System.out.println("Parallel.For loop:");
    Parallel.For(0, m, new LoopBody<Integer>()
    {
        public void run(Integer i)
        {
            System.out.println(i + "\t" + ss.get(i));
        }
    });

    // parallel for-each loop
    System.out.println("Parallel.ForEach loop:");
    Parallel.ForEach(ss, new LoopBody<String>()
    {
        public void run(String p)
        {
            System.out.println(p);
        }
    });

    // partitioned parallel loop
    System.out.println("Partitioned Parallel loop:");
    Parallel.ForEach(Parallel.create(0, m), new LoopBody<Partition>()
    {
        public void run(Partition p)
        {
            for (int i = p.start; i < p.end; i++)
                System.out.println(i + "\t" + ss.get(i));
        }
    });

    // parallel tasks
    System.out.println("Parallel Tasks:");
    Parallel.Tasks(new Task[]
    {
        // task-1
        new Task() { public void run()
        {
            for (int i = 0; i < 3; i++)
                System.out.println(i + "\t" + ss.get(i));
        }},

        // task-2
        new Task() { public void run()
        {
            for (int i = 3; i < 6; i++)
                System.out.println(i + "\t" + ss.get(i));
        }}
    });
}
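The Parallel.Tasks idea (run a few independent blocks concurrently and wait for all of them) can also be sketched with the JDK's CompletableFuture in Java 8. This is not the author's class; the name ParallelTasks.runAll is hypothetical, and by default runAsync executes on ForkJoinPool.commonPool().

```java
import java.util.concurrent.CompletableFuture;

final class ParallelTasks {
    // Hypothetical Parallel.Tasks-style helper: starts every Runnable asynchronously
    // and blocks until all of them have finished.
    static void runAll(Runnable... tasks) {
        CompletableFuture<?>[] futures = new CompletableFuture<?>[tasks.length];
        for (int i = 0; i < tasks.length; i++) {
            futures[i] = CompletableFuture.runAsync(tasks[i]);
        }
        // join() waits for all tasks and rethrows a failure wrapped in a CompletionException.
        CompletableFuture.allOf(futures).join();
    }
}
```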

Solution 10 - Java

I found ForkJoinPool and IntStream very helpful in my case (a Parallel.For with a limited number of threads).

C#:

static void MathParallel(int threads)
{
    Parallel.For(1, partitions, new ParallelOptions { MaxDegreeOfParallelism = threads }, (i) => {
        partitionScores[i] = Math.Sin(3*i);
    });
}

and Java equivalent:

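static void mathParallel(int threads) {
    ForkJoinPool pool = new ForkJoinPool(threads);
    try {
        // A parallel stream started from inside a ForkJoinPool task runs its subtasks in
        // that pool (undocumented behaviour of current JDKs), which caps the parallelism.
        pool.submit(() -> IntStream.range(0, partitions).parallel().forEach(i -> {
            partitionScores[i] = Math.sin(3 * i);
        })).join(); // block until the whole loop has finished, instead of busy-waiting
    } finally {
        pool.shutdown();
    }
}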

Solution 11 - Java

This is what I use for Java 7 and earlier.

For Java 8 you can use forEach() on a parallel stream (for example, collection.parallelStream().forEach(...)).

[UPDATE]

Parallel class:

import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Parallel {
    private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();
    private static final int MAX_THREAD = NUM_CORES * 2;

    public static <T2 extends T, T> void For(final Iterable<T2> elements, final Operation<T> operation) {
        if (elements == null) {
            return;
        }
        final Iterator<T2> iterator = elements.iterator();
        final Throwable[] throwable = new Throwable[1];
        final Runnable worker = new Runnable() {
            @Override
            public void run() {
                while (true) {
                    final T2 next;
                    // Iterator is not thread-safe, so only hasNext()/next() are serialized;
                    // perform() below runs concurrently on all worker threads.
                    synchronized (iterator) {
                        if (!iterator.hasNext()) {
                            return;
                        }
                        next = iterator.next();
                    }
                    try {
                        operation.perform(next);
                    } catch (Throwable t) {
                        synchronized (throwable) {
                            if (throwable[0] == null) {
                                throwable[0] = t; // remember the first failure
                            }
                        }
                        return;
                    }
                    if (!operation.follow()) {
                        return; // the operation asked to stop early
                    }
                }
            }
        };
        final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD);
        for (int threadIndex = 0; threadIndex < MAX_THREAD; threadIndex++) {
            executor.execute(worker);
        }
        executor.shutdown();
        try {
            // Block until every worker has finished (no sleep/poll loop needed)
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        synchronized (throwable) {
            if (throwable[0] != null) {
                throw new RuntimeException(throwable[0]);
            }
        }
    }

    public interface Operation<T> {
        void perform(T pParameter);
        boolean follow();
    }
}

Example

@Test
public void test() {
    List<Long> longList = new ArrayList<Long>();
    for (long i = 0; i < 1000000; i++) {
        longList.add(i);
    }
    // perform() runs on several threads at once, so the shared list must be thread-safe
    final List<Integer> integerList = Collections.synchronizedList(new LinkedList<Integer>());
    Parallel.For(longList, new Parallel.Operation<Number>() {

        @Override
        public void perform(Number pParameter) {
            System.out.println(pParameter);
            integerList.add(pParameter.intValue());
        }

        @Override
        public boolean follow() {
            return true;
        }
    });
    for (Number num : integerList) {
        System.out.println(num);
    }
}



Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | Jamie | View Question on Stackoverflow
Solution 1 - Java | Matt Wonlaw | View Answer on Stackoverflow
Solution 2 - Java | Weimin Xiao | View Answer on Stackoverflow
Solution 3 - Java | santiwk | View Answer on Stackoverflow
Solution 4 - Java | Pablo R. Mier | View Answer on Stackoverflow
Solution 5 - Java | Emil | View Answer on Stackoverflow
Solution 6 - Java | Peter Lawrey | View Answer on Stackoverflow
Solution 7 - Java | Clippy | View Answer on Stackoverflow
Solution 8 - Java | Chris | View Answer on Stackoverflow
Solution 9 - Java | Weimin Xiao | View Answer on Stackoverflow
Solution 10 - Java | FaFryk | View Answer on Stackoverflow
Solution 11 - Java | Crammeur | View Answer on Stackoverflow