인프런 커뮤니티 질문&답변

strato721224님의 프로필 이미지
strato721224

작성한 질문수

카프카 완벽 가이드 - 커넥트(Connect) 편

Sink connect 등록시 에러가 납니다.

해결된 질문

작성

·

1.2K

·

수정됨

1

안녕하세요. 선생님

Confluent AVRO Install 하고 나서 sink connect 등록하고 상태를 확인 해보니 아래와 같이 에러가 발생 했습니다.


ubuntu@ip-172-31-0-29:~$ curl --location --request POST 'http://localhost:8083/connectors' \

--header 'Content-Type: application/json' \

--data-raw '{

"name": "mysql-sink-connector",

"config": {

"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

"tasks.max": "1",

"topics": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS",

"connection.url": "jdbc:mysql://lg-mysql:3306/aurora2",

"connection.user": "sink",

"connection.password": "sink1234",

"table.name.format": "aurora2.TB_ME_TERMS_BAS",

"insert.mode": "upsert",

"pk.fields": "TERMS_VER_ID",

"pk.mode": "record_key",

"delete.enabled": "true",

"key.converter": "io.confluent.connect.avro.AvroConverter",

"value.converter": "io.confluent.connect.avro.AvroConverter",

"key.converter.schema.registry.url": "http://lg-schema01:8081",

"value.converter.schema.registry.url": "http://lg-schema01:8081"

}

}'

{"name":"mysql-sink-connector","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS","connection.url":"jdbc:mysql://lg-mysql:3306/aurora2","connection.user":"sink","connection.password":"sink1234","table.name.format":"aurora2.TB_ME_TERMS_BAS","insert.mode":"upsert","pk.fields":"TERMS_VER_ID","pk.mode":"record_key","delete.enabled":"true","key.converter":"io.confluent.connect.avro.AvroConverter","value.converter":"io.confluent.connect.avro.AvroConverter","key.converter.schema.registry.url":"http://lg-schema01:8081","value.converter.schema.registry.url":"http://lg-schema01:8081","name":"mysql-sink-connector"},"tasks":[],"type":"sink"}

 


ubuntu@ip-172-31-0-29:~$ curl -X GET http://localhost:8083/connectors/mysql-sink-connector/status | jq '.'

% Total % Received % Xferd Average Speed Time Time Time Current

Dload Upload Total Spent Left Speed

100 1898 100 1898 0 0 31616 0 --:--:-- --:--:-- --:--:-- 32169

{

"name": "mysql-sink-connector",

"connector": {

"state": "RUNNING",

"worker_id": "172.31.13.238:8083"

},

"tasks": [

{

"id": 0,

"state": "FAILED",

"worker_id": "172.31.13.238:8083",

"trace": "java.lang.NoClassDefFoundError: com/google/common/base/Ticker\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:175)\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:158)\n\tat io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory.newClient(SchemaRegistryClientFactory.java:36)\n\tat io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:68)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:297)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:640)\n\tat org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:544)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1703)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$31(DistributedHerder.java:1753)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.ClassNotFoundException: com.google.common.base.Ticker\n\tat java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)\n\tat org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:103)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)\n\t... 13 more\n"

}

],

"type": "sink"

}


 참고로 스키마 레지스토리 서버가 잘 붙나 확인 해본 결과 잘 붙습니다.

ubuntu@ip-172-31-0-29:~$ curl -v lg-schema01:8081

* Trying 172.31.46.33:8081...

