What is pipe for in RxJS?

AngularRxjsRxjs5

Angular Problem Overview


I think I have the base concept, but there are some obscurities

So in general this is how I use an Observable:

observable.subscribe(x => {

})

If I want to filter data I can use this:

import { first, last, map, reduce, find, skipWhile } from 'rxjs/operators';
observable.pipe(
    map(x => {return x}),
    first()
    ).subscribe(x => {

})

I can also do this:

import 'rxjs/add/operator/map';
import 'rxjs/add/operator/first';

observable.map(x => {return x}).first().subscribe(x => {

})

So my questions are:

  1. What is the difference?
  2. If there is no difference, why the function pipe exists?
  3. Why those functions need different imports?

Angular Solutions


Solution 1 - Angular

The "pipeable" (former "lettable") operators is the current and recommended way of using operators since RxJS 5.5.

I strongly recommend you to read the official documentation on pipeable operators

The main difference is that it's easier to make custom operators and that it's better treeshakable while not altering some global Observable object that could possible make collisions if two different parties wanted to create an operator of the same name.

Using separate import statement for each operator 'rxjs/add/operator/first' was a way to make smaller app bundles. By importing only operators you need instead of the entire RxJS library you can significantly reduce the total bundle size. However the compiler can't know if you imported 'rxjs/add/operator/first' because you really need it in you code or you just forgot to remove it when refactoring your code. That's one of the advantages of using pipeable operators where unused imports are ignored automatically.

Solution 2 - Angular

The pipe method

According to original Documentation

the pipable operator is that function take observables as a input and it returns another observable .previous observable stays unmodified.

pipe(...fns: UnaryFunction<any, any>[]): UnaryFunction<any, any>

Original Post

> What pipe mean? > > That means that any operators you previously used on the instance of > observable are available as pure functions under rxjs/operators. > This makes building a composition of operators or re-using operators > becomes really easy, without having to resort to all sorts of > programming gymnastics where you have to create a custom observable > extending Observable, then overwrite lift just to make your own custom > thing.

const { Observable } = require('rxjs/Rx')
const { filter, map, reduce,  } = require('rxjs/operators')
const { pipe } = require('rxjs/Rx')

const filterOutWithEvens = filter(x => x % 2)
const doubleByValue = x => map(value => value * x);
const sumValue = reduce((acc, next) => acc + next, 0);
const source$ = Observable.range(0, 10)

source$.pipe(
  filterOutWithEvens, 
  doubleByValue(2), 
  sumValue)
  .subscribe(console.log); // 50

Solution 3 - Angular

What is the difference? As you see in your example, the main difference is to improve the readability of the source code. There are only two functions in your example, but imagine if there are a dozen of the functions? then it will go like

function1().function2().function3().function4()

it's really getting ugly, and difficult to read, especially when you are filling inside of the functions. On top of that certain editors like Visual Studio code doesn't allow more than 140 line length. but if it goes like following.

Observable.pipe(
function1(),
function2(),
function3(),
function4()
)

This drastically improves the readability.

If there is no difference, why the function pipe exists? The purpose of the PIPE() function is to lump together all the functions that take, and return observable. It takes an observable initially, then that observable is used throughout the pipe() function by each function used inside of it.

First function takes the observable, processes it, modify its value, and passes to the next function, then next function takes the output observable of the first function, processes it, and passes to the next function, then it goes on until all the functions inside of pipe() function use that observable, finally you have the processed observable. At the end you can execute the observable with subscribe() function to extract the value out of it. Remember, the values in the original observable are not changed.!! 

Why those functions need different imports? Imports depend on in where the function is specified in the rxjs package. It goes like this. All the modules are stored in node_modules folder in Angular. import { class } from "module";

Let's take the following code as an example. I have just wrote it in stackblitz. So nothing is automatically generated, or copied from somewhere else. I don't see the point of copying what stated in rxjs documentation when you can go and read it too. I assume you asked this question here, because you didn't understand the documentation. 

  • There are pipe, observable, of, map classes imported from the respective modules. 

  • In body of the class, I used the Pipe() function as seen in the code. 

  • The Of() function returns an observable, that emits numbers in sequence when it's subscribed.

  • Observable isn't subscribed yet.

  • When you used it likes Observable.pipe(), the pipe() function uses the given Observable as an input.

  • The first function, map() function uses that Observable, process it, return the processed Observable back to the pipe() function,

  • then that processed Observable is given to the next function if there is any,

  • and it goes on like that until all the functions process the Observable,

  • at the end that Observable is returned by the pipe() function to a variable, in the following example its obs.

