인프런 커뮤니티 질문&답변

이하정님의 프로필 이미지
이하정

작성한 질문수

[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!

커스텀 elasticsearch sink connector를 만들었는데 class를 찾지 못하고 있어요

작성

·

342

0

{

    "error_code": 500,

    "message": "Error trying to forward REST request: Failed to find any class that implements Connector and which name matches com.example.connector.kafka.ElasticsearchSinkConnector, available connectors are: 

	PluginDesc{klass=class com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector, name='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector', version='0.0.0.0', encodedVersion=0.0.0.0, type=source, typeName='source', location='file:/data01/connectors/spooldir_source/'}, 

	PluginDesc{klass=class com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector, name='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector', version='0.0.0.0', encodedVersion=0.0.0.0, type=source, typeName='source', location='file:/data01/connectors/spooldir_source/'}, 

	PluginDesc{klass=class com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector, name='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector', version='0.0.0.0', encodedVersion=0.0.0.0, type=source, typeName='source', location='file:/data01/connectors/spooldir_source/'}, 

	PluginDesc{klass=class com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector, name='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector', version='0.0.0.0', encodedVersion=0.0.0.0, type=source, typeName='source', location='file:/data01/connectors/spooldir_source/'}, 

	PluginDesc{klass=class com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector, name='com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector', version='0.0.0.0', encodedVersion=0.0.0.0, type=source, typeName='source', location='file:/data01/connectors/spooldir_source/'}, 

	PluginDesc{klass=class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, name='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', version='11.0.0', encodedVersion=11.0.0, type=sink, typeName='sink', location='file:/data01/connectors/elasticsearch_sink/'}, 

	PluginDesc{klass=class io.debezium.connector.sqlserver.SqlServerConnector, name='io.debezium.connector.sqlserver.SqlServerConnector', version='1.7.1.Final', encodedVersion=1.7.1.Final, type=source, typeName='source', location='file:/data01/connectors/mssql_source/'}, 

	PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.8.2', encodedVersion=2.8.2, type=sink, typeName='sink', location='classpath'}, 

	PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.8.2', encodedVersion=2.8.2, type=source, typeName='source', location='classpath'}, 

	PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, 

	PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, 

	PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, 

	PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.8.2', encodedVersion=2.8.2, type=connector, typeName='connector', location='classpath'}, 

	PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.8.2', encodedVersion=2.8.2, type=sink, typeName='sink', location='classpath'}, 

	PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.8.2', encodedVersion=2.8.2, type=source, typeName='source', location='classpath'}, 

	PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.8.2', encodedVersion=2.8.2, type=source, typeName='source', location='classpath'}, 

	PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.8.2', encodedVersion=2.8.2, type=source, typeName='source', location='classpath'}, 

	PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.8.2', encodedVersion=2.8.2, type=source, typeName='source', location='classpath'}"

}

connector 등록 api를 날리면 이런 에러가 뜹니다.

그런데 /connector-plugins request를 날리면 plugin으로 인식하고 있어요.

SinkConnector를 상속한 클래스의 path도 맞는데 왜 동작하지 않는 건지 모르겠어요

[
    {
        "class": "com.example.connector.kafka.ElasticsearchSinkConnector",
        "type": "sink",
        "version": "1.0"
    },
    {
        "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "type": "source",
        "version": "0.0.0.0"
    },
    {
        "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector",
        "type": "source",
        "version": "0.0.0.0"
    },
    {
        "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector",
        "type": "source",
        "version": "0.0.0.0"
    },
    {
        "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector",
        "type": "source",
        "version": "0.0.0.0"
    },
    {
        "class": "com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector",
        "type": "source",
        "version": "0.0.0.0"
    },
    {
        "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type": "sink",
        "version": "11.0.0"
    },
    {
        "class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "type": "source",
        "version": "1.8.1.Final"
    },
    {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "type": "sink",
        "version": "2.8.2"
    },
    {
        "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "type": "source",
        "version": "2.8.2"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "1"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source",
        "version": "1"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "1"
    }
]


로그를 봐도 커넥트 로그만 나오고 등록한 커넥터의 로그는 나오지 않는 것 같은데 혹시 카프카와 연결해서 커넥터를 ide에서 디버깅할 수 있는 방법이 있을까요?

답변 2

0

데브원영 DVWY님의 프로필 이미지
데브원영 DVWY
지식공유자

안녕하세요!

커스텀 커넥터 jar가 정상적으로 plugin.path를 통해 등록되었음에도 불구하고 로드되지 못하는 이유는 다양할 수 있습니다. 올려주신 로그 메시지 만으로는 정확한 이유를 찾기는 어려운데요.

  1. 패키지 경로가 정확한지 확인

  2. ElasticsearchSinkConnector가 SinkConnector로 정상적으로 상속되었는지 확인

  3. uberJar(혹은 fatJar)로 만들 때 JDK 1.8 버전으로 컴파일 했는지 확인

위 3가지 방법을 시도해보시면 좋을것 같습니다.

0

안녕하세요 이하정 님, 인프런 AI 인턴이에요.

커넥터 등록 과정에서 클래스를 찾지 못하는 오류가 발생하고 있습니다. 이 경우에는 몇 가지 원인이 있을 수 있습니다.

  1. 커넥터 클래스가 올바른 패키지에 있는지 확인해주세요. 클래스의 경로와 패키지명이 일치하는지 확인해야 합니다.

  2. 커넥터 JAR 파일이 올바른 위치에 있는지 확인해주세요. 컨넥터 JAR 파일은 Kafka Connect 런타임의 plugin.path에 정의된 디렉토리에 위치해야 합니다.

  3. 커넥터 클래스 내에서 필요한 종속성이 올바르게 제공되는지 확인해주세요. 커넥터 클래스가 의존하는 라이브러리나 파일들이 올바르게 제공되지 않으면 클래스를 찾지 못하는 오류가 발생할 수 있습니다.

IDE에서 커넥터를 디버깅하려면 Kafka Connect 런타임을 직접 실행하고 해당 런타임에 커넥터를 등록해야 합니다. 특정 IDE(예: IntelliJ)에서는 디버깅 환경을 구성하여 Kafka Connect를 실행하고 디버깅 세션을 시작할 수도 있습니다. 이를 통해 커넥터 클래스 내에서 발생하는 문제를 더 쉽게 찾고 해결할 수 있습니다.

위의 과정들을 확인하고 문제가 계속되면 더 자세한 로그나 오류 메시지를 확인하여 문제 해결을 시도해보세요. 도움이 되셨길 바랍니다!

이하정님의 프로필 이미지
이하정

작성한 질문수

질문하기