onErrorDropped explained

RxJS merge operator

웹플럭스나 리액티브로 서비스를 개발하고 있다면 로그에서 onErrorDropped 메시지를 보게될 확률이 높다.  리액티브 애플리케이션을 사용하는 주된 이유는 높은 동시성을 달성하기 위한 것인데 동시성이 높아져 비동기로 여러 장소에서 데이터를 수신하는 경우 해당 에러가 자주 발생한다.

onErrorDropped가 발생 이유

이 에러 메시지는  정상적인 경우라면 오류가 발생했을 때 다운스트림으로 onError 에러를 전달해야 하지만 이미 다른 스레드에서 onError가 발생해 전체 스트림이 terminated된 상황임을 알려주는 것이다. 이미 종료된 스트림에 onError를 전달하는 행동은 리액티브 사양을 위반한다. 이 사양 링크 7번째 항목을 참고하자.

모든 것이 동기화된 세상에서는 이런 비정상적인 에러를 일어나지 않는다. Operators.onErrorDropped가 사용된 곳을 라이브러리에서 검색해보면 flatMap, merge와 같이 순서가 없이 실행되는 연산자들에서 주로 사용되고 업스트림의 순서를 보장하는 map이나 concat에서는 사용되지 않는다.

스프링 부트 애플리케이션이 WebClient를 통해 여러 데이터 세트를 동시에 가져온다면 발생한 에러는 이미 진행중인 다른 요청들에 영향을 줄 수 있다.

대처방법

onErrorDropped 로그가 너무 많다면 진짜 중요한 에러를 숨기고 있을 가느성이 높다. onErroDropped의 로그 레벨을 낮추는 몇가지 방법이 존재한다. (Project Reactor 코어에서 버전 업그레이드를 통해 조정될 가능성도 있다)

Hook을 사용

onErrorDropped 의 Hook을 설정하면 onErrorDropped가 발생했을 때 Hook에서 지정한 람다 함수가 실행된다. 프로젝트 리액터의 스케쥴러와 마찬가지로 Hook은 글로벌로 등록된다.

Hooks.onErrorDropped { log.info("onError Dropped.", it) }

onErrorContinue사용

동시성이 발생할 수 있는 기존 연산자를 확장해 각 Source 퍼블리셔들에 onErrorContinue를 추가해준다. 이 방법은 위 방법보다는 영향이 덜하며(less intrusive) 원하는 곳에만 적용이 가능하다. 다만 onErrorContinue의 사용법이 다소 복잡해 특정 연산만을 대상으로 작동함으로 메뉴얼을 잘 읽어봐야 한다. 아래는 merge연산자를 확장해

fun merge(sources: Iterable<Mono<T>>): Flux<T> = Flux.merge( 
    sources.map { 
        it.onErrorContinue { throwable, any -> log.info("Suppressed error in merging flux, {}", any, throwable)} 
}
)