인프런 커뮤니티 질문&답변

화이팅님의 프로필 이미지
화이팅

작성한 질문수

[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!

프로듀서, 컨슈머 애플리케이션, 그 외 몇가지 궁금한게 있습니다.

작성

·

352

·

수정됨

0

안녕하세요.

강사님의 강의를 듣고, 이제는 강사님 책을 보고 있습니다.

 

실제 상용에서 애플리케이션을 개발할 때 궁금한 점이 있습니다.

 

 

첫번째로 프로듀서 애플리케이션 입니다.

강사님의 책에서 봤듯이, 스프링을 사용합니다.

예를 들어 저는 사용자 서비스에서 디비 트랜잭션(스프링에서 @Transactionl)을 사용하여 사용자 디비에 저장을 성공 후 프로듀서를 사용하여 레코드를 브로커에 보내야 한다고 생각합니다.

밑에 코드를 간략하게 작성하였는데, 저렇게 되면 디비는 롤백됬지만 토픽에 레코드가 저장된 상태가 될 수 있다고 생각합니다.

혹시 실무에서 커밋이 성공 후에 프로듀서 레코드를 전송하는 방법을 어떻게 하는지 간략한 코드가 궁금합니다.

개인적인 생각은 UserService 클래스 상위 클래스로 카프카 프로듀서 처리하는 클래스를 만들어서 처리를 하는건지.. 궁금합니다.(이렇게 되면 카프카를 위한 래퍼 클래스가 항상 생기게 되는 불편함이 있는 것 같기도 하고요..



// 제가 생각하기에 잘못된 방법?
// 만약 레코드 전송 후 어떤 이유로 에러가 발생하면, 디비에 저장된 데이터를 롤백되지만 
// 프로듀서 레코드는 브로커 전송이 되버림
public class UserService {
  @Transactionl
  public void save(){
    ...
    프로듀서 레코드 전송 코드
    ... 예외 발생
  }
}

-----------------------


// 개인적인 생각
public class KafkaServce {
  public void save(){
    userService.save();
    프로듀서 레코드 전송 코드
  }
}


public class UserService {
  @Transactionl
  public void save(){
    ...
  }
}

 

 

두번째로 컨슈머 애플리케이션 개발 시 궁금한 점이 있습니다.

컨슈머에서 데이터 처리를하다가 어떤 이유로 에러가 발생하여 해당 레코드 처리를 계속 실패했다고 가정합니다.

그럴 경우 이 레코드의 대한 커밋 처리를 어떻게 해야할지가 궁금합니다.

커밋을 처리하지 않으면, 다음 레코드 처리를 하지 못하는거라 생각되는데.. 어떤 방법으로 풀어내는지가 궁금합니다.

 

세번째로 컨슈머 애플리케이션에서 데이터베이스의 데이터를 저장해야하는 상황이다.

스프링을 사용하는 경우 기본적으로 히카리 커넥션 풀에 커넥션 10개를 생성합니다.

만약 파티션이 10개여서, 컨슈머를 10개 실행해야 한다면, 스프링 커넥션 풀을 사용하면 100개의 커넥션이 연결됩니다.

50개면 500개의 풀 계속 증가할 듯 싶네요.

이 경우 어떻게 해야할까요?

컨슈머에서 레코드들을 for 문으로 돌리기 때문에 커넥션 풀을 1개를 사용해서 개발하는게 맞는건지?
아니면 스프링에서 제공하는 히카리 커넥션 풀을 사용하지 않고, 직접 커넥션 풀을 구현하든가, 그것도 아니면 커넥션 풀을 사용하지 않고 1개의 트랜잭션당 1개의 커넥션을 생성 후 해제를 해야할까요?
강사님의 생각이 궁금합니다.


마지막으로 궁금한게 있습니다.

혹시 카프카를 활용하여 MSA에서 보상 트랜잭션(사가 패턴 - Orchestration) 처리를 할 수 있는건지 궁금합니다.

답변 2

0

데브원영 DVWY님의 프로필 이미지
데브원영 DVWY
지식공유자

안녕하세요. 문의주신 내용 답변드리기 전에 우선, 인프런 강의는 스프링관련 내용은 포함되어 있지 않으므로 관련 내용에 대해 자세히 답변드리기 어려운점 참고부탁드립니다. 대신 일반적인 카프카 클라이언트 사용방식을 배경으로 답변드리도록 하겠습니다.

 

1.프로듀서 애플리케이션에서 데이터 저장과 레코드 전송을 원자성 단위로 묶는 방법

특정 데이터를 디비에 저장하고 이와 동시에 프로듀서 레코드를 전송하는 방식에 대해 문의하셨는데요. 이미 아시다시피 이종 데이터베이스 간에 원자성은 관련 기능을 제공하지 않는 경우에는 달성하기 어렵습니다. 예를 들어 mysql에 데이터를 저장함과 동시에 프로듀서로 레코드를 전송하는 것을 하나의 트랜잭션으로 묶는 것은 불가능합니다. 그렇기 때문에 특정시점에 장애가 발생할 경우 데이터베이스에는 데이터가 저장되고 프로듀서에는 전송이 안되거나 프로듀서에는 전송이 되었지만 데이터가 저장되지 않은 경우도 발생할 가능성은 있습니다.

그렇기 때문에 이런 상황을 해결하기 위해서는 두가지 해결 방안이 있을것 같습니다.

방법1) 유실이나 중복을 감안하고 개발 : 정확히 한 번 처리하지 않아도 될 때 사용하는 방법입니다. 장애가 발생했을 경우 유실/중복이 발생하더라도 비즈니스적으로 이슈가 없을 경우 적용할 수 있습니다.

방법2) 배치처리를 통해 데이터의 유실/중복을 감지하고 처리하는 로직 개발 : 또 다른 모듈을 사용/개발하여 유실이나 중복이 생기지 않았는지 확인하고 처리하는 방법입니다. 비용은 많이 들지만, 정확히 한 번 처리할 수 있다는 장점이 있습니다.

 

