RxJS와 반응형 프로그래밍

yerinyerin
8 min read

Think of RxJS as Lodash for events.

참고 블로그

공식문서

개요

RxJS(Reactive Extensions for JavaScript)는 JavaScript를 위한 반응형 프로그래밍 라이브러리이다. 비동기 이벤트를 마치 배열처럼 다룰 수 있게 만들어주는 라이브러리이다. RxJS의 핵심 아이디어는 다음과 같다:

  1. 함수형 프로그래밍: RxJS는 순수 함수를 사용하여 부수효과를 최소화하고 코드의 예측 가능성을 높인다.

  2. 옵저버 패턴: 객체(관찰 대상)의 상태가 변경되면 그에 의존하는 모든 객체들이 자동으로 통지받는 방식으로, 비동기 이벤트 처리에 적합하다. RxJS는 옵저버 패턴을 사용하여 객체(옵저버블)의 상태 변화를 다른 객체들(옵저버)에게 자동으로 알리는 방식을 구현한다.

  3. 이터레이터 패턴: 컬렉션의 요소들을 순회하는 방법을 제공하는 디자인 패턴. RxJS에서는 이터레이터 패턴을 사용하여 데이터 스트림을 순차적으로 처리하고 동기 및 비동기 데이터를 처리할 수 있다.

특히 Angular는 RxJS를 기본적으로 채택하고 있어서 HTTP 요청이나 폼 처리 등에서 RxJS의 옵저버블을 사용하고 있다. 이를 통해 개발자는 비동기 작업을 효과적으로 처리하고 복잡한 데이터 흐름을 쉽게 관리할 수 있다.

반응형 프로그래밍 이해하기

반응형 프로그래밍은 데이터 스트림과 변화의 전파에 중점을 둔 프로그래밍 패러다임이다. 이 접근 방식은 비동기 데이터 스트림을 쉽게 만들고 조작할 수 있게 해준다.

  • 데이터 스트림

    • 시간에 따라 발생하는 이벤트나 값의 연속.

    • e.g. 마우스 클릭, 키보드 입력, API 응답 등.

  • 변화의 전파

    • 하나의 데이터 스트림의 변화가 다른 데이터나 UI의 변화를 자동으로 트리거한다.
  • 선언적 프로그래밍

    • 반응형 프로그래밍은 '무엇을' 할 것인지를 정의하는 선언적 방식을 사용한다.
  • 함수형 프로그래밍의 개념 활용

    • 맵, 필터, 리듀스 등의 고차 함수를 사용하여 데이터 스트림을 변형하고 조작한다.
  • 추상화

    • 시간에 따른 데이터의 흐름을 하나의 스트림으로 추상화하여 다룰 수 있다.

    • RxJS는 시간에 따라 발생하는 모든 종류의 이벤트나 데이터를 하나의 스트림으로 취급한다. 이는 마치 배열을 다루는 것처럼 시간에 따른 이벤트를 다룰 수 있게 한다.

import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';

const button = document.querySelector('button');
const clicks = fromEvent(button, 'click');

clicks.pipe(
  debounceTime(1000),
  map(event => event.clientX)
).subscribe(x => console.log(x));

위 코드에서 const clicks = fromEvent(button, 'click')는 버튼 클릭을 스트림으로 변환하는 것. 이런 방식으로 RxJS는 복잡한 비동기 작업을 간단하고 선형적인(linear) 형태로 표현할 수 있게 한다. 또한 시간에 따른 데이터 흐름을 하나의 일관된 방식으로 다룰 수 있어서 다양한 종류의 비동기 작업을 동일한 패턴으로 처리할 수 있게 된다.

RxJS의 핵심 개념

RxJS의 핵심 개념은 다음과 같다:

  1. Observable:

    • 시간에 따라 여러 값을 방출할 수 있는 데이터 스트림, 미래의 이벤트를 일으키거나 값을 방출할 수 있는 컬렉션.

    • 비동기적으로 데이터를 생성하고 전달하는 메커니즘을 제공한다.

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});
  • Observable 생성자는 구독함수를 인자로 받는다.

  • 구독함수는 subscriber(observer)를 매개변수로 받아 데이터를 발행한다.

  1. Observer:

    • Observable에서 방출된 값을 소비하는 객체이다.

    • next(), error(), complete() 메서드를 가질 수 있다.

import { Observable } from 'rxjs';

const observable = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

// 1. 단일 함수를 전달하는 경우 (next 핸들러로 간주)
console.log("1. 단일 함수 (next 핸들러):");
observable.subscribe(
  value => console.log(`Next: ${value}`)
);

// 2. 객체를 사용한 명시적 방법
console.log("\n2. 객체를 사용한 명시적 방법:");
observable.subscribe({
  next: value => console.log(`Next: ${value}`),
  error: err => console.error(`Error: ${err}`),
  complete: () => console.log('Completed')
});

// 3. 세 개의 개별 함수를 전달하는 방법
// 순서대로 next, error, complete로 간주되기 때문에 순서가 중요.
console.log("\n3. 세 개의 개별 함수:");
observable.subscribe(
  value => console.log(`Next: ${value}`),
  err => console.error(`Error: ${err}`),
  () => console.log('Completed')
);

