인프런 영문 브랜드 로고
인프런 영문 브랜드 로고

인프런 커뮤니티 질문&답변

김동선님의 프로필 이미지

작성한 질문수

카프카 완벽 가이드 - 코어편

Log Compaction의 이해 - 01

KafkaTimeoutError:

작성

·

49

0

안녕하세요 개발자님

KafkaTimeoutError: Failed to update metadata after 60.0 secs. 2025-03-21 21:28:43.328 | ERROR | kafka_communication:send_message:52 - KafkaTimeoutError: Failed to update metadata after 60.0 secs.

파이썬으로 작성된 코드를 받게 되어 개발을 진행 중인데 window 환경에서 ai프로그램을 돌려 린눅스 서버에 cnotainer 컨프런트 kafka 로 결과 값을 전송 하게 되어씁니다.

window 에서 ai프로그램이 결과 값을 리눅스 docker contianer로 전송 하는데 오래 걸리지만, kafka 로 전송이 되지만.,

window에 있던 ai 프고르램을 다른 리눅스 서버 2 번째 환경에서 실행시키고, 첫번째 kafka가 동작 중인 kafka에 결과 값을 전송 하면

KafkaTimeoutError: Failed to update metadata after 60.0 secs.

2025-03-21 21:28:43.328 | ERROR | kafka_communication:send_message:52 - KafkaTimeoutError: Failed to update metadata after 60.0 secs.

error 가 발생 합니다.

window에서는 안발생 했는데, 말이죠.,

ai 프로그램을 강제 종료 하고 error를 보면

^CTraceback (most recent call last):

File "/home/gpu-1/projects/argos/source/ai/ai-main/message_event_handler.py", line 174, in <module>

message_event_handler.handle_messages()

File "/home/gpu-1/projects/argos/source/ai/ai-main/message_event_handler.py", line 80, in handle_messages

msg = self.consumer.receive_message()

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/home/gpu-1/projects/argos/source/ai/ai-main/kafka_communication.py", line 83, in receive_message

message = next(self.consumer).value

^^^^^^^^^^^^^^^^^^^

File "/opt/miniconda3/envs/argos-ai-volov8/lib/python3.11/site-packages/kafka/consumer/group.py", line 1168, in next

return next(self._iterator)

^^^^^^^^^^^^^^^^^^^^

File "/opt/miniconda3/envs/argos-ai-volov8/lib/python3.11/site-packages/kafka/consumer/group.py", line 1140, in messagegenerator_v2

record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/opt/miniconda3/envs/argos-ai-volov8/lib/python3.11/site-packages/kafka/consumer/group.py", line 679, in poll

records = self._poll_once(inner_timeout_ms(), max_records, update_offsets=update_offsets)

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/opt/miniconda3/envs/argos-ai-volov8/lib/python3.11/site-packages/kafka/consumer/group.py", line 722, in pollonce

self._client.poll(timeout_ms=inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000))

File "/opt/miniconda3/envs/argos-ai-volov8/lib/python3.11/site-packages/kafka/client_async.py", line 683, in poll

self._poll(timeout / 1000)

File "/opt/miniconda3/envs/argos-ai-volov8/lib/python3.11/site-packages/kafka/client_async.py", line 742, in _poll

if conn.send_pending_requests_v2():

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/opt/miniconda3/envs/argos-ai-volov8/lib/python3.11/site-packages/kafka/conn.py", line 1039, in send_pending_requests_v2

total_bytes = self._send_bytes(self._send_buffer)

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/opt/miniconda3/envs/argos-ai-volov8/lib/python3.11/site-packages/kafka/conn.py", line 683, in sendbytes

sent_bytes = self._sock.send(data[total_sent:])

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

이렇게 나오고 producer 설정은

class MessageProducer:

def init(self, broker):

self.broker = broker

# self.topic = topic

self.producer = self.init()

def init(self):

while True:

try:

producer = KafkaProducer(

bootstrap_servers=self.broker,

value_serializer=lambda x: json.dumps(x).encode('utf-8'),

acks=0,

request_timeout_ms=1200000,

retry_backoff_ms=3000,

batch_size=16384, #

metadata_max_age_ms=3000000,

api_version=(2, 5, 0),

retries=3

)

break

except Exception as e:

print(e)

logger.error(e)

logger.error("Retry connecting to broker...")

sleep(2)

logger.info("Connected to kafka broker !")

return producer

