DEV Community

Billy Okeyo
Billy Okeyo

Posted on • Originally published at billyokeyo.com on

RXJS Observables

Reactive programming has gained significant popularity in the world of web development for its ability to handle asynchronous events efficiently. One of the key components in reactive programming is the concept of Observables, which play a crucial role in handling data streams and event handling. In this article, we will delve into the realm of RXJS Observables, exploring their fundamentals, creation, subscription mechanisms, transformation through operators, error handling strategies, distinctions between hot and cold observables, and best practices for managing subscriptions and preventing memory leaks. By the end of this article, you will have a solid understanding of how Observables work in RXJS and how to effectively utilize them in your applications.

Introduction to RXJS Observables

Think of RXJS Observables as your friendly neighborhood data streams. They are like event streams that you can listen to and react to whenever new data is emitted.

What are Observables?

Observables are like lazy loaded arrays that can emit multiple values over time. They can represent anything asynchronous - from clicks, mouse moves, HTTP requests, to ongoing data streams.

Key Concepts in Reactive Programming

Reactive programming is all about responding to changes. It’s like having a crystal ball to predict and react to the future states of your data, making your code more responsive and flexible.

  1. Subscription: Observables are lazy by nature, meaning nothing happens until you subscribe to them. Subscribing to an Observable initiates the execution of the code within it.
  2. Observer: Observers are the consumers of Observables. They are objects with three optional callbacks: next(), error(), and complete(), which correspond to the three types of events an Observable can emit.
  3. Operators: RxJS provides a rich set of operators that allow you to transform, filter, combine, and manipulate data streams emitted by Observables. Operators like map, filter, mergeMap, and debounceTime are just a few examples of what’s available.

Creating Observables

Creating Observables is like being a master chef in the kitchen of asynchronous data handling. You get to control how and when data is served.

Creating Observables from Scratch

Just like writing a recipe from scratch, you can create custom Observables to handle any type of asynchronous data source, tailoring it to your specific needs.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

import { Observable } from 'rxjs';

export class MyService {
  constructor() { }

  // Function to create and return an Observable
  createCustomObservable(): Observable<number> {
    // Create a new Observable using the Observable constructor
    return new Observable<number>(observer => {
      // Emit values asynchronously
      let count = 0;
      const interval = setInterval(() => {
        observer.next(count++);
      }, 1000);

      // Cleanup function to stop emitting values when unsubscribed
      return () => clearInterval(interval);
    });
  }
}

Enter fullscreen mode Exit fullscreen mode

Converting Promises to Observables

Ever had a promise that you wished behaved like an Observable? With RXJS, you can easily convert Promises into Observables and unlock their full reactive potential. RxJS provides a utility function called from that allows you to convert promises, iterables, or array-like objects into Observables. Here’s how you can do it:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

import { Observable, from } from 'rxjs';

export class MyService {
  constructor() { }

  // Function to convert a promise to an Observable
  convertPromiseToObservable(): Observable<any> {
    // Create a promise (for demonstration purposes)
    const myPromise = new Promise(resolve => {
      setTimeout(() => {
        resolve('Promise resolved');
      }, 2000);
    });

    // Convert the promise to an Observable
    return from(myPromise);
  }
}

Enter fullscreen mode Exit fullscreen mode

In this example, the convertPromiseToObservable function creates a new promise that resolves after 2 seconds. Then, it uses the from function to convert the promise to an Observable.

Subscribing to Observables

Subscribing to Observables is like getting front-row seats to the hottest data show in town. Buckle up and get ready to receive and react to the data emitted by your Observables.

When you subscribe to an Observable, you’re essentially saying, “I’m ready to listen to whatever you’ve got!” Get your data-catching net ready and start receiving those emitted values.

Here’s how you can subscribe to Observables in an Angular component:

  1. Import required dependencies: Make sure you import the necessary RxJS operators and Angular services.
  2. Create or obtain an Observable: You can create Observables using RxJS operators, Angular services, HTTP requests, or other sources.
  3. Subscribe to the Observable: Use the .subscribe() method to subscribe to the Observable. Provide one or more callback functions to handle emitted values, errors, and completion.
  4. Handle emitted values: Inside the next callback function, handle the values emitted by the Observable.
  5. Handle errors and completion: Optionally, provide error and complete callback functions to handle errors and the completion of the Observable, respectively.

Here’s an example demonstrating how to subscribe to an Observable in an Angular component:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
import { DataService } from './data.service'; // Assume DataService provides an Observable

@Component({
  selector: 'app-my-component',
  templateUrl: './my-component.component.html',
  styleUrls: ['./my-component.component.css']
})
export class MyComponent implements OnInit {
  data$: Observable<any>; // Observable to hold the data

  constructor(private dataService: DataService) { }

