인프런 커뮤니티 질문&답변

babobabo님의 프로필 이미지
babobabo

작성한 질문수

Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)

Kafka Connect 설치 ②

kafka connectors 에러

작성

·

1K

·

수정됨

0

안녕하세요 강의 잘 듣고 있습니다.!!

127.0.0.1:8083/connectors로 POST 요청으로

{
    "name": "my-source-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mariadb://localhost:3306/mydb",
        "connection.user": "root",
        "connection.password": "지정한 password",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "table.whitelist": "users",
        "topic.prefix": "my_topic_",
        "tasks.max": "1"
    }
}

201 응답으로 잘 왔습니다.

{
    "name": "my-source-connect2",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mariadb://localhost:3306/mydb",
        "connection.user": "root",
        "connection.password": "지정한 password",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "table.whitelist": "users",
        "topic.prefix": "my_topic_",
        "tasks.max": "1",
        "name": "my-source-connect2"
    },
    "tasks": [],
    "type": "source"
}

127.0.0.1:8083/connectors/my-source-connect/status를 보니

{
    "name": "my-source-connect",
    "connector": {
        "state": "FAILED",
        "worker_id": "192.168.200.174:8083",
        "trace": "org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost:3306/mydb\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:62)\n\tat io.confluent.connect.jdbc.JdbcSourceConnector.start(JdbcSourceConnector.java:95)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:190)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:215)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:360)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:343)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:143)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:121)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost:3306/mydb\n\tat java.sql/java.sql.DriverManager.getConnection(DriverManager.java:706)\n\tat java.sql/java.sql.DriverManager.getConnection(DriverManager.java:190)\n\tat io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:250)\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:84)\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:54)\n\t... 13 more\n"
    },
    "tasks": [],
    "type": "source"
}

터미널에서 확인해보니 127.0.0.1:8083/connectors로 POST 요청으로 보낼 때

