Observable 생성 연산자 또는 Operator는 말 그대로 Observable을 생성하는 연산자입니다
공식문서에서는 다음과 같이 설명하고 있습니다
Creating Observables
Operators that originate new Observables.
오늘은 생성자 중 어떤 연산자들이 있는지 알아보도록 하겠습니다
create 연산자
함수 내부에서 emitter가 직접 onNext, onComplete, onError 등으로 데이터를 전달 가능한 연산자입니다
val mListNum = mutableListOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
fun createOperator() : Observable<Int>{
return Observable.create(ObservableOnSubscribe<Int> {
try {
for(i in mListNum){
it.onNext(i * 5)
}
it.onComplete()
}catch (e : Exception){
it.onError(e)
}
})
}
간단하게 mListNum에 들어있는 숫자들의 값에 *5를 해서 보여주고 onComplete를 호출하거나 오류가 나면 onError를 호출해주는 코드입니다
createOperator().subscribe({
Log.d(MainActivity.TAG, "onNext : $it")
},{
Log.d(MainActivity.TAG, "onError : $it")
},{
Log.d(MainActivity.TAG, "onComplete")
})
직접 호출하는 코드 부분입니다, 람다식을 이용해 간단하게 작성했습니다
just 연산자
최대 10개 짜기의 데이터를 전달할 수 있는 just 연산자입니다. 10개 이상이 넘어가면 에러가 발생합니다
fun justOperator() {
val observable = Observable.just(1, 2, 3, 4, 5)
val observer = object : Observer<List<Int>> {
override fun onSubscribe(d: Disposable) {
Log.d(MainActivity.TAG, "onSubscribe")
}
override fun onError(e: Throwable) {
Log.d(MainActivity.TAG, "onError : $e")
}
override fun onComplete() {
Log.d(MainActivity.TAG, "onComplete")
}
override fun onNext(t: List<Int>) {
Log.d(MainActivity.TAG, "onNext : $t")
}
}
observable.subscribe(observer)
}
이번에는 람다식을 사용하지 않고 호출해 보겠습니다, 정상 출력이 되는 걸 확인하실 수 있습니다
여기서 만약 list의 size가 10이 넘어가면 어떻게 될까요??
정답은 잘 호출됩니다!
val mListNum = mutableListOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
fun justOperator() {
val observable = Observable.just(mListNum)
val observer = object : Observer<List<Int>> {
override fun onSubscribe(d: Disposable) {
Log.d(MainActivity.TAG, "onSubscribe")
}
override fun onError(e: Throwable) {
Log.d(MainActivity.TAG, "onError : $e")
}
override fun onComplete() {
Log.d(MainActivity.TAG, "onComplete")
}
override fun onNext(t: List<Int>) {
Log.d(MainActivity.TAG, "onNext : $t")
}
}
observable.subscribe(observer)
}
잘 출력되는 걸 확인할 수 있습니다
interval 연산자
정해진 시간 동안 계속해서 0부터 1씩 증가합니다
fun intervalOperator(): Observable<Long> {
return Observable.interval(1, TimeUnit.SECONDS)
}
위 코드는 1초에 1씩 증가하게 만든 코드입니다
intervalOperator().subscribe({
Log.d(MainActivity.TAG, "onNext : $it")
},{
Log.d(MainActivity.TAG, "onError : $it")
},{
Log.d(MainActivity.TAG, "onComplete")
})
호출을 해보면 잘 찍히는 것을 확인할 수 있습니다. 이때 로그는 계속해서 찍히게 됩니다
fun intervalOperator(): Observable<Long> {
return Observable.interval(1, TimeUnit.SECONDS).takeWhile { value ->
value <= 10
}
}
아래 코드로 intervalOperator()를 변경하게 되면 takeWhile을 이용해 10까지만 실행이 되고 끝나는 것을 확인해 보실 수 있습니다
range 연산자
start부터 count까지 1씩 증가하는 연산자입니다
fun rangeOperator(): Observable<Int> {
return Observable.range(1, 1900)
}
1부터 1900까지 1씩 증가하라는 코드입니다
rangeOperator().subscribe({
Log.d(MainActivity.TAG, "onNext : $it")
},{
Log.d(MainActivity.TAG, "onError : $it")
},{
Log.d(MainActivity.TAG, "onComplete")
})
1부터 1900까지 로그가 잘 찍히고 onComplete가 호출된 걸 확인해 보실 수 있습니다
repeat 연산자
설정한 횟수만큼 반속 시켜주는 연산자입니다
fun repeatOperator(): Observable<Int> {
return Observable.range(1, 10).repeat(2)
}
바로 위에서 배운 range 연산자를 사용해 보겠습니다
repeatOperator().subscribe({
Log.d(MainActivity.TAG, "onNext : $it")
},{
Log.d(MainActivity.TAG, "onError : $it")
},{
Log.d(MainActivity.TAG, "onComplete")
})
예상하신 데로 1부터 10까지 로고가 2번 찍히고 onComplete가 호출됩니다!
timer 연산자
정한 delay만큼 기다렸다가 0을 전달하는 연산자입니다
fun timerOperator(): Observable<Long> {
return Observable.timer(5, TimeUnit.SECONDS)
}
5초 딜레이를 걸어줍니다
timerOperator().subscribe({
Log.d(MainActivity.TAG, "onNext : $it")
},{
Log.d(MainActivity.TAG, "onError : $it")
},{
Log.d(MainActivity.TAG, "onComplete")
})
5초 뒤에 0이 1번 찍히는 걸 확인할 수 있습니다
from 연산자
Array, Iterable, Callable로부터 Observable을 만들어주는 연산자입니다
실제 코드상에서는 fromArray, fromIterable, fromCallable로 사용한다
val mListNum : MutableList<Int> = mutableListOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
fun fromIterableOperator() {
val observable = Observable.fromIterable(mListNum)
val observer = object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
Log.d(MainActivity.TAG, "onSubscribe")
}
override fun onError(e: Throwable) {
Log.d(MainActivity.TAG, "onError : $e")
}
override fun onComplete() {
Log.d(MainActivity.TAG, "onComplete")
}
override fun onNext(t: Int) {
Log.d(MainActivity.TAG, "onNext : $t")
}
}
observable.subscribe(observer)
}
예시 코드로는 fromIterable로 작성했습니다.
처음 구독하 onSubscribe가 호출되고 1부터 12까지 잘 호출된 후 onComplete가 호출됩니다
'ReactiveX (Rx)' 카테고리의 다른 글
[RxJava] ConnectableObservable, Subject에 대해 알아보자 (0) | 2022.04.04 |
---|---|
[RxJava] Cold Observable, Hot Observable에 대해 알아보자 (0) | 2022.03.30 |
[RxJava] 생산자와 소비자, 데이터 스트림이란? (Observable, Observer, Flowable, Subscriber) (0) | 2022.03.29 |
RxJava, RxAndroid Scheduler에 대해 알아보자 (0) | 2022.03.29 |
[RxJava] Observable, Single, Maybe, Completable 이란? (0) | 2022.03.24 |