월 33,000원
5개월 할부 시다른 수강생들이 자주 물어보는 질문이 궁금하신가요?
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
kafka 멀티 파티션 관련해 질문드립니다.
안녕하세요. 회사에 카프카 도입을 위해 강의를 수강중에 궁금한 사항이 있어 질문드립니다. 파티션을 여러개 만들어 사용하게 되면 같은 파티션 내에서는 순서를 보장할 수 있지만 멀티 파티션의 경우 파티션간의 순서는 보장하지 못하는걸로 아는대요 병렬처리가 되어야 처리량이 어느정도 확보가 될 것 같아 멀티파티션을 고려해야할 것 같은대 이러한 경우 멀티파티션에서도 순서를 보장할 수 있는 방법이 있을까요? 검색을 좀 해보니 스트림즈 사용시에 멀티파티션이어도 순서를 보장할 수 있다고 하는거 같기도한대 어떻게 하면 되는지 궁금합니다.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
스트림즈 DSL interval 기능
안녕하세요 강사님스트림즈DSL과 프로세서API 를 설명하는 부분에서 스트림즈 DSL이 일부기능을 지원하지 않는다고 하셨는데요 이중에서 스트림즈DSL에서는 인터벌마다 데이터를 처리하는 부분을 지원하지 않는다고 하셨는데요.강의 후반에 나오는 window 프로세싱을 이용하면 인터벌 데이터 처리도 가능한게 아닐까 라는 생각이 드는데요, 맞을까요? 어떤 부분에서 스트림DSL로는 인터벌마다 데이터를 처리하는게 안된다고 하신건지 궁금합니다!
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
직렬화, 역직렬화 관련
안녕하세요. 좋은 강의 감사합니다.직렬화, 역직렬화 관련해서 질문이 있습니다.저는 Spring 프레임워크를 사용해서 프로듀서와, 컨슈머를 각각 따로 서버를 만들어서 개발 하고 있습니다.수업 내용은 단순히 String이지만 제가 실무에 사용하려고하는건 웹 애플리케이션 과 같이 DTO 클래스로받은 데이터를 그대로 프로듀서에서 send()에 담아서 보내고 있습니다. 컨슈머에서 DTO로 받으려고관련 자료를 찾아 보니 JsonSerializer가 있어 해보았습니다. Object 로 받아서 여러 DTO를 받을 수 있는Consumer factory를 만들다 보니 여러 에러가 발생해서 ObjectMapper를 사용했습니다.@Component public class ObjectMapperService { private final ObjectMapper objectMapper = new ObjectMapper(); public <T> T convertValue(String json, Class<T> valueType) throws JsonProcessingException { return objectMapper.readValue(json, valueType); } }@Component @Slf4j @Data @RequiredArgsConstructor public class KafkaConsumer { private final ObjectMapperService mapper; @KafkaListener(topics = "member-join", containerFactory = "commonKafkaListenerContainerFactory") public void receiveMemberJoin(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) throws Exception { String value = consumerRecord.value(); MemberJoin memberJoin = mapper.convertValue(value, MemberJoin.class); String recommendCode = memberJoin.getRecommendCode(); log.info("recommendCode: {}", recommendCode); log.info("received payload = {}", memberJoin.toString()); acknowledgment.acknowledge(); } }이런식으로 사용해서 DTO클래스로 사용했는데 실무에서는 어떤식으로 하는지 궁금합니다. 혹시 참고할 수 있는 정보가 있을까요?감사합니다.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
KStreamJoinKTable 실행시 에러
안녕하세요 KStreamJoinKTable 실행시 에러가 발생해서요 ㅠ 에러는 아래와 같습니다. [order-join-application-517e77af-95a4-4398-9800-1cef92f224d1-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [order-join-application-517e77af-95a4-4398-9800-1cef92f224d1] All stream threads have died. The instance will be in error state and should be closed. [order-join-application-517e77af-95a4-4398-9800-1cef92f224d1-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [order-join-application-517e77af-95a4-4398-9800-1cef92f224d1-StreamThread-1] Shutdown complete Exception in thread "order-join-application-517e77af-95a4-4398-9800-1cef92f224d1-StreamThread-1" java.lang.UnsatisfiedLinkError: /private/var/folders/x8/qq1k6wyn1qs76952l1d1t0qh0000gn/T/librocksdbjni11881960449213041495.jnilib: dlopen(/private/var/folders/x8/qq1k6wyn1qs76952l1d1t0qh0000gn/T/librocksdbjni11881960449213041495.jnilib, 0x0001): tried: '/private/var/folders/x8/qq1k6wyn1qs76952l1d1t0qh0000gn/T/librocksdbjni11881960449213041495.jnilib' (mach-o file, but is an incompatible architecture (have 'x86_64', need 'arm64e' or 'arm64')), '/System/Volumes/Preboot/Cryptexes/OS/private/var/folders/x8/qq1k6wyn1qs76952l1d1t0qh0000gn/T/librocksdbjni11881960449213041495.jnilib' (no such file), '/private/var/folders/x8/qq1k6wyn1qs76952l1d1t0qh0000gn/T/librocksdbjni11881960449213041495.jnilib' (mach-o file, but is an incompatible architecture (have 'x86_64', need 'arm64e' or 'arm64')) at java.base/jdk.internal.loader.NativeLibraries.load(Native Method) at java.base/jdk.internal.loader.NativeLibraries$NativeLibraryImpl.open(NativeLibraries.java:388) at java.base/jdk.internal.loader.NativeLibraries.loadLibrary(NativeLibraries.java:232) at java.base/jdk.internal.loader.NativeLibraries.loadLibrary(NativeLibraries.java:174) at java.base/java.lang.ClassLoader.loadLibrary(ClassLoader.java:2389) at java.base/java.lang.Runtime.load0(Runtime.java:755) at java.base/java.lang.System.load(System.java:1953) at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78) at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56) at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64) at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35) at org.rocksdb.DBOptions.<clinit>(DBOptions.java:21) at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:133) at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:229) at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:44) at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:58) at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:101) at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:210) at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:275) at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:76) at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:397) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) - 카프카를 공부하시면서 생긴 질문들을 남겨주세요. 상세히 작성하면 더 좋아요! - 먼저 유사한 질문이 있었는지 검색해보세요. - 서로 예의를 지키며 존중하는 문화를 만들어가요. - 잠깐! 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
카프카 3버전
안녕하세요. 카프카를 실무 도입전 공부를 하고 있습니다. 강의에서 3버전대는 아직 실무에 도입한 사례가 없다고 하시는데 강의가 2년전이라서 요즈음에도 아직 3버전이 크게 상용화가 안되었나요? 첫 도입 하려고 하다보니 신경쓸게 많아 질문 드립니다.감사합니다
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
오프셋 커밋 과정에서 장애 발생 시 카프카에서는 어떤 처리가 일어나는지 궁금합니다
자동 커밋 옵션을 활성화한 경우 일정 시간마다 오프셋 커밋을 실행하는 것을 이해했습니다. 결제 이벤트를 처리하는 카프카 컨슈머가 100건의 레코드를 가져와서 한번에 처리하도록 구현되어 있을 때 레코드 처리 중간에 장애가 발생하여 오프셋 커밋을 하지 못하는 경우 어떤 일이 일어나는지 궁금합니다. 50번째 레코드를 처리하다가 장애가 발생했다고 가정하면 오프셋 커밋이 되지 못했으므로 다른 컨슈머 애플리케이션이 이미 처리된 100건의 레코드를 다시 가져와서 1~50번째 레코드가 중복으로 이벤트가 처리될 것 같습니다. 이는 컨슈머 애플리케이션은 언제든지 중복으로 레코드를 소비할 수 있다는 것으로 이해가 되고, 컨슈머 애플리케이션 로직을 작성할 때는 항상 중복 레코드가 처리되지 않도록 방어해야한다는 것을 의미한다고 생각합니다. 제가 만약 방어 로직을 작성한다고 하면 레코드의 메시지마다 유일한 키값을 같이 전송하여 해당 값을 Redis에 저장하고, redis에 저장된 상태라면 메시지를 처리하지 않도록 구현할 것 같습니다. 제가 오프셋과 관련하여 이해한 내용이 정확한지, 그리고 실무에서도 Redis를 활용한 중복 처리 방지 로직이 효과적인 방법인지 궁금합니다. 또한, 이와 같은 상황을 해결하기 위해 다른 방법을 사용하시는지도 알고 싶습니다. 답변 주시면 감사하겠습니다!
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
파티션 복제의 성능이슈는 없나요?
안녕하세요 강의 잘 보고 있습니다.복제강의 수강중 궁금한점이리더 - 팔로워 간의 복제시 팔로워에서 리더의 오프셋을 확인해 차이점이 있다면 팔로워로 저장한다고 이해했는데요.이러한 복제 과정으로 인해 리더파티션에 이슈가 생길만한 케이스가 있는지 궁금합니다.기본적으로 리더 파티션이 프로듀서 - 컨슈머와의 통신을 담당하기에 팔로워 파티션에서 복제과정을 담당하게 되는 형태인것 같아 문제가 없을것 같긴한데 .. 꼭 이렇게 생각하고 넘어가면 문제가 생기더라구요 ㅎ..혹시 복제과정이 리더 파티션 성능에 영향을 주게될만한 이슈가 있을지 우려되서 질문드립니다
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
온 프라미스의 서버랙, 클라우드의 리전 장애에 대비할 수 있는 카프카 설정이 무엇인가요??
안녕하세요! 2강에서 설명해주신 레플리케이션 설정을 통한 고가용성 외에 서버 랙, 리전 장애에도 대비할 수 있는 카프카 설정이 무엇인지 궁금하여 질문드립니다.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
윈도우 연산시 주의해야할 사항 + 기타 질문
윈도우 연산시 주의해야 할 사항에텀블링 윈도우에 한해서 윈도윙이 5초간격이라면커밋 간격도 5초로 맞춰주면굳이 최신 데이터를 어떤 로직처리에 의해서 뽑아줄 필요가 없지 않을까요?스트림 처리할때만 커밋 간격을 윈도우 크기만큼 맞추고, 나머지는 디폴트나 설정한 값으로 처리하면 될것 같은데요이렇게 하는건 어떻게 생각하시는지 궁금합니다 자바의 main 말고 카프카의 데이터를 컨슘해서 프론트엔드로 서빙하는 스프링 예제provectuslabs/kafka-ui의 이미지를 사용해서 도커컴포즈로 브로커를 3대 띄우고 운영한다던지(아무래도 쿠버네티스 환경까지 들어가면 너무 깊어지니깐)스키마 레지스트리와 KSQL 등에 대한 것추가로 2, 3, 4번 예제는 강의에 없는것 같은데 혹시 집필하신 책에 나와있을까요? 많은 내용을 배워서 갑사합니다추가로 현업에서 사용되는 다양한 처리 방법을 조금 더 심도있게 배우고 싶습니다
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
데이터가 많은 브로커에서는 ISR이 유지되기 어렵나요?
replication lag이라는게 존재하면, 데이터가 리더 파티션에 들어올 때 마다 약간의 지연 시간이 팔로워에서 발생한다는 건데, 그말인 즉슨 ISR이 거의 유지될 수 없다처럼 느껴져서요.. 이부분이 궁금합니다.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
retention.byte에 대해 질문있습니다.
안녕하세요. 카프카가 데이터를 삭제할 때 세그먼트, 즉 파일 단위로 삭제한다고 말씀해주셨습니다.retention.ms는 active 세그먼트가 아닌 세그먼트가 해당 기간을 지났을 때 삭제한다는 것이 명료해서 이해가 갔는데, retention.byte는 조금 헷갈립니다.세그먼트의 기본 크기가 1GB로 알고 있습니다. 만약 retention.ms를 사용하지 않는다고 가정하면 (매우 큰값이라고 가정한다면) 세그먼트 삭제는 retention.byte에 의해 좌우될텐데요, retention.byte를 1GB보다 크게 주는 경우, 세그먼트는 영영 삭제되지 않는 것일까요?
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
파티션 복제
안녕하세요.카프카 클러스터 내에 여러 개의 브로커를 실행한 다음 토픽의 파티션을 복제하고 리더 파티션과 팔로워 파티션이 있는 구조를 구성해보고 싶습니다. 위와 같은 구조를 직접 구현해보고 싶은데 혹시 참고할 만한 게 있을까요?감사합니다.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
카프카 적용 시 API 를 조회 해야 된다면 어떻게 해야 되는지 궁금합니다.
카프카 인강을 듣고 처음 적용하려고 하고 있는데요.혹시 프로듀서에서 컨슈머로 데이터를 받고, API 통신을 4회 정도 하여 DB에 넣는다고 하면, 적정하지 않은 컨슈머 렉으로 진행되는 부분이 있지 않을까 고민이 되어서요.비동기 방식으로 진행하고 큐에 데이터를 쌓고 진행하려고 하는데, 큐에 데이터가 많이 쌓인다면 메모리 이슈로도 진행될 것 같아서요. 요점은프로듀서에서 컨슈머로 데이터가 들어오면 비동기 방식으로 큐에 쌓은다.큐에 쌓은 데이터를 API 통신으로 데이터를 가공한 후 DB에 넣어준다.이렇게 되면 카프카에서 컨슈머로 렉이 발생되지는 않겠지만 큐에 데이터가 쌓이면서 메모리 문제가 되지 않을까 고민이 있습니다.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
Kafka Offset 및 LAG 관련 질문
안녕하세요. 강의 수강 중 궁금한점이 있어 문의드립니다. [구성]파티션 - 5개컨슈머 - 5개 [문의]1. 컨슈머 offset과 lag 관련 문의프로듀서에서 메시지를 생성해서 Kafka로 전송하면 Offset값이 2씩 증가합니다.CURRENT-OFFSET - 2증가 (7 -> 9)LOG-END-OFFSET - 2증가 (8 -> 10)메시지가 소비되고 나면 LAG은 항상 1이 유지되고 있는데 어떤 부분을 체크해 봐야할까요? (추가확인 : LAG이 항상 1이 유지되는데 kafka-console.consumer.sh로 확인해보면 메시지는 없는데 LAG이 0으로 변경됩니다.) 2. 소비된 메시지가 다시 소비되는 현상프로듀서로 메시지 생성 후 컨슈머에서 메시지를 소비하였는데 한참 시간이 지난 후 새벽시간(12시간 이후)에 이미 처리된 메시지가 컨슈머에서 다시 처리되는 현상이 발생하는데 설정값에 따라 발생할 수 있는 현상일까요? 3. 이중화 (Active-Active) 구성일 경우 컨슈머 설정이중화 구성이 되어 있는 경우 컨슈머를 @KafkaListener( concurrency = "2")로 설정하면 컨슈머는 총 4개로 운영되는 구조가 맞는지 궁금합니다. 감사합니다.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
영상 데이터 처리 문제
안녕하세요. 수업 잘 듣고 있습니다.다름이 아니라, 카프카 수업을 듣고, 영상을 카프카로 넘겨서 다시 웹으로 받는 작업을 테스트 하고 있습니다.그런데 캠cam을 연결할 때는 제대로데이터가 잘 넘어가서 잘 보여지는데,rtmp와 같은 미디어 서버에서 받은 영상 정보를카프카로 넘기면영상이 진행이 안되고, 계속 1초 전으로 되돌아가는 이상한 현상이 발생합니다.마치 재생을 시켰더니 다시 처음부터 재생하는 느낌...이 해결을 위해서 카프카 설정을 만져야 하는지프로듀서 쪽을 건들어야 할 지 감이 안 와서 질문 드립니다.감사합니다.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
CCDAK 질문있습니다!
안녕하세요, 이 강의를 듣고 CCDAK 도전해보려하는데요시험 문제를 푸는 것 외에 다른 평가 요소가 있을까요?예를 들면 감독관과 인터뷰를 진행해야 한다던지 만약 그렇다면 모든 과정은 영어로 진행되는 것인지 궁금합니다!
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
토픽 파티션 브로커 구조
안녕하세요, 토픽, 파티션, 브로커의 구조가 잘 이해가 가지 않아서 질문 드립니다. 브로커안에 토픽이 있고 그 토픽은 1개 이상의 파티션들로 구성되어 있는 것 인가요? 파티션 한개가 있으면 토픽 한개는 무조건 존재한다고 할 수 있는건가요? 파티션 여러개를 묶어서 구분할때 지칭하기 위해 토픽이라는 말을 사용하는 것 일까요?
- 해결됨[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
connect 관련 질문
안녕하세요 카프카 컨넥트 공부중 schema-registry, debezium 의 존재에 대해서 알게 되었습니다connect + schema-registry + debezium plugin 을 이용하면 source db에 실시간적으로 등록되는 데이터를 sink db 에 내가 원하는 포맷에 맞게 동기화 시켜주는 것으로 이해하였는데 맞는건가요?이때 제가 이해한 바로는 카프카 connector, task를 직접 개발해서 jar 파일을 올리지않고 registry schema 에 avro 를 등록해주면 connector 에서 registry 에 등록된 avro 를 읽고 source db 에서 sink db 로 데이터가 동기화 되는건가요?제가 생각한 예제 (col1, col2,col3 소스테이블에서 from_col1, from_col2 싱크테이블로 동기화)(source) col1[varchar], col2[int], col3[datetime](sink) from_col1[varchar], from_col2[int]만약 이게 아니라면 schema-registry 의 용도가 뭔지 알수 있을까요????
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
오프셋 커밋이 실패된 이후 consume 하는 경우 문의
안녕하세요.컨슈머에서 오프셋 커밋이 네트워크 등의 장애로 실패하고나면 동일한 컨슈머에서는 해당 오프셋을 다시 읽게 되나요?아니면, 해당 오프셋은 skip 되나요?예를 들어서, 아래와 같이 한 개의 파티션에 10개의 레코드가 있는 경우를 가정해볼게요.(비동기 수동 커밋 가정, 가져오는 레코드 개수는 1개씩) 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 1 ~ 2 번까지는 오프셋 커밋이 정상적으로 이루어지고, 3번을 제외한 4번까지 오프셋 커밋이 되었다고 가정해보면, 다음 poll() 메서드에서는 5번 오프셋을 가져오게 되나요?아니면 3번 오프셋을 다시 조회하게 되나요? 감사합니다.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
컨슈머 --partition 옵션
producer와 consumer는 리더 파티션과 통신해야한다고 알고 있었는데요, --partition 옵션으로 지정하는 파티션이 리더 파티션이 아닌 경우에도 메시지를 소비하는 것이 가능한것인가요?