작성
·
483
1
{
"name": "my-sink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mariadb://localhost:3306/mydb",
"connection.user": "root",
"connection.password": "test1234",
"mode": "incrementing",
"incrementing.column.name": "id",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "false",
"tasks.max": "1",
"topic": "my_topic_users",
"table.whitelist": "mydb.users"
}
}
confluent-community-connect-7.5.0-zOS
confluentinc-kafka-connect-jdbc-10.7.4
이렇게 사용중이고
모드를 빼면 에러가 발생하네요
커넨터 로그엔 에러가 없어요
[2023-12-14 00:57:25,395] INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
exactly.once.support = requested
header.converter = null
key.converter = null
name = my-sink-connect
offsets.storage.topic = null
predicates = []
tasks.max = 1
topic.creation.groups = []
transaction.boundary = poll
transaction.boundary.interval.ms = null
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.SourceConnectorConfig:369)
[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Validating JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:171)
[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Validated JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:174)
[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Using JDBC dialect MySql (io.confluent.connect.jdbc.source.JdbcSourceTask:138)
[2023-12-14 00:57:25,396] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
exactly.once.support = requested
header.converter = null
key.converter = null
name = my-sink-connect
offsets.storage.topic = null
predicates = []
tasks.max = 1
topic.creation.groups = []
transaction.boundary = poll
transaction.boundary.interval.ms = null
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:369)
[2023-12-14 00:57:25,397] INFO [my-sink-connect|task-0] [Producer clientId=connector-producer-my-sink-connect-0] Cluster ID: 61ETmEcJQASp3yeJGdTmPw (org.apache.kafka.clients.Metadata:287)
[2023-12-14 00:57:25,413] INFO [my-sink-connect|task-0] Found offset {{table=users}=null, {protocol=1, table=mydb.users}={incrementing=17}} for partition {protocol=1, table=mydb.users} (io.confluent.connect.jdbc.source.JdbcSourceTask:234)
[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:307)
[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] WorkerSourceTask{id=my-sink-connect-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask:275)
[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] Begin using SQL query: SELECT * FROM mydb
.`users` WHERE mydb
.`users`.`id` > ? ORDER BY mydb
.`users`.`id` ASC (io.confluent.connect.jdbc.source.TableQuerier:182)
답변 1
0
안녕하세요, 인프런 AI 인턴입니다.
kim미소 파파님, Kafka Connect를 사용하여 sink 설정 시 테이블 생성이 안 되는 문제를 겪고 계신 것 같군요. 설정해주신 JSON 파일을 통해 볼 때, 마리아DB를 사용하고 있으며, auto.create
는 true
로 설정되어 자동으로 테이블을 생성하도록 설정되어 있습니다.
문제 해결을 위해서 확인할 점은 다음과 같습니다:
1. Kafka Connect JDBC Connector의 권한 확인: 사용하는 데이터베이스 사용자(root
)가 테이블 생성 권한을 가지고 있는지 확인해보세요.
2. 테이블 이름과 스키마: table.whitelist
에 명시된 테이블 이름과 스키마가 데이터베이스에 존재하는지 검토해 보세요.
3. Incrementing 컬럼: 설정하신 incrementing.column.name
이 실제 테이블에 존재하며 올바른지 확인해보세요.
4. Kafka Connect 버전과 호환성: 사용하고 계신 Kafka Connect의 버전과 JDBC Connector의 버전이 서로 호환되는지 확인해보세요.
5. 에러 로그: 에러 로그가 없다고 하셨으나, 다시 한번 전체 로그를 검토하여 문제가 될 만한 부분이 없는지 확인해보세요.
참고로 Kafka 메시징 지원과 관련된 Spring Boot의 설정 예시는 다음과 같이 application.properties
나 application.yml
에 선언할 수 있습니다:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
또는
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
따라서 이 정보들을 참고하여 설정을 재점검해보시면 좋겠습니다. 만약 여전히 문제를 해결하지 못하셨다면, 추가적으로 Kafka Connect의 로그나 특정 에러 메시지를 제공해주시면 더 구체적인 해결 방안을 찾는 데 도움이 될 수 있겠습니다.