Parallel operations on Kotlin collections?
Parallel Processing Problem Overview
In Scala, one can easily do a parallel map, forEach, etc, with:
collection.par.map(..)
Is there an equivalent in Kotlin?
Parallel Processing Solutions
Solution 1 - Parallel Processing
The Kotlin standard library has no support for parallel operations. However, since Kotlin uses the standard Java collection classes, you can use the Java 8 stream API to perform parallel operations on Kotlin collections as well.
e.g.
myCollection.parallelStream()
    .map { ... }
    .filter { ... }
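To make the snippet above concrete, here is a minimal, self-contained sketch that uses only the JDK stream API. The sample data and the helper name doubledAbove are made up for illustration. Collecting with Collectors.toList() keeps the encounter order of the source list, even though the work may be split across threads.

```kotlin
import java.util.stream.Collectors

// Hypothetical helper: doubles each number in parallel, then keeps results above a threshold.
fun doubledAbove(numbers: List<Int>, threshold: Int): List<Int> =
    numbers.parallelStream()
        .map { it * 2 }               // may run on several worker threads
        .filter { it > threshold }
        .collect(Collectors.toList()) // gathers results in encounter order

fun main() {
    println(doubledAbove(listOf(1, 2, 3, 4, 5), 4)) // prints [6, 8, 10]
}
```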
Solution 2 - Parallel Processing
As of Kotlin 1.1, parallel operations can also be expressed quite elegantly in terms of coroutines. Here is pmap on lists:
fun <A, B> List<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
    map { async(CommonPool) { f(it) } }.map { it.await() }
}
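Note that coroutines were still an experimental feature in Kotlin 1.1. They have been stable since Kotlin 1.3, and CommonPool has since been replaced by Dispatchers.Default.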
Solution 3 - Parallel Processing
There is no official support in Kotlin's stdlib yet, but you could define an extension function to mimic par.map:
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

fun <T, R> Iterable<T>.pmap(
    // leave two cores free, but always use at least one thread
    numThreads: Int = (Runtime.getRuntime().availableProcessors() - 2).coerceAtLeast(1),
    exec: ExecutorService = Executors.newFixedThreadPool(numThreads),
    transform: (T) -> R
): List<R> {
    // submit one task per item; keeping the futures in input order preserves the result order
    val futures = map { item -> exec.submit(Callable { transform(item) }) }
    exec.shutdown()
    // get() blocks until each task completes and rethrows failures as ExecutionException
    return futures.map { it.get() }
}
Here's a simple usage example:
val result = listOf("foo", "bar").pmap { it + "!" }.filter { it.contains("bar") }
If needed, you can tune the threading by passing the number of threads or even a specific java.util.concurrent.ExecutorService, e.g.:
listOf("foo", "bar").pmap(4, transform = { it + "!" })
Please note that this approach only parallelizes the map operation and does not affect any downstream steps; e.g., the filter in the first example would run single-threaded. However, in many cases only the data transformation (i.e., map) requires parallelization. Furthermore, it would be straightforward to extend the approach above to other elements of the Kotlin collection API.
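As an illustration of that last point, here is one way the same executor-based idea could be applied to filter. This is only a sketch: the name pfilter and its parameters are hypothetical, not part of any library, and it creates and shuts down its own thread pool on every call.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical pfilter: evaluates the predicate for every item on a thread pool,
// then keeps the matching items in their original order.
fun <T> Iterable<T>.pfilter(
    numThreads: Int = Runtime.getRuntime().availableProcessors(),
    predicate: (T) -> Boolean
): List<T> {
    val exec = Executors.newFixedThreadPool(numThreads)
    try {
        // One task per item; the list of (item, future) pairs keeps the input order.
        val verdicts = map { item -> item to exec.submit(Callable { predicate(item) }) }
        // get() blocks until the corresponding task has finished.
        return verdicts.filter { (_, keep) -> keep.get() }.map { (item, _) -> item }
    } finally {
        exec.shutdown()
    }
}

fun main() {
    println(listOf("foo", "bar", "baz").pfilter { it.startsWith("b") }) // prints [bar, baz]
}
```

Only the predicate calls run in parallel here; the final selection of matching items happens on the calling thread.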
Solution 4 - Parallel Processing
You can use this extension method:
suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}
See Parallel Map in Kotlin for more info
Solution 5 - Parallel Processing
Since version 1.2, Kotlin has offered stream support that is compatible with JRE 8.
So, iterating over a list in parallel can be done as shown below:
fun main(args: Array<String>) {
    val c = listOf("toto", "tata", "tutu")
    c.parallelStream().forEach { println(it) }
}
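One caveat with this snippet: forEach on a parallel stream makes no promise about the order in which elements are visited, so the three names may be printed in any order. When the order matters, the same JDK Stream API offers forEachOrdered. A small sketch follows; the function name inEncounterOrder is made up for this example.

```kotlin
// Hypothetical helper: visits the elements of a parallel stream in encounter order.
fun inEncounterOrder(items: List<String>): List<String> {
    val seen = mutableListOf<String>()
    // forEachOrdered handles one element at a time, in the order of the source list,
    // so appending to a plain (non-thread-safe) list is safe here.
    items.parallelStream().forEachOrdered { seen.add(it) }
    return seen
}

fun main() {
    println(inEncounterOrder(listOf("toto", "tata", "tutu"))) // prints [toto, tata, tutu]
}
```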
Solution 6 - Parallel Processing
Kotlin aims to be idiomatic without being so terse that code is hard to understand at first glance.
Parallel computation through coroutines is no exception: it is meant to be easy, but explicit rather than hidden behind a pre-built method, so that you can branch the computation when needed.
In your case:
collection.map {
    async { produceWith(it) }
}
.forEach {
    consume(it.await())
}
Notice that to call async and await you need to be inside a coroutine (a so-called Context): you cannot make suspending calls or launch a coroutine from a non-coroutine context. To enter one, you can use either:
- runBlocking { /* your code here */ }: it blocks the current thread until the lambda returns.
- GlobalScope.launch { }: it executes the lambda in parallel; if your main finishes executing while your coroutines have not, bad things will happen. In that case, better use runBlocking.
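Putting the pieces together, a complete version might look like the sketch below. It assumes the kotlinx-coroutines-core dependency is on the classpath; produceWith is a hypothetical stand-in for the real work, and squaresInParallel is a made-up name. Dispatchers.Default is passed to async because, inside a plain runBlocking, coroutines would otherwise share the single blocked thread and run concurrently but not in parallel.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for produceWith: any CPU-bound transformation.
fun produceWith(n: Int): Int = n * n

fun squaresInParallel(numbers: List<Int>): List<Int> = runBlocking {
    numbers
        .map { n -> async(Dispatchers.Default) { produceWith(n) } } // start all tasks first
        .awaitAll()                                                 // then wait, keeping input order
}

fun main() {
    println(squaresInParallel(listOf(1, 2, 3))) // prints [1, 4, 9]
}
```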
Hope this helps :)
Solution 7 - Parallel Processing
At the present moment, no. The official Kotlin comparison to Scala mentions:
> Things that may be added to Kotlin later:
>
> - Parallel collections
Solution 8 - Parallel Processing
Another approach I found to be quite elegant is something like this, using the kotlinx.coroutines library:
import kotlinx.coroutines.flow.* // asFlow, map, filter, collect

suspend fun process(myCollection: Iterable<Foo>) {
    myCollection.asFlow()
        .map { /* ... */ }
        .filter { /* ... */ }
        .collect { /* ... perform some side effect ... */ }
}
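It does, however, require an extra dependency: kotlinx.coroutines is not in the stdlib. Also note that the operators of a flow process elements sequentially by default, so this pipeline does not by itself run them in parallel.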
Solution 9 - Parallel Processing
This solution assumes that your project is using coroutines:
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2")
The functions called parallelTransform don't retain the order of elements and return a Flow<R>, while the function parallelMap retains the order and returns a List<R>.
Create a threadpool for multiple invocations:
val numberOfCores = Runtime.getRuntime().availableProcessors()
val executorDispatcher: ExecutorCoroutineDispatcher =
    Executors.newFixedThreadPool(numberOfCores).asCoroutineDispatcher()
Use that dispatcher (and call close() when it's no longer needed):
inline fun <T, R> Iterable<T>.parallelTransform(
    dispatcher: ExecutorCoroutineDispatcher,
    crossinline transform: (T) -> R
): Flow<R> = channelFlow {
    val items: Iterable<T> = this@parallelTransform
    val channelFlowScope: ProducerScope<R> = this@channelFlow
    launch(dispatcher) {
        items.forEach { item ->
            // one child coroutine per item; results are sent as they complete
            launch {
                channelFlowScope.send(transform(item))
            }
        }
    }
}
If threadpool reuse is of no concern (threadpools aren't cheap), you can use this version:
inline fun <T, R> Iterable<T>.parallelTransform(
    numberOfThreads: Int,
    crossinline transform: (T) -> R
): Flow<R> = channelFlow {
    val items: Iterable<T> = this@parallelTransform
    val channelFlowScope: ProducerScope<R> = this@channelFlow
    Executors.newFixedThreadPool(numberOfThreads).asCoroutineDispatcher().use { dispatcher ->
        launch(dispatcher) {
            items.forEach { item ->
                launch {
                    channelFlowScope.send(transform(item))
                }
            }
        }.join() // wait for all children before use() closes the thread pool
    }
}
If you need a version that retains the order of elements:
inline fun <T, R> Iterable<T>.parallelMap(
    dispatcher: ExecutorCoroutineDispatcher,
    crossinline transform: (T) -> R
): List<R> = runBlocking {
    val items: Iterable<T> = this@parallelMap
    val result = ConcurrentSkipListMap<Int, R>()
    launch(dispatcher) {
        items.withIndex().forEach { (index, item) ->
            launch {
                result[index] = transform(item)
            }
        }
    }.join() // wait for every child coroutine before reading the results
    // ConcurrentSkipListMap is a SortedMap
    // so the values will be in the right order
    result.values.toList()
}
Solution 10 - Parallel Processing
I found this:
implementation 'com.github.cvb941:kotlin-parallel-operations:1.3'
Solution 11 - Parallel Processing
I've come up with a couple of extension functions:
1. A suspend extension function on the Iterable<T> type, which processes items in parallel and returns the result of processing each item. By default it uses the Dispatchers.IO dispatcher to offload blocking tasks to a shared pool of threads. It must be called from a coroutine (including a coroutine with the Dispatchers.Main dispatcher) or from another suspend function.
suspend fun <T, R> Iterable<T>.processInParallel(
    dispatcher: CoroutineDispatcher = Dispatchers.IO,
    processBlock: suspend (v: T) -> R,
): List<R> = coroutineScope { // or supervisorScope
    map {
        async(dispatcher) { processBlock(it) }
    }.awaitAll()
}
Example of calling from a coroutine:
val collection = listOf("A", "B", "C", "D", "E")
someCoroutineScope.launch {
    val results = collection.processInParallel {
        process(it)
    }
    // use processing results
}
where someCoroutineScope is an instance of CoroutineScope.
2. A launch-and-forget extension function on CoroutineScope, which doesn't return any result. It also uses the Dispatchers.IO dispatcher by default. It can be called on a CoroutineScope or from another coroutine.
fun <T> CoroutineScope.processInParallelAndForget(
    iterable: Iterable<T>,
    dispatcher: CoroutineDispatcher = Dispatchers.IO,
    processBlock: suspend (v: T) -> Unit
) = iterable.forEach {
    launch(dispatcher) { processBlock(it) }
}
Example of calling:
someCoroutineScope.processInParallelAndForget(collection) {
    process(it)
}
// OR from another coroutine:
someCoroutineScope.launch {
    processInParallelAndForget(collection) {
        process(it)
    }
}
2a. A launch-and-forget extension function on Iterable<T>. It's almost the same as the previous one, but the receiver type is different: the CoroutineScope must be passed as an argument to the function.
fun <T> Iterable<T>.processInParallelAndForget(
    scope: CoroutineScope,
    dispatcher: CoroutineDispatcher = Dispatchers.IO,
    processBlock: suspend (v: T) -> Unit
) = forEach {
    scope.launch(dispatcher) { processBlock(it) }
}
Calling:
collection.processInParallelAndForget(someCoroutineScope) {
    process(it)
}
// OR from another coroutine:
someCoroutineScope.launch {
    collection.processInParallelAndForget(this) {
        process(it)
    }
}