Custom RxJS Operators to Improve Angular Apps

Tunji Adeyeri
5 min read

RxJS is a powerful library for reactive programming, and one of its key features is the ability to create custom operators. In this guide, we'll look at some handy custom operators and show how to implement them.

1. withLoading

Create a custom operator that wraps a stream's emissions in a loading/data/error state object, so consumers can manage loading states for each stream in one place.

import { Observable, of } from 'rxjs';
import { map, catchError, startWith, finalize } from 'rxjs/operators';

/**
 * State object emitted by a stream wrapped with `withLoading`.
 *
 * - `{ loading: true }` is emitted once, immediately on subscription.
 * - `{ loading: false, data }` is emitted for every value of the source.
 * - `{ loading: false, error }` is emitted once if the source errors.
 */
export interface WithLoadingResult<T> {
  loading: boolean;
  data?: T;
  error?: any;
}

/**
 * Wraps each emission of the source in a `WithLoadingResult`.
 *
 * The returned stream starts with `{ loading: true }`, maps every source
 * value to `{ loading: false, data }`, and converts a source error into a
 * final `{ loading: false, error }` emission followed by completion, so the
 * subscriber's error callback is not invoked for source errors.
 *
 * @returns An operator function turning `Observable<T>` into
 *   `Observable<WithLoadingResult<T>>`.
 */
export function withLoading<T>() {
  return (source: Observable<T>): Observable<WithLoadingResult<T>> => {
    const pending: WithLoadingResult<T> = { loading: true };

    return source.pipe(
      map((data): WithLoadingResult<T> => ({ loading: false, data })),
      catchError((error) => {
        const failed: WithLoadingResult<T> = { loading: false, error };
        return of(failed);
      }),
      // startWith must come after map: placed before it, the initial state
      // would itself be mapped into `{ loading: false, data: { loading: true } }`
      // and subscribers would never observe `loading === true`.
      startWith(pending)
    );
  };
}

// Usage
// Branch on the state object: loading first, then error, otherwise data.
someObservable$.pipe(withLoading()).subscribe((state) => {
  if (state.loading) {
    console.log('Loading...');
    return;
  }
  if (state.error) {
    console.error('Error:', state.error);
    return;
  }
  console.log('Data:', state.data);
});

2. debounceAndDistinct

Effectively manage user input, minimizing unnecessary API requests. We can achieve this by combining debounceTime and distinctUntilChanged.

import { Observable } from 'rxjs';
import { debounceTime, distinctUntilChanged } from 'rxjs/operators';

/**
 * Debounces the source and then drops consecutive duplicate values.
 *
 * Values pass through `debounceTime(time)` first and the debounced output
 * is then filtered by `distinctUntilChanged()` with its default comparison.
 *
 * @param time Debounce window in milliseconds (default 300).
 * @returns An operator function from `Observable<T>` to `Observable<T>`.
 */
export function debounceAndDistinct<T>(time: number = 300) {
  return (source: Observable<T>) => {
    const settled$ = source.pipe(debounceTime(time));
    return settled$.pipe(distinctUntilChanged());
  };
}

// Usage
const searchTerms$ = searchInput$.pipe(debounceAndDistinct());
searchTerms$.subscribe((term) => {
  // Perform search with debounced and distinct value
});

3. retryWithBackoff

Apply a backoff strategy for retrying failed requests more intelligently.

import { Observable, throwError, timer } from 'rxjs';
import { mergeMap, retryWhen } from 'rxjs/operators';

/**
 * Resubscribes to a failed source after a linearly growing delay.
 *
 * The n-th retry (1-based) waits `backoffTime * n` milliseconds. Once more
 * than `maxRetries` errors have been seen, the latest error is rethrown to
 * the subscriber.
 *
 * @param maxRetries Maximum number of retries before giving up (default 3).
 * @param backoffTime Base delay in milliseconds, multiplied by the attempt
 *   number (default 1000).
 * @returns An operator function that preserves the source's element type.
 */
export function retryWithBackoff<T = any>(
  maxRetries: number = 3,
  backoffTime: number = 1000
) {
  return (source: Observable<T>) =>
    source.pipe(
      retryWhen((errors) =>
        errors.pipe(
          mergeMap((error, index) => {
            const retryAttempt = index + 1;
            if (retryAttempt > maxRetries) {
              return throwError(error);
            }
            // Compute the delay once so the log reports the delay that is
            // actually applied, not just the base value.
            const delayMs = backoffTime * retryAttempt;
            console.log(`Retry attempt ${retryAttempt}: retrying in ${delayMs}ms`);
            return timer(delayMs);
          })
        )
      )
    );
}

// Usage
// Observer-object form: `error` fires only after all retries are exhausted.
apiCall$.pipe(retryWithBackoff()).subscribe({
  next: (data) => console.log('Success:', data),
  error: (error) => console.error('Error:', error),
});