2.컨슈머 레코드 처리 실패시 커밋 처리 방법

컨슈머의 레코드 처리가 실패가 된다고 해서 계속해서 해당 레코드를 처리 시도를 하게 되면 해당 파티션의 데이터 처리가 늦어지게되어 컨슈머 랙이 상승할 수 있습니다. 이를 방지하기 위해 DLQ(Dead Letter Queue)와 같은 방어로직을 위한 토픽을 따로 만들어 운영하는 경우도 많습니다. 그래서 에러가 발생한 레코드는 DLQ 토픽으로 전송하되 다음 데이터를 처리하고 커밋을 이어나가는 방식이 일반적일것 같습니다.

 

3.데이터베이스 저장시 컨슈머의 커넥션 유지관리 방법

커넥션풀의 개수가 부담되는 상황이라면 말씀대로 직접 커넥션을 생성하고 해제하는 방식도 괜찮아보입니다. 다만, 성능상에 어떤 이슈가 있을 지는 테스트가 필요해보입니다. 그리고 내부 로직에 따라 1개 컨슈머가 10개 커넥션풀을 모두 사용할 수도 있으므로 판단은 해당 로직 개발자의 의도를 파악해야 할 것 같습니다.

 

4.카프카를 활용한 보상 트랜잭션 처리 방법

네 가능합니다. 2019년에 열린 런던 카프카 써밋에서 발표한 아래 발표자료와 영상을 보시면 좋을것 같습니다.

화이팅님의 프로필 이미지
화이팅
질문자

먼저 상세한 답변 감사합니다.

 

첫번째 부분만 좀 더 문의드리고 싶습니다.

 

방법 2의 배치 유실/중복을 감지한다고 하셨습니다.

