인프런 커뮤니티 질문&답변

kim미소 파파님의 프로필 이미지
kim미소 파파

작성한 질문수

Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)

Kafka Sink Connect 사용

kafka sink 설정 시 테이블 생성이 안됩니다

작성

·

483

1

{
 "name": "my-sink-connect",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mariadb://localhost:3306/mydb",
    "connection.user": "root",
    "connection.password": "test1234",
     "mode": "incrementing",
    "incrementing.column.name": "id",
    "auto.create": "true",
    "auto.evolve": "true",
    "delete.enabled": "false",
    "tasks.max": "1",
    "topic": "my_topic_users",  
    "table.whitelist": "mydb.users"  
  }
}

confluent-community-connect-7.5.0-zOS

 

confluentinc-kafka-connect-jdbc-10.7.4

이렇게 사용중이고

 

모드를 빼면 에러가 발생하네요

커넨터 로그엔 에러가 없어요

 

[2023-12-14 00:57:25,395] INFO SourceConnectorConfig values:

config.action.reload = restart

connector.class = io.confluent.connect.jdbc.JdbcSourceConnector

errors.log.enable = false

errors.log.include.messages = false

errors.retry.delay.max.ms = 60000

errors.retry.timeout = 0

errors.tolerance = none

exactly.once.support = requested

header.converter = null

key.converter = null

name = my-sink-connect

offsets.storage.topic = null

predicates = []

tasks.max = 1

topic.creation.groups = []

transaction.boundary = poll

transaction.boundary.interval.ms = null

transforms = []

value.converter = null

(org.apache.kafka.connect.runtime.SourceConnectorConfig:369)

[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Validating JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:171)

[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Validated JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:174)

[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Using JDBC dialect MySql (io.confluent.connect.jdbc.source.JdbcSourceTask:138)

[2023-12-14 00:57:25,396] INFO EnrichedConnectorConfig values:

config.action.reload = restart

connector.class = io.confluent.connect.jdbc.JdbcSourceConnector

errors.log.enable = false

errors.log.include.messages = false

errors.retry.delay.max.ms = 60000

errors.retry.timeout = 0

errors.tolerance = none

exactly.once.support = requested

header.converter = null

key.converter = null

name = my-sink-connect

offsets.storage.topic = null

predicates = []

tasks.max = 1

topic.creation.groups = []

transaction.boundary = poll

transaction.boundary.interval.ms = null

transforms = []

value.converter = null

(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:369)

[2023-12-14 00:57:25,397] INFO [my-sink-connect|task-0] [Producer clientId=connector-producer-my-sink-connect-0] Cluster ID: 61ETmEcJQASp3yeJGdTmPw (org.apache.kafka.clients.Metadata:287)

[2023-12-14 00:57:25,413] INFO [my-sink-connect|task-0] Found offset {{table=users}=null, {protocol=1, table=mydb.users}={incrementing=17}} for partition {protocol=1, table=mydb.users} (io.confluent.connect.jdbc.source.JdbcSourceTask:234)

[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:307)

[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] WorkerSourceTask{id=my-sink-connect-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask:275)

[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] Begin using SQL query: SELECT * FROM mydb.`users` WHERE mydb.`users`.`id` > ? ORDER BY mydb.`users`.`id` ASC (io.confluent.connect.jdbc.source.TableQuerier:182)

답변 1

0

안녕하세요, 인프런 AI 인턴입니다.
kim미소 파파님, Kafka Connect를 사용하여 sink 설정 시 테이블 생성이 안 되는 문제를 겪고 계신 것 같군요. 설정해주신 JSON 파일을 통해 볼 때, 마리아DB를 사용하고 있으며, auto.createtrue로 설정되어 자동으로 테이블을 생성하도록 설정되어 있습니다.

문제 해결을 위해서 확인할 점은 다음과 같습니다:
1. Kafka Connect JDBC Connector의 권한 확인: 사용하는 데이터베이스 사용자(root)가 테이블 생성 권한을 가지고 있는지 확인해보세요.
2. 테이블 이름과 스키마: table.whitelist에 명시된 테이블 이름과 스키마가 데이터베이스에 존재하는지 검토해 보세요.
3. Incrementing 컬럼: 설정하신 incrementing.column.name이 실제 테이블에 존재하며 올바른지 확인해보세요.
4. Kafka Connect 버전과 호환성: 사용하고 계신 Kafka Connect의 버전과 JDBC Connector의 버전이 서로 호환되는지 확인해보세요.
5. 에러 로그: 에러 로그가 없다고 하셨으나, 다시 한번 전체 로그를 검토하여 문제가 될 만한 부분이 없는지 확인해보세요.

참고로 Kafka 메시징 지원과 관련된 Spring Boot의 설정 예시는 다음과 같이 application.propertiesapplication.yml에 선언할 수 있습니다:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

또는

spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"

Apache Kafka 지원 문서

따라서 이 정보들을 참고하여 설정을 재점검해보시면 좋겠습니다. 만약 여전히 문제를 해결하지 못하셨다면, 추가적으로 Kafka Connect의 로그나 특정 에러 메시지를 제공해주시면 더 구체적인 해결 방안을 찾는 데 도움이 될 수 있겠습니다.

kim미소 파파님의 프로필 이미지
kim미소 파파

작성한 질문수

질문하기