In this article we will cover RxJS schedulers. Many RxJS users haven't heard of them or don't know about their use cases.
A Scheduler lets you define in what execution context will an Observable deliver notifications to its Observer.
(C) RxJS Documentation
In other words, a scheduler manages the execution order and timing of an Observable operation. Let's take a look at an example.
import { of } from 'rxjs';
console.log('Start');
of('Stream').subscribe(console.log);
console.log('End');
/**
* Logs:
* Start
* Stream
* End
*/
As we can see, the stream code executes synchronously: the value is logged between Start and End, in the same synchronous execution context as the surrounding code.
How many execution contexts are there?
In a browser, code executes in the following order:
- The synchronous code is executed first (call stack).
- Then the microtask queue commands are executed (Promise).
- After that the macrotask queue commands are executed (setTimeout, setInterval, XMLHttpRequest, ...).
- Finally, there is a queue of calls which are executed right before the next rerendering cycle (requestAnimationFrame).
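The first three steps of this order can be observed without RxJS at all. The following is a minimal sketch; the function name runOrderDemo is made up for illustration, and requestAnimationFrame is left out so that the snippet also runs outside a browser.

```typescript
// Records the order in which the three kinds of work actually run.
function runOrderDemo(): Promise<string[]> {
  return new Promise((resolve) => {
    const order: string[] = [];

    // Macrotask: runs only after the current task and all microtasks finish.
    setTimeout(() => {
      order.push('macrotask (setTimeout)');
      resolve(order);
    }, 0);

    // Microtask: runs right after the current synchronous code finishes.
    Promise.resolve().then(() => order.push('microtask (Promise)'));

    // Plain synchronous code: runs immediately.
    order.push('synchronous code');
  });
}

runOrderDemo().then((order) => console.log(order));
// Logs: ['synchronous code', 'microtask (Promise)', 'macrotask (setTimeout)']
```

Although the setTimeout call is written first, its callback runs last, because registration order does not matter across the different queues.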
For each of the points above there is a scheduler in RxJS:
- queueScheduler - synchronous
- asapScheduler - microtask
- asyncScheduler - macrotask
- animationFrameScheduler - animation frame
How to schedule?
- To schedule the execution context in which observable values are emitted, use the observeOn operator.
- To schedule the execution context in which the subscribe() call happens, use the subscribeOn operator. By default, a subscribe() call on an Observable happens synchronously.
In other words, the observeOn operator determines the execution context in which the Observer's next(), error(), and complete() methods are called, while the subscribeOn operator affects the subscription itself, so the subscribe() call on the source executes in another context.
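The difference is easiest to see with a source that records when its producer function runs. The sketch below is only an illustration; the helper name trace and the event labels are made up for this example.

```typescript
import { MonoTypeOperatorFunction, Observable, asyncScheduler } from 'rxjs';
import { observeOn, subscribeOn } from 'rxjs/operators';

// Subscribes to a simple source through the given operator and
// resolves with the recorded order of events once the stream completes.
function trace(operator: MonoTypeOperatorFunction<string>): Promise<string[]> {
  return new Promise((resolve) => {
    const events: string[] = [];

    const source$ = new Observable<string>((subscriber) => {
      events.push('producer runs');
      subscriber.next('value');
      subscriber.complete();
    });

    source$.pipe(operator).subscribe({
      next: (value) => events.push(`next: ${value}`),
      complete: () => resolve(events),
    });

    events.push('subscribe() returned');
  });
}

// observeOn: the producer still runs synchronously, only delivery is deferred.
trace(observeOn(asyncScheduler)).then((events) => console.log(events));
// Logs: ['producer runs', 'subscribe() returned', 'next: value']

// subscribeOn: the subscription to the source itself is deferred.
trace(subscribeOn(asyncScheduler)).then((events) => console.log(events));
// Logs: ['subscribe() returned', 'producer runs', 'next: value']
```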
We can confirm the execution order of the same code using different schedulers.
import {
  animationFrameScheduler,
  asapScheduler,
  asyncScheduler,
  merge,
  of,
  queueScheduler
} from 'rxjs';
import { observeOn } from 'rxjs/operators';
const queue$ = of('queueScheduler').pipe(observeOn(queueScheduler));
const asap$ = of('asapScheduler').pipe(observeOn(asapScheduler));
const asynch$ = of('asyncScheduler').pipe(observeOn(asyncScheduler));
const animationFrame$ = of('animationFrameScheduler').pipe(
  observeOn(animationFrameScheduler)
);
merge(
  queue$,
  asap$,
  asynch$,
  animationFrame$
).subscribe(console.log);
console.log('synchronous code');
/**
* Logs:
* queueScheduler
* synchronous code
* asapScheduler
* asyncScheduler
* animationFrameScheduler
*/
The observeOn and subscribeOn operators take a delay as their second argument. By default its value is 0.
Note: with any non-zero delay, the work is scheduled the way asyncScheduler would schedule it, regardless of which scheduler was provided.
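A small sketch of this effect, calling asapScheduler directly through its schedule(work, delay) method. The function name runDelayDemo and the 20 ms delay are arbitrary choices for the illustration.

```typescript
import { asapScheduler } from 'rxjs';

// Records when asapScheduler runs work with and without a delay,
// relative to an ordinary setTimeout(0) macrotask.
function runDelayDemo(): Promise<string[]> {
  return new Promise((resolve) => {
    const order: string[] = [];

    // No delay: runs as a microtask, before any timer.
    asapScheduler.schedule(() => order.push('asap, no delay'));

    // Non-zero delay: the microtask priority is lost, the work is timer-based.
    asapScheduler.schedule(() => {
      order.push('asap, 20 ms delay');
      resolve(order);
    }, 20);

    setTimeout(() => order.push('setTimeout 0'), 0);

    order.push('synchronous code');
  });
}

runDelayDemo().then((order) => console.log(order));
// Logs: ['synchronous code', 'asap, no delay', 'setTimeout 0', 'asap, 20 ms delay']
```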
Before RxJS version 6.5.0 we could pass a scheduler to several creation operators such as of, from, and merge. In newer versions this behavior is deprecated. A function called scheduled is now used instead.
import {asyncScheduler, of, scheduled} from 'rxjs';
/**
* Deprecated:
* of('async', asyncScheduler).subscribe(console.log);
*/
scheduled(of('async'), asyncScheduler).subscribe(console.log);
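For merge, the usual replacement for the deprecated merge(a$, b$, scheduler) form is to pass the inner streams to scheduled and flatten them with mergeAll. A minimal sketch, in which the function name mergedOnAsync and the emitted strings are made up for illustration:

```typescript
import { asyncScheduler, of, scheduled } from 'rxjs';
import { mergeAll } from 'rxjs/operators';

// Replacement for the deprecated merge(of('first'), of('second'), asyncScheduler).
function mergedOnAsync(): Promise<string[]> {
  return new Promise((resolve) => {
    const events: string[] = [];

    // The array of inner observables is emitted on asyncScheduler,
    // and mergeAll() subscribes to each inner observable as it arrives.
    scheduled([of('first'), of('second')], asyncScheduler)
      .pipe(mergeAll())
      .subscribe({
        next: (value) => events.push(value),
        complete: () => resolve(events),
      });

    events.push('synchronous code');
  });
}

mergedOnAsync().then((events) => console.log(events));
// Logs: ['synchronous code', 'first', 'second']
```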
Scheduler use cases
Cached observables
Suppose we have an Angular service MovieService. It has a method which gets a movie by ID and caches the result. We would implement it as follows:
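import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { Observable, of } from 'rxjs';
import { tap } from 'rxjs/operators';
// Movie is the application's own model type.
@Injectable({providedIn: 'root'})
export class MovieService {
  private cache: Map<number, Movie> = new Map<number, Movie>();
  constructor(private readonly http: HttpClient) {}
  // ...
  public getById(id: number): Observable<Movie> {
    const cached = this.cache.get(id);
    if (cached !== undefined) {
      // Cache hit: the value is emitted synchronously.
      return of(cached);
    }
    // Cache miss: the value arrives asynchronously and is stored for later calls.
    return this.http.get<Movie>(...).pipe(
      tap((movie: Movie) => this.cache.set(id, movie))
    );
  }
}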
This implementation works, but there is a catch. The first call with a given ID delivers its result asynchronously, while every later call with the same ID delivers it synchronously. If you are interested in why this is bad, check out this article:
Intentionally unleashing Zalgo with synchronous promises | by Daniel Brain | Medium
The issue is easily fixed with asyncScheduler:
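import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { asyncScheduler, Observable, of, scheduled } from 'rxjs';
import { tap } from 'rxjs/operators';
@Injectable({providedIn: 'root'})
export class MovieService {
  private cache: Map<number, Movie> = new Map<number, Movie>();
  constructor(private readonly http: HttpClient) {}
  // ...
  public getById(id: number): Observable<Movie> {
    const cached = this.cache.get(id);
    if (cached !== undefined) {
      // Cache hit: emit on asyncScheduler so the result is always asynchronous.
      return scheduled(of(cached), asyncScheduler);
    }
    return this.http.get<Movie>(...).pipe(
      tap((movie: Movie) => this.cache.set(id, movie))
    );
  }
}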
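The effect of the fix can be checked outside Angular. In the sketch below, fetchMovie is a hypothetical stand-in for the HttpClient call (a 10 ms timer), the Movie shape is invented, and arrivesSynchronously is a helper made up for this example.

```typescript
import { Observable, asyncScheduler, of, scheduled, timer } from 'rxjs';
import { map, tap } from 'rxjs/operators';

// Hypothetical model; the real Movie type belongs to the application.
interface Movie {
  id: number;
  title: string;
}

const cache = new Map<number, Movie>();

// Hypothetical stand-in for http.get<Movie>(...): responds after 10 ms.
function fetchMovie(id: number): Observable<Movie> {
  return timer(10).pipe(map(() => ({ id, title: `Movie ${id}` })));
}

function getById(id: number): Observable<Movie> {
  const cached = cache.get(id);
  if (cached !== undefined) {
    // Cache hit: still delivered asynchronously.
    return scheduled(of(cached), asyncScheduler);
  }
  return fetchMovie(id).pipe(tap((movie) => cache.set(id, movie)));
}

// Resolves with true if the value was delivered before subscribe() returned.
function arrivesSynchronously(id: number): Promise<boolean> {
  return new Promise((resolve) => {
    let subscribeReturned = false;
    getById(id).subscribe(() => resolve(!subscribeReturned));
    subscribeReturned = true;
  });
}

async function demo(): Promise<boolean[]> {
  const firstCall = await arrivesSynchronously(1); // cache miss
  const secondCall = await arrivesSynchronously(1); // cache hit
  return [firstCall, secondCall];
}

demo().then((results) => console.log(results));
// Logs: [false, false]
```

With a plain of(cached) on the cache hit path, the second entry would be true, which is exactly the inconsistency described above.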
Mocking services
The same issue will appear when we mock MovieService for tests.
Suppose our test file looks like this:
let movieService: MovieService;
beforeEach(() => {
  TestBed.configureTestingModule({
    providers: [
      {
        provide: MovieService,
        // createSpyObj expects the method names as an array
        useValue: jasmine.createSpyObj('MovieService', ['getById'])
      },
      ...
    ],
    ...
  });
  movieService = TestBed.get(MovieService);
  (movieService.getById as jasmine.Spy).and.returnValue(of(mockMovie));
});
The tests will pass, but the mock is unrealistic: it returns a synchronous result, whereas the real service responds asynchronously. Again, the issue is fixed with asyncScheduler:
let movieService: MovieService;
beforeEach(() => {
  TestBed.configureTestingModule({
    providers: [
      {
        provide: MovieService,
        // createSpyObj expects the method names as an array
        useValue: jasmine.createSpyObj('MovieService', ['getById'])
      },
      ...
    ],
    ...
  });
  movieService = TestBed.get(MovieService);
  (movieService.getById as jasmine.Spy).and.returnValue(
    scheduled(of(mockMovie), asyncScheduler)
  );
});
ExpressionChangedAfterItHasBeenCheckedError
Most Angular developers have encountered this error, and there are several ways to resolve it.
The following code will result in the error:
import { AfterViewInit, Component } from '@angular/core';
@Component({
  selector: 'hello',
  template: `<h1>Hello {{name}}!</h1>`,
  styles: [],
})
export class AppComponent implements AfterViewInit {
  public name: string;
  ngAfterViewInit() {
    this.name = 'John';
  }
}
The first solution is updating the name property using setTimeout:
import { AfterViewInit, Component } from '@angular/core';
@Component({
  selector: 'hello',
  template: `<h1>Hello {{name}}!</h1>`,
  styles: [],
})
export class AppComponent implements AfterViewInit {
  public name: string;
  ngAfterViewInit() {
    setTimeout(() => {
      this.name = 'John';
    });
  }
}
Another solution is running change detection after updating the property:
import {
  AfterViewInit,
  ChangeDetectorRef,
  Component,
} from '@angular/core';
@Component({
  selector: 'hello',
  template: `<h1>Hello {{name}}!</h1>`,
  styles: [],
})
export class AppComponent implements AfterViewInit {
  public name: string;
  constructor(private readonly cdRef: ChangeDetectorRef) {}
  ngAfterViewInit() {
    this.name = 'John';
    this.cdRef.detectChanges();
  }
}
Each RxJS scheduler has a schedule method, which runs the provided callback function in that scheduler's execution context. Using this method also solves the issue:
import { AfterViewInit, Component } from '@angular/core';
import { asyncScheduler } from 'rxjs';
@Component({
  selector: 'hello',
  template: `<h1>Hello {{name}}!</h1>`,
  styles: [],
})
export class AppComponent implements AfterViewInit {
  public name: string;
  ngAfterViewInit() {
    /**
     * any of asyncScheduler, asapScheduler and animationFrameScheduler
     * solves the issue
     */
    asyncScheduler.schedule(() => {
      this.name = 'John';
    });
  }
}
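Outside a component, the same schedule method makes the relative order of the schedulers easy to observe. A minimal sketch; the function name runScheduleDemo is made up, and animationFrameScheduler is omitted because it needs requestAnimationFrame, which is assumed to be unavailable outside a browser.

```typescript
import { asapScheduler, asyncScheduler, queueScheduler } from 'rxjs';

// Schedules one callback on each scheduler and records the execution order.
function runScheduleDemo(): Promise<string[]> {
  return new Promise((resolve) => {
    const order: string[] = [];

    // Macrotask: runs last, so it also resolves the promise.
    asyncScheduler.schedule(() => {
      order.push('asyncScheduler');
      resolve(order);
    });

    // Microtask: runs after the synchronous code, before the macrotask.
    asapScheduler.schedule(() => order.push('asapScheduler'));

    // Synchronous: with no delay, runs immediately inside schedule().
    queueScheduler.schedule(() => order.push('queueScheduler'));

    order.push('synchronous code');
  });
}

runScheduleDemo().then((order) => console.log(order));
// Logs: ['queueScheduler', 'synchronous code', 'asapScheduler', 'asyncScheduler']
```

This also suggests why queueScheduler is not listed in the comment of the component example: its callback runs synchronously, inside the same change detection pass.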