  ngOnInit(): void {
    // Assuming dataService.getData() returns an Observable
    this.data$ = this.dataService.getData();

    // Subscribe to the Observable
    this.data$.subscribe({
      next: data => {
        console.log('Received data:', data);
        // Handle the emitted data here, such as updating component properties
      },
      error: err => {
        console.error('An error occurred:', err);
        // Handle errors
      },
      complete: () => {
        console.log('Observable completed');
        // Perform cleanup or additional tasks when the Observable completes
      }
    });
  }
}

Enter fullscreen mode Exit fullscreen mode

In the example

  • We have an Angular component MyComponent that injects a DataService.
  • Inside the ngOnInit lifecycle hook, we call a method getData() from DataService that returns an Observable (data$).
  • We subscribe to this Observable using the .subscribe() method and provide callback functions for handling emitted data (next), errors (error), and completion (complete).
  • Inside the next callback function, we log the received data and perform any necessary actions.
  • Inside the error callback function, we log any errors that occur during the subscription.
  • Inside the complete callback function, we log a message indicating that the Observable has completed.

Unsubscribing and Cleanup

Just like cleaning up after a wild party, unsubscribing from Observables ensures that you free up resources and prevent memory leaks. It’s the responsible thing to do in the world of reactive programming.

Here’s how you can unsubscribe from Observables in an Angular component:

  1. Store the subscription: Assign the subscription returned by the subscribe() method to a class property.
  2. Unsubscribe in the ngOnDestroy lifecycle hook: Implement the ngOnDestroy() lifecycle hook to unsubscribe from the Observable when the component is destroyed.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

import { Component, OnInit, OnDestroy } from '@angular/core';
import { Observable, Subscription } from 'rxjs';
import { DataService } from './data.service'; // Assume DataService provides an Observable

@Component({
  selector: 'app-my-component',
  templateUrl: './my-component.component.html',
  styleUrls: ['./my-component.component.css']
})
export class MyComponent implements OnInit, OnDestroy {
  data$: Observable<any>; // Observable to hold the data
  private subscription: Subscription; // Subscription to manage

  constructor(private dataService: DataService) { }

  ngOnInit(): void {
    // Assuming dataService.getData() returns an Observable
    this.data$ = this.dataService.getData();

    // Subscribe to the Observable and store the subscription
    this.subscription = this.data$.subscribe({
      next: data => {
        console.log('Received data:', data);
        // Handle the emitted data here, such as updating component properties
      },
      error: err => {
        console.error('An error occurred:', err);
        // Handle errors
      },
      complete: () => {
        console.log('Observable completed');
        // Perform cleanup or additional tasks when the Observable completes
      }
    });
  }

  ngOnDestroy(): void {
    // Unsubscribe from the Observable when the component is destroyed
    if (this.subscription) {
      this.subscription.unsubscribe();
    }
  }
}

Enter fullscreen mode Exit fullscreen mode

The example here is similar to the one we used when subsribing to an event but we have added a few things:

  • We’ve added a private property subscription to store the subscription returned by the subscribe() method.
  • In the ngOnDestroy() lifecycle hook, we check if subscription exists and call unsubscribe() on it to unsubscribe from the Observable when the component is destroyed. This ensures that the subscription is cleaned up properly, preventing memory leaks and unnecessary processing.

Operators and Transformation of Observables

Operators are like the secret sauce that takes your Observables to the next level. They allow you to transform, filter, combine, and manipulate data streams like a pro. From mapping to filtering to reducing, there’s an operator for every data manipulation need.

Commonly Used Operators

Get ready to level up your reactive programming game with commonly used operators like map, filter, mergeMap, and debounceTime. These operators are the bread and butter of transforming your data streams with ease.

RxJS provides a wide range of operators that allow you to manipulate, transform, filter, combine, and control the flow of data emitted by Observables. Here are some commonly used RxJS operators:

  1. map : Transforms each value emitted by the source Observable using a given project function.

1
2
3
4
5

import { map } from 'rxjs/operators';

source$.pipe(
  map(value => value * 2)
);

Enter fullscreen mode Exit fullscreen mode
  1. filter : Filters values emitted by the source Observable based on a predicate function.

1
2
3
4
5

import { filter } from 'rxjs/operators';

source$.pipe(
  filter(value => value > 5)
);

Enter fullscreen mode Exit fullscreen mode
  1. take : Emits only the first n values emitted by the source Observable.

1
2
3
4
5

import { take } from 'rxjs/operators';

source$.pipe(
  take(5)
);

Enter fullscreen mode Exit fullscreen mode
  1. tap (formerly do): Performs side effects for each emission on the source Observable without affecting the emitted values.

1
2
3
4
5

import { tap } from 'rxjs/operators';

source$.pipe(
  tap(value => console.log('Received value:', value))
);