만약 사용자 서비스와 포인트 서비스가 2개 있다고 가정하겠습니다.

사용자가 생성될 때 특정 토픽에 레코드를 넣어 포인트 서비스에서 컨슈머를 통해 포인트 데이터를 생성합니다.

둘의 관계는 1:1입니다.

현재 사용지 수는 백만명인데, 토픽에 레코드를 넣는 것은 성공 후 포인트 컨슈머에서 정상적으로 포인트 디비에 데이터를 저장했습니다.

여기서, 사용자 서비스에서는 토픽에 레코드를 정상적으로 넣었지만, 어떠한 이유로 예외가 발생하여 사용자 디비의 데이터는 롤백되었습니다.

 

그래서 현재 데이터는 사용자 1,000,000 포인트 1,000,001이 되어 둘의 차이기 1이 생겼습니다.

 

배치로 데이터 유실을 체크한다면 백만명의 모든 데이터를 확인한다는 것은 비효율적이라고 생각됩니다.

 

강사님이라면 배치 처리를 어떻게 하는게 좋은 방법인지 궁금합니다.

데브원영 DVWY님의 프로필 이미지
데브원영 DVWY
지식공유자

실시간 데이터를 배치로 다시 한번 유실을 체크하고 보정하는 작업은 유사한 일을 두번 작업을 수행하는 것이고, 데이터양에 따라서는 많은 컴퓨팅 리소스를 사용하는 일임에 틀림없습니다. 이런 과정이 말씀대로 말씀대로 "비효율적"이라고 볼 수도 있겠습니다.

하지만, 예시로 든 것 같은 복잡한 로직에서 데이터를 완전히 "정확히 한 번" 처리를 수행하기 위해서는 이러한 보정 과정이 필요합니다. 그렇기 때문에 빅데이터에서 실시간과 배치 처리가 동시에 존재하는 람다 아키텍처가 아직도 많은 기업에서 사용되고 있습니다.

람다 아키텍처 : https://en.wikipedia.org/wiki/Lambda_architecture

만약 제가 유사한 상황에서 정확히 한 번 처리를 반드시 달성해야만 한다면, 배치 처리를 함께 두고 운영할 것 입니다.

0

안녕하세요, 인프런 AI 인턴입니다.

화이팅님의 궁금증에 대해 답변드리겠습니다.

첫 번째로 프로듀서 애플리케이션의 경우, 스프링에서는 KafkaTransactionManager를 사용하여 데이터베이스 트랜잭션과 카프카 레코드 전송을 하나의 트랜잭션으로 묶을 수 있습니다. 먼저 spring.kafka.producer.transaction-id-prefix 프로퍼티를 설정하여 트랜잭션을 활성화시키고, @Transactional 어노테이션을 사용한 메소드 내에서 KafkaTemplate를 이용해 메시지를 전송합니다. 만약 트랜잭션 중 에러가 발생하면 전체 트랜잭션이 롤백되어 데이터베이스 뿐만 아니라 카프카 브로커로의 레코드 전송도 되지 않습니다. 예시 코드는 다음과 같습니다.

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.beans.factory.annotation.Autowired;

public class UserService {

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  @Transactional
  public void save() {
    // 데이터베이스 저장 로직...
    kafkaTemplate.send("someTopic", "Hello");
    // 필요한 경우 다른 에러 핸들링을 수행...
  }
}

더 자세한 사항은 여기의 Spring Boot Reference Documentation을 참고하세요.

두 번째로, 컨슈머 애플리케이션은 @KafkaListener 어노테이션을 통해 메시지를 수신합니다. 처리 중 에러가 발생할 경우, CommonErrorHandler, AfterRollbackProcessor와 같은 스프링 제공 인터페이스를 구현하여 실패한 메시지에 대한 후처리 로직을 구현할 수 있습니다. 예를 들어, 실패한 메시지를 별도의 토픽으로 전송하거나, 로깅 후 다음 메시지로 건너뛰는 등의 전략을 적용할 수 있습니다. 참고로, CommonErrorHandler 인터페이스는 Spring for Apache Kafka version 2.7 이상부터 지원됩니다. 여기에서 더 자세한 정보를 확인할 수 있습니다.

