먹고 기도하고 코딩하라

[RxSwift] (1) 반응형 프로그래밍과 Observable 개요 본문

앱/Swift

[RxSwift] (1) 반응형 프로그래밍과 Observable 개요

사과먹는사람 2022. 11. 14. 23:08
728x90
728x90

 

Rx는 무엇인가?

ReactiveX 공식 사이트에 가보면, Rx는 관찰 가능한 스트림과 비동기 프로그래밍을 위한 API(An API for asynchronous programming with observable streams)라고 적혀 있다.

Rx는 Observer 패턴을 더욱 확장해서, 데이터나 이벤트 시퀀스를 지원하며 개발자들이 프로그램에 갖는 로우레벨 스레딩, 동기화, 스레드 세이프(Thread-safety), 동시 데이터 구조나 논블로킹 I/O 등에 대한 걱정을 없애고 Observable을 조정할 수 있는 연산자를 지원한다.

Rx는 Observer, Iterator 패턴, 그리고 함수형 프로그래밍의 좋은 부분만을 합한 것과 같다고 한다. 이벤트 스트림이나 데이터 스트림을 생성하고(Create), 스트림을 오퍼레이터로 결합하고 변형하며(Combine), 사이드 이펙트를 부르는 관찰 가능한 스트림(observable stream)을 구독한다(Listen).

 

Rx를 이용한 프로그래밍 패러다임을 반응형 프로그래밍(Reactive programming)이라고 하는데, 데이터 스트림과 변경 사항 전파를 중심으로 하는 비동기 프로그래밍 패러다임을 뜻한다. 이런 반응형 프로그래밍에서는 Observable 만들고, Observable의 변화를 subscribe해서 변화가 일어날 때마다 그에 반응해 처리가 일어나게 한다.

Rx는 Observable들을 필터링하고, 선택하고, 변형하고, 합치고, 구성하는 등의 오퍼레이터들을 제공한다.

 

 

Rx의 구성요소

Observable, Operator, Scheduler가 있다. 하나 더 더하자면 Subject가 있는데, 이것은 Observable도 될 수 있고 Observer도 될 수 있는 것이다.

RxSwift의 장점으로, 비동기 코드를 동기 코드처럼 작성할 수 있다는 것을 꼽을 수 있다.

단점으로는 디버깅이 어려울 수 있고, 클로저 사용이 많아 retain cycle이 생기지 않도록 주의해야 한다는 점, 그리고 러닝 커브가 조금 있다는 점이 있다. 

 

 

Observable은 뭘까?

RxSwift에서, Observable은 기본적으로 스위프트의 Sequence와 비슷한 개념이라고 한다.

스위프트에서 Sequence는 프로토콜로, 이 프로토콜을 준수하는 타입들은 자기 자신의 요소 값을 반복자로 접근하고 순회할 수 있다. 대표적으로는 Collection 타입이 Sequence 프로토콜을 준수하고 있다.

이렇게 마블 다이어그램을 통해서 Observable과 전환을 표현할 수 있다.

  • 박스 위의 선은 Observable의 타임라인이다. 시간은 왼쪽부터 오른쪽으로 흐른다.
  • 선 위의 아이템들은 Observable이 방출하는 아이템들이다.
  • 위의 선의 세로선은 Observable이 성공적으로 종료됐다는 것을 의미한다.
  • 아이템 하단에서 내려가는 점선은 Observable에 변형이 가해진다는 의미이다. 박스 안의 글자가 변형을 의미하는 메소드 이름 혹은 변형 자체를 설명한다.
  • 박스 아래의 선은 변형의 결과인 Observable을 나타낸다.
  • 어떤 이유로 Observable이 비정상적으로 종료된다면 세로선은 X 자로 대체된다. 즉, X 자는 에러 상황을 의미한다.

 

 

