Multiple subscriptions to Observable
Javascript Problem Overview
I created my own Observable and subscribed two functions to it. I would expect both functions to be executed for each element in the sequence, but only the last one is.
let observer = null
const notificationArrayStream = Rx.Observable.create(function (obs) {
observer = obs;
return () => {}
})
function trigger(something) {
observer.next(something)
}
notificationArrayStream.subscribe((x) => console.log('a: ' + x))
notificationArrayStream.subscribe((x) => console.log('b: ' + x))
trigger('TEST')
Expected output
a: TEST
b: TEST
Actual output
b: TEST
Here's the JSBin: http://jsbin.com/cahoyey/edit?js,console
Why is that? How can I have multiple functions subscribed to a single Observable?
Javascript Solutions
Solution 1 - Javascript
Subject
In your case, you could simply use a Subject. A [subject][1] lets you share a single execution among multiple observers by acting as a proxy between a source and a group of subscribers.
In essence, here's your example using a subject:
const subject = new Subject();
function trigger(something) {
subject.next(something);
}
subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));
trigger('TEST');
Result:
a: TEST
b: TEST
Pitfall: Observers arriving too late
Note that the timing of when you subscribe and when you broadcast the data is relevant. If you broadcast a value before subscribing, the subscribers are not notified of that value:
function trigger(something) {
subject.next(something);
}
trigger('TEST');
subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));
Result: (empty)
ReplaySubject & BehaviorSubject
If you want to ensure that even future subscribers get notified, you can use a [ReplaySubject][2] or a [BehaviorSubject][3] instead.
Here's an example using a ReplaySubject (with a cache-size of 5, meaning up to 5 values from the past will be remembered, as opposed to a BehaviorSubject, which can remember only the last value):
const subject = new ReplaySubject(5); // buffer size is 5
function trigger(something) {
subject.next(something);
}
trigger('TEST');
subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));
Result:
a: TEST
b: TEST
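To illustrate the buffering idea described above without depending on the library, here is a minimal sketch. TinyReplaySubject is a hypothetical stand-in written for this example, not the RxJS class; it leaves out completion, errors, and unsubscription. A buffer of 1 only approximates the "last value only" behavior attributed to BehaviorSubject, which additionally requires an initial value.

```javascript
// TinyReplaySubject is a hypothetical stand-in, NOT the RxJS ReplaySubject.
class TinyReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];    // most recent values, oldest first
    this.observers = []; // callbacks of current subscribers
  }

  next(value) {
    // Remember the value, dropping the oldest one if the buffer is full.
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift();
    }
    // Notify everyone who is already subscribed.
    this.observers.forEach((fn) => fn(value));
  }

  subscribe(fn) {
    // Late subscribers first receive the buffered values, then live ones.
    this.buffer.forEach((value) => fn(value));
    this.observers.push(fn);
  }
}

const lastFive = new TinyReplaySubject(5);
const lastOne = new TinyReplaySubject(1);

// Broadcast 1..7 before anyone has subscribed.
for (let i = 1; i <= 7; i++) {
  lastFive.next(i);
  lastOne.next(i);
}

const fromFive = [];
const fromOne = [];
lastFive.subscribe((x) => fromFive.push(x));
lastOne.subscribe((x) => fromOne.push(x));

// One more value after subscribing.
lastFive.next(8);
lastOne.next(8);

console.log(fromFive); // [ 3, 4, 5, 6, 7, 8 ]
console.log(fromOne);  // [ 7, 8 ]
```

The late subscriber to the buffer of 5 still sees the five most recent past values, while the buffer of 1 replays only the latest one.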
[1]: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/subjects.md "Subject"
[2]: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/replaysubject.md "ReplaySubject"
[3]: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/behaviorsubject.md
Solution 2 - Javascript
To have multiple functions subscribe to a single Observable, just subscribe them to that observable; it is that simple. And that is actually what you did.
BUT your code does not work because after notificationArrayStream.subscribe((x) => console.log('b: ' + x)) is executed, observer refers to the observer created for (x) => console.log('b: ' + x), so observer.next will give you only b: TEST.
So basically it is your observable creation which is wrong. In create you receive an observer as a parameter so that you can pass it values. You need to generate those values somehow through your own logic, but as you can see, your logic here is erroneous. I would recommend using a subject if you want to push values to the observer.
Something like:
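const mySubject = new Rx.Subject();
const notificationArrayStream = Rx.Observable.create(function (obs) {
// forward everything pushed into mySubject to this subscriber
const subscription = mySubject.subscribe(obs);
return () => subscription.unsubscribe();
})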
function trigger(something) {
mySubject.next(something)
}
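To make the overwrite described in this answer concrete, here is a minimal sketch that does not use RxJS at all. TinyObservable is a hypothetical stand-in written for this example; it only mimics the one relevant behavior, namely that the function passed at creation runs again for every subscribe call.

```javascript
// TinyObservable is a hypothetical stand-in, NOT the RxJS Observable.
class TinyObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }

  // Each subscribe() call runs subscribeFn again with a fresh observer.
  subscribe(next) {
    this.subscribeFn({ next });
  }
}

const received = [];
let observer = null;
let runs = 0;

const stream = new TinyObservable((obs) => {
  runs += 1;
  observer = obs; // overwritten on every subscription
});

stream.subscribe((x) => received.push('a: ' + x));
stream.subscribe((x) => received.push('b: ' + x));

// Only the observer stored last is still referenced here.
observer.next('TEST');

console.log(runs);     // 2
console.log(received); // [ 'b: TEST' ]
```

The creation function ran twice, once per subscriber, and the shared variable kept only the second observer.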
Solution 3 - Javascript
Every time you subscribe, you are overwriting the observer variable.
The trigger function only references this one variable, so it is no surprise that there is only one log.
If we make the variable an array, it works as intended:
let obs = [];
let foo = Rx.Observable.create(function (observer) {
obs.push(observer);
});
function trigger(sth){
// console.log('trigger fn');
obs.forEach(ob => ob.next(sth));
}
foo.subscribe(function (x) {
console.log(`a:${x}`);
});
foo.subscribe(function (y) {
console.log(`b:${y}`);
});
trigger(1);
trigger(2);
trigger(3);
trigger(4);
A cleaner solution would be to use Subject, as suggested above.
Solution 4 - Javascript
Observables do not multicast unless you use some kind of Subject. You can of course create a Subject and pipe the Observable's output into it, as other answers propose.
However, if you already have an Observable, it is far more convenient to use share(), which multicasts the Observable through a Subject internally, or shareReplay(n), which does the equivalent with a ReplaySubject(n):
import {Observable} from 'rxjs';
import {share} from 'rxjs/operators';
let observer = null
const notificationArrayStream = new Observable(obs => {
observer = obs;
}).pipe(share());
function trigger(something) {
observer.next(something)
}
notificationArrayStream.subscribe((x) => console.log('a: ' + x))
notificationArrayStream.subscribe((x) => console.log('b: ' + x))
trigger('TEST')
That's pretty much it.
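As a rough picture of what sharing buys you, here is a minimal, library-free sketch. tinyShare is a hypothetical helper written for this example, not the RxJS operator; it omits reference counting, completion, error handling, and unsubscription. It only shows the core idea: the producer runs once, and every value is fanned out to all subscribers.

```javascript
// tinyShare is a hypothetical helper, NOT the RxJS share() operator.
function tinyShare(producer) {
  const observers = [];
  let connected = false;

  return {
    subscribe(next) {
      observers.push(next);
      if (!connected) {
        connected = true;
        // Run the producer only once; fan each value out to all observers.
        producer({
          next: (value) => observers.forEach((fn) => fn(value)),
        });
      }
    },
  };
}

let sourceObserver = null;
let producerRuns = 0;

const shared = tinyShare((obs) => {
  producerRuns += 1;
  sourceObserver = obs; // stored once, because the producer runs once
});

const output = [];
shared.subscribe((x) => output.push('a: ' + x));
shared.subscribe((x) => output.push('b: ' + x));

sourceObserver.next('TEST');

console.log(producerRuns); // 1
console.log(output);       // [ 'a: TEST', 'b: TEST' ]
```

Because the producer runs a single time, the stored observer is never overwritten, and both subscribers receive the value.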
Solution 5 - Javascript
You can build a wrapper class Subscribable<T> based on ReplaySubject. It would be cleaner than managing a Subject and an Observable separately:
import { Observable, ReplaySubject, Subject } from 'rxjs';
export class Subscribable<T> {
private valueSource: Subject<T> = new ReplaySubject<T>(1);
public value: Observable<T>;
private _value: T;
constructor() {
this.value = this.valueSource.asObservable();
}
public set(val: T) {
this.valueSource.next(val);
this._value = val;
}
public get(): T {
return this._value;
}
}
Usage:
let arrayStream : Subscribable<TYPE> = new Subscribable<TYPE>();
…
public setArrayStream (value: TYPE) {
arrayStream.set(value);
}
Handle value change:
arrayStream.value.subscribe(res => { /*handle it*/ });
Original article: http://devinstance.net/articles/20170921/rxjs-subscribable
Solution 6 - Javascript
Instead of using a Subject, it is also possible to use the publishReplay() + refCount() combo to allow an observable to multicast to multiple subscribers:
const notificationArrayStream = Rx.Observable.create(function (obs) {
observer = obs;
return () => {}
}).pipe(publishReplay(), refCount())
Solution 7 - Javascript
const subs = []
const ob = new Observable((s) => {
console.log('called')
subs.push(s)
})
const trigger = (v) => {
subs.forEach((sub) => {
sub.next(v)
})
}
ob.subscribe((v) => {
console.log('ob1', v)
})
ob.subscribe((v) => {
console.log('ob2', v)
})
trigger(1)
Change your code into something like this, and it will work. The point here is that each subscription is updated through its corresponding subscriber, so if you have multiple subscriptions, you have to notify multiple subscribers. In your case, you notified only the last one.