RxJS is a powerful library for reactive programming, and one of its key features is the ability to create custom operators. In this guide, we'll look at some handy custom operators and show how to implement them.
1. withLoading
Create a custom operator
that seamlessly manages loading states for each stream.
import { Observable, of } from 'rxjs';
import { map, catchError, startWith, finalize } from 'rxjs/operators';
/**
 * State object emitted by `withLoading`.
 *
 * - `{ loading: true }` while waiting for the first value,
 * - `{ loading: false, data }` for every value the source emits,
 * - `{ loading: false, error }` if the source fails.
 */
interface WithLoadingResult<T> {
  loading: boolean;
  data?: T;
  error?: any;
}

/**
 * Wraps every notification of the source in a `WithLoadingResult`.
 *
 * The returned operator emits `{ loading: true }` immediately on subscription,
 * then `{ loading: false, data }` for each source value. A source error is
 * converted into a final `{ loading: false, error }` emission followed by
 * completion, so subscribers never receive an error notification.
 *
 * @returns An operator mapping `Observable<T>` to `Observable<WithLoadingResult<T>>`.
 */
export function withLoading<T>() {
  const initialState: WithLoadingResult<T> = { loading: true };

  return (source: Observable<T>): Observable<WithLoadingResult<T>> =>
    source.pipe(
      map((data): WithLoadingResult<T> => ({ loading: false, data })),
      catchError(
        (error): Observable<WithLoadingResult<T>> => of({ loading: false, error })
      ),
      // startWith must come AFTER map: placed before it, the initial state would
      // itself be mapped to `{ loading: false, data: { loading: true } }` and
      // subscribers would never observe `loading === true`.
      startWith(initialState)
    );
}
// Usage: handle each state in turn, returning early once it has been reported.
someObservable$.pipe(withLoading()).subscribe((state) => {
  const { loading, data, error } = state;
  if (loading) {
    console.log('Loading...');
    return;
  }
  if (error) {
    console.error('Error:', error);
    return;
  }
  console.log('Data:', data);
});
2. debounceAndDistinct
Effectively manage user input, minimizing unnecessary API requests. We can achieve this by combining debounceTime and distinctUntilChanged.
import { Observable } from 'rxjs';
import { debounceTime, distinctUntilChanged } from 'rxjs/operators';
/**
 * Builds an operator that debounces the source and then drops consecutive
 * duplicate values.
 *
 * @param time Debounce window in milliseconds (default 300).
 * @returns An operator that applies `debounceTime(time)` followed by
 *          `distinctUntilChanged()`.
 */
export function debounceAndDistinct<T>(time: number = 300) {
  return (source: Observable<T>) => {
    const debounced$ = source.pipe(debounceTime(time));
    return debounced$.pipe(distinctUntilChanged());
  };
}
// Usage: pipe the raw input stream through the operator, then subscribe.
const debouncedSearch$ = searchInput$.pipe(debounceAndDistinct());
debouncedSearch$.subscribe((value) => {
  // Perform search with debounced and distinct value
});
3. retryWithBackoff
Apply a backoff strategy for retrying failed requests more intelligently.
import { Observable, throwError, timer } from 'rxjs';
import { mergeMap, retryWhen } from 'rxjs/operators';
/**
 * Builds an operator that re-subscribes to a failed source with a linearly
 * growing delay.
 *
 * Retry attempt `n` (1-based) waits `backoffTime * n` milliseconds before
 * re-subscribing. Once `maxRetries` attempts have been used, the most recent
 * error is re-thrown to the subscriber.
 *
 * @param maxRetries  Maximum number of retry attempts (default 3).
 * @param backoffTime Base delay in milliseconds, multiplied by the attempt
 *                    number (default 1000).
 * @returns An operator that retries the source on error.
 */
export function retryWithBackoff(
  maxRetries: number = 3,
  backoffTime: number = 1000
) {
  return (source: Observable<any>) =>
    source.pipe(
      retryWhen((errors) =>
        errors.pipe(
          mergeMap((error, index) => {
            const retryAttempt = index + 1;
            if (retryAttempt > maxRetries) {
              // Retry budget exhausted: surface the last error to the subscriber.
              return throwError(error);
            }
            // Log the delay that is actually applied; it grows with each attempt.
            const delayMs = backoffTime * retryAttempt;
            console.log(`Retry attempt ${retryAttempt}: retrying in ${delayMs}ms`);
            return timer(delayMs);
          })
        )
      )
    );
}
// Usage: pass an observer object; positional next/error callbacks are deprecated.
apiCall$.pipe(retryWithBackoff()).subscribe({
  next: (data) => console.log('Success:', data),
  error: (error) => console.error('Error:', error),
});
4. cachingOperator
Cache API responses to reduce server load and improve efficiency.
import { Observable, of } from 'rxjs';
import { tap, shareReplay } from 'rxjs/operators';
/**
 * Builds an operator that remembers the most recent value emitted by the
 * source and replays it to later subscribers while it is still fresh.
 *
 * The cache lives in the closure created by each `cachingOperator()` call, so
 * it is shared only by subscriptions made through that operator instance.
 * While a fresh entry exists, a new subscriber receives the cached value and
 * completes without subscribing to the source.
 *
 * @param cacheTime How long a cached value stays fresh, in milliseconds
 *                  (default 60000).
 * @returns An operator that serves cached values while they are fresh.
 */