// 4. 에러 핸들러만 제공하는 경우 (이는 작동하지 않는다.)
console.log("\n4. 에러 핸들러만 제공 (작동하지 않음):");
observable.subscribe(
  undefined,
  err => console.error(`Error: ${err}`)
);
  1. Subscription:

    • Observable의 실행을 나타내며, 주로 실행을 취소하는 데 사용된다.

    • JavaScript의 타이머 함수(예: setTimeout)를 취소하는 것과 유사한 개념.

    • 리소스 해제나 실행 취소에 유용하다.

const observable = interval(1000);
const subscription = observable.subscribe(x => console.log(x));

// 나중에 구독을 취소
setTimeout(() => {
  subscription.unsubscribe();
}, 5000);
  1. 연산자:

    • 순수 함수를 사용하여 컬렉션을 처리하는 방법으로, Observable을 변환하고 조작하는 데 사용된다.

    • 파이프라인을 통해 데이터 스트림을 변형할 수 있다.

import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

const nums = of(1, 2, 3, 4, 5);

const squareOddVals = nums.pipe(
  filter((n: number) => n % 2 !== 0),
  map(n => n * n)
);

squareOddVals.subscribe(x => console.log(x));

이 외에 subject, schedulers라는 개념도 있지만, 우선은 이 정도로만 정리하고 연산자를 더 자세히 살펴보고자 한다.

RxJS의 다양한 연산자

생성 연산자

  • of(): 주어진 인자들을 차례로 발행하는 Observable 생성.
const obs = of(1, 2, 3, 4, 5);
  • from(): 배열, 프로미스, 이터러블 객체 등을 Observable로 변환.
const obs = from([1, 2, 3, 4, 5])
  • range(): 지정된 범위의 숫자를 발행하는 Observable을 생성.
const obs = range(1,5) // 1부터 5까지
  • interval(): 주어진 시간 간격으로 정수를 순차적으로 발행.
const obs = interval(1000) // 1초마다 0, 1, 2, 3, ... 발행
  • timer(): 지정된 시간 후에 값을 발행하거나, 주기적으로 값을 발행.
const obs = timer(3000, 1000) // 3초 후 시작하여 그 후에 1초마다 발행.
  • fromEvent(): 이벤트로부터 Observable 생성. fromEvent로 생성된 Observable은 원본 이벤트 객체를 발행한다.
const mouseMoves$ = fromEvent(document, 'mousemove'); 
mouseMoves$.subscribe((event: MouseEvent) => { 
console.log(`Mouse position: ${event.clientX}, ${event.clientY}`); });

변환 연산자

Observable에서 발행된 값을 변형하여 새로운 Observable을 생성한다.

  • map(): 각 값을 새로운 값으로 변환.
import { from } from 'rxjs';
import { map } from 'rxjs/operators';

from([1, 2, 3]).pipe(
  map(x => x * 2)
).subscribe(console.log);
// 출력: 2, 4, 6
  • mergeMap(): 각 값을 Observable로 변환하고 결과를 하나의 Observable로 병합.
import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

of(1, 2, 3).pipe(
  mergeMap(x => of(x, x * 2))
).subscribe(console.log);
// 출력: 1, 2, 2, 4, 3, 6
  • scan(): 누적 값을 계산하는데 그 중간 계산 결과를 즉시 방출한다. 최종 누적 결과만을 한 번 방출하는 reduce와는 다름!
import { from } from 'rxjs';
import { scan } from 'rxjs/operators';

from([1, 2, 3, 4, 5]).pipe(
  scan((acc, val) => acc + val, 0)
).subscribe(console.log);
// 출력: 1, 3, 6, 10, 15

필터링 연산자

필터링 연산자는 Observable에서 특정 조건을 만족하는 값만 선택한다.

  • filter(): 조건을 만족하는 값만 통과시킨다.
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

from([1, 2, 3, 4, 5]).pipe(
  filter(x => x % 2 === 0)
).subscribe(console.log);
// 출력: 2, 4
  • take(): 지정된 개수의 값만 가져옵니다.
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

interval(1000).pipe(
  take(3)
).subscribe(console.log);
// 출력: 0, 1, 2 (1초 간격으로)
  • debounceTime(): 지정된 시간 동안 새 값이 발행되지 않으면 마지막 값을 발행합니다.
import { fromEvent } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

const input = document.getElementById('search-input');
fromEvent(input, 'input').pipe(
  debounceTime(300)
).subscribe(() => console.log('Input debounced'));
// 입력이 300ms 동안 없을 때 로그 출력

결합 연산자

결합 연산자는 여러 Observable을 하나로 결합한다.

  • merge(): 여러 Observable의 값을 하나의 Observable로 병합합니다.
import { merge, interval } from 'rxjs';
import { take } from 'rxjs/operators';

