인프런 커뮤니티 질문&답변

emlookstudy님의 프로필 이미지
emlookstudy

작성한 질문수

[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!

서버에 카프카 컨슈머를 계속올려놓을경우 종료를 해야하나요?

작성

·

711

0

안녕하세요!

 

제가 카프카 컨슈머 애플리케이션을 개발해서

aws 서버에 올려놓고 계속 컨슈밍을 할 예정입니다.

강의에서 컨슈머의 안전한 종료 파트를 듣고 질문이 생겨 글을 남깁니다.

이렇게 서버에 계속 올려놓고 컨슈밍을 받아야할때 컨슈머의 종료가 필요한가요?

만약 필요하다면 종료된 컨슈머는 어떻게 재실행이 되는 건지 여쭤볼 수 있을까요?

 

일단 저는 스프링기반 카프카 컨슈머를 개발하였고

Runnable 인터페이스로 run() 메서드로 서버가 실행되면 바로 컨슈머가 실행되게 개발해놓은 상태입니다.

 

 

 

저의 코드입니다.

@PostConstruct
	public void startConsuming() {
		Thread consumerThread = new Thread(new Consumer());
		consumerThread.start();
	}

	
	private class Consumer implements Runnable {
	private final Logger logger = LoggerFactory.getLogger(Consumer.class);
	private final static String TOPIC_NAME = "";
	private final static String BOOTSTRAP_SERVERS = "";
	private final static String GROUP_ID = "";

	public void run() {
	  try {
		Properties configs = new Properties();
		configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
		configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
		configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		// 구독 중인 주제 파티션에서 사용 가능한 가장 빠른 오프셋부터 읽기
	            configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
	            configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
	            configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
	            configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");

		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);

		consumer.subscribe(Arrays.asList(TOPIC_NAME));

		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
			for (ConsumerRecord<String, String> record : records) {
			logger.info("record: {}", record);
						
			// 받은 메세지
			String value = record.value();
						
						
						
						
			// commit the offset
	                consumer.commitSync(Collections.singletonMap(
	                new TopicPartition(record.topic(), record.partition()),
	                new OffsetAndMetadata(record.offset() + 1)));
						
	                    
					}
				}   
			} catch (Exception e) {
				logger.error("Error occurred while consuming messages", e);
			}
		}           
	}

답변 1

0

데브원영 DVWY님의 프로필 이미지
데브원영 DVWY
지식공유자

카프카 컨슈머 애플리케이션의 안전한 종료가 필요한 가장 대표적인 사례는 해당 애플리케이션을 배포할 때 일것 같습니다. 아무래도 배포를 위해서는 애플리케이션 종료가 필수적이기 때문인데요. 이 경우 애플리케이션을 안전하게 종료함으로써 리소스의 낭비를 막을 수 있습니다.

일반적으로 배포시에는 다음과 같은 순서로 진행됩니다.

  1. 애플리케이션의 안전한 종료(kill -term)

  2. 종료 확인

  3. 신규 애플리케이션 실행

emlookstudy님의 프로필 이미지
emlookstudy

작성한 질문수

질문하기