Reactive Programming : RxJS 알아가기

Rxjs 는 ReactiveX의 자바스크립트 버전이다.(Reactive extensions library for JavaScript) 나는 Anuglar를 통해 처음 접해보았지만, Angular만이 아닌 RxJava나 RxSwift 등 여러 언어도 지원하는 것으로 보인다. 앵귤러를 하면서 Promise 대신 비동기 http 서비스 콜을 처리하는 부분과, 검색 단어를 타이핑하는 이벤트를 감지하여 처리하는 부분 (debouncetime을 준 검색기능) 과 subject를 이용하여 컴포넌트간 이벤트 기반의 실시간 데이터 통신 등을 수행했다. 하지만 하면서도 헷갈리고 자세히 알지 못하는 부분이 많아 좀더 자세히 보려한다.
소개페이지에 보면 “Observable 시퀀스와 표현력있는 쿼리 연산자를 사용하는 비동기적, 이벤트 기반의 프로그램을 구성하기 위한 라이브러리의 집합” 이라고 나와있다.
그 의미를 이해하려면 ‘Observer/Observable’, ‘Stream’, ‘Subject’등에 대해 어느정도 알고있어야 개념정리가 될 것 같다.
참조


1. Observer / Observable

옵저버와 옵저버블 이 두개의 단어를 단순하게 바라본다면 쉽게 다가갈 수 있다. 관찰하는 사람과 관찰할만한(?) 것. 옵저버블이 이벤트를 발생시키면 옵저버는 옵저버블을 구독하고 있다가 해당 이벤트에 대한 것에 반응 한다.
위와 같은 단순한 패턴을 코드상에 적용하게 된다면 HTTP 리퀘스트 함수를 옵저버블형태로 옵저버가 관찰하게 되면, 응답이 오는 것을 구독하여 해당 결과값을 처리할 수 있다. (여기서 구독이란 subscribe( result => {} ); 함수를 의미) 옵저버블은 이벤트/데이터를 지니며 연속된 값들을 지니고 있기도 한다. 그래서 스트림을 만들 수 있다고도 하며, 이 데이터를 가지고 가공(병합, Split, Mash 등)이 가능하다. 예를 들면 검색창에 사용자가 입력하는 입력 글자 하나하나를 확인하여 검색을 실시간으로 하거나 입력텀을 계산하여 입력이 멈추면 검색을 시작한다거나 하는 다양한 처리가 가능하다.

1-1. Observer/Observable 동작

일반적인 함수는 아래와 같이 동작한다.

// 메서드를 호출하고, 리턴 값을 `returnVal`에 할당한다
returnVal = someMethod(itsParameters);
// returnVal을 통해 필요한 작업을 진행한다

비동기 모델에서는 아래와 같은 흐름대로 코드가 실행된다.

이를 코드로 구현하면 아래와 같다:

// 옵저버의 onNext 핸들러를 정의한다, 하지만 실행하지는 않는다
// (이 예제에서는, 단순히 옵저버에 onNext 핸들러만 구현한다)
def myOnNext = { it -> /* 필요한 연산을 처리한다 */ };
// Observable을 정의하지만, 역시 실행하지는 않는다
def myObservable = someObservable(itsParameters);
// 옵저버가 Observable을 구독한다. 그리고 Observable을 실행한다
myObservable.subscribe(myOnNext);
// 필요한 코드를 구현한다

1-2. onNext, onCompleted, 그리고 onError

Subscribe 메서드를 통해 옵저버와 Observable을 연결하며, 아래 onNext/onError/onCompleted 매서드를 통해 동작을 정의한다.

onNext는 0번 이상 호출 될 수 있으며 그 후에는 onCompleted 또는 onError 둘 중 하나를 마지막으로 호출한다. 단, 이 둘 모두를 호출하지는 않는다.

예시

def myOnNext     = { item -> /* 필요한 연산을 처리한다 */ };
def myError      = { throwable -> /* 실패한 호출에 대응한다 */ };
def myComplete   = { /* 최종 응답 후 정리 작업을 한다 */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// 필요한 코드를 계속 구현한다

1-3. Angular 예시

마지막으로 RxJS를 기본 비동기 처리 라이브러리로 사용하고 있는 Angular에서의 예를 살펴보자.
Angular에서의 심플한 예

// observable 객체 변수는 뒤에 '$'를 붙여준다.
const one$ = new Observable(observer => {
  observer.next(1);
  observer.complete();
});

one$.subscribe( result => {
  console.log(result) // 1
});
// Create simple observable that emits three values
const myObservable = Observable.of(1, 2, 3);

// Create observer object
const myObserver = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

// Execute with the observer object
myObservable.subscribe(myObserver);
// Logs:
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// Observer got a complete notification

2. Subject

Subject는 Observable 과 Observer 둘 다 될 수 있는 특별한 형태로 Subject는 Observables을 subscribe(구독) 할 수 있고 다시 emit(방출)할 수도 있다. Subject에 값을 담아서 가지고 있다가 observable 형태로 제공해주기도 하며, Observer로써 next함수를 통해 구독자에게 값을 넘겨주는 역할을 하기도 한다. 나는 Angular에서 두 개의 컴포넌트가 값을 공유하면서 공유 시 이벤트전달도 서로 하는 용도로 Subject를 사용을 주로 했으며, 아주 편리하게 사용했다.

Subject와 Observer관계

2-1. Subject 종류

Subject는 4종류로 분류된다.

2-2. Angular 적용 예시

아래는 기본 Subject를 이용한 심플한 예시다.

// Service
@Service

...

transferSubject: Subject<any> = new Subject<any>();

setObj(inData: any) {
    this.transferSubject.next(inData);
}

getObj(): Observable<any> {
    return this.transferSubject.asObservable();
}


// Component1
// 데이터를 서비스의 함수를 호출하여 전달하며, 이벤트를 바로 발생시켜 컴포넌트 2에서 바로 로직을 실행시키는 트리거 역할을 해줌
this.transferService.setObj({hello: 'World!'});


// Component2
// 서비스의 데이터 저장을 구독하여, 데이터가 저장되면 해당하는 값을 구독하여 바로 콘솔로 찍어주는 기능.
this.transferService.getObj().subscribe(result => console.log(result));

3. 기타..

RxJS를 사용하여 Angular 내에서 다양한 컴포넌트들이 공통의 데이터의 업데이트에 대한 이벤트를 구독하여 공통적으로 처리하는 기능이나 다양한 유틸성 기능을 구현하는 예제는 향후 포스트에서 다루기로 하자.