4. cachingOperator

Cache API responses to reduce server load and improve efficiency.

import { Observable, of } from 'rxjs';
import { tap, shareReplay } from 'rxjs/operators';

/**
 * Caches the most recent value of the source for `cacheTime` milliseconds.
 *
 * The cache lives in the closure of a single `cachingOperator()` call, so it
 * is shared by every subscription made through that operator instance.
 * While a fresh entry exists, subscribers receive the cached value and an
 * immediate completion without subscribing to the source. Otherwise the
 * source is subscribed and every value it emits replaces the cache entry.
 *
 * @param cacheTime Lifetime of a cached value in milliseconds (default 60000).
 * @returns An operator function from `Observable<T>` to `Observable<T>`.
 */
export function cachingOperator<T>(cacheTime: number = 60000) {
  // A single optional record instead of two loose variables: presence of the
  // record, not truthiness of the value, decides whether a cache entry exists,
  // so falsy values such as 0, '' or false are cached correctly.
  let cached: { value: T; storedAt: number } | undefined;

  return (source: Observable<T>) =>
    new Observable<T>((observer) => {
      const entry = cached;
      if (entry !== undefined && Date.now() - entry.storedAt < cacheTime) {
        observer.next(entry.value);
        observer.complete();
        return undefined;
      }

      // Returning the inner subscription makes unsubscribing from the result
      // also unsubscribe from the source.
      return source
        .pipe(
          tap((data) => {
            cached = { value: data, storedAt: Date.now() };
          })
        )
        .subscribe(observer);
    });
}

// Usage
const cachedCall$ = apiCall$.pipe(cachingOperator());
cachedCall$.subscribe((payload) => {
  console.log('Data:', payload);
});

5. progressiveLoading

Load data incrementally, emitting partial results to enhance perceived performance.

import { Observable } from 'rxjs';
import { expand, take, map } from 'rxjs/operators';

/**
 * Emits a growing array by repeatedly re-subscribing to the source.
 *
 * Every array emitted (starting with the source's own emission, which is
 * passed through as-is) is fed back through `expand`: while its length is
 * below `maxItems`, the source is subscribed again and up to `pageSize` of
 * the newly emitted items are appended to the accumulated array. The output
 * is capped at `Math.ceil(maxItems / pageSize)` emissions.
 *
 * NOTE(review): the first emission is not truncated to `pageSize`, and the
 * accumulated array can exceed `maxItems` on the final step — confirm this
 * is the intended contract.
 *
 * @param pageSize Maximum number of new items appended per step (default 10).
 * @param maxItems Accumulated length at which expansion stops (default 100).
 * @returns An operator function from `Observable<T[]>` to `Observable<T[]>`.
 */
export function progressiveLoading<T>(
  pageSize: number = 10,
  maxItems: number = 100
) {
  return (source: Observable<T[]>) => {
    const emissionLimit = Math.ceil(maxItems / pageSize);

    return source.pipe(
      expand((accumulated) => {
        if (accumulated.length < maxItems) {
          return source.pipe(
            map((fetched) => [...accumulated, ...fetched.slice(0, pageSize)])
          );
        }
        // An empty array acts as an empty inner stream and ends the expansion.
        return [];
      }),
      take(emissionLimit)
    );
  };
}

// Usage
const partialResults$ = apiCall$.pipe(progressiveLoading());
partialResults$.subscribe((itemsSoFar) => {
  console.log('Partial data:', itemsSoFar);
});

6. errorHandlingOperator

Centralize error management for consistent and streamlined error handling.

