작성
·
713
0
안녕하세요. 강사님
좋은 강의 잘 듣고 있습니다. 카프카 관련해서 아래와 같은
에러가 발생하여 질문 드립니다.
에러 상황은 sql insert문으로 데이터가 잘 들어가졌는데,
producer를 이용하여 json구조의 데이터를 넣은 후 부터 정
상적으로 동작하지 않고, sink의 status를 보면 아래와 같은 에
러가 보입니다.
원인과 해결방법을 알려주시면 감사하겠습니다.
에러 -
json구조 데이터 -
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"pwd"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"created_at"}],"optional":false,"name":"users"},"payload":{"id":9,"user_id":"admin12","pwd":"1111234","name":"admin","created_at":1628604236000}}
답변 3
2
안녕하세요. 강사님
관련하여 위 에러는 producer에서
위 형식이 아닌 다른 형식으로 메시지를 보내게 되면
db insert를 하지 못하는 에러가 발생하고,
그 후에 정상적인 메시지형태(db insert할 수 있는)로 보내더라도 기존에 잘못 보냈던 메시지가 처리되지 않아 발생하는 에러 인 것으로 보입니다.
sink, source를 지우고 producer에서 정상적인 형태로만 메시지를 보내보니 디비에 정상 저장되는 것을 확인하였는데,
그렇다면 위 처럼 잘못된 메시지가 온 경우 이를 무시하거나 삭제하고 다음 메시지를 이어서 처리하는 방법이 있을까요?
0
안녕하세요. 강사님
파일 첨부가 안돼서 아래에 텍스트로 첨부하였습니다.
불편하시겠지만 확인 부탁드리겠습니다.
감사합니다.
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\r\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\r\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:495)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:472)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)\r\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\r\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\r\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\r\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\r\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\r\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\r\n\tat java.lang.Thread.run(Thread.java:748)\r\nCaused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\r\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:370)\r\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\r\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:495)\r\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\r\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\r\n\t... 13 more\r\n
0
안녕하세요, 이도원입니다.
query를 이용해서 처리했을 때는 sink connect도 잘 작동되지만, Producer를 이용하였을 때는 오류가 발생한다는 문제는 Producer에서 전달하는 message가 DB에 저장될 수 있는 형식이 아니어서 그런것 같습니다. 위에 올려주신 에러 메시지는 확인하기에 너무 작네요.(ㅡ.ㅡ) 전체 에러 메시지를 텍스트 형식이어도 좋으니 다시 한번 첨부해 주시면 확인하는데에 도움이 될 것 같습니다.
감사합니다.