작성
·
291
0
final KStream<String, MonitoringClass> ks0 = streamsBuilder.stream(INPUT_TOPIC, Consumed.with(STRING_SERDE, MONITORING_CLASS_SERDE));
Duration windowDuration = Duration.ofMinutes(2);
TimeWindows tumblingWindow = TimeWindows.of(windowDuration.toMillis()).grace(Duration.ZERO);
ks0
.groupByKey()
.windowedBy(tumblingWindow)
.count()
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
안녕하세요, 다음과 같이 suppress() 함수를 사용해서 윈도우가 종료되는 시점에만 특정 로직을 실행하고 싶은데
suppress() 함수를 추가한 후부터는 테스트 실행시 아예 컨슈밍이 안되는 것 처럼 보여서 도움을 얻고자 글 남겼습니다.
해당 부분도 커밋 주기 때문에 예상대로 동작하지 않는 걸까요?
답변 2
0
안녕하세요. suppress()의 경우 말씀대로 윈도우가 종료되는 시점에 취합이 완료된 데이터를 추출합니다. 하지만 커밋이 된 이후에 추출이 됩니다. 커밋 interval 이후에 데이터가 추출되는지 확인해보시겠어요?
저는 로컬에서 다음과 같은 코드로 정상동작함을 확인했습니다.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> ks0 = builder.stream("test");
Duration windowDuration = Duration.ofSeconds(3);
TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(windowDuration);
ks0.print(Printed.toSysOut());
ks0.groupByKey()
.windowedBy(tumblingWindow)
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>
as("aggregation").withKeySerde(Serdes.String())
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream().print(Printed.toSysOut());
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
0
안녕하세요, 인프런 AI 인턴입니다.
DC님의 질문에 대해 답변드리겠습니다.
먼저, Apache Kafka Streams에서 suppress()
메서드는 윈도우의 종료 시점까지 발생한 이벤트를 억제(suppress)하고, 해당 시점에만 결과를 전달하기 위해 사용됩니다. suppress(Suppressed.untilWindowCloses(...))
는 윈도우가 종료될 때까지 결과를 기다렸다가 윈도우가 종료되면 그동안 억제했던 결과를 다운스트림으로 방출하도록 합니다.
하지만, suppress()
함수를 추가한 후에 컨슈머가 동작하지 않는 것 같다면, 몇 가지 확인해볼 사항이 있습니다:
Kafka Streams 애플리케이션의 상태: Streams 애플리케이션이 정상적으로 실행 중인지 확인해보세요. 예상치 못한 예외나 오류로 인해 중단된 상태일 수 있습니다.
시간과 윈도우 설정: 스트림즈 애플리케이션은 시스템 시간 또는 이벤트 타임에 따라 윈도우를 처리합니다. 시스템의 시간 설정과 tumbling 윈도우의 기간 설정이 정확한지 확인하세요.
커밋 주기: Kafka 컨슈머의 커밋 주기가 길 경우 지연이 발생할 수 있으므로, 설정을 검토해보세요. auto.commit.interval.ms
와 같은 컨슈머 커밋 관련 속성들이 영향을 줄 수 있습니다.
테스트 환경: 테스트를 진행할 때 Kafka Streams의 내부 시각이 실시간 시각과 동기화되고 있는지 확인하세요. 또한, 테스트 중에 충분한 이벤트가 발생하여 윈도우가 종료되도록 해야 합니다.
로그 확인: 애플리케이션의 로그를 확인하여 추가적인 정보나 오류 메시지를 파악해보세요.
문제를 해결하기 위해, 위 사항들을 확인하시고 다른 Kafka Streams의 구성 요소들이 정상적으로 동작하고 있는지 먼저 점검해보시기 바랍니다. 더 자세한 정보와 도움을 위해, Spring for Apache Kafka reference manual을 참조하시면 도움이 될 것입니다.
추가적으로 제공해 드린 Spring Boot Reference Documentation의 Kafka Streams 섹션도 참조해 보시길 권장드립니다. Suppress 기능과 관련된 자세한 내용이 필요하시면, 해당 문서의 “Kafka Streams Support” 섹션이 도움이 될 수 있습니다.
문제 해결에 도움이 되길 바랍니다.