묻고 답해요
141만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
카프카 sink connector 사용시 에러
안녕하세요. 카프카 관련 수업 듣는 중 오류가 발생하여 질문 드립니다.org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:366)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): was expecting comma to separate Object entries\n at [Source: (byte[])\"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"user_id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"pwd\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,\"field\":\"created_at\"}],\"optional\":false,\"name\":\"users\"},\"payload\":{\"id”:4,”user_id\":\"user4”,”pwd\":\"1234\",\"name\":\"username4”,”created_at\":1671277849000}}\"; line: 1, column: 433]\nCaused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): was expecting comma to separate Object entries\n at [Source: (byte[])\"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"user_id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"pwd\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,\"field\":\"created_at\"}],\"optional\":false,\"name\":\"users\"},\"payload\":{\"id”:4,”user_id\":\"user4”,”pwd\":\"1234\",\"name\":\"username4”,”created_at\":1671277849000}}\"; line: 1, column: 433]\n\tat com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)\n\tat com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)\n\tat com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:637)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:1011)\n\tat com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:250)\n\tat com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:258)\n\tat com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:68)\n\tat com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)\n\tat com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4270)\n\tat com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2734)\n\tat org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:364)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\n발생 에러는 다음과 같은데,터미널에서 직접 produce 하는 도중 발생하였는데 payload 의 데이터가 잘못 전송 되어서 해당 오류가 발생 한 것 같아서 기존 users 테이블에 다시 insert 하는 방식으로 다시 사용했을 때 이전에 실패한 task 가 남아있어서 여전히 my_topic_users 테이블에 insert 되지 않았습니다.중간에 발생한 task 는 삭제하거나 임의로 건너 뛰거나 할 수 없는 것인가요?
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
윈도우에서 카프카 토픽 삭제 시, 카프카가 실행이 안됩니다..
https://www.inflearn.com/questions/644825제가 예전에 올린 글입니다.질문한 글에 댓글을 또 달았었는데 도저히 해결을 할 수 가 없어서 다시 질문올립니다. 제가 윈도우 환경에서 카프카 토픽을 삭제 했는데그 뒤로 답글로 제시해주신 해결책을 해보려고 해도 해볼 수가 없었습니다..즉, C:\Temp 디렉토리 하위에 zookeeper, kafka 폴더가 전혀없고저의 C:\Temp 아래는 폴더 구조는 다음과 같습니다..2020(폴더)HncDownload(폴더)Service.logAUtempR(폴더 - 안에 아무것도 없음) 그리고 기존 에 사용하고 있던 Kafka환경 폴더(강의에서 제공해주신 폴더)를삭제하고 다시 압축을 풀어서 압축을 푼 경로에zookeeper와 kafka를 실행해도 zookeeper는 실행되고 kafka는 실행되지 않습니다...즉, 동일한 에러가 발생하네요..대체 어떤 걸 지워야지 다시 kafka서버가 정상 실행될까요? 참고로 자료 올려주신 윈도우용 카프카 파일 전부 삭제하고 다시 압축 풀고 설정해도토픽목록은 동일한 에러로 안보이고, 카프카 역시 동일한 에러로 켜지지 않습니다..감사합니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
카프카 토픽안쓰고 저장하면 대용량 처리에 불리한 이유가 있나요?
안녕하세요 강의를 듣다가 궁금증이 카프카 토픽을 안쓰고 저장하면 대용량 처리에서 불리한 이유가 있나 싶어 궁금해 졌습니다. 우선 인터넷에서 검색해서 찾아본 봐로는그러면 카프카 사용하면 대용량 처리에 유리한 이유가병렬처리에 의한 데이터 처리율 향상 : 카프카는 아래 보실 아키텍처에 보면 데이터를 병렬로 처리함으로서 데이터를 빠르고 효과적으로 처리할 수 있습니다. disk에 순차적으로 데이터를 적재하기 때문에 임의 접근(random access) 방식보다 훨씬 더 빠르게 데이터를 처리합니다.데이터 유실 방지 : disk에 적재되기 때문에 만약 불의의 사고로 서버가 다운되었을 시에도 데이터가 유실되는 일 없이 재시작하여 기존 데이터를 안정적으로 처리 가능합니다.클러스터링에 의한 고가용성 서비스 : Scale-out이 가능하여 시스템 확장이 용이하며 어떤 하나 혹은 몇 개의 서버가 다운되도 서비스 자체가 중단될 일 없이 시스템이 운용가능합니다.출처 : https://engkimbs.tistory.com/691정도로 보이는데요 그렇다면토픽을 사용하는 경우와카프카 커넥트를 사용하는 경우가 있었는데 (프로듀서, 컨슈머를 파이프라인을 매번 구성하기 힘들어서 사용하는 것으로 알고 있습니다.)그러면 우선 첫번째 질문은토픽을 사용할 때와 카프카 커넥트를 사용할 때 모두 카프카가 알아서 병령처리를 해주고 데이터 유실 방지를 해주는 건가요?? 또한 두번째 질문은 강의에서 order-service를 여러 개 띄우고 주문 요청을 여러 번 하면여러 개의 order-service의 db에 나눠서 저장을 하는 식으로 했는데요현업에서 MSA환경에서 프로젝트할 때도예를들면 order-service, user-service, catalog-service 각각 db를 하나씩 사용하는게 일반적인 방법인가요? 즉, 각 서비스마다 db를 하나씩 두는 것이 일반적인 방법인가요? 세번째 질문은 강의에서order-service를 여러개 띄우고 하나의 db에 저장하는 방식을 사용했는데즉, Kafka Topic에 설정된 Kafka Sink Connect를 사용해 단일 DB에 저장 했서 데이터를 동기화 했는데.그러면 카프카 커넥트를 사용하지 않고는 여러개의 order-service를 단일 db에 저장할 수 없나요?
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
kafka sink connector 관련 질문드립니다
안녕하세요, 도원님. 좋은 강의 덕분에 MSA 관련 지식이 많이 느는 것 같아 감사합니다. 다름이 아니라, kafka sink connector 쪽 관련해서 문의드리고 싶은 내용이 있어 질문을 올리게 되었습니다. confluent 의 kafka sink connector 공식 문서를 봤을 때, sink connector 는 at least once delivery 를 보장한다고 하는데, 만약 그런 경우가 발생하면 실제로 같은 데이터가 2번 적재되는 현상이 생길 거 같습니다. 이런 경우를 위해 따로 처리를 하는 로직을 넣어야하는지, 아니면 그냥 sink connector 내부에서 밀어넣었는데 에러가 나고 이를 넘기는 방향으로 처리를 하는지 여쭤보고 싶습니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
카프카 에러메시지 삭제에 관해 질문이 있습니다.
안녕하세요, 처음 질문을 남겨보는것 같은데, 우선 강사님 강의에서 정말 많이 배우고 있습니다. 제가 강의를 듣고 따라하던중 Schema의 Field datatype을 string으로 적어야 하는데 String으로 잘못 적은 문제가 있었습니다. 이때 발생한 에러 메시지들이 topic에 쌓여서 지속적으로 문제가 발생했어서 topic의 메시지는 kafka-topic.sh에 retention.ms를 짧게 바꿨다가 늘리는 방법으로 해결을 했습니다. 질문드리는 내용은 1. 혹시 이와같이 kafka 작업중 에러로인해 잘못된 메시지가 쌓일경우, 이것을 처리하는 방법이 위의 방법이 가장 나은지(해결하지 않으니 잘못된 메시지들을 불러오면서 계속 에러를 발생시키더라구요) 2. 두번째로는 이러한 에러메시지들이 kafka connect에도 쌓여서 서버 기동시마다 에러메시지를 쭈욱 불러옵니다. 가동에는 문제가 없으나, 혹시 깔끔하게 정리할 수 있는 방법이 있으면 알려주시면 감사하겠습니다. 좋은 강의 해주셔서 감사드립니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
안녕하세요! 질문이 있습니다.
안녕하세요! 강의 들으면서 잘 안되는 부분이 있어서 질문드립니다. 해당영상의 주문 추가 api를 수행하는 과정에서 kafka로 메시지가 정상적으로 보내지지 않는 문제가 발생했습니다. 에러 메세지는 다음과 같았습니다. Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic example-catalog-topic not present in metadata after 60000 ms. 카프카에 example-catalog-topic이 존재하지 않아서 메세지를 전송하는 과정에서 timeout이 발생하는 것 같은데요. 혹시, 카프카에 컨테이너에 직접 접속해서 topic을 수동으로 만들어주는 과정이 필요한가 해서, 직접 docker exec ... 명령어로 카프카 컨테이너에 접속한 후 다음과 같은 명령어를 수행 시켜도 토픽 리스트를 읽지 못하고 명령어가 정상적으로 수행되지 않는 것으로 보였습니다. (172.18.0.101이 아닌 127.0.0.1 도 똑같이 수행되지 않았습니다.) 혹시 강의내용 외에, 추가적인 설정이 더 필요한 것인지 궁금합니다. 참고로, order-service의 KafkaProducerConfig 클래스에서 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.101:9092"); 다음과 같이 말씀해주신대로 ip를 지정해 주었고, docker-compose-single-broker.yml 파일은 다음과 같이 강의내용과 동일하게 작성했습니다. version: '2' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" networks: my-network: ipv4_address: 172.18.0.100 kafka: image: wurstmeister/kafka ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 172.18.0.101 KAFKA_CREATE_TOPICS: "test:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /var/run/docker.sock:/var/run/docker.sock depends_on: - zookeeper networks: my-network: ipv4_address: 172.18.0.101 networks: my-network: name: ecommerce-network # 172.18.0.1 ~
-
미해결[데브원영] 아파치 카프카 for beginners
controller와 leader의 차이점
안녕하세요! kafka 설치경로의 logs 파일들을 보다가 궁금한 점이 있어서 남깁니다. controller 관련 로그가 있던데요. 찾아보니 broker들중 하나가 controller가 되는것같더라구요. broker 하나가 leader 역할도 하는데, 이 controller와의 차이점이 명확하지않아서 문의드립니다. 감사합니다!