export function cachingOperator<T>(cacheTime: number = 60000) {
  // A single optional record instead of two loose variables: presence is checked
  // explicitly, so falsy values (0, '', false, null) are cached correctly.
  let cache: { value: T; storedAt: number } | undefined;

  return (source: Observable<T>) =>
    new Observable<T>((observer) => {
      if (cache !== undefined && Date.now() - cache.storedAt < cacheTime) {
        observer.next(cache.value);
        observer.complete();
        return undefined;
      }

      // Returning the subscription lets an unsubscribe on the outer observable
      // cancel the in-flight source subscription.
      return source
        .pipe(
          tap((data) => {
            cache = { value: data, storedAt: Date.now() };
          })
        )
        .subscribe(observer);
    });
}
// Usage: build the cached stream, then subscribe to it.
const cachedApiCall$ = apiCall$.pipe(cachingOperator());
cachedApiCall$.subscribe((data) => console.log('Data:', data));
5. progressiveLoading
Load data incrementally, emitting partial results to enhance perceived performance.
import { Observable } from 'rxjs';
import { expand, take, map } from 'rxjs/operators';
/**
 * Builds an operator that emits progressively larger arrays by repeatedly
 * re-subscribing to `source` and appending items to the previous emission.
 *
 * NOTE(review): each expansion step subscribes to the SAME `source` again and
 * appends the first `pageSize` items of whatever it emits. Unless `source`
 * yields different data on each subscription, the same leading items are
 * appended repeatedly — confirm this is the intended paging behaviour.
 *
 * @param pageSize Maximum number of items appended per expansion step (default 10).
 * @param maxItems Expansion stops once an emitted array has at least this many
 *                 items (default 100).
 * @returns An operator from `Observable<T[]>` to an observable of accumulated arrays.
 */
