인프런 커뮤니티 질문&답변

서준영님의 프로필 이미지
서준영

작성한 질문수

카프카 완벽 가이드 - 커넥트(Connect) 편

offset 커밋 관련 질문

작성

·

92

0

커넥터와 별개긴 한데, 카프카 관련하여 문의드립니다.

현재 프로젝트 디폴트 설정이 자동 커밋인데,

리스너를 내리거나 서버를 내릴때에 예외적으로 커밋을 치고 싶습니다.

이런 경우 수동커밋이 가능한지 문의드립니다.

답변 1

1

권 철민님의 프로필 이미지
권 철민
지식공유자

안녕하십니까,

정합성이 중요해서 시스템 재 기동 중에 아예 중복 데이터 처리를 없게 하려면 기본적으로는 Consumer를 수동 commit sync 모드로 적용하는게 좋습니다.

근데 속도 문제로 개선이 필요하다면 수동 commit인데 async로 하시면 좋습니다. 다만 sync 만큼 100% 보장이 안될 수도 있습니다.

default가 수동 async commit 이라면 아래와 같이 addShutdownHook을 사용하여 종료시 sync commit로 시도해 볼 수 있습니다(저도 정확히 작동하는가 돌려보지는 않았습니다 ^^;;) 그런데, 이걸 auto commit에서 수동 commit으로 바꿀 수 있는지는 모르겠습니다. consumer가 자동 commit인지 수동 commit인지는 Consumer 생성시에 properties를 할당해서 정해지는데 이게 변경이 가능할 것 같지 않습니다.

암튼 아래는 수동 async commit에서 수동 sync commit로 변경하는 코드 입니다.

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;

import java.util.Arrays;

import java.util.Properties;

public class KafkaConsumerExample {

public static void main(String[] args) {

Properties props = new Properties();

props.put("bootstrap.servers", "your.kafka.broker:9092");

props.put("group.id", "your-group-id");

props.put("enable.auto.commit", "false"); // Disable auto-commit

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList("your-topic"));

Thread mainThread = Thread.currentThread();

 

# 아래와 wakeup으로 consumer 종료

Runtime.getRuntime().addShutdownHook(new Thread() {

public void run() {

consumer.wakeup();

try {

mainThread.join();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

});

try {

while (true) {

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {

System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

}

// 평소엔 비동기 모두로 동작.

consumer.commitAsync();

}

} catch (Exception e) {

e.printStackTrace();

} finally {

#동기 모두로 offset 저장

consumer.commitSync()

consumer.close();

}

}

}

 

 

서준영님의 프로필 이미지
서준영
질문자

답변 감사합니다!

서준영님의 프로필 이미지
서준영

작성한 질문수

질문하기