Angular 2 Observable with multiple subscribers

Angular, RxJS

Angular Problem Overview


I have an Angular 2 service that fetches data from an API. This service has three subscribers (defined in components), each doing something different with the data (different graphs).

I've noticed that I'm making three GET requests to the API, whereas what I want is a single request whose data the subscribers share. I've looked into hot and cold observables and tried .share() on the observable, but I'm still making three separate calls.

Update, adding code

> Service

import { Injectable } from '@angular/core';
import { Http, Response } from '@angular/http';

import {Observable} from 'rxjs/Rx';

// Import RxJs required methods
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/catch';

import { StationCompliance } from './model/StationCompliance';


@Injectable()
export class StationComplianceService {

  private url = '/api/read/stations';

  constructor(private http : Http) {
    console.log('Started Station compliance service');
   }

   getStationCompliance() : Observable<StationCompliance []> {
     return this.http.get(this.url)
      .map((res:Response) => res.json())
      .catch((error:any) => Observable.throw(error.json().error || 'Server Error'));
   }
}

> Component 1

import { Component, OnInit } from '@angular/core';
import { CHART_DIRECTIVES } from 'angular2-highcharts';

import { StationComplianceService } from '../station-compliance.service';


@Component({
  selector: 'app-up-down-graph',
  templateUrl: './up-down-graph.component.html',
  styleUrls: ['./up-down-graph.component.css']
})
export class UpDownGraphComponent implements OnInit {

  graphData;

  errorMessage: string;

  options;

  constructor(private stationService : StationComplianceService) { }

  ngOnInit() {
    this.getStationTypes();
  }

  getStationTypes(){
    this.stationService.getStationCompliance()
      .subscribe(
        graphData => {
          this.graphData = graphData;
          this.options = {
            chart : {type: 'pie',
                    plotShadow: true
            },
            plotOptions : {
              showInLegend: true
            },
            title : {text: 'Up and Down devices'},
            series: [{
              data: this.processStationType(this.graphData)
            }]
          }
        },
        error => this.errorMessage = <any>error
      );
  }

The other two components are almost the same; they just show different graphs.

Angular Solutions


Solution 1 - Angular

I encountered a similar problem and solved it using Aran's suggestion to reference Cory Rylan's Angular 2 Observable Data Services blog post. The key for me was using BehaviorSubject. Here are the snippets of the code that ultimately worked for me.

Data Service:
The data service creates an internal BehaviorSubject to cache the data once when the service is initialized. Consumers use the subscribeToDataService() method to access the data.

    import { Injectable } from '@angular/core';
    import { Http, Response } from '@angular/http';
    
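    import { BehaviorSubject } from 'rxjs/BehaviorSubject';
    import { Observable } from 'rxjs/Observable';
    // Patch the operators used below onto Observable
    import 'rxjs/add/operator/map';
    import 'rxjs/add/operator/catch';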
    
    import { Data } from './data';
    import { properties } from '../../properties';
    
    @Injectable()
    export class DataService {
      allData: Data[] = new Array<Data>();
      allData$: BehaviorSubject<Data[]>;
    
      constructor(private http: Http) {
        this.initializeDataService();
      }
    
      initializeDataService() {
        if (!this.allData$) {
          this.allData$ = <BehaviorSubject<Data[]>> new BehaviorSubject(new Array<Data>());
    
          this.http.get(properties.DATA_API)
            .map(this.extractData)
            .catch(this.handleError)
            .subscribe(
              allData => {
                this.allData = allData;
                this.allData$.next(allData);
              },
              error => console.log("Error subscribing to DataService: " + error)
            );
        }
      }
    
      subscribeToDataService(): Observable<Data[]> {
        return this.allData$.asObservable();
      }
    
      // other methods have been omitted
    
    }
    

Component:
Components can subscribe to the data service upon initialization.

    export class TestComponent implements OnInit {
      allData$: Observable<Data[]>;
    
      constructor(private dataService: DataService) {
      }
    
      ngOnInit() {
        this.allData$ = this.dataService.subscribeToDataService();
      }
    
    }
    

Component Template:
The template can then iterate over the observable as necessary using the async pipe.

    *ngFor="let data of allData$ | async" 
    

Subscribers are updated each time the next() method is called on the BehaviorSubject in the data service.
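
To see why this results in a single request, here is a minimal, dependency-free sketch of the replay-latest behaviour that BehaviorSubject provides. `TinyBehaviorSubject` and `fakeFetch` are hypothetical stand-ins written for illustration only, not RxJS or Angular APIs; in real code you would keep using `BehaviorSubject` from RxJS as in the service above.

```typescript
// Hypothetical stand-in for RxJS BehaviorSubject: holds the latest value,
// replays it to new subscribers, and pushes every later value to all of them.
class TinyBehaviorSubject<T> {
  private listeners: Array<(value: T) => void> = [];

  constructor(private current: T) {}

  subscribe(listener: (value: T) => void): () => void {
    this.listeners.push(listener);
    listener(this.current); // replay the latest value immediately
    // return an unsubscribe function
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  next(value: T): void {
    this.current = value;
    // iterate over a copy so unsubscribing during delivery is safe
    this.listeners.slice().forEach(listener => listener(value));
  }
}

// Hypothetical fake of the API call; counts how often it is "requested".
let requestCount = 0;
function fakeFetch(): number[] {
  requestCount++;
  return [1, 2, 3];
}

const allData$ = new TinyBehaviorSubject<number[]>([]);

// Two "components" subscribe before the data arrives, one after.
const seenByA: number[][] = [];
const seenByB: number[][] = [];
const seenByC: number[][] = [];
allData$.subscribe(data => seenByA.push(data));
allData$.subscribe(data => seenByB.push(data));

allData$.next(fakeFetch()); // the service fetches once and publishes

allData$.subscribe(data => seenByC.push(data)); // late subscriber
```

Early subscribers first receive the initial empty array and then the fetched data; the late subscriber receives the fetched data straight away, and the fake API is only hit once.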

Solution 2 - Angular

The issue in your code is that you return a new observable each time your function is called, because http.get creates a new Observable on every call. One way to solve this is to store the observable in the service, which ensures that all of the subscribers subscribe to the same observable. This isn't perfect code, but I had a similar issue and this solved my problem for the time being.

import { Injectable } from '@angular/core';
import { Http, Response } from '@angular/http';

import {Observable} from 'rxjs/Rx';

// Import RxJs required methods
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/catch';

import { StationCompliance } from './model/StationCompliance';


@Injectable()
export class StationComplianceService {

  private url = '/api/read/stations';

  constructor(private http : Http) {
    console.log('Started Station compliance service');
   }
   
   private stationComplianceObservable: Observable<StationCompliance[]>;


   getStationCompliance() : Observable<StationCompliance []> {

    // Reuse the request that is already in flight, if any
    if(this.stationComplianceObservable){
        return this.stationComplianceObservable;
    }

    this.stationComplianceObservable = this.http.get(this.url)
      .map((res:Response) => res.json())
      .catch((error:any) => Observable.throw(error.json().error || 'Server Error'))
      // Arrow function, so that `this` still refers to the service
      .finally(() => { this.stationComplianceObservable = null; })
      // share() goes last, so map/catch/finally run once for all subscribers
      .share();

    return this.stationComplianceObservable;
   }
}
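
The difference between a cold source and a shared in-flight request can be illustrated without RxJS. The following is only a sketch under stated assumptions: `Cold`, `fakeHttpGet`, `flush` and `shareInFlight` are hypothetical names invented here, and `shareInFlight` merely imitates the intent of the stored, shared observable in the service above, not the actual RxJS implementation of share().

```typescript
type Observer<T> = (value: T) => void;
// A "cold" source: each subscribe call runs the producer again.
type Cold<T> = (observer: Observer<T>) => void;

// Hypothetical fake backend: records pending requests until flush() answers them.
let requestsSent = 0;
const pending: Array<Observer<string>> = [];
const fakeHttpGet: Cold<string> = observer => {
  requestsSent++;
  pending.push(observer);
};
function flush(response: string): void {
  pending.splice(0).forEach(observer => observer(response));
}

// Share one in-flight subscription among all subscribers, then reset so
// that a subscriber arriving after the response starts a fresh request.
function shareInFlight<T>(source: Cold<T>): Cold<T> {
  let waiting: Array<Observer<T>> | null = null;
  return observer => {
    if (waiting) {
      waiting.push(observer); // join the request already in flight
      return;
    }
    waiting = [observer];
    source(value => {
      const observers = waiting || [];
      waiting = null;
      observers.forEach(o => o(value));
    });
  };
}

// Cold: three subscribers, three requests.
const coldResults: string[] = [];
fakeHttpGet(v => coldResults.push(v));
fakeHttpGet(v => coldResults.push(v));
fakeHttpGet(v => coldResults.push(v));
const coldRequests = requestsSent;
flush('stations');

// Shared: three subscribers, one request.
requestsSent = 0;
const shared = shareInFlight(fakeHttpGet);
const sharedResults: string[] = [];
shared(v => sharedResults.push(v));
shared(v => sharedResults.push(v));
shared(v => sharedResults.push(v));
const sharedRequests = requestsSent;
flush('stations');
```

In both cases every subscriber receives the response; only the number of requests sent to the fake backend differs.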

Solution 3 - Angular

Solution 4 - Angular

You can create a reactive data service that defines a local Observable which is updated internally, so that subscribers can update themselves whenever it changes. This article on observable data services explains it well.

Solution 5 - Angular

I know this thread is old, but the accepted answer helped me a lot, and I would like to add some possible improvements using debounce, switchMap and a hacky global notification system (to do this properly, ngrx should be used: https://ngrx.io/).

The premise is that a notification service can be used to push changes to all other services, telling them to fetch their data:

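import { Injectable } from '@angular/core';
import { Subject } from 'rxjs';

@Injectable()
export class NotifyerService {

  constructor() { }

  // central message bus: every subscriber receives each notification
  notifyer: Subject<any> = new Subject<any>();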

  notifyAll() {
    console.log("ALL NOTIFIED")
    this.notifyer.next("GO")
  }

}

A Subject is used because calling .next(val) on a Subject pushes the data to all listeners.

In the service for a particular component (in your case "DataService") you can manage the data acquisition and caching:

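import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, Subject } from 'rxjs';
import { debounceTime, filter, switchMap } from 'rxjs/operators';

@Injectable()
export class GetDataService {

  // cache the incoming data
  cache: Subject<any> = new Subject<any>();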

  constructor(private http: HttpClient,
    private notifyerService: NotifyerService) {

    // subscribe to any notification from central message broker
    this.notifyerService.notifyer.pipe(

      // filtering can be used to perform different actions based on different notifications
      filter(val => val == "GO"),

      // prevent notification spam by debouncing
      debounceTime(300),

      // SUBSCRIBE to the output of getData, cancelling previous subscription
      switchMap(() => this.getData())

    ).subscribe(res => {

      console.log("expensive server request")

      // save data in cache, notify listeners
      this.cache.next(res)
    })

  }

  // expensive function which gets data
  getData(): Observable<any> {
    return this.http.get<any>(BASE_URL);
  }

} 

The key concept in the above code is that you set up a cache object and update it whenever there is a notification. In the constructor, we pipe all future notifications through a series of operators:

  • filter can be used if you only want to update the cache when the notification service emits certain values (in this case "GO"). If you start doing this, you will almost certainly want to use ngrx instead.
  • debounceTime prevents your server from being spammed with many requests (if, say, the notifications are based on user input).
  • switchMap is a very important operator for caching and state changes. It runs the function inside (in this case, getData) and subscribes to its output, so the output of switchMap is the data emitted by the server request.
  • Another important feature of switchMap is that it cancels previous subscriptions. If your server request has not returned before another notification arrives, switchMap cancels the old request and pings the server again.

Now after all of that, the server data will be placed in the cache with the .next(res) method.
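
The "latest request wins" effect that switchMap gives the cache can be sketched without RxJS. This is a simplified illustration under stated assumptions: `fakeGetData`, `respond` and `onNotification` are hypothetical helpers invented here, and the sketch only ignores stale replies, whereas the real switchMap unsubscribes from the previous inner observable.

```typescript
// Hypothetical fake server: each request stays pending until respond() is called.
interface PendingRequest {
  id: number;
  deliver: (body: string) => void;
}
const inFlight: PendingRequest[] = [];
let nextId = 1;

function fakeGetData(onResponse: (body: string) => void): number {
  const id = nextId++;
  inFlight.push({ id, deliver: onResponse });
  return id;
}

function respond(id: number, body: string): void {
  inFlight.filter(r => r.id === id).forEach(r => r.deliver(body));
}

// "Latest wins": every notification starts a new request and supersedes the
// older ones, so a late reply from an old request never reaches the cache.
const cacheUpdates: string[] = [];
let latestId = 0;

function onNotification(): number {
  const id = fakeGetData(body => {
    if (id === latestId) { // ignore replies from superseded requests
      cacheUpdates.push(body);
    }
  });
  latestId = id;
  return id;
}

const first = onNotification();
const second = onNotification(); // arrives before the first reply

respond(first, 'old data');  // dropped: superseded by the second request
respond(second, 'new data'); // accepted and written to the cache
```

Only the reply to the most recent notification ends up in `cacheUpdates`, which is the property that keeps out-of-order server responses from overwriting newer data.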

Now all the final component needs to do is listen to the cache for updates and handle them appropriately:

export class ButtonclickerComponent implements OnInit {

  value: any;

  constructor(private getDataService: GetDataService,
    private notifyerService: NotifyerService) { }

  ngOnInit() {

    // listen to cache for updates
    this.getDataService.cache.pipe(

      // can do something specific to this component if have multiple subscriptions off same cache
      map(x => x)

      // subsc
    ).subscribe(x => { console.log(x); this.value = x.value })

  }

  onClick() {

    // notify all components of state changes
    this.notifyerService.notifyAll()
  }

}

The concept in action:

[Figure: Angular app on button click]

[Figure: Server response]

Solution 6 - Angular

The solution is to save the observable once it has been created and make it shareable (by default it is not). So your service will look like:

@Injectable()
export class StationComplianceService {

  private stationCompliance: StationCompliance[];
  private stream: Observable<StationCompliance []>;
  private url = '/api/read/stations';

  constructor(private http : Http) {
    console.log('Started Station compliance service');
   }

   getStationCompliance() : Observable<StationCompliance []> {
     /** if the remote value is already fetched, just return it as an Observable */
     if (this.stationCompliance) {
       return Observable.of(this.stationCompliance);
     }
     /** otherwise, if the stream is already created, prevent another stream creation (exactly your question) */
     if (this.stream) {
       return this.stream;
     }
     /** otherwise run remote data fetching */
     this.stream = this.http.get(this.url)
      .map((res:Response) => res.json())
      /** remember the fetched value so that later calls can skip the request */
      .do((data: StationCompliance[]) => this.stationCompliance = data)
      .catch((error:any) => Observable.throw(error.json().error || 'Server Error'))
      .share(); /** and make the stream shareable (by default it is not) */
     return this.stream;
   }
}

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | naoru | View Question on Stackoverflow
Solution 1 - Angular | Scott | View Answer on Stackoverflow
Solution 2 - Angular | Ray Suelzer | View Answer on Stackoverflow
Solution 3 - Angular | Andre | View Answer on Stackoverflow
Solution 4 - Angular | Aran Dekar | View Answer on Stackoverflow
Solution 5 - Angular | angeldroth | View Answer on Stackoverflow
Solution 6 - Angular | zelibobla | View Answer on Stackoverflow