Observable의 특징

  • 하나 이상의 Observer들이 구독할 수 있으며, 이 Observer들은 Observable이 어떤 아이템을 방출하면 실시간으로 반응한다.
  • Observable은 3가지 유형의 이벤트를 방출한다.
    • next(Element) // 최신, 다음 데이터를 전달한다.
    • error(Swift.Error) // 에러로 Observable이 종료된다. 
    • completed()
  • Observable이 방출하는 아이템 시퀀스는 무한할 수도 있고 유한할 수도 있다.

 

Observable을 왜 쓸까?

Observable 모델을 사용함으로써, 비동기 이벤트의 스트림을 단순하고 구성 가능한 작업으로 처리할 수 있다. (마치 배열 같은 컬렉션을 쓰는 것처럼) 콜백 지옥에서 해방될 수 있으며, 코드의 가독성이 좋아지고 버그의 가능성도 낮아진다!

Rx에서 Observer들은 Observable을 구독한다. 그리고 Observable이 emit(이하 방출)하는 한 개의 아이템 혹은 아이템 시퀀스에 Observer들이 반응한다. 1개의 Observable에 1개 이상의 다수의 Observer들이 붙을 수 있다. 말인즉슨 동시성 연산이 가능하다는 의미이다.

그래서 기존에 코드를 짜던 방식과는 좀 다르게 생각해야 한다.

어떤 일들이 순차적으로 일어나게 하고 싶은가? 그렇다면 Observer를 1개만 붙이고, onNext에 순차적으로 일어나게 하고 싶은 일들을 클로저로 넘겨야 할 것이다.

순서와 상관없이, 서로 의존 관계가 없는 여러 작업이 동시에 일어나게 하고 싶은가? 그렇다면 각 작업을 하는 Observer들이 Observable을 subscribe하게 만들면 된다. 같은 Observable을 관찰하고 있는 Observer들끼리는 (개발자가 정하지 않는)임의의 순서에 따라, 그저 Observable이 뭔가를 방출하면 그에 반응할 뿐이다.

 

Observable은 유연하다

Rx Observable은 단순한 단일값만 방출(emit)하지 않고, 값의 시퀀스, 심지어 무한 스트림까지 방출할 수 있다.

  • 값 반환 -> onNext(T)
  • 에러 발생 -> onError(Exception)
  • 완료 -> onCompleted()

 

일반적인 흐름과 비동기 모델의 흐름

일반적으로는 메소드를 호출했을 때 대개 다음과 같은 흐름으로 진행된다고 한다.

  1. 메소드를 호출
  2. 메소드의 반환값을 변수에 저장
  3. 결과 값을 가진 변수를 통해 필요 연산을 처리

문서에서는 파이썬으로 설명하고 있는데, 이것을 swift로 바꿔보겠다.

// 메서드를 호출하고, 리턴 값을 `returnVal`에 할당한다
let returnVal = someMethod(itsParameters);
// returnVal을 통해 필요한 작업을 진행한다

비동기 모델에서는 아래와 같은 흐름대로 코드가 실행된다.

  1. 메소드를 먼저 정의한다. 이 메소드는 비동기 메소드 호출로 결과를 반환받고, 필요한 동작을 처리한다. (이 메소드는 Observer의 일부이다)
  2. Observable로 비동기 호출을 정의한다.
  3. subscribe로 Observer를 Observable에 구독시킨다. Observable 동작을 초기화한다.
  4. 필요한 코드를 계속 구현한다. 메소드 호출로 결과를 반환받을 때마다, Observer의 메소드는 반환값이나 emit 값들을 사용해 연산한다.
// 옵저버의 onNext 핸들러를 정의한다, 하지만 실행하지는 않는다
// (이 예제에서는, 단순히 옵저버에 onNext 핸들러만 구현한다)
let myOnNext = { it -> /* 필요한 연산을 처리한다 */ };
// Observable을 정의하지만, 역시 실행하지는 않는다
let myObservable = someObservable(itsParameters);
// 옵저버가 Observable을 구독한다. 그리고 Observable을 실행한다
myObservable.subscribe(myOnNext);
// 필요한 코드를 구현한다
let myOnNext = { print($0) }	// onNext 메소드를 구현한다.
let myObservable = Observable.of(1, 2, 3)	// Observable을 생성하고, 초기화한다.
myObservable.subscribe(onNext: myOnNext).dispose()	// subscribe한다.
// 1 2 3

