묻고 답해요
141만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
해결됨Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
source.next와 source.emit의 차이에 대한 질문입니다.
안녕하세요.강의 잘 보고 있습니다. 마지막에 와서 의문이 드는게 있어 질문 남겨봅니다.첫 번째 예제 TestPublisherTestExample01에서는 source.next로 테스트를 진행했고,세 번째 예제 TestPublisherTestExample03에서는 source.emit으로 테스트를 진행했는데요.emit의 경우 내부적으로 complete()을 통해 signal을 발생 시키는 것으로 이해했습니다. 그런데, 말씀대로라면 첫 번째 테스트에서 expectComplete()이 통과하는게 이해가 가지 않아서요.둘의 차이는 결국 Flux를 create 해주는 방식에 있는 것 같은데, 첫 번째 예제의 경우 zipWith을 통해 complete이 발생하는 걸까요? 잘 감이 오지 않는데, 설명해주시면 감사하겠습니다!
-
해결됨Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
StepVerifier를 이용한 Testing (1) - expectNoEvent 에 관해서
안녕하세요! 강의 잘 듣고 있습니다. 다름이아니라 section10.class01.StepVerifierTimeBasedTestExample04 여기서 expectNoEvent 이 오퍼레이터가 시간을 기다려 주는 기능을 한다 이렇게 이해했는데 제가 조금 커스텀해서 찍어보다 보니 작동 방식이 도저히 이해가 되지 않아서 질문 남겼습니다. 아래 코드 처럼 1분마다 emit되는 예제에서 중구, 서초구는 1분씩 기다렸으니 순서대로 나오는게 맞는데 그 다음은 2분을 기다렸으니 강동구가 나와야 할 것 같았는데 강서구가 나오더라고요. expectNoEvent라는 오퍼레이터가 가지는 의미를 어떻게 이해야 할지 도저히 감이 안잡힙니다...ㅠ [전체코드(테스트성공)]public class StepVerifierTimeBasedTestExample04 { @Test public void getCOVID19CountTest() { StepVerifier .withVirtualTime(() -> TimeBasedExample.getVoteCount( Flux.interval(Duration.ofMinutes(1)) // 1분마다 data를 emit ) ) .expectSubscription() .expectNoEvent(Duration.ofMinutes(1)) // 1분동안 아무런 이벤트가 발생하지 않음 .expectNext(Tuples.of("중구", 15400)) // 첫번째 튜플 .expectNoEvent(Duration.ofMinutes(1)) .expectNext(Tuples.of("서초구", 20020)) .expectNoEvent(Duration.ofMinutes(1)) .expectNoEvent(Duration.ofMinutes(1)) .expectNext(Tuples.of("강서구", 32040)) .expectNoEvent(Duration.ofMinutes(1)) .expectNextCount(2) .expectComplete() .verify(); } } [getVoteCount 함수]public static Flux<Tuple2<String, Integer>> getVoteCount(Flux<Long> source) { return source .zipWith(Flux.just( Tuples.of("중구", 15400), Tuples.of("서초구", 20020), Tuples.of("강서구", 32040), Tuples.of("강동구", 14506), Tuples.of("서대문구", 35650) ) ) .map(Tuple2::getT2); }
-
해결됨Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
backpressure latest 전략
백프레셔 latest 전략으로 코드를 돌려보니 버퍼가 가득찼는데 새로 데이터가 들어오면 기존에 버퍼에 있던 데이터들이 모두 사라지는 것처럼 보여서 reactor 공식문서를 찾아보니 Discard Support: Each time a new element comes in (the new "latest"), this operator discards the previously retained element. 라고 하는걸로봐서 버퍼가 가득 찬 상태에서 새로 데이터가 들어오면 버퍼에 기존에 있던것들 다 비워버리고 최신 데이터를 버퍼에 넣는 것 같아요
-
해결됨Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
StepVerifier를 이용한 Testing (1) 의 StepVerifiter 질문
안녕하세요 강사님StepVerifiter 관련된 내용 중에 해당 2개의 메서드 사용법이 어떻게 다른지 잘 이해가 가지 않아서 질문을 드리게 됐습니다..then(() -> VirtualTimeScheduler.get().advanceTimeBy(Duration.ofHours(12))).thenAwait(Duration.ofHours(12))1번의 경우 시간을 당겨서 테스트를 진행하고, 2번의 경우 시간을 기다려서 테스트를 진행하다는 것으로 이해를 했습니다.그런데 2개의 메서드 모두 동일한 테스트 결과를 얻을 수 있는데, 굳이 해당 메서드들을 구분해서 사용해야 할까요? 아니면 각각의 메서드가 유용한 케이스가 따로 있는 걸까요 ?혹시 각각의 메서드를 따로 사용해야 하는 케이스가 있다면, 어떤 경우에 어떤 메서드가 더 유용한지 조금 더 자세한 예시를 알려주신다면 정말 큰 도움이 될 것 같습니다.
-
해결됨Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
[Scheduler의 종류 강의] Schedulers.newParallel 메서드에 관한 질문
안녕하세요 강사님 Scheduler의 종류 강의를 수강하고 공부를 하던 와중에 newParallel 메서드에 대해서 궁금증이 생겨서 질문을 하게 됐습니다. 예제 코드에서는 newParallel의 parallelism변수 값을 4를 주셨는데 저는 3을 줘서 테스트를 해봤습니다. 해당 코드로 실행을 하면, 다음과 같은 로그가 나오게 됩니다.public class SchedulersNewParallelExample01 { public static void main(String[] args) { Mono<Integer> flux = Mono .just(1) .publishOn(Schedulers.newParallel("Parallel Thread", 3, true)); //쓰레드 할당 flux.subscribe(data -> { TimeUtils.sleep(5000L); Logger.onNext("subscribe 1", data); }); //쓰레드 할당 flux.subscribe(data -> { TimeUtils.sleep(4000L); Logger.onNext("subscribe 2", data); }); //쓰레드 할당 flux.subscribe(data -> { TimeUtils.sleep(3000L); Logger.onNext("subscribe 3", data); }); //쓰레드 할당 flux.subscribe(data -> { TimeUtils.sleep(2000L); Logger.onNext("subscribe 4", data); }); TimeUtils.sleep(8000L); } } 16:18:03.554 [Parallel Thread-3] INFO com.example.springwebflux.util.Logger -- # subscribe 3 onNext(): 1 16:18:04.548 [Parallel Thread-2] INFO com.example.springwebflux.util.Logger -- # subscribe 2 onNext(): 1 16:18:05.560 [Parallel Thread-1] INFO com.example.springwebflux.util.Logger -- # subscribe 1 onNext(): 1 16:18:07.561 [Parallel Thread-1] INFO com.example.springwebflux.util.Logger -- # subscribe 4 onNext(): 1제가 여기서 여쭤보고 싶은 사항은 다음과 같습니다.Round-Robin 방식을 통해서 각 쓰레드에 작업을 분배하고 더 많은 작업이 들어오면 각 쓰레드의 큐에 작업을 적재하는 것처럼 보이는데요. 작업들을 효율적으로 처리하기 위해서는 먼저 작업이 끝난 Parallel Thread-3에서 남아있는 작업을 가져가서 처리를 하면 좋을 것 같은데 [워크 스틸링(Work Stealing) 같은 처리]Spring WebFlux에서는 해당 기능은 지원하지 않는 것인지 궁금합니다.감사합니다
-
해결됨Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
Backpressure Drop 전략에서 다시 버퍼가 채워지는 시점에 대한 질문입니다.
안녕하세요 강사님. 오늘 Backpressure Example 코드 강의 수강 후 Backpressure Drop 전략에 대해 궁금했던 부분은 다른 분의 질문을 통해서 해결을 할 수 있었습니다.(질문 글 Link) 해당 글에서 강사님의 답변을 보면Drop 전략의 경우, 버퍼 안의 데이터가 Subscriber에게 한개 전달되면, 버퍼 공간이 한개 비니까 한개의 데이터가 채워지는것이 아니라 전체 버퍼 중에 70-80 퍼센트 정도(정확한 비율은 나중에 확인 후 말씀드릴게요)가 한번에 비워진다고 보시면 될것 같습니다.즉, 데이터 한개가 버퍼에서 비워지는 것이 아니라 버퍼가 가득찬 상태에서 Downstream이 데이터를 처리할 수 있는 상태가 될 때까지 Upstream에서 emit된 데이터는 Drop이 된다고 생각하시면 될것 같습니다.이런 부분이 있는데 혹시 어떤 파일의 어떤 메서드를 보면 해당 내용을 알 수 있을까요 ? ※제가 혼자서 디버그를 통해서 찾아보려고 했는데 잘 찾아지지가 않아서 여쭤봅니다. 😢
-
해결됨Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
Reactor 3부의 오픈 일정에 관해서 문의드립니다!
안녕하세요 최근에 Spring MVC 말고도 Spring WebFlux에 대해서도 관심을 가지게 되어 Reactor 공부를 막 시작한 주니어입니다. 해당 강의가 3부까지 기획되어 있는 것으로 강의소개를 봤었습니다. 강사님께서도 무척이나 바쁘시겠지만, 혹시 3부는 언제쯤 오픈할 예정이신지 여쭤봐도 될까요? 강의 잘 보고 있습니다! 감사합니다
-
해결됨Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
inner sequence context 관련 질문
안녕하세요 강사님 좋은 강의 감사드립니다.해당 영상에서 예제 코드가 잘 이해가 되지 않아 질문 남깁니다. inner Sequence 내부에서 Context에 저장된 데이터를 외부에서 읽을 수 없다면 subscribe할 때 Context에 저장된 job은 inner 시퀀스에서 저장한 것이므로 읽을 수 없어야 한다고 생각이 들었습니다. 근데 실행 결과에서는 job에 대한 데이터를 잘 읽어오길래 혼란이 왔습니다.제가 아직 리액티브 프로그래밍 동작 방식이 익숙치 않아서 그런데 확인해주시고 부연 설명해주시면 정말 감사드리겠습니다.@Slf4j public class ContextFeatureExample4 { public static void main(String[] args) throws InterruptedException { String key1 = "id"; Mono.just("Kevin") // flatMap 내부: inner Sequence .flatMap(name -> Mono.deferContextual(ctx -> Mono.just(ctx.get(key1) + ", " + name) .transformDeferredContextual((mono, innerCtx) -> mono.map(data -> data + ", " + innerCtx.get("job"))) .contextWrite(context -> context.put("job", "Software Engineer")) ) ) .publishOn(Schedulers.parallel()) .contextWrite(context -> context.put(key1, "itVillage")) .subscribe(data -> log.info("onNext = {}", data)); Thread.sleep(100L); } }
-
미해결Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
boundedElastic 관련 질문
안녕하세요 강사님 좋은 강의 감사드립니다.해당 영상에서 Schedulers.boundedElastic() 이 아닌, newBoundedElastic() 예제를 보여주셨습니다.newBoundedElastic 예제는 어느정도 이해가 되었는데 boundedElastic() 실행 결과가 궁금하여 예제에서의 newBoundedElastic()을 boundedElastic() 으로 바꿔 실행해봤습니다.실행결과가 첨부한 사진처럼 나오던데 스레드 풀에 있던 스레드 6개가 사용된 것으로 보면 될까요?확인해주시고 답변 주시면 감사드리겠습니다 !
-
미해결Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
강의자료 관련 질문드립니다.
안녕하세요 강사님 덕분에 잘 수강하고 있는 수강생입니다. 수업자료 관련하여 간혹가다 zip파일이 존재하던데, 해당 파일들은 어떻게 열어야 할까요? zip파일을 해제하면 구성 요소들만 존재하는 것 같아요예시 첨부드립니다. 감사합니다!.
-
미해결Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
newBoundedElastic 에서 queue 에 쌓이는 룰
안녕하세요 좋은 강의 감사합니다.강의 <Sceduler의 종류> 에서 13:24 쯤에 나오는 내용이 궁금합니다."subscribe 3 doing" 는 왜 T-1 쓰레드의 Q1 큐에 쌓이고, "subscribe 4 doing" 는 왜 T-1 쓰레드의 Q2 큐에 쌓에 쌓이는지 궁금합니다.그 다음"subscribe 5 doing" 역시 왜 T-1쓰레드 Q1 큐에 쌓이는 지도 궁금합니다.순서가 차례로 하나씩 할당되게 되는 것인가요감사합니다.
-
미해결Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
import com.itvillage.utils.Logger;
안녕하세요. '리액터의 구성 요소 및 용어 정의' 강의에서Flux sample 코드 작성시 import com.itvillage.utils.Logger; 를 import 해오는데, build.gradle 설정에서 어떤 의존성을 더 추가해야 할까요?plugins { id 'java' id 'org.springframework.boot' version '3.3.1' id 'io.spring.dependency-management' version '1.1.5' } group = 'com.example' version = '0.0.1-SNAPSHOT' java { toolchain { languageVersion = JavaLanguageVersion.of(17) } } configurations { compileOnly { extendsFrom annotationProcessor } } repositories { mavenCentral() } dependencies { implementation 'org.springframework.boot:spring-boot-starter-webflux' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'io.projectreactor:reactor-test' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' } tasks.named('test') { useJUnitPlatform() } 검색해서 implementation 'com.itvillage:utils:1.0.0' 라는 것을 build.gradle 에 추가해봐도 되지 않더라구요.좋은 강의 잘 보고 있습니다. 감사합니다.
-
미해결Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
Interceptor에서 reactor Context 유지하는 방법
외부에서 들어온 요청 헤더 정보를 Reactor Context에 저장하고, 내부적으로 grpc호출 할때 clientInterceptor에서 저장된 Context를 읽어오고 싶은데, Context0과 함께 읽지 못하는 문제가 있습니다.Interceptor에서도 읽어오려면 어떻게 해야할까요 ㅜㅜ외부 요청 헤더를 Context에 저장하는 코드@Component class RequestHeaderFilter() : WebFilter { companion object { const val X_REQUEST_ID_KEY = "X-Request-Id" } override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> { val requestId = exchange.request.headers.getFirst(HeaderNames.REQUEST_ID) return chain.filter(exchange) .contextWrite { context -> context.put(X_REQUEST_ID_KEY, requestId ?: "test!!") } .doFinally { } .subscribeOn(Schedulers.boundedElastic()) } }grpc 요청 interceptor에서 reator Context 조회하여 grpc Metadata에 추가하는 코드class CustomHeaderInterceptor() : ClientInterceptor { companion object { const val X_REQUEST_ID_KEY = "X-Request-Id" val X_REQUEST_ID_HEADER_KEY: Metadata.Key<String> = Metadata.Key.of("x-request-id", Metadata.ASCII_STRING_MARSHALLER) } override fun <ReqT : Any, RespT : Any> interceptCall( method: MethodDescriptor<ReqT, RespT>, callOptions: CallOptions, next: Channel ): ClientCall<ReqT, RespT> { return object : ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { override fun start(responseListener: Listener<RespT>, headers: Metadata) { Mono.deferContextual { context -> val requestId: String = context.getOrDefault(X_REQUEST_ID_KEY, fallbackRequestId()) ?: fallbackRequestId() headers.put(X_REQUEST_ID_HEADER_KEY, requestId) delegate().start(responseListener, headers) Mono.just(requestId) }.subscribe() } } } private fun fallbackRequestId() = "${UidUtils.generateUid()}"
-
미해결Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 2부
filterWhen의 차이
filter와 달리 filterWhen은 비동기로 동작한다고 했는데, 실제 예제를 돌려보면 동일하게 동기로 돌아가는 것 같습니다. FilterWhenExample01 예제에서 종료 sleep을 10초로 늘려주고, 조건을 3_000 변경 후public static void main(String[] args) { Flux .fromIterable(SampleData.coronaVaccineNames) /** filterWhen : 데이터를 비동기적으로 filtering 하고 싶을때 사용 */ .filterWhen(vaccine -> isGreaterThan(vaccine, 3_000)) .subscribe(Logger::onNext); TimeUtils.sleep(10000); } isGreaterThan 메소드에서 비동기 동작 확인을 위해 sleep 1초를 주었습니다.public static Mono<Boolean> isGreaterThan(SampleData.CoronaVaccine coronaVaccine, int amount) { TimeUtils.sleep(1000); return Mono .just(vaccineMap.get(coronaVaccine).getT2() > amount) .publishOn(Schedulers.parallel()); } 예상 결과로 아래 출력 5건이 1초 후 동시에 나올 것이라 생각했는데 동기와 동일하게 1초당 1건씩 출력이 됩니다.> Task :FilterWhenExample01.main()14:37:55.393 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework14:37:56.494 [parallel-1] INFO com.itvillage.utils.Logger - # onNext(): Pfizer14:37:57.501 [parallel-2] INFO com.itvillage.utils.Logger - # onNext(): AstraZeneca14:37:58.513 [parallel-3] INFO com.itvillage.utils.Logger - # onNext(): Moderna14:37:59.527 [parallel-4] INFO com.itvillage.utils.Logger - # onNext(): Janssen14:38:00.537 [parallel-5] INFO com.itvillage.utils.Logger - # onNext(): Novavax 좀더 간단한 예제로 아래 코드는 동기적으로 1초당 1건씩 출력되어, filter와의 차이점을 모르겠습니다.public static void main(String[] args) { Flux .range(1, 20) .filterWhen(num -> { TimeUtils.sleep(1000); // 예시를 위해 잠시 대기 return Mono.just(num % 2 == 0); }) .subscribe(Logger::onNext); }filterWhen의 특성을 정확하게 나타낼 수 있는 예제와 설명을 부탁드립니다감사합니다~
-
미해결Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
예시코드는 webflux의 이점보단 webclient의 이점 아닌가요?
for문 5번 도는데, resttemplate은 blocking / webclient은 non-blocking 이라 결과가 달라진 것 같아서요 보내는게 non-blocking 이면 mvc도 결과가 동일 했을 것 같아요
-
미해결Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
advancedTimeBy와 thenAwait 사용 예시가 궁금합니다
안녕하세요, advancedTimeBy와 thenAwait 사용 예시를 모르겠어서 문의드립니다.강의 중 advancedTimeBy는 '특정 시간을 당겨서 앞서 나가는 것'이라 설명해주셨고, thenAwait은 해당 시간이 빠르게 다가오는 걸로 이해하면 된다고 설명해주셨습니다. 궁금한 점은1) 각각의 테스트가 필요한 예시를 조금만 더 자세히 들어주실 수 있으실까요? 특정 프로젝트에서 어떤 상황일 때 해당 방법이 필요한지 감이 잡히지 않습니다.1-2) 어떤 상황에 필요한 것인지 모르다보니, advancedTimeBy와 thenAwait의 쓰임이 분명히 구분돼 있을텐데도 유사한 방식이 아닌가? 라는 생각이 듭니다. 이 부분도 예시로 설명해주실 수 있다면 함께 설명해주시면 감사하겠습니다.
-
미해결Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
context의 read, write의 실행 순서
안녕하세요, context의 read와 write 부분을 듣다 궁금한 점이 생겨 질문을 남깁니다.Context는 체인이 맨 아래에서부터 위로 전파된다는 내용은 이해했습니다.궁금한 점은 'Context read 읽는 동작이 Context write 동작 밑에 있을 경우에는 write된 값을 read할 수 없'는 동작을 컴퓨터는 어떻게 이해하고 실행하는지를 모르겠습니다. 위에서부터 아래로 코드를 읽어나가면 contextWrite()는 마지막에 읽히게 되니까요.컴파일하면서 write의 위치가 바뀌어 write가 먼저 진행되는지 혹은 subscribe()를 만나기 전까지는 실제 stream이 동작하지 않아 write된 값이 저장돼 있는지..코드 작동 순서/원리가 궁금합니다. 감사합니다.
-
미해결Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
DROP 전략과 LATEST 전략의 차이점이 무엇인가요?
안녕하세요. 수업 잘 듣고 있습니다~백프레셔 전략 중 DROP과 LATEST 전략은 결국 버퍼가 비워질 때까지 Publsiher에서 emit되는 데이터를 제거(drop or discard)하는 것으로 이해했습니다. 버퍼가 비어지는 시점 이후로 emit되는 데이터를 다시 버퍼에 채운다는 점에서 두 전략은 같은 것으로 보입니다. 차이점을 알려주시면 감사하겠습니다~
-
미해결Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
백프레셔 전략 관련해서
폐기와 드랍의 차이를 정확히 모르겠습니다. subscriber 입장에선 버퍼가 다시 비어져 있을 때 처리는 똑같아 보이는데 폐기는 publisher 에서 데이터 삭제를 의미하고 drop 은 데이터 건너 뛰기로 이해 하면 될까요?
-
미해결Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부
Backpressure Example 코드 질문드립니다
기존 예제 (sleep 시간이 5L인 경우)에선 Exception이 발생하는 것을 확인했습니다. 하지만 sleep 시간을 더 늘리니까 Erorr가 발생하지않았습니다. 제 예상대로라면 버퍼가 더 빨리 차기 때문에 에러가 발생해야하는데, 동작이 이해가 되지 않습니다. public class BackpressureStrategyErrorExample { public static void main(String[] args) { Flux .interval(Duration.ofMillis(1L)) .onBackpressureError() .doOnNext(Logger::doOnNext) .publishOn(Schedulers.parallel()) .subscribe(data -> { // 왜 50L, 500L로 하면 에러가 발생하지 않을까? TimeUtils.sleep(500L); Logger.onNext(data); }, error -> Logger.onError(error)); TimeUtils.sleep(5000L); } }