const timer1$ = interval(1000).pipe(take(3));
const timer2$ = interval(500).pipe(take(3));
merge(timer1$, timer2$).subscribe(console.log);
// 출력: 0, 0, 1, 2, 1, 2 (시간에 따라 순서가 다를 수 있음)
  • combineLatest(): 여러 Observable의 최신 값을 결합한다. 여러 Observable에서 각각 적어도 하나의 값이 방출될 때까지 기다린다. 그 후, 어떤 Observable에서 새로운 값이 방출될 때마다 모든 Observable의 최신 값을 배열로 결합하여 방출한다. 그 결과는 각 Observable에서 방출하는 최신 값을 포함하게 될 것.
import { combineLatest, timer } from 'rxjs';

const firstTimer$ = timer(0, 1000);
const secondTimer$ = timer(500, 1000);

combineLatest([firstTimer$, secondTimer$]).subscribe(
  ([first, second]) => console.log(`Timer1: ${first}, Timer2: ${second}`)
);
// 출력: 각 타이머의 최신 값 조합
  • zip(): 여러 Observable의 값을 순서대로 결합합니다.
import { zip, of } from 'rxjs';

const age$ = of(27, 25, 29);
const name$ = of('Foo', 'Bar', 'Beer');
const isDev$ = of(true, true, false);

zip(age$, name$, isDev$).subscribe(
  ([age, name, isDev]) => console.log(`${name}: ${age} ${isDev ? 'Developer' : 'Not a developer'}`)
);

// 출력:
// Foo: 27 Developer
// Bar: 25 Developer
// Beer: 29 Not a developer

유틸리티 연산자(보조 연산자)

다른 연산자들과 함께 사용되어 Observable의 동작을 더 세밀하게 제어할 수 있다. Observable의 부수 효과를 관리하는 데 중점을 둔다.

  • tap: Observable 스트림을 변경하지 않고 부수 효과를 수행할 때 사용.

  • 주로 디버깅이나 로깅에 사용됨.

import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';

of(1, 2, 3).pipe(
  tap(value => console.log('Before map:', value)),
  map(value => value * 2),
  tap(value => console.log('After map:', value))
).subscribe();

// 출력:
// Before map: 1
// After map: 2
// Before map: 2
// After map: 4
// Before map: 3
// After map: 6
  • finalize: Observable이 완료되거나 에러가 발생했을 때 실행될 콜백을 지정할 수 있다.

  • 주로 리소스 정리나 로깅에 사용.

import { of } from 'rxjs';
import { finalize } from 'rxjs/operators';

of(1, 2, 3).pipe(
  finalize(() => console.log('Observable 완료'))
).subscribe(
  value => console.log(value),
  err => console.error(err),
  () => console.log('구독 완료')
);

// 출력:
// 1
// 2
// 3
// 구독 완료
// Observable 완료

RxJS를 사용한 예시 코드

예시 코드 1.

import { Component } from '@angular/core';
import { Observable, of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';
import { HttpClient } from '@angular/common/http';

@Component({
  selector: 'app-root',
  template: `
    <div>
      <h1>{{title}}</h1>
      <p>Data: {{data$ | async}}</p>
    </div>
  `
})
export class AppComponent {
  title = 'RxJS in Angular Example';
  data$: Observable<string>;

  constructor(private http: HttpClient) {
    this.data$ = this.http.get<any>('https://api.example.com/data')
      .pipe(
        map(response => response.data),
        catchError(error => {
          console.error('Error fetching data', error);
          return of('Error fetching data');
        })
      );
  }
}
  • Angular의 HTTPClient는 HTTP GET 요청을 보내는데, http.get() 메서드는 Observable을 반환하여 비동기적으로 데이터를 받아올 수 있게 한다.

  • pipe() 를 통해 반환된 Observable에 연산자들을 체이닝하여 적용할 수 있다.

    • map() 연산자로 응답 데이터를 변환한다.

    • catchError() 연산자로 오류를 처리한다.

  • 결과 Observable(data$)을 컴포넌트에서 async 파이프를 사용하여 자동으로 구독하고 해제할 수 있다.

    • async를 사용하지 않으면 수동으로 구독해야 한다.

예시 코드 2.

import { fromEvent } from 'rxjs';
import { throttleTime, map } from 'rxjs/operators';

let button = document.querySelector('button');
let clicks = fromEvent(button, 'click');

clicks.pipe(
  throttleTime(1000),
  map(event => event.clientX)
).subscribe(x => console.log(x));
  • 클릭 이벤트를 스트림으로 처리하고, 1초 동안 추가 클릭을 무시(throttle)하며, 각 클릭의 X 좌표만을 추출하여 로그로 출력한다.

마무리

RxJS는 복잡한 비동기 작업과 이벤트 기반 프로그래밍을 더욱 간결하고 효율적으로 만들어주는 강력한 라이브러리이다. 다양한 연산자를 통해 데이터 스트림을 변형, 결합, 필터링하는 능력은 복잡한 비동기 로직을 선언적이고 읽기 쉬운 코드로 표현할 수 있게 해준다.

RxJS를 완전히 이해하고 활용하기까지는 시간이 많이 걸리겠지만, 일단 익숙해지면 비동기 데이터 흐름 관리를 쉽고 효과적으로 할 수 있을 것이다.

0
Subscribe to my newsletter

Read articles from yerin directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

yerin
yerin