본문 바로가기
ReactiveX (Rx)

[RxJava] Observable, Single, Maybe, Completable 이란?

by 안솝우화 2022. 3. 24.
반응형

안녕하세요 오늘은 Rxjava의 생산자(Observable, Single, Maybe, Completable)에 관해 알아보도록 하겠습니다

Observable

개념

Observable은 데이터 흐름에 맞게 알림을 보내줘 구독한 Observer가 데이터를 사용할 수 있도록 해줍니다

발생 이벤트

  • onNext() : 하나씩 순차적으로 데이터를 발행한다
  • onComplete() : 데이터 발행이 끝났을 때 호출된다
  • onError() : 오류가 발생했을 때 호출된다

코드

fun createObservable() : Observable<Int> {
    return Observable.create{ emitter ->
        try {
            if (!emitter.isDisposed){
                for (i in 0..100){
                    emitter.onNext(i)
                }
                emitter.onComplete()
            }
        }catch (e : Exception){
            emitter.onError(e)
        }
    }
}

fun observer() : Observer<Int>{
    return object : Observer<Int>{
        override fun onSubscribe(d: Disposable) {
            Log.d(TAG,"onSubscribe")
        }

        override fun onNext(t: Int) {
            Log.d(TAG,"onNext : $t")
        }

        override fun onError(e: Throwable) {
            Log.d(TAG,"onError")
        }

        override fun onComplete() {
            Log.d(TAG,"onComplete")
        }

    }
}

Observable과 Observer 생성

createObservable().subscribe(observer())

호출 코드

로그

1에서 100까지 잘 찍히고 onComplete 호출 (앞쪽은 길어서 생략)

 

 

Single

개념

앞에서 설명한 Observable과 달리 한 개의 데이터 또는 에러를 발행합니다, 때문에 네트워크 통신 등에서 자주 유용하게 사용합니다

발생 이벤트

  • onSuccess : 데이터 하나를 발행하고 종료된다
  • onError() : 오류가 발생했을 때 호출된다

코드

fun createSingleObservable() : Single<Int>{
    return Single.create{ emitter ->
        try {
            if (!emitter.isDisposed){
                for (i in 0..100){
                    emitter.onSuccess(i)
                }
            }
        }catch (e : Exception){
            emitter.onError(e)
        }
    }
}

fun observerSingleObservable() : SingleObserver<Int>{
    return object : SingleObserver<Int>{
        override fun onSubscribe(d: Disposable) {
            Log.d(TAG,"onSubscribe")
        }

        override fun onError(e: Throwable) {
            Log.d(TAG,"onError")
        }

        override fun onSuccess(t: Int) {
            Log.d(TAG,"onSuccess : $t")
        }

    }
}

SingleObservable과 SingleObserver 생성

 createSingleObservable().subscribe(observerSingleObservable())

호출 코드

로그

onSubscribe를 시작 시 호출 후 한번만 호출

 

 

Maybe

개념

Maybe는 최대로 발행할 수 있는 데이터가 하나입니다 때문에 데이터가 없이 완료될 수도 있습니다
Single과 다른 점은 onComplete(완료) 이벤트가 추가됩니다

발생 이벤트

  • onSuccess : 데이터 하나를 발행하고 종료된다
  • onError() : 오류가 발생했을 때 호출된다
  • onComplete : 1건의 데이터 발행이 없이 데이터 발행이 완료됐을 때 호출된다

코드

data class User(
    val id : Long,
    val name : String,
    val age : Int
)

val mUserList = mutableListOf<User>(
    User(1, "demo1", 15),
    User(2, "demo2", 10),
    User(3, "demo3", 35),
    User(4, "demo4", 29),
    User(5, "demo5", 12),
    User(6, "demo6", 29),
    User(7, "demo7", 29),
    User(7, "demo7", 29),
    User(8, "demo8", 15),
    User(8, "demo8", 15)
    )

data class와 list생성

fun createMaybeObservable() : Maybe<List<User>>{
    return Maybe.just(mUserList)
}

fun observerMaybeObservable() : MaybeObserver<List<User>> {
    return object : MaybeObserver<List<User>>{
        override fun onSubscribe(d: Disposable) {
            Log.d(TAG,"onSubscribe : $d")
        }

        override fun onSuccess(t: List<User>) {
            t.forEach {  Log.d(TAG,"onSuccess : $it") }

        }

        override fun onError(e: Throwable) {
            Log.d(TAG,"onError : $e")
        }

        override fun onComplete() {
            Log.d(TAG,"onComplete")
        }

    }
}

Maybebservable과 MaybeObserver 생성

createMaybeObservable().subscribe(observerMaybeObservable())

호출 코드

로그

잘 호출이 된다 (뒤에 값은 길어서 생략)

 

Completable

개념

Completable은 완료와 에러만 보내는 특수한 형태의 스트림입니다 (정말 간단하지요?;)

발생 이벤트

  • onComplete() : 데이터 발행이 끝났을 때 호출된다
  • onError() : 오류가 발생했을 때 호출된다

코드

fun createCompletableObservable() : Completable{
    return Completable.create{ emitter ->
        try {
            if (!emitter.isDisposed){
                getLocation()
                emitter.onComplete()
            }
        }catch (e : Exception){
            emitter.onError(e)
        }
    }
}

fun observeCompletableObservable(): CompletableObserver{
    return object  : CompletableObserver{
        override fun onSubscribe(d: Disposable) {
            Log.d(TAG,"onSubscribe : $d")
        }

        override fun onComplete() {
            Log.d(TAG,"onComplete")
        }

        override fun onError(e: Throwable) {
            Log.d(TAG,"onError : $e")
        }

    }
}

private fun getLocation() {
	//onError를 호출하려면 아래 throw Exception을 사용하면 된다
    //throw Exception("에러 ")
    Log.d(TAG, "Latitude : 102.0303 Longitude : 1.2355")
}

Completablebservable과 CompletableObserver 생성

createCompletableObservable().subscribe(observeCompletableObservable())

호출 코드

로그

잘 마무리 되는걸 확인할 수 있다

 

여러 데이터 스트림이 존재하므로 상황에 따라 맞는 스트림을 사용하면 될 것 같습니다
안드에서는 네트워크 통신 때문에 Single를 많이 사용하는 것 같습니다

반응형