반응형
안녕하세요 오늘은 Rxjava의 생산자(Observable, Single, Maybe, Completable)에 관해 알아보도록 하겠습니다
Observable
개념
Observable은 데이터 흐름에 맞게 알림을 보내줘 구독한 Observer가 데이터를 사용할 수 있도록 해줍니다
발생 이벤트
- onNext() : 하나씩 순차적으로 데이터를 발행한다
- onComplete() : 데이터 발행이 끝났을 때 호출된다
- onError() : 오류가 발생했을 때 호출된다
코드
fun createObservable() : Observable<Int> {
return Observable.create{ emitter ->
try {
if (!emitter.isDisposed){
for (i in 0..100){
emitter.onNext(i)
}
emitter.onComplete()
}
}catch (e : Exception){
emitter.onError(e)
}
}
}
fun observer() : Observer<Int>{
return object : Observer<Int>{
override fun onSubscribe(d: Disposable) {
Log.d(TAG,"onSubscribe")
}
override fun onNext(t: Int) {
Log.d(TAG,"onNext : $t")
}
override fun onError(e: Throwable) {
Log.d(TAG,"onError")
}
override fun onComplete() {
Log.d(TAG,"onComplete")
}
}
}
Observable과 Observer 생성
createObservable().subscribe(observer())
호출 코드
로그
Single
개념
앞에서 설명한 Observable과 달리 한 개의 데이터 또는 에러를 발행합니다, 때문에 네트워크 통신 등에서 자주 유용하게 사용합니다
발생 이벤트
- onSuccess : 데이터 하나를 발행하고 종료된다
- onError() : 오류가 발생했을 때 호출된다
코드
fun createSingleObservable() : Single<Int>{
return Single.create{ emitter ->
try {
if (!emitter.isDisposed){
for (i in 0..100){
emitter.onSuccess(i)
}
}
}catch (e : Exception){
emitter.onError(e)
}
}
}
fun observerSingleObservable() : SingleObserver<Int>{
return object : SingleObserver<Int>{
override fun onSubscribe(d: Disposable) {
Log.d(TAG,"onSubscribe")
}
override fun onError(e: Throwable) {
Log.d(TAG,"onError")
}
override fun onSuccess(t: Int) {
Log.d(TAG,"onSuccess : $t")
}
}
}
SingleObservable과 SingleObserver 생성
createSingleObservable().subscribe(observerSingleObservable())
호출 코드
로그
Maybe
개념
Maybe는 최대로 발행할 수 있는 데이터가 하나입니다 때문에 데이터가 없이 완료될 수도 있습니다
Single과 다른 점은 onComplete(완료) 이벤트가 추가됩니다
발생 이벤트
- onSuccess : 데이터 하나를 발행하고 종료된다
- onError() : 오류가 발생했을 때 호출된다
- onComplete : 1건의 데이터 발행이 없이 데이터 발행이 완료됐을 때 호출된다
코드
data class User(
val id : Long,
val name : String,
val age : Int
)
val mUserList = mutableListOf<User>(
User(1, "demo1", 15),
User(2, "demo2", 10),
User(3, "demo3", 35),
User(4, "demo4", 29),
User(5, "demo5", 12),
User(6, "demo6", 29),
User(7, "demo7", 29),
User(7, "demo7", 29),
User(8, "demo8", 15),
User(8, "demo8", 15)
)
data class와 list생성
fun createMaybeObservable() : Maybe<List<User>>{
return Maybe.just(mUserList)
}
fun observerMaybeObservable() : MaybeObserver<List<User>> {
return object : MaybeObserver<List<User>>{
override fun onSubscribe(d: Disposable) {
Log.d(TAG,"onSubscribe : $d")
}
override fun onSuccess(t: List<User>) {
t.forEach { Log.d(TAG,"onSuccess : $it") }
}
override fun onError(e: Throwable) {
Log.d(TAG,"onError : $e")
}
override fun onComplete() {
Log.d(TAG,"onComplete")
}
}
}
Maybebservable과 MaybeObserver 생성
createMaybeObservable().subscribe(observerMaybeObservable())
호출 코드
로그
Completable
개념
Completable은 완료와 에러만 보내는 특수한 형태의 스트림입니다 (정말 간단하지요?;)
발생 이벤트
- onComplete() : 데이터 발행이 끝났을 때 호출된다
- onError() : 오류가 발생했을 때 호출된다
코드
fun createCompletableObservable() : Completable{
return Completable.create{ emitter ->
try {
if (!emitter.isDisposed){
getLocation()
emitter.onComplete()
}
}catch (e : Exception){
emitter.onError(e)
}
}
}
fun observeCompletableObservable(): CompletableObserver{
return object : CompletableObserver{
override fun onSubscribe(d: Disposable) {
Log.d(TAG,"onSubscribe : $d")
}
override fun onComplete() {
Log.d(TAG,"onComplete")
}
override fun onError(e: Throwable) {
Log.d(TAG,"onError : $e")
}
}
}
private fun getLocation() {
//onError를 호출하려면 아래 throw Exception을 사용하면 된다
//throw Exception("에러 ")
Log.d(TAG, "Latitude : 102.0303 Longitude : 1.2355")
}
Completablebservable과 CompletableObserver 생성
createCompletableObservable().subscribe(observeCompletableObservable())
호출 코드
로그
여러 데이터 스트림이 존재하므로 상황에 따라 맞는 스트림을 사용하면 될 것 같습니다
안드에서는 네트워크 통신 때문에 Single를 많이 사용하는 것 같습니다
반응형
'ReactiveX (Rx)' 카테고리의 다른 글
[RxJava] ConnectableObservable, Subject에 대해 알아보자 (0) | 2022.04.04 |
---|---|
[RxJava] Cold Observable, Hot Observable에 대해 알아보자 (0) | 2022.03.30 |
[RxJava] 생산자와 소비자, 데이터 스트림이란? (Observable, Observer, Flowable, Subscriber) (0) | 2022.03.29 |
RxJava, RxAndroid Scheduler에 대해 알아보자 (0) | 2022.03.29 |
[RxJava] Observable 생성 연산자(Operators) (0) | 2022.03.28 |