게시글
질문&답변
Webflux의 Non-Blocking 특성에 대해 문의 드립니다.
안녕하세요. Non-Blocking에 관해서 질문을 주셨는데요.WebFlux에서 Non-Blocking이라는 개념은,Java에서 쓰레드를 추가로 할당해서 동시성의 형태로 여러 요청을 번갈아 처리하는 구조라기보다는,요청을 처리하는 쓰레드(이벤트 루프)가 특정 작업에 묶이지 않고 자유롭게 움직일 수 있는 상태, 즉 Blocking 되지 않은 상태를 말합니다.즉, Reactor 체인에서 별도의 쓰레드를 사용하지 않아도, Reactor-Netty 서버는 각 요청을 처리하는 흐름이 Blocking되지 않으면 하나의 이벤트 루프 ㅆ레드로도 여러 요청을 빠르게 처리할 수 있습니다.하지만 만약 어떤 요청이 순차적으로만 처리되고, 다른 요청이 기다리는 현상이 발생한다면,그건 보통 요청 처리 로직에서 Blocking 연산이 있거나, CPU 연산이 오래 걸려서이벤트 루프가 다음 요청으로 넘어가지 못하고 쓰레드가 점유된 상태일 가능성이 큽니다.즉, WebFlux의 Non-Blocking은 쓰레드가 작업 도중 Blocking 되지 않게 만드는 처리 방식임을 이해하는 것이 중요하다고 생각합니다.아직 3부를 제작하지 않은 상태여서 Spring MVC와 Spring WebFlux를 비교하는 샘플 테스트에서는 클라이언트에서 단순 for문을 이용해서 요청 처리를 시뮬레이션 했는데요.만약에 실무적으로 쓰레드가 Blocking 되지 않는다 라는 사실을 확인하려면 여러 클라이언트가 동시 다발적으로 요청을 보내는 상황을 만들어야 하고, 서버측 WebFlux 애플리케이션에서도 DB 등의 I/O까지 Fully Non-Blocking하게 처리하는 상황을 만들어서 테스트 해야하는데, 말씀하신 코드를 지금 다 만들기에는 사실상 무리가 있어서 3부 강의 때 말씀드려야 할 것 같아요.이점 양해 부탁드리고, Non-Blocking의 의미는 쓰레드가 Blocking 되지 않게 하는것이지 우리가 알고 있는 멀티 쓰레드 작업 처리 방식처럼 동시에 요청을 처리하는것은 아니라는 사실은 꼭 기억을 해주시면 감사드리겠습니다.
- 1
- 3
- 68
질문&답변
Downstream Mono, Flux가 subscriber인가요?
안녕하세요? Downstream Flux, Mono에 대해서 질문을 주셨는데요.아래 간단한 코드를 예로 들면(Mono도 같은 원리라고 보시면 됩니다.)Flux .just(1, 2, 4) .filter(num -> num % 2 == 0) .subscribe(result -> System.out.println(result));just()는 일반적으로 최초로 데이터를 emit하기 때문에 생산자, Publisher, 발행자로 이해하면 됩니다.그런데 filter() 입장에서는 just()가 Upstream Flux가 되고, 반대로 just() 입장에서는 filter()가 Downstream Flux가 됩니다.이렇게 부르는 이유는 just()와 filter() 같은 Operator들이 리턴하는 리턴 타입이 Mono 또는 Flux이기 때문입니다.즉, filter() 입장에서는 윗쪽에 있는 just()가 리턴하는 Flux를 이용해서 다음 흐름을 이어가기 때문에 just()가 Upstream Flux가 되는 것입니다. 그리고 Subscriber는 내부적으로 좀 복잡한 과정을 거치는데 다 이해하실 필요는 없고, 일반적으로 .subscribe(...) 여기에서 ... 에 해당되는 람다 표현식이 Subscriber라고 이해하셔도 무방할 것 같습니다. 혹시 제 설명이 좀 어려우시다면 just()와 filter() 같은 Operator 내부로 들어가셔서 리턴 타입을 직접 눈으로 확인하시면 좋을 것 같아요. 감사합니다!
- 0
- 1
- 68
질문&답변
onErrorResume을 사용하지 않는 모든 경우 예외 발생 시, 시퀀스는 종료되나요?
안녕하세요. 주말이라서 답변이 좀 늦었네요.onErrorResume 같은 에러 처리를 위한 Operator를 사용하지 않으면 기본적으로 Sequence 내에서 에러가 발생하면 Subscriber에게 onError Signal 형태로 에러 정보가 Exception 형태로 전달됩니다. 다만, onErrorResume을 사용하지는 않지만 특정 상황에서 onErrorContinue Operator를 사용하거나 retry Operator를 사용하면 에러 발생 시점에 Sequence가 즉시 중단되지 않을 수는 있습니다.방금 말씀 드린 내용은 error handling 섹션에 있는 영상으로 확인하실 수 있습니다. AI 인턴이 답변 남겨두었듯이 doOnError 같은 Operator를 이용하면 에러를 직접적으로 핸들링하는건 아니지만 에러가 발생했을 때, 로그를 남긴다든지 하는 후속 작업을 통해 에러를 간접적으로 알려서 추후에 디버깅을 할 수 있습니다. 디버깅 관련해서는 1부 영상을 확인하시면 될 것 같습니다.감사합니다.
- 0
- 2
- 95
질문&답변
source.next와 source.emit의 차이에 대한 질문입니다.
안녕하세요? 어제 일찍 잠이 드는 바람에 답변이 좀 늦어졌네요. 양해 부탁드리겠습니다. ^^; 우선 아래 예제를 실행시켜 보시면 next() 호출 후에 onComplete signal이 발생하지 않기 때문에 Sequence가 종료 되지 않고 무한 대기 상태가 되는걸 확인하실 수가 있는데요. TestPublisher source = TestPublisher.create(); StepVerifier .create(source.flux()) .expectSubscription() .then(() -> source.next(2, 4, 6, 8, 10)) .expectNext(2, 4, 6, 8, 10) .expectComplete() .verify(); 말씀하셨던대로 TestPublisherTestExample01에서 expectComplete()이 통과하는 이유는 zipWith()를 사용했기 때문입니다. 아래 그림을 보시면 zipWith()의 파라미터로 전달한 Flux에서 데이터 emit이 끝나면 onComplete signal이 발생하는데 zipWith()의 경우 둘 중 하나의 Sequenc가 종료될 경우 최종적으로 합쳐진 Source Flux에서 onComplete signal이 발생합니다.(사진)
- 0
- 2
- 114
질문&답변
StepVerifier를 이용한 Testing (1) - expectNoEvent 에 관해서
안녕하세요? expectNoEvent()의 의미가 조금 헷갈리시죠?테스트 하셨던 코드처럼 .expectNoEvent(Duration.ofMinutes(1)) 를 두 번 연달아 호출했다고 해서 2분의 딜레이 타임을 가진다라는 의미가 아니라 매 1분(여기서는 두 번 호출했으니까 총 2분이겠죠) 즉, 총 2분 동안 단지 어떠한 이벤트도 발생하지 않았는지를 단순히 검증만 하는 것이라서 .expectNoEvent(Duration.ofMinutes(1)) 를 호출했다고 해서 getVoteCount()에서 리턴한 Flux의 Tuple 데이터가 emit 되지는 않고 예제 코드에서처럼 expectNext()를 호출해야 데이터 하나가 emit이 됩니다.지금 테스트 해 보셨던 코드에서는 ... .expectNoEvent(Duration.ofMinutes(1)) .expectNoEvent(Duration.ofMinutes(1)) .expectNext(Tuples.of("강서구", 32040)) ... ....expectNoEvent(Duration.ofMinutes(1)) 를 두 번 호출 한 다음에 호출한 .expectNext()가 세번째 .expectNext() 이기 때문에 "강서구"가 맞으므로 테스트에 통과하는 것입니다. 그리고 아직 테스트 대상 메서드의 Flux에서 emit되지 않은 데이터가 남았는데 expectNext()나 expectNextCount() 등으로 남아있는 데이터의 emit에 해당하는 onNext 이벤트를 트리거 해주지 않으면 테스트 코드가 종료되지 않고 무한정 기다릴 수 있는 상황도 발생할 수 있으니 참고해 주시면 좋을 것 같아요.
- 1
- 1
- 150
질문&답변
backpressure latest 전략
안녕하세요.Discard Support: Each time a new element comes in (the new "latest"), this operator discards the previously retained element. 제가 알고있는 바로는 공식 문서에 나와 있는 이 문장의 의미는 매 시점마다 들어오는 데이터(new element) 중에서 바로 직전 시점에 남겨진 데이터를 discard하고 그 다음 시점에 들어온 데이터가 최신 데이터가 되어서 버퍼에 들어갈 공간이 있게되는 시점에 버퍼 안에 채워지는걸로 알고 있습니다.여기서 말하는 previously retained element는 들어오는 데이터들 중에서 가장 최근에 들어오는 데이터 중에서 오래된 데이터를 의미하는데, 버퍼를 다 비운다는 설명이 구체적으로 나와 있지는 않습니다. 말씀하신것과 조금 유사한 방식으로 버퍼 내부에서 어떤 데이터를 drop 할 것인지를 핸들링하는 전략인 onBackpressureBuffer()를 사용할 수 있습니다.
- 0
- 2
- 144
질문&답변
StepVerifier를 이용한 Testing (1) 의 StepVerifiter 질문
안녕하세요. 답변이 조금 늦어서 죄송합니다.전에 비슷한 질문에 답변 드린적이 있어서 해당 질문의 답변 내용 확인하시면 도움이 되실것 같아요.아래 링크 확인해주시면 감사드리겠습니다.https://www.inflearn.com/community/questions/1286668
- 0
- 1
- 130
질문&답변
[Scheduler의 종류 강의] Schedulers.newParallel 메서드에 관한 질문
안녕하세요.Spring WebFlux에서 work-stealing 같은 처리를 지원하는지 질문 주셨는데요.제가 알기로는 아래와 같이 ParallelFlux가 work-stealing 기능을 지원은 하는걸로 알고 있습니다.(사진) 다만, 작업을 가져가서 처리하는 work-stealing이라는 일반적인 의미와는 조금 다른 의미라고 생각해요. 리액티브 프로그래밍에서는 들어오는 데이터 즉, 데이터 스트림을 처리할 때 라운드 로빈 방식으로 데이터를 처리하는데 처리할 데이터를 미리 fetch(prefetch)해서 작업 효율성을 높일 수 있는데요. 이 prefetch하는 데이터의 양을 내부적으로 동적으로 조절해서 처리하는걸로 알고 있습니다.한마디로 다른 쓰레드보다 들어오는 데이터를 더 빨리 처리하는 쓰레드는 데이터를 조금 더 많이 prefetch 해서 처리한다고 생각하시면 될 것 같습니다. Spring WebFlux에서 이런 기능을 지원한다기 보다는 Reactor 같은 리액티브 프로그래밍 라이브러리에서 지원한다고 보시면 될 것 같아요.
- 0
- 2
- 126
질문&답변
Backpressure Drop 전략에서 다시 버퍼가 채워지는 시점에 대한 질문입니다.
아, 이건 저도 내부 깊숙히 들어가서 찾아 본건 아닌것 같습니다. 기억이 안나는걸 보면요. ㅡㅡㅋ다만 문서에서 확인했었습니다. 깊게 찾아 들어갈 필요성을 느끼질 못해서 안해봤었습니다. ㅡ.,ㅡ 오늘은 밤이 늦어서 나중에 시간될 때 저도 찾아보도록 하겠습니다.감사합니다.
- 0
- 2
- 112
질문&답변
Reactor 3부의 오픈 일정에 관해서 문의드립니다!
사실 3부 이후도 머리속에 기획은 대부분 되어 있는데, 제가 지금 오픈 준비 중인 강의 이후로 일정을 계획하고 있어서 시간이 조금 걸릴 것 같습니다.그래서 죄송스런 마음입니다. ^^;Non-Blocking I/O 방식의 애플리케이션에서 핵심은 사실 Reactor라서 Reactor를 우선 열심히 학습해 주시면 감사드릴게요.
- 0
- 1
- 122