작성
·
711
0
안녕하세요!
제가 카프카 컨슈머 애플리케이션을 개발해서
aws 서버에 올려놓고 계속 컨슈밍을 할 예정입니다.
강의에서 컨슈머의 안전한 종료 파트를 듣고 질문이 생겨 글을 남깁니다.
이렇게 서버에 계속 올려놓고 컨슈밍을 받아야할때 컨슈머의 종료가 필요한가요?
만약 필요하다면 종료된 컨슈머는 어떻게 재실행이 되는 건지 여쭤볼 수 있을까요?
일단 저는 스프링기반 카프카 컨슈머를 개발하였고
Runnable 인터페이스로 run() 메서드로 서버가 실행되면 바로 컨슈머가 실행되게 개발해놓은 상태입니다.
저의 코드입니다.
@PostConstruct
public void startConsuming() {
Thread consumerThread = new Thread(new Consumer());
consumerThread.start();
}
private class Consumer implements Runnable {
private final Logger logger = LoggerFactory.getLogger(Consumer.class);
private final static String TOPIC_NAME = "";
private final static String BOOTSTRAP_SERVERS = "";
private final static String GROUP_ID = "";
public void run() {
try {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 구독 중인 주제 파티션에서 사용 가능한 가장 빠른 오프셋부터 읽기
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("record: {}", record);
// 받은 메세지
String value = record.value();
// commit the offset
consumer.commitSync(Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)));
}
}
} catch (Exception e) {
logger.error("Error occurred while consuming messages", e);
}
}
}
답변 1
0
카프카 컨슈머 애플리케이션의 안전한 종료가 필요한 가장 대표적인 사례는 해당 애플리케이션을 배포할 때 일것 같습니다. 아무래도 배포를 위해서는 애플리케이션 종료가 필수적이기 때문인데요. 이 경우 애플리케이션을 안전하게 종료함으로써 리소스의 낭비를 막을 수 있습니다.
일반적으로 배포시에는 다음과 같은 순서로 진행됩니다.
애플리케이션의 안전한 종료(kill -term)
종료 확인
신규 애플리케이션 실행