What is the difference between concatMap and flatMap in RxJava

Java Problem Overview


These two functions seem pretty similar. They have the same signature (accepting rx.functions.Func1<? super T, ? extends Observable<? extends R>> func), and their marble diagrams look exactly the same. Can't paste the pics here, but here's one for [concatMap][1], and here's one for [flatMap][2]. There seems to be a subtle difference in the description of the resulting Observable: the one produced by concatMap contains items that result from concatenating the resulting Observables, while the one produced by flatMap contains items that result from first merging the resulting Observables and emitting the result of that merger.

However, this subtlety is totally unclear to me. Can anyone give a better explanation of this difference, ideally with some examples that illustrate it?

[1]: http://netflix.github.io/RxJava/javadoc/rx/Observable.html#concatMap%28rx.functions.Func1%29 "concatMap"
[2]: http://netflix.github.io/RxJava/javadoc/rx/Observable.html#flatMap%28rx.functions.Func1%29 "flatMap"

Java Solutions


Solution 1 - Java


As you wrote, the two functions are very similar, and the subtle difference is in how the output is created (after the mapping function is applied).

flatMap uses the merge operator, while concatMap uses the concat operator.

As you can see, the concatMap output sequence is ordered: all of the items emitted by the first inner Observable are emitted before any of the items emitted by the second one. The flatMap output sequence is merged: items are passed on as the inner Observables emit them, so they may interleave and appear in any order, regardless of which inner Observable they came from.

Solution 2 - Java

Even though the answers here are good, it wasn't easy to spot the difference without an example, so I created a simple one:

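@Test
public void flatMapVsConcatMap() throws Exception {
    System.out.println("******** Using flatMap() *********");
    // Each item becomes its own inner Observable, delayed by 1 ms.
    // flatMap subscribes to all of them right away and merges the results.
    Observable.range(1, 15)
            .flatMap(item -> Observable.just(item).delay(1, TimeUnit.MILLISECONDS))
            .subscribe(x -> System.out.print(x + " "));

    // delay() emits on another thread, so wait for the output before moving on.
    Thread.sleep(100);

    System.out.println("\n******** Using concatMap() *********");
    // concatMap subscribes to the next inner Observable only after the
    // previous one has completed, so the source order is kept.
    Observable.range(1, 15)
            .concatMap(item -> Observable.just(item).delay(1, TimeUnit.MILLISECONDS))
            .subscribe(x -> System.out.print(x + " "));

    Thread.sleep(100);
}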

> ******** Using flatMap() *********

> 1 2 3 4 5 6 7 9 8 11 13 15 10 12 14

> ******** Using concatMap() *********

> 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

As can be seen from the output, the flatMap results are unordered, while the concatMap results keep the source order.

Solution 3 - Java

One very important difference: concatMap waits for the current inner Observable to complete before it subscribes to the next one, and flatMap doesn't. flatMap tries to start as many as possible. Simply put, you cannot concatenate something infinite. Make sure that the Observables you return inside concatMap can complete; otherwise the whole flow gets stuck waiting for the current Observable to complete before it can concatenate the next one.
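The stalling behaviour described here can be sketched without RxJava at all. The block below is only a plain-Java model of the subscription rule, not library code: the class `SubscriptionModel`, its methods, and the inner source names ("lookup", "ticker", "save") are all hypothetical, and it assumes flatMap is used without a concurrency limit.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the subscription rule only; it does not use RxJava.
// The map goes from an inner source's name to whether that source ever completes.
class SubscriptionModel {

    // flatMap (assumed here to have no concurrency limit) never waits for an
    // earlier inner source to complete, so every inner source gets started.
    static List<String> subscribedByFlatMap(Map<String, Boolean> completes) {
        return new ArrayList<>(completes.keySet());
    }

    // concatMap moves on only when the current inner source completes, so the
    // first source that never completes is the last one it ever reaches.
    static List<String> subscribedByConcatMap(Map<String, Boolean> completes) {
        List<String> subscribed = new ArrayList<>();
        for (Map.Entry<String, Boolean> inner : completes.entrySet()) {
            subscribed.add(inner.getKey());
            if (!inner.getValue()) {
                break; // stuck here: later inner sources are never started
            }
        }
        return subscribed;
    }

    public static void main(String[] args) {
        // Hypothetical inner sources; "ticker" stands in for an endless stream.
        Map<String, Boolean> completes = new LinkedHashMap<>();
        completes.put("lookup", true);
        completes.put("ticker", false);
        completes.put("save", true);

        System.out.println("flatMap subscribes to:   " + subscribedByFlatMap(completes));
        System.out.println("concatMap subscribes to: " + subscribedByConcatMap(completes));
    }
}
```

In this model "save" is never reached under concatMap, because "ticker" never completes. The items that "ticker" itself emits would still flow through; only the sources queued behind it are blocked.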

Solution 4 - Java

I don't find the example in the most upvoted answer very clear, so I'm posting one that helped me understand the difference between flatMap and concatMap.

flatMap takes each emission from the source Observable, creates a new Observable from it, and merges that into the original chain, while concatMap concatenates it to the original chain.

The main difference is that concatMap() subscribes to the mapped Observables sequentially, one at a time. It only moves on to the next Observable when the current one calls onComplete().

Here is the flatMap example:

private void flatMapVsConcatMap() throws InterruptedException {
    Observable.just(5, 2, 4, 1)
            .flatMap(
                    second ->
                            Observable.just("Emit delayed with " + second + " second")
                                    .delay(second, TimeUnit.SECONDS)
            )
            .subscribe(
                    System.out::println,
                    Throwable::printStackTrace
            );

    Thread.sleep(15_000);
}

This results in:

> Emit delayed with 1 second
> Emit delayed with 2 second
> Emit delayed with 4 second
> Emit delayed with 5 second

Here is the concatMap example:

private void flatMapVsConcatMap() throws InterruptedException {
    Observable.just(5, 2, 4, 1)
            .concatMap(
                    second ->
                            Observable.just("Emit delayed with " + second + " second")
                                    .delay(second, TimeUnit.SECONDS)
            )
            .subscribe(
                    System.out::println,
                    Throwable::printStackTrace
            );

    Thread.sleep(15_000);
}

This results in:

> Emit delayed with 5 second
> Emit delayed with 2 second
> Emit delayed with 4 second
> Emit delayed with 1 second

Note that Thread.sleep() is needed because delay() operates on the computation Scheduler by default, so the items arrive on another thread after the method would otherwise have returned.
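To see why the two runs above print in different orders, the arrival times can be worked out by hand. The sketch below does that arithmetic in plain Java and does not call RxJava. The class `DelayTimeline` and its methods are hypothetical, and it assumes that all four source items arrive at second 0 and that each inner Observable emits once, `delay` seconds after it is subscribed to, then completes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of the two examples above; it does not use RxJava.
// Assumption: every source item arrives at second 0, and each inner Observable
// emits once, `delay` seconds after being subscribed to, then completes.
class DelayTimeline {

    // flatMap subscribes to every inner Observable immediately, so each value
    // arrives at its own delay and the output is ordered by delay.
    static List<String> flatMapTimeline(List<Integer> delays) {
        List<Integer> byArrival = new ArrayList<>(delays);
        Collections.sort(byArrival);
        List<String> out = new ArrayList<>();
        for (int d : byArrival) {
            out.add(d + " at t=" + d + "s");
        }
        return out;
    }

    // concatMap subscribes to the next inner Observable only after the previous
    // one has completed, so the delays add up and the source order is kept.
    static List<String> concatMapTimeline(List<Integer> delays) {
        List<String> out = new ArrayList<>();
        int clock = 0;
        for (int d : delays) {
            clock += d; // the wait starts when the previous inner completed
            out.add(d + " at t=" + clock + "s");
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> delays = List.of(5, 2, 4, 1);
        System.out.println("flatMap:   " + flatMapTimeline(delays));
        System.out.println("concatMap: " + concatMapTimeline(delays));
    }
}
```

Under these assumptions the last concatMap item arrives at 5 + 2 + 4 + 1 = 12 seconds, while the last flatMap item arrives after 5 seconds, so the 15-second sleep covers both runs.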

Solution 5 - Java

First of all, flatMap is the same as mergeMap in RxJS, so that is one confusion less. So we have two things:

  1. o1: A simple list of items, from(['Kitty','Donald','Batman']).

  2. process_o1(): a function that takes one parameter, 'item', does something with it, and returns an Observable which on completion emits 'done with [item]'.

    o1.pipe(mergeMap(item => process_o1(item))).subscribe(data => { console.log(data); });

Here we are going to see:

done with Kitty.

done with Donald.

done with Batman.

but without any guarantee that Kitty comes before Donald and Donald comes before Batman. This is because the inner Observable is subscribed to as soon as the outer Observable emits an item.

But in the case of concatMap:

    o1.pipe(concatMap(item => process_o1(item))).subscribe(data => {
        console.log(data);
    });

We are guaranteed the sequence below:

done with Kitty.

done with Donald.

done with Batman.

This is because, with the concatMap operator, the next inner Observable is not subscribed to before the previous inner Observable completes.

The outer observable is free to just go ahead and emit all its values, but the concatMap will make sure that it deals with each of those values one by one and maintains the order. Hence the name concatMap.

In short, if you care about maintaining the order in which things are done, you should use concatMap. But if you don't care about order, you can go ahead with mergeMap, which subscribes to each inner Observable as soon as its item arrives and emits values as and when they come back.

Solution 6 - Java

Others have already pointed out the answer, but in case it is not evident: flatMap carries a risk of creating undesired parallelism. If that is a problem, you can use concatMap, or the overload flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency).
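The effect of a concurrency limit can be sketched with a small scheduling model. This is plain Java, not a call into any reactive library: `BoundedFlatMapModel` and `emissionOrder` are hypothetical names, and the model assumes that each inner source takes a fixed number of seconds, emits one value when it finishes, and that a waiting source item is started as soon as a slot frees up.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model of a concurrency-limited flatMap; no reactive library is used.
// Assumption: inner source i runs for delays.get(i) seconds, emits its delay
// value once when it finishes, and frees its slot at that moment.
class BoundedFlatMapModel {

    static List<Integer> emissionOrder(List<Integer> delays, int maxConcurrency) {
        // Each running inner source is {finishTime, sourceIndex}; the earliest
        // finish is polled first, with source order as the tie-breaker.
        PriorityQueue<long[]> running = new PriorityQueue<>(
                Comparator.comparingLong((long[] slot) -> slot[0])
                          .thenComparingLong(slot -> slot[1]));
        List<Integer> order = new ArrayList<>();
        int next = 0;  // index of the next source item waiting for a slot
        long now = 0;  // simulated clock, in seconds

        while (next < delays.size() || !running.isEmpty()) {
            // Start waiting items while there are free slots.
            while (next < delays.size() && running.size() < maxConcurrency) {
                running.add(new long[] {now + delays.get(next), next});
                next++;
            }
            // Advance the clock to the next inner source that finishes.
            long[] done = running.poll();
            now = done[0];
            order.add(delays.get((int) done[1]));
        }
        return order;
    }

    public static void main(String[] args) {
        List<Integer> delays = List.of(5, 2, 4, 1);
        System.out.println("limit 1:   " + emissionOrder(delays, 1));
        System.out.println("limit 3:   " + emissionOrder(delays, 3));
        System.out.println("unbounded: " + emissionOrder(delays, Integer.MAX_VALUE));
    }
}
```

In this model a limit of 1 reproduces the source order, as concatMap would, while an unbounded limit orders the values purely by delay. Intermediate limits fall in between, which is the trade-off between parallelism and ordering that this answer points at.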

Solution 7 - Java

flatMap vs concatMap

flatMap - merge - a newly emitted item is passed on right away, whichever inner sequence it comes from

concatMap - concatenate - each sequence is appended to the end: it is emitted in full, and only after it has finished can the next sequence be emitted

https://i.stack.imgur.com/Q2Hwc.png

[map vs flatMap]

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

| Content Type | Original Author | Original Content on Stackoverflow |
| --- | --- | --- |
| Question | Haspemulator | View Question on Stackoverflow |
| Solution 1 - Java | Marek Hawrylczak | View Answer on Stackoverflow |
| Solution 2 - Java | Anatolii | View Answer on Stackoverflow |
| Solution 3 - Java | WindRider | View Answer on Stackoverflow |
| Solution 4 - Java | Efimov Aleksandr | View Answer on Stackoverflow |
| Solution 5 - Java | Ashish Mishra | View Answer on Stackoverflow |
| Solution 6 - Java | Daniel Arechiga | View Answer on Stackoverflow |
| Solution 7 - Java | yoAlex5 | View Answer on Stackoverflow |