본문 바로가기
ReactiveX (Rx)

[RxJava] ConnectableObservable, Subject에 대해 알아보자

by 안솝우화 2022. 4. 4.
반응형

ConnectableObservable

ConnectableObservable는 뜨거운 Observable을 만들기 위한 방법 중 하나입니다
뜨거운 Observable과 차가운 Observable에 대해 궁금하시다면 아래 글을 확인해 주세요

2022.03.30 - [Reactivex (Rx)] - [RxJava] Cold Observable, Hot Observable에 대해 알아보자

 

[RxJava] Cold Observable, Hot Observable에 대해 알아보자

Cold Observable 이란? 일반적으로 사용하는 Observable이 바로 Cold Observable이다 Subscribe(구독)할 때 데이터를 발행해줍니다 (여기서 말하는 Subscribe와 Subscriber은 다른 겁니다!!) Cold 생산자는 1개의..

asuhdevstory.tistory.com

 

바로 코드를 보고 설명드리겠습니다

        val mUserList = mutableListOf<User>(
            User(1, "demo1", 15),
            User(2, "demo2", 10),
            User(3, "demo3", 35),
            User(4, "demo4", 29),
            User(5, "demo5", 12),
            User(6, "demo6", 29),
            User(7, "demo7", 29),
            User(7, "demo7", 29),
            User(8, "demo8", 15),
            User(8, "demo8", 15)
    	)

        fun hotObservable() : ConnectableObservable<User>{
    		return Observable.fromIterable(mUserList).publish() //1
		}
        
        val hotObservable = hotObservable()
        hotObservable.subscribe(
            {
                Log.d(MainActivity.TAG, "onNext : $it")
            }, {
                Log.d(MainActivity.TAG, "onError : $it")
            }, {
                Log.d(MainActivity.TAG, "onComplete")
            }
        ) //2

        hotObservable.connect() //3

        hotObservable.subscribe(
            {
                Log.d(MainActivity.TAG, "onNext : $it")
            }, {
                Log.d(MainActivity.TAG, "onError : $it")
            }, {
                Log.d(MainActivity.TAG, "onComplete 2")
            }
        ) //4

주석대로 설명해 드리겠습니다

1 : publish() 메서드를 사용하여 차가운 옵저버블에서 뜨거운 옵저버블로 변환합니다, 때문에 hotObservable() 함수의 반환형은 ConnectableObservable이 되게 됩니다

2 : 뜨거운 옵저버블은 구독한다고 해서 바로 데이터를 발행하지 않습니다

3 : publish()로 데이터 발행을 늦춘 상태에서 connect()를 호출해 데이터를 발행한다

4 : connect()를 호출하고 나서의 구독은 데이터를 발행하지 않습니다

아래 로그를 확인해 봅시다!

실행된 로그

 

 

Subject

Subject는 차가운 Observable을 뜨거운 Observable로 만들기 위한 클래스입니다

Subject 클래스는 Observable의 속성과 Observer의 속성이 모두 있습니다 (subscribe와 Observable의 연산자 사용 가능)

Observable 속성 : 데이터 발행 가능
Observer 속성 : 발행된 데이터 처리 가능

그런데 Subject는 가능하면 사용하지 말라고 권장합니다

이유는 mutable 하기 때문에 함수형 프로그래밍에 적합하지 않고 Observable의 Reactive Stream(논 블록킹 비동기 스트림의 프로그래밍 표준) 규약을 깨뜨리기 쉽기 때문입니다

 

AsyncSubject

첫 번째로 소개해 드릴 건 AsyncSubject입니다

구독자가 구독을 하게 되면 가장 마지막 발행한 데이터를 얻을 수 있습니다

아래 코드로 설명을 이어서 드리겠습니다