Now the thing in Observable is, As long as the observer didn't subscribe it, it doesn't emit any value. So I used the subscribe() function to subscribe to this Observable, then as soon as I subscribed it. The of() function starts emitting values, then they are processed through pipe() function, and you get the final result at the end, for instance 1 is taken from of() function, 1 is added 1 in map() function, and returned back. You can get that value as an argument inside of the subscribe( function (argument) {} ) function.

If you want to print it, then uses as

subscribe( function (argument) {
    console.log(argument)
   } 
)
    import { Component, OnInit } from '@angular/core';
    import { pipe } from 'rxjs';
    import { Observable, of } from 'rxjs';
    import { map } from 'rxjs/operators';
    
    @Component({
      selector: 'my-app',
      templateUrl: './app.component.html',
      styleUrls: [ './app.component.css' ]
    })
    export class AppComponent implements OnInit  {
    
      obs = of(1,2,3).pipe(
      map(x => x + 1),
      ); 
    
      constructor() { }
    
      ngOnInit(){  
        this.obs.subscribe(value => console.log(value))
      }
    }

https://stackblitz.com/edit/angular-ivy-plifkg

Solution 4 - Angular

A good summary I've come up with is:

It decouples the streaming operations (map, filter, reduce...) from the core functionality(subscribing, piping). By piping operations instead of chaining, it doesn't pollute the prototype of Observable making it easier to do tree shaking.

See https://github.com/ReactiveX/rxjs/blob/master/doc/pipeable-operators.md#why

> Problems with the patched operators for dot-chaining are: > > Any library that imports a patch operator will augment the > Observable.prototype for all consumers of that library, creating blind > dependencies. If the library removes their usage, they unknowingly > break everyone else. With pipeables, you have to import the operators > you need into each file you use them in. > > Operators patched directly onto the prototype are not "tree-shakeable" > by tools like rollup or webpack. Pipeable operators will be as they > are just functions pulled in from modules directly. > > Unused operators that are being imported in apps cannot be detected > reliably by any sort of build tooling or lint rule. That means that > you might import scan, but stop using it, and it's still being added > to your output bundle. With pipeable operators, if you're not using > it, a lint rule can pick it up for you. > > Functional composition is awesome. Building your own custom operators > becomes much, much easier, and now they work and look just like all > other operators from rxjs. You don't need to extend Observable or > override lift anymore.

Solution 5 - Angular

This is how I explain observable:

You need to make a plan based on weather conditions so you turn on a radio and listening to a weather channel that broadcasts weather conditions 24/7. In this scenario, instead of getting one single response, the response is ongoing. This response is like a subscription to an observable. the observable is the "weather" and the subscription is the "radio signals that keep you updated". As long as your radio is on, you are getting every available update. You are not missing any information until you turn off the radio.

I said weather is observable, but you are listening to the radio not the weather. So the radio is also an observable. What the weather announcer says is the function of the weather report sent to him by the meteorologist. What meteorologist writes is a function of data coming from the weather station. The data is coming from the weather station is the function of all the instruments (barometer, wind wane, wind gauge) attached to it and the instruments are a function of the weather itself.

There are at least 5 observables in this entire process. In this process, there are two types of observables. Source observable and output observable. In this example, the weather is the "source observable" and the radio is the "output observable". Everything in between represents the PIPE FUNCTION.

Pipe function is what takes the source observable performs operations on it to provide an output observable and all of those operations happen inside. The operations all deal with the observables themselves

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
Questionenno.voidView Question on Stackoverflow
Solution 1 - AngularmartinView Answer on Stackoverflow
Solution 2 - AngularChanaka WeerasingheView Answer on Stackoverflow
Solution 3 - AngularDon DilangaView Answer on Stackoverflow
Solution 4 - AngularJuan MendesView Answer on Stackoverflow
Solution 5 - AngularYilmazView Answer on Stackoverflow