In RxJava, how to pass a variable along when chaining observables?

JavaRx JavaReactive ProgrammingFlatmap

Java Problem Overview


I am chaining async operations using RxJava, and I'd like to pass some variable downstream:

Observable
   .from(modifications)
   .flatmap( (data1) -> { return op1(data1); })
   ...
   .flatmap( (data2) -> { 
       // How to access data1 here ?
       return op2(data2);
   })

It seems like a common pattern but I couldn't find information about it.

Java Solutions


Solution 1 - Java

The advice I got from the Couchbase forum is to use nested observables:

Observable
    .from(modifications)
    .flatmap( (data1) -> { 
        return op1(data1)
            ...
            .flatmap( (data2) -> { 
                // I can access data1 here
                return op2(data2);
            })
        });

EDIT: I'll mark this as the accepted answer as it seems to be the most recommended. If your processing is too complex to nest everything you can also check the solution with function calls.

Solution 2 - Java

Another possibility is to map the result of op1 to a org.apache.commons.lang3.tuple.Pair that contains the variable and pass that along:

Observable
   .from(modifications)
   .flatmap( (data1) -> {
       return op1(data1).map( obj -> { return Pair.of(data1,obj); });
   })
   ...
   .flatmap( (dataPair) -> { 
       // data1 is dataPair.getLeft()
       return op2(dataPair.getRight());
   })

It works but it feels a bit uncomfortable to have variables hidden inside a Pair/Triple/... and it gets very verbose if you use the Java 6 notation.

I wonder if there is a better solution, maybe some RxJava operator could help?

Solution 3 - Java

flatmap can take a second arg:

Observable.just("foo")
                .flatMap(foo -> Observable.range(1, 5), Pair::of)
                .subscribe(pair -> System.out.println("Result: " + pair.getFirst() + " Foo: " + pair.getSecond()));

source: https://medium.com/rxjava-tidbits/rxjava-tidbits-1-use-flatmap-and-retain-original-source-value-4ec6a2de52d4

Solution 4 - Java

One possibility would be to use a function call:

private static Observable<T> myFunc(final Object data1) {
    return op1(data1)
        ...
        .flatmap( (data2) -> { 
            // I can access data1 here
            return op2(data2);
        });
}

Observable
   .from(modifications)
   .flatmap( (data1) -> { return myFunc(data1); })

BUT: correct me if I'm wrong but it doesn't feel like the reactive-programming way of doing it

Solution 5 - Java

Actually we have library, that simplify call chains.

https://github.com/pakoito/Komprehensions

Adding as Gradle dependency:

implementation 'io.reactivex.rxjava2:rxjava:2.2.1'
implementation 'com.github.pakoito.Komprehensions:komprehensions-rx2:1.3.2'

Usage (Kotlin):

val observable = doFlatMap(
    { Observable.from(modifications) },
    { data1 -> op1(data1) },
    { data1, data2 -> op2(data2) },
    { data1, data2, data3 -> op3(data1, data2, data3) }
)

Solution 6 - Java

I know this is an old question, but using RxJava2 & lambda, You can use something like:

Observable
.from(modifications)
.flatMap((Function<Data1, ObservableSource<Data2>>) data1 -> {
                        //Get data 2 obeservable
                        
                            return Observable.just(new Data2())
                        }
                    }, Pair::of)

On the next flow (flatmap/map) your output pair will be (data1, data2)

Solution 7 - Java

solution on this thread works, but for complex chains it makes code difficult to read, I had to pass multiple values and what i did was create a private class with all parameters, I find code to be more readable this way,

private class CommonData{
   private string data1;
   private string data2;

   *getters and setters*
}
...
final CommonData data = new CommonData();
Observable
   .from(modifications)
   .flatmap( (data1) -> { 
       data.setData1(data1);
       return op1(data1); 
   })
   ...
   .flatmap( (data2) -> { 
       data2 = data.getData1() + "data 2... ";
       data.setData2(data2);
       return op2(data2);
   })

hope it helps

Solution 8 - Java

You can use "global" variable to achive this:

 Object[] data1Wrapper = new Object[]{null};
 Object[] data2Wrapper = new Object[]{null};
 Observable
    .from(modifications)
    .flatmap(data1 -> {
        data1Wrapper[0] = data1;
        return op1(data1)
     })
      ...
    .flatmap(data2 -> { 
        // I can access data1 here use data1Wrapper[0]
        Object data1 = data1Wrapper[0];
        data2Wrapper[0] = data2;
        return op2(data2);
     })

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
QuestionJulian GoView Question on Stackoverflow
Solution 1 - JavaJulian GoView Answer on Stackoverflow
Solution 2 - JavaJulian GoView Answer on Stackoverflow
Solution 3 - JavaSisyphusView Answer on Stackoverflow
Solution 4 - JavaJulian GoView Answer on Stackoverflow
Solution 5 - JavaAndrewView Answer on Stackoverflow
Solution 6 - JavaNdivhuwoView Answer on Stackoverflow
Solution 7 - JavajosesueroView Answer on Stackoverflow
Solution 8 - JavahappyyangyuanView Answer on Stackoverflow