[2023-08-08 19:11:24,848] INFO JdbcSourceConnectorConfig values: batch.max.rows = 100 catalog.pattern = null connection.attempts = 3 connection.backoff.ms = 10000 connection.password = [hidden] connection.url = jdbc:mariadb://localhost:3306/mydb connection.user = root db.timezone = UTC dialect.name = incrementing.column.name = id mode = incrementing numeric.mapping = null numeric.precision.mapping = false poll.interval.ms = 5000 query = query.retry.attempts = -1 query.suffix = quote.sql.identifiers = ALWAYS schema.pattern = null table.blacklist = [] table.monitoring.startup.polling.limit.ms = 10000 table.poll.interval.ms = 60000 table.types = [TABLE] table.whitelist = [users] timestamp.column.name = [] timestamp.delay.interval.ms = 0 timestamp.granularity = connect_logical timestamp.initial = null topic.prefix = my_topic_ transaction.isolation.mode = DEFAULT validate.non.null = true (io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig:376) [2023-08-08 19:11:24,850] INFO AbstractConfig values: (org.apache.kafka.common.config.AbstractConfig:376) [2023-08-08 19:11:24,856] INFO [Worker clientId=connect-1, groupId=connect-cluster] Connector my-source-connect2 config updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2092) [2023-08-08 19:11:24,858] INFO [Worker clientId=connect-1, groupId=connect-cluster] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:231) [2023-08-08 19:11:24,858] INFO [Worker clientId=connect-1, groupId=connect-cluster] (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:566) [2023-08-08 19:11:24,860] INFO 127.0.0.1 - - [08/8월/2023:10:11:24 +0000] "POST /connectors HTTP/1.1" 201 400 "-" "PostmanRuntime/7.32.3" 18 (org.apache.kafka.connect.runtime.rest.RestServer:62) [2023-08-08 19:11:24,861] INFO [Worker clientId=connect-1, groupId=connect-cluster] Successfully joined group with generation Generation{generationId=4, memberId='connect-1-a7b22fdb-774a-479a-9618-b4e9504e4e95', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:627) [2023-08-08 19:11:24,866] INFO [Worker clientId=connect-1, groupId=connect-cluster] Successfully synced group in generation Generation{generationId=4, memberId='connect-1-a7b22fdb-774a-479a-9618-b4e9504e4e95', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:802) [2023-08-08 19:11:24,866] INFO [Worker clientId=connect-1, groupId=connect-cluster] Joined group at generation 4 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-a7b22fdb-774a-479a-9618-b4e9504e4e95', leaderUrl='http://192.168.200.174:8083/', offset=5, connectorIds=[my-source-connect2, my-source-connect1, my-source-connect], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2282) [2023-08-08 19:11:24,866] INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connectors and tasks using config offset 5 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1676) [2023-08-08 19:11:24,867] INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connector my-source-connect2 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1793) [2023-08-08 19:11:24,867] INFO [my-source-connect2|worker] Creating connector my-source-connect2 of type io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker:293) [2023-08-08 19:11:24,867] INFO [my-source-connect2|worker] SourceConnectorConfig values: config.action.reload = restart connector.class = io.confluent.connect.jdbc.JdbcSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none exactly.once.support = requested header.converter = null key.converter = null name = my-source-connect2 offsets.storage.topic = null predicates = [] tasks.max = 1 topic.creation.groups = [] transaction.boundary = poll transaction.boundary.interval.ms = null transforms = [] value.converter = null (org.apache.kafka.connect.runtime.SourceConnectorConfig:376) [2023-08-08 19:11:24,867] INFO [my-source-connect2|worker] EnrichedConnectorConfig values: config.action.reload = restart connector.class = io.confluent.connect.jdbc.JdbcSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none exactly.once.support = requested header.converter = null key.converter = null name = my-source-connect2 offsets.storage.topic = null predicates = [] tasks.max = 1 topic.creation.groups = [] transaction.boundary = poll transaction.boundary.interval.ms = null transforms = [] value.converter = null (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:376) [2023-08-08 19:11:24,868] INFO [my-source-connect2|worker] Instantiated connector my-source-connect2 with version 10.7.3 of type class io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker:315) [2023-08-08 19:11:24,868] INFO [my-source-connect2|worker] Finished creating connector my-source-connect2 (org.apache.kafka.connect.runtime.Worker:336) [2023-08-08 19:11:24,868] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1704) [2023-08-08 19:11:24,868] INFO [my-source-connect2|worker] Starting JDBC Source Connector (io.confluent.connect.jdbc.JdbcSourceConnector:71) [2023-08-08 19:11:24,869] INFO [my-source-connect2|worker] JdbcSourceConnectorConfig values: batch.max.rows = 100 catalog.pattern = null connection.attempts = 3 connection.backoff.ms = 10000 connection.password = [hidden] connection.url = jdbc:mariadb://localhost:3306/mydb connection.user = root db.timezone = UTC dialect.name = incrementing.column.name = id mode = incrementing numeric.mapping = null numeric.precision.mapping = false poll.interval.ms = 5000 query = query.retry.attempts = -1 query.suffix = quote.sql.identifiers = ALWAYS schema.pattern = null table.blacklist = [] table.monitoring.startup.polling.limit.ms = 10000 table.poll.interval.ms = 60000 table.types = [TABLE] table.whitelist = [users] timestamp.column.name = [] timestamp.delay.interval.ms = 0 timestamp.granularity = connect_logical timestamp.initial = null topic.prefix = my_topic_ transaction.isolation.mode = DEFAULT validate.non.null = true (io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig:376) [2023-08-08 19:11:24,869] INFO [my-source-connect2|worker] Validating JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:171) [2023-08-08 19:11:24,869] INFO [my-source-connect2|worker] Validated JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:174) [2023-08-08 19:11:24,869] INFO [my-source-connect2|worker] Validating JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:171) [2023-08-08 19:11:24,869] INFO [my-source-connect2|worker] Validated JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:174) [2023-08-08 19:11:24,869] INFO [my-source-connect2|worker] Initial connection attempt with the database. (io.confluent.connect.jdbc.JdbcSourceConnector:94) [2023-08-08 19:11:24,871] INFO [my-source-connect2|worker] Unable to connect to database on attempt 1/3. Will retry in 10000 ms. (io.confluent.connect.jdbc.util.CachedConnectionProvider:90) java.sql.SQLException: No suitable driver found for jdbc:mariadb://localhost:3306/mydb at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:706) at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:190) at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:250) at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:84) at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:54) at io.confluent.connect.jdbc.JdbcSourceConnector.start(JdbcSourceConnector.java:95) at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:190) at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:215) at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:360) at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:343) at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:143) at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:121) at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:833) [2023-08-08 19:11:34,877] INFO [my-source-connect2|worker] Unable to connect to database on attempt 2/3. Will retry in 10000 ms. (io.confluent.connect.jdbc.util.CachedConnectionProvider:90) java.sql.SQLException: No suitable driver found for jdbc:mariadb://localhost:3306/mydb at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:706) at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:190) at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:250) at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:84) at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:54) at io.confluent.connect.jdbc.JdbcSourceConnector.start(JdbcSourceConnector.java:95) at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:190) at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:215) at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:360) at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:343) at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:143) at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:121) at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:833)

