인프런 커뮤니티 질문&답변

mj Song님의 프로필 이미지
mj Song

작성한 질문수

Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)

Orders Microservice 수정 - Order Kafka Producer

kafka sink 생성 시 tasks state failed

작성

·

878

0

안녕하세요 강사님!

강의 잘보고 있습니다.

강의시간 18:58에 sink 생성 후 status 확인 값에서 tasks.state 값이 failed라고 뜹니다.

그래서 connect 로그를 보면 아래와 같이 뜹니다.

[2023-05-02 00:04:12,636] ERROR [my-order-sink-connect|task-0] WorkerSinkTask{id=my-order-sink-connect-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:196)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:516)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:516)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
        ... 13 more

 

구글링 해봐도 원하는 답변들이 없어 남겨봅니다.

답변 2

0

mj Song님의 프로필 이미지
mj Song
질문자

자답합니다...

이유는 모르겠으나 등록했던 connector, topic 전부 날리고 새로 시작했더니 되네요...

허허...

0

mj Song님의 프로필 이미지
mj Song
질문자

추가적으로 spring 단에서는 별다른 에러는 없습니다.

kafka-console-consumer로 orders topic 로그를 보면 정상적으로 나옵니다.

다만 db에 데이터가 적재가 안되는데 connector에 tasks가 작동 중이 아니라 그런 것 같습니다.

mj Song님의 프로필 이미지
mj Song

작성한 질문수

질문하기