Reactive Programming with RxJS

Introduction:

Reactive Programming has revolutionized the way developers handle asynchronous data streams in JavaScript applications. Among the plethora of libraries available, RxJS stands out as a powerful tool for managing and manipulating these streams effortlessly. In this guide, we'll delve into RxJS, exploring its major functions and demonstrating how to leverage them effectively through practical examples.

What is RxJS??

RxJS, short for Reactive Extensions for JavaScript, is a library for composing asynchronous and event-based programs using observable sequences. It provides a rich set of operators to work with these observables, allowing developers to transform, filter, merge, and handle asynchronous data streams with ease.

Why use RxJS??

RxJS provides a simple and powerful way to handle asynchronous data streams. It allows you to write code that is easy to read and maintain. RxJS also provides a set of operators that can be used to transform and manipulate data streams, making it easier to handle complex data processing tasks.

Getting Started:

Before diving into the details, let's set up a basic environment to work with RxJS. You can include RxJS in your project using npm or yarn:

npm install rxjs

Once installed, you can import RxJS into your project:

import { Observable, of, from } from 'rxjs';
import { map, catchError } from 'rxjs/operators';

Now, let's explore some of the major functions provided by RxJS:

  • Creating Observables:

    Observables are the core building blocks of RxJS. You can create observables from various sources such as events, timers, promises, or even existing data arrays.

      const observable1 = new Observable((observer) => {
        observer.next('Hello!!');
        observer.next('Its RxJS');
        observer.complete();
      });
      const observable2 = of([1, 2, 3], 4, 5, 6);
      const observable3 = from([1, 2, 3]);
    
  • Subscribing to Observables:

    To consume the data emitted by observables, you need to subscribe to them. Subscriptions define what to do with the emitted values, errors, and completion signals.

      observable.subscribe({
        next: (value) => console.log(value),
        error: (err) => console.error(err),
        complete: () => console.log('Completed'),
      });
    
  • Error Handling:

    You can handle errors emitted by observables using error handling operators like catchError.

      observable.pipe(
        map((value) => {
          if (value === 'RxJS') {
            // intentionally throwing error to handle it later.
            throw new Error('Something went wrong');
          }
          return value;
        }),
        catchError((err) => {
          console.error('Error:', err.message);
          return of('Default Value');
        })
      ).subscribe(console.log);
    
  • Operators:

    RxJS provides a wide range of operators to transform, filter, combine, and manipulate observable streams.

    • mergeMap / flatMap:

      This operator maps each value emitted by the source observable to an inner observable, then flattens all these inner observables into a single observable stream.

        import { fromEvent } from 'rxjs';
        import { mergeMap } from 'rxjs/operators';
      
        const clicks = fromEvent(document, 'click');
        const innerObservable = clicks.pipe(
          mergeMap((event) => {
            return fromEvent(document, 'mousemove');
          })
        );
      
        innerObservable.subscribe((position) => {
            console.log(position)
        });
      
    • debounceTime:

      Debounce time delays the emission of items from the source observable until a specified amount of time has passed without another source emission.

        import { fromEvent } from 'rxjs';
        import { debounceTime } from 'rxjs/operators';
      
        const input = document.getElementById('input');
        const inputObservable = fromEvent(input, 'input');
        const debouncedInput = inputObservable.pipe(
          debounceTime(300)
        );
      
        debouncedInput.subscribe((event) => {
            console.log(event.target.value)
        });
      
    • scan:

      Scan operator applies an accumulator function over the source observable sequence and returns each intermediate result.

        import { from } from 'rxjs';
        import { scan } from 'rxjs/operators';
      
        const source = from([1, 2, 3, 4, 5]);
        const scanned = source.pipe(
          scan((acc, curr) => acc + curr, 0)
        );
      
        scanned.subscribe((value) => {
            console.log(value)
        }); // Output: 1, 3, 6, 10, 15
      
    • distinctUntilChanged:

      This operator suppresses consecutive duplicate items emitted by the source observable.

        import { of } from 'rxjs';
        import { distinctUntilChanged } from 'rxjs/operators';
      
        const source = of(1, 1, 2, 2, 3, 3, 3, 4, 5);
        const distinct = source.pipe(
          distinctUntilChanged()
        );
      
        distinct.subscribe((value) => {
            console.log(value)
        }); // Output: 1, 2, 3, 4, 5
      
    • takeUntil:

      TakeUntil emits the values emitted by the source observable until a notifier observable emits a value.

        import { interval, fromEvent } from 'rxjs';
        import { takeUntil } from 'rxjs/operators';
      
        const source = interval(1000);
        const stopTimer = fromEvent(document, 'click');
        const timer = source.pipe(
          takeUntil(stopTimer)
        );
      
        timer.subscribe((value) => {
            console.log(value)
        }); // Output: 0, 1, 2, ... until a click event occurs
      
    • concat:

      It preserves the order of the observables and starts emitting values from the first observable provided, waits for it to complete, then moves to the next observable, and so on.

        import { concat, of, interval } from 'rxjs';
        import { take } from 'rxjs/operators';
      
        // Emits 0, 1, 2 over time with a one-second interval
        const source1 = interval(1000).pipe(take(3));
        // Emits 'A', 'B', 'C' immediately
        const source2 = of('A', 'B', 'C');
      
        const concatenated = concat(source1, source2);
      
        concatenated.subscribe({
          next: (value) => console.log(value),
          complete: () => console.log('Concatenation completed'),
        });
        // Output:
        /* 0 // emitted after 1 second
           1 // emitted after another second
           2 // emitted after another second
           A
           B
           C
           Concatenation completed
        */
      

These are just a few examples of the many operators available in RxJS. Experimenting with these operators in different scenarios will give you a better understanding of their capabilities and how they can be utilized to enhance your reactive programming workflows.

"Thank you for reading! Stay tuned for more insightful content, and don't hesitate to reach out with any questions or feedback. Happy learning and coding!! "

0
Subscribe to my newsletter

Read articles from Rishabh Singh Sengar directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Rishabh Singh Sengar
Rishabh Singh Sengar