DEV Community

Cover image for Build Your Own RxJS - Observables - (Part 1)
Chihab Otmani
Chihab Otmani

Posted on • Edited on

Build Your Own RxJS - Observables - (Part 1)

Introduction

One of the effective ways to better know a technology is to get an idea of the underlying implementation and ideally try to build your own.

Our journey throughout this series is to build some of the core RxJS features from scratch, namely:

  • The Observable class to create and subscribe to an Observable stream
  • Observable creation utilities like fromEvent and interval
  • Operators like take, map and switchMapTo
  • The pipe utility method which simplifies the composition of several operators applied to an Observable

Consider the code below that starts a new countdown every time the user clicks on the Reset button.

<button id="start">Start</button>
<div id="text"></div>
Enter fullscreen mode Exit fullscreen mode
import { interval, fromEvent } from 'rxjs';
import { switchMapTo, map, take, startWith } from 'rxjs/operators';

const countdownElem = document.getElementById('text');

function countdown(init, delay = 1000) {
  return interval(delay).pipe(
    take(init),
    map(val => init - val - 1),
    startWith(init)
  );
}

const click$ = fromEvent(document.getElementById('start'), 'click');
const countdownFrom10$ = countdown(10);
const countdownFrom10OnClick$ = click$.pipe(switchMapTo(countdownFrom10$));

const text = document.getElementById('#text');
countdownFrom10OnClick$.subscribe({
  next: text => {
    countdownElem.innerHTML = `${text}`;
  }
});
Enter fullscreen mode Exit fullscreen mode

Here is a preview of the final result.

At the end of the article, all RxJS imports can be replaced by ours for the same result.

import { interval, fromEvent } from "./rxjs-dev";
import { switchMap, map, take } from "./rxjs-dev/operators";
Enter fullscreen mode Exit fullscreen mode

Creating Observables

fromEvent

Let's start with the fromEvent function.

import { fromEvent } from "rxjs";

const clicks$ = fromEvent(document, "click");
clicks$.subscribe({
  next: (event) => console.log(event.clientX, event.clientY),
});
Enter fullscreen mode Exit fullscreen mode

Behind the scenes we can imagine that fromEvent uses addEventListener, let's make a first version of it.

function fromEvent(target, eventName) {
  return function (listener) {
    target.addEventListener(eventName, listener);
  };
}

const click$ = fromEvent(document, "click");
click$((event) => console.log(event.clientX, event.clientY));
Enter fullscreen mode Exit fullscreen mode

Notice that fromEvent does not directly call target.addEventListener but it returns a function that calls it.

This is one of the key differences with Promises.

  • A Promise is eager, it is executed immediately, without the need to call the then method on it.

  • An Observable is lazy, it is constructed and later its logic is executed when we subscribe to it.

Let's adapt our code to get it closer to the fromEvent api:

function fromEvent(target, eventName) {
  return {
    subscribe: (observer) => {
      target.addEventListener((event) => {
        observer.next(event);
      });
    },
  };
}

const click$ = fromEvent(document, "click");
click$.subscribe({
  next: (event) => console.log(event.clientX, event.clientY),
});
Enter fullscreen mode Exit fullscreen mode

We have made two updates:

  1. fromEvent no longer returns a function but an object containing a method subscribe that calls target.addEventLister when invoked. This is the beginning of an Observable.

  2. we replaced the listener function with an object literal having a next method. This is an Observer.

Essentially, we've just replaced callback functions with objects that have these specific contracts.

class Observable {
  subscribe: (observer: Observer) => {
    const data = []; // some logic here
    observer.next(data)
  };
}

interface Observer {
  next(event: any): void;
}
Enter fullscreen mode Exit fullscreen mode

Observable

Now, rather than returning an object literal, we want to create the Observable instance from the Observable class we shaped earlier.

function fromEvent(target, eventName): Observable {
  // return {
  //   subscribe(observer: Observer) {
  //     target.addEventListener(eventName, (event) => {
  //       observer.next(event);
  //     });
  //   },
  // };
  return new Observable((observer: Observer) => {
    target.addEventListener(eventName, (event) => {
      observer.next(event);
    });
  });
}
Enter fullscreen mode Exit fullscreen mode

Notice that the callback function passed to the Observable constructor is exactly the subscribe method we put in the object literal, we just need to store it for a later use; when the method subscribe is actually called.

class Observable {
  private _subscribe;
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  subscribe(observer: Observer) {
    this._subscribe(observer);
  }
}

const obs$ = new Observable((observer: Observer) => {
  observer.next('some data');
});
const anObserver: Observer = {
  next: (value) => console.log(value)
}
obs$.subscribe(anObserver);
Enter fullscreen mode Exit fullscreen mode

