Inflearn Community Q&A

아캄


The Complete Kafka Guide - Connect Edition

Error when running Kafka Connect


I'm an M1 MacBook user. Following the instructor's guide, I tried to set up Ubuntu in VirtualBox, but the installation kept failing with VirtualBox errors, so I just installed everything locally on my Mac and am following the course that way.

 

However, an error occurs when starting the connector:

connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties

Changing the groupId makes no difference.

I suspected a port conflict, so I changed the port as well, but the result is the same.

 

Could you take a look?

 

 

[2023-02-12 01:06:44,572] ERROR [Worker clientId=connect-1, groupId=connect-cluster-b] Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:334)
java.lang.IllegalStateException: There are no in-flight requests for node 0
	at org.apache.kafka.clients.InFlightRequests.requestQueue(InFlightRequests.java:62)
	at org.apache.kafka.clients.InFlightRequests.completeNext(InFlightRequests.java:70)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:872)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:569)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:536)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
	at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:315)
	at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:351)
	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:201)
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:294)
	at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:132)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:320)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1589)
[2023-02-12 01:06:44,582] INFO Stopped http_localhost8084@30cecdca{HTTP/1.1, (http/1.1)}{localhost:8084} (org.eclipse.jetty.server.AbstractConnector:381)

1 Answer


권 철민
Instructor

Hello,

So you've set up the practice environment on an M1. As I mentioned in the course introduction, VirtualBox has installation problems on M1 MacBooks ^^;;

When you say you installed locally on the Mac, do you mean you're running everything directly on macOS, without VirtualBox?

Assuming that's the case...

That said, the error message you posted doesn't give much information to diagnose from.

First:

  1. Please check whether Kafka itself started properly.

     

  2. A Google search turns up reports of a port conflict — for example, a SonarQube H2 DB occupying the port Kafka normally uses. First check the messages Kafka prints on startup, then send a sample message to the broker with kafka-console-producer or similar and confirm it is delivered.

     

    https://issues.apache.org/jira/browse/KAFKA-10450

     

    https://stackoverflow.com/questions/63654739/kafka-on-mac-there-are-no-in-flight-requests-for-node-1-while-starting-console

     

  3. If the problem persists after steps 1 and 2, please post the Kafka (not Connect) broker log and the contents of your connect-distributed.properties file here (along with which parameters you changed).

 

Thank you.

아캄
Asker

Thank you for answering even on your day off.

I've tried Docker and several other approaches, but there's been no progress.

Maybe the M1 chip is the problem...

  1. Yes, there is nothing unusual in the Kafka logs.

  2. Yes, I searched for the same thing, and checking the ports showed nothing unusual.

    However, when I restart Kafka it sometimes works by luck and sometimes doesn't.

    1. Deleting the files in the data folder and retrying makes it work — but again, it succeeds or fails seemingly at random.

       

      # A comma separated list of directories under which to store log files
      log.dirs=/Users/xxx/dev/workspace/localkafka/confluent/data/kafka-logs
    2. The plugins all load fine as well.

    [
        {
            "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirAvroSourceConnector",
            "type": "source",
            "version": "0.0.0.0"
        },
        {
            "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirBinaryFileSourceConnector",
            "type": "source",
            "version": "0.0.0.0"
        },
        {
            "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
            "type": "source",
            "version": "0.0.0.0"
        },
        {
            "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector",
            "type": "source",
            "version": "0.0.0.0"
        },
        {
            "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector",
            "type": "source",
            "version": "0.0.0.0"
        },
        {
            "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector",
            "type": "source",
            "version": "0.0.0.0"
        },
        {
            "class": "com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector",
            "type": "source",
            "version": "0.0.0.0"
        },
        {
            "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
            "type": "source",
            "version": "7.3.0-ccs"
        },
        {
            "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
            "type": "source",
            "version": "7.3.0-ccs"
        },
        {
            "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
            "type": "source",
            "version": "7.3.0-ccs"
        }
    ]
    3. The problem is that as soon as I register spooldir again, the same error occurs.

      {
        "name": "csv_spooldir_source",
        "config": {
          "tasks.max": "3",
          "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
          "input.path": "/Users/xxx/dev/workspace/localkafka/confluent/data/spooldir",
          "input.file.pattern": "csv-spooldir-source-01.csv",
          "error.path": "/Users/xxx/dev/workspace/localkafka/confluent/data/spooldir/error",
          "finished.path": "/Users/xxx/dev/workspace/localkafka/confluent/data/spooldir/finished",
          "empty.poll.wait.ms": 30000,
          "halt.on.error": "false",
          "topic": "spooldir-test-topic",
          "csv.first.row.as.header": "true",
          "schema.generation.enabled": "true"
         }
      }
      
      
      # Licensed to the Apache Software Foundation (ASF) under one or more
      # contributor license agreements.  See the NOTICE file distributed with
      # this work for additional information regarding copyright ownership.
      # The ASF licenses this file to You under the Apache License, Version 2.0
      # (the "License"); you may not use this file except in compliance with
      # the License.  You may obtain a copy of the License at
      #
      #    http://www.apache.org/licenses/LICENSE-2.0
      #
      # Unless required by applicable law or agreed to in writing, software
      # distributed under the License is distributed on an "AS IS" BASIS,
      # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      # See the License for the specific language governing permissions and
      # limitations under the License.
      ##
      
      # This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
      # to be used with the examples, and some settings may differ from those used in a production system, especially
      # the `bootstrap.servers` and those specifying replication factors.
      
      # A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
      bootstrap.servers=localhost:9092
      
      # unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
      group.id=connect-cluster
      
      # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
      # need to configure these based on the format they want their data in when loaded from or stored into Kafka
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
      # it to
      key.converter.schemas.enable=true
      value.converter.schemas.enable=true
      
      # Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
      # Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
      # the topic before starting Kafka Connect if a specific topic configuration is needed.
      # Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
      # Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
      # to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
      offset.storage.topic=connect-offsets
      offset.storage.replication.factor=1
      #offset.storage.partitions=25
      
      # Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
      # and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
      # the topic before starting Kafka Connect if a specific topic configuration is needed.
      # Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
      # Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
      # to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
      config.storage.topic=connect-configs
      config.storage.replication.factor=1
      
      # Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
      # Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
      # the topic before starting Kafka Connect if a specific topic configuration is needed.
      # Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
      # Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
      # to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
      status.storage.topic=connect-status
      status.storage.replication.factor=1
      #status.storage.partitions=5
      
      # Flush much faster than normal, which is useful for testing/debugging
      offset.flush.interval.ms=10000
      
      # List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.
      # Specify hostname as 0.0.0.0 to bind to all interfaces.
      # Leave hostname empty to bind to default interface.
      # Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084"
      #listeners=HTTP://:8083
      
      # The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
      # If not set, it uses the value for "listeners" if configured.
      #rest.advertised.host.name=
      #rest.advertised.port=
      #rest.advertised.listener=
      
      # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
      # (connectors, converters, transformations). The list should consist of top level directories that include 
      # any combination of: 
      # a) directories immediately containing jars with plugins and their dependencies
      # b) uber-jars with plugins and their dependencies
      # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
      # Examples: 
      # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
      plugin.path=/Users/xxx/dev/workspace/localkafka/confluent/plugins
      
      plugin.path is the only thing I changed.
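As a side note, the plugin listing shown earlier is what the Connect worker's REST API returns from GET /connector-plugins. A minimal sketch of fetching and filtering it (the URL assumes the default 8083 REST port; the helper function is illustrative, not from the course):

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed default Connect REST port

def source_connector_classes(body):
    """Extract source-connector class names from a /connector-plugins response body."""
    return [p["class"] for p in json.loads(body) if p.get("type") == "source"]

# Against a running worker this would print the same classes as the listing above:
#   with urllib.request.urlopen(CONNECT_URL + "/connector-plugins") as resp:
#       print("\n".join(source_connector_classes(resp.read())))
```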
  3. Various logs

    ZooKeeper log (normal case)

    [2023-02-13 00:05:55,412] INFO Using org.apache.zookeeper.server.watch.WatchManager as watch manager (org.apache.zookeeper.server.watch.WatchManagerFactory)
    [2023-02-13 00:05:55,412] INFO zookeeper.snapshotSizeFactor = 0.33 (org.apache.zookeeper.server.ZKDatabase)
    [2023-02-13 00:05:55,412] INFO zookeeper.commitLogCount=500 (org.apache.zookeeper.server.ZKDatabase)
    [2023-02-13 00:05:55,415] INFO zookeeper.snapshot.compression.method = CHECKED (org.apache.zookeeper.server.persistence.SnapStream)
    [2023-02-13 00:05:55,415] INFO Snapshotting: 0x0 to /Users/xxx/dev/workspace/localkafka/confluent/data/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
    [2023-02-13 00:05:55,417] INFO Snapshot loaded in 5 ms, highest zxid is 0x0, digest is 1371985504 (org.apache.zookeeper.server.ZKDatabase)
    [2023-02-13 00:05:55,417] INFO Snapshotting: 0x0 to /Users/xxx/dev/workspace/localkafka/confluent/data/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
    [2023-02-13 00:05:55,418] INFO Snapshot taken in 0 ms (org.apache.zookeeper.server.ZooKeeperServer)
    [2023-02-13 00:05:55,422] INFO PrepRequestProcessor (sid:0) started, reconfigEnabled=false (org.apache.zookeeper.server.PrepRequestProcessor)
    [2023-02-13 00:05:55,422] INFO zookeeper.request_throttler.shutdownTimeout = 10000 (org.apache.zookeeper.server.RequestThrottler)
    [2023-02-13 00:05:55,429] INFO Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0 (org.apache.zookeeper.server.ContainerManager)
    [2023-02-13 00:05:55,429] INFO ZooKeeper audit is disabled. (org.apache.zookeeper.audit.ZKAuditProvider)
    [2023-02-13 00:05:58,988] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)
    

    Kafka log (normal case)

[2023-02-13 00:06:26,077] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-43 in 5 milliseconds for epoch 0, of which 5 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2023-02-13 00:06:26,077] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-13 in 5 milliseconds for epoch 0, of which 5 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2023-02-13 00:06:26,077] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-28 in 5 milliseconds for epoch 0, of which 5 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2023-02-13 00:06:26,160] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group connect-cluster in Empty state. Created a new member id connect-1-85dbcf7c-31d2-49ce-a260-7f058b437014 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2023-02-13 00:06:26,167] INFO [GroupCoordinator 0]: Preparing to rebalance group connect-cluster in state PreparingRebalance with old generation 0 (__consumer_offsets-13) (reason: Adding new member connect-1-85dbcf7c-31d2-49ce-a260-7f058b437014 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
[2023-02-13 00:06:26,169] INFO [GroupCoordinator 0]: Stabilized group connect-cluster generation 1 (__consumer_offsets-13) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2023-02-13 00:06:26,180] INFO [GroupCoordinator 0]: Assignment received from leader connect-1-85dbcf7c-31d2-49ce-a260-7f058b437014 for group connect-cluster for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

Connect log (normal case)

[2023-02-13 00:06:26,171] INFO [Worker clientId=connect-1, groupId=connect-cluster] Successfully joined group with generation Generation{generationId=1, memberId='connect-1-85dbcf7c-31d2-49ce-a260-7f058b437014', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:627)
[2023-02-13 00:06:26,201] INFO [Worker clientId=connect-1, groupId=connect-cluster] Successfully synced group in generation Generation{generationId=1, memberId='connect-1-85dbcf7c-31d2-49ce-a260-7f058b437014', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:802)
[2023-02-13 00:06:26,202] INFO [Worker clientId=connect-1, groupId=connect-cluster] Joined group at generation 1 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-85dbcf7c-31d2-49ce-a260-7f058b437014', leaderUrl='http://192.168.50.95:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2256)
[2023-02-13 00:06:26,202] INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1659)
[2023-02-13 00:06:26,202] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1687)
[2023-02-13 00:06:26,212] INFO [Producer clientId=producer-3] Resetting the last seen epoch of partition connect-configs-0 to 0 since the associated topicId changed from null to LDyOD2srSuiEjugXOIZRFg (org.apache.kafka.clients.Metadata:402)
[2023-02-13 00:06:26,234] INFO [Worker clientId=connect-1, groupId=connect-cluster] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2124)

 

Connect log errors after registering spooldir (failure case)

[2023-02-13 00:10:03,062] ERROR Unexpected exception in Thread[#80,KafkaBasedLog Work Thread - connect-configs,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:525)
java.lang.IllegalStateException: There are no in-flight requests for node 0
	at org.apache.kafka.clients.InFlightRequests.requestQueue(InFlightRequests.java:62)
	at org.apache.kafka.clients.InFlightRequests.completeNext(InFlightRequests.java:70)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:872)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:569)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:280)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:321)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:572)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1276)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220)
	at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:379)
	at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:415)
	at org.apache.kafka.connect.util.KafkaBasedLog.access$400(KafkaBasedLog.java:80)
	at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:491)
[2023-02-13 00:10:33,063] ERROR Failed to write connector configuration to Kafka:  (org.apache.kafka.connect.storage.KafkaConfigBackingStore:469)
java.util.concurrent.TimeoutException: Timed out waiting for future
	at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:106)
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore.putConnectorConfig(KafkaConfigBackingStore.java:467)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$10(DistributedHerder.java:1057)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.writeToConfigTopicAsLeader(DistributedHerder.java:1472)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$11(DistributedHerder.java:1057)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2025)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:446)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:347)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1589)
[2023-02-13 00:10:33,090] ERROR Uncaught exception in REST call to /connectors (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
org.apache.kafka.connect.errors.ConnectException: Error writing connector configuration to Kafka
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore.putConnectorConfig(KafkaConfigBackingStore.java:470)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$10(DistributedHerder.java:1057)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.writeToConfigTopicAsLeader(DistributedHerder.java:1472)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$11(DistributedHerder.java:1057)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2025)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:446)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:347)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: java.util.concurrent.TimeoutException: Timed out waiting for future
	at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:106)
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore.putConnectorConfig(KafkaConfigBackingStore.java:467)
	... 11 more

Error response received via Postman:

{
    "error_code": 500,
    "message": "Error writing connector configuration to Kafka"
}
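For context, a connector config like the spooldir JSON above is registered by POSTing it to the worker's /connectors endpoint, and the 500 above is that call's error response. A minimal sketch of building such a request (the URL and the helper name are illustrative, not from the course):

```python
import json
import urllib.request

def build_register_request(rest_url, name, config):
    """Build the POST request Kafka Connect expects at /connectors."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        rest_url + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_register_request(
    "http://localhost:8083",  # assumed default Connect REST port
    "csv_spooldir_source",
    {
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "spooldir-test-topic",
    },
)
# urllib.request.urlopen(req) would submit it to a running worker.
print(req.get_method(), req.full_url)
```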

 

I'm a Kafka beginner, but it's frustrating that the logs don't say anything more detailed.

Sorry to impose, but could I get some more help?

 

아캄
Asker

listeners=HTTP://localhost:8083

Changing this made no difference either.

권 철민
Instructor

You've tried quite a few configurations on the M1.

From the error messages alone I can't tell whether this is a connector problem or a Kafka broker problem, but my current guess is that the broker is at fault.

Looking at the errors, the failure seems to originate from this part:

[2023-02-13 00:10:33,090] ERROR Uncaught exception in REST call to /connectors (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61) org.apache.kafka.connect.errors.ConnectException: Error writing connector configuration to Kafka

On first startup, Connect records its basic configuration in Kafka topics — it creates the connect-offsets, connect-configs, and connect-status topics — and the failure appears to occur while writing to them.

That said, your post says Connect itself starts fine and the error appears only when you register the spooldir connector, which partly contradicts this guess. Connect always creates its internal topics on first startup, so it's puzzling that the first start succeeds but registering spooldir fails.

It seems necessary to eliminate possible causes one by one. First, please verify that the Kafka broker is functioning properly.

As mentioned in my previous answer, use kafka-console-producer and kafka-console-consumer to send messages to the broker and confirm they come back. If you're not familiar with these commands, run them as follows.

  1. First, create a test Kafka topic (named test-topic):

    kafka-topics --bootstrap-server localhost:9092 --create --topic test-topic

  2. Send messages to the test-topic created in step 1. (Type a string and press Enter, and the message is sent.)

    kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic

  3. Open another console window and receive the messages with kafka-console-consumer:

    kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning

Run the commands above, then check what the Kafka broker prints and whether kafka-console-consumer receives the messages properly.

권 철민
Instructor

Additionally,

although it's still a beta, VirtualBox 7.0 supports M1.

It's only a developer preview build, but it may be worth installing.

Select the developer preview for mac at https://www.virtualbox.org/wiki/Downloads.

아캄
Asker

  1. I already used the M1 build of VirtualBox; the other builds won't even install. ^^;

  2. On this laptop's M1, the producer/consumer test delivers only the first message to the consumer; the second one hits the same error. After setting up several versions and environments, it seems to be picking up stale cache files or other leftover state.

  3. In the end, on another M1 Mac mini at home, without a VM, the producer and consumer you described work fine.

So it seems to be a problem with my machine. ㅠ

Thank you for the answer.

Still, is there a way to completely reset everything Kafka-related in my current laptop environment? I've already deleted the data folder and reinstalled Confluent several times,

and deleted anything suspicious under tmp as well.

Is it a character-encoding issue in the terminal...

Or is the shell affected by the previously installed Anaconda...

Seeing that only the first word is delivered successfully before the error, maybe a string is getting corrupted somewhere...

Or will I end up having to wipe the machine...

It's frustrating that nothing but the error below appears, without any detail. ㅎ

ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.protocol.types.SchemaException: Buffer underflow while parsing response for request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=console-consumer, correlationId=80)
	at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:731)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:874)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:569)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
	at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:456)
	at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:101)
	at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
	at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
	at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: java.nio.BufferUnderflowException
	at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:651)
	at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:402)
	at org.apache.kafka.common.protocol.ByteBufferAccessor.readInt(ByteBufferAccessor.java:43)
	at org.apache.kafka.common.message.ResponseHeaderData.read(ResponseHeaderData.java:90)
	at org.apache.kafka.common.message.ResponseHeaderData.<init>(ResponseHeaderData.java:66)
	at org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:71)
	at org.apache.kafka.common.requests.AbstractResponse.parseResponse(AbstractResponse.java:100)
	at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:729)
	... 12 more
[2023-02-14 12:16:33,205] ERROR [Consumer clientId=console-consumer, groupId=console-consumer-60057] Heartbeat thread failed due to unexpected error (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
java.lang.IllegalStateException: There are no in-flight requests for node 0
	at org.apache.kafka.clients.InFlightRequests.requestQueue(InFlightRequests.java:62)
	at org.apache.kafka.clients.InFlightRequests.completeNext(InFlightRequests.java:70)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:872)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:569)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1386)
Processed a total of 1 messages

 

Thank you for all the answers. ^^

Your voice comes across wonderfully in the online lectures. ㅎ

권 철민
Instructor

Haha, thanks for the compliment on my voice.

An error where only the first word succeeds is new to me as well.

My guess is that Kafka itself is not working properly.

Even if you've installed several versions, Kafka runs directly from its binaries, so deleting the installation directories is enough.

Delete the entire Kafka binary directory. For example, in the course Kafka was installed under /home/min/confluent, so you would delete /home/min/confluent completely.

Also remove the export CONFLUENT_HOME setting from .bashrc (from whatever file serves as .bashrc on the Mac).

Then delete the data directory as well (/home/min/data in the course).

That removes everything Kafka-related. If there is a zookeeper or kafka-logs directory under /tmp, delete that too.

After that, try installing Kafka once more, following the course.

 

아캄
Asker

I found the cause.

Testing repeatedly without modifying server.properties, I noticed that

my work PC's hostname was set to something different.

In the end, I simply registered the same name everywhere, as below:

sudo scutil --set ComputerName "newname"
sudo scutil --set LocalHostName "newname"
sudo scutil --set HostName "newname"

sudo vi /etc/hosts  # then add the newly set name to this file:

127.0.0.1 newname 

Now the producer and consumer exchange messages normally.

From the look of it, the producer initially sends to localhost, but the responses then reference the broker's advertised hostname, which the consumer fails to resolve, producing the error.
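The diagnosis above boils down to name resolution: clients bootstrap via localhost, but the broker advertises the machine's hostname, and every subsequent request must be able to resolve that name. A quick, illustrative check (not from the course):

```python
import socket

# The broker's advertised listener defaults to the machine's hostname.
# Every client must be able to resolve that name (e.g. via /etc/hosts),
# or requests fail after the initial bootstrap connection succeeds.
advertised = socket.gethostname()
try:
    print(advertised, "resolves to", socket.gethostbyname(advertised))
except socket.gaierror:
    print(advertised, "is NOT resolvable -- map it to 127.0.0.1 in /etc/hosts")
```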

Anyway, time to get back to the course.

Thank you.

권 철민
Instructor

Oh, glad it's resolved. Enjoy the course ^^
