DEV Community

Cree

Intro to RxJS Concepts with Vanilla JavaScript

I was recently inspired by a webinar that egghead.io hosted with Andre Staltz, and I wanted to share what I learned. Before the webinar, I was unfamiliar with RxJS, and it was the first time I was exposed to the observer pattern. Before it was broken down, observers seemed like magic.

JavaScript has multiple APIs that use callback functions that all do nearly the same thing with slight variations.

Streams

stream.on('data', data => {
   console.log(data)
})
stream.on('end', () => {
   console.log("Finished")
})
stream.on('error', err => {
   console.error(err)
})

Promises

somePromise()
  .then(data => console.log(data))
  .catch(err => console.error(err))

Event Listeners

document.addEventListener('click', event => {
  console.log(event.clientX)
})

The rough pattern is that there is an object, and inside the object there is some method that takes a function, in other words, a callback. They're all solving the same problem, but in different ways. This forces you to carry the mental overhead of remembering the specific syntax for each of these APIs. That's where RxJS comes in. RxJS unifies all of this under one common abstraction.

So what even is an observable? It's an abstraction in the same way that arrays, functions, or objects are all abstractions. A promise can either resolve or reject, giving you back one value. An observable is capable of emitting values over time. You could consume streams of data from a server or listen for DOM events.

💀 Observable Skeleton

const observable = {
  subscribe: observer => {

  },
  pipe: operator => {

  },
}

Observables are just objects that contain a subscribe method and a pipe method. Wait, what's going on here? What's an observer, or an operator? Observers are just objects that contain the callback methods next, error, and complete. The subscribe method consumes an observer and passes values to it. So the observable acts as a producer, and the observer is its consumer.

👀 An Observer

const observer = {
  next: x => {
    console.log(x)
  },
  error: err => {
    console.log(err)
  },
  complete: () => {
    console.log("done")
  }
}

Inside of that subscribe method you pass some form of data to the observer's methods.
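To try this idea without a browser, here is a minimal sketch of a subscribe method that feeds an observer from a plain array. The names fromArray and received are made up for this illustration; they are not part of RxJS or of the webinar's code.

```javascript
// Hypothetical helper: builds an observable that emits each array item,
// then tells the observer it is finished.
const fromArray = items => ({
  subscribe: observer => {
    items.forEach(item => observer.next(item))
    observer.complete()
  },
})

// Collect everything the observer receives so we can inspect it afterwards.
const received = []
fromArray([10, 20, 30]).subscribe({
  next: x => received.push(x),
  error: err => received.push("error: " + err.message),
  complete: () => received.push("done"),
})

console.log(received) // the three values followed by "done"
```

The observable decides when to call next and complete; the observer only decides what to do with what it is handed.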

Subscribe Method

const observable = {
  subscribe: observer => {
    document.addEventListener("click", event => {
      observer.next(event.clientX)
    })
  },
  pipe: operator => {

  },
}

Here we are just listening for clicks made anywhere in the document. If we ran this and made a call to observable.subscribe(observer), we would see the x coordinates of our clicks showing up in the console. So what about this pipe method? The pipe method takes an operator, which is a function, calls it with the observable, and returns the result.

Pipe Method

const observable = {
  subscribe: observer => {
    document.addEventListener("click", event => {
      // pass the whole event so operators can pick out what they need
      observer.next(event)
    })
  },
  // a regular function, not an arrow function, so that `this` is the observable
  pipe: function (operator) {
    return operator(this)
  },
}

Cool, but what's an operator? Operators are for transforming your data. Arrays have operators, like map. map lets you take a step back and run some function over everything in the array. You could have an array and then another array that is a mapped version of the first.
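As a quick reminder of what map does on arrays, here is a small sketch. The variable names xs and shifted are only for illustration.

```javascript
const xs = [100, 250, 400]

// map returns a new array; xs itself is left untouched.
const shifted = xs.map(x => x - 1000)

console.log(xs)      // still 100, 250, 400
console.log(shifted) // each value minus 1000
```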

Let's write a map function for our observable.

πŸ—ΊοΈ Map Operator

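const map = f => {
  return observable => {
    // return a new observable object; without `return {` the braces
    // would be read as a function body instead of an object
    return {
      subscribe: observer => {
        observable.subscribe({
          next: x => {
            observer.next(f(x))
          },
          error: err => {
            console.error(err)
          },
          complete: () => {
            console.log("finished")
          }
        })
      },
      // a regular function so that `this` is the new observable
      pipe: function (operator) {
        return operator(this)
      },
    }
  }
}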

A lot is going on here so let's break it down.

const map = f => {
  return observable => {

Here we are passing in a function and returning a function that expects an observable. Remember our pipe method?

pipe: function (operator) {
  return operator(this)
},

To run the operator on the observable, it needs to get passed into pipe. pipe is going to pass the observable it's called on into the function that our operator returns.
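One way to convince yourself of this is a small sketch in which the operator does nothing except record what it was handed. The names numbers, spy, and seen are hypothetical and exist only for this example.

```javascript
// A stand-in observable that emits 1, 2, 3 synchronously.
const numbers = {
  subscribe: observer => {
    [1, 2, 3].forEach(n => observer.next(n))
  },
  pipe: function (operator) {
    return operator(this)
  },
}

// Hypothetical operator: remembers its argument and returns it unchanged.
let seen = null
const spy = source => {
  seen = source
  return source
}

const result = numbers.pipe(spy)

console.log(seen === numbers)   // true: pipe handed `this` to the operator
console.log(result === numbers) // true: pipe returned whatever the operator returned
```

In other words, numbers.pipe(spy) behaves the same as calling spy(numbers) directly; pipe just makes the call chainable.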

subscribe: observer => {
  observable.subscribe({

Next, we are defining the subscribe method for the observable that we are returning. It expects an observer, which it receives in the future when .subscribe gets called on the returned observable, either through another operator or explicitly. Then, a call gets made to observable.subscribe with an observer object.

{
  next: x => {
    observer.next(f(x))
  },
  error: err => {
    console.error(err)
  },
  complete: () => {
    console.log("finished")
  }
}

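In this observer's next method, the downstream observer's next gets called with f(x): the function we originally passed into map, applied to the x value that came in. Let's run our new map operator on our observable! Note that the first map pulls clientX off of the event, so the observable has to emit the whole event with observer.next(event), not just event.clientX.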

observable
  .pipe(map(e => e.clientX))
  .pipe(map(x => x - 1000))
  .subscribe(observer)

That final subscribe is needed, or none of the operations inside of those operators execute. That's because they are all wrapped up in their observable's subscribe methods. In those subscribe methods is a call to subscribe to the previous observable in the chain, but the chain has to begin somewhere.
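The laziness described above can be checked with a small sketch that counts calls. The source object and the counters are assumptions made for this example, and map is a shortened version of the operator from this post that only handles next.

```javascript
let subscribeCalls = 0
const source = {
  subscribe: observer => {
    subscribeCalls += 1
    observer.next(1)
  },
  pipe: function (operator) {
    return operator(this)
  },
}

// Shortened map: only `next` is handled, to keep the example small.
const map = f => observable => ({
  subscribe: observer => {
    observable.subscribe({ next: x => observer.next(f(x)) })
  },
  pipe: function (operator) {
    return operator(this)
  },
})

let mapCalls = 0
const chained = source.pipe(map(x => { mapCalls += 1; return x * 2 }))

// Building the chain ran nothing yet.
const before = [subscribeCalls, mapCalls]

const out = []
chained.subscribe({ next: x => out.push(x) })

// Subscribing is what starts the work.
const after = [subscribeCalls, mapCalls]

console.log(before, after, out)
```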

So let's follow what happens when this runs.

  1. map is called with e => e.clientX and it returns a function (the operator)
  2. The first pipe gets called on observable with that operator
  3. pipe calls the operator with the original observable and an observable gets returned
    1. We'll call it observable2
  4. map is called with x => x - 1000 and it returns a function
  5. pipe is called on observable2 with that operator
  6. pipe calls the operator with observable2 and an observable gets returned
    1. We'll call it observable3
  7. .subscribe gets called on observable3 with observer passed in
  8. .subscribe gets called on observable2 with the second map's observer passed in
  9. .subscribe is called on the original observable with the first map's observer passed in
  10. A click event happens with a clientX of 100
  11. The original observable calls next with the event on the first map's observer, which applies e => e.clientX and gets 100
  12. The second map's observer gets next(100), which applies x => x - 1000 and gets -900
  13. observer.next(-900) gets called and logs -900 to the console.
  14. Done!

You can see the chain happen here. When you call subscribe, you are asking for information. Each link asks the previous link in the chain for it until the request reaches the source of the data, which calls the next method of its observer. That data then rises back up the chain, getting transformed along the way, until it reaches the final observer.
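The walkthrough can be replayed outside the browser with a small sketch. Here a hypothetical click helper stands in for the DOM firing a click event, and map is shortened to handle only next; neither is part of the original code.

```javascript
// Stand-in for the click observable: subscribers are stored in `listeners`,
// and `click` plays the role of the browser firing an event.
const listeners = []
const observable = {
  subscribe: observer => { listeners.push(observer) },
  pipe: function (operator) { return operator(this) },
}
const click = event => listeners.forEach(l => l.next(event))

// Shortened map: only `next` is handled, to keep the trace readable.
const map = f => observable => ({
  subscribe: observer => {
    observable.subscribe({
      next: x => observer.next(f(x)),
    })
  },
  pipe: function (operator) { return operator(this) },
})

// Record the order in which each link of the chain runs.
const log = []
observable
  .pipe(map(e => { log.push("first map"); return e.clientX }))
  .pipe(map(x => { log.push("second map"); return x - 1000 }))
  .subscribe({ next: x => log.push(x) })

click({ clientX: 100 })
console.log(log) // "first map", "second map", then -900
```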

Here is the code in its entirety.

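const observable = {
  subscribe: observer => {
    document.addEventListener("click", event => {
      // pass the whole event; the first map in the chain pulls clientX off of it
      observer.next(event)
    })
  },
  // a regular function so that `this` is the observable
  pipe: function (operator) {
    return operator(this)
  }
}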

const observer = {
  next: x => {
    console.log(x)
  },
  error: err => {
    console.log(err)
  },
  complete: () => {
    console.log("done")
  }
}

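const map = f => {
  return observable => {
    // return a new observable that wraps the one passed in
    return {
      subscribe: observer => {
        observable.subscribe({
          next: x => {
            observer.next(f(x))
          },
          error: err => {
            console.error(err)
          },
          complete: () => {
            console.log("finished")
          }
        })
      },
      // a regular function so that `this` is the new observable
      pipe: function (operator) {
        return operator(this)
      },
    }
  }
}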
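The same recipe works for other operators. As a rough sketch, here is what a filter operator might look like when built the same way as map. The name filter mirrors the array method; the source object is a stand-in made up for this example. Unlike the map above, this version forwards error and complete to the downstream observer instead of logging them, which is a design choice rather than something taken from the webinar.

```javascript
// Hypothetical `filter` operator, built the same way as `map`.
const filter = predicate => observable => ({
  subscribe: observer => {
    observable.subscribe({
      next: x => {
        // only pass along values that satisfy the predicate
        if (predicate(x)) observer.next(x)
      },
      // forward error and complete downstream instead of logging them here
      error: err => observer.error(err),
      complete: () => observer.complete(),
    })
  },
  pipe: function (operator) {
    return operator(this)
  },
})

// Stand-in source that emits 1 through 5 and then completes.
const source = {
  subscribe: observer => {
    [1, 2, 3, 4, 5].forEach(n => observer.next(n))
    observer.complete()
  },
  pipe: function (operator) {
    return operator(this)
  },
}

const evens = []
source.pipe(filter(n => n % 2 === 0)).subscribe({
  next: n => evens.push(n),
  error: err => evens.push(err),
  complete: () => evens.push("done"),
})

console.log(evens) // the even values followed by "done"
```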

Top comments (8)

Christian Schulze

I didn't find the list of steps useful, but having code for observable, observer, operator (map) and the use case fitting on a page allowed me to finally grok how RXJS works on a fundamental level.

Choosing map as the operator really resonated with me!

Thanks for sharing 👍

kishmu

Thanks for the article. It is well written.

One comment: Arrow function may not work when it uses 'this'.

pipe: operator => {
    return operator(this)
  }

Changing it to function works

pipe: function(operator) {
    return operator(this)
  }

Another fix in the line to pass the event instead of event.clientX

const observable = {
  subscribe: observer => {
    document.addEventListener('click', event => {
      observer.next(event);
    });
  },
 
Cree

Good catches! I forgot about the scope differences between => and function

vitor

Congrats for the article.
U explain in a easy way what's producer and consumer, I never had think like ur example haha

But about explain a map operator (or any operator) I really think is massive to rxjs begginers

M. Andrew Darts

This is great! Really appreciate the breakdown 👍

Babak

Good job, fixed few things, here is the working code:

const observer = {
  next: x => {
    console.log(x)
  },
  error: err => {
    console.log(err)
  },
  complete: () => {
    console.log("done")
  }
}

const observable = {
  subscribe: observer => {
    document.addEventListener("click", event => {
      observer.next(event.clientX)
    })
  },
  pipe: function (operator) {
    return operator(this)
  }
}

mostly here, correct pattern is return and return! ->
const map = function (f) {
  return observable => {
    return {
      subscribe: observer => {
        observable.subscribe({
          next: x => {
            observer.next(f(x))
          },
          error: err => {
            console.error(err)
          },
          complete: () => {
            console.log("finished")
          }
        })
      },
      pipe: function (operator) {
        return operator(this)
      },
    }
  }
}

//observable.subscribe(observer);
observable
  .pipe(map(e => e.clientX))
  .pipe(map(x => x - 1000))
  .subscribe(observer)

aworking example for node.js using std-input is here too:

github.com/babak2000ir/rxjs-simula...

amit777

Best explanation I've read on the web. Thanks! It would be awesome if you had another example of a slightly more complicated operator than map also.

Antonio Savage

yep. Still have a lot to learn.

I thought I followed up until you said you watched a webinar and then I lost you. That's on me tho not your article.