인프런 영문 브랜드 로고
인프런 영문 브랜드 로고

인프런 커뮤니티 질문&답변

emlookstudy님의 프로필 이미지

작성한 질문수

카프카 완벽 가이드 - 커넥트(Connect) 편

스프링 연결시 오류

해결된 질문

작성

·

794

·

수정됨

0

안녕하세요 선생님

 

가상머신 우분투에 올려서 터미널로 프로듀싱한 레코드를

스프링에서 컨슈밍해서 보려고하는데

 

터미널 컨슈머에서는 정상으로 레코드를 받아오는데

스프링에서는 브로커에 닿지 않는 것 같은데 혹시 서버 프로퍼티 문제일까요?

 

어떻게 하면 브로커에 연결이 될까요 ?

 

 

 

 

package com.example;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.*;

public class SimpleConsumer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String GROUP_ID = "test-group";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);

        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("record:{}", record);

                String message = record.value();
                System.out.println(message);



            }
        }
    }

 

구동시 에러

[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [localhost:9092]
	check.crcs = true
	client.dns.lookup = default
	client.id = 
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = test-group
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.5.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 66563e712b0b9f84
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1679530659001
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-group-1, groupId=test-group] Subscribed to topic(s): test
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

 

 

server properties

 

 

 -----------------------------------------------------

++ 로컬 터미널에서는 스프링에 정상적으로 연동이 되는데

가상머신에서 띄운 터미널은

로컬터미널과 같은 토픽인데도 컨슈밍을 하지 않습니다

(스프링에도 가상머신에 띄운 프로듀서 레코드 전송은 안되는데 로컬 터미널 프로듀서에서 보낸 것은 받음 )

 

로컬

 

가상머신

위와 같이 동시에 각각 터미널에 같은 토픽, 부트스트랩서버로 보냈는데 각각 터미널에서만 통신이 되는 것 같습니다

 

 

public class SimpleConsumer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
    private final static String TOPIC_NAME = "study";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String GROUP_ID = "test-group";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);

        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("record:{}", record);

                String message = record.value();
                System.out.println("message : " + message);



            }
        }
    }
}
[main] INFO com.example.SimpleConsumer - record:ConsumerRecord(topic = study, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1679534086551, serialized key size = 1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 4, value = aaa)
message : aaa
[main] INFO com.example.SimpleConsumer - record:ConsumerRecord(topic = study, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1679534345840, serialized key size = 1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 5, value = 555)
message : 555

위에 터미널 프로듀서로 보낸 값만 넘어옵니다

 

이상입니다

감사합니다.

답변 1

0

권 철민님의 프로필 이미지
권 철민
지식공유자

안녕하십니까,

올려주신 오류 내용이 많아서 제가 헷갈리는 군요.

첫번째 올려 주신 오류 내용으로만 봐서는 스프링에서 Broker를 못찾는 것 같습니다만.

지금 Kafka Broker도 실습 VM에 떠 있고, 스프링도 실습 VM에 떠 있는데 브로커 주소를 localhost:9092 로 했는데 못찾는 오류가 발생한다는 건가요?

만약에 스프링이 local pc에 기동되어 있다면 브로커 주소를 localhost:9092로 하시면 실습 VM은 Guest 로컬 PC에서 localhost로 접속할 수 없기 때문에 당연히 오류가 발생합니다.

강의에서는 실습 VM을 고정 ip로 설정해서 실습 환경을 구축하도록 되어 있습니다. 192.168.56.101 일 겁니다. 만약 강의대로 실습 VM을 192.168.56.101로 고정 IP 설정하셨으면 스프링에서 브로커 주소를 192.168.56.101:9092 로 변경해서 다시 수행해 보십시요.

감사합니다.