DEV Community

Cover image for Polling using RxJS
Hendrik
Hendrik

Posted on • Updated on

Polling using RxJS

As observables are gaining more and more popularity in JavaScript we are looking to accomplish our everyday tasks using them and evaluating whether they are really worth all the hype. One task you might find yourself doing is polling the backend to know whether a longer running task has completed.

We will walk through an example of such a scenario and implement a solution using RxJS. On our way we will learn some basic operators for RxJS, and a few techniques as well as how to avoid a pitfall or two. At the end I will present a real-world example to show you how to implement what we learned in a specific scenario.

You should bring a basic understanding of Streams / Observables as well as solid foundations in JavaScript to enjoy this post. For the rest of this post I will treat Stream and Observable as interchangeable words for the same thing. While we will cover a lot of basic things they will mostly be RxJS specifics and less the basics about Streams. Should you be looking for a general introduction consider the gist title “The introduction to Reactive Programming you’ve been missing”.

Code for this post was tested using RxJS 6.2.0.

Scenario

Lets say we have a backend that exposes an endpoint /tasks/[taskId] which you can query to learn about the status of a specific task. It’s returning an object like such:

{
  // Whether the task is still running
  processing: boolean;
  // A unique ID for this task
  taskId: string;
}
Enter fullscreen mode Exit fullscreen mode

Once we start polling we want to get the current state of this task twice a second and stop polling once processing === false.

Programmatic solution

To start out we are going to look at a programmatic solution for this problem.

    async pollUntilTaskFinished(taskId) {
      const fetchResponse = await fetch(`/tasks/${taskId}`)
      const responseObject = await fetchResponse.json()
      if (responseObject.processing) {
        setTimeout(() => pollUntilTaskFinished(taskId), 500)
      } else {
        pollingFinishedFor(taskId)
      }
    }
Enter fullscreen mode Exit fullscreen mode

Here we simply invoke a new timeout every time the backend is still processing.

Using RxJS

Now we are going to achieve the same behavior using RxJS.

First of all we need something to emit an event every x time. RxJS provides two function for this:

  • interval

  • timer

While interval emits the first event after a given time and then continuously with the same interval, timer starts after a given time to emit events every x time. For our two updates per second we can start by using timer(0, 500). This will start firing events right of the bat and after that twice a second.

Let’s first see that in action by logging something to the console.

    import { timer } from 'rxjs'

    timer(0, 500)
      .subscribe(() => console.log('polling'))
Enter fullscreen mode Exit fullscreen mode

You should see your console print “polling” twice a second now.

Take care to import from the correct package (rxjs or rxjs/operators). Sadly RxJS documentation might not be up to speed with the version you are using.

Next we want to turns these “ticks” into requests to our backend. We are going to use the same fetch from above but this time turn the promise into an Observable. Luckily RxJS provides convenient functions for this, namely from. Using this we can now create an Observable (or stream) representing a request to the backend on every tick and continue working with that.

    import { timer, from } from 'rxjs'
    import { map } from 'rxjs/operators'

    timer(0, 500)
      .pipe(from(fetch(`/tasks/${taskId}`)).pipe(map(response => response.json())))
Enter fullscreen mode Exit fullscreen mode

.pipe is RxJS’s way to specify that a transform will now happen on the stream. By extracting operators into their own imports RxJS enables better treeshaking than an overloaded Observable implementation ever could, see this explanation for more context.

Pipe is RxJS’s wrapper around transforms that are applied to a Stream of event.

The result of this will be a stream of streams. Each emitted value will itself be an observable. To manage the mayhem we can pipe it through concatMap which will flatten all the Streams into a single one containing the nested values.

    import { timer, from } from 'rxjs'
    import { map, concatMap } from 'rxjs/operators'

    timer(0, 500)
      .pipe(concatMap(() => from(fetch(`/tasks/${taskId}`))
        .pipe(map(response => response.json())))
      )
Enter fullscreen mode Exit fullscreen mode

Finish polling

Finally we really care about getting an event that tells us the backend finished processing, that our polling is done. We can achieve this by filtering for events where the backend is no longer processing and only taking the first one of those. By using take(1) we specify that we only care about a single (the first) event telling us that processing is finished. This will stop our polling once the backend is done processing the task.

    import { timer, from } from 'rxjs'
    import { map, concatMap, filter, take } from 'rxjs/operators'

    timer(0, 500)
      .pipe(concatMap(() => from(fetch(`/tasks/${taskId}`))
        .pipe(map(response => response.json())))
      )
      .pipe(filter(backendData => backendData.processing === false))
      .pipe(take(1))
Enter fullscreen mode Exit fullscreen mode

Putting it all together

Now it’s time to put it all together and replace our function from up above using the new RxJS-based code. The final touch is to use subscribe at the end of our Stream to work with the single event our Stream emits.

    import { timer, from } from 'rxjs'
    import { map, concatMap, filter, take } from 'rxjs/operators'

    pollUntilTaskFinished(taskId) {
      timer(0, 500)
        .pipe(concatMap(() => from(fetch(`/tasks/${taskId}`))
          .pipe(map(response => response.json())))
        )
        .pipe(filter(backendData => backendData.processing === false))
        .pipe(take(1))
        .subscribe(() => pollingFinishedFor(taskId))
    }
