Concat VS Merge operator

Java, Java 8, Rx Java

Java Problem Overview


I was checking the RxJava documentation and noticed that the concat and merge operators seem to do the same thing. I wrote a couple of tests to be sure.

@Test
public void testConcat() {

    Observable.concat(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
              .subscribe(System.out::println);
}

@Test
public void testMerge() {

    Observable.merge(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
            .subscribe(System.out::println);
}

The documentation says:

> The Merge operator is also similar. It combines the emissions of two or more Observables, but may interleave them, whereas Concat never interleaves the emissions from multiple Observables.

But I still don't fully understand: after running this test thousands of times, the merge result is always the same. Since the order is not guaranteed, I expected to sometimes see "reactive" "world" "Hello", for example.

The code is here: https://github.com/politrons/reactive

Java Solutions


Solution 1 - Java

It is as described in the documentation you quoted: merge can interleave the outputs, while concat waits for earlier streams to finish before processing later ones. In your case, with single-element, static streams, it makes no real difference (although in theory merge could output the words in a random order and still be valid according to the spec). To see the difference, try the following (you will need to add a sleep afterwards to keep the program from exiting early):

	Observable.merge(
			Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
			Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
	.subscribe(System.out::println);

> A0 B0 A1 B1 B2 A2 B3 A3 B4 A4

versus

	Observable.concat(
			Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
			Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
	.subscribe(System.out::println);

> A0 A1 A2 A3 A4 A5 A6 A7 A8

Concat will never start printing B, because stream A never finishes.

s/stream/observable/g ;)

The documentation gives nice diagrams that show the difference. Remember that merge gives no guarantee of interleaving items one by one; that is just one possible outcome.

Concat

[![Concat operator][2]][2]

[2]: https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concat.png

Merge

[![Merge operator][1]][1]

[1]: https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png

Solution 2 - Java

Concat

Concat emits the emissions from two or more Observables without interleaving them. It will maintain the order of the observables while emitting the items. It means that it will emit all the items of the first observable and then it will emit all the items of the second observable and so on.


Let's make this concrete with an example.

final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);

Observable.concat(observableFirst, observableSecond)
        .subscribe(new Observer<String>() {

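            @Override
            public void onSubscribe(Disposable d) {
                // Nothing to set up for this example.
            }

            @Override
            public void onNext(String value) {
                // Called once per item: first everything from observableFirst,
                // then everything from observableSecond.
                System.out.println(value);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                // Reached only after observableSecond has completed.
            }
        });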

As we are using Concat Operator, it will maintain the order and emit the values as A1, A2, A3, A4, B1, B2, B3.

Merge

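Merge combines multiple Observables into one by merging their emissions. It does not guarantee the order across sources: items from different Observables may interleave, although the items of each individual Observable keep their relative order.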


Let's make this concrete with an example.

final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);

Observable.merge(observableFirst, observableSecond)
        .subscribe(new Observer<String>() {

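            @Override
            public void onSubscribe(Disposable d) {
                // Nothing to set up for this example.
            }

            @Override
            public void onNext(String value) {
                // Called once per item from either source; the order across
                // sources is not guaranteed.
                System.out.println(value);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                // Reached only after both sources have completed.
            }
        });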

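As we are using the Merge operator, the order across sources is not guaranteed. With asynchronous sources the values could arrive as A1, B1, A2, A3, B2, B3, A4 or A1, A2, B1, B2, A3, A4, B3, or in any other interleaving that keeps each source's own order. With the synchronous fromArray sources used here, the output in practice matches what concat produces.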

This is how we should use the Concat and the Merge operators in RxJava depending on our use-case.

Solution 3 - Java

If both sources are synchronous, as in the OP's example, merge and concat both emit the items in exactly the same order.

But when the sources are not synchronous, for instance when two sources such as a database and a socket push data with random or undetermined delays, the order of the data differs between merge and concat.
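To see why synchronous sources give merge no chance to interleave, here is a minimal plain-Java sketch. It is only a model, not RxJava's implementation: the `Source` interface and the `just` and `merge` helpers are hypothetical names invented for this illustration. The assumption it encodes is that a synchronous source pushes all of its items inside the `subscribe` call, so by the time a merge-like operator reaches the second source, the first one has already finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SyncMergeSketch {

    /** Hypothetical stand-in for a synchronous source such as Observable.just(...). */
    interface Source<T> {
        /** Pushes every item to onNext on the caller's thread, then returns. */
        void subscribe(Consumer<T> onNext);
    }

    /** Builds a source that emits the given items synchronously, in order. */
    @SafeVarargs
    static <T> Source<T> just(T... items) {
        return onNext -> {
            for (T item : items) {
                onNext.accept(item);
            }
        };
    }

    /** Merge-like model: subscribes to every source in turn and forwards whatever arrives. */
    @SafeVarargs
    static <T> List<T> merge(Source<T>... sources) {
        List<T> received = new ArrayList<>();
        for (Source<T> source : sources) {
            // subscribe() does not return until this source has emitted everything,
            // so the next source cannot start early and nothing can interleave.
            source.subscribe(received::add);
        }
        return received;
    }

    public static void main(String[] args) {
        // prints [Hello, reactive, world]
        System.out.println(merge(just("Hello"), just("reactive"), just("world")));
        // prints [A1, A2, A3, A4, B1, B2, B3]
        System.out.println(merge(just("A1", "A2", "A3", "A4"), just("B1", "B2", "B3")));
    }
}
```

In this model a concat-like operator would be the very same loop, which is the point: the two operators only start to differ once a source returns from `subscribe` before it has finished emitting.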

fun <T> Observable<T>.getAsyncItems(): Observable<T> {

    return concatMap {

        val delay = Random().nextInt(10)

        Observable.just(it)
                .delay(delay.toLong(), TimeUnit.MILLISECONDS)
    }
}

The extension function above adds a random delay to each emission, but the items are always emitted in the same order. If the data is A, B, C, D, and E, then the output is always A, B, C, D, and E.

Now let's compare how merge and concat behave when sources are async.

val source1 =
        Observable.just("A", "B", "C", "D", "E").getAsyncItems()
val source2 =
        Observable.just(1, 2, 3, 4, 5).getAsyncItems()

Observable.merge(source1, source2).subscribe {print("$it-")}

Prints something like A-B-1-C-2-D-3-4-E-5- or A-1-2-3-B-C-D-4-E-5-, depending on the random delays.

Observable.concat(source1, source2).subscribe {print("$it-")}

Prints A-B-C-D-E-1-2-3-4-5- no matter how many times it runs.
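The mechanics behind these two results can be sketched in plain Java without the library. This is a rough model under stated assumptions, not how RxJava is implemented: `delayedSource`, `awaitCompletion`, `merge` and `concat` are hypothetical helpers, each source is modelled as a thread that pauses 0 to 9 ms before every item, and "subscribing" is modelled as starting that thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

class AsyncMergeConcatSketch {

    /** Hypothetical source: adds each item to the sink after a random 0-9 ms pause. */
    static Thread delayedSource(List<String> items, List<String> sink) {
        return new Thread(() -> {
            for (String item : items) {
                try {
                    Thread.sleep(ThreadLocalRandom.current().nextInt(10));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                sink.add(item);
            }
        });
    }

    /** Blocks until the source thread has emitted all of its items. */
    static void awaitCompletion(Thread source) {
        try {
            source.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Merge-like model: both sources are started right away and run concurrently. */
    static List<String> merge(List<String> first, List<String> second) {
        List<String> sink = Collections.synchronizedList(new ArrayList<>());
        Thread a = delayedSource(first, sink);
        Thread b = delayedSource(second, sink);
        a.start();
        b.start();
        awaitCompletion(a);
        awaitCompletion(b);
        return new ArrayList<>(sink);
    }

    /** Concat-like model: the second source is started only after the first completes. */
    static List<String> concat(List<String> first, List<String> second) {
        List<String> sink = Collections.synchronizedList(new ArrayList<>());
        Thread a = delayedSource(first, sink);
        a.start();
        awaitCompletion(a);
        Thread b = delayedSource(second, sink);
        b.start();
        awaitCompletion(b);
        return new ArrayList<>(sink);
    }

    public static void main(String[] args) {
        List<String> letters = List.of("A", "B", "C", "D", "E");
        List<String> numbers = List.of("1", "2", "3", "4", "5");
        // The merge line varies from run to run; the concat line does not.
        System.out.println("merge:  " + String.join("-", merge(letters, numbers)));
        System.out.println("concat: " + String.join("-", concat(letters, numbers)));
    }
}
```

Even in the merge-like model, the letters always appear in the order A to E relative to each other, and likewise the numbers; only the interleaving between the two sources changes.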

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

| Content Type | Original Author | Original Content on Stackoverflow |
| --- | --- | --- |
| Question | paul | View Question on Stackoverflow |
| Solution 1 - Java | Artur Biesiadowski | View Answer on Stackoverflow |
| Solution 2 - Java | Amit Shekhar | View Answer on Stackoverflow |
| Solution 3 - Java | Thracian | View Answer on Stackoverflow |