import { Observable, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

/**
 * Logs any source error and replaces it with a generic, user-facing message.
 *
 * The original error is written to `console.error`; downstream subscribers
 * receive the fixed string message as the error notification instead.
 *
 * @returns An operator function from `Observable<T>` to `Observable<T>`.
 */
export function errorHandlingOperator<T>() {
  const logAndReplace = (cause: unknown) => {
    console.error('An error occurred:', cause);
    // You can add custom error handling logic here
    return throwError('Something went wrong. Please try again later.');
  };

  return (source: Observable<T>) => source.pipe(catchError(logAndReplace));
}

// Usage
// Observer-object form: `error` receives the replacement message.
apiCall$.pipe(errorHandlingOperator()).subscribe({
  next: (data) => console.log('Data:', data),
  error: (error) => console.error('Handled error:', error),
});

7. optimisticUpdate

Immediately update the UI before API confirmation, with a rollback in case of failure.

import { Observable, of } from 'rxjs';
import { map, catchError, switchMap } from 'rxjs/operators';

/**
 * Emits an optimistically updated value immediately, then rolls back to the
 * original value if the API call fails.
 *
 * For each source value:
 * 1. `updateFn(data)` is emitted right away, before the API responds.
 * 2. `apiCall` is invoked with the optimistic value. Its emitted values are
 *    ignored; only its completion or error matters.
 * 3. On error, a warning is logged and the original value is emitted as the
 *    rollback. The error itself is not propagated.
 *
 * A new source value cancels any API call still in flight (`switchMap`).
 * Unsubscribing from the result also cancels the in-flight API call.
 *
 * @param updateFn Pure function producing the optimistic value.
 * @param apiCall Function performing the confirming API request.
 * @returns An operator function from `Observable<T>` to `Observable<T>`.
 */
export function optimisticUpdate<T, R>(
  updateFn: (data: T) => T,
  apiCall: (data: T) => Observable<R>
) {
  return (source: Observable<T>) =>
    source.pipe(
      map((data) => ({ optimistic: updateFn(data), original: data })),
      switchMap(
        ({ optimistic, original }) =>
          new Observable<T>((observer) => {
            // Emit before the request starts: this is what makes the update
            // optimistic rather than merely confirmed.
            observer.next(optimistic);

            return apiCall(optimistic).subscribe({
              error: () => {
                console.warn('API call failed. Reverting to original data.');
                observer.next(original);
                observer.complete();
              },
              complete: () => observer.complete(),
            });
          })
      )
    );
}

// Usage
const updateTodo = (todo: Todo): Todo => ({ ...todo, completed: true });
// Placeholder declaration: provide the real API call implementation elsewhere.
// (An arrow function with only a comment as its body would silently absorb
// the statement below as its body, so nothing would ever be subscribed.)
declare const apiUpdateTodo: (todo: Todo) => Observable<Todo>;

todoStream$
  .pipe(optimisticUpdate(updateTodo, apiUpdateTodo))
  .subscribe((updatedTodo) => console.log('Updated todo:', updatedTodo));

8. throttleAndBuffer

Use throttle combined with buffer to batch updates and manage data streams effectively.

import { Observable } from 'rxjs';
import { buffer, filter, throttleTime } from 'rxjs/operators';

/**
 * Groups source values into arrays, flushed by a throttled copy of the source.
 *
 * The source is piped twice: once as the stream of values being buffered and
 * once through `throttleTime(time)` as the signal that closes each buffer.
 * Empty batches are filtered out.
 *
 * NOTE(review): because the source is used for both roles, a cold source is
 * presumably subscribed twice — confirm this is acceptable for callers.
 *
 * @param time Throttle window in milliseconds for the flush signal
 *   (default 1000).
 * @returns An operator function from `Observable<T>` to `Observable<T[]>`.
 */
export function throttleAndBuffer<T>(time: number = 1000) {
  return (source: Observable<T>) => {
    const flushSignal$ = source.pipe(throttleTime(time));
    const batches$ = source.pipe(buffer(flushSignal$));
    return batches$.pipe(filter((items) => items.length > 0));
  };
}

// Usage
const batchedUpdates$ = dataStream$.pipe(throttleAndBuffer());
batchedUpdates$.subscribe((updates) => {
  console.log('Batched updates:', updates);
});

9. conditionalMerge

Merge multiple observables based on dynamic conditions for flexible data combination.

import { Observable, merge } from 'rxjs';
import { filter } from 'rxjs/operators';

/**
 * Merges several streams, each filtered by its own predicate.
 *
 * @param sources Pairs of `[stream, predicate]`; only values for which the
 *   paired predicate returns true are forwarded from that stream.
 * @returns A single observable merging all filtered streams.
 */
export function conditionalMerge<T>(
  ...sources: Array<[Observable<T>, (value: T) => boolean]>
) {
  const gatedStreams = sources.map(([stream$, predicate]) =>
    stream$.pipe(filter(predicate))
  );
  return merge(...gatedStreams);
}

// Usage
// NOTE: `of` must also be imported from 'rxjs' for this example to compile.
const source1$ = of(1, 2, 3, 4);
const source2$ = of('a', 'b', 'c', 'd');

// The sources carry different element types, so the merged element type is
// given explicitly and each predicate narrows the value before using it.
conditionalMerge<number | string>(
  [source1$, (value) => typeof value === 'number' && value % 2 === 0],
  [source2$, (value) => typeof value === 'string' && ['a', 'c'].includes(value)]
).subscribe((value) => console.log('Merged value:', value));

10. smartPolling

Implement adaptive polling intervals that respond to data changes or user activity.

import { Observable, timer } from 'rxjs';
import { switchMap, tap, distinctUntilChanged } from 'rxjs/operators';

/**
 * Polls `pollFn` with an interval that backs off while the data is unchanged.
 *
 * The first poll runs immediately on subscription. After each poll the next
 * one is scheduled `currentInterval` milliseconds later:
 * - if the polled value differs from the last emitted one (compared via
 *   `JSON.stringify`), it is emitted and the interval resets to
 *   `baseInterval`;
 * - otherwise nothing is emitted and the interval doubles, capped at
 *   `maxInterval`.
 *
 * The next poll is scheduled when the current poll emits its first value or
 * completes, whichever happens first; a poll that does neither stalls
 * polling. Starting a new poll unsubscribes from the previous one. Errors
 * from `pollFn` are forwarded to the subscriber and end polling. All state
 * is per subscription.
 *
 * @param pollFn Factory producing the observable for a single poll.
 * @param baseInterval Initial and reset interval in milliseconds
 *   (default 5000).
 * @param maxInterval Upper bound for the backed-off interval in milliseconds
 *   (default 60000).
 * @returns An observable of changed poll results.
 */
export function smartPolling<T>(
  pollFn: () => Observable<T>,
  baseInterval: number = 5000,
  maxInterval: number = 60000
) {
  return new Observable<T>((observer) => {
    // State is created per subscription so that independent subscribers do
    // not influence each other's interval or change detection.
    let currentInterval = baseInterval;
    let lastSnapshot: string | undefined;
    let hasEmitted = false;
    let pendingTimer: { unsubscribe(): void } | undefined;
    let inFlight: { unsubscribe(): void } | undefined;

    const poll = (): void => {
      // Drop a previous poll that is still running.
      if (inFlight) {
        inFlight.unsubscribe();
      }

      let nextScheduled = false;
      const scheduleNext = (): void => {
        if (nextScheduled || observer.closed) {
          return;
        }
        nextScheduled = true;
        pendingTimer = timer(currentInterval).subscribe(() => poll());
      };

      inFlight = pollFn().subscribe({
        next: (value) => {
          const snapshot = JSON.stringify(value);
          if (!hasEmitted || snapshot !== lastSnapshot) {
            hasEmitted = true;
            lastSnapshot = snapshot;
            currentInterval = baseInterval;
            observer.next(value);
          } else {
            currentInterval = Math.min(currentInterval * 2, maxInterval);
          }
          // Schedule only after the interval has been adjusted, so the new
          // interval applies to the very next poll.
          scheduleNext();
        },
        error: (err) => observer.error(err),
        complete: scheduleNext,
      });
    };

    poll();

    return () => {
      if (pendingTimer) {
        pendingTimer.unsubscribe();
      }
      if (inFlight) {
        inFlight.unsubscribe();
      }
    };
  });
}

// Usage
// Placeholder declaration: provide the real API call implementation elsewhere.
// (An arrow function with only a comment as its body would silently absorb
// the statement below as its body, so polling would never start.)
declare const pollApi: () => Observable<Data>;

smartPolling(pollApi).subscribe((data) => console.log('Polled data:', data));

11. filterOnlyPresent

Filters out null or undefined values from the stream, ensuring only valid data is emitted.

import { Observable } from 'rxjs';
import { filter } from 'rxjs/operators';

/**
 * Drops `null` and `undefined` values and narrows the element type.
 *
 * @returns An operator function from `Observable<T>` to
 *   `Observable<NonNullable<T>>`.
 */
export function filterOnlyPresent<T>() {
  const isPresent = (candidate: T): candidate is NonNullable<T> =>
    !(candidate === null || candidate === undefined);

  return (source: Observable<T>) => source.pipe(filter(isPresent));
}

// Usage
const presentValues$ = someStream$.pipe(filterOnlyPresent());
presentValues$.subscribe((present) => {
  console.log('Non-null value:', present);
});

12. filterOnlyPropertyPresent

Emits only when a specified property in the returned object is neither null nor undefined.

import { Observable } from 'rxjs';
import { filter } from 'rxjs/operators';

/**
 * Forwards only values whose `prop` property is neither `null` nor
 * `undefined`.
 *
 * The element type is left unchanged; the property's type is not narrowed.
 *
 * @param prop Key of `T` whose value must be present.
 * @returns An operator function from `Observable<T>` to `Observable<T>`.
 */
export function filterOnlyPropertyPresent<T>(prop: keyof T) {
  const hasProperty = (item: T): boolean =>
    !(item[prop] === null || item[prop] === undefined);

  return (source: Observable<T>) => source.pipe(filter(hasProperty));
}

// Usage
// Example model: `name` may be null, which is what the operator screens out.
interface User {
  id: number;
  name: string | null;
}

const namedUsers$ = userStream$.pipe(filterOnlyPropertyPresent('name'));
namedUsers$.subscribe((namedUser) => {
  console.log('User with name:', namedUser);
});

These custom operators can greatly improve your RxJS workflows, making your code more efficient and cleaner. Be sure to thoroughly test these operators in your specific scenarios and adjust them as needed.

0
Subscribe to my newsletter

Read articles from Tunji Adeyeri directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Tunji Adeyeri
Tunji Adeyeri