해결된 질문
작성
·
420
·
수정됨
0
- 학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요!
- 먼저 유사한 질문이 있었는지 검색해보세요.
- 서로 예의를 지키며 존중하는 문화를 만들어가요.
- 잠깐! 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요.
선생님 안녕하십니까!
강의를 열심히 듣고 있는 수강생입니다.
다름이아니라, 토픽에 데이터가 없을 때 offset이 0이 된다는 로그는 어디서부터 기인하는 것인지 궁금하여 질문 드립니다.
제가 이해하기로 AUTO_COMMIT false가 되어있으면 명시적으로 commit을 호출하지 않으면 commit이 되지 않는 것으로 알고있습니다 (테스트도 해보았습니다.)
하지만, 강의 6:00경 시나리오.
즉, 토픽에 아무런 데이터가 없는 상태에서 컨슈머만 켠 케이스에서 저는 커밋(commitSync)을 하지 않았는데 poll을 몇번 돌다보면 offset이 0이 되었다는 로그가 발생합니다.
(실제로 __consumer_offsets-* 파일에는 offset이 기록되진 않고, 그래서 컨슈머를 껐다 켜면 offset 0부터 읽습니다)
저는 commit을 한적이 없으므로 실제로 __consumer_offsets에는 0으로 기록되지 않는데, 저 로그에 있는 offset이 0은 어디서부터 오는 것일까요?
+) 코드상으로는 poll 시점에 updateAssignmentMetadataIfNeeded (maybeSeekUnvalidated) 에서 해당 로그가 찍히는데요, 이건 무슨 정책일까요? 관련 자료가 있으면 공유주셔도 감사합니다.
// offset=0 설정이 되어버림
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-group-pizza-assign-seek-maintenance-6-1, groupId=group-pizza-assign-seek-maintenance-6] Fetch position FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}} is out of range for partition pizza-topic-0, resetting offset
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group-pizza-assign-seek-maintenance-6-1, groupId=group-pizza-assign-seek-maintenance-6] Resetting offset for partition pizza-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
테스트한 코드 일부 공유드립니다.
(필요할 설정들을 했으며, 정말 커밋이 안되고 있는지 확인하기 위해 records.count()>100일 때만 명시적으로 커밋을 한 코드입니다)
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
TopicPartition topicPartition = new TopicPartition(topicName, 0);
kafkaConsumer.assign(List.of(topicPartition));
kafkaConsumer.seek(topicPartition, 10L);
while(true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1_000L));
for(ConsumerRecord<String, String> record: records) {
log.info("record key: {}, partition: {}, recordOffset: {}",
record.key(), record.partition(), record.offset());
}
if (records.count() > 100 ){
kafkaConsumer.commitSync();
log.info("commit sync called");
}
}
감사합니다 ^^
답변 1
1
오, 굉장히 재미있는(?) 부분을 테스트 해보셨군요.
seek(N)인데 아직 offset이 N까지 없는 경우 , (제 기억으로는) 과거 버전에서는 Out of range 오류를 내었던 걸로 기억합니다만,
근데 지금은 out of range 오류 시 default offset으로 Fetcher가 position을 잡는 걸로 보입니다. 만약 이렇게 처리하지 않으면 오류를 발생 시키던지, 아님 N 이 될 때 까지 Consumer가 계속 대기 해야 하는데, 이런 부분을 개선하기 위해서 default offset으로 resetting하는 걸로 추정됩니다.
아하!
이런 현상이 카프카 설계 원리로 인해 발생하는 것이 아니라,
현 버전의 정책정도일 뿐이고 계속해서 업데이트 될 수 있는 부분이군요!
답변 감사드립니다 선생님. 남은 강의도 열심히 듣겠습니다!
즐거운 한주 시작하십쇼!
선생님 안녕하세요.
죄송합니다. 잘 모르다보니 횡설수설 했습니다. 생각을 좀 정리하였고, 다시 한 번 말씀드리겠습니다.
조건 : seek(N)으로 데이터를 읽으려고 할 때 토픽에 N보다 적은 수의 데이터가 있는 경우 (즉 offset이 없거나 N보다 작을 경우)
현상 : 처음 FetchPosition의 offset은 N이었으나 이어서 consumer가 토픽의 마지막 offset으로 컨슈머의 FetchPosition을 덮어쓰고, topic에 데이터가 쌓이면 그 설정된 FetchPosition(토픽의 마지막 offset) 데이터를 읽어옵니다 (Fetcher.java의 resetOffsetIfNeeded() 에서 수행됩니다. )
왜 FetchPosition이 seek(N)했을 때 FetchPosition{offset=N}이 어느 순간 토픽의 마지막 offset으로 덮어씌워지는 정책을 가지는지 궁금합니다.
(FetchPosition의 offset을 seek에서 준 값을 그대로 가지고 있지, 왜 굳이 토픽의 마지막 오프셋으로 업데이트하지? 싶어서요)
테스트한 코드도 첨부드립니다.
(partiton3개인 pizzaProducer를 사용하였고 consumer도 선생님께서 강의에서 진행해주신 코드와 거의 똑같습니다)
질문이 고르지 못했던 점 양해 부탁드립니다.
주말에도 답변 감사합니다 선생님!