* Connected to lg-schema01 (172.31.46.33) port 8081 (#0)

> GET / HTTP/1.1

> Host: lg-schema01:8081

> User-Agent: curl/7.81.0

> Accept: /

>

* Mark bundle as not supporting multiuse

< HTTP/1.1 200 OK

< Date: Mon, 13 Mar 2023 06:14:56 GMT

< Content-Type: application/vnd.schemaregistry.v1+json

< Vary: Accept-Encoding, User-Agent

< Content-Length: 2

<

* Connection #0 to host lg-schema01 left intact

{}ubuntu@ip-172-31-0-29:~$

 


현재 제가 구성한 환경 정보 입니다.

  • Apache Zookeeper 3.8.1 3개

  • Apache Kafka 2.13-3.3.2 3개

    => Source connect : Debezium mysql connect 2.1.2

    => Sink connect : Confluent JDBC Connector 10.6.3

    => Avro : Confluent Avro converter 7.3

  • Confluent Schema registry 7.3.0 1개

  • mysql DB 1 개 (Source 와 Sink 는 user 로 분리해서 데이터 넣는 방식으로 테스트 중입니다)

 

구글링 검색 결과 Guava 를 못찾아서 그런거 같다는데, 제가 설치한 환경의 디펜던시가 문제일까요?

 

 

 

 

 

 

답변 1

0

strato721224님의 프로필 이미지
strato721224
질문자

선생님, 위 에러 원인 찾다가

Kafka plug-in 설치된 곳을 가보니

/usr/local/kafka/connector-plugin/confluentinc-kafka-connect-avro-converter/lib/

경로안에 "guava-30.1.1-jre.jar" 이 파일이 없어서 제가 추가 했습니다.

스키마에는 잘 등록 되어서 아래와 같이 나왔습니다.

 

ubuntu@ip-172-31-46-33:~$ http http://localhost:8081/schemas

HTTP/1.1 200 OK

Content-Encoding: gzip

Content-Length: 195

Content-Type: application/vnd.schemaregistry.v1+json

Date: Mon, 13 Mar 2023 06:50:47 GMT

Vary: Accept-Encoding, User-Agent

[

{

"id": 2,

"schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST\",\"fields\":[{\"name\":\"TERMS_VER_ID\",\"type\":\"string\"}],\"connect.name\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST.Key\"}",

"subject": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST-key",

"version": 1

}

]


 

source 쪽에 등록은 정상적으로 되었는데 sink 쪽 등록시 아래와 같이 에러가 납니다.

 

curl --location --request POST 'http://localhost:8083/connectors' \

--header 'Content-Type: application/json' \

--data-raw '{

"name": "mysql-sink-connector-test",

"config": {

"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

"tasks.max": "1",

"topics": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST",

"connection.url": "jdbc:mysql://lg-mysql:3306/aurora2",

"connection.user": "sink",

"connection.password": "sink1234",

"table.name.format": "aurora2.TB_ME_TERMS_BAS_TEST",

"insert.mode": "upsert",

"pk.fields": "TERMS_VER_ID",

"pk.mode": "record_key",

"delete.enabled": "true",

"key.converter": "io.confluent.connect.avro.AvroConverter",

"value.converter": "io.confluent.connect.avro.AvroConverter",

"key.converter.schema.registry.url": "http://lg-schema01:8081",

"value.converter.schema.registry.url": "http://lg-schema01:8081"

}

}'

 

 

 

ubuntu@ip-172-31-0-29:/data/kafka-logs$ curl -X GET http://localhost:8083/connectors/mysql-sink-connector-test/status | jq '.'

% Total % Received % Xferd Average Speed Time Time Time Current

Dload Upload Total Spent Left Speed

100 2913 100 2913 0 0 176k 0 --:--:-- --:--:-- --:--:-- 284k

{

"name": "mysql-sink-connector-test",

"connector": {

"state": "RUNNING",

"worker_id": "172.31.10.77:8083"

},

"tasks": [

{

"id": 0,

"state": "FAILED",

"worker_id": "172.31.10.77:8083",

"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS_TEST to Avro: \n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:244)\n\tat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:334)\n\tat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:202)\n\tat io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)\n\t... 17 more\n"

}

],

"type": "sink"

}

 


질문1. 제가 Guava file 을 kafka connect avro converter 설치된 lib 에 추가 하는게 맞나요?

질문2. 바로 위에 에러 난건 Sink connect 등록시 무었을 누락 시켜서 일까요?

 

권 철민님의 프로필 이미지
권 철민
지식공유자

먼저 이전에 질문 올리신 Source Connector는 이제 잘 동작하나요?

일단 그런걸로 가정하고,

뭔가 avro converter에 계속 문제가 있는 것 같군요. confluent hub 에서 다운로드 받은 io.confluent.connect.avro.AvroConverter 가 Guava 까지 필요한가요?

저도 apache kafka 에서 수동으로 Avro Converter를 이용해 본지가 넘 오래되서 기억이 가물가물한데 Guava 까지 필요했던건 아닌것 같습니다만...

오류만 보면 sink connector에서 avro 메시지를 deserialization하는데 문제가 발생하는 것 같습니다.

일단 Source Connector를 통해 topic에 메시지가 제대로 avro형태로 입력되었는지 먼저 확인해 보시지요. 강의를 참조하셔서 kafka-avro-console-consumer 명령어를 참조하셔서 해당 토픽에 메시지가 제대로 저장되었는지 확인해 보시고, 안되면 다시 글 부탁드립니다.

