Inflearn Community Q&A

dwa08


Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)

Using Kafka Source Connect

Source Connector error


Using Postman, I sent the following payload

{
    "name": "my-source-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mariadb://localhost:3306/mydb",
        "connection.user": "root",
        "connection.password": "1234",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "table.whitelist": "users",
        "topic.prefix": "my_topic_",
        "tasks.max": "1"
    }
}

as a POST request (I had verified the DB connection in the h2-console), and a subsequent GET request confirmed that my-source-connect was registered.
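For reference, these are the standard Kafka Connect REST API calls involved (assuming the Connect worker is running at its default address, localhost:8083):

POST http://localhost:8083/connectors                             (register the connector; body = the JSON above)
GET  http://localhost:8083/connectors                             (list the registered connectors)
GET  http://localhost:8083/connectors/my-source-connect/status    (show the connector/task state and any error trace)

Here is the log that then appeared in the Connect console: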

[2022-08-08 23:05:06,233] INFO JdbcSourceConnectorConfig values: 
	batch.max.rows = 100
	catalog.pattern = null
	connection.attempts = 3
	connection.backoff.ms = 10000
	connection.password = [hidden]
	connection.url = jdbc:mariadb://localhost:3306/mydb
	connection.user = root
	db.timezone = UTC
	dialect.name = 
	incrementing.column.name = id
	mode = incrementing
	numeric.mapping = null
	numeric.precision.mapping = false
	poll.interval.ms = 5000
	query = 
	query.retry.attempts = -1
	query.suffix = 
	quote.sql.identifiers = ALWAYS
	schema.pattern = null
	table.blacklist = []
	table.monitoring.startup.polling.limit.ms = 10000
	table.poll.interval.ms = 60000
	table.types = [TABLE]
	table.whitelist = [users]
	timestamp.column.name = []
	timestamp.delay.interval.ms = 0
	timestamp.granularity = connect_logical
	timestamp.initial = null
	topic.prefix = my_topic_
	transaction.isolation.mode = DEFAULT
	validate.non.null = true
 (io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig:361)
[2022-08-08 23:05:06,241] INFO AbstractConfig values: 
 (org.apache.kafka.common.config.AbstractConfig:361)
[2022-08-08 23:05:06,270] INFO [Worker clientId=connect-1, groupId=connect-cluster] Connector my-source-connect config updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1534)
[2022-08-08 23:05:06,275] INFO [Worker clientId=connect-1, groupId=connect-cluster] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)
[2022-08-08 23:05:06,275] INFO [Worker clientId=connect-1, groupId=connect-cluster] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)
[2022-08-08 23:05:06,280] INFO [Worker clientId=connect-1, groupId=connect-cluster] Successfully joined group with generation Generation{generationId=48, memberId='connect-1-74e3b810-49fe-4cd5-90cf-0c9408ba73ab', protocol='sessioned'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:596)
[2022-08-08 23:05:06,297] INFO [Worker clientId=connect-1, groupId=connect-cluster] Successfully synced group in generation Generation{generationId=48, memberId='connect-1-74e3b810-49fe-4cd5-90cf-0c9408ba73ab', protocol='sessioned'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)
[2022-08-08 23:05:06,298] INFO [Worker clientId=connect-1, groupId=connect-cluster] Joined group at generation 48 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-74e3b810-49fe-4cd5-90cf-0c9408ba73ab', leaderUrl='http://127.0.0.1:8083/', offset=44, connectorIds=[my-source-connect], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1689)
[2022-08-08 23:05:06,303] INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connectors and tasks using config offset 44 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1216)
[2022-08-08 23:05:06,308] INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connector my-source-connect (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1299)
[2022-08-08 23:05:06,313] INFO Creating connector my-source-connect of type io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker:274)
[2022-08-08 23:05:06,316] INFO SourceConnectorConfig values: 
	config.action.reload = restart
	connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
	errors.log.enable = false
	errors.log.include.messages = false
	errors.retry.delay.max.ms = 60000
	errors.retry.timeout = 0
	errors.tolerance = none
	header.converter = null
	key.converter = null
	name = my-source-connect
	predicates = []
	tasks.max = 1
	topic.creation.groups = []
	transforms = []
	value.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:361)
[2022-08-08 23:05:06,317] INFO EnrichedConnectorConfig values: 
	config.action.reload = restart
	connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
	errors.log.enable = false
	errors.log.include.messages = false
	errors.retry.delay.max.ms = 60000
	errors.retry.timeout = 0
	errors.tolerance = none
	header.converter = null
	key.converter = null
	name = my-source-connect
	predicates = []
	tasks.max = 1
	topic.creation.groups = []
	transforms = []
	value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:361)
[2022-08-08 23:05:06,339] INFO Instantiated connector my-source-connect with version 10.5.1 of type class io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker:284)
[2022-08-08 23:05:06,339] INFO Finished creating connector my-source-connect (org.apache.kafka.connect.runtime.Worker:310)
[2022-08-08 23:05:06,341] INFO Starting JDBC Source Connector (io.confluent.connect.jdbc.JdbcSourceConnector:71)
[2022-08-08 23:05:06,344] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1244)
[2022-08-08 23:05:06,358] INFO JdbcSourceConnectorConfig values: 
	batch.max.rows = 100
	catalog.pattern = null
	connection.attempts = 3
	connection.backoff.ms = 10000
	connection.password = [hidden]
	connection.url = jdbc:mariadb://localhost:3306/mydb
	connection.user = root
	db.timezone = UTC
	dialect.name = 
	incrementing.column.name = id
	mode = incrementing
	numeric.mapping = null
	numeric.precision.mapping = false
	poll.interval.ms = 5000
	query = 
	query.retry.attempts = -1
	query.suffix = 
	quote.sql.identifiers = ALWAYS
	schema.pattern = null
	table.blacklist = []
	table.monitoring.startup.polling.limit.ms = 10000
	table.poll.interval.ms = 60000
	table.types = [TABLE]
	table.whitelist = [users]
	timestamp.column.name = []
	timestamp.delay.interval.ms = 0
	timestamp.granularity = connect_logical
	timestamp.initial = null
	topic.prefix = my_topic_
	transaction.isolation.mode = DEFAULT
	validate.non.null = true
 (io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig:361)
[2022-08-08 23:05:06,369] INFO Attempting to open connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider:79)
[2022-08-08 23:05:06,446] INFO Starting thread to monitor tables. (io.confluent.connect.jdbc.source.TableMonitorThread:82)
[2022-08-08 23:05:06,494] INFO SourceConnectorConfig values: 
	config.action.reload = restart
	connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
	errors.log.enable = false
	errors.log.include.messages = false
	errors.retry.delay.max.ms = 60000
	errors.retry.timeout = 0
	errors.tolerance = none
	header.converter = null
	key.converter = null
	name = my-source-connect
	predicates = []
	tasks.max = 1
	topic.creation.groups = []
	transforms = []
	value.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:361)
[2022-08-08 23:05:06,498] INFO EnrichedConnectorConfig values: 
	config.action.reload = restart
	connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
	errors.log.enable = false
	errors.log.include.messages = false
	errors.retry.delay.max.ms = 60000
	errors.retry.timeout = 0
	errors.tolerance = none
	header.converter = null
	key.converter = null
	name = my-source-connect
	predicates = []
	tasks.max = 1
	topic.creation.groups = []
	transforms = []
	value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:361)
[2022-08-08 23:05:06,530] ERROR Encountered an unrecoverable error while reading tables from the database (io.confluent.connect.jdbc.source.TableMonitorThread:224)
org.apache.kafka.connect.errors.ConnectException: The connector uses the unqualified table name as the topic name and has detected duplicate unqualified table names. This could lead to mixed data types in the topic and downstream processing errors. To prevent such processing errors, the JDBC Source connector fails to start when it detects duplicate table name configurations. Update the connector's 'table.whitelist' config to include exactly one table in each of the tables listed below.
	[["mydb"."users", "performance_schema"."users"]]
	at io.confluent.connect.jdbc.source.TableMonitorThread.tables(TableMonitorThread.java:152)
	at io.confluent.connect.jdbc.JdbcSourceConnector.taskConfigs(JdbcSourceConnector.java:164)
	at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:359)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:1428)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:1366)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1000(DistributedHerder.java:128)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1318)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1312)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:371)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:295)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
[2022-08-08 23:05:06,538] ERROR WorkerConnector{id=my-source-connect} Connector raised an error (org.apache.kafka.connect.runtime.WorkerConnector:506)
org.apache.kafka.connect.errors.ConnectException: Encountered an unrecoverable error while reading tables from the database
	at io.confluent.connect.jdbc.source.TableMonitorThread.fail(TableMonitorThread.java:226)
	at io.confluent.connect.jdbc.source.TableMonitorThread.tables(TableMonitorThread.java:153)
	at io.confluent.connect.jdbc.JdbcSourceConnector.taskConfigs(JdbcSourceConnector.java:164)
	at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:359)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:1428)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:1366)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1000(DistributedHerder.java:128)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1318)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1312)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:371)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:295)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: The connector uses the unqualified table name as the topic name and has detected duplicate unqualified table names. This could lead to mixed data types in the topic and downstream processing errors. To prevent such processing errors, the JDBC Source connector fails to start when it detects duplicate table name configurations. Update the connector's 'table.whitelist' config to include exactly one table in each of the tables listed below.
	[["mydb"."users", "performance_schema"."users"]]
	at io.confluent.connect.jdbc.source.TableMonitorThread.tables(TableMonitorThread.java:152)
	... 14 more
[2022-08-08 23:05:06,542] ERROR [Worker clientId=connect-1, groupId=connect-cluster] Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1377)
org.apache.kafka.connect.errors.ConnectException: Encountered an unrecoverable error while reading tables from the database
	at io.confluent.connect.jdbc.source.TableMonitorThread.fail(TableMonitorThread.java:226)
	at io.confluent.connect.jdbc.source.TableMonitorThread.tables(TableMonitorThread.java:153)
	at io.confluent.connect.jdbc.JdbcSourceConnector.taskConfigs(JdbcSourceConnector.java:164)
	at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:359)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:1428)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:1366)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1000(DistributedHerder.java:128)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1318)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1312)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:371)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:295)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: The connector uses the unqualified table name as the topic name and has detected duplicate unqualified table names. This could lead to mixed data types in the topic and downstream processing errors. To prevent such processing errors, the JDBC Source connector fails to start when it detects duplicate table name configurations. Update the connector's 'table.whitelist' config to include exactly one table in each of the tables listed below.
	[["mydb"."users", "performance_schema"."users"]]
	at io.confluent.connect.jdbc.source.TableMonitorThread.tables(TableMonitorThread.java:152)
	... 14 more
[2022-08-08 23:05:06,546] INFO [Worker clientId=connect-1, groupId=connect-cluster] Skipping reconfiguration of connector my-source-connect since it is not running (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1415)
[2022-08-08 23:05:06,795] INFO [Worker clientId=connect-1, groupId=connect-cluster] Skipping reconfiguration of connector my-source-connect since it is not running (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1415)

I kept googling and searching, but I just can't figure out what went wrong ㅠ

3 Answers

7

I ran into the same exception as well, and when I checked,

 


Caused by: org.apache.kafka.connect.errors.ConnectException: 
The connector uses the unqualified table name as the topic name and has detected duplicate 
unqualified table names. This could lead to mixed data types in the topic and downstream 
processing errors. To prevent such processing errors, the JDBC Source connector fails to 
start when it detects duplicate table name configurations. Update the connector's 
'table.whitelist' 
config to include exactly one table in each of the tables listed below.

	[["linkocean"."users", "linkoceantest"."users", "mydb"."users"]]
	at io.confluent.connect.jdbc.source.TableMonitorThread.tables(TableMonitorThread.java:152)
	... 14 more

this was the log that got printed. Seeing it, it looks like the connector fails with this exception whenever the database server it is connected to has another table with the same name, even if that table lives in a different schema.

 

dwa08 solved it by renaming the table,

but it can also be solved by qualifying the table with its schema (schema.table, with a dot) in table.whitelist in the API request that creates the connector:

 

{
    "name": "my-source-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://localhost:3306/mydb",
        "connection.user": "root",
        "connection.password": "root1234",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "table.whitelist": "mydb.users",
        "topic.prefix": "my_topic_",
        "tasks.max": "1"
    }
}


0

If you look toward the bottom of the log you posted, you can see the following part:

 

 config to include exactly one table in each of the tables listed below.
	[["mydb"."users", "performance_schema"."users"]]
	at io.confluent.connect.jdbc.source.TableMonitorThread.tables(TableMonitorThread.java:152)
	... 14 more

In other words, it is telling you to reference the users table explicitly.

That is because a table named users already exists elsewhere in the DBMS.


https://docs.actian.com/psql/psqlv13/index.html#page/sqlref%2Fsqlkword.htm%23ww78995

The document above lists the reserved words found in relational DBMSs,

and you can see that USERS is already one of the words in use.

That is why a different name is often used for a table instead of USERS.

 

Looking at the error log,

[["mydb"."users", "performance_schema"."users"]]

you can see that a table named users also exists in the performance_schema database.
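If you want to check for yourself which schemas on the server contain a table named users, one way (a sketch for MariaDB/MySQL) is to query information_schema:

SELECT table_schema, table_name
  FROM information_schema.tables
 WHERE table_name = 'users';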

 

So, as the other commenter mentioned, you can do either of the following:

  1. Reference the users table in the mydb database explicitly as mydb.users.

    If you have already been following the lecture, send the body below to
    PUT http://localhost:8083/connectors/my-source-connect/config
    and the configuration will be updated!

    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mariadb://localhost:3306/mydb",
        "connection.user": "root",
        "connection.password": "test1357",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "table.whitelist": "mydb.users",
        "topic.prefix": "my_topic_",
        "tasks.max": "1"
    }

    (With the current version, the connection.url also has to say mariadb for the driver to recognize it ㅠ)

  2. Rename the users table in mydb to something else.

    The person below solved it by renaming it to users1 :)

Either option will be fine for continuing with the course!

Hope you don't suffer over this like I did ㅠ

0

dwa08
Question author

I solved it by renaming the table to users1!

Dowon Lee
Instructor

Hello, this is Dowon Lee.

Since you have already solved it this may not matter much, but looking at the bottom of the log you posted, the error seems to occur where the users table in mydb is used. If the same problem happens again, please restart the Kafka server, delete the related Source Connector and Topic, and then try again.
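For reference, that cleanup could look roughly like the following. This is only a sketch: it assumes the default Connect worker at localhost:8083, a broker at localhost:9092, and that the topic created by the connector would be my_topic_users (topic.prefix + table name) — adjust the names to your setup.

DELETE http://localhost:8083/connectors/my-source-connect

and then, from the Kafka installation directory:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my_topic_users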

Thank you.
