TL;DR
I created a small library called FleuveJS. This post shares how I've come to create it and how it has evolved. Also, I am looking for your feedback on it :-)
Two years ago, I was working on an Angular project (I always work on Angular projects). For those of you who do not know: Angular comes with RxJs, an awesome library to manipulate Observables.
I have always found this library pretty remarkable: the diversity of operators and the structures you can manipulate (Observables, Subjects, BehaviorSubjects) allow developers to write reactive and powerful code. And as I was working on my Angular project, I started to wonder: how hard would it be to write an RxJs-like library? I was really curious about the challenges I would face and how I would overcome them.
Also, I was wondering: would it be possible to have an infinite source of data with Observables?
And so, I started my project.
What should be the MVP?
My first question was: what should be the minimum my library should be able to do?
I knew I wanted to manipulate Observables. What can we do with an Observable?
To me, an Observable should allow the following operations:
- subscribe
- next
- pipe
I never used the forEach method so I skipped it. I included the next method because I wanted to have something mutable.
There it was. My first version should offer developers a simple structure with three methods: subscribe, next and pipe.
The technical stack
I wanted to keep it as simple as possible: I only used TypeScript to develop this library, webpack for the bundling and Jest for testing.
The first implementation
TDD all the way!
I also wanted to try TDD, so I started by writing a test.
The first ones were the following:
- As an Observable, it:
  - should create a new Observable with no emitting value;
  - should create a new Observable with an emitting value;
- subscribe method, it:
  - should throw an error if the wrong argument is passed;
  - should add a subscriber to the list of subscribers;
  - should not execute the subscriber if there is no emitting value;
  - should execute the subscriber if there is at least one emitting value;
- pipe method, it:
  - should return a new Observable with no value if the initial Observable is empty;
  - should return a new Observable with the original Observable's value;
Since I had no operators yet, I did not see any other tests I could implement.
Of course, the tests were failing, so now it was time to implement the Observable class.
The Observable class
First of all, the Observable class should have a generic parameter.
class Observable<T = never>
Then, it should have an inner sequence, which would be an array of items of type T.
private _innerSequence!: T[];
So far, this simple class:
class Observable<T = never> {
  private _innerSequence!: T[];
  constructor(...initialSequence: T[]) {
    this._innerSequence = initialSequence;
  }
}
lets us create an Observable with some values in it. The problem is: we still cannot pipe it, nor subscribe to it.
The Subscriptions and the Subscribers
How can a Subscriber be represented?
If we take a look at RxJs, we can see that a Subscriber should have three methods: one to handle the emitted values, one to handle the errors, and one to handle the completion of an Observable (i.e. the last element of the sequence has been emitted).
A simple interface is enough, with an interface for each method as well:
interface Subscriber<T = any> {
  next?: OnNext<T>;
  error?: OnError;
  complete?: OnComplete;
}
interface OnNext<T> { (t: T): void; }
interface OnError { (err: Error): void; }
interface OnComplete { (): void; }
Also, every time someone subscribes to an Observable, they should receive a Subscription so that they can unsubscribe later if they want to. Hence the following class:
class Subscription {
  constructor(private _unsubscribeCallback?: UnsubscribeCallback) {}
  unsubscribe(): void {
    this._unsubscribeCallback && this._unsubscribeCallback();
  }
}
Hold on a minute! What is this UnsubscribeCallback and why is it optional?
Let's see how our Observable class should behave when someone subscribes to it.
Every time someone subscribes, we should:
- check if the subscriber is actually a Subscriber (is either of type OnNext<T> or Subscriber<T>), else throw an error;
- if there is no subscriber passed as an argument, we should read the inner sequence without executing anything;
- if there is an actual subscriber, add it to the list of subscribers of the Observable and execute its methods according to the inner sequence;
- finally, we should return a new Subscription.
And when this Subscription has its unsubscribe method called, we should remove the corresponding Subscriber from the Observable's list of subscribers. A simple way to do it is to pass the Subscription constructor a callback that filters out the corresponding Subscriber, like so:
() => (this._subscribers = this._subscribers.filter((s) => s !== subscriber))
This callback will then be executed when the unsubscribe method is called.
Now, why is it optional?
There is a case where we do not need any callback to be executed: when one subscribes to an Observable without providing any Subscriber. In this particular case, we won't add anything to our list of subscribers.
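The subscription steps described above can be sketched as follows. This is a minimal illustration under assumptions, not FleuveJS's actual source: the isSubscriber helper, the subscriberCount getter, the error message, and the choice to replay the inner sequence synchronously through next only are all hypothetical, made up for this example.

```typescript
type OnNext<T> = (t: T) => void;
type OnError = (err: Error) => void;
type OnComplete = () => void;
type UnsubscribeCallback = () => void;

interface Subscriber<T = any> {
  next?: OnNext<T>;
  error?: OnError;
  complete?: OnComplete;
}

class Subscription {
  private _unsubscribeCallback?: UnsubscribeCallback;
  constructor(unsubscribeCallback?: UnsubscribeCallback) {
    this._unsubscribeCallback = unsubscribeCallback;
  }
  unsubscribe(): void {
    // No callback means nothing was registered, so there is nothing to undo.
    this._unsubscribeCallback?.();
  }
}

// Hypothetical helper: an object counts as a Subscriber if it has at least one handler.
function isSubscriber<T>(value: unknown): value is Subscriber<T> {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Subscriber<T>;
  return [candidate.next, candidate.error, candidate.complete].some(
    (handler) => typeof handler === "function"
  );
}

class Observable<T = never> {
  private _innerSequence: T[];
  private _subscribers: Subscriber<T>[] = [];

  constructor(...initialSequence: T[]) {
    this._innerSequence = initialSequence;
  }

  // Hypothetical getter, only here so the example can be inspected.
  get subscriberCount(): number {
    return this._subscribers.length;
  }

  subscribe(subscriber?: OnNext<T> | Subscriber<T>): Subscription {
    // No subscriber: nothing to register, so the Subscription gets no callback.
    if (subscriber === undefined) return new Subscription();

    let normalized: Subscriber<T>;
    if (typeof subscriber === "function") {
      normalized = { next: subscriber };
    } else if (isSubscriber<T>(subscriber)) {
      normalized = subscriber;
    } else {
      throw new Error("Please provide either a function or a Subscriber");
    }

    this._subscribers.push(normalized);
    this._innerSequence.forEach((value) => normalized.next?.(value));

    // The callback filters the subscriber out when unsubscribe() is called.
    return new Subscription(() => {
      this._subscribers = this._subscribers.filter((s) => s !== normalized);
    });
  }
}

const numbers$ = new Observable<number>(1, 2, 3);
const received: number[] = [];
const subscription = numbers$.subscribe((x) => received.push(x));
subscription.unsubscribe();
```

After this runs, received holds the three initial values and numbers$ has no subscriber left, since unsubscribe executed the filtering callback.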
pipe and the operators
At this stage of my project, I needed to implement operators so my Observables could be transformed.
An operator, such as map, filter, tap or even switchMap, is nothing but a function taking a callback as an input, and returning a result as an output. The result can be anything: a number, a string, a boolean, void, or even another Observable.
The callback can have a generalized representation we can call OperatorFunction. There are operators that will take an argument of type T and output a result of type U, where U can be anything, including T. And there are operators that will take an argument of type T, but won't necessarily output anything.
Given this information, we have a type OperatorFunction<T, U = never> = (source: T) => U.
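To make the OperatorFunction type more concrete, here is a minimal sketch of how a map and a tap operator could be written against it. The factory shapes below are assumptions made for illustration; they ignore the result wrapping discussed later in this post and are not FleuveJS's actual operators.

```typescript
type OperatorFunction<T, U = never> = (source: T) => U;

// map takes a value of type T and outputs a value of type U.
const map = <T, U>(project: (value: T) => U): OperatorFunction<T, U> =>
  (source) => project(source);

// tap runs a side effect and does not output anything (U is void here).
const tap = <T>(effect: (value: T) => void): OperatorFunction<T, void> =>
  (source) => {
    effect(source);
  };

const increment = map((x: number) => x + 1);
const logged: number[] = [];
const record = tap((x: number) => {
  logged.push(x);
});

console.log(increment(12)); // 13
record(12);
console.log(logged); // [ 12 ]
```

Both operators share the same outer shape (a callback in, an OperatorFunction out), which is what lets a pipe method accept them interchangeably.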
Now, there is a tricky case (there are many but this one was very challenging for me): the switchMap operator.
This operator takes a value of type T, and returns an Observable of type U. The question is: how can I use the pipe method with any operators I want, with each operator not having to care if the previous returns a new Observable or not?
In short, the following code should work:
const obs$ = of(12, 13, 14)
  .pipe(
    map((x) => x + 1), // will output 13, 14, 15
    switchMap((x) => of(x - 3)), // will output Observable(10), Observable(11), Observable(12)
    filter((x) => x > 10) // will output false, true, true
  );
But how can the filter operator receive the values wrapped in the Observables returned by the switchMap operator, and not the actual Observables?
To answer that, I chose to specify the result of every operator as an OperationResult. An OperationResult should bear a value (the result), as well as an error if the operation threw one, and a flag.
What are flags for?
I saw four cases:
- the result comes from an operator such as switchMap and must actually be unwrapped;
- the previous operator was a predicate that wasn't matched and the rest of the operation pipeline must stop for the whole inner sequence;
- the previous operator was a predicate that wasn't matched and the rest of the operation pipeline will be skipped to the next inner sequence item;
- the previous operator threw an error.
It gives us the following enum:
enum OperationResultFlag {
  UnwrapSwitch = 'UnwrapSwitch',
  MustStop = 'MustStop',
  FilterNotMatched = 'FilterNotMatched',
  OperationError = 'OperationError',
}
As for the OperationResult, it's a class, implemented as follows:
class OperationResult<T> {
  constructor(private _value: T, private _flag?: OperationResultFlag, private _error?: Error) {}
  get value() { return this._value; }
  get flag() { return this._flag; }
  get error() { return this._error; }
  isUnwrapSwitch(): boolean {
    return this._flag === OperationResultFlag.UnwrapSwitch;
  }
  isMustStop(): boolean {
    return this._flag === OperationResultFlag.MustStop;
  }
  isFilterNotMatched(): boolean {
    return this._flag === OperationResultFlag.FilterNotMatched;
  }
  isOperationError(): boolean {
    return this._flag === OperationResultFlag.OperationError;
  }
}
Now that we have a class for our operation results, we can actually combine multiple operators. In our Observable class, we will internally check after each operation whether the result is flagged, and decide what to do next accordingly.
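The flag check described above could look something like the following sketch. It rests on several assumptions that are not taken from FleuveJS: a plain interface stands in for the OperationResult class, the flags are modelled as a string-literal union mirroring the enum's values, an inner Observable is reduced to a hypothetical object with a values array, and the runPipeline, safeApply, map, filter, takeWhile and switchMap helpers are illustrative names only.

```typescript
type Flag = "UnwrapSwitch" | "MustStop" | "FilterNotMatched" | "OperationError";

interface OperationResult<T> {
  value: T;
  flag?: Flag;
  error?: Error;
}

type Operator = (source: any) => OperationResult<any>;

// Stand-in for an Observable: only its inner sequence matters in this sketch.
interface SimpleObservable<T> {
  values: T[];
}
const of = <T>(...values: T[]): SimpleObservable<T> => ({ values });

const map = <T, U>(f: (t: T) => U): Operator => (x: T) => ({ value: f(x) });
const filter = <T>(p: (t: T) => boolean): Operator => (x: T) =>
  p(x) ? { value: x } : { value: x, flag: "FilterNotMatched" };
const takeWhile = <T>(p: (t: T) => boolean): Operator => (x: T) =>
  p(x) ? { value: x } : { value: x, flag: "MustStop" };
const switchMap = <T, U>(f: (t: T) => SimpleObservable<U>): Operator => (x: T) =>
  ({ value: f(x), flag: "UnwrapSwitch" });

// Turns an exception thrown by an operator into a flagged result.
const safeApply = (operator: Operator, value: unknown): OperationResult<any> => {
  try {
    return operator(value);
  } catch (err) {
    return { value: undefined, flag: "OperationError", error: err as Error };
  }
};

function runPipeline(sequence: unknown[], operators: Operator[]): unknown[] {
  const output: unknown[] = [];
  let stopped = false;

  const step = (value: unknown, index: number): void => {
    if (stopped) return;
    const operator = operators[index];
    if (operator === undefined) {
      output.push(value); // every operator has run: emit the value
      return;
    }
    const result = safeApply(operator, value);
    switch (result.flag) {
      case "OperationError":
        throw result.error; // this sketch simply rethrows to the caller
      case "MustStop":
        stopped = true; // stop for the whole inner sequence
        return;
      case "FilterNotMatched":
        return; // skip to the next item of the inner sequence
      case "UnwrapSwitch":
        // Feed the wrapped values, not the wrapper, to the next operator.
        for (const inner of (result.value as SimpleObservable<unknown>).values) {
          step(inner, index + 1);
        }
        return;
      default:
        step(result.value, index + 1);
    }
  };

  for (const item of sequence) step(item, 0);
  return output;
}

const piped = runPipeline([12, 13, 14], [
  map((x: number) => x + 1),
  switchMap((x: number) => of(x - 3)),
  filter((x: number) => x > 10),
]);
console.log(piped); // [ 11, 12 ]
```

With this arrangement, filter never sees the wrapper returned by switchMap: the runner unwraps it first, so each operator only has to deal with plain values.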
Follow for the next part
In the next part, I will explain how I came up with more specific Observable implementations such as MutableObservable or ObservableFork.
These implementations required a lot of changes and rethinking in my code, and they are still evolving.
Anyway, I hope you liked this article, do not hesitate to leave your feedback!