strato721224님의 프로필 이미지
strato721224
질문자

안녕하세요. 선생님

제가 수업에서 들은 카프카캣으로 내부 스키마 내용을 확인 해봤는데 AVRO로 변환이 하나도 안된거 같습니다.

 

ubuntu@ip-172-31-0-29:~$ kafkacat -b lg-kafka01:9092 -C -t source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS -J -u -q |jq '.'

{

"topic": "source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS",

"partition": 0,

"offset": 640,

"tstype": "create",

"ts": 1678689074221,

"broker": -1,

"key": "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"TERMS_VER_ID\"}],\"optional\":false,\"name\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS.Key\"},\"payload\":{\"TERMS_VER_ID\":\"1\"}}",

"payload": "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"TERMS_VER_ID\"},{\"type\":\"string\",\"optional\":false,\"field\":\"TERMS_TP_CODE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AGR_NOTI_FLAG\"},{\"type\":\"string\",\"optional\":true,\"field\":\"PROD_CODE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"PLFM_CODE\"},{\"type\":\"string\",\"optional\":false,\"field\":\"CNTRY_CODE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"TERMS_EXPLN\"},{\"type\":\"string\",\"optional\":true,\"field\":\"APPLY_STRT_YMD\"},{\"type\":\"string\",\"optional\":true,\"field\":\"STAT_CODE\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"GEN_USR_NO\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,\"field\":\"CRT_DATE\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"LAST_CHG_USR_NO\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,\"field\":\"LAST_CHG_DATE\"},{\"type\":\"string\",\"optional\":false,\"field\":\"USE_FLAG\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"APPRVR_NO\"},{\"type\":\"int64\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,\"field\":\"APPRV_DATE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"APPRV_OPNN\"},{\"type\":\"string\",\"optional\":true,\"field\":\"APPRV_STAT_CODE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"ENTP_WIDE_MBRSHP_FLAG\"},{\"type\":\"string\",\"optional\":false,\"field\":\"TERMS_MGT_TP_CODE\"}],\"optional\":false,\"name\":\"source_mysqldb.hesdp_mgr.TB_ME_TERMS_BAS.Value\"},\"payload\":{\"TERMS_VER_ID\":\"1\",\"TERMS_TP_CODE\":\"6\",\"AGR_NOTI_FLAG\":null,\"PROD_CODE\":\"3\",\"PLFM_CODE\":\"4\",\"CNTRY_CODE\":\"6\",\"TERMS_EXPLN\":\"7\",\"APPLY_STRT_YMD\":\"\",\"STAT_CODE\":null,\"GEN_USR_NO\":null,\"CRT_DATE\":null,\"LAST_CHG_USR_NO\":null,\"LAST_CHG_DATE\":null,\"USE_FLAG\":\"4\",\"APPRVR_NO\":null,\"APPRV_DATE\":null,\"APPRV_OPNN\":null,\"APPRV_STAT_CODE\":null,\"ENTP_WIDE_MBRSHP_FLAG\":null,\"TERMS_MGT_TP_CODE\":\"4\"}}"

}

 

원래는 위에 내용이 바이너리 형태로 되어있어야 했던거 같은데 순수 json 으로 보여집니다.

아마 제가 AVRO 설정을 잘못 한 느낌이 듭니다.

스키마에서 AVRO 로 안되어 있다보니, Source 쪽에서 바로 에러 나는거 같습니다.

왠지 처음부터 다시 해야 하는 느낌이 드네요.....

 

권 철민님의 프로필 이미지
권 철민
지식공유자

음, Source Connector에서 key와 value converter를 avro로 설정했는데, topic 메시지가 avro가 안될리가 없습니다만..

뭔가 지금 환경 구성이 잘못된건 아닌지요? Source Connector json 파일이 제대로 작성되고, 해당 json 파일로 connector가 잘 생성이 되었는지 확인이 필요할 것 같습니다.

혹시 지금 읽어들인 topic 메시지가 avro 메시지를 보내기 전에 json 메시지로 미리 만들어진 topic 메시지는 아닌지요?

뭔가 지금 작업 하시면서 살짝 꼬인 부분이 있는 것 같습니다. 기존 config 설정등의 작업 파일을 back 디렉토리에 옮겨놓으시고 기존 작업 파일들을 삭제하신 뒤에 차분히 하나씩 내용을 검토하면서 다시 작업을 해보시면 어떨까 싶습니다

strato721224님의 프로필 이미지
strato721224

작성한 질문수

질문하기