소개
#Kafka #Streaming #DataEngineer
- 카카오 데이터 엔지니어(전: SK플래닛)
- 저서
- 아파치 카프카 애플리케이션 프로그래밍 with 자바
- 예스24: https://bit.ly/3uFmhpF
- 교보문고: https://bit.ly/39Pk0Ak
- 알라딘: https://bit.ly/3a3Xa7T
- 실시간 데이터 파이프라인 아키텍처
- 예스24: https://bit.ly/3JjY96j
- 교보문고: http://bit.ly/3WEcgGJ
- 알라딘: https://bit.ly/3Hcbwmz
- 아파치 카프카 애플리케이션 프로그래밍 with 자바
강의
전체 2로드맵
전체 1수강평
- [아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
- [아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
- [데브원영] 아파치 카프카 for beginners
- [아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
게시글
질문&답변
2024.10.22
kafka retention 관련하여 질문드립니다.
안녕하세요!문의주신 내용 답변드립니다.세그먼트의 마지막 레코드가 전송된 이후, 말씀하신 바와 같이 저장되지 마자 삭제되는 상황이 발생할 가능성은 적습니다. 왜냐면, 액티브 세그먼트가 아닌 세그먼트로 변경된 시점을 기준으로 retention.ms가 지나면 삭제되기 때문입니다. retention.ms가 극도로 작다면 말씀하신바와 같이 적재와 함께 삭제될 수 있지만, 일반적으로 24hour 이상으로 설정되는 이상 적재와 함께 삭제되지는 않습니다. 그리고 말씀하신바와 같이 장애가 발생했을 때 메시지를 확인하고 싶은 니즈가 있다면 retention.ms를 충분히 길게 설정하시는 것을 추천드립니다.추가로 레코드 단위로 retention은 일반적인 스트림 상황에서 적용할 수 없습니다. kafka-streams에서 materialized view로 활용할 경우에만 key에 null을 넣는 경우도 있지만, 특수하게 사용하는 부분입니다.
- 0
- 2
- 48
질문&답변
2024.10.09
브로커의 장애복구 이후 처리과정에 대해서 질문드립니다.
안녕하세요. unclean.leader.election.enable=true 일때 ISR이 아닌상태에서 브로커 장애가 발생한 경우 프로듀서의 acks 옵션에 따라 두가지 방식으로 진행됩니다.1) producer acks=all 인 경우특정 브로커에 레코드를 전송을 완료하고 복제되기를 기다리다가 장애가 발생하는 경우일 것입니다. 브로커 입장에서는 해당 레코드가 복제되지 않았다는 것을 알고 있으며 프로듀서는 응답을 받지 못해 timeout이 발생하게 됩니다. 이에 따라 retry를 진행해서 레코드를 승급된 리더 파티션(이전엔 팔로워 였음)으로 전송하므로 레코드는 안전하게 재전송됩니다. 2) producer acks=1인 경우 특정 브로커에 레코드를 전송을 완료하고 장애가 발생하게 되는 경우일 것입니다. 브로커 입장에서 해당 레코드가 복제되지 않았고, 프로듀서 입장에서는 정상 전송되었다고 판단하므로 해당 레코드는 유실 시키고 다음 레코드를 프로듀서가 전송하게 됩니다.
- 0
- 2
- 97
질문&답변
2024.09.12
카프카의 도입 시기를 결정하는 노하우가 더 있을까요?
안녕하세요!현재 카프카 도입을 고려하고 있으나, 적당한 상황인지 고민이신것으로 이해됩니다. 말씀하신 사항을 고려하여 문의주신 내용 답변드립니다.1) 웬만한 대기업, 대형 서비스가 아니고서야 Kafka 도입은 오버스펙일까요?아닙니다. 카프카는 분산 이벤트 스트리밍 플랫폼으로써 타 플랫폼으로는 대체 불가능한 기준으로 자리잡고 있습니다. 그만큼 카프카의 특성은 이벤트 데이터를 실시간 처리하는데 매우 특화되어 있습니다. 그렇기 때문에, 실시간 데이터를 다루고 로직상 스트림 프로세싱이 필요하다면 카프카 도입은 시간 문제라고 볼 수 있겠습니다. 또한, 카프카는 작은 규모의 클러스터를 구축하여 소규모 데이터부터 시작할 수 있으며, 브로커 스케일 아웃을 통해 추후 커질 수 있는 대규모 데이터도 커버 가능하므로 대형, 대기업이 아니더라도 사용할 수 있고, 이미 많이 사용되고 있습니다. 2) 저희 회사처럼 단일 서비스에 대한 데이터 처리와 고가용성을 확보하려고 하더라도 Kafka 도입이 의미가 있을까요? 강의에서 언급된 인스턴스 스펙보다 낮은 수준의 인스턴스에서 Kafka 를 실행하는 건 괜찮을까요?단일 서비스에 대해서 스트림 데이터를 처리하면서 안전하게 데이터를 처리하기 위해서는 카프카 도입이 적당할 것으로 보입니다. 말씀하신 사항을 보면 100MB/s, 1,000,000TPS 수준의 데이터는 결코 작지 않으며 프로세싱을 위해서는 분산 처리가 필수적인데, 이러한 상황을 고려하면 카프카의 도입은 충분히 의미가 있을 거라 생각됩니다. 그리고 인스턴스 스펙은 항상 사용하는 환경에 따라 다를 수 있기 때문에 제안하는 수준의 스펙보다 크거나/작음 보다는 다루고자 하는 데이터의 크기와 양을 내부적으로 충분히 테스트하시고 결정하면 될것 같습니다. 3) Kafka 는 어느 정도의 트래픽이 발생해야 도입이 유의미할까요?절대적인 데이터 양으로 카프카의 도입 여부를 결정하기는 어려울 것 같습니다만, 굳이 정해보자면 100TPS 이상이며 스트림 프로세싱(window, mapping, aggregation 등)이 필요하다면 카프카 도입을 고려할 것 같습니다. 4) Kafka 도입을 고려하는 시점에 대한 의사결정 요소에 어떤 것들이 있을까요? 키워드 위주로만이라도 설명해주시면 제가 한 번 조사해보도록 하겠습니다.앞서 몇번 더 설명한 것과 같이, '분산', '고가용성', '스트림 프로세싱' 이 가장 중요한 키워드 일것 같습니다. 이러한 스트림 프로세싱 플랫폼 없이는 적절한 로직 개발이 매우 어렵기 때문입니다. 배치처리 또는 마이크로 배치처리로 요구사항을 만족시킨다면 문제 없겠습니다만, 스트림 처리에 특화된 기능들(window, aggregation 등) 그리고 대규모 데이터를 안정적이고 낮은 지연(latency)로 처리하기 위해 distributed processing을 만족시키기 위해서는 카프카가 많은 부분 도와줄 것이라 생각됩니다. 감사합니다. 추가적으로 궁금한 사항 있으면 편하게 문의주세요~
- 0
- 2
- 131
질문&답변
2024.09.02
min.insync.repllicas, acks옵션, 그리고 리더 파티션 승급
안녕하세요!리더 파티션 1개, 팔로워 파티션 2개가 존재할 때, 팔로워를 승급시키는 기준은 여러가지가 있습니다. ISR에 포함된 팔로워 파티션(브로커)인지, replica가 잘 되고 있는지가 가장 중요할 것 같습니다. 관련 코드는 다음과 같습니다.object PartitionLeaderElectionAlgorithms { def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = { assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse { if (uncleanLeaderElectionEnabled) { val leaderOpt = assignment.find(liveReplicas.contains) if (leaderOpt.isDefined) controllerContext.stats.uncleanLeaderElectionRate.mark() leaderOpt } else { None } } } ... 생략상기 코드는 실제 카프카 코드로 오프라인이 발생했을 때 리더를 선정하는 코드로 질문에 대한 답이 될 것 같네요!https://github.com/apache/kafka/blob/3.8.0/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
- 0
- 1
- 44
질문&답변
2024.08.11
커밋 관련 질의
안녕하세요. 카프카에서 제공하는 기본 Kafka-clients 라이브러리를 사용하고 계신다면 말씀하신 상황들에서 커밋을 수행할 수 있습니다. 리스너라고 말씀하신걸 보니 스프링 카프카를 언급하신것 같은데, 만약 스프링 카프카에서 커밋을 수행하고 싶으시다면 shutdown hook에서 실행되고 있는 컨슈머 클라이언트 객체를 가져와서 커밋을 수행하셔야 합니다. 관련해서 acknowledgment 인터페이스를 참고하시면 좋을것 같습니다.https://docs.spring.io/spring-kafka/docs/current/api/org/springframework/kafka/support/Acknowledgment.html
- 0
- 2
- 84