본문 바로가기
ReactiveX (Rx)

[RxJava] Observable 생성 연산자(Operators)

by 안솝우화 2022. 3. 28.
반응형

Observable 생성 연산자 또는 Operator는 말 그대로 Observable을 생성하는 연산자입니다

공식문서에서는 다음과 같이 설명하고 있습니다

Creating Observables
Operators that originate new Observables.

 

오늘은 생성자 중 어떤 연산자들이 있는지 알아보도록 하겠습니다

 

create 연산자

함수 내부에서 emitter가 직접 onNext, onComplete, onError 등으로 데이터를 전달 가능한 연산자입니다

val mListNum = mutableListOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)

fun createOperator() : Observable<Int>{
    return Observable.create(ObservableOnSubscribe<Int> {
        try {
            for(i in mListNum){
                it.onNext(i * 5)
            }

            it.onComplete()

        }catch (e : Exception){
            it.onError(e)
        }
    })
}

간단하게 mListNum에 들어있는 숫자들의 값에 *5를 해서 보여주고 onComplete를 호출하거나 오류가 나면 onError를 호출해주는 코드입니다

		createOperator().subscribe({
                  Log.d(MainActivity.TAG, "onNext : $it")
              },{
                  Log.d(MainActivity.TAG, "onError : $it")
              },{
                  Log.d(MainActivity.TAG, "onComplete")
              })

직접 호출하는 코드 부분입니다, 람다식을 이용해 간단하게 작성했습니다

 

just 연산자

최대 10개 짜기의 데이터를 전달할 수 있는 just 연산자입니다. 10개 이상이 넘어가면 에러가 발생합니다

fun justOperator() {
    val observable = Observable.just(1, 2, 3, 4, 5)

    val observer = object : Observer<List<Int>> {
        override fun onSubscribe(d: Disposable) {
            Log.d(MainActivity.TAG, "onSubscribe")
        }

        override fun onError(e: Throwable) {
            Log.d(MainActivity.TAG, "onError : $e")
        }

        override fun onComplete() {
            Log.d(MainActivity.TAG, "onComplete")
        }

        override fun onNext(t: List<Int>) {
            Log.d(MainActivity.TAG, "onNext : $t")
        }

    }

    observable.subscribe(observer)
}

이번에는 람다식을 사용하지 않고 호출해 보겠습니다, 정상 출력이 되는 걸 확인하실 수 있습니다
여기서 만약 list의 size가 10이 넘어가면 어떻게 될까요??
정답은 잘 호출됩니다!

val mListNum = mutableListOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)

fun justOperator() {
    val observable = Observable.just(mListNum)

    val observer = object : Observer<List<Int>> {
        override fun onSubscribe(d: Disposable) {
            Log.d(MainActivity.TAG, "onSubscribe")
        }

        override fun onError(e: Throwable) {
            Log.d(MainActivity.TAG, "onError : $e")
        }

        override fun onComplete() {
            Log.d(MainActivity.TAG, "onComplete")
        }

        override fun onNext(t: List<Int>) {
            Log.d(MainActivity.TAG, "onNext : $t")
        }

    }

    observable.subscribe(observer)
}

잘 출력되는 걸 확인할 수 있습니다

 

interval 연산자

정해진 시간 동안 계속해서 0부터 1씩 증가합니다

fun intervalOperator(): Observable<Long> {
    return Observable.interval(1, TimeUnit.SECONDS)
}

위 코드는 1초에 1씩 증가하게 만든 코드입니다

          intervalOperator().subscribe({
                Log.d(MainActivity.TAG, "onNext : $it")
            },{
                Log.d(MainActivity.TAG, "onError : $it")
            },{
                Log.d(MainActivity.TAG, "onComplete")
            })

호출을 해보면 잘 찍히는 것을 확인할 수 있습니다. 이때 로그는 계속해서 찍히게 됩니다

fun intervalOperator(): Observable<Long> {
    return Observable.interval(1, TimeUnit.SECONDS).takeWhile { value ->
        value <= 10
    }
}

아래 코드로 intervalOperator()를 변경하게 되면 takeWhile을 이용해 10까지만 실행이 되고 끝나는 것을 확인해 보실 수 있습니다

 

range 연산자

start부터 count까지 1씩 증가하는 연산자입니다

fun rangeOperator(): Observable<Int> {
    return Observable.range(1, 1900)
}

1부터 1900까지 1씩 증가하라는 코드입니다

rangeOperator().subscribe({
    Log.d(MainActivity.TAG, "onNext : $it")
},{
    Log.d(MainActivity.TAG, "onError : $it")
},{
    Log.d(MainActivity.TAG, "onComplete")
})

1부터 1900까지 로그가 잘 찍히고 onComplete가 호출된 걸 확인해 보실 수 있습니다

 

repeat 연산자

설정한 횟수만큼 반속 시켜주는 연산자입니다

fun repeatOperator(): Observable<Int> {
    return Observable.range(1, 10).repeat(2)
}

바로 위에서 배운 range 연산자를 사용해 보겠습니다

repeatOperator().subscribe({
    Log.d(MainActivity.TAG, "onNext : $it")
},{
    Log.d(MainActivity.TAG, "onError : $it")
},{
    Log.d(MainActivity.TAG, "onComplete")
})

예상하신 데로 1부터 10까지 로고가 2번 찍히고 onComplete가 호출됩니다!

 

timer 연산자

정한 delay만큼 기다렸다가 0을 전달하는 연산자입니다

fun timerOperator(): Observable<Long> {
    return Observable.timer(5, TimeUnit.SECONDS)
}

5초 딜레이를 걸어줍니다

timerOperator().subscribe({
    Log.d(MainActivity.TAG, "onNext : $it")
},{
    Log.d(MainActivity.TAG, "onError : $it")
},{
    Log.d(MainActivity.TAG, "onComplete")
})

5초 뒤에 0이 1번 찍히는 걸 확인할 수 있습니다

 

from 연산자

Array, Iterable, Callable로부터 Observable을 만들어주는 연산자입니다
실제 코드상에서는 fromArray, fromIterable, fromCallable로 사용한다

val mListNum : MutableList<Int> = mutableListOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)

fun fromIterableOperator() {
    val observable = Observable.fromIterable(mListNum)

    val observer = object : Observer<Int> {
        override fun onSubscribe(d: Disposable) {
            Log.d(MainActivity.TAG, "onSubscribe")
        }

        override fun onError(e: Throwable) {
            Log.d(MainActivity.TAG, "onError : $e")
        }

        override fun onComplete() {
            Log.d(MainActivity.TAG, "onComplete")
        }

        override fun onNext(t: Int) {
            Log.d(MainActivity.TAG, "onNext : $t")
        }
    }

    observable.subscribe(observer)
}

예시 코드로는 fromIterable로 작성했습니다.
처음 구독하 onSubscribe가 호출되고 1부터 12까지 잘 호출된 후 onComplete가 호출됩니다

반응형