해결된 질문
작성
·
794
·
수정됨
0
안녕하세요 선생님
가상머신 우분투에 올려서 터미널로 프로듀싱한 레코드를
스프링에서 컨슈밍해서 보려고하는데
터미널 컨슈머에서는 정상으로 레코드를 받아오는데
스프링에서는 브로커에 닿지 않는 것 같은데 혹시 서버 프로퍼티 문제일까요?
어떻게 하면 브로커에 연결이 될까요 ?
package com.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.*;
public class SimpleConsumer {
private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("record:{}", record);
String message = record.value();
System.out.println(message);
}
}
}
구동시 에러
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test-group
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.5.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 66563e712b0b9f84
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1679530659001
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-group-1, groupId=test-group] Subscribed to topic(s): test
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
server properties
-----------------------------------------------------
++ 로컬 터미널에서는 스프링에 정상적으로 연동이 되는데
가상머신에서 띄운 터미널은
로컬터미널과 같은 토픽인데도 컨슈밍을 하지 않습니다
(스프링에도 가상머신에 띄운 프로듀서 레코드 전송은 안되는데 로컬 터미널 프로듀서에서 보낸 것은 받음 )
로컬
가상머신
위와 같이 동시에 각각 터미널에 같은 토픽, 부트스트랩서버로 보냈는데 각각 터미널에서만 통신이 되는 것 같습니다
public class SimpleConsumer {
private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
private final static String TOPIC_NAME = "study";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("record:{}", record);
String message = record.value();
System.out.println("message : " + message);
}
}
}
}
[main] INFO com.example.SimpleConsumer - record:ConsumerRecord(topic = study, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1679534086551, serialized key size = 1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 4, value = aaa)
message : aaa
[main] INFO com.example.SimpleConsumer - record:ConsumerRecord(topic = study, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1679534345840, serialized key size = 1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 5, value = 555)
message : 555
위에 터미널 프로듀서로 보낸 값만 넘어옵니다
이상입니다
감사합니다.
답변 1
0
안녕하십니까,
올려주신 오류 내용이 많아서 제가 헷갈리는 군요.
첫번째 올려 주신 오류 내용으로만 봐서는 스프링에서 Broker를 못찾는 것 같습니다만.
지금 Kafka Broker도 실습 VM에 떠 있고, 스프링도 실습 VM에 떠 있는데 브로커 주소를 localhost:9092 로 했는데 못찾는 오류가 발생한다는 건가요?
만약에 스프링이 local pc에 기동되어 있다면 브로커 주소를 localhost:9092로 하시면 실습 VM은 Guest 로컬 PC에서 localhost로 접속할 수 없기 때문에 당연히 오류가 발생합니다.
강의에서는 실습 VM을 고정 ip로 설정해서 실습 환경을 구축하도록 되어 있습니다. 192.168.56.101 일 겁니다. 만약 강의대로 실습 VM을 192.168.56.101로 고정 IP 설정하셨으면 스프링에서 브로커 주소를 192.168.56.101:9092 로 변경해서 다시 수행해 보십시요.
감사합니다.