Enter fullscreen mode Exit fullscreen mode

You might not want to call a function once you are done though but use the output of your Observable to render your UI. Through the use of merge, which merges two streams together we can map our polling onto two states and use the output directly for our UI.

To achieve this we will merge our stream from above together with an initial value that we turn into a Stream using of.

    import { timer, from, merge, of } from 'rxjs'
    import { map, concatMap, filter, take } from 'rxjs/operators'

    const loadingEmoji = merge(
      of(true),
      timer(0, 500)
        .pipe(concatMap(() => from(fetch(`/tasks/${taskId}`))
          .pipe(map(response => response.json())))
        )
        .pipe(filter(backendData => backendData.processing === false))
      )
        .pipe(take(2))
        .pipe(map(processing => processing ? '' : ''));
Enter fullscreen mode Exit fullscreen mode

After we map the response from our backend onto the processing attribute using map, we can in turn map the result onto an emoji to display to our users.

A real world example

Theory is always nice but the real world usually poses a different challenge from a nicely written and contained tutorial. Let me present you with the solution to a problem we faced when building our knowledge about polling using RxJS.

The situation: We have an Angular application for which we use NGXS as a state manager. Similar to Redux it uses Actions to represent events changing the state.

As it turns out NGXS provides a stream of all Actions dispatched as an Observable we can hook into. Here is our final solution to poll the backend for processing states for each Document *that *gets added to the state and update the state once the backend is done processing.

    .actions$
      .pipe(ofActionSuccessful(AddDocument))
      .pipe(filter((action: AddDocument) => action.document.externalProcessingState === environment.documentStates.processing))
      .pipe(map((action: AddDocument) => action.document))
      .pipe(mergeMap((document: Document) => timer(environment.polling.startingOffset, environment.polling.interval)
         // Here we want a new stream per document add.
        .pipe(concatMap(() => from(this.backend.fetch(`/documents/${document.uuid}`))))
        .pipe(concatMap(response => from(response.json())))
        .pipe(filter((polledDocument: Document) => polledDocument.externalProcessingState !== environment.documentStates.processing))
        .pipe(take(1)))
      )
      .subscribe((polledDocument: Document) => {
                    this.store.dispatch(new AddDocument(polledDocument));
      });
Enter fullscreen mode Exit fullscreen mode

A few notes:

  • environment is an Angular environment providing configuration for our application.

  • backend is a Service providing connection to our backend. It inserts a few required headers and such.

  • This uses TypeScript so polledDocument: Document describes a variable named “polledDocument” which follows the type “Document”.

A tricky thing here is that we need to create a new “polling Stream” per document getting added to our state. At first we tried pulling logic into a single level but that ended with us only being able to poll for a single document per page load as take(1) would block the Stream for all future pollings.

Wrapping up

Today we built our first polling logic using RxJS learning about this great library along the way. We also took a look at a real world example and saw how expressive it can make our code.

Now, go out and apply your newfound knowledge.


Other great resources

https://blog.strongbrew.io/rxjs-polling/

https://www.sitepoint.com/angular-rxjs-create-api-service-rest-backend/

https://www.learnrxjs.io/recipes/http-polling.html

Originally published on makeitnew.io on August 30 2018.

Top comments (7)

Collapse
 
qm3ster profile image
Mihail Malo

Seems dangerous. What if our connection has high latency, and it takes us more than 500ms to get a response? This way we would start more and more requests.

Collapse
 
hoverbaum profile image
Hendrik

Also quite valid. To get around this you could use switchMap learnrxjs.io/operators/transformat....

It cancels running streams and prevents sending to many requests.

Collapse
 
qm3ster profile image
Mihail Malo

Seems like exactly the right tool.
Do you know of an example that uses real fetch cancellation together with switchMap?

Thread Thread
 
hoverbaum profile image
Hendrik

Sadly not. We haven't checked into it since the approach from this post hasn't created any trouble for us, yet.

Collapse
 
pappa profile image
Pappa

If all requests took longer than 500ms, wouldn't this approach just keep cancelling slow requests every 500ms, and never receive a response?

Collapse
 
qm3ster profile image
Mihail Malo • Edited

(in the first, programmatic code example) But what if you get back an object saying {errors: ['lololo']}? You'd be in right trouble then, with your if (responseObject.processing)!

Collapse
 
hoverbaum profile image
Hendrik

Valid point!

You should always deal with errors scenarios. In this case however I decided to focus on the points essential for the post.

Here an error object coming back from the server would trigger pollingFinishedFor and in there we could handle an error. It's probably even wanted behavior to stop polling should we get a processing error back from the server. Maybe pollingFinishedFor would get the item from teh backend again and then do something based on the return value.