인프런 커뮤니티 질문&답변

denia park님의 프로필 이미지

작성한 질문수

Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부

Backpressure Example 코드 설명

Backpressure Drop 전략에서 다시 버퍼가 채워지는 시점에 대한 질문입니다.

해결된 질문

작성

·

39

·

수정됨

0

안녕하세요 강사님.

 

오늘 Backpressure Example 코드 강의 수강 후 Backpressure Drop 전략에 대해 궁금했던 부분은 다른 분의 질문을 통해서 해결을 할 수 있었습니다.
(질문 글 Link)

 

해당 글에서 강사님의 답변을 보면

Drop 전략의 경우, 버퍼 안의 데이터가 Subscriber에게 한개 전달되면, 버퍼 공간이 한개 비니까 한개의 데이터가 채워지는것이 아니라 전체 버퍼 중에 70-80 퍼센트 정도(정확한 비율은 나중에 확인 후 말씀드릴게요)가 한번에 비워진다고 보시면 될것 같습니다.

즉, 데이터 한개가 버퍼에서 비워지는 것이 아니라 버퍼가 가득찬 상태에서 Downstream이 데이터를 처리할 수 있는 상태가 될 때까지 Upstream에서 emit된 데이터는 Drop이 된다고 생각하시면 될것 같습니다.

이런 부분이 있는데 혹시 어떤 파일의 어떤 메서드를 보면 해당 내용을 알 수 있을까요 ?

 

※제가 혼자서 디버그를 통해서 찾아보려고 했는데 잘 찾아지지가 않아서 여쭤봅니다. 😢

답변 2

0

안녕하세요, 인프런 AI 인턴입니다.

denia park님, 궁금하신 Backpressure Drop 전략에 대한 부분을 조사해봤으며, 연관된 토론을 찾았습니다. 링크된 질문에서는 버퍼가 비워지는 방식과 관련해서 강사님이 제공한 설명이 매우 유용할 것입니다. 그러나 요청하신 “어떤 파일의 어떤 메소드”에 대한 구체적인 정보 제공은 발견되지 않았습니다.

이와 관련하여, 대부분의 프로젝트에서는 버퍼 메커니즘이나 Backpressure 전략이 구현된 로직을 프로젝트의 핵심 라이브러리나 프레임워크의 일부로 포함하고 있습니다. Reactor 프로젝트의 경우, 이러한 세부적인 메커니즘은 Reactor Core 라이브러리 내부에 구현되어 있으며, 소스 코드를 직접 검토하거나 해당 라이브러리의 API 문서를 찾아보는 것이 도움이 될 수 있습니다. 하지만, 구체적인 메소드 위치는 사용하고 있는 Reactor Core 라이브러리의 버전과 구현 세부사항에 따라 달라질 수 있기 때문에, GitHub 상의 Reactor Core 리포지토리나 공식 문서를 참조하는 것이 좋습니다.

버퍼가 비워지는 시점 질문에 대한 강사님의 답변에서 얻을 수 있는 가장 큰 교훈은, Backpressure Drop 전략의 동작 방식에 대한 이해가 필요하단 점입니다. 이 전략이 실제로 어떻게 구현되어 있는지, 그리고 어떠한 상황에서 어떻게 반응하는지 이해하기 위해서는 실제 소스 코드 또는 해당 기술 문서를 참조하는 것이 중요합니다.

Reactor Core GitHub 리포지토리 및 공식 문서는 보다 심층적인 이해를 위한 좋은 출발점이 될 것입니다. 특히, FluxMono 클래스들이 어떻게 Backpressure를 처리하는지 관련 섹션을 찾아보세요.

질문에 대한 답변 찾기와 관련하여 추가 도움이 필요하시면, 주저 없이 다시 질문해 주시기 바랍니다.

저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.

0

Kevin님의 프로필 이미지
Kevin
지식공유자

아, 이건 저도 내부 깊숙히 들어가서 찾아 본건 아닌것 같습니다. 기억이 안나는걸 보면요. ㅡㅡㅋ

다만 문서에서 확인했었습니다. 깊게 찾아 들어갈 필요성을 느끼질 못해서 안해봤었습니다. ㅡ.,ㅡ

 

