Parallel operations on Kotlin collections?

Parallel Processing, Kotlin

Parallel Processing Problem Overview


In Scala, one can easily do a parallel map, forEach, etc. with:

collection.par.map(..)

Is there an equivalent in Kotlin?

Parallel Processing Solutions


Solution 1 - Parallel Processing

The Kotlin standard library has no support for parallel operations. However, since Kotlin uses the standard Java collection classes, you can use the Java 8 stream API to perform parallel operations on Kotlin collections as well.

For example:

myCollection.parallelStream()
        .map { ... }
        .filter { ... }

Solution 2 - Parallel Processing

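As of Kotlin 1.1, parallel operations can also be expressed quite elegantly in terms of coroutines. Here is pmap on lists:

import kotlinx.coroutines.*

fun <A, B> List<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
    // start one coroutine per element on the shared pool, then await them in input order
    map { async(Dispatchers.Default) { f(it) } }.map { it.await() }
}

Note that coroutines were experimental when this answer was written. They have been stable since Kotlin 1.3, and the CommonPool used in the original version has been replaced by Dispatchers.Default.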

Solution 3 - Parallel Processing

There is no official support in Kotlin's stdlib yet, but you could define an extension function to mimic par.map:

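import java.util.Collections
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

fun <T, R> Iterable<T>.pmap(
          // keep at least one thread: availableProcessors() - 2 is not positive on 1- or 2-core machines
          numThreads: Int = (Runtime.getRuntime().availableProcessors() - 2).coerceAtLeast(1),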
          exec: ExecutorService = Executors.newFixedThreadPool(numThreads),
          transform: (T) -> R): List<R> {

    // default size is just an inlined version of kotlin.collections.collectionSizeOrDefault
    val defaultSize = if (this is Collection<*>) this.size else 10
    val destination = Collections.synchronizedList(ArrayList<R>(defaultSize))

    // results are added as tasks finish, so the output order may differ from the input order
    for (item in this) {
        exec.submit { destination.add(transform(item)) }
    }

    exec.shutdown()
    exec.awaitTermination(1, TimeUnit.DAYS)

    return ArrayList<R>(destination)
}

(github source)

Here's a simple usage example

val result = listOf("foo", "bar").pmap { it+"!" }.filter { it.contains("bar") }

If needed, you can tweak threading by providing the number of threads or even a specific java.util.concurrent.ExecutorService. For example:

listOf("foo", "bar").pmap(4, transform = { it + "!" })

Please note that this approach only parallelizes the map operation and does not affect any downstream steps. For example, the filter in the first example runs single-threaded. However, in many cases only the data transformation (i.e. map) requires parallelization. Furthermore, it would be straightforward to extend the approach above to other parts of the Kotlin collection API.
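The pmap above appends each result as its task finishes, so the output can come back in a different order than the input. If order matters, one possible variant (a sketch, not part of the original answer; the name pmapOrdered and the default of 4 threads are assumptions) hands the tasks to ExecutorService.invokeAll, which returns the futures in the same order as the tasks:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical order-preserving variant of pmap.
fun <T, R> Iterable<T>.pmapOrdered(numThreads: Int = 4, transform: (T) -> R): List<R> {
    val exec = Executors.newFixedThreadPool(numThreads)
    try {
        // One Callable per item, in input order.
        val tasks = map { item -> Callable { transform(item) } }
        // invokeAll blocks until every task is done and keeps the task order.
        return exec.invokeAll(tasks).map { it.get() }
    } finally {
        exec.shutdown()
    }
}
```

With this version, listOf("foo", "bar").pmapOrdered { it + "!" } always yields [foo!, bar!]. An exception thrown by transform surfaces from get() wrapped in an ExecutionException.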

Solution 4 - Parallel Processing

You can use this extension method:

suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}

See Parallel Map in Kotlin for more info
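One thing to keep in mind: async inside coroutineScope inherits the dispatcher of the caller. Called from a plain runBlocking, everything runs on the calling thread, so the work is concurrent but not parallel. Here is a minimal usage sketch that runs the extension on Dispatchers.Default instead; the wrapper lengthsInParallel is a made-up name for illustration:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}

// Hypothetical caller: the coroutines inherit Dispatchers.Default from runBlocking,
// so they can run on several threads at once.
fun lengthsInParallel(words: List<String>): List<Int> =
    runBlocking(Dispatchers.Default) { words.pmap { it.length } }
```

awaitAll returns the results in the order of the deferred values, so lengthsInParallel(listOf("foo", "quux", "a")) gives [3, 4, 1].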

Solution 5 - Parallel Processing

Since version 1.2, Kotlin has offered stream support that is compatible with JRE 8.

So iterating over a list in parallel can be done like below (the order of the output is not guaranteed):

fun main(args: Array<String>) {
  val c = listOf("toto", "tata", "tutu")
  c.parallelStream().forEach { println(it) }
}

Solution 6 - Parallel Processing

Kotlin aims to be idiomatic, but not so terse that it becomes hard to understand at first glance.

Parallel computation through coroutines is no exception. The designers want it to be easy but not hidden behind some pre-built method, so that you can branch the computation when needed.

In your case:

collection.map { 
        async{ produceWith(it) } 
    }
    .forEach { 
        consume(it.await()) 
    }

Notice that to call async and await you need to be inside a coroutine scope; you cannot make suspending calls or launch a coroutine from a non-coroutine context. To enter one you can either:

  • runBlocking { /* your code here */ }: it blocks the current thread until the lambda returns.
  • GlobalScope.launch { }: it runs the lambda concurrently with the caller; if main finishes while your coroutines are still running, they are abandoned together with the process, so in that case it is better to use runBlocking.
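Putting the pieces together, the map/async/await pattern wrapped in runBlocking might look like the sketch below. The body of produceWith and the function produceAll are placeholders invented for illustration, and Dispatchers.Default is passed to async so that the producers can run on several threads:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for produceWith from the snippet above.
fun produceWith(n: Int): Int = n * n

fun produceAll(inputs: List<Int>): List<Int> {
    val consumed = mutableListOf<Int>()
    runBlocking {
        inputs.map { async(Dispatchers.Default) { produceWith(it) } } // start everything first
            .forEach { consumed.add(it.await()) }                     // then consume in input order
    }
    return consumed
}
```

Here produceAll(listOf(1, 2, 3)) returns [1, 4, 9]. The consuming side runs only in the runBlocking coroutine, so the plain mutable list needs no synchronization.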

Hope it helps :)

Solution 7 - Parallel Processing

At present, no. The official Kotlin comparison to Scala mentions:

> Things that may be added to Kotlin later:
>
> - Parallel collections

Solution 8 - Parallel Processing

Another approach I found to be quite elegant is something like this, using the kotlinx.coroutines library:

// asFlow, map, filter and collect all live in this package
import kotlinx.coroutines.flow.*

suspend fun process(myCollection: Iterable<Foo>) {
    myCollection.asFlow()
        .map { /* ... */ }
        .filter { /* ... */ }
        .collect { /* ... perform some side effect ... */ }
}

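It does, however, require an extra dependency: kotlinx.coroutines is not part of the stdlib. Note also that a flow processes its elements sequentially by default, so this pipeline is asynchronous rather than parallel unless concurrency is added explicitly.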

Solution 9 - Parallel Processing

This solution assumes that your project is using coroutines:

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2")

The functions called parallelTransform don't retain the order of elements and return a Flow<R>, while the function parallelMap retains the order and returns a List<R>.

Create a threadpool for multiple invocations:

val numberOfCores = Runtime.getRuntime().availableProcessors()
val executorDispatcher: ExecutorCoroutineDispatcher =
    Executors.newFixedThreadPool(numberOfCores).asCoroutineDispatcher()

Use that dispatcher (and call close() when it's no longer needed):

inline fun <T, R> Iterable<T>.parallelTransform(
    dispatcher: ExecutorCoroutineDispatcher,
    crossinline transform: (T) -> R
): Flow<R> = channelFlow {

    val items: Iterable<T> = this@parallelTransform
    val channelFlowScope: ProducerScope<R> = this@channelFlow

    launch(dispatcher) {
        items.forEach {item ->
            launch {
                channelFlowScope.send(transform(item))
            }
        }
    }
}

If threadpool reuse is of no concern (threadpools aren't cheap), you can use this version:

inline fun <T, R> Iterable<T>.parallelTransform(
    numberOfThreads: Int,
    crossinline transform: (T) -> R
): Flow<R> = channelFlow {

    val items: Iterable<T> = this@parallelTransform
    val channelFlowScope: ProducerScope<R> = this@channelFlow

    Executors.newFixedThreadPool(numberOfThreads).asCoroutineDispatcher().use { dispatcher ->
        launch(dispatcher) {
            items.forEach { item ->
                launch {
                    channelFlowScope.send(transform(item))
                }
            }
        }.join() // wait for all items before use() closes the dispatcher
    }
}

If you need a version that retains the order of elements:

inline fun <T, R> Iterable<T>.parallelMap(
    dispatcher: ExecutorCoroutineDispatcher,
    crossinline transform: (T) -> R
): List<R> = runBlocking {

    val items: Iterable<T> = this@parallelMap
    val result = ConcurrentSkipListMap<Int, R>()

    launch(dispatcher) {
        items.withIndex().forEach { (index, item) ->
            launch {
                result[index] = transform(item)
            }
        }
    }.join() // wait for every item before reading the results

    // ConcurrentSkipListMap is a SortedMap
    // so the values will be in the right order
    result.values.toList()
}
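The advice above about closing the dispatcher can be sketched with Kotlin's use, since ExecutorCoroutineDispatcher is Closeable. This is a simplified alternative based on async and awaitAll, not the answer's own functions; the name squaresOnOwnPool and the pool size of 4 are assumptions:

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical helper: creates a pool, uses it for one batch, then closes it.
fun squaresOnOwnPool(numbers: List<Int>): List<Int> =
    Executors.newFixedThreadPool(4).asCoroutineDispatcher().use { dispatcher ->
        // runBlocking returns only after every async has completed,
        // so the pool is still open while the work is running.
        runBlocking {
            numbers.map { n -> async(dispatcher) { n * n } }.awaitAll()
        }
    }
```

squaresOnOwnPool(listOf(1, 2, 3)) returns [1, 4, 9], and the pool's threads are shut down when use exits, whether the block succeeds or throws.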

Solution 10 - Parallel Processing

I found this:

implementation 'com.github.cvb941:kotlin-parallel-operations:1.3'

details:

https://github.com/cvb941/kotlin-parallel-operations

Solution 11 - Parallel Processing

I've come up with a couple of extension functions:

  1. A suspend extension function on Iterable<T> that processes the items in parallel and returns the result of processing each item. By default it uses the Dispatchers.IO dispatcher to offload blocking tasks to a shared pool of threads. It must be called from a coroutine (including a coroutine with the Dispatchers.Main dispatcher) or another suspend function.

    suspend fun <T, R> Iterable<T>.processInParallel(
        dispatcher: CoroutineDispatcher = Dispatchers.IO,
        processBlock: suspend (v: T) -> R,
    ): List<R> = coroutineScope { // or supervisorScope
        map {
            async(dispatcher) { processBlock(it) }
        }.awaitAll()
    }
    

    Example of calling from a coroutine:

    val collection = listOf("A", "B", "C", "D", "E")
    
    someCoroutineScope.launch {
        val results = collection.processInParallel {
            process(it)
        }
        // use processing results
    }
    

where someCoroutineScope is an instance of CoroutineScope.

  2. A launch-and-forget extension function on CoroutineScope, which doesn't return any result. It also uses the Dispatchers.IO dispatcher by default. It can be called on a CoroutineScope or from another coroutine.

    fun <T> CoroutineScope.processInParallelAndForget(
        iterable: Iterable<T>,
        dispatcher: CoroutineDispatcher = Dispatchers.IO,
        processBlock: suspend (v: T) -> Unit
    ) = iterable.forEach {
        launch(dispatcher) { processBlock(it) }
    }
    

    Example of calling:

    someCoroutineScope.processInParallelAndForget(collection) {
        process(it)
    }
    
    // OR from another coroutine:
    
    someCoroutineScope.launch {
        processInParallelAndForget(collection) {
            process(it)
        }
    }
    

2a. A launch-and-forget extension function on Iterable<T>. It's almost the same as the previous one, but the receiver type is different. A CoroutineScope must be passed as an argument to the function.

fun <T> Iterable<T>.processInParallelAndForget(
    scope: CoroutineScope,
    dispatcher: CoroutineDispatcher = Dispatchers.IO,
    processBlock: suspend (v: T) -> Unit
) = forEach {
    scope.launch(dispatcher) { processBlock(it) }
}

Calling:

collection.processInParallelAndForget(someCoroutineScope) {
    process(it)
}

// OR from another coroutine:

someCoroutineScope.launch {
    collection.processInParallelAndForget(this) {
        process(it)
    }
}
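If you later need to know when the launch-and-forget work has finished, one option is to call the CoroutineScope extension inside coroutineScope, which does not return until all of its children are done. A minimal sketch follows; upperCaseAll is a hypothetical caller, and the extension is repeated only to keep the example self-contained:

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun <T> CoroutineScope.processInParallelAndForget(
    iterable: Iterable<T>,
    dispatcher: CoroutineDispatcher = Dispatchers.IO,
    processBlock: suspend (v: T) -> Unit
) = iterable.forEach {
    launch(dispatcher) { processBlock(it) }
}

// Hypothetical caller that waits for all items before returning.
fun upperCaseAll(words: List<String>): List<String> {
    val seen = ConcurrentLinkedQueue<String>() // thread-safe, items arrive from several threads
    runBlocking {
        // coroutineScope returns only after every coroutine launched inside it has finished
        coroutineScope {
            processInParallelAndForget(words) { seen.add(it.uppercase()) }
        }
    }
    return seen.sorted() // arrival order is not deterministic, so sort for a stable result
}
```

The thread-safe queue matters here because the process blocks run on different IO threads at the same time.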

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | HRJ | View Question on Stackoverflow
Solution 1 - Parallel Processing | yole | View Answer on Stackoverflow
Solution 2 - Parallel Processing | Alex Krauss | View Answer on Stackoverflow
Solution 3 - Parallel Processing | Holger Brandl | View Answer on Stackoverflow
Solution 4 - Parallel Processing | Sharon | View Answer on Stackoverflow
Solution 5 - Parallel Processing | OlivierTerrien | View Answer on Stackoverflow
Solution 6 - Parallel Processing | Lamberto Basti | View Answer on Stackoverflow
Solution 7 - Parallel Processing | Martin Konecny | View Answer on Stackoverflow
Solution 8 - Parallel Processing | chris | View Answer on Stackoverflow
Solution 9 - Parallel Processing | StephanS | View Answer on Stackoverflow
Solution 10 - Parallel Processing | Milan Jurkulak | View Answer on Stackoverflow
Solution 11 - Parallel Processing | Sergey | View Answer on Stackoverflow