Enter fullscreen mode Exit fullscreen mode
  1. mergeMap (formerly flatMap): Projects each source value to an Observable and flattens the resulting Observables into one Observable.

1
2
3
4
5

import { mergeMap } from 'rxjs/operators';

source$.pipe(
  mergeMap(value => fetchDataFromAPI(value))
);

Enter fullscreen mode Exit fullscreen mode
  1. combineLatest : Combines multiple Observables to emit an array of the most recent values from each Observable whenever any of the source Observables emit.

1
2
3

import { combineLatest } from 'rxjs';

combineLatest(observable1$, observable2$, observable3$);

Enter fullscreen mode Exit fullscreen mode
  1. concat : Concatenates multiple Observables sequentially, emitting values from each source Observable one after the other.

1
2
3

import { concat } from 'rxjs';

concat(observable1$, observable2$, observable3$);

Enter fullscreen mode Exit fullscreen mode
  1. debounceTime : Emits a value from the source Observable only after a specified duration has passed without any other value being emitted.

1
2
3
4
5

import { debounceTime } from 'rxjs/operators';

source$.pipe(
  debounceTime(1000)
);

Enter fullscreen mode Exit fullscreen mode
  1. distinctUntilChanged : Emits values from the source Observable only if they are different from the previous value emitted.

1
2
3
4
5

import { distinctUntilChanged } from 'rxjs/operators';

source$.pipe(
  distinctUntilChanged()
);

Enter fullscreen mode Exit fullscreen mode
  1. retry : Re-subscribes to the source Observable a specified number of times when an error occurs.

1
2
3
4
5

import { retry } from 'rxjs/operators';

source$.pipe(
  retry(3)
);

Enter fullscreen mode Exit fullscreen mode

These are just a few of the many operators available in RxJS. Each operator provides powerful capabilities for handling asynchronous data streams effectively in your Angular applications. Familiarizing yourself with these operators and understanding how to use them can greatly enhance your ability to work with Observables in Angular.

Error Handling in Observables

Handling errors in RxJS observables is an essential aspect of reactive programming. When it comes to error handling in observables, there are various strategies you can employ to gracefully manage errors that may occur during the data stream processing.

Error Handling Strategies

One common approach is to use the catchError operator to intercept errors emitted by the observable and handle them in a controlled manner. This operator allows you to catch errors, perform necessary actions, and either recover from the error or gracefully propagate it downstream.

Another strategy is to use the retry operator, which resubscribes to the source observable when an error occurs, allowing for automatic retry attempts. This can be useful in scenarios where transient errors are expected and can be resolved with a retry mechanism.

Handling Errors in Observable Chains

When working with complex observable chains, it’s crucial to handle errors at different stages of the stream. You can strategically place error handling operators like catchError or retry at specific points in the chain to address errors effectively and ensure the smooth processing of data.

Hot vs Cold Observables

Understanding the distinction between hot and cold observables is fundamental to mastering RxJS observables. These two types of observables behave differently in terms of data emission and subscription handling.

Hot Observables

Hot observables emit data regardless of whether there are active subscriptions. Subscribers receive the data stream as it happens, and late subscribers may miss out on previously emitted values. Examples of hot observables include events from user interactions or web sockets.

Cold Observables

Cold observables, on the other hand, begin emitting data only when a subscriber triggers the subscription. Each subscriber receives the full data stream independently, ensuring that all values are delivered to every subscriber. Examples of cold observables include HTTP requests or timer-based observables.

Managing Subscriptions and Memory Leaks

Properly managing subscriptions in RxJS is crucial to prevent memory leaks and ensure optimal performance of your reactive applications. By following best practices and implementing preventive measures, you can effectively handle subscriptions and mitigate the risk of memory leaks.

Best Practices for Managing Subscriptions

One essential practice is to unsubscribe from observables when they are no longer needed to release allocated resources and prevent memory leaks. You can use operators like takeUntil or unsubscribe methods to manage the lifecycle of subscriptions and clean up resources appropriately.

Preventing Memory Leaks

Memory leaks can occur when subscriptions are not properly disposed of, leading to a buildup of unused resources over time. To prevent memory leaks, make sure to unsubscribe from observables when components are destroyed, use operators like takeUntil to automate unsubscription, and implement cleanup logic in ngOnDestroy hooks or teardown functions.In conclusion, mastering RXJS Observables opens up a world of possibilities for building reactive and efficient applications. By understanding the core concepts and best practices outlined in this article, you are equipped to leverage Observables effectively in your projects. Whether you are handling real-time data streams, asynchronous operations, or event-driven scenarios, RXJS Observables provide a powerful toolset to streamline your development process and enhance the responsiveness of your applications. Embrace the reactive paradigm and elevate your programming skills with the versatile capabilities of RXJS Observables.

Top comments (0)