해결된 질문
작성
·
1.2K
·
수정됨
1
안녕하세요. 선생님
Confluent AVRO Install 하고 나서 sink connect 등록하고 상태를 확인 해보니 아래와 같이 에러가 발생 했습니다.
ubuntu@ip-172-31-0-29:~$ curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "mysql-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS",
"connection.url": "jdbc:mysql://lg-mysql:3306/aurora2",
"connection.user": "sink",
"connection.password": "sink1234",
"table.name.format": "aurora2.TB_ME_TERMS_BAS",
"insert.mode": "upsert",
"pk.fields": "TERMS_VER_ID",
"pk.mode": "record_key",
"delete.enabled": "true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://lg-schema01:8081",
"value.converter.schema.registry.url": "http://lg-schema01:8081"
}
}'
{"name":"mysql-sink-connector","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS","connection.url":"jdbc:mysql://lg-mysql:3306/aurora2","connection.user":"sink","connection.password":"sink1234","table.name.format":"aurora2.TB_ME_TERMS_BAS","insert.mode":"upsert","pk.fields":"TERMS_VER_ID","pk.mode":"record_key","delete.enabled":"true","key.converter":"io.confluent.connect.avro.AvroConverter","value.converter":"io.confluent.connect.avro.AvroConverter","key.converter.schema.registry.url":"http://lg-schema01:8081","value.converter.schema.registry.url":"http://lg-schema01:8081","name":"mysql-sink-connector"},"tasks":[],"type":"sink"}
ubuntu@ip-172-31-0-29:~$ curl -X GET http://localhost:8083/connectors/mysql-sink-connector/status | jq '.'
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1898 100 1898 0 0 31616 0 --:--:-- --:--:-- --:--:-- 32169
{
"name": "mysql-sink-connector",
"connector": {
"state": "RUNNING",
"worker_id": "172.31.13.238:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "172.31.13.238:8083",
"trace": "java.lang.NoClassDefFoundError: com/google/common/base/Ticker\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:175)\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:158)\n\tat io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory.newClient(SchemaRegistryClientFactory.java:36)\n\tat io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:68)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:297)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:640)\n\tat org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:544)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1703)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$31(DistributedHerder.java:1753)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.ClassNotFoundException: com.google.common.base.Ticker\n\tat java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)\n\tat org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:103)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)\n\t... 13 more\n"
}
],
"type": "sink"
}
참고로 스키마 레지스토리 서버가 잘 붙나 확인 해본 결과 잘 붙습니다.
ubuntu@ip-172-31-0-29:~$ curl -v lg-schema01:8081
* Trying 172.31.46.33:8081...
* Connected to lg-schema01 (172.31.46.33) port 8081 (#0)
> GET / HTTP/1.1
> Host: lg-schema01:8081
> User-Agent: curl/7.81.0
> Accept: /
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Date: Mon, 13 Mar 2023 06:14:56 GMT
< Content-Type: application/vnd.schemaregistry.v1+json
< Vary: Accept-Encoding, User-Agent
< Content-Length: 2
<
* Connection #0 to host lg-schema01 left intact
{}ubuntu@ip-172-31-0-29:~$
현재 제가 구성한 환경 정보 입니다.
Apache Zookeeper 3.8.1 3개
Apache Kafka 2.13-3.3.2 3개
=> Source connect : Debezium mysql connect 2.1.2
=> Sink connect : Confluent JDBC Connector 10.6.3
=> Avro : Confluent Avro converter 7.3
Confluent Schema registry 7.3.0 1개
mysql DB 1 개 (Source 와 Sink 는 user 로 분리해서 데이터 넣는 방식으로 테스트 중입니다)
구글링 검색 결과 Guava 를 못찾아서 그런거 같다는데, 제가 설치한 환경의 디펜던시가 문제일까요?
답변 1
0
선생님, 위 에러 원인 찾다가
Kafka plug-in 설치된 곳을 가보니
/usr/local/kafka/connector-plugin/confluentinc-kafka-connect-avro-converter/lib/
경로안에 "guava-30.1.1-jre.jar" 이 파일이 없어서 제가 추가 했습니다.
스키마에는 잘 등록 되어서 아래와 같이 나왔습니다.
ubuntu@ip-172-31-46-33:~$ http http://localhost:8081/schemas
HTTP/1.1 200 OK
Content-Encoding: gzip
Content-Length: 195
Content-Type: application/vnd.schemaregistry.v1+json
Date: Mon, 13 Mar 2023 06:50:47 GMT
Vary: Accept-Encoding, User-Agent
[
{
"id": 2,
"schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST\",\"fields\":[{\"name\":\"TERMS_VER_ID\",\"type\":\"string\"}],\"connect.name\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST.Key\"}",
"subject": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST-key",
"version": 1
}
]
source 쪽에 등록은 정상적으로 되었는데 sink 쪽 등록시 아래와 같이 에러가 납니다.
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "mysql-sink-connector-test",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST",
"connection.url": "jdbc:mysql://lg-mysql:3306/aurora2",
"connection.user": "sink",
"connection.password": "sink1234",
"table.name.format": "aurora2.TB_ME_TERMS_BAS_TEST",
"insert.mode": "upsert",
"pk.fields": "TERMS_VER_ID",
"pk.mode": "record_key",
"delete.enabled": "true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://lg-schema01:8081",
"value.converter.schema.registry.url": "http://lg-schema01:8081"
}
}'
ubuntu@ip-172-31-0-29:/data/kafka-logs$ curl -X GET http://localhost:8083/connectors/mysql-sink-connector-test/status | jq '.'
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 2913 100 2913 0 0 176k 0 --:--:-- --:--:-- --:--:-- 284k
{
"name": "mysql-sink-connector-test",
"connector": {
"state": "RUNNING",
"worker_id": "172.31.10.77:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "172.31.10.77:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST to Avro: \n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:244)\n\tat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:334)\n\tat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:202)\n\tat io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)\n\t... 17 more\n"
}
],
"type": "sink"
}
질문1. 제가 Guava file 을 kafka connect avro converter 설치된 lib 에 추가 하는게 맞나요?
질문2. 바로 위에 에러 난건 Sink connect 등록시 무었을 누락 시켜서 일까요?
안녕하세요. 선생님
제가 수업에서 들은 카프카캣으로 내부 스키마 내용을 확인 해봤는데 AVRO로 변환이 하나도 안된거 같습니다.
ubuntu@ip-172-31-0-29:~$ kafkacat -b lg-kafka01:9092 -C -t source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS -J -u -q |jq '.'
{
"topic": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS",
"partition": 0,
"offset": 640,
"tstype": "create",
"ts": 1678689074221,
"broker": -1,
"key": "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"TERMS_VER_ID\"}],\"optional\":false,\"name\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS.Key\"},\"payload\":{\"TERMS_VER_ID\":\"1\"}}",
"payload": "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"TERMS_VER_ID\"},{\"type\":\"string\",\"optional\":false,\"field\":\"TERMS_TP_CODE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AGR_NOTI_FLAG\"},{\"type\":\"string\",\"optional\":true,\"field\":\"PROD_CODE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"PLFM_CODE\"},{\"type\":\"string\",\"optional\":false,\"field\":\"CNTRY_CODE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"TERMS_EXPLN\"},{\"type\":\"string\",\"optional\":true,\"field\":\"APPLY_STRT_YMD\"},{\"type\":\"string\",\"optional\":true,\"field\":\"STAT_CODE\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"GEN_USR_NO\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,\"field\":\"CRT_DATE\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"LAST_CHG_USR_NO\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,\"field\":\"LAST_CHG_DATE\"},{\"type\":\"string\",\"optional\":false,\"field\":\"USE_FLAG\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"APPRVR_NO\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,\"field\":\"APPRV_DATE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"APPRV_OPNN\"},{\"type\":\"string\",\"optional\":true,\"field\":\"APPRV_STAT_CODE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"ENTP_WIDE_MBRSHP_FLAG\"},{\"type\":\"string\",\"optional\":false,\"field\":\"TERMS_MGT_TP_CODE\"}],\"optional\":false,\"name\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS.Value\"},\"payload\":{\"TERMS_VER_ID\":\"1\",\"TERMS_TP_CODE\":\"6\",\"AGR_NOTI_FLAG\":null,\"PROD_CODE\":\"3\",\"PLFM_CODE\":\"4\",\"CNTRY_CODE\":\"6\",\"TERMS_EXPLN\":\"7\",\"APPLY_STRT_YMD\":\"\",\"STAT_CODE\":null,\"GEN_USR_NO\":null,\"CRT_DATE\":null,\"LAST_CHG_USR_NO\":null,\"LAST_CHG_DATE\":null,\"USE_FLAG\":\"4\",\"APPRVR_NO\":null,\"APPRV_DATE\":null,\"APPRV_OPNN\":null,\"APPRV_STAT_CODE\":null,\"ENTP_WIDE_MBRSHP_FLAG\":null,\"TERMS_MGT_TP_CODE\":\"4\"}}"
}
원래는 위에 내용이 바이너리 형태로 되어있어야 했던거 같은데 순수 json 으로 보여집니다.
아마 제가 AVRO 설정을 잘못 한 느낌이 듭니다.
스키마에서 AVRO 로 안되어 있다보니, Source 쪽에서 바로 에러 나는거 같습니다.
왠지 처음부터 다시 해야 하는 느낌이 드네요.....
음, Source Connector에서 key와 value converter를 avro로 설정했는데, topic 메시지가 avro가 안될리가 없습니다만..
뭔가 지금 환경 구성이 잘못된건 아닌지요? Source Connector json 파일이 제대로 작성되고, 해당 json 파일로 connector가 잘 생성이 되었는지 확인이 필요할 것 같습니다.
혹시 지금 읽어들인 topic 메시지가 avro 메시지를 보내기 전에 json 메시지로 미리 만들어진 topic 메시지는 아닌지요?
뭔가 지금 작업 하시면서 살짝 꼬인 부분이 있는 것 같습니다. 기존 config 설정등의 작업 파일을 back 디렉토리에 옮겨놓으시고 기존 작업 파일들을 삭제하신 뒤에 차분히 하나씩 내용을 검토하면서 다시 작업을 해보시면 어떨까 싶습니다
먼저 이전에 질문 올리신 Source Connector는 이제 잘 동작하나요?
일단 그런걸로 가정하고,
뭔가 avro converter에 계속 문제가 있는 것 같군요. confluent hub 에서 다운로드 받은 io.confluent.connect.avro.AvroConverter 가 Guava 까지 필요한가요?
저도 apache kafka 에서 수동으로 Avro Converter를 이용해 본지가 넘 오래되서 기억이 가물가물한데 Guava 까지 필요했던건 아닌것 같습니다만...
오류만 보면 sink connector에서 avro 메시지를 deserialization하는데 문제가 발생하는 것 같습니다.
일단 Source Connector를 통해 topic에 메시지가 제대로 avro형태로 입력되었는지 먼저 확인해 보시지요. 강의를 참조하셔서 kafka-avro-console-consumer 명령어를 참조하셔서 해당 토픽에 메시지가 제대로 저장되었는지 확인해 보시고, 안되면 다시 글 부탁드립니다.