인프런 커뮤니티 질문&답변

jh.park141님의 프로필 이미지

작성한 질문수

Kevin의 알기 쉬운 RxJava 1부

Error 발생 시에도 계속 처리 방법

22.04.04 21:42 작성

·

560

0

안녕하세요. 강의 잘 수강중인 수강생입니다.

 

받아온 리스트를 flatMap을 통해 각각 observable 작업을 만들고 순차적으로 실행할 때

중간에 하나가 실패하더라도 계속 그 다음 Observable로 넘어가도록 하는 방법은 없나요?

아니면 혹시 좋은 처리 방법이 있을지요.

try-catch를 flatMap 내부 블럭에서 쓰기엔 rxJava스럽지 않은 방법이고...

onErrorReturnNext같은 함수를 쓰기도 좀 애매하네요.

 

가운데 fetchData() 부분만 봐주시면 될 것 같습니다

 

import io.reactivex.Completable
import io.reactivex.Observable
import io.reactivex.Single
import org.mockito.ArgumentMatchers.anyInt
import org.mockito.Mockito
import org.mockito.kotlin.eq
import org.mockito.kotlin.given
import org.mockito.kotlin.whenever
import java.lang.Exception

val db = mutableListOf<String>()

fun <T : Any> safeEq(value: T): T = eq(value) ?: value

fun main() {
    val fakeSentenceMaker: SentenceMaker = Mockito.mock(SentenceMaker::class.java)
    whenever(fakeSentenceMaker.getNonLuckySentence(anyInt()))
        .thenCallRealMethod()
    whenever(fakeSentenceMaker.getLuckySentence())
        .thenCallRealMethod()
    given(fakeSentenceMaker.getNonLuckySentence(safeEq(4)))
        .willAnswer{ throw Exception("실패!") }

    val repository = Repository(fakeSentenceMaker)

    repository.fetchData()
        .subscribe({
            println("성공 >> ")
        }, {
            println("실패 >> $it")
        })

    Thread.sleep(1000)

    db.also {
        println(it)
    }
}

class Repository(
    private val sentenceMaker: SentenceMaker
) {
    private fun getList(): Single<List<Int>> {
        return Single.just(listOf(1, 2, 3, 4, 5, 6, 7, 8))
    }

    fun fetchData(): Completable {
        return getList()
            .flatMapObservable { Observable.fromIterable(it) }
            .flatMap {
                println("현재 데이터 >> $it")
                if (it == 7) {
                    sentenceMaker.getLuckySentence()
                } else {
                    sentenceMaker.getNonLuckySentence(it)
                }
            }
 //           .onErrorResumeNext(Observable.just("Error!"))
            .flatMapCompletable {
                println("before emit >> $it")
                saveData(it)
            }
    }

    fun saveData(data: String): Completable {
        return Completable.create { emitter ->
            println("emit >> $data")
            db.add(data)
            emitter.onComplete()
        }
    }

}

class SentenceMaker {
    fun getLuckySentence(): Observable<String> {
        val luckySentence = "7은 행운의 숫자입니다"

        return Observable.just(luckySentence)
    }

    fun getNonLuckySentence(num: Int): Observable<String> {
        val nonLuckySentence = "${num}은 행운의 숫자가 아닙니다"
        return Observable.just(nonLuckySentence)
    }
}

답변 2

1

Kevin님의 프로필 이미지
Kevin
지식공유자

2022. 04. 05. 10:40

네, 무슨 말씀이신지 이해했습니다. 저도 개발자 출신이기 때문에 기술에 대한 이야기 하는걸 좋아해요.ㅎ

 

다만, 제가 하반기 도서 출간 때문에 집필 중이라 사실 정신이 없거든요. (거의 여기에 시간을 집중 투자하고 있습니다. ㅜㅜ)

 

질문자님하고 이것 저것 개발 프로토타입도 만들어보면서 논의도 해보면 좋은데 그러지 못하는 점 양해 부탁드릴게요. ㅜㅜ

 

대신에 제가 시간을 조금만 투입해서 확인해 볼 수 있는 내용이면 좀 늦더라도 답변을 드리도록 하겠습니다.

 

열심히 개발하시는 모습 정말 보기 좋아보입니다. ^^

1

Kevin님의 프로필 이미지
Kevin
지식공유자

2022. 04. 05. 00:02

jh.park141님 안녕하세요?

질문해주신 내용은 잘 봤습니다.

우선 RxJava에서 error는 terminal event이기 때문에 해당 에러를 무시하고 다시 실행하려면 말씀하신대로 flatMap 내부에서 try ~ catch 문을 써서 무시하면 되겠지만(의도된 예외일 경우에만) 제가 좀 의문스러운건 위 코드 상에서는 에러가 날 소지가 있는 부분이 없는것 같은데 어떤 에러를 무시하시려는 건지 잘 모르겠네요.

 

RxJava에서는 입력으로 들어오는 데이터에 error가 발생하면 일정 횟수나 주기동안 다시 시도하든가 아니면 대체 데이터 전달 및 대체 처리 등의 방법 이외에 질문자님께서 의도하는 처리 방법은 없는 걸로 알고 있습니다.

 