So basically, the purpose of an Observable is to wrap our usual callbacks with specific contracts so that we can compose them and build utilities around them as we will see next.

interval

Let's create the interval utility that creates an Observable that emits sequential numbers every specified interval of time.

const interval = (period) => Observable {
  return new Observable((observer: Observer) => {
    let tick = 0;
    setInterval((event) => {
      observer.next(tick++);
    }, period);
  });
};

const interval$ = interval(1000);
interval$.subscribe({
  next: (tick) => console.log(tick),
});
Enter fullscreen mode Exit fullscreen mode

Pretty straightforward, right?

unsubscribe

Unsubscribing from an observable means we're no longer interested on its future events. This is how we unsubscribe from an Observable in RxJS.

const subscription: Subscription = interval$.subscribe({
  next: console.log,
});

// Later
subscription.unsubscribe();
Enter fullscreen mode Exit fullscreen mode

Unsubscribing from interval Observable means clearing the interval which has been set by setInterval earlier because we're no longer interested on its data.

const interval = (period) => {
  return new Observable((observer) => {
    let tick = 0;
    const timer = setInterval((event) => {
      observer.next(tick++);
    }, period);

    return () => {
      clearInterval(timer);
    };
  });
};
Enter fullscreen mode Exit fullscreen mode

The teardown function returned on line 8 should be returned to be called using subscription.unsubscribe(). subscription.unsubscribe is our teardown function on line 8.

Let's adapt our Observable accordingly:

interface Subscription {
  unsubscribe(): void;
}

class Observable {
  private _subscribe;
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  subscribe(observer: Observer): Subscription {
    const tearDownFunction = this._subscribe(observer);
    return {
      unsubscribe: tearDownFunction
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Subscribing similiraly in fromEvent:

function fromEvent(target, eventName): Observable {
  return new Observable((observer: Observer) => {
    const listener = observer.next;
    target.addEventListener(eventName, listener);
    return () => {
      target.removeListener(listener);
    };
  });
}
const subscription: Subscription = fromEvent(document, "click").subscribe({
  next: console.log,
});

// Later
subscription.unsubscribe();
Enter fullscreen mode Exit fullscreen mode

Observable contract

There are three types of values an Observable Execution can deliver:

  • "Next" sends a value
  • "Error" sends an error and stops the observable
  • "Complete" does not send a value and stops the observable
interface Observer {
  next(data: any): void;
  complete(): void;
  error(error: any): void;
}
Enter fullscreen mode Exit fullscreen mode

The Observable contract stipulates that whenever a complete or error messages are sent to the Observer, the Observable stops, wich entails the following:

  • The Observable unsubscribe method is called
  • All the future calls to the observer methods are ignored

Given the code below:

new Observable((observer: Observer) => {
  observer.next("Message 1");
  observer.error();
  observer.next("Message 2");
  observer.complete();
  return () => {
    console.log("Unsubscribed!");
  };
}).subscribe({
  next: (value) => console.log(value),
  complete: () => console.log("Complete"),
  error: () => console.log("Error"),
});
Enter fullscreen mode Exit fullscreen mode

The expected output according to the Observable contract is:

Message 1
Error
Unsubscribed
Enter fullscreen mode Exit fullscreen mode

whereas the current output is:

Message 1
Error
Message 2
Complete
Enter fullscreen mode Exit fullscreen mode

To fix our Observable, we have to hook into the observer methods and, depending on the state of the Observable, decide whether to call its methods or not and unsubscribe in case of an error or completion.

class Observable {
  private _subscribe;
  private _unsubscribe;
  private _stopped = true;
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  _stop() {
    this._stopped = true;
    setTimeout(() => {
      this._unsubscribe();
    });
  }
  subscribe(observer) {
    this._stopped = false;
    this._unsubscribe = this._subscribe({
      next: (value) => {
        if (!this._stopped) {
          observer.next(value);
        }
      },
      complete: () => {
        if (!this._stopped) {
          observer.complete();
          this._stop();
        }
      },
      error: () => {
        if (!this._stopped) {
          observer.error();
          this._stop();
        }
      },
    });
    return { unsubscribe: this._unsubscribe };
  }
}
Enter fullscreen mode Exit fullscreen mode

And that's it!

Summary

We have seen that by passing some functions around we can build a minimalist version of an RxJS Observable. It goes without saying that it is not ready for production. ☠️

Resources

Practice

You might have noticed that the timer does not start right away when you click on the button. To fix that we can replace interval with timer`.

It is your turn to implement it here.

On the next article we're going to re-implement some of the most used RxJS operators.

If you like the article, let me know, I hardly ever write, it will motivate me to produce more content.

Top comments (0)