DEV Community

Aarav Joshi

Mastering JavaScript: Unleash the Power of Functional Reactive Programming with Higher-Order Streams

Functional Reactive Programming (FRP) with higher-order streams in JavaScript is a powerful approach to handling complex, time-based interactions in our code. It's a way of thinking about our programs as a series of data flows, rather than a sequence of imperative commands.

Let's start by understanding what streams are. In FRP, a stream is a sequence of values over time. It could be anything from mouse clicks to API responses. The payoff comes when we treat these streams as first-class values that can be passed around, transformed, and combined like any other value.

Higher-order streams take this concept a step further. They're streams of streams, allowing us to model even more complex scenarios. Imagine a stream of user searches, where each search triggers a new stream of results. That's a higher-order stream in action.
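As a rough, synchronous analogy, the sketch below uses plain arrays in place of streams. The `resultsFor` function is a hypothetical stand-in for a real search call. Mapping each query to its own list of results yields a list of lists, which is the shape of a higher-order stream, and flattening that shape is what operators such as concatMap and switchMap do over time:

```javascript
// Hypothetical lookup standing in for an asynchronous search request.
function resultsFor(query) {
  return [`${query}-1`, `${query}-2`];
}

const queries = ['rx', 'frp'];

// "Stream of streams": one inner list of results per query.
const higherOrder = queries.map(resultsFor);

// Flatten every inner list in order (roughly what concatMap does over time).
const allResults = higherOrder.flat();

// Keep only the newest inner list (the effect switchMap has when earlier
// inner streams are still pending as a new one starts).
const latestResults = higherOrder[higherOrder.length - 1];

console.log(higherOrder);   // [['rx-1', 'rx-2'], ['frp-1', 'frp-2']]
console.log(allResults);    // ['rx-1', 'rx-2', 'frp-1', 'frp-2']
console.log(latestResults); // ['frp-1', 'frp-2']
```

Real streams add the time dimension that arrays lack, so this only illustrates the nesting and flattening, not the scheduling.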

I've found that one of the best ways to grasp these concepts is through practical examples. Let's dive into some code:

// Assumes the RxJS UMD bundle is loaded, which exposes the global `rxjs`
const { fromEvent } = rxjs;
const { map, switchMap } = rxjs.operators;

const searchInput = document.getElementById('search-input');
const searchButton = document.getElementById('search-button');

const searchStream = fromEvent(searchButton, 'click').pipe(
  map(() => searchInput.value),
  switchMap(query => fetchSearchResults(query))
);

searchStream.subscribe(results => {
  // Display results
});

function fetchSearchResults(query) {
  // Simulate API call
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(`Results for ${query}`);
    }, 1000);
  });
}

In this example, we're creating a stream of search queries. Each time the search button is clicked, we map the click event to the current value of the search input. Then, we use switchMap to create a new stream for each search query.

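The beauty of this approach is how it handles rapid-fire events. If a user clicks the search button multiple times quickly, switchMap unsubscribes from the previous inner stream and only gives us the results of the latest query. Because fetchSearchResults returns a Promise, the earlier request itself keeps running and only its result is discarded; aborting the request as well requires an inner Observable with teardown logic.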
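To make the "latest query wins" behaviour concrete, here is a minimal dependency-free sketch of the idea. It is not how RxJS implements switchMap; `createLatestOnly` and the hand-completed responses are hypothetical names introduced for illustration:

```javascript
// Wraps a callback-based task runner so that only the most recently
// started task may deliver its result; older results are dropped.
function createLatestOnly(onResult) {
  let latestToken = 0;
  return function start(task) {
    const token = ++latestToken;
    task(result => {
      if (token === latestToken) {
        onResult(result);
      }
    });
  };
}

// Demo: two "searches" whose responses we complete by hand.
const delivered = [];
const pending = [];
const search = createLatestOnly(result => delivered.push(result));

search(done => pending.push(() => done('Results for cat')));
search(done => pending.push(() => done('Results for catalog')));

// Responses arrive out of order: the newer one first, then the stale one.
pending[1]();
pending[0]();

console.log(delivered); // ['Results for catalog']
```

The stale response is ignored rather than cancelled, which mirrors what happens when the inner stream is built from a Promise.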

One of the key benefits of FRP is how it helps us manage complexity. By thinking in terms of streams, we can break down complex interactions into smaller, more manageable pieces.

Let's look at another example. Suppose we're building a collaborative document editor. We want to sync changes to the server, but we don't want to send every keystroke. We can use FRP to create a debounced stream of changes:

const { fromEvent } = rxjs;
const { debounceTime, map } = rxjs.operators;

const editor = document.getElementById('editor');

const changeStream = fromEvent(editor, 'input').pipe(
  debounceTime(300),
  map(event => event.target.value)
);

changeStream.subscribe(content => {
  sendToServer(content);
});

function sendToServer(content) {
  // Simulated server send
  console.log('Sending to server:', content);
}

Here, we're creating a stream of input events, debouncing them by 300 milliseconds, and then mapping to the editor's content. This means we'll only send updates to the server if the user pauses typing for at least 300ms.
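The debounce rule can be checked on paper with a small model. The sketch below is a plain-JavaScript approximation that works on a recorded list of timestamped events instead of live timers; `debounceTimeline` and the sample keystrokes are hypothetical, and the model assumes the stream never completes:

```javascript
// Models debounceTime on a recorded, never-completing timeline:
// `events` is an array of { time, value } sorted by time (milliseconds).
function debounceTimeline(events, waitMs) {
  const emitted = [];
  events.forEach((event, index) => {
    const next = events[index + 1];
    // An event survives only if the next one arrives at least waitMs later.
    if (!next || next.time - event.time >= waitMs) {
      emitted.push({ time: event.time + waitMs, value: event.value });
    }
  });
  return emitted;
}

// Three quick keystrokes, a pause, then one more keystroke.
const keystrokes = [
  { time: 0, value: 'h' },
  { time: 100, value: 'he' },
  { time: 200, value: 'hel' },
  { time: 900, value: 'hell' }
];

console.log(debounceTimeline(keystrokes, 300));
// [{ time: 500, value: 'hel' }, { time: 1200, value: 'hell' }]
```

Only two of the four keystrokes would reach the server, each 300ms after the typing paused.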

One of the challenges in FRP is managing shared state. The functional paradigm encourages us to avoid mutable state, but sometimes we need to keep track of things. Streams give us a way to do this cleanly:

const { BehaviorSubject, Subject, merge } = rxjs;
const { scan } = rxjs.operators;

const initialState = { count: 0 };
// Holds the current state and replays it to every new subscriber
const state$ = new BehaviorSubject(initialState);

// Action streams: each emission is a delta to apply to the count
const increment$ = new Subject();
const decrement$ = new Subject();

// Fold each delta into a new state object and push it into state$
merge(increment$, decrement$).pipe(
  scan((state, change) => ({ count: state.count + change }), initialState)
).subscribe(state$);

state$.subscribe(state => console.log(state.count)); // logs 0 immediately

// Increment
increment$.next(1); // logs 1
// Decrement
decrement$.next(-1); // logs 0

In this example, we're using a BehaviorSubject to represent our application state. We create separate streams for increment and decrement actions, and then use the scan operator to accumulate these changes into a new state.

This pattern gives us the benefits of immutable state updates while still allowing us to model our application as a series of streams.
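One way to picture scan is as a reduce that keeps every intermediate result. The following plain-JavaScript sketch shows that idea on an array; `scanArray` and `applyChange` are hypothetical helpers, not part of RxJS:

```javascript
// Like Array.prototype.reduce, but keeps every intermediate state,
// which is how scan behaves on a stream.
function scanArray(values, reducer, seed) {
  const states = [];
  let state = seed;
  for (const value of values) {
    state = reducer(state, value);
    states.push(state);
  }
  return states;
}

const initialState = Object.freeze({ count: 0 });

// Pure reducer: returns a new object instead of mutating the old state.
const applyChange = (state, change) => ({ count: state.count + change });

const history = scanArray([1, 1, -1], applyChange, initialState);

console.log(history.map(s => s.count)); // [1, 2, 1]
console.log(initialState.count);        // 0 (the seed is untouched)
```

Because the reducer is a pure function, it can be unit tested on its own before it is plugged into a stream.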

One of the most powerful aspects of FRP is how it lets us compose complex behaviors from simple building blocks. Let's look at an example of how we might implement drag-and-drop functionality:

const { fromEvent } = rxjs;
const { map, takeUntil, switchMap } = rxjs.operators;

// Assumes #draggable is absolutely positioned so that left/top take effect
const draggable = document.getElementById('draggable');

const mousedown$ = fromEvent(draggable, 'mousedown');
const mousemove$ = fromEvent(document, 'mousemove');
const mouseup$ = fromEvent(document, 'mouseup');

const drag$ = mousedown$.pipe(
  switchMap(start => {
    const startX = start.clientX - draggable.offsetLeft;
    const startY = start.clientY - draggable.offsetTop;

    return mousemove$.pipe(
      map(move => ({
        x: move.clientX - startX,
        y: move.clientY - startY
      })),
      takeUntil(mouseup$)
    );
  })
);

drag$.subscribe(pos => {
  draggable.style.left = `${pos.x}px`;
  draggable.style.top = `${pos.y}px`;
});

Here, we're combining multiple event streams to create a higher-order stream that represents the drag operation. The switchMap operator lets us create a new stream for each drag, and takeUntil ensures that we stop tracking mouse movement when the user releases the mouse button.

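One of the challenges in FRP is handling backpressure: what happens when our stream produces values faster than we can consume them? RxJS Observables are push-based, so the consumer cannot ask the producer to slow down; instead, operators such as bufferTime, throttleTime, and sampleTime batch or drop values. Let's look at an example using the bufferTime operator: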

const { interval } = rxjs;
const { bufferTime } = rxjs.operators;

const fastStream$ = interval(10); // Emits every 10ms

const bufferedStream$ = fastStream$.pipe(
  bufferTime(1000) // Collect values for 1 second
);

bufferedStream$.subscribe(buffer => {
  console.log(`Received ${buffer.length} values`);
});

In this example, we're buffering values from a fast stream into arrays that we emit once per second. This can be useful for dealing with high-frequency events like mouse movements or sensor readings.
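The grouping itself can be sketched without timers. The function below is a simplified model that sorts recorded, timestamped values into fixed windows; `bufferByTime` and the generated `ticks` are hypothetical, and the real operator works on live emissions:

```javascript
// Groups timestamped values into fixed windows, similar in spirit to
// bufferTime. `events` is an array of { time, value } sorted by time.
function bufferByTime(events, windowMs) {
  const buffers = [];
  for (const { time, value } of events) {
    const index = Math.floor(time / windowMs);
    // Create empty buffers for any windows in which nothing arrived.
    while (buffers.length <= index) {
      buffers.push([]);
    }
    buffers[index].push(value);
  }
  return buffers;
}

// A hypothetical source emitting 0, 1, 2, ... every 10ms for 2.5 seconds.
const ticks = Array.from({ length: 250 }, (_, i) => ({ time: i * 10, value: i }));

const buffers = bufferByTime(ticks, 1000);
console.log(buffers.map(b => b.length)); // [100, 100, 50]
```

Buffering keeps every value and only changes how often the consumer is called; if dropping values is acceptable, a sampling or throttling strategy keeps memory use flat instead.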

As we delve deeper into FRP, we often find ourselves wanting to create custom operators. RxJS makes this relatively straightforward:

const { Observable, of } = rxjs;

function customOperator() {
  return (source$) => {
    return new Observable(observer => {
      return source$.subscribe({
        next(value) {
          if (value % 2 === 0) {
            observer.next(value * 2);
          }
        },
        error(err) { observer.error(err); },
        complete() { observer.complete(); }
      });
    });
  };
}

const source$ = of(1, 2, 3, 4, 5);
const result$ = source$.pipe(customOperator());

result$.subscribe(x => console.log(x)); // Outputs: 4, 8

This custom operator doubles even numbers and filters out odd numbers. Creating custom operators allows us to encapsulate complex stream manipulations and reuse them across our application.

One area where FRP really shines is in handling complex asynchronous operations. Let's look at an example of how we might implement a retry mechanism with exponential backoff:

const { of, throwError, timer } = rxjs;
const { mergeMap, retry } = rxjs.operators;

function fetchWithRetry(url) {
  return of(url).pipe(
    mergeMap(u => {
      // Simulate an API call that fails about half the time
      return Math.random() < 0.5
        ? throwError(() => new Error('API error'))
        : of(`Response from ${u}`);
    }),
    // The delay option of retry requires RxJS 7
    retry({
      count: 3,
      delay: (error, retryCount) => {
        // retryCount starts at 1, so the waits are 2s, 4s, and 8s
        const backoffMs = Math.pow(2, retryCount) * 1000;
        console.log(`Retrying in ${backoffMs}ms`);
        return timer(backoffMs);
      }
    })
  );
}

fetchWithRetry('https://api.example.com')
  .subscribe({
    next: response => console.log(response),
    error: error => console.error('Failed after 3 retries', error)
  });

In this example, we're using the retry operator with a custom delay function that implements exponential backoff. This kind of complex async behavior becomes much more manageable when expressed as a stream.
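The backoff schedule is easy to verify in isolation. This sketch computes the wait before each retry using the same 2^retryCount × 1000ms formula as above; `backoffDelays` is a hypothetical helper, and the optional `maxMs` cap is an added assumption that the example itself does not have:

```javascript
// Computes the wait before each retry: base * 2^attempt, optionally capped.
// `maxMs` is an added assumption; the retry example itself has no cap.
function backoffDelays(retries, baseMs = 1000, maxMs = Infinity) {
  const delays = [];
  for (let attempt = 1; attempt <= retries; attempt++) {
    delays.push(Math.min(baseMs * Math.pow(2, attempt), maxMs));
  }
  return delays;
}

console.log(backoffDelays(3));              // [2000, 4000, 8000]
console.log(backoffDelays(5, 1000, 10000)); // [2000, 4000, 8000, 10000, 10000]
```

With three retries the caller waits up to 14 seconds in total before the error is surfaced, which is worth knowing when choosing the retry count.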

As we build larger applications with FRP, we often need to manage multiple streams that interact with each other. The combineLatest operator is incredibly useful for this:

const { combineLatest, BehaviorSubject } = rxjs;
const { map } = rxjs.operators;

const userProfile$ = new BehaviorSubject({ name: 'John' });
const userPreferences$ = new BehaviorSubject({ theme: 'light' });
const currentRoute$ = new BehaviorSubject('/home');

const appState$ = combineLatest([
  userProfile$,
  userPreferences$,
  currentRoute$
]).pipe(
  map(([profile, preferences, route]) => ({
    profile,
    preferences,
    route
  }))
);

appState$.subscribe(state => {
  console.log('App state updated:', state);
});

// Update individual streams
userPreferences$.next({ theme: 'dark' });
currentRoute$.next('/settings');

This pattern allows us to maintain separate streams for different aspects of our application state, while still being able to react to changes in the overall state.
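The rule combineLatest follows is simple: remember the newest value from each source, and emit a combined snapshot on every change once all sources have produced something. A minimal plain-JavaScript sketch of that rule, with `createCombiner` as a hypothetical helper, might look like this:

```javascript
// Remembers the latest value per named source and reports a combined
// snapshot once every source has produced at least one value.
function createCombiner(names, onChange) {
  const latest = {};
  return function update(name, value) {
    latest[name] = value;
    if (names.every(n => n in latest)) {
      onChange({ ...latest });
    }
  };
}

const snapshots = [];
const update = createCombiner(['profile', 'preferences', 'route'], s => snapshots.push(s));

update('profile', { name: 'John' });
update('preferences', { theme: 'light' });
update('route', '/home');                 // first snapshot: all three present
update('preferences', { theme: 'dark' }); // second snapshot

console.log(snapshots.length);               // 2
console.log(snapshots[1].preferences.theme); // 'dark'
console.log(snapshots[1].route);             // '/home'
```

In the BehaviorSubject version every source has an initial value, so the first combined state arrives as soon as we subscribe.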

One of the most powerful aspects of FRP is how it changes the way we think about our code. Instead of imperatively describing step-by-step what our program should do, we declaratively describe data flows and transformations. This often leads to code that's easier to reason about and test.

Speaking of testing, FRP can make our tests more robust and less brittle. Instead of relying on complex mocks and stubs, we can test our streams directly:

const { TestScheduler } = require('rxjs/testing');
const { filter } = require('rxjs/operators');

describe('My Observable', () => {
  let testScheduler;

  beforeEach(() => {
    testScheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });

  it('should filter even numbers', () => {
    testScheduler.run(({ cold, expectObservable }) => {
      const source$ = cold('a-b-c-d-e-|', { a: 1, b: 2, c: 3, d: 4, e: 5 });
      const expected = '--b---d---|'; // b at frame 2, d at frame 6, completion at frame 10

      const result$ = source$.pipe(filter(x => x % 2 === 0));

      expectObservable(result$).toBe(expected, { b: 2, d: 4 });
    });
  });
});

This example uses RxJS's TestScheduler to test a simple filtering operation. The beauty of this approach is that we can test complex asynchronous behavior in a synchronous, deterministic way.
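Marble diagrams are easier to write once it is clear that each character occupies one frame of virtual time. The sketch below is a deliberately simplified parser, not the TestScheduler's own; `parseMarbles` is a hypothetical helper that handles only `-`, `|`, and single-character values:

```javascript
// Simplified marble parser: '-' is an empty frame, '|' is completion, and
// any other character is looked up in `values`. Real marble syntax also has
// groups, errors, and time spans, which this sketch ignores.
function parseMarbles(diagram, values) {
  const events = [];
  [...diagram].forEach((char, frame) => {
    if (char === '-') return;
    if (char === '|') {
      events.push({ frame, kind: 'complete' });
    } else {
      events.push({ frame, kind: 'next', value: values[char] });
    }
  });
  return events;
}

const source = parseMarbles('a-b-c-d-e-|', { a: 1, b: 2, c: 3, d: 4, e: 5 });

// Filtering removes values but never moves the survivors to another frame.
const evens = source.filter(e => e.kind === 'complete' || e.value % 2 === 0);

console.log(evens);
// [ { frame: 2, kind: 'next', value: 2 },
//   { frame: 6, kind: 'next', value: 4 },
//   { frame: 10, kind: 'complete' } ]
```

Counting frames this way is a quick sanity check for an expected diagram: a value that passes a filter must sit in the same column as it did in the source.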

As we've seen, FRP with higher-order streams offers a powerful toolkit for managing complexity in our JavaScript applications. It allows us to express complex, time-based interactions in a declarative way, leading to code that's often more maintainable and easier to reason about.

However, it's not a silver bullet. Like any paradigm, FRP has its learning curve and potential pitfalls. It's important to use it judiciously, and to understand when a more traditional imperative approach might be simpler.

As we continue to build increasingly complex, reactive systems, FRP provides us with a robust set of tools and patterns. By thinking in streams, we can create more resilient, responsive, and maintainable applications. Whether we're handling user input, managing application state, or orchestrating complex asynchronous operations, FRP gives us the power to express our intent clearly and concisely.

The journey into FRP can be challenging, but it's also incredibly rewarding. As we become more comfortable with these concepts, we'll find ourselves able to tackle problems that once seemed intractable. We'll write code that's more declarative, more composable, and ultimately, more powerful.

So let's embrace the stream. Let's think in flows and transformations. Let's build applications that are truly reactive, responding elegantly to the complex, ever-changing world of user interactions and data flows. With FRP and higher-order streams, we have the tools to create the next generation of responsive, resilient JavaScript applications.



Top comments (7)

Aleksei Berezkin

Stream libs are indeed useful, but for me the main blocker is their size. E.g. rxjs can take 70 kB (minified) of the bundle, which feels a lot, to be honest

Dario Mannu

RxJS is all tree shakeable, so if you only use Subject and map, it will discard everything else and will only add a couple of KB in total. Additional operators only add a few hundred bytes each, so it scales amazingly

Aarav Joshi

I agree. For a small project it is just overhead.

Dario Mannu

I use it in small projects, as well, even in 5-liners. All projects grow, and by the time you use 2, 3, 4 operators you realise you'd write much more code by not using RxJS

Aarav Joshi

There is a saying "Once Addicted to RxJS, you are always addicted.." You prove it :haha

Dario Mannu

That could be said of Promises, Functions, any design patterns. Why would someone want to work without Promises and revert to Callbacks, for instance? Why drop Functions for... goto statements? Observables are a major evolutionary step in software development, there's no reason to go back.

Ravin Rau

Great post! I really like how you explained streams step by step - from basic ones to the more complex higher-order streams. The real-world examples were super helpful, especially how you showed the document editor and drag-and-drop feature. It's amazing how FRP makes handling complex timing and events so much easier.

For anyone just starting with FRP - don't worry if it seems hard at first! I found it helps to start small, like with the search example, before trying the trickier stuff. Once you play around with RxJS's basic operators, the more advanced features start making more sense.

By the way, I'm curious - what's your go-to method for handling shared state in bigger apps? The BehaviorSubject example was cool, but I'd love to know how you'd set things up in a larger project.