fun asyncSubject(){
    val subject = AsyncSubject.create<Int>()
    subject.onNext(1)

    subject.subscribe(
        {
            Log.d(MainActivity.TAG, "onNext : $it")
        }, {
            Log.d(MainActivity.TAG, "onError : $it")
        }, {
            Log.d(MainActivity.TAG, "onComplete ")
        }
    )
    subject.subscribe(
        {
            Log.d(MainActivity.TAG, "onNext2 : $it")
        }, {
            Log.d(MainActivity.TAG, "onError2 : $it")
        }, {
            Log.d(MainActivity.TAG, "onComplete2 ")
        }
    )
    subject.onNext(2)
    subject.onNext(3)
    subject.onComplete()
}

//호출 부분
asyncSubject()

실행 로그

가장 마지막 값인 3이 출력이 되는 걸 확인해 볼 수 있습니다

onComplete()를 호출하지 않으면 아무 값도 찍히지 않습니다

 

BehaviorSubject

구독자(Observer)가 구독을 하면 가장 최근 값 또는 기본값으로 데이터를 발행합니다 (기본값을 설정할 수 있습니다)

그리고 후속 데이터를 방출합니다

fun behaviorSubject(){
    val subject = BehaviorSubject.create<Int>()
    subject.onNext(0)
    subject.onNext(1)

    subject.subscribe(
        {
            Log.d(MainActivity.TAG, "onNext1 : $it")
        }, {
            Log.d(MainActivity.TAG, "onError1 : $it")
        }, {
            Log.d(MainActivity.TAG, "onComplete1 ")
        }
    )
    subject.onNext(2)


    subject.subscribe(
        {
            Log.d(MainActivity.TAG, "onNext2 : $it")
        }, {
            Log.d(MainActivity.TAG, "onError2 : $it")
        }, {
            Log.d(MainActivity.TAG, "onComplete2 ")
        }
    )
}

//호출 코드
behaviorSubject()

실행 로그

첫 번째 구독자는 가장 최근 값인 1을 발행하고 뒤에 오는 2를 발행했습니다

두 번째 구독자는 가장 최근 값인 2를 발행했고 뒤에 오는 후속 데이터는 없기 때문에 발행되지 않았습니다

 

PublishSubject

구독자가 구독을 시작하면 그 시간 뒤로 발생한 데이터를 발행한다

fun publishSubject(){
    val subject = PublishSubject.create<Int>()
    subject.onNext(0)
    subject.onNext(1)

    subject.subscribe(
        {
            Log.d(MainActivity.TAG, "onNext1 : $it")
        }, {
            Log.d(MainActivity.TAG, "onError1 : $it")
        }, {
            Log.d(MainActivity.TAG, "onComplete1 ")
        }
    )
    subject.onNext(2)
}

//호출 코드
publishSubject()

 

실행 로그

앞에서 설명한 것처럼 구독을 시작한 뒤로의 데이터를 발행하기 때문에 0, 1은 발행되지 않고 2 하나만 발행되었습니다

 

ReplaySubject

옵서버가 구독을 시작하면 시점과 관계없이 옵저버블이 발행한 모든 데이터들을 모든 옵서버에게 발행해줍니다
비유를 하자면 영상을 저장해 두었다가 모든 사람들에게 보여주는 것이라고 볼 수 있습니다
때문에 영상(데이터)을 저장하는 과정 중에 메모리 누수가 일어날 수도 있기 때문에 조심해야 합니다

fun replaySubject(){
    val subject = ReplaySubject.create<Int>()
    subject.onNext(0)
    subject.onNext(1)
    subject.subscribe(
        {
            Log.d(MainActivity.TAG, "onNext1 : $it")
        }, {
            Log.d(MainActivity.TAG, "onError1 : $it")
        }, {
            Log.d(MainActivity.TAG, "onComplete1 ")
        }
    )

    subject.onNext(2)
    subject.onNext(3)
    subject.subscribe(
        {
            Log.d(MainActivity.TAG, "onNext2 : $it")
        }, {
            Log.d(MainActivity.TAG, "onError2 : $it")
        }, {
            Log.d(MainActivity.TAG, "onComplete2 ")
        }
    )


    subject.onNext(4)
    subject.onNext(5)
}

//호출 코드
replaySubject()

실행 로그

뒤에 발행된 4, 5도 모두 발행되는 걸 확인할 수 있습니다

반응형