
Chihab Otmani


Build Your Own RxJS - Operators - (Part 2)

Introduction

In the previous post of the series, we built our own Observable class.

class Observable {
  private _subscribe;
  private _unsubscribe;
  private _stopped = true;
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  _stop() {
    this._stopped = true;
    setTimeout(() => {
      this._unsubscribe();
    });
  }
  subscribe(observer) {
    this._stopped = false;
    this._unsubscribe = this._subscribe({
      next: (value) => {
        if (!this._stopped) {
          observer.next(value);
        }
      },
      complete: () => {
        if (!this._stopped) {
          observer.complete();
          this._stop();
        }
      },
      error: (e) => {
        if (!this._stopped) {
          observer.error(e);
          this._stop();
        }
      },
    });
    return { unsubscribe: this._unsubscribe };
  }
}

Let's see now how to build and compose operators in RxJS.

Definition

An operator is a function that takes a source Observable as a parameter and returns a new destination Observable. It reacts to the three events from the source Observable (next, error and complete) and, depending on the operator's logic, sends specific events to the destination Observable.
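As a rough sketch of this definition, an operator can be typed as a function from one Observable to another. The `Operator` alias, the `passThrough` operator, the `letters$` source and the compact `Observable` stand-in below are hypothetical names used only for illustration; they are not the class from the previous post and not RxJS's own types.

```typescript
type Teardown = () => void;

interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Compact stand-in for the series' Observable class (an assumption for this
// sketch, not the original code); it runs the teardown at most once.
class Observable<T> {
  private producer: (observer: Observer<T>) => Teardown | void;

  constructor(producer: (observer: Observer<T>) => Teardown | void) {
    this.producer = producer;
  }

  subscribe(observer: Partial<Observer<T>>): { unsubscribe: Teardown } {
    let closed = false;
    let teardown: Teardown | void = undefined;
    const close = () => {
      closed = true;
      const run = teardown;
      teardown = undefined;
      if (typeof run === "function") run();
    };
    const finish = (notify: () => void) => {
      if (closed) return;
      closed = true;
      notify();
      close();
    };
    teardown = this.producer({
      next: (value) => {
        if (!closed) observer.next?.(value);
      },
      error: (err) => finish(() => observer.error?.(err)),
      complete: () => finish(() => observer.complete?.()),
    });
    if (closed) close(); // the producer finished synchronously
    return { unsubscribe: close };
  }
}

// An operator maps a source Observable to a destination Observable.
type Operator<A, B> = (source: Observable<A>) => Observable<B>;

// Hypothetical "do nothing" operator: forwards the three events unchanged.
const passThrough: Operator<string, string> = (source) =>
  new Observable<string>((observer) => {
    const subscription = source.subscribe({
      next: (value) => observer.next(value),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    });
    return () => subscription.unsubscribe();
  });

// Hypothetical synchronous source used for the demo.
const letters$ = new Observable<string>((observer) => {
  observer.next("a");
  observer.next("b");
  observer.complete();
});

const seen: string[] = [];
passThrough(letters$).subscribe({
  next: (value) => seen.push(`next:${value}`),
  complete: () => seen.push("complete"),
});
console.log(seen); // ["next:a", "next:b", "complete"]
```

Every operator in this post follows the same skeleton; only the body of the three callbacks changes.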

Custom Operators

Let's build a custom operator that filters out odd numbers.

function even(source: Observable) {
  const destination = new Observable((observer: Observer) => {
    const subscription = source.subscribe({
      next: (value) => {
        if (value % 2 === 0) {
          observer.next(value);
        }
      },
      error: (e) => {
        observer.error(e);
      },
      complete: () => {
        observer.complete();
      },
    });
    return () => {
      subscription?.unsubscribe();
    };
  });
  return destination;
}

Let's apply the operator directly to an Observable, without using the pipe function (we'll come back to it later). This is fundamental to understanding operators and how data flows from one to the next.

const even$ = even(interval$);
const subscription = even$.subscribe({
  next: (event) => console.log(event),
});

// later
subscription.unsubscribe();

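even$ is the destination Observable created and returned by the even function.
When we unsubscribe from even$, we also have to unsubscribe from the source Observable. Adding this logic is our responsibility, which is why the function passed to the destination Observable returns a teardown that calls subscription.unsubscribe().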
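To see that propagation in action, here is a small sketch under some assumptions: the compact `Observable` stand-in, the `numbers$` source and the `teardownLog` array are hypothetical and exist only for the demo. The source emits synchronously and never completes, a bit like an interval.

```typescript
type Teardown = () => void;

interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Compact stand-in for the series' Observable class (an assumption for this
// sketch, not the original code); it runs the teardown at most once.
class Observable<T> {
  private producer: (observer: Observer<T>) => Teardown | void;

  constructor(producer: (observer: Observer<T>) => Teardown | void) {
    this.producer = producer;
  }

  subscribe(observer: Partial<Observer<T>>): { unsubscribe: Teardown } {
    let closed = false;
    let teardown: Teardown | void = undefined;
    const close = () => {
      closed = true;
      const run = teardown;
      teardown = undefined;
      if (typeof run === "function") run();
    };
    const finish = (notify: () => void) => {
      if (closed) return;
      closed = true;
      notify();
      close();
    };
    teardown = this.producer({
      next: (value) => {
        if (!closed) observer.next?.(value);
      },
      error: (err) => finish(() => observer.error?.(err)),
      complete: () => finish(() => observer.complete?.()),
    });
    if (closed) close(); // the producer finished synchronously
    return { unsubscribe: close };
  }
}

// Same shape as the even operator above, with explicit number types.
function even(source: Observable<number>): Observable<number> {
  return new Observable<number>((observer) => {
    const subscription = source.subscribe({
      next: (value) => {
        if (value % 2 === 0) observer.next(value);
      },
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    });
    // Without this teardown the source would keep running after unsubscribe.
    return () => subscription.unsubscribe();
  });
}

const teardownLog: string[] = [];
// Hypothetical source: emits four numbers and never completes.
const numbers$ = new Observable<number>((observer) => {
  [1, 2, 3, 4].forEach((n) => observer.next(n));
  return () => {
    teardownLog.push("source torn down");
  };
});

const received: number[] = [];
const subscription = even(numbers$).subscribe({
  next: (value) => received.push(value),
});
const tornDownBefore = teardownLog.length; // 0: the source is still live

subscription.unsubscribe();
console.log(received, tornDownBefore, teardownLog);
// [ 2, 4 ] 0 [ 'source torn down' ]
```

If the teardown returned inside even were removed, the last log would stay empty: the destination would stop, but the source would never be told.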

Configurable custom operators

Now we want a multiply operator that takes the number to multiply by as a parameter. We create a higher-order function that, when invoked, returns the actual operator.

function multiply(by) {
  return function (observable: Observable) {
    return new Observable((observer: Observer) => {
      const subscription = observable.subscribe({
        next: (value) => {
          observer.next(value * by);
        },
        error: (e) => {
          observer.error(e);
        },
        complete: () => {
          observer.complete();
        },
      });
      return () => {
        subscription?.unsubscribe();
      };
    });
  };
}
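
A short usage sketch may help show why the extra function layer is useful: the factory is configured once and the resulting operator can be applied to any source. The compact `Observable` stand-in, the `oneToThree$` source and the `collect` helper are assumptions made for this demo only.

```typescript
type Teardown = () => void;

interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Compact stand-in for the series' Observable class (an assumption for this
// sketch, not the original code); it runs the teardown at most once.
class Observable<T> {
  private producer: (observer: Observer<T>) => Teardown | void;

  constructor(producer: (observer: Observer<T>) => Teardown | void) {
    this.producer = producer;
  }

  subscribe(observer: Partial<Observer<T>>): { unsubscribe: Teardown } {
    let closed = false;
    let teardown: Teardown | void = undefined;
    const close = () => {
      closed = true;
      const run = teardown;
      teardown = undefined;
      if (typeof run === "function") run();
    };
    const finish = (notify: () => void) => {
      if (closed) return;
      closed = true;
      notify();
      close();
    };
    teardown = this.producer({
      next: (value) => {
        if (!closed) observer.next?.(value);
      },
      error: (err) => finish(() => observer.error?.(err)),
      complete: () => finish(() => observer.complete?.()),
    });
    if (closed) close(); // the producer finished synchronously
    return { unsubscribe: close };
  }
}

// Typed version of the multiply operator factory shown above.
function multiply(by: number) {
  return (source: Observable<number>): Observable<number> =>
    new Observable<number>((observer) => {
      const subscription = source.subscribe({
        next: (value) => observer.next(value * by),
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      });
      return () => subscription.unsubscribe();
    });
}

// Hypothetical synchronous source used for the demo.
const oneToThree$ = new Observable<number>((observer) => {
  [1, 2, 3].forEach((n) => observer.next(n));
  observer.complete();
});

// Demo helper: collects everything a synchronous Observable emits.
function collect(source: Observable<number>): number[] {
  const values: number[] = [];
  source.subscribe({ next: (value) => values.push(value) });
  return values;
}

const triple = multiply(3); // configure once...
const tripled = collect(triple(oneToThree$)); // ...then apply to a source
const timesTen = collect(multiply(10)(oneToThree$));
console.log(tripled, timesTen); // [ 3, 6, 9 ] [ 10, 20, 30 ]
```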

pipe

Let's say we want to display only the even numbers from an interval stream in which each value has been multiplied by 3.

const interval$ = interval(1000);
const intervalBy3$ = multiply(3)(interval$);
const even$ = even(intervalBy3$);
even$.subscribe({
  next: (event) => console.log(event),
});

Composing the two function calls on one line:

const even$ = even(multiply(3)(interval$));

pipe is just a utility function that chains functions together. It is not specific to operator functions; it can be used to compose any functions.

import { pipe } from "rxjs";

// The two expressions below build equivalent Observables:
pipe(multiply(3), even)(interval$);
even(multiply(3)(interval$));
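To make the "any functions" point concrete, here is a minimal hand-rolled version. `simplePipe` is a hypothetical name, and this sketch assumes every function has the same input and output type; the pipe exported by RxJS is typed more flexibly, so treat this as an illustration of the idea rather than its implementation.

```typescript
// Hand-rolled pipe for functions that share one input/output type.
// simplePipe(f, g)(x) evaluates g(f(x)); with no functions it returns x.
function simplePipe<T>(...fns: Array<(input: T) => T>): (input: T) => T {
  return (input: T) => fns.reduce<T>((acc, fn) => fn(acc), input);
}

// Plain functions, nothing Observable-specific.
const addOne = (n: number): number => n + 1;
const double = (n: number): number => n * 2;
const trim = (s: string): string => s.trim();
const shout = (s: string): string => s.toUpperCase() + "!";

const piped = simplePipe(addOne, double)(3); // double(addOne(3))
const nested = double(addOne(3));
const label = simplePipe(trim, shout)("  rx  ");
const untouched = simplePipe<number>()(7);
console.log(piped, nested, label, untouched); // 8 8 RX! 7
```

Operators fit this shape because each one takes an Observable and returns an Observable, so a chain of operators is just a chain of unary functions.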

Ideally, pipe would be available as a method on our Observable class.

import { pipe } from "rxjs";

class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  subscribe(observer) {
    return this._subscribe(observer);
  }
  pipe(...operators) {
    return pipe(...operators)(this);
  }
}

interval$.pipe(multiply(3), even).subscribe({
  next: (event) => console.log(event),
});

At this point you should have the whole picture. Let's practice on the remaining operators: map, take and switchMapTo.

map

map is easy: we subscribe to the source Observable and emit each value after transforming it with the passed-in projection function.

function map(projection) {
  return function (source) {
    return new Observable((observer) => {
      const subscription = source.subscribe({
        next: (value) => {
          observer.next(projection(value));
        },
        error: (e) => {
          observer.error(e);
        },
        complete: () => {
          observer.complete();
        },
      });
      return () => {
        subscription?.unsubscribe();
      };
    });
  };
}

take

interval$
  .pipe(
    take(5),
    map((val) => val * 2)
  )
  .subscribe({
    next: (value) => console.log(value),
    complete: () => console.log('End of stream'),
  });

In the example above we are interested only in the first 5 interval events. On the fifth event, take(5):

  • unsubscribes from the source Observable (interval$)
  • completes its own observer, which in turn completes the observers downstream; otherwise the complete callback in our subscribe would never be called.

function take(maxEvents) {
  return function (source: Observable) {
    return new Observable((observer) => {
      let counter = 0;
      const subscription = source.subscribe({
        next(value) {
          observer.next(value);
          if (++counter === maxEvents) {
            subscription?.unsubscribe();
            observer.complete();
          }
        },
        error(e) {
          observer.error(e);
        },
        complete() {
          observer.complete();
        },
      });
      return () => {
        subscription?.unsubscribe();
      };
    });
  };
}
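
One caveat worth sketching: the version above reads `subscription` inside `next`, which works for an asynchronous source such as interval$ but can fail when a source emits synchronously, because the `const` is not initialized until subscribe returns. The variant below is one possible way to tolerate both cases. The compact `Observable` stand-in and the `oneToTen$` source are assumptions made for the demo, and the sketch assumes maxEvents is at least 1.

```typescript
type Teardown = () => void;

interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Compact stand-in for the series' Observable class (an assumption for this
// sketch, not the original code); it runs the teardown at most once.
class Observable<T> {
  private producer: (observer: Observer<T>) => Teardown | void;

  constructor(producer: (observer: Observer<T>) => Teardown | void) {
    this.producer = producer;
  }

  subscribe(observer: Partial<Observer<T>>): { unsubscribe: Teardown } {
    let closed = false;
    let teardown: Teardown | void = undefined;
    const close = () => {
      closed = true;
      const run = teardown;
      teardown = undefined;
      if (typeof run === "function") run();
    };
    const finish = (notify: () => void) => {
      if (closed) return;
      closed = true;
      notify();
      close();
    };
    teardown = this.producer({
      next: (value) => {
        if (!closed) observer.next?.(value);
      },
      error: (err) => finish(() => observer.error?.(err)),
      complete: () => finish(() => observer.complete?.()),
    });
    if (closed) close(); // the producer finished synchronously
    return { unsubscribe: close };
  }
}

// Variant of take that also tolerates synchronously emitting sources.
// Assumes maxEvents >= 1.
function take<T>(maxEvents: number) {
  return (source: Observable<T>): Observable<T> =>
    new Observable<T>((observer) => {
      let counter = 0;
      let subscription: { unsubscribe: Teardown } | undefined;
      subscription = source.subscribe({
        next: (value) => {
          if (counter >= maxEvents) return; // ignore late synchronous values
          observer.next(value);
          if (++counter === maxEvents) {
            // Still undefined while the source is emitting synchronously.
            subscription?.unsubscribe();
            observer.complete();
          }
        },
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      });
      // The limit may have been reached before subscribe() returned.
      if (counter >= maxEvents) subscription.unsubscribe();
      return () => subscription?.unsubscribe();
    });
}

const log: string[] = [];
// Hypothetical synchronous source: pushes 1..10 during subscribe().
const oneToTen$ = new Observable<number>((observer) => {
  for (let n = 1; n <= 10; n++) observer.next(n);
  return () => {
    log.push("source torn down");
  };
});

take<number>(3)(oneToTen$).subscribe({
  next: (value) => log.push(`next:${value}`),
  complete: () => log.push("complete"),
});
console.log(log);
// [ 'next:1', 'next:2', 'next:3', 'complete', 'source torn down' ]
```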

switchMapTo

In switchMapTo, we are interested in the source observable only to know that a new event has occurred.
Each time we receive an event from the source Observable, we subscribe to the inner Observable (the destination parameter) and forward its values to the observer.

When the source Observable emits a new event, we unsubscribe from the current inner Observable and create a new subscription. This "unsubscription" is very important because in our case we do not want any timers to remain active.

If we receive an error from the source Observable or the inner Observable, we pass it down to the observer right away.

If the source Observable completes, we wait until the active inner Observable completes, and then we complete the observer.

function switchMapTo(destination: Observable) {
  return function (source: Observable) {
    return new Observable((observer) => {
      let innerSubscription;
      let innerCompleted = true;
      let isComplete = false;
      const checkCompletion = () =>
        isComplete && innerCompleted && observer.complete();
      const subscription = source.subscribe({
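        next: (value) => {
          innerSubscription?.unsubscribe();
          // A new inner subscription is now active, so completion must wait for it.
          innerCompleted = false;
          innerSubscription = destination.subscribe({
            next(value) {
              observer.next(value);
            },
            error(e) {
              observer.error(e);
            },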
            complete() {
              innerCompleted = true;
              checkCompletion();
            },
          });
        },
        error: (e) => {
          observer.error(e);
        },
        complete: () => {
          isComplete = true;
          checkCompletion();
        },
      });
      return () => {
        innerSubscription?.unsubscribe();
        subscription?.unsubscribe();
      };
    });
  };
}
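
The switching behaviour is easier to see with a log. In this sketch the compact `Observable` stand-in, the `timer$` inner stream and the `clicks$` source are hypothetical, and the operator resets `innerCompleted` on each new inner subscription so that completion waits for the active inner stream, as described above.

```typescript
type Teardown = () => void;

interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Compact stand-in for the series' Observable class (an assumption, not the original code).
class Observable<T> {
  private producer: (observer: Observer<T>) => Teardown | void;

  constructor(producer: (observer: Observer<T>) => Teardown | void) {
    this.producer = producer;
  }

  subscribe(observer: Partial<Observer<T>>): { unsubscribe: Teardown } {
    let closed = false;
    let teardown: Teardown | void = undefined;
    const close = () => {
      closed = true;
      const run = teardown;
      teardown = undefined;
      if (typeof run === "function") run();
    };
    const finish = (notify: () => void) => {
      if (closed) return;
      closed = true;
      notify();
      close();
    };
    teardown = this.producer({
      next: (value) => { if (!closed) observer.next?.(value); },
      error: (err) => finish(() => observer.error?.(err)),
      complete: () => finish(() => observer.complete?.()),
    });
    if (closed) close(); // the producer finished synchronously
    return { unsubscribe: close };
  }
}

function switchMapTo<T>(destination: Observable<T>) {
  return (source: Observable<unknown>): Observable<T> =>
    new Observable<T>((observer) => {
      let innerSubscription: { unsubscribe: Teardown } | undefined;
      let innerCompleted = true;
      let sourceCompleted = false;
      const checkCompletion = () => {
        if (sourceCompleted && innerCompleted) observer.complete();
      };
      const subscription = source.subscribe({
        next: () => {
          innerSubscription?.unsubscribe(); // drop the previous inner stream
          innerCompleted = false; // a new inner stream is now active
          innerSubscription = destination.subscribe({
            next: (value) => observer.next(value),
            error: (err) => observer.error(err),
            complete: () => {
              innerCompleted = true;
              checkCompletion();
            },
          });
        },
        error: (err) => observer.error(err),
        complete: () => {
          sourceCompleted = true;
          checkCompletion();
        },
      });
      return () => {
        innerSubscription?.unsubscribe();
        subscription.unsubscribe();
      };
    });
}

const log: string[] = [];
// Hypothetical inner stream: emits once and stays open, like a running timer.
const timer$ = new Observable<string>((observer) => {
  log.push("inner start");
  observer.next("tick");
  return () => void log.push("inner stop");
});
// Hypothetical source: two synchronous "clicks", then completion.
const clicks$ = new Observable<number>((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
});
const subscription = switchMapTo(timer$)(clicks$).subscribe({
  next: (value) => log.push(value),
  complete: () => log.push("complete"),
});
const beforeUnsubscribe = [...log];
subscription.unsubscribe();
```

After the second click the log reads inner start, tick, inner stop, inner start, tick: the first timer is stopped before the second one starts. No complete is logged, because the last inner stream never completes, and the final unsubscribe adds one more inner stop.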

Resources

Practice

You might have noticed that the timer does not start right away when you click on the button. To fix that, we can use the startWith operator.

It is your turn to implement it here.
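If you want something to compare your attempt against, here is one possible shape, offered as a sketch rather than a reference solution. It assumes a single initial value, and the compact `Observable` stand-in and the `laterValues$` source are hypothetical names for the demo.

```typescript
type Teardown = () => void;

interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Compact stand-in for the series' Observable class (an assumption for this
// sketch, not the original code); it runs the teardown at most once.
class Observable<T> {
  private producer: (observer: Observer<T>) => Teardown | void;

  constructor(producer: (observer: Observer<T>) => Teardown | void) {
    this.producer = producer;
  }

  subscribe(observer: Partial<Observer<T>>): { unsubscribe: Teardown } {
    let closed = false;
    let teardown: Teardown | void = undefined;
    const close = () => {
      closed = true;
      const run = teardown;
      teardown = undefined;
      if (typeof run === "function") run();
    };
    const finish = (notify: () => void) => {
      if (closed) return;
      closed = true;
      notify();
      close();
    };
    teardown = this.producer({
      next: (value) => {
        if (!closed) observer.next?.(value);
      },
      error: (err) => finish(() => observer.error?.(err)),
      complete: () => finish(() => observer.complete?.()),
    });
    if (closed) close(); // the producer finished synchronously
    return { unsubscribe: close };
  }
}

// One possible startWith: emit the initial value, then mirror the source.
function startWith<T>(initial: T) {
  return (source: Observable<T>): Observable<T> =>
    new Observable<T>((observer) => {
      observer.next(initial); // sent right away, before the source emits
      const subscription = source.subscribe({
        next: (value) => observer.next(value),
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      });
      return () => subscription.unsubscribe();
    });
}

// Hypothetical synchronous source used for the demo.
const laterValues$ = new Observable<number>((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
});

const seen: string[] = [];
startWith(0)(laterValues$).subscribe({
  next: (value) => seen.push(`next:${value}`),
  complete: () => seen.push("complete"),
});
console.log(seen); // [ 'next:0', 'next:1', 'next:2', 'complete' ]
```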

Summary

Understanding the internal mechanisms of RxJS allowed us to develop robust operators. An operator can be considered as a helper function that is not really bound to a specific domain and that we can reuse in several applications.

In the next article, we will discuss Unicast and Multicast Observables.

Support

If you like the article, let me know. I hardly ever write, so it will motivate me to produce more content.
