Using Postman, I sent the following data:
{
"name": "my-source-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mariadb://localhost:3306/mydb",
"connection.user": "root",
"connection.password": "1234",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "users",
"topic.prefix": "my_topic_",
"tasks.max": "1"
}
}
I sent it with a POST request (I had confirmed the connection in the h2 console window), and when I then sent a GET request, my-source-connect showed up. Below is the log that appeared in the connect console window after that.
[2022-08-08 23:05:06,233] INFO JdbcSourceConnectorConfig values:
batch.max.rows = 100
catalog.pattern = null
connection.attempts = 3
connection.backoff.ms = 10000
connection.password = [hidden]
connection.url = jdbc:mariadb://localhost:3306/mydb
connection.user = root
db.timezone = UTC
dialect.name =
incrementing.column.name = id
mode = incrementing
numeric.mapping = null
numeric.precision.mapping = false
poll.interval.ms = 5000
query =
query.retry.attempts = -1
query.suffix =
quote.sql.identifiers = ALWAYS
schema.pattern = null
table.blacklist = []
table.monitoring.startup.polling.limit.ms = 10000
table.poll.interval.ms = 60000
table.types = [TABLE]
table.whitelist = [users]
timestamp.column.name = []
timestamp.delay.interval.ms = 0
timestamp.granularity = connect_logical
timestamp.initial = null
topic.prefix = my_topic_
transaction.isolation.mode = DEFAULT
validate.non.null = true
(io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig:361)
[2022-08-08 23:05:06,241] INFO AbstractConfig values:
(org.apache.kafka.common.config.AbstractConfig:361)
[2022-08-08 23:05:06,270] INFO [Worker clientId=connect-1, groupId=connect-cluster] Connector my-source-connect config updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1534)
[2022-08-08 23:05:06,275] INFO [Worker clientId=connect-1, groupId=connect-cluster] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)
[2022-08-08 23:05:06,275] INFO [Worker clientId=connect-1, groupId=connect-cluster] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)
[2022-08-08 23:05:06,280] INFO [Worker clientId=connect-1, groupId=connect-cluster] Successfully joined group with generation Generation{generationId=48, memberId='connect-1-74e3b810-49fe-4cd5-90cf-0c9408ba73ab', protocol='sessioned'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:596)
[2022-08-08 23:05:06,297] INFO [Worker clientId=connect-1, groupId=connect-cluster] Successfully synced group in generation Generation{generationId=48, memberId='connect-1-74e3b810-49fe-4cd5-90cf-0c9408ba73ab', protocol='sessioned'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)
[2022-08-08 23:05:06,298] INFO [Worker clientId=connect-1, groupId=connect-cluster] Joined group at generation 48 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-74e3b810-49fe-4cd5-90cf-0c9408ba73ab', leaderUrl='http://127.0.0.1:8083/', offset=44, connectorIds=[my-source-connect], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1689)
[2022-08-08 23:05:06,303] INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connectors and tasks using config offset 44 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1216)
[2022-08-08 23:05:06,308] INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connector my-source-connect (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1299)
[2022-08-08 23:05:06,313] INFO Creating connector my-source-connect of type io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker:274)
[2022-08-08 23:05:06,316] INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = my-source-connect
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.SourceConnectorConfig:361)
[2022-08-08 23:05:06,317] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = my-source-connect
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:361)
[2022-08-08 23:05:06,339] INFO Instantiated connector my-source-connect with version 10.5.1 of type class io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker:284)
[2022-08-08 23:05:06,339] INFO Finished creating connector my-source-connect (org.apache.kafka.connect.runtime.Worker:310)
[2022-08-08 23:05:06,341] INFO Starting JDBC Source Connector (io.confluent.connect.jdbc.JdbcSourceConnector:71)
[2022-08-08 23:05:06,344] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1244)
[2022-08-08 23:05:06,358] INFO JdbcSourceConnectorConfig values:
batch.max.rows = 100
catalog.pattern = null
connection.attempts = 3
connection.backoff.ms = 10000
connection.password = [hidden]
connection.url = jdbc:mariadb://localhost:3306/mydb
connection.user = root
db.timezone = UTC
dialect.name =
incrementing.column.name = id
mode = incrementing
numeric.mapping = null
numeric.precision.mapping = false
poll.interval.ms = 5000
query =
query.retry.attempts = -1
query.suffix =
quote.sql.identifiers = ALWAYS
schema.pattern = null
table.blacklist = []
table.monitoring.startup.polling.limit.ms = 10000
table.poll.interval.ms = 60000
table.types = [TABLE]
table.whitelist = [users]
timestamp.column.name = []
timestamp.delay.interval.ms = 0
timestamp.granularity = connect_logical
timestamp.initial = null
topic.prefix = my_topic_
transaction.isolation.mode = DEFAULT
validate.non.null = true
(io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig:361)
[2022-08-08 23:05:06,369] INFO Attempting to open connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider:79)
[2022-08-08 23:05:06,446] INFO Starting thread to monitor tables. (io.confluent.connect.jdbc.source.TableMonitorThread:82)
[2022-08-08 23:05:06,494] INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = my-source-connect
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.SourceConnectorConfig:361)
[2022-08-08 23:05:06,498] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = my-source-connect
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:361)
[2022-08-08 23:05:06,530] ERROR Encountered an unrecoverable error while reading tables from the database (io.confluent.connect.jdbc.source.TableMonitorThread:224)
org.apache.kafka.connect.errors.ConnectException: The connector uses the unqualified table name as the topic name and has detected duplicate unqualified table names. This could lead to mixed data types in the topic and downstream processing errors. To prevent such processing errors, the JDBC Source connector fails to start when it detects duplicate table name configurations. Update the connector's 'table.whitelist' config to include exactly one table in each of the tables listed below.
[["mydb"."users", "performance_schema"."users"]]
at io.confluent.connect.jdbc.source.TableMonitorThread.tables(TableMonitorThread.java:152)
at io.confluent.connect.jdbc.JdbcSourceConnector.taskConfigs(JdbcSourceConnector.java:164)
at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:359)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:1428)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:1366)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1000(DistributedHerder.java:128)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1318)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1312)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:371)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:295)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2022-08-08 23:05:06,538] ERROR WorkerConnector{id=my-source-connect} Connector raised an error (org.apache.kafka.connect.runtime.WorkerConnector:506)
org.apache.kafka.connect.errors.ConnectException: Encountered an unrecoverable error while reading tables from the database
at io.confluent.connect.jdbc.source.TableMonitorThread.fail(TableMonitorThread.java:226)
at io.confluent.connect.jdbc.source.TableMonitorThread.tables(TableMonitorThread.java:153)
at io.confluent.connect.jdbc.JdbcSourceConnector.taskConfigs(JdbcSourceConnector.java:164)
at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:359)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:1428)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:1366)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1000(DistributedHerder.java:128)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1318)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1312)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:371)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:295)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: The connector uses the unqualified table name as the topic name and has detected duplicate unqualified table names. This could lead to mixed data types in the topic and downstream processing errors. To prevent such processing errors, the JDBC Source connector fails to start when it detects duplicate table name configurations. Update the connector's 'table.whitelist' config to include exactly one table in each of the tables listed below.
[["mydb"."users", "performance_schema"."users"]]
at io.confluent.connect.jdbc.source.TableMonitorThread.tables(TableMonitorThread.java:152)
... 14 more
[2022-08-08 23:05:06,542] ERROR [Worker clientId=connect-1, groupId=connect-cluster] Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1377)
org.apache.kafka.connect.errors.ConnectException: Encountered an unrecoverable error while reading tables from the database
at io.confluent.connect.jdbc.source.TableMonitorThread.fail(TableMonitorThread.java:226)
at io.confluent.connect.jdbc.source.TableMonitorThread.tables(TableMonitorThread.java:153)
at io.confluent.connect.jdbc.JdbcSourceConnector.taskConfigs(JdbcSourceConnector.java:164)
at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:359)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:1428)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:1366)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1000(DistributedHerder.java:128)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1318)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1312)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:371)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:295)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: The connector uses the unqualified table name as the topic name and has detected duplicate unqualified table names. This could lead to mixed data types in the topic and downstream processing errors. To prevent such processing errors, the JDBC Source connector fails to start when it detects duplicate table name configurations. Update the connector's 'table.whitelist' config to include exactly one table in each of the tables listed below.
[["mydb"."users", "performance_schema"."users"]]
at io.confluent.connect.jdbc.source.TableMonitorThread.tables(TableMonitorThread.java:152)
... 14 more
[2022-08-08 23:05:06,546] INFO [Worker clientId=connect-1, groupId=connect-cluster] Skipping reconfiguration of connector my-source-connect since it is not running (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1415)
[2022-08-08 23:05:06,795] INFO [Worker clientId=connect-1, groupId=connect-cluster] Skipping reconfiguration of connector my-source-connect since it is not running (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1415)
I have kept googling and searching, but I can't figure out what is wrong.
3 Answers
7
I ran into the same exception. When I checked, this was in the log:
Caused by: org.apache.kafka.connect.errors.ConnectException:
The connector uses the unqualified table name as the topic name and has detected duplicate
unqualified table names. This could lead to mixed data types in the topic and downstream
processing errors. To prevent such processing errors, the JDBC Source connector fails to
start when it detects duplicate table name configurations. Update the connector's
'table.whitelist'
config to include exactly one table in each of the tables listed below.
[["linkocean"."users", "linkoceantest"."users", "mydb"."users"]]
at io.confluent.connect.jdbc.source.TableMonitorThread.tables(TableMonitorThread.java:152)
... 14 more
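Judging from this log, it seems that when a table with the same name exists in more than one schema reachable through the current database connection, the connector cannot tell which table is meant and throws this exception.
dwa08 solved it by renaming the table,
but you can also solve it by qualifying the table with its schema, using a dot, in table.whitelist when you send the API request that creates the connector: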
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"root1234",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"mydb.users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
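For anyone who would rather script this than use Postman, here is a minimal Python sketch that builds the same POST request with a schema-qualified table.whitelist. The Connect address is an assumption taken from the leaderUrl in the question's log, build_create_request is a name I made up, and the request is only built, not sent.

```python
import json
import urllib.request

# Assumed Kafka Connect REST address (from the leaderUrl in the log above).
CONNECT_URL = "http://127.0.0.1:8083"


def build_create_request(name, schema, table):
    """Build (but do not send) a POST /connectors request.

    Hypothetical helper: it only illustrates the payload shape.
    """
    payload = {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": f"jdbc:mysql://localhost:3306/{schema}",
            "connection.user": "root",
            "connection.password": "root1234",
            "mode": "incrementing",
            "incrementing.column.name": "id",
            # Schema-qualified, so only one "users" table can match.
            "table.whitelist": f"{schema}.{table}",
            "topic.prefix": "my_topic_",
            "tasks.max": "1",
        },
    }
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_request("my-source-connect", "mydb", "users")
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```

Keeping the schema and table as parameters makes it harder to forget the qualifier when you copy the config for another table.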
0
Near the bottom of the log you posted, you can see the following:
config to include exactly one table in each of the tables listed below.
[["mydb"."users", "performance_schema"."users"]]
at io.confluent.connect.jdbc.source.TableMonitorThread.tables(TableMonitorThread.java:152)
... 14 more
In other words, it is telling you to specify the users table explicitly.
That is because another table named users already exists in the DBMS.
https://docs.actian.com/psql/psqlv13/index.html#page/sqlref%2Fsqlkword.htm%23ww78995
The page above lists the reserved words of a relational DBMS,
and it shows that USERS is already in use as a reserved word.
That is why people often pick a different name instead of USERS for a table.
In the error log, the line
[["mydb"."users", "performance_schema"."users"]]
shows that a table named users
also exists in the performance_schema database.
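To make the duplicate-name condition concrete, here is a rough Python sketch of the kind of check the error message describes. This is not the connector's actual code, just a simplified illustration; the function names and the extra mydb.orders entry are made up.

```python
from collections import defaultdict


def select_tables(whitelist_entry, qualified_names):
    """Simplified matching: a qualified entry ("schema.table") matches exactly,
    a bare entry ("table") matches that table name in any schema."""
    if "." in whitelist_entry:
        return [q for q in qualified_names if q == whitelist_entry]
    return [q for q in qualified_names if q.rpartition(".")[2] == whitelist_entry]


def find_duplicate_table_names(qualified_names):
    """Group names by their unqualified table name and keep the ambiguous groups."""
    by_table = defaultdict(list)
    for qualified in qualified_names:
        table = qualified.rpartition(".")[2]
        by_table[table].append(qualified)
    return {t: names for t, names in by_table.items() if len(names) > 1}


# Tables visible through the connection; mydb.orders is a hypothetical extra.
visible = ["mydb.users", "performance_schema.users", "mydb.orders"]

# Bare name: two candidates, so the name is ambiguous.
ambiguous = find_duplicate_table_names(select_tables("users", visible))
# Qualified name: exactly one candidate, so nothing is ambiguous.
unambiguous = find_duplicate_table_names(select_tables("mydb.users", visible))

print(ambiguous)
print(unambiguous)
```

With the bare name the check finds both mydb.users and performance_schema.users, which matches the pair printed in the error; with mydb.users only one table is left.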
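So, as the other answer says, one fix is to
write mydb.users so that the users table in the mydb database is specified explicitly.
If you have already created the connector by following the lecture, send a request to
PUT http://localhost:8083/connectors/my-source-connect/config
with the content below as the body, and the configuration will be updated! (In my case, with the current version, connection.url was also only recognized when it said mariadb.)
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mariadb://localhost:3306/mydb",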
"connection.user": "root",
"connection.password": "test1357",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "mydb.users",
"topic.prefix": "my_topic_",
"tasks.max": "1"
}
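The same PUT can be sketched in Python like this. Note that the body for this endpoint is the config map on its own, without the outer "name"/"config" wrapper used when creating a connector. build_update_request is a made-up helper name, and the request is only built, not sent.

```python
import json
import urllib.request

# Connect REST address as used in the PUT URL above.
CONNECT_URL = "http://localhost:8083"


def build_update_request(name, config):
    """Build (but do not send) a PUT /connectors/{name}/config request."""
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


new_config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mariadb://localhost:3306/mydb",
    "connection.user": "root",
    "connection.password": "test1357",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    # Qualified with the schema so performance_schema.users is excluded.
    "table.whitelist": "mydb.users",
    "topic.prefix": "my_topic_",
    "tasks.max": "1",
}

request = build_update_request("my-source-connect", new_config)
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```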
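The other fix is to rename the users table in mydb.
The person below solved it by renaming the table to users1 :)
Either of the two will let you continue with the lecture without trouble!
I hope you don't have to suffer like I did.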
0
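Hello, this is Lee Dowon.
Since you have already solved it this may no longer matter, but looking at the bottom of the log you posted, the error seems to come from the part that uses the users table in mydb. If the same problem occurs again, please restart the Kafka server, delete the related Source Connector and Topic, and then try again.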
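For the connector part of that cleanup, here is a small Python sketch, assuming the Connect REST API is reachable at http://localhost:8083. It builds a DELETE /connectors/{name} request and reads the document that GET /connectors/{name}/status returns. The sample status below is hand-written for illustration (not captured from a real run), and the helper names are made up. Deleting the topic itself is done on the Kafka side and is not covered here.

```python
import json
import urllib.request

# Assumed Kafka Connect REST address.
CONNECT_URL = "http://localhost:8083"


def build_delete_request(name):
    """Build (but do not send) a DELETE /connectors/{name} request."""
    return urllib.request.Request(f"{CONNECT_URL}/connectors/{name}", method="DELETE")


def failed_parts(status):
    """Return labels for the connector and tasks whose state is FAILED."""
    failed = []
    if status.get("connector", {}).get("state") == "FAILED":
        failed.append("connector")
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            failed.append(f"task-{task.get('id')}")
    return failed


# Hand-written example of a status document (illustrative only).
sample_status = json.loads("""
{
  "name": "my-source-connect",
  "connector": {
    "state": "FAILED",
    "worker_id": "127.0.0.1:8083",
    "trace": "org.apache.kafka.connect.errors.ConnectException: Encountered an unrecoverable error while reading tables from the database"
  },
  "tasks": [],
  "type": "source"
}
""")

print(failed_parts(sample_status))
delete_request = build_delete_request("my-source-connect")
print(delete_request.get_method(), delete_request.full_url)
# To actually send it: urllib.request.urlopen(delete_request)
```

Checking the status first is useful because, as in the question, a connector can show up in the connector list even though it failed to start.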
Thank you.