RxJS: How to not subscribe to initial value and/or undefined?
Problem Overview
Being new to RxJS, I often create a subject that will hold values in the future but is initially undefined. It can only be undefined the first time. I currently use a filter to skip undefined values, but this is quite cumbersome: I repeat it everywhere, even though the undefined value only occurs once. (Maybe I am doing something wrong here?) Can I somehow subscribe to mySubject only after it has received its first value via onNext?
var mySubject = new Rx.BehaviorSubject(undefined);
mySubject.filter(function(value) {
  return value !== undefined;
}).subscribe(function(value) {
  // do something with the value
});
Javascript Solutions
Solution 1 - Javascript
Use new Rx.ReplaySubject(1) instead of BehaviorSubject.
Solution 2 - Javascript
As mentioned by Will, you should be able to skip the first value by using the skip operator:
var mySubject = new Rx.BehaviorSubject(undefined);
mySubject.pipe(skip(1)).subscribe(function(value) {
  // do something with the value
});
Solution 3 - Javascript
mySubject.pipe( skipWhile( v => !v ) );
Solution 4 - Javascript
For now I am using the filter operator, but I don't know if it's a good solution:
var mySubject = new Rx.BehaviorSubject();
// filter() returns a plain Observable without next(), so keep the subject itself for publishing
var values = mySubject.filter(x => !!x);
values.subscribe(value => { /* will receive value from below */ });
mySubject.next('value');
values.subscribe(value => { /* also receives the value */ });
Solution 5 - Javascript
I have found this frustrating in both RxJS and RxSwift: I want a value subject combined with the ability to wait for the first value.
For JS I am currently just tucking away a filtered version in the subject, something like this:
let mySubject = new Rx.BehaviorSubject();
mySubject.wait = mySubject.pipe(filter(v=>v!==undefined));
So the subject is still exposed for publishing but clients don't have to repeat the filter.
mySubject.wait.subscribe((v)=>{...});
Solution 6 - Javascript
Sometimes you need a BehaviorSubject whose initial value does not matter, but whose current value must be readable while working inside a stream. In our case, several chained promises are processed, and the user can cancel at any point while the data is being fetched or processed.
This can be achieved in the following way.
import { BehaviorSubject, from, merge } from 'rxjs';
import { concatMap, filter, finalize, mergeMap, take, takeWhile } from 'rxjs/operators';

// for user-related commands
this.commandSource = new BehaviorSubject(CONTINUE);
// filter out the initial CONTINUE value so that STOP gets its own pipe
const stopPipe = this.commandSource.pipe(filter(val => val === STOP));
// assumes this.fetchDetails is a promise
const fetchStream = from(this.fetchDetails);
merge(fetchStream, stopPipe).pipe(
  take(1),
  takeWhile(() => this.commandSource.value === CONTINUE),
  concatMap((response) => {
    // fetch another response; a promise can be returned directly from concatMap,
    // or an array of responses such as [1, 2, 3]
    return this.fetchYetAnotherDetails;
  }),
  // this check can be repeated wherever the stream should be stoppable while processing
  takeWhile(() => this.commandSource.value === CONTINUE),
  // handles the values from concatMap (1, 2, 3) in parallel
  mergeMap((item) => Promise.resolve(item) /* placeholder: massage each response here */),
  finalize(() => this.commandSource.complete())
).subscribe(res => {
  // handle each mapped response: 1, 2, 3
}, () => {
  // handle error
}, () => {
  // handle completion of the stream
});
// when the user clicks cancel, this should stop the stream
this.commandSource.next(STOP);