export function progressiveLoading<T>(
pageSize: number = 10,
maxItems: number = 100
) {
return (source: Observable<T[]>) =>
source.pipe(
// `index` is accepted here but never read.
expand((items, index) =>
// Keep expanding while the accumulated array is still shorter than maxItems.
items.length < maxItems
? source.pipe(
// Append at most `pageSize` items from the fresh emission to the running array.
map((newItems) => [...items, ...newItems.slice(0, pageSize)])
)
// An empty array is used as an observable input that emits nothing, ending this branch.
: []
),
// Cap the total number of emissions (the first, unexpanded one included).
take(Math.ceil(maxItems / pageSize))
);
}
// Usage: log every partial result as it is emitted.
const logPartialData = (partialData: unknown) => {
  console.log('Partial data:', partialData);
};
apiCall$.pipe(progressiveLoading()).subscribe(logPartialData);
6. errorHandlingOperator
Centralize error management for consistent and streamlined error handling.
import { Observable, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';
/**
 * Builds an operator that logs any source error and replaces it with a
 * generic, user-facing message.
 *
 * The replacement error is emitted as a plain string, not an `Error` instance.
 *
 * @returns An operator that passes values through unchanged and rewrites errors.
 */
export function errorHandlingOperator<T>() {
  const logAndReplace = (error: unknown) => {
    console.error('An error occurred:', error);
    // You can add custom error handling logic here
    return throwError('Something went wrong. Please try again later.');
  };

  return (source: Observable<T>) => source.pipe(catchError(logAndReplace));
}
// Usage: pass an observer object; positional next/error callbacks are deprecated.
apiCall$.pipe(errorHandlingOperator()).subscribe({
  next: (data) => console.log('Data:', data),
  error: (error) => console.error('Handled error:', error),
});
7. optimisticUpdate
Immediately update the UI before API confirmation, with a rollback in case of failure.
import { Observable, of } from 'rxjs';
import { map, catchError, switchMap } from 'rxjs/operators';
/**
 * Builds an operator that applies an update optimistically and rolls it back
 * if the confirming API call fails.
 *
 * For each source value the operator:
 *  1. emits `updateFn(value)` immediately, before the API call has responded;
 *  2. subscribes to `apiCall(optimisticValue)`;
 *  3. if the call fails, logs a warning and emits the original value as a
 *     rollback; if it completes, nothing further is emitted.
 *
 * A new source value cancels any API call still in flight (`switchMap`).
 *
 * @param updateFn Pure function producing the optimistic value.
 * @param apiCall  Function that persists the optimistic value.
 * @returns An operator emitting the optimistic value, and the original value on failure.
 */
export function optimisticUpdate<T, R>(
  updateFn: (data: T) => T,
  apiCall: (data: T) => Observable<R>
) {
  return (source: Observable<T>) =>
    source.pipe(
      switchMap((original) => {
        const optimistic = updateFn(original);

        return new Observable<T>((subscriber) => {
          // Emit first, confirm later: this is what makes the update optimistic.
          subscriber.next(optimistic);

          const confirmation = apiCall(optimistic).subscribe({
            error: () => {
              console.warn('API call failed. Reverting to original data.');
              subscriber.next(original);
              subscriber.complete();
            },
            complete: () => subscriber.complete(),
          });

          // Cancel the in-flight call when the subscriber goes away.
          return () => confirmation.unsubscribe();
        });
      })
    );
}
// Usage
const updateTodo = (todo: Todo): Todo => ({ ...todo, completed: true });
// Placeholder that echoes the todo back; replace the body with the real API call.
// It needs a body of its own: otherwise the statement below would be parsed as
// the arrow function's body and nothing would ever be subscribed.
const apiUpdateTodo = (todo: Todo): Observable<Todo> => of(todo);
todoStream$
  .pipe(optimisticUpdate(updateTodo, apiUpdateTodo))
  .subscribe((updatedTodo) => console.log('Updated todo:', updatedTodo));
8. throttleAndBuffer
Use throttle combined with buffer to batch updates and manage data streams effectively.
import { Observable } from 'rxjs';
import { buffer, bufferTime, filter, throttleTime } from 'rxjs/operators';
/**
 * Builds an operator that groups source values into time-based batches.
 *
 * Values are collected for `time` milliseconds and then emitted together as
 * an array; windows in which nothing arrived are dropped.
 *
 * Uses `bufferTime` rather than `buffer(source.pipe(throttleTime(time)))`,
 * because the latter subscribes to `source` twice — once as the buffered input
 * and once as the closing notifier — duplicating side effects on cold sources.
 *
 * @param time Length of each batching window in milliseconds (default 1000).
 * @returns An operator from `Observable<T>` to `Observable<T[]>` of non-empty batches.
 */
export function throttleAndBuffer<T>(time: number = 1000) {
  return (source: Observable<T>) =>
    source.pipe(
      bufferTime(time),
      filter((batch) => batch.length > 0)
    );
}
// Usage: subscribe to the batched stream and log each batch.
const batchedUpdates$ = dataStream$.pipe(throttleAndBuffer());
batchedUpdates$.subscribe((batch) => {
  console.log('Batched updates:', batch);
});
9. conditionalMerge
Merge multiple observables based on dynamic conditions for flexible data combination.
import { Observable, merge } from 'rxjs';
import { filter } from 'rxjs/operators';
/**
 * Merges several observables into one, keeping from each only the values that
 * satisfy the predicate it is paired with.
 *
 * @param sources `[observable, predicate]` pairs; each predicate filters its
 *                own observable before the merge.
 * @returns An observable of every value that passed its source's predicate.
 */
export function conditionalMerge<T>(
  ...sources: Array<[Observable<T>, (value: T) => boolean]>
) {
  const filteredStreams = sources.map((pair) => {
    const [stream, predicate] = pair;
    return stream.pipe(filter(predicate));
  });
  return merge(...filteredStreams);
}
// Usage
import { of } from 'rxjs';

const source1$ = of(1, 2, 3, 4);
const source2$ = of('a', 'b', 'c', 'd');
// The sources carry different value types, so the element type is given
// explicitly and each predicate narrows before applying its own test.
conditionalMerge<number | string>(
  [source1$, (value) => typeof value === 'number' && value % 2 === 0],
  [source2$, (value) => typeof value === 'string' && ['a', 'c'].includes(value)]
).subscribe((value) => console.log('Merged value:', value));
10. smartPolling
Implement adaptive polling intervals that respond to data changes or user activity.
import { Observable, timer } from 'rxjs';
import { switchMap, tap, distinctUntilChanged } from 'rxjs/operators';
/**
 * Creates an observable that polls `pollFn` with an adaptive interval.
 *
 * The first poll runs immediately. After each poll the next one is scheduled:
 *  - if the polled value changed (compared by its JSON serialization), the
 *    value is emitted and the interval resets to `baseInterval`;
 *  - if it did not change, nothing is emitted and the interval doubles, capped
 *    at `maxInterval`.
 *
 * Polling state is created per subscription, so independent subscribers do not
 * affect each other's interval. An error from `pollFn` is forwarded to the
 * subscriber and stops polling.
 *
 * @param pollFn       Function producing the observable for one poll.
 * @param baseInterval Interval in milliseconds used after a change (default 5000).
 * @param maxInterval  Upper bound in milliseconds for the backed-off interval
 *                     (default 60000).
 * @returns An observable emitting each polled value that differs from the previous one.
 */
export function smartPolling<T>(
  pollFn: () => Observable<T>,
  baseInterval: number = 5000,
  maxInterval: number = 60000
) {
  return new Observable<T>((observer) => {
    let currentInterval = baseInterval;
    let lastSerialized: string | undefined;
    let hasEmitted = false;
    let stopped = false;
    let pending: { unsubscribe(): void } | undefined;

    // Schedules one poll after `delay` ms; the following poll is scheduled only
    // once this one completes, using whatever interval it settled on.
    const scheduleNextPoll = (delay: number): void => {
      pending = timer(delay)
        .pipe(switchMap(() => pollFn()))
        .subscribe({
          next: (value) => {
            const serialized = JSON.stringify(value);
            if (hasEmitted && serialized === lastSerialized) {
              // Unchanged data: back off and do not re-emit.
              currentInterval = Math.min(currentInterval * 2, maxInterval);
              return;
            }
            currentInterval = baseInterval;
            lastSerialized = serialized;
            hasEmitted = true;
            observer.next(value);
          },
          error: (error) => observer.error(error),
          complete: () => {
            if (!stopped) {
              scheduleNextPoll(currentInterval);
            }
          },
        });
    };

    scheduleNextPoll(0);

    return () => {
      stopped = true;
      if (pending) {
        pending.unsubscribe();
      }
    };
  });
}
// Usage
// The poll function is implemented elsewhere; it is only declared here. It must
// be a complete declaration: a dangling `=>` would swallow the statement below
// as the function body and nothing would ever be subscribed.
declare function pollApi(): Observable<Data>;
smartPolling(pollApi).subscribe((data) => console.log('Polled data:', data));
11. filterOnlyPresent
Filters out null or undefined values from the stream, ensuring only valid data is emitted.
import { Observable } from 'rxjs';
import { filter } from 'rxjs/operators';
/**
 * Builds an operator that drops `null` and `undefined` values and narrows the
 * emitted type to `NonNullable<T>`.
 *
 * @returns An operator from `Observable<T>` to `Observable<NonNullable<T>>`.
 */
export function filterOnlyPresent<T>() {
  // `!= null` matches both null and undefined.
  const isPresent = (value: T): value is NonNullable<T> => value != null;

  return (source: Observable<T>) => source.pipe(filter(isPresent));
}
// Usage: only values that are neither null nor undefined reach the subscriber.
const presentValues$ = someStream$.pipe(filterOnlyPresent());
presentValues$.subscribe((value) => {
  console.log('Non-null value:', value);
});
12. filterOnlyPropertyPresent
Emits only when a specified property of the emitted object is neither null nor undefined.
import { Observable } from 'rxjs';
import { filter } from 'rxjs/operators';
/**
 * Builds an operator that passes through only those objects whose `prop`
 * property is neither `null` nor `undefined`.
 *
 * The emitted type is left as `T`; the property's type is not narrowed.
 *
 * @param prop Key of `T` whose value must be present.
 * @returns An operator that filters `Observable<T>` by presence of `prop`.
 */
export function filterOnlyPropertyPresent<T>(prop: keyof T) {
  return (source: Observable<T>) => {
    const hasProp = (item: T): boolean =>
      !(item[prop] === null || item[prop] === undefined);
    return source.pipe(filter(hasProp));
  };
}
// Usage: users whose `name` is null are dropped before reaching the subscriber.
interface User {
  id: number;
  name: string | null;
}
const usersWithName$ = userStream$.pipe(filterOnlyPropertyPresent('name'));
usersWithName$.subscribe((user) => {
  console.log('User with name:', user);
});
These custom operators can greatly improve your RxJS workflows, making your code more efficient and cleaner. Be sure to thoroughly test these operators in your specific scenarios and adjust them as needed.
Top comments (0)