다만, 어떤 상황에서 에러를 무시하고 원래 데이터 소스를 계속 진행하려는지 모르겠지만 정 에러를 무시하고 계속 처리를 하고 싶으시다면 에러 발생 시, onErrorResumeNext() 내에서 원본 데이터 소스를 다시 입력으로 받아 들인 후에 Notification을 통해 원본 데이터를 통지하면 되지 않을까 싶어요. (물론 이 경우에는 에러 발생 전에 이미 통지된 데이터가 있을테지만요.)

 

subscriber쪽에서 Notification 객체를 받은 후에 이 Notification이 onNext 이벤트인지 onError 이벤트인지 판단해서 onError 이벤트라면 무시하면 될 것 같습니다.

 

질문자님의 정확한 의도를 제가 잘 이해하지 못해서 답변이 만족스럽지 못했더라도 이해해주시면 감사드릴게요.

jh.park141님의 프로필 이미지
jh.park141
질문자

2022. 04. 05. 08:27

답변 감사합니다.

윗줄에 보시면 given ... willAnswer 부분 코드 입력에 4가 들어오면 throw하게끔 처리가 되어 있습니다 (이미 아시겠지만 Mockito를 사용하여 mock데이터를 넣고 테스트하기 위함입니다)

너무 긴 코드를 첨부해 혼란을 드렸나보네요...! 

 

그리고 질문을 드렸던 핵심 목적은 

- 이런 경우에 rxjava에서 지원하는 깔끔한 해결 방법이 있는가

- 충분히 발생할 법한 문제인데 다른 경험자 분들은 어떻게 처리하고 있는가

하는 부분이었습니다.

실제 코드에서 샘플코드와 유사한 상황이 있었는데

Flatmap 내부에서 네트워크 통신을 통하여 데이터를 받아오는 observable로 변환해주는 코드가 있었는데, 특정 아이템이 오류가 발생하면 바로 전체 observable이 종료되는 문제가 있었습니다. 

예를 들면, 1~10번 물건들이 전체 인풋 흐름이고

각각의 물건의 가격을 구하는 네트워크 흐름으로 변환하는 상황이었다고 보시면 될 것 같습니다.

 

혹시 추가적으로 답변해주실 내용 있으시면 기다리겠습니다.

감사합니다!

Kevin님의 프로필 이미지
Kevin
지식공유자

2022. 04. 05. 09:22

제가 밤에 시간이 별로 없어서 fetchData() 쪽만 집중적으로보면 된다고 얘기하셔서 윗쪽을 제대로 못봤네요. ^^;

코틀린을 아직 사용해보적이 없다보니 더 대충 본거 같습니다. 죄송합니다. ^^;

 

일단 해외 사례나 아니면 rxjava github issue 같은데서 조금 더 찾아봐야 할 것 같긴한데, Reactor의 경우에는 flatMap 내부에서 에러 발생 시, onErrorResume()을 통해서 에러를 무시할 수 있다고 나옵니다.

xxxx.flatMap(id -> repository.retrieveById(id)
                          .doOnError(System.err::println)
                          .onErrorResume(e -> Mono.empty()))

이런식으로 처리를 한다고 하니 참고하시면 될 것 같습니다.

그리고, onErrorContinue()를 통해서도 오류가 난 요소를 drop하고 계속 처리를 진행할 수 있다고 나오긴해요.

이 부분은 저도 아직 사용을 안해봤는데, 이번에 책 쓸때 포함을 시켜봐야겠습니다.

 

제가 나중에 시간이 될 때 RxJava에서 다른 대안이 있는지 한번 찾아보도록 하겠습니다.

좋은 질문 감사드릴게요

jh.park141님의 프로필 이미지
jh.park141
질문자

2022. 04. 05. 09:42

빠르고 상세한 답변 감사드립니다.

보통 RxJava로 저렇게 쓰는 경우가 많이 없나보군요 ㅎㅎ

안드로이드 개발자로 근무하고 있는데 그동안 비동기 로직으로는 코루틴을 주로 사용했었고

RxJava는 레거시 유지보수만 하다가 이번에 비동기 코드를 짤 일이 좀 있어서 새로 공부하고 있는데

생각보다 생산자, 오퍼레이터 등이 다양해서 사용하기가 좀 더 까다로운 것 같습니다.

 

추가적으로 한 가지만 더 질문드리고 싶은데

RxJava에서 여러가지 Observable을 병렬로 처리하는 best practice가 있을까요?

기존에 Observable로 정의된 함수들을 여러 개 병렬로 처리하여

나중에 결과 처리는 한 번에 할 수 있는 로직이 필요한 상황입니다.

 

현재는 flowable로 변환하여 pararell() 을 하는 방법까지만 알아낸 상황인데

혹시 더 좋은 권장 방법이 있으시다면 조언 부탁드리겠습니다.

 

그리고 혹시 제가 처음에 질문드린 상황의 경우 RxJava로는 잘 처리하지 않는 로직인가요?

만약 제가 말씀드린 상황이라면 kevin님께서는 어떤 방식으로 처리하실 것 같은지 궁금합니다.

 

같이 의견을 나눌 사람이 필요한 거라 편한 마음으로 답변 주셔도 됩니다!