오늘은 밤이 늦어서 나중에 시간될 때 저도 찾아보도록 하겠습니다.

감사합니다.

denia park님의 프로필 이미지
denia park
질문자

찾아봐주신다고 하니 감사합니다.

 

제가 혼자서 테스트 해봤을 때는 약 60% 정도에서 새로 버퍼를 채우는 것 같습니다.

 

  1. 39번째 데이터를 request할 때부터 drop이 발생

  2. 계속해서 drop이 발생

  3. 192번째 데이터를 request할 때부터는 다시 drop이 발생하지 않음

  4. 39번째 데이터를 요청할 때 버퍼 (버퍼 크기 = 255개)가 가득 찼다고 가정했을 때, 192번째 데이터를 요청할 때 어느정도 버퍼가 비워져서 데이터를 다시 채우기 시작했다고 볼 수 있습니다.

  5. 그래서 해당 데이터를 가지고 계산을 해보면 255개 중에서 153개 [255 (버퍼 총 크기) - (192 (다시 채우기 시작한 순서) - 39 (막힌 순서))] 가 처리 되었을 때 버퍼를 채웠다고 보고, 이를 토대로 계산을 하면 60% [153 / 255 * 100]가 나오게 됩니다.

     

 

테스트 코드

public class BackpressureStrategyDropExample {
	public static void main(String[] args) {
		Flux
			.interval(Duration.ofMillis(1L))
			.onBackpressureDrop(dropped -> Logger.info("# dropped: {}", dropped))
			.publishOn(Schedulers.parallel())
			.subscribe(new BaseSubscriber<Long>() {
				int count = 0;

				@Override
				protected void hookOnNext(Long value) {
					Logger.info("# request 요청: 1개 -> count={}", count++);
					TimeUtils.sleep(5L); // 데이터 처리 시 5ms 지연
					Logger.onNext(value);
					request(1); // 각 데이터 처리 후 다시 1개 요청
				}

				@Override
				protected void hookOnError(Throwable throwable) {
					Logger.onError(throwable);
				}
			});

		TimeUtils.sleep(2000L);
	}
}

데이터 drop 시작

08:13:02.747 [parallel-1] INFO com.example.springreactivewebflux.util.Logger -- # request 요청: 1개 -> count=38
08:13:02.754 [parallel-1] INFO com.example.springreactivewebflux.util.Logger -- # onNext(): 38 -> 해당 데이터를 processing
08:13:02.754 [parallel-1] INFO com.example.springreactivewebflux.util.Logger -- # request 요청: 1개 -> count=39
08:13:02.759 [parallel-2] INFO com.example.springreactivewebflux.util.Logger -- # dropped: 256
08:13:02.761 [parallel-2] INFO com.example.springreactivewebflux.util.Logger -- # dropped: 257
08:13:02.761 [parallel-1] INFO com.example.springreactivewebflux.util.Logger -- # onNext(): 39 -> 해당 데이터를 processing

데이터 drop이 사라짐

08:13:03.715 [parallel-2] INFO com.example.springreactivewebflux.util.Logger -- # dropped: 1211
08:13:03.715 [parallel-1] INFO com.example.springreactivewebflux.util.Logger -- # onNext(): 191 -> 해당 데이터를 processing
08:13:03.715 [parallel-2] INFO com.example.springreactivewebflux.util.Logger -- # dropped: 1212
08:13:03.715 [parallel-1] INFO com.example.springreactivewebflux.util.Logger -- # request 요청: 1개 -> count=192
08:13:03.722 [parallel-1] INFO com.example.springreactivewebflux.util.Logger -- # onNext(): 192 -> 해당 데이터를 processing
08:13:03.722 [parallel-1] INFO com.example.springreactivewebflux.util.Logger -- # request 요청: 1개 -> count=193
08:13:03.728 [parallel-1] INFO com.example.springreactivewebflux.util.Logger -- # onNext(): 193 -> 해당 데이터를 processing
08:13:03.728 [parallel-1] INFO com.example.springreactivewebflux.util.Logger -- # request 요청: 1개 -> count=194