Multiple subscriptions to Observable

Javascript, Rxjs

Javascript Problem Overview


I created my own Observable and subscribed two functions to it. I would expect both functions to be executed for each element in the sequence, but only the last one is.

let observer = null
const notificationArrayStream = Rx.Observable.create(function (obs) {
  observer = obs;
  return () => {}
})

function trigger(something) {
  observer.next(something)
}

notificationArrayStream.subscribe((x) => console.log('a: ' + x))
notificationArrayStream.subscribe((x) => console.log('b: ' + x))

trigger('TEST')

Expected output

a: TEST
b: TEST

Actual output

b: TEST

Here's the JSBin: http://jsbin.com/cahoyey/edit?js,console

Why is that? How can I have multiple functions subscribed to a single Observable?

Javascript Solutions


Solution 1 - Javascript

Subject

In your case, you could simply use a Subject. A [subject][1] acts as a proxy between a source and a group of subscribers, so a single execution is shared with multiple observers.

In essence, here's your example using a subject:

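// Assumes Subject is in scope, e.g. import { Subject } from 'rxjs' (or Rx.Subject with the global build)
const subject = new Subject();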

function trigger(something) {
    subject.next(something);
}

subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));

trigger('TEST');

Result:

a: TEST
b: TEST

Pitfall: Observers arriving too late

Note that the order of subscribing and broadcasting matters. If you broadcast a value before subscribing, the subscribers are not notified of that value:

function trigger(something) {
    subject.next(something);
}

trigger('TEST');

subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));

Result: (empty)
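
Both the broadcast and the late-subscriber pitfall follow from how a subject keeps track of its observers. Here's a dependency-free sketch of that idea. It is a simplified model and not the RxJS source; the name TinySubject and the function-returning subscribe are assumptions made for illustration only:

```javascript
// Simplified model of a subject: NOT the RxJS implementation.
// TinySubject is a hypothetical name used only for this illustration.
class TinySubject {
  constructor() {
    this.observers = [];
  }
  subscribe(next) {
    this.observers.push(next);
    // Return an unsubscribe function that removes this observer again.
    return () => {
      this.observers = this.observers.filter((o) => o !== next);
    };
  }
  next(value) {
    // Broadcast to every currently registered observer.
    this.observers.forEach((o) => o(value));
  }
}

const received = [];
const subject = new TinySubject();

subject.next('EARLY'); // nobody is subscribed yet, so this value is lost

const unsubscribeA = subject.subscribe((x) => received.push('a: ' + x));
subject.subscribe((x) => received.push('b: ' + x));

subject.next('TEST');  // delivered to a and b
unsubscribeA();
subject.next('AGAIN'); // delivered to b only

console.log(received); // [ 'a: TEST', 'b: TEST', 'b: AGAIN' ]
```

Since the list of observers is empty when 'EARLY' is sent, nobody receives it, which is exactly the pitfall described above.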


ReplaySubject & BehaviorSubject

If you want to ensure that even future subscribers get notified, you can use a [ReplaySubject][2] or a [BehaviorSubject][3] instead.

Here's an example using a ReplaySubject (with a cache-size of 5, meaning up to 5 values from the past will be remembered, as opposed to a BehaviorSubject which can remember only the last value):

const subject = new ReplaySubject(5); // buffer size is 5

function trigger(something) {
    subject.next(something);
}

trigger('TEST');

subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));

Result:

a: TEST
b: TEST
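
To make the cache-size idea more concrete, here's a dependency-free sketch of how replaying can be modelled. It is not the RxJS implementation; TinyReplaySubject is a hypothetical name and the model ignores completion, errors, and time windows:

```javascript
// Simplified model of replay behaviour: NOT the RxJS implementation.
// TinyReplaySubject is a hypothetical name used only for this illustration.
class TinyReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
  }
  next(value) {
    this.buffer.push(value);
    // Drop the oldest value once the buffer is over capacity.
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    this.observers.forEach((o) => o(value));
  }
  subscribe(next) {
    // Replay the remembered values first, then register for live ones.
    this.buffer.forEach((v) => next(v));
    this.observers.push(next);
  }
}

const seen = [];
const replay = new TinyReplaySubject(2);

replay.next(1);
replay.next(2);
replay.next(3); // 1 is evicted, the buffer now holds [2, 3]

replay.subscribe((x) => seen.push('late: ' + x));
replay.next(4);

console.log(seen); // [ 'late: 2', 'late: 3', 'late: 4' ]
```

With a buffer size of 1, this model remembers only the last value, which is close to what a BehaviorSubject does (a real BehaviorSubject additionally requires an initial value).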

[1]: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/subjects.md "Subject"
[2]: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/replaysubject.md "ReplaySubject"
[3]: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/behaviorsubject.md

Solution 2 - Javascript

To have multiple functions subscribe to a single Observable, just subscribe them to that observable. It is that simple, and it is actually what you did.

BUT your code does not work because the function passed to create runs once per subscription. After notificationArrayStream.subscribe((x) => console.log('b: ' + x)) is executed, observer refers to the observer wrapping (x) => console.log('b: ' + x), so observer.next only gives you b: TEST.

So basically it is your observable creation which is wrong. In create, you receive an observer as a parameter so that you can pass it values. You need to generate those values through your own logic, but as you can see, your logic here is erroneous. I would recommend using a subject if you want to push values to the observer.

Something like:

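const mySubject = new Rx.Subject();

const notificationArrayStream = Rx.Observable.create(function (obs) {
  // Forward everything the subject emits to this subscriber
  const subscription = mySubject.subscribe(obs);
  // Teardown: stop forwarding when the subscriber unsubscribes
  return () => subscription.unsubscribe();
})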

function trigger(something) {
  mySubject.next(something)
}
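
To see why only the last subscriber was notified in the question's code, here's a dependency-free sketch of the mechanism. It is a minimal model and not the RxJS implementation; TinyObservable is a hypothetical name used for illustration:

```javascript
// Minimal model of a cold observable: NOT the RxJS implementation.
// TinyObservable is a hypothetical name used only for this illustration.
class TinyObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }
  subscribe(next) {
    // The producer function runs once per subscription,
    // each time with a fresh observer object.
    return this.subscribeFn({ next });
  }
}

const log = [];
let observer = null;

const stream = new TinyObservable((obs) => {
  observer = obs; // overwritten on every subscribe call
});

stream.subscribe((x) => log.push('a: ' + x));
stream.subscribe((x) => log.push('b: ' + x));

// Only the observer from the second subscription is still referenced.
observer.next('TEST');
console.log(log); // [ 'b: TEST' ]
```

The first observer is not broken; the shared variable simply no longer points to it.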

Solution 3 - Javascript

Every time you subscribe, you overwrite the variable observer.

The trigger function only references this one variable, so it is no surprise that there is only one log.

If we make the variable an array, it works as intended:

let obs = [];

let foo = Rx.Observable.create(function (observer) {
  obs.push(observer);
});

function trigger(sth){
//   console.log('trigger fn');
  obs.forEach(ob => ob.next(sth));
}

foo.subscribe(function (x) {
  console.log(`a:${x}`);
});
foo.subscribe(function (y) {
  console.log(`b:${y}`);
});

trigger(1);
trigger(2);
trigger(3);
trigger(4);

A cleaner solution would be to use Subject, as suggested above.

Solution 4 - Javascript

Observables are unicast by default: each subscription gets its own execution unless a Subject is involved. You can of course create a Subject and pipe the Observable's output into it, as other answers propose.

However, if you already have an Observable, it is more convenient to use share(), which multicasts the Observable through a Subject internally, or shareReplay(n), which does the equivalent with a ReplaySubject(n):

import {Observable} from 'rxjs';
import {share} from 'rxjs/operators';

let observer = null

const notificationArrayStream = new Observable(obs => {
  observer = obs;
}).pipe(share());

function trigger(something) {
  observer.next(something)
}

notificationArrayStream.subscribe((x) => console.log('a: ' + x))
notificationArrayStream.subscribe((x) => console.log('b: ' + x))

trigger('TEST')

That's pretty much it.
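
If you are curious what sharing does conceptually, here's a dependency-free sketch. It is not how RxJS implements share(); the name shareLike and the plain-function shape of the source are assumptions made for this illustration. The idea is that the source is started once for the first subscriber, every value is fanned out to all current subscribers, and the source is stopped when the last one leaves:

```javascript
// Simplified model of share-style multicasting: NOT the RxJS implementation.
// shareLike and the subscribe-function shape are assumptions for this sketch.
function shareLike(subscribeToSource) {
  let observers = [];
  let teardown = null;

  return function subscribe(next) {
    observers.push(next);
    if (observers.length === 1) {
      // First subscriber: start the single shared source execution.
      teardown = subscribeToSource((value) => observers.forEach((o) => o(value)));
    }
    return function unsubscribe() {
      observers = observers.filter((o) => o !== next);
      if (observers.length === 0 && teardown) {
        // Last subscriber left: stop the source.
        teardown();
        teardown = null;
      }
    };
  };
}

let sourceStarts = 0;
let emit = null;

const subscribeShared = shareLike((next) => {
  sourceStarts += 1; // counts how often the producer runs
  emit = next;
  return () => { emit = null; };
});

const out = [];
const stopA = subscribeShared((x) => out.push('a: ' + x));
const stopB = subscribeShared((x) => out.push('b: ' + x));

emit('TEST');
console.log(sourceStarts, out); // 1 [ 'a: TEST', 'b: TEST' ]

stopA();
stopB();
console.log(emit); // null
```

Because the producer runs only once, the captured emit function is never overwritten by a second subscription, which is why the example above works with a single observer variable.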

Solution 5 - Javascript

You can build a wrapper class Subscribable<T> based on ReplaySubject. It is cleaner than managing a Subject and an Observable separately:

import { Observable, ReplaySubject, Subject } from 'rxjs';

export class Subscribable<T> {

    private valueSource: Subject<T> = new ReplaySubject<T>(1);
    public value: Observable<T>;
    private _value: T;

    constructor() {
        this.value = this.valueSource.asObservable();
    }

    public set(val: T) {
        this.valueSource.next(val);
        this._value = val;
    }

    public get(): T {
        return this._value;
    }
}

Usage:

let arrayStream : Subscribable<TYPE> = new Subscribable<TYPE>();

…
public setArrayStream (value: TYPE) {
    arrayStream.set(value);
}

Handle value change:

arrayStream.value.subscribe(res => { /*handle it*/ });

Original article: http://devinstance.net/articles/20170921/rxjs-subscribable

Solution 6 - Javascript

Instead of using a Subject, it is also possible to use the publishReplay() + refCount() combo to allow an observable to multicast to multiple subscribers:

// publishReplay and refCount are imported from 'rxjs/operators'
let observer = null

const notificationArrayStream = Rx.Observable.create(function (obs) {
  observer = obs;
  return () => {}
}).pipe(publishReplay(), refCount())

Solution 7 - Javascript

const subs = []

const ob = new Observable((s) => {
  console.log('called')
  subs.push(s)
})

const trigger = (v) => {
  subs.forEach((sub) => {
    sub.next(v)
  })
}

ob.subscribe((v) => {
  console.log('ob1', v)
})

ob.subscribe((v) => {
  console.log('ob2', v)
})

trigger(1)

Change your code into something like this, and it will work. The point here is that each subscription is updated through its own subscriber. If you have multiple subscriptions, you have to notify multiple subscribers. In your case, you only notified the last one.

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type               Original Author    Original Content on Stackoverflow
Question                   cgross             View Question on Stackoverflow
Solution 1 - Javascript    Mobiletainment     View Answer on Stackoverflow
Solution 2 - Javascript    user3743222        View Answer on Stackoverflow
Solution 3 - Javascript    Luca Bertolasi     View Answer on Stackoverflow
Solution 4 - Javascript    smnbbrv            View Answer on Stackoverflow
Solution 5 - Javascript    yfranz             View Answer on Stackoverflow
Solution 6 - Javascript    golfadas           View Answer on Stackoverflow
Solution 7 - Javascript    Liang              View Answer on Stackoverflow