DEV Community

Mike Pearson

Rewriting 28 RxJS operators as React hooks

I wanted to prove that React hooks could do everything RxJS can do, so I tried to convert all 114 operators into React hooks. But there are important differences between React state and RxJS that made me question my goal after 28.

Events vs State

RxJS pushes values through streams. If you push 1 into a new Subject<number>() several times, everything downstream will receive each value and process it, unless prevented with a distinctUntilChanged.

React reacts to state changes. If you setState(1) several times, there is no state change, so nothing will be re-rendered.

To get React to react to multiple events represented by the same value, you need to wrap the value in an event object like setState({payload: 1}) or pass a DOM event directly.
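
React decides whether a state update is a change by comparing the old and new values with Object.is. Here is a minimal sketch of that comparison in plain TypeScript; wouldRerender and NumberEvent are hypothetical names for illustration, not React APIs:

```typescript
// Hypothetical helper: mirrors the Object.is check React applies to a new state value.
function wouldRerender<T>(current: T, next: T): boolean {
  return !Object.is(current, next);
}

interface NumberEvent {
  payload: number;
}

// Setting the same primitive again is not a change, so the repeat is dropped.
const samePrimitive = wouldRerender(1, 1);

// A freshly created wrapper object is never identical to the previous one.
const wrappedEvent = wouldRerender<NumberEvent>({ payload: 1 }, { payload: 1 });

console.log(samePrimitive, wrappedEvent); // false true
```

Because the wrapper is a new object on every call, it always counts as a new event, even when the payload repeats.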

Here's an example with increment events:

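import { useEffect, useState } from 'react';
import type { MouseEvent } from 'react'; // React's synthetic event type, not the DOM MouseEvent

function Incrementer() {
  const [event, setEvent] = useState<MouseEvent>();
  const [count, setCount] = useState(0);

  // Each click stores a new event object, so this effect runs once per click.
  useEffect(() => {
    if (!event) return;
    setCount(n => n + 1);
  }, [event]);

  return <button onClick={setEvent}>Increment: {count}</button>;
}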

Obviously this isn't the best way to implement a simple increment feature, but it shows the most flexible way to deal with values as events as opposed to state. The drawback is it makes React render an extra time when setCount is called.

A simpler (although less flexible) option is useReducer:

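import { useReducer } from 'react';
import type { MouseEvent } from 'react'; // React's synthetic event type, not the DOM MouseEvent

function Incrementer2() {
  // The reducer ignores the event payload; every dispatched click adds 1.
  const [count, increment] = useReducer(
    (state: number, _e: MouseEvent) => state + 1,
    0
  );

  return <button onClick={increment}>Increment: {count}</button>;
}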

Most async logic occurs before state needs to be set, so this would probably work for the vast majority of cases.

Filtering

In RxJS you can filter values to avoid unnecessary downstream calculations.

In React, everything renders, so you need to handle default values.

Hooks cannot be called conditionally, so you need to implement an enabled option for each of them, like React Query does, or you need to make them handle default values appropriately.

Complexity and Efficiency

There are 114 operators on the RxJS docs site. I only converted 28 of them.

When I got to concatMap I realized that while it is possible to implement all RxJS operators as hooks, it would take a lot of planning. If there is enough interest, I will do it, but upload it as a GitHub repository, and possibly make an npm package for it.

There were inefficiencies with using React state for some of these operators, because intermediate state needed to be updated synchronously inside useEffects. RxJS seems nicer the more complex an async data flow is.

React Query is the gold standard for using async data declaratively with hooks. I ended up needing to modify even my simple useTimer hook to work more like useQuery to take multiple keys in order to work as an inner observable for other operators.

However, even storing all inputs as keys and returning the async data from the latest input is not good enough: I anticipate that a more advanced pattern would be needed to work inside mergeMap, where inputs may output many responses in any order, and all of them are necessary.

There may also be more limitations yet to be discovered from the other 86 operators.

Summary

In short, I stopped because I realized that the complexity required to handle all async pipelines was enough to justify a dedicated npm package, but the value of creating one is marginal since RxJS already exists, is more efficient, and is easy to use in React.

Notes and Code

(This list is from the RxJS docs site)

Check out the demos on StackBlitz.

Creation Operators

Join Creation Operators

Transformation Operators

Creation Operators

ajax

fetch

bindCallback

See bindCallback.

You will create a hook creator. Each unique function that takes a callback as its last argument will have its own hook. Here is the hook creator I made:

function getUseBindCallback<Args extends any[], Arg>(
  fnWithCb: (...args: [...Args, (arg: Arg) => void]) => void
) {
  return function useBindCallback(initialEvent: Arg, ...args: Args) {
    const [event, setEvent] = useState<Arg>(initialEvent);

    useEffect(() => {
      if (event !== initialEvent) return; // Only run once
      fnWithCb.call(null, ...args, (e: Arg) => {
        setEvent(e);
      });
    }, args);

    return event;
  };
}

Here is an example function and corresponding hook:

const someFunctionWithCallback = (
  a: string,
  b: number,
  cb: ({ a, b }: { a: string; b: number }) => void
) => {
  setTimeout(() => cb({ a, b }), 3000);
};

const useBindCallback = getUseBindCallback(someFunctionWithCallback);

Demo:

function UseBindCallbackDemo() {
  const { a, b } = useBindCallback({ a: '-', b: 0 }, 'Callback finished', 1);
  return (
    <div>
      {a} {b}
    </div>
  );
}

First in DOM: - 0
After 3 seconds: Callback finished 1

bindNodeCallback

See bindNodeCallback.

It's just like bindCallback, but the callback is expected to be of type callback(error, result).

So we can use the same hook creator as with bindCallback, but with extra generics Err, Result and Arg extends [Err, Result]. And the callback args will be collected into an array.

function getUseBindNodeCallback<
  Args extends any[],
  Err,
  Result,
  Arg extends [Err, Result]
>(fnWithCb: (...args: [...Args, (...arg: Arg) => void]) => void) {
  return function useBindNodeCallback(initialEvent: Arg, ...args: Args) {
    const [event, setEvent] = useState<Arg>(initialEvent);

    useEffect(() => {
      if (event !== initialEvent) return; // Only run once
      fnWithCb.call(null, ...args, (...e: Arg) => {
        setEvent(e);
      });
    }, args);

    return event;
  };
}
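
To illustrate what "collected into an array" means, here is a sketch outside React. parseNumber is a made-up Node-style function that happens to call back synchronously; the tuple it produces is the shape that useBindNodeCallback would store in state.

```typescript
type NodeCallback<Err, Result> = (err: Err, result: Result) => void;

// Hypothetical Node-style function: reports its outcome as callback(error, result).
function parseNumber(
  text: string,
  cb: NodeCallback<Error | null, number | null>
): void {
  const value = Number(text);
  if (Number.isNaN(value)) cb(new Error(`not a number: ${text}`), null);
  else cb(null, value);
}

// The rest parameter gathers (error, result) into one [Err, Result] tuple.
let captured: [Error | null, number | null] = [null, null];
parseNumber('42', (...args) => {
  captured = args;
});

console.log(captured); // [ null, 42 ]
```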

defer

defer relies on the usefulness of lazy subscriptions in RxJS. In React, everything in a component is rendered eagerly, so the way to implement laziness is to change some state from false to true so some logic downstream can switch from not executing to executing in a render. If it's just computationally expensive, a short-circuit with && (or an equivalent ternary) would work:

function DeferDemo() {
  const [enabled, setEnabled] = useState(false);
  const expensiveCalculation = enabled && Math.max(1, 2, 3);

  return (
    <button onClick={() => setEnabled(true)}>
      Enable? Result: {expensiveCalculation}
    </button>
  );
}
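
The laziness comes from the expensive expression never being evaluated while enabled is false. A small sketch outside React makes that visible; expensive and deferred are hypothetical names, and the call counter exists only for the illustration:

```typescript
let calls = 0;

// Hypothetical stand-in for an expensive calculation; counts how often it runs.
function expensive(): number {
  calls += 1;
  return Math.max(1, 2, 3);
}

// Same idea as the expression in DeferDemo, written as a ternary.
function deferred(enabled: boolean): number | undefined {
  return enabled ? expensive() : undefined;
}

const before = deferred(false); // undefined: expensive() has not run
const callsBefore = calls;
const after = deferred(true); // 3
const callsAfter = calls;

console.log(before, callsBefore, after, callsAfter); // undefined 0 3 1
```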

Sometimes you want to defer a side-effect that occurs when subscribing, like a data fetch. You can create a hook that takes enabled as an option and escapes out of a useEffect if !enabled:

export function useData(enabled = true) {
  const [data, setData] = useState<any>();

  useEffect(() => {
    if (!enabled) return;
    fetchData().then(setData);
  }, [enabled]);

  return data;
}

empty

I don't think this has a use in React.

from

I don't think this has a use in React. But just in case, this could be the implementation:

function useStates<T extends any[]>(states: T) {
  const [stateIdx, setStateIdx] = useState(0);

  useEffect(() => {
    if (stateIdx === states.length - 1) return;
    setStateIdx(stateIdx + 1);
  }, [stateIdx]);

  return states[stateIdx];
}

Demo:

function UseStatesDemo() {
  const state = useStates([1, 3, 5, 7, 9]);
  console.log('state', state);
  return <span>UseStatesDemo</span>;
}

That demo immediately logs this:

state 1
state 3
state 5
state 7
state 9

If you care about reacting to events downstream instead of only distinct states, you can modify the hook to wrap each in an event object like {payload: states[stateIdx]} or something.

Most likely you have a scenario where you should just calculate a state using reduce on an array.
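
For example, if the goal is a total of the values rather than a walk through them, a single reduce does it without any intermediate renders (the sum is just an illustrative choice of calculation):

```typescript
const states = [1, 3, 5, 7, 9];

// Fold the whole array in one pass instead of stepping through it render by render.
const total = states.reduce((sum, n) => sum + n, 0);

console.log(total); // 25
```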

fromEvent

This converts a DOM element into a stream of DOM events on that element. You can use setState to store event objects in state and react downstream, or you can pass the dispatch function from useReducer as the event handler in the JSX. See "Events vs State" above.

fromEventPattern

This is similar to fromEvent, but it's more general, so it's worth implementing with hooks:

type NodeEventHandler = (...args: any[]) => void;

// Named use... because it calls hooks directly (it is a hook, not a hook creator).
function useEventPattern<T>(
  addHandler: (handler: NodeEventHandler) => any,
  removeHandler?: (handler: NodeEventHandler, signal?: any) => void,
  resultSelector?: (...args: any[]) => T
): T | undefined {
  const [state, setState] = useState<T>();

  useEffect(() => {
    if (!addHandler) return;
    const handler = (...e: T[]) => {
      const val = e.length === 1 ? e[0] : e;
      const newState = resultSelector ? resultSelector(val) : val;
      setState(newState as T);
    };
    // addHandler may return a token that removeHandler needs for cleanup.
    const retValue = addHandler(handler);
    return () => (removeHandler ? removeHandler(handler, retValue) : undefined);
  }, [addHandler, removeHandler, resultSelector]);

  return state;
}

Demo:

function addClickHandler<Handler extends EventListener>(handler: Handler) {
  document.addEventListener('click', handler);
}

function removeClickHandler<Handler extends EventListener>(handler: Handler) {
  document.removeEventListener('click', handler);
}

function UseEventPatternDemo() {
  const event = useEventPattern<MouseEvent>(
    addClickHandler,
    removeClickHandler
  );

  return <div>{event?.clientX}</div>;
}

This listens to document click events and prints out their clientX property.

generate

This is a simple utility that generates a stream of values. It doesn't have unique async logic so I am not going to convert it to hooks. See from.

interval

function useInterval(dt = 1000) {
  const [count, setCount] = useState<number>();

  useEffect(() => {
    const interval = setInterval(() => setCount((n = -1) => n + 1), dt);
    return () => clearInterval(interval);
  }, [dt]);

  return count;
}

Demo:

function UseIntervalDemo() {
  const count = useInterval(2000);
  return <div>{count}</div>;
}

of

This immediately returns a value, so you can just define something with const.

range

Another utility method for generating a stream of values. See from.

throwError

I think you can just throw 'some error'.

timer

function useTimer(wait = 1000, dt?: number) {
  const [count, setCount] = useState<number>();

  useEffect(() => {
    // Phase 1: wait for the initial delay, then emit 0.
    const timeout = count == null && setTimeout(() => setCount(0), wait);
    // Phase 2: once started, keep counting every dt ms (if dt was given).
    const interval =
      dt && count != null && setInterval(() => setCount((n = 0) => n + 1), dt);
    return () => {
      if (timeout) clearTimeout(timeout);
      if (interval) clearInterval(interval);
    };
  }, [wait, dt, count]);

  return count;
}

Demo:

function UseTimerDemo() {
  const count = useTimer(1000, 500);
  return <div>{count}</div>;
}

iif

This observable creator could also be written as const obs$ = defer(() => condition ? observable1 : observable2);. Refer to defer.

Join Creation Operators

combineLatest

This reduces input streams into states, so it's very simple in React.

If you had this in RxJS:

const a$ = new BehaviorSubject(1);
const b$ = new BehaviorSubject(2);
const total$ = combineLatest([a$, b$]).pipe(
  map(([a, b]) => a + b),
);

It would just be this in React:

const [a, setA] = useState(1);
const [b, setB] = useState(2);
const total = a + b;

concat

Let's convert this to hooks:

const result$ = concat(
  timer(2000),
  timer(1000),
  timer(3000),
);

If we used 3 useTimers, they would all start at the same time. But with concat each observable needs to wait for the previous to complete. So, we need to implement an enabled option in whatever observable we want to convert to be part of our concat chain.

So we can modify useTimer to take in a 3rd option enabled = true, add it to the useEffect's dependency array and add if (!enabled) return; at the top of the useEffect.

Here is the demo:

function ConcatDemo() {
  const count1 = useTimer(2000);
  // Each later timer is enabled only once the previous one has emitted.
  const count2 = useTimer(1000, undefined, count1 != null);
  const count3 = useTimer(3000, undefined, count2 != null);
  const result = [count3, count2, count1].findIndex(count => count != null);
  return <div>{result}</div>;
}

First, count1, count2 and count3 are all undefined, so result starts at -1. Only the first useTimer sets a timeout. After 2 seconds, count1 receives the value 0, count1 != null evaluates to true, the 2nd timeout is kicked off, and result becomes 2. After the 2nd timeout finishes 1 second later, result becomes 1, then after 3 more seconds it ends up as 0. The sequencing matches RxJS concat: each timer starts only after the previous one has completed.
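
The progression of result can be traced without running any timers. This sketch lifts the findIndex expression from the demo into a function (concatResult is a hypothetical name) and feeds it the four states the demo passes through:

```typescript
type Count = number | undefined;

// Same expression as in ConcatDemo, lifted into a function so it can be traced.
function concatResult(count1: Count, count2: Count, count3: Count): number {
  return [count3, count2, count1].findIndex((count) => count != null);
}

const trace = [
  concatResult(undefined, undefined, undefined), // nothing has fired yet
  concatResult(0, undefined, undefined), // first timer done
  concatResult(0, 0, undefined), // second timer done
  concatResult(0, 0, 0), // third timer done
];

console.log(trace); // [ -1, 2, 1, 0 ]
```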

forkJoin

This operator is overused. For HTTP requests (observables that complete after their first value) it behaves the same as combineLatest, but developers just love to flex their RxJS knowledge and reach for a more limited operator for no reason at all. Occasionally a data source later turns into a stream of multiple values, and in those cases you actually need combineLatest.

Anyway, if someone provides a concrete example of forkJoin that the combineLatest example is insufficient for, I'll work on this.

merge

We need several useEffects that react to different inputs but all set the same output state:

function useMerge<S1, S2>(s1: S1, s2: S2) {
  const [state, setState] = useState<S1 | S2>();

  useEffect(() => {
    setState(s1);
  }, [s1]);

  useEffect(() => {
    setState(s2);
  }, [s2]);

  return state;
}

Demo:

function UseMergeDemo() {
  const interval1 = useInterval(900);
  const interval2 = useInterval(1100);
  const interval = useMerge(interval1, interval2);
  return <div>{interval1} {interval2} {interval}</div>
}

partition

Not sure when you'd use this, but it's easy to implement:

function usePartition<T>(state: T, partition: (s: T) => boolean) {
  const [state1, setState1] = useState<T>();
  const [state2, setState2] = useState<T>();

  useEffect(() => {
    const setState = partition(state) ? setState1 : setState2;
    setState(state);
  }, [state]);

  return [state1, state2];
}

Demo:

function UsePartitionDemo() {
  const interval = useInterval(1000);
  // The first state receives values that pass the predicate, the second receives the rest.
  const [evens, odds] = usePartition(
    interval,
    (n) => n !== undefined && n % 2 === 0
  );
  return <div>{evens} {odds}</div>;
}

race

For this one, we'll start with two states, each undefined, and when the first one gets defined we'll update a third state to that value but then stop paying attention after that.

function useRace<S1, S2>(s1: S1, s2: S2) {
  const [state, setState] = useState<S1 | S2>();

  useEffect(() => {
    if (state === undefined && s1 !== undefined) setState(s1);
  }, [state, s1]);

  useEffect(() => {
    if (state === undefined && s2 !== undefined) setState(s2);
  }, [state, s2]);

  return state;
}
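
Both effects apply the same rule: once there is a winner, keep it. Written as a pure function (raceStep is a hypothetical name), with values arriving in the order they do in the demo, where timer2Plus1 produces 1 before timer1 produces 0:

```typescript
// Hypothetical pure version of the useRace rule:
// the first defined value wins and is never replaced.
function raceStep<T>(winner: T | undefined, candidate: T | undefined): T | undefined {
  return winner === undefined ? candidate : winner;
}

const afterNothing = raceStep<number>(undefined, undefined); // nothing has fired yet
const afterFirst = raceStep<number>(afterNothing, 1); // timer2Plus1 arrives first
const afterSecond = raceStep<number>(afterFirst, 0); // timer1 arrives too late

console.log(afterNothing, afterFirst, afterSecond); // undefined 1 1
```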

Demo:

function UseRaceDemo() {
  const timer1 = useTimer(3000);
  const timer2 = useTimer(2500);
  const timer2Plus1 = timer2 !== undefined ? timer2 + 1 : timer2;
  const firstToChange = useRace(timer1, timer2Plus1);
  return (
    <div>
      <div>timer1: {timer1}</div>
      <div>timer2Plus1: {timer2Plus1}</div>
      <div>Race: {firstToChange}</div>
    </div>
  );
}

zip

This is another operator I've never had a use for, but it seems fun to convert.

I don't like the repeated code in my implementation, but I'm not going to spend the time to clean it up.

function useZip<S1, S2>(s1: S1, s2: S2) {
  const [state, setState] = useState<[S1, S2][]>([]);

  useEffect(() => {
    if (s1 === undefined) return;
    setState((oldState) => {
      const missingS1Idx = oldState.findIndex(([oldS1]) => oldS1 === undefined);

      const [oldS1, oldS2] = oldState[missingS1Idx] || [];
      const bothUndefined = oldS2 === undefined;
      return bothUndefined
        ? [...oldState, [s1, undefined]]
        : [[s1, oldS2], ...oldState.slice(2)];
    });
  }, [s1]);

  useEffect(() => {
    if (s2 === undefined) return;
    setState((oldState) => {
      const missingS2Idx = oldState.findIndex(
        ([oldS1, oldS2]) => oldS2 === undefined
      );

      const [oldS1, oldS2] = oldState[missingS2Idx] || [];
      const bothUndefined = oldS1 === undefined;
      return bothUndefined
        ? [...oldState, [undefined, s2]]
        : [[oldS1, s2], ...oldState.slice(2)];
    });
  }, [s2]);

  return state[0];
}
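
As far as I can tell, the hook above assumes at most one unmatched value is waiting at any time. For comparison, here is a sketch of the zip state transition as a pure function with one queue per input, so values that arrive early are kept until their partner shows up. ZipState, emptyZip, settle, pushA and pushB are all hypothetical names; this is one possible shape, not a drop-in replacement for useZip.

```typescript
interface ZipState<A, B> {
  queueA: A[]; // values from the first input still waiting for a partner
  queueB: B[]; // values from the second input still waiting for a partner
  latest: [A, B] | undefined; // most recently completed pair
}

function emptyZip<A, B>(): ZipState<A, B> {
  return { queueA: [], queueB: [], latest: undefined };
}

// Pair the two queue heads if both exist; otherwise leave everything waiting.
function settle<A, B>(state: ZipState<A, B>): ZipState<A, B> {
  if (state.queueA.length === 0 || state.queueB.length === 0) return state;
  return {
    queueA: state.queueA.slice(1),
    queueB: state.queueB.slice(1),
    latest: [state.queueA[0], state.queueB[0]],
  };
}

function pushA<A, B>(state: ZipState<A, B>, a: A): ZipState<A, B> {
  return settle({ ...state, queueA: [...state.queueA, a] });
}

function pushB<A, B>(state: ZipState<A, B>, b: B): ZipState<A, B> {
  return settle({ ...state, queueB: [...state.queueB, b] });
}

let zipState = emptyZip<number, number>();
zipState = pushA(zipState, 0);
zipState = pushA(zipState, 1); // a second value arrives before any partner
zipState = pushB(zipState, 0);
const firstPair = zipState.latest; // [0, 0]
zipState = pushB(zipState, 10);
const secondPair = zipState.latest; // [1, 10]

console.log(firstPair, secondPair);
```

In a hook, each push would be a setState updater inside the corresponding useEffect, and latest would be the returned value.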

In my demo I will zip together 2 intervals of the same speed to avoid a memory leak, but stagger their start times.

function UseZipDemo() {
  const timer1 = useTimer(1000, 4000);
  const timer2 = useTimer(3000, 4000);
  const timer2Times10 = timer2 !== undefined ? timer2 * 10 : undefined;
  const zipped = useZip(timer1, timer2Times10);
  return (
    <div>
      <div>timer1: {timer1}</div>
      <div>timer2Times10: {timer2Times10}</div>
      <div>Zip: {JSON.stringify(zipped)?.replace(',', ', ')}</div>
    </div>
  );
}

Transformation Operators

buffer

function useBuffer<T, V>(event: T, cutoffEvent: V) {
  const [{ lastBatch }, setState] = useState<{
    lastBatch: T[];
    buffer: T[];
  }>({ lastBatch: [], buffer: [] });

  useEffect(() => {
    if (event === undefined) return;
    setState((state) => ({
      ...state,
      buffer: state.buffer.concat(event),
    }));
  }, [event]);

  useEffect(() => {
    if (cutoffEvent === undefined) return;
    setState((state) => ({
      lastBatch: state.buffer,
      buffer: [],
    }));
  }, [cutoffEvent]);

  return lastBatch;
}

Demo:

function UseBufferDemo() {
  const count = useInterval(700);
  const interval = useInterval(3000);
  const batch = useBuffer(count, interval);
  return (
    <div>
      <div>count: {count}</div>
      <div>interval: {interval}</div>
      <div>Batch: {JSON.stringify(batch)}</div>
    </div>
  );
}

bufferCount

Similar to buffer:

function useBufferCount<T>(event: T, size: number) {
  const [{ lastBatch }, setState] = useState<{
    lastBatch: T[];
    buffer: T[];
  }>({ lastBatch: [], buffer: [] });

  useEffect(() => {
    if (event === undefined) return;
    setState((state) => {
      // Add the new event first, so it is not dropped when the buffer fills up.
      const buffer = state.buffer.concat(event);
      const full = buffer.length >= size;
      return {
        lastBatch: full ? buffer : state.lastBatch,
        buffer: full ? [] : buffer,
      };
    });
  }, [event]);

  return lastBatch;
}
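
The buffering step is easier to check as a pure function. This sketch (BufferState and bufferCountStep are hypothetical names) assumes a batch should be released as soon as it reaches size, with no event dropped, which is how RxJS bufferCount behaves:

```typescript
interface BufferState<T> {
  lastBatch: T[];
  buffer: T[];
}

// Hypothetical pure version of the state update inside a bufferCount hook.
function bufferCountStep<T>(
  state: BufferState<T>,
  event: T,
  size: number
): BufferState<T> {
  const buffer = [...state.buffer, event];
  return buffer.length >= size
    ? { lastBatch: buffer, buffer: [] } // batch is full: release it and start over
    : { lastBatch: state.lastBatch, buffer }; // keep collecting
}

let bufferState: BufferState<number> = { lastBatch: [], buffer: [] };
for (let n = 0; n < 7; n++) {
  bufferState = bufferCountStep(bufferState, n, 5);
}

console.log(bufferState); // { lastBatch: [ 0, 1, 2, 3, 4 ], buffer: [ 5, 6 ] }
```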

Demo:

function UseBufferCountDemo() {
  const count = useInterval(700);
  const batch = useBufferCount(count, 5);
  return (
    <div>
      <div>count: {count}</div>
      <div>Batch: {JSON.stringify(batch)}</div>
    </div>
  );
}

bufferTime

Similar to buffer:

function useBufferTime<T>(event: T, time: number) {
  const interval = useInterval(time);
  const [{ lastBatch }, setState] = useState<{
    lastBatch: T[];
    buffer: T[];
  }>({ lastBatch: [], buffer: [] });

  useEffect(() => {
    if (event === undefined) return;
    setState((state) => ({
      ...state,
      buffer: state.buffer.concat(event),
    }));
  }, [event]);

  useEffect(() => {
    setState((state) => ({
      lastBatch: state.buffer,
      buffer: [],
    }));
  }, [interval]);

  return lastBatch;
}

Demo:

function UseBufferTimeDemo() {
  const count = useInterval(700);
  const batch = useBufferTime(count, 3500);
  return (
    <div>
      <div>count: {count}</div>
      <div>Batch: {JSON.stringify(batch)}</div>
    </div>
  );
}

bufferToggle

This operator takes in an observable and an observable factory:

const clicks = fromEvent(document, 'click');
const openings = interval(1000);
const buffered = clicks.pipe(bufferToggle(openings, i =>
  i % 2 ? interval(500) : EMPTY
));
buffered.subscribe(x => console.log(x));

React hooks can't be dynamically defined like observables, but since the values emitted from the hooks are externally available, the logic of the observable factory can be defined outside as well if we use useBuffer. You might have to modify it with an enabled option.

Write a comment if you want an example. I haven't ever seen this operator used.

bufferWhen

Similar to bufferToggle.

concatMap

Let's convert this to hooks:

const result$ = interval(1000).pipe(
  concatMap(i => i < 10 ? timer(2700 - i * 300) : of(null)),
);
Enter fullscreen mode Exit fullscreen mode

This is going to be different from concat, because we cannot know how many inner observables will be created.

The inner observables will build up but they will all run and complete.

With concatMap, the outer observable might emit much more rapidly than the inner observable completes, so we need to have a queue of future inner observables as some state somewhere. Either we need to add a change to every other hook to keep track of a queue of inputs, or we need to keep this state independent and expose a function to shift queued values off when inner async tasks complete. This is what I will do.

Here is useQueue:

function useQueue<T>(value: T, enabled = true) {
  const [queue, setQueue] = useState<T[]>([]);
  const shiftQueue = useCallback(() => setQueue((q) => q.slice(1)), [setQueue]);

  useEffect(() => {
    if (value === undefined || !enabled) return;
    setQueue((s) => s.concat(value));
  }, [value]);

  return [queue, shiftQueue] as const;
}

And here is the demo:

function UseConcatMapDemo() {
  const count = useInterval(700) % 30;
  const queueEnabled = count < 10;
  const [queue, shiftQueue] = useQueue(count, queueEnabled);
  useTimer(2700 - queue[0] * 300, undefined, !!queue.length, shiftQueue);
  return (
    <div>
      <div>count: {count}</div>
      <div>Queue: {JSON.stringify(queue)}</div>
    </div>
  );
}

If you run this you will see queue build up for 10 entries, then empty as each timeout completes more quickly than the last.
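
The queue mechanics can be followed without timers. This sketch uses hypothetical pure versions of the two updates in useQueue, plus the delay formula from the demo, and traces three queued values:

```typescript
// Hypothetical pure versions of the two updates inside useQueue.
function enqueue<T>(
  queue: readonly T[],
  value: T | undefined,
  enabled = true
): readonly T[] {
  return value === undefined || !enabled ? queue : [...queue, value];
}

function shiftQueue<T>(queue: readonly T[]): readonly T[] {
  return queue.slice(1);
}

// Delay of the inner timer for a queued value, as in the demo.
function innerDelay(i: number): number {
  return 2700 - i * 300;
}

let queue: readonly number[] = [];
for (const count of [0, 1, 2]) {
  queue = enqueue(queue, count);
}

const firstDelay = innerDelay(queue[0]); // 2700: the head of the queue runs first
queue = shiftQueue(queue); // first inner timer completed
const secondDelay = innerDelay(queue[0]); // 2400: the next value becomes the head

console.log(firstDelay, secondDelay, queue); // 2700 2400 [ 1, 2 ]
```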

After making this work, I think I need to implement an options parameter in each hook:

interface Options {
  enabled?: boolean; // skip the hook's effect while false
  onComplete?: () => void; // called when the inner async task finishes
}

Also, each inner observable needs to be able to output something related to the values that created it. Since the hooks are asynchronous, those values will no longer be available when it returns a value in the future. So those inputs need to be stored as state somehow to be accessed in the future.

Maybe each hook needs to work like React Query by storing return values by the keys that were used to create the requests. Or at least every observable creation operator.
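
A rough sketch of what storing results by key could look like, outside React. Everything here is hypothetical (Key, keyOf, storeResult, readResult), and using JSON.stringify to identify a key is my simplifying assumption, not a description of how React Query does it:

```typescript
type Key = readonly unknown[];

// Assumption: keys are plain JSON-serializable values,
// so JSON.stringify is enough to tell them apart.
const keyOf = (key: Key): string => JSON.stringify(key);

const resultsByKey = new Map<string, string>();

function storeResult(key: Key, value: string): void {
  resultsByKey.set(keyOf(key), value);
}

function readResult(key: Key): string | undefined {
  return resultsByKey.get(keyOf(key));
}

// Responses can arrive in any order and still be matched to the input that caused them.
storeResult(['timer', 2400], 'second input finished');
storeResult(['timer', 2700], 'first input finished');

const forFirstInput = readResult(['timer', 2700]);
const neverRequested = readResult(['timer', 300]);

console.log(forFirstInput, neverRequested); // first input finished undefined
```

A hook built this way would keep the map in state (or a ref) and return the entry for its current key.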

Or maybe we should just use RxJS...
