I get an error when registering the sink connector.
Teacher, while tracking down the cause of the error above, I looked in the directory where the Kafka Connect plug-ins are installed and found that guava-30.1.1-jre.jar was missing from /usr/local/kafka/connector-plugin/confluentinc-kafka-connect-avro-converter/lib/, so I added it myself.

The schema registered fine, as shown below:

ubuntu@ip-172-31-46-33:~$ http http://localhost:8081/schemas
HTTP/1.1 200 OK
Content-Encoding: gzip
Content-Length: 195
Content-Type: application/vnd.schemaregistry.v1+json
Date: Mon, 13 Mar 2023 06:50:47 GMT
Vary: Accept-Encoding, User-Agent

[
    {
        "id": 2,
        "schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST\",\"fields\":[{\"name\":\"TERMS_VER_ID\",\"type\":\"string\"}],\"connect.name\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST.Key\"}",
        "subject": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST-key",
        "version": 1
    }
]

Registration on the source side completed normally, but registering the sink side fails with the error below.

curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "mysql-sink-connector-test",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST",
        "connection.url": "jdbc:mysql://lg-mysql:3306/aurora2",
        "connection.user": "sink",
        "connection.password": "sink1234",
        "table.name.format": "aurora2.TB_ME_TERMS_BAS_TEST",
        "insert.mode": "upsert",
        "pk.fields": "TERMS_VER_ID",
        "pk.mode": "record_key",
        "delete.enabled": "true",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://lg-schema01:8081",
        "value.converter.schema.registry.url": "http://lg-schema01:8081"
    }
}'

ubuntu@ip-172-31-0-29:/data/kafka-logs$ curl -X GET http://localhost:8083/connectors/mysql-sink-connector-test/status | jq '.'
{
    "name": "mysql-sink-connector-test",
    "connector": {
        "state": "RUNNING",
        "worker_id": "172.31.10.77:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "172.31.10.77:8083",
            "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST to Avro: \n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:244)\n\tat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:334)\n\tat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:202)\n\tat io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)\n\t... 17 more\n"
        }
    ],
    "type": "sink"
}

Question 1. Was it correct for me to add the Guava jar to the lib directory where the kafka-connect-avro-converter is installed?
Question 2. What did I leave out when registering the sink connector that causes the error above?
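For context on the "Unknown magic byte!" at the bottom of the trace, here is my own sketch (not from the connector source) of the Confluent wire format that the AvroConverter expects: each serialized key/value starts with a magic byte 0x00 followed by a 4-byte big-endian schema ID, then the Avro-encoded body. The error means the sink read a message from the topic whose first byte is not 0x00, i.e. a payload that was not produced by the Confluent Avro serializer. The function name and sample payloads below are illustrative only.

```python
import struct

def parse_confluent_header(payload: bytes):
    """Split a Confluent-framed message into (schema_id, avro_body).

    Wire format: 1 magic byte (0x00) + 4-byte big-endian schema ID + Avro data.
    """
    if not payload or payload[0] != 0:
        # This is the condition behind SerializationException: Unknown magic byte!
        raise ValueError("Unknown magic byte!")
    schema_id = struct.unpack(">I", payload[1:5])[0]
    return schema_id, payload[5:]

# A message written by the Confluent Avro serializer (schema ID 2, as in
# the registry output above) parses cleanly:
ok = bytes([0]) + struct.pack(">I", 2) + b"avro-bytes"
print(parse_confluent_header(ok))  # (2, b'avro-bytes')

# A payload serialized some other way (e.g. plain JSON) trips the check:
try:
    parse_confluent_header(b'{"TERMS_VER_ID":"v1"}')
except ValueError as e:
    print(e)  # Unknown magic byte!
```

So the failure is about how the records already sitting in the topic were serialized, not about the sink's JDBC settings themselves.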