주의할 점은, myObservable.subscribe(myOnNext)라서 자칫하면 myObservable이 myOnNext를 subscribe하는 것처럼 보일 수도 있다는 건데 myObservable은 여기서 Observable<Int> 타입이다.

여기에서는 사실상 "Observer"라는 객체가 명시적으로 생성되지는 않는다. 다만 Observable에 .subscribe를 붙이는 것 자체가 이 Observable을 구독하는 Observer가 생긴다는 의미인 것이다. (처음에, 이걸 이해를 못 해서 너무 어려웠다 -_-)

 

onNext, onCompleted, onError

subscribe 메소드를 통해 Observer와 Observable을 연결한다. subscribe 메소드 내에서는 onNext, onError, onCompleted, onDisposed를 구현하게 된다.

  • onNext((Self.Element) -> Void)? : Observable이 새로운 아이템을 방출할 때마다 이 메소드를 호출한다. Observable이 방출하는 아이템을 파라미터로 받는다. 예를 들어 위의 myObservable 예제에서 onNext의 매개변수 타입은 Int이다.
  • onError((Error) -> Void)? : Observable에 오류가 생겼거나 할 경우 오류를 알리기 위해 이 메소드를 호출한다. 이 메소드 이후에는 onNext, onCompleted가 호출되지 않고 해당 Observable이 종료되며 구독 중인 Observer들도 더 이상 아이템 방출 이벤트를 받을 수 없다(연예인 인스타 라이브가 네트워크 문제로 강제종료되어 시청자들이 강제퇴장당한다고 생각하면 쉽다). onError 메소드는 오류 정보를 갖고 있는 Error 프로토콜 준수 객체를 매개변수로 받는다.
  • onCompleted(() -> Void)? : Observable이 오류 없이 마지막 onNext를 호출했다면, 이 메소드를 호출한다. 추가적으로 더 이상 아이템을 방출하지 않음을 의미하며, 성공적으로 생명주기가 끝난 것이다. 
  • onDisposed(() -> Void)? : Observer에는 있는 메소드로, Observer가 Observable에 대한 구독을 취소하거나 메모리에서 해제(dispose)될 때 호출한다. 

구독 취소에 대해 좀 더 설명하자면, Observer가 어떤 Observable을 더 이상 구독하길 원하지 않는 경우 dispose()를 호출해서 구독을 해지할 수 있다. 그렇게 해서 Observer들이 하나하나 떨어져나가서 Observable에 어떤 Observer도 없다면, Observable은 새로운 아이템을 방출하지 않게 된다.

 

(?) dispose는 Observer가 구독을 취소하는 것인가, Observable이 이벤트 방출을 멈추는 것인가?

onDisposed에 대해서 더 조사해볼 것.

 

Hot Observable / Cold Observable

Observable의 종류에 따라 연속된 아이템들을 방출하는 시기가 다른데, Hot의 경우 Observer의 유무와 상관없이 아이템을 방출할 수 있다. 그래서 생성되자마자 아이템을 방출하기도 한다. 그래서 이 Observable을 구독하는 Observer들은 중간부터 구독해야 할 수도 있다.

반대로 Cold의 경우 Observer 구독이 있기 전까지는 아이템을 방출하지 않기 때문에, 이 Observable을 구독하는 Observer들은 방출하는 아이템 전체를 구독할 수 있게 된다. 이 Cold Observable의 개념이 흔히 말하는 Observable의 개념(Observable은 단순히 시퀀스 정의일 뿐 Observer가 생기기 전까지 아무것도 하지 않는다는)이다.

 

 

 

 

References

 

728x90
반응형
Comments