으로 발생합니다.

 

에러를 보니
java.sql.SQLException: No suitable driver found for jdbc:mariadb://localhost:3306/mydb

Kafka Connect 런타임에서 MariaDB의 JDBC 드라이버를 찾지 못한거 같았습니다.

~/confluent-7.4.0/etc/kafka/connectdistributed.properties 경로에

plugin.path=/Users/ryu/kafka/confluentinc-kafka-connect-jdbc-10.7.3/lib

잘 지정해준거 같았고

 

문제가 예상되는 부분은 현재 gradle을 사용해서 예제를 따라하고 있었습니다.

...Maven Local Repository에 라이브러리 배포하는 식으로해서

plugins {
    id 'java-library'
    id 'maven-publish'
    id 'org.springframework.boot' version '2.7.14'
    id 'io.spring.dependency-management' version '1.0.15.RELEASE'
}

group = 'com.spring'
version = '0.0.1-SNAPSHOT'

java {
    sourceCompatibility = '17'
}

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

ext {
    set('springCloudVersion', "2021.0.8")
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client'
    implementation 'org.modelmapper:modelmapper:2.4.5'
    implementation 'org.mariadb.jdbc:mariadb-java-client:2.7.2'

    compileOnly 'org.projectlombok:lombok'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    runtimeOnly 'com.h2database:h2:1.3.176'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

tasks.named('test') {
    useJUnitPlatform()
}

publishing {
    publications {
        maven(MavenPublication) {
            groupId = 'org.mariadb.jdbc' // groupId
            artifactId = 'mariadb-java-client' // artifactId
            version = '2.7.2'       // version
            from components.java
        }
    }
}
  • publishMavenPublicationToMavenLocal 을 통해서 배포해버리고...

maven의 기본 repository인 .m2에서 확인했을 때

.jar 파일이 mariadb-java-client-3.1.4-plain.jar 밖에 없어 사용했는데 혹시 gradle을 이용하면서 해결할 수 있는 방법이 있을까요?

 

 

답변 1

0

babobabo님의 프로필 이미지
babobabo
질문자

..하핳 자문자답이요

 https://mariadb.com/downloads/connectors/connectors-data-access/java8-connector/
따로 드라이버 설치해서 해결했습니다.

org.apache.kafka.connect.errors.ConnectException: The connector uses the unqualified table name as the topic name and has detected duplicate unqualified table names.

https://www.inflearn.com/questions/618181/source-connector-%EC%98%A4%EB%A5%98

babobabo님의 프로필 이미지
babobabo

작성한 질문수

질문하기