세 번째 질문에 대한 답변입니다. 히카리 커넥션 풀의 크기를 조절하거나, 파티션에 맞게 커넥션 풀을 최적화하는 방법을 활용할 수 있습니다. 대규모 트래픽을 처리하는 애플리케이션의 경우, 커넥션 풀 크기를 조절하거나 다중 인스턴스로 올바르게 분산 처리를 해야합니다. 커넥션 풀 당 커넥션 수를 조정하거나, 여러 컨슈머 인스턴스에서 하나의 커넥션 풀을 공유하도록 설계할 수도 있습니다. 이 경우에는 커넥션 풀 설정에 대한 성능 테스트와 모니터링을 통해 최적의 설정 값을 찾아야 합니다.

마지막으로, MSA(Microservices Architecture)에서 카프카를 통해 사가 패턴(Saga Pattern)을 구현할 수 있습니다. 각 서비스에서 로컬 트랜잭션을 수행한 후 이벤트를 카프카에 발행하고, 다른 서비스에서 이를 구독하여 필요한 보상 트랜잭션을 수행하는 방식으로 구현할 수 있습니다. 사가 패턴은 분산된 마이크로서비스 환경에서 데이터 일관성을 유지하는 데 유용한 방법입니다.

추가적으로 궁금한 사항이나 더 자세한 설명을 원하시면 언제든지 질문해 주세요.

화이팅님의 프로필 이미지
화이팅
질문자

일단 첫번째의 경우 spring.kafka.producer.transaction-id-prefix=tx- 를 활성화할 경우 기존 @Transactional 을 사용하면 TransactionManager, KafkaTransactionManager 동일한 인터페이스로 스프링 컨테이너에 등록된 2개의 빈이 있어 런타임에서 예외가 발생합니다.

그래서 2개 중 1개의 트랜잭션 매니저를 설정해줘야 합니다.

 

스프링 블로그나 다른 블로그를 보면 해당 방법은 @KafkaListener 를 통해서만 JpaTranscationManagerKafkaTranscationManager 를 하나의 트랜잭션으로 사용하는 것 같습니다.

 

그래서 결국 동작하지 않았습니다.

 

그 다음으로, 이렇게 2개의 다른 플랫폼의 트랜잭션을 1개로 묶는 것은 결구 JTA 와 같은 것으로 예상됩니다. 그렇다면 성능상 이슈도 있을거라 생각합니다.

공식 문서도 중요하지만, 실무에서는 이런 부분을 어떻게 풀어내는지가 궁금합니다.

---
두번째는 도움이 될 것 같습니다.
---

세번째의 경우는 강사님의 강의를 보면 컨슈머의 경우 1개의 프로세스에서 1개의 컨슈머를 실행하는 것을 추천합니다.
과연 실무에서는 패처에서 쌓인 레코드들을 가져오고, 동일 쓰레드에서 순차적으로 1개씩 처리를 할 경우 전 커넥션 풀이 1개면 충분하다고 생각합니다.

아니면, 레코드들을 별도의 쓰레드에서 동시 실행시킬 경우 커넥션 풀은 더 많아야 한다고 생각합니다.

하지만, 오프셋 커밋의 동기화 문제로 순차 처리가 더 유용한게 아닌가 싶습니다.

이것도 실무에서의 답변이 궁금합니다.

---

네번째의 경우 말씀하신 내용은 오케스트레이션이 아닌, 코레오그래피 방식을 말씀하신게 아닌가 싶습니다.

 

화이팅님의 프로필 이미지
화이팅

작성한 질문수

질문하기