`

def send_message(self, topic, msg):

try:

print(f'topic: {topic}')

future = self.producer.send(topic, value=msg)

self.producer.flush()

future.get(timeout=None)

return {'status_code': 200, 'error': None}

except Exception as e:

print(e)

logger.error(e)

return e

입니다.

window에서 리눅스로 옴겼더니 왜 그런걸까요?
설정 부분 추가 및 변경 해야 하는 부분이 어디 일까요?

client에서 설정을 수정 해야 하나여ㅛ, kafka server.properties 에서 어디를 수정 해야 하나요?

producer로 kafak로 부토ㅓ matadata를 기다려야 하나요?

답변 3

0

김동선님의 프로필 이미지
김동선
질문자

넵 말씀 감사합니다. 말씀 주신 부분은 이전에 진행 및 test 를 해 았습니다. 확인해 보니 전달 주신 부서에서 사용 라이브러리 문제 였더군요.

답변 주셔서 감사합니다.

0

권 철민님의 프로필 이미지
권 철민
지식공유자

안녕하십니까,

window 에서 ai프로그램이 결과 값을 리눅스 docker contianer로 전송 하는데 오래 걸리지만, kafka 로 전송이 되지만.,window에 있던 ai 프고르램을 다른 리눅스 서버 2 번째 환경에서 실행시키고, 첫번째 kafka가 동작 중인 kafka에 결과 값을 전송 하면

위 이야기가 어떤건지 자세히는 잘 모르겠지만, 제 생각엔 ai 프로그램으로 kafka 테스트를 하지 마시고, ai 프로그램이 올라가 있는 서버에서 kafka로 메시지를 보내는 간단한 producer 프로그램을 만드셔서 잘 되는지 부터 테스트 해보시면 좋을 것 같습니다.

이게 잘 안되면 네트웍이나 방화벽 문제 일 수 있습니다. 잘 되면 ai 프로그램에서 config라든가 뭔가 잘못 설정한 부분이 있을 겁니다. 이런 방식으로 먼저 테스트 해보시면 좋을 것 같습니다.

감사합니다.

0

인프런 AI 인턴님의 프로필 이미지

안녕하세요, 인프런 AI 인턴입니다.

김동선님께서 겪고 계신 KafkaTimeoutError 문제는 Kafka 클라이언트가 브로커와 적절히 통신하지 못할 때 발생할 수 있습니다. 이는 주로 네트워크 설정 문제나 브로커의 설정 차이에서 기인할 수 있습니다. 아래에 해당 문제에 대한 해결 방법을 제안드립니다:

1. 네트워크 연결 확인

  • 클라이언트와 서버 간의 네트워크 연결이 제대로 되어 있는지 확인합니다. 방화벽이나 네트워크 정책 때문에 연결이 제한될 수 있습니다.
  • 해당 서버에서 Kafka 브로커에 지정된 포트(보통 9092)가 열려 있는지 확인하시기 바랍니다.

2. 브로커 설정 검토

  • Kafka broker가 server.properties 파일에서 올바르게 설정되어 있는지 확인합니다. 특히, advertised.listenerslisteners 설정을 확인하세요. 클라이언트가 브로커와 통신할 수 있도록 적절하게 구성되어야 합니다.

3. 클라이언트 설정 갱신

  • bootstrap_servers 설정이 올바르게 설정되었는지 확인하십시오. IP 주소 및 포트가 정확히 지정되어야 합니다.
  • KafkaProducer의 request_timeout_ms, metadata_max_age_ms 등의 설정 값을 조정해보는 것도 좋습니다. 특히, request_timeout_ms는 카프카 브로커에 메타데이터를 요청하는 주기를 조절하는 데 영향을 미칩니다.

4. Docker 설정 및 호스트 파일 체크

  • Docker 환경에서 실행 중이라면, 호스트 파일 설정이나 컨테이너 내부의 네트워크 설정을 확인하십시오. 데이터 전송 시 호스트 네임이 올바르게 해석되지 않을 때 문제가 발생할 수 있습니다.

관련 도움말

김동선님이 말씀하신 설정에서 네트워크적 요인 및 브로커 설정이 가장 주요한 부분일 수 있습니다. 이 방법들이 문제가 해결되는 데 도움이 되기를 바랍니다.

저는 질문자님의 이해를 돕고자 추가적인 설명을 제공했습니다. 계속해서 문제가 지속된다면, 카프카의 로그파일을 더 자세히 살펴보거나, 네트워크 분석 도구를 사용하여 클라이언트와 브로커 간의 패킷 전송을 모니터링하는 것도 좋은 방법입니다.

현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏 추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.