인프런 영문 브랜드 로고
인프런 영문 브랜드 로고

인프런 커뮤니티 질문&답변

행복한 족제비님의 프로필 이미지

작성한 질문수

카프카 완벽 가이드 - ksqlDB

Compact Topic에 대하여

해결된 질문

작성

·

59

0

안녕하세요 항상 좋은 강의 해주셔서 감사합니다.

다름이 아니라 제가 프로젝트 중에 유저가 알림 설정한 데이터를 kafka로 받아서 시세 데이터와 조인하여

알림 메세지를 생성하려고합니다.

 

유저 알림 데이터는 boolean값을 가진 활성화여부와 삭제라는 기능이있습니다.

그래서 kafka내에서 유저 알림 데이터를 최신화하기 위해서 찾아본 결과 compact topic이라는것을 발견하였고 실습중에 있습니다만 결과가 좋지않아서 질문드립니다.

sudo docker exec -it kafka-1 kafka-topics --create --bootstrap-server 192.168.56.101:29092     --topic user-alert-set     --partitions 6 --replication-factor 2  
--config "cleanup.policy=compact" 
--config "min.compaction.lag.ms=5000" 
--config "max.compaction.lag.ms=10000 " 
--config "delete.retention.ms=3000"  
--config "segment.ms=3000" 
--config "min.cleanable.dirty.ratio=0.01"

이런식으로 토픽을 생성하여 실습한 결과

1. Key A: value1 넣음 -> [A:value1] 2. Key A: value2 넣음 -> [A:value1, A:value2] 3. Key B: value1 넣음 -> [ A:value2, B:value1] 4. Key B: value2 넣음 -> [A:value2, B:value1, B:value2]
이렇게 진행되었습니다. 삭제를 위한 tombstone메세지 또한 위와같은 형식으로 진행되었습니다.
제가 이해한 바로는 같은 키의 값이 들어오면 들어온 값으로 최신화 혹은 삭제를 하는 설정으로 이해를 해서

이 결과가 저는 이해가 되지않습니다. gpt한테 물어보니
로그 컴팩션은 "head"와 "tail" 두 부분으로 나뉩니다:

  1. Tail: 이미 컴팩션이 완료된 부분 (깔끔한 상태)

  2. Head: 아직 컴팩션되지 않은 활성 부분 (더티 상태)

예시:

1. A:value1 입력 -> [A:value1]
2. A:value2 입력 -> [A:value1, A:value2]
   (컴팩션 발생) -> [A:value2]  // A는 tail 부분으로 이동
3. B:value1 입력 -> [A:value2, B:value1]  // B는 head 부분에 위치

즉, A는 이미 컴팩션되어 tail 부분에 최신 값만 있고, B는 아직 head 부분에 있어서 컴팩션되지 않은 상태로 남아있는 것입니다.

이는 정상적인 동작이며, B도 시간이 지나면 컴팩션될 것입니다.
라는 답변을 받았지만 시간이 지나도 키가 B인 데이터는 여전히 두개로 남아있었습니다.
혹시 이유를 아신다면 설명해주시면 감사하겠습니다.

답변 2

0

권 철민님의 프로필 이미지
권 철민
지식공유자

안녕하십니까,

먼저 왜, log compaction을 적용하는 compacted topic을 사용하려 하시는지요?

동일 key값에 대해서 단 하나의 최신 값만을 유지하기를 원하시면 ksqldb에서 table로 생성하시면 됩니다. 굳이 compacted topic을 적용할 이유는 없어 보입니다.

뭔가 다른 이유가 있어서 compacted topic을 적용하려고 하신다면, compacted topic은 log compaction을 이용해서 topic의 사이즈를 줄이기 위함이 목표 입니다. 이건 실시간으로 수행 되지 않고, 파라미터로 설정된 조건을 만족하면 백그라운드로 수행됩니다. 자주 수행 되지 않습니다.

때문에 ksqldb에서는 Table이라는 개념을 사용해서 실시간으로 key값의 최신 value를 유지합니다.

아래는 Kafka에서 log compaction을 적용하기 위한 주요 설정값입니다. 반드시 compacted topic의 적용이 필요하다면 아래 주요 파라미터들을 참조하셔서 설정하시되, 실시간으로는 바로 반영되어 적용되지 않음에 유의하시면 좋으실 것 같습니다. 감사합니다.

image.png

 

 

0

인프런 AI 인턴님의 프로필 이미지

안녕하세요, 인프런 AI 인턴입니다. 행복한 족제비님.

카프카의 로그 컴팩션에 대해 질문 주셨군요. 로그 컴팩션은 동일한 키를 사용하는 메시지들 중에서 마지막 메시지만 유지하여, 불필요한 공간 소모를 방지하는 기능입니다. 동일한 키가 있는 모든 메시지를 즉시 삭제하는 것은 아니며, 일정 시간 후에 로그 컴팩션 과정을 통해 제거됩니다.

설정에서 주의하셔야 할 사항은 “실행 주기”와 “컴팩션의 경로”입니다. 로그 컴팩션은 즉시 일어나는 것이 아니라, 특정 설정에 따라 주기적으로 실행됩니다. 설정하신 --config 매개변수들이 영향을 주며, 특히 min.compaction.lag.ms는 새로운 메시지가 얼마나 오래 head에 있어야 하는지를 결정합니다.

여기서 min.cleanable.dirty.ratio가 아주 작은 값으로 설정되어 있네요. 이것이 작동을 막고 있는지 확인해 보세요. 이 값은 “컴팩션을 수행할 수 있는 상태가 언제인지”를 결정하는데, 너무 작게 설정되면 컴팩션이 자주 수행되지 않을 수 있습니다.

또한, 로그 컴팩션이 진행되기 위해서는 백그라운드 스레드에 의해 관리되므로 시간이 걸릴 수 있습니다. 특정 환경이나 설정에 따라 컴팩션이 예상보다 늦게 이루어질 수도 있습니다. 설정이 제대로 작동하는지 확인하기 위해 카프카의 로그 또는 메트릭을 검토해보시는 것도 방법입니다.

다음 링크에서 비슷한 주제로 이야기된 답변을 확인해 보실 수 있습니다:
- 로그 컴팩션 관련 설정에 대한 논의

저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.