Reactive Programming - RxJS vs EventEmitter in Node.js


Javascript Problem Overview


Recently I've started looking at the RxJS (https://github.com/Reactive-Extensions/RxJS) and RxJava (from Netflix) libraries, which are built on the concept of Reactive Programming.

Node.js is built around an event loop, which gives you the tools for asynchronous programming, and Node libraries such as "cluster" help you get the most out of a multi-core machine. Node.js also provides the EventEmitter functionality, which lets you subscribe to events and act on them asynchronously.

On the other hand, if I understand correctly, RxJS (and Reactive Programming in general) works on the principle of event streams: subscribing to event streams and transforming the event stream data asynchronously.

So the question is: what does using Rx packages in Node.js mean? How do Node's event loop, event emitters, and subscriptions differ from Rx's streams and subscriptions?

Javascript Solutions


Solution 1 - Javascript

Observables are not like EventEmitters. They may act like EventEmitters in some cases, namely when they are multicasted using RxJS Subjects, but usually they don't act like EventEmitters.

In short, an RxJS Subject is like an EventEmitter, but an RxJS Observable is a more generic interface. Observables are more similar to functions with zero arguments.

Consider the following:


function foo() {
  console.log('Hello');
  return 42;
}

var x = foo.call(); // same as foo()
console.log(x);
var y = foo.call(); // same as foo()
console.log(y);

Of course we all expect to see as output:

"Hello"
42
"Hello"
42

You can write the same behavior above, but with Observables:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
});

foo.subscribe(function (x) {
  console.log(x);
});
foo.subscribe(function (y) {
  console.log(y);
});

And the output is the same:

"Hello"
42
"Hello"
42

That's because both functions and Observables are lazy computations. If you don't call the function, the console.log('Hello') won't happen. Also with Observables, if you don't "call" (subscribe), the console.log('Hello') won't happen. Plus, "calling" or "subscribing" is an independent operation: two function calls trigger two separate side effects, and two Observable subscribes trigger two separate side effects. As opposed to EventEmitters which share the side effects and have eager execution regardless of the existence of subscribers, Observables have no shared execution and are lazy.
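The EventEmitter side of that contrast can be seen with Node's built-in events module. This is a minimal sketch; the names produce, sideEffects, and log are illustrative and not from the original answer. It shows that an emit with no listeners is simply lost (eager), and that one emit notifies every attached listener while the producer's side effect runs only once (shared).

```javascript
const { EventEmitter } = require('events');

const emitter = new EventEmitter();
const log = [];

// Hypothetical producer: its side effect runs once per emit,
// no matter how many listeners are attached.
let sideEffects = 0;
function produce() {
  sideEffects += 1;
  emitter.emit('value', 42);
}

// Eager: this emit happens with zero listeners, so the value is lost.
produce();

emitter.on('value', (x) => log.push('a:' + x));
emitter.on('value', (y) => log.push('b:' + y));

// Shared: one emit, one side effect, both listeners are notified.
produce();

console.log(sideEffects); // 2 (one per produce() call, not one per listener)
console.log(log);         // [ 'a:42', 'b:42' ]
```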


So far, no difference between the behavior of a function and an Observable. This StackOverflow question would have been better phrased as "RxJS Observables vs functions?".

Some people claim that Observables are asynchronous. That is not true. If you surround a function call with logs, like this:

console.log('before');
console.log(foo.call());
console.log('after');

You will obviously see the output:

"before"
"Hello"
42
"after"

And this is the same behavior with Observables:

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

And the output:

"before"
"Hello"
42
"after"

Which proves the subscription of foo was entirely synchronous, just like a function.


So what is really the difference between an Observable and a function?

Observables can "return" multiple values over time, something which functions cannot. You can't do this:

function foo() {
  console.log('Hello');
  return 42;
  return 100; // dead code. will never happen
}

Functions can only return one value. Observables, however, can do this:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100); // "return" another value
  observer.next(200);
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

With synchronous output:

"before"
"Hello"
42
100
200
"after"

But you can also "return" values asynchronously:

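var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(function () {
    observer.next(300); // happens asynchronously, one second later
  }, 1000);
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');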

With output:

"before"
"Hello"
42
100
200
"after"
300

To conclude,

  • func.call() means "give me one value immediately (synchronously)"
  • obsv.subscribe() means "give me values. Maybe many of them, maybe synchronously, maybe asynchronously"

That's how Observables are a generalization of functions (that have no arguments).
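To make the analogy concrete without depending on RxJS, here is a toy sketch. makeObservable is a hypothetical helper written for this illustration and is not how RxJS is implemented. It only shows that "subscribing" can be modelled as calling a stored function with an observer, which may then be handed several values, some synchronously and some later.

```javascript
// Toy stand-in for an Observable (NOT the RxJS implementation):
// it stores a producer function and runs it on every subscribe call.
function makeObservable(producer) {
  return {
    subscribe(next) {
      producer({ next });
    },
  };
}

let runs = 0;
const foo = makeObservable((observer) => {
  runs += 1;                                // side effect, repeated per subscription
  observer.next(42);                        // delivered synchronously
  observer.next(100);                       // a second "return" value
  setTimeout(() => observer.next(300), 10); // delivered later
});

const received = [];
console.log(runs); // 0: nothing runs until someone subscribes

received.push('before');
foo.subscribe((x) => received.push(x));
received.push('after');

console.log(received); // [ 'before', 42, 100, 'after' ]
setTimeout(() => console.log(received), 50);
// [ 'before', 42, 100, 'after', 300 ]
```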

Solution 2 - Javascript

When does a listener get attached to the emitter?

With event emitters, listeners are notified whenever an event they are interested in happens. A listener added after an event has occurred will not know about that event, or about any of the history before it. Of course, we could manually program our emitter and listener to handle this with custom logic.
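A minimal sketch of both points, using Node's built-in events module. ReplayEmitter, publish, and onWithReplay are hypothetical names invented here to stand for the "custom logic"; they are not part of Node's API.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: records every published event so that
// late listeners can be caught up on history.
class ReplayEmitter extends EventEmitter {
  constructor() {
    super();
    this.history = [];
  }
  publish(name, ...args) {
    this.history.push({ name, args });
    return this.emit(name, ...args);
  }
  onWithReplay(name, listener) {
    // Deliver past events first, then listen for future ones.
    for (const event of this.history) {
      if (event.name === name) listener(...event.args);
    }
    return this.on(name, listener);
  }
}

// Plain emitter: the late listener never sees the first event.
const plain = new EventEmitter();
const late = [];
plain.emit('price', 101); // no listener yet, so this is dropped
plain.on('price', (p) => late.push(p));
plain.emit('price', 102);
console.log(late); // [ 102 ]

// Custom emitter: the late listener is caught up manually.
const replaying = new ReplayEmitter();
const caughtUp = [];
replaying.publish('price', 101);
replaying.onWithReplay('price', (p) => caughtUp.push(p));
replaying.publish('price', 102);
console.log(caughtUp); // [ 101, 102 ]
```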

With reactive streams, a subscriber can receive the stream of events from the beginning (for example, when the stream is cold or replays its history), so the time at which it subscribes matters less. It can then apply a variety of operators to the stream to obtain the sub-stream of events it is interested in.

The advantage of this shows when we need to process:

  • events that happened over time
  • the order in which they happened
  • patterns in which the events happened (let's say, after every buy event on Google stock, a sell event on Microsoft stock happens within 5 minutes)

Higher order streams:

A Higher-order stream is a "stream of streams": a stream whose event values are themselves streams.

With event emitters, one way of doing this is to attach the same listener to multiple event emitters. It becomes complex when we need to correlate events that happened on different emitters.
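As a rough sketch of that complexity, the following uses two Node EventEmitter instances and correlates them by hand. The emitters clicks and moves, the event shapes, and WINDOW_MS are all assumptions made for illustration; timestamps are passed explicitly so the example is deterministic. Note how the listeners have to coordinate through shared mutable state.

```javascript
const { EventEmitter } = require('events');

// Hypothetical sources; in a browser these would be DOM events.
const clicks = new EventEmitter();
const moves = new EventEmitter();

const WINDOW_MS = 5000;
let firstClickAt = null; // shared state the listeners coordinate on
const movesAfterFirstClick = [];

clicks.on('click', (event) => {
  // Only the first click opens the window.
  if (firstClickAt === null) firstClickAt = event.time;
});

moves.on('mousemove', (event) => {
  if (firstClickAt === null) return;                 // no click yet
  if (event.time - firstClickAt > WINDOW_MS) return; // window closed
  movesAfterFirstClick.push(event.x);
});

// Simulated events with explicit timestamps (milliseconds).
moves.emit('mousemove', { time: 0, x: 1 });    // ignored: before any click
clicks.emit('click', { time: 100 });
moves.emit('mousemove', { time: 200, x: 2 });  // kept
clicks.emit('click', { time: 300 });           // ignored: not the first click
moves.emit('mousemove', { time: 6000, x: 3 }); // ignored: window expired

console.log(movesAfterFirstClick); // [ 2 ]
```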

With reactive streams, it's a breeze. Here is an example from mostjs (a reactive programming library like RxJS, with a focus on performance):

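// Stream containing only the first click on the document
const firstClick = most.fromEvent('click', document).take(1);
// Higher-order stream: the first click maps to a stream of mousemoves
// that ends 5 seconds after it starts
const mousemovesAfterFirstClick = firstClick.map(() =>
    most.fromEvent('mousemove', document)
        .takeUntil(most.of().delay(5000)));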

In the above example, we correlate the click events with mouse move events. Detecting patterns across events becomes much easier when events are available as a stream.

Having said that, with EventEmitter we could accomplish all of this by over-engineering our emitters and listeners. It needs over-engineering because EventEmitter was not intended for such scenarios in the first place, whereas reactive streams handle them fluently because they are designed to solve such problems.

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | Kishore Yekkanti | View Question on Stackoverflow
Solution 1 - Javascript | André Staltz | View Answer on Stackoverflow
Solution 2 - Javascript | Sairam Krish | View Answer on Stackoverflow