작성
·
92
답변 1
1
안녕하십니까,
정합성이 중요해서 시스템 재 기동 중에 아예 중복 데이터 처리를 없게 하려면 기본적으로는 Consumer를 수동 commit sync 모드로 적용하는게 좋습니다.
근데 속도 문제로 개선이 필요하다면 수동 commit인데 async로 하시면 좋습니다. 다만 sync 만큼 100% 보장이 안될 수도 있습니다.
default가 수동 async commit 이라면 아래와 같이 addShutdownHook을 사용하여 종료시 sync commit로 시도해 볼 수 있습니다(저도 정확히 작동하는가 돌려보지는 않았습니다 ^^;;) 그런데, 이걸 auto commit에서 수동 commit으로 바꿀 수 있는지는 모르겠습니다. consumer가 자동 commit인지 수동 commit인지는 Consumer 생성시에 properties를 할당해서 정해지는데 이게 변경이 가능할 것 같지 않습니다.
암튼 아래는 수동 async commit에서 수동 sync commit로 변경하는 코드 입니다.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "your.kafka.broker:9092");
props.put("group.id", "your-group-id");
props.put("enable.auto.commit", "false"); // Disable auto-commit
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("your-topic"));
Thread mainThread = Thread.currentThread();
# 아래와 wakeup으로 consumer 종료
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// 평소엔 비동기 모두로 동작.
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
#동기 모두로 offset 저장
consumer.commitSync()
consumer.close();
}
}
}
답변 감사합니다!