묻고 답해요
141만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
Kafka 테스트.. 오류
D:\코딩\kafka_demo\kafka_2.13-3.7.0> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties The input line is too long. The syntax of the command is incorrect. D:\코딩\kafka_demo\kafka_2.13-3.7.0>입력 명령어가 너무 길다고 계속 오류가 나는데 버전을 낮춰야할까요..?
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
직렬화, 역직렬화 관련
안녕하세요. 좋은 강의 감사합니다.직렬화, 역직렬화 관련해서 질문이 있습니다.저는 Spring 프레임워크를 사용해서 프로듀서와, 컨슈머를 각각 따로 서버를 만들어서 개발 하고 있습니다.수업 내용은 단순히 String이지만 제가 실무에 사용하려고하는건 웹 애플리케이션 과 같이 DTO 클래스로받은 데이터를 그대로 프로듀서에서 send()에 담아서 보내고 있습니다. 컨슈머에서 DTO로 받으려고관련 자료를 찾아 보니 JsonSerializer가 있어 해보았습니다. Object 로 받아서 여러 DTO를 받을 수 있는Consumer factory를 만들다 보니 여러 에러가 발생해서 ObjectMapper를 사용했습니다.@Component public class ObjectMapperService { private final ObjectMapper objectMapper = new ObjectMapper(); public <T> T convertValue(String json, Class<T> valueType) throws JsonProcessingException { return objectMapper.readValue(json, valueType); } }@Component @Slf4j @Data @RequiredArgsConstructor public class KafkaConsumer { private final ObjectMapperService mapper; @KafkaListener(topics = "member-join", containerFactory = "commonKafkaListenerContainerFactory") public void receiveMemberJoin(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) throws Exception { String value = consumerRecord.value(); MemberJoin memberJoin = mapper.convertValue(value, MemberJoin.class); String recommendCode = memberJoin.getRecommendCode(); log.info("recommendCode: {}", recommendCode); log.info("received payload = {}", memberJoin.toString()); acknowledgment.acknowledge(); } }이런식으로 사용해서 DTO클래스로 사용했는데 실무에서는 어떤식으로 하는지 궁금합니다. 혹시 참고할 수 있는 정보가 있을까요?감사합니다.
-
미해결카프카 완벽 가이드 - 코어편
같은 대역의 PC로 연결 후 Java code 질문
저번 질문이 많이 도움이 되었습니다. 그래서 같은 대역의 PC 2개를 가지고하나는 kafka 서버로 만들어서 ssh로 연결이 되었고 여러 명령들도 잘 실행되었습니다.이번엔 java code로 만들어본 simpleproducer를 이용하여 메세지를 보내보고싶은데IP 부분의 code를 어떻게 설정하면 좋을까요?kafka 서버의 PC의 ip는 210.110.32.125 이고 포트번호는 12345로 포트 포워딩을 통해 22번 변경했습니다 Properties props = new Properties(); //bootstrap.servers, key.serializer.class, value.serializer.class //props.setProperty("bootstrap.servers", "192.168.56.101:9092"); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
-
미해결카프카 완벽 가이드 - 코어편
인텔리제이 Producer실행 관련
인텔리제이에서 메인문을 실행하였을 때,이러한 로그가 반복되며 무한히 나옵니다. gradle에서slf4j도 정상적으로 적용시켰습니다만 해결이 안되네요. 뭐가 문제일까요?
-
해결됨카프카 완벽 가이드 - 코어편
kafka 서버 구성한 기기 외 다른 기기에서 접속하고싶은데..
강의에서는 한 기기에서 모든게 이루어져서강의에서 진행되는 기기는 kafka 서버로 두고외부 기기 2개로 producer와 consumer 코드를 실행시켜보고 싶은데설정을 어떻게 해야할지 모르겠습니다.검색을해보면 advertised.listeners에 IP를 입력해야 하는거 같은데무슨 IP를 입력하면 좋을까요? 아니면 또 다른 설정이 있나요?kafka서버 PC 한대 와 다른 PC 2대인 상황이고 같은 와이파이를 공유하고 있는 상황입니다.producer 코드를 가진 PC를 실행시켜 kafka 서버 PC에 접속하여 consumer 코드를 가진 PC로 받고싶습니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
MSA 환경에서 Prometheus 궁금한 점이 있습니다.
강의 잘보고 있습니다.다름이 아니라 MSA 환경에서는 한 서비스가 여러 인스턴스로 올라가있는 경우가 있는데, prometheus.yml에서 해당 인스턴스를 다 지정해주지 않아도 괜찮을까요? 또한 spring cloud gateway에서 lb://ORDER-SERVICE 형태로 로드밸런싱 처리가 되어있는데, 그렇다면 프로메테우스에서 actuator로 가져오는 데이터가 여러 인스턴스 각각 가져오는게 아닌 로드밸런싱되는데로 데이터를 가져오게 되지않나요? 호기심에 질문해봅니다.
-
미해결카프카 완벽 가이드 - 코어편
broker config vs topic config
broker와 A topic config가 각각 log.retention.ms = 1일,retention.ms=1일 인 상황에서broker config를 1주일로 바꿨습니다.A 토픽 메세지 보관 기간은 여전히 1일인가요 ? 브로커쪽 설정 수정이 global 하게 적용되는게 아닌건가요?감사합니다. - 학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요! - 먼저 유사한 질문이 있었는지 검색해보세요. - 서로 예의를 지키며 존중하는 문화를 만들어가요. - 잠깐! 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요.
-
미해결실습으로 배우는 선착순 이벤트 시스템
kafka Consumer
Kafka를 사용하는 주된 이유 중 하나는 확실히 데이터 처리의 유연성과 부하 분산에 있습니다. Kafka를 활용함으로써, 많은 양의 이벤트(예: 쿠폰 발행 요청)를 바로 처리하지 않고 큐에 보관했다가, 시스템의 부하가 적은 시기에 또는 자원이 더욱 충분할 때 일괄적으로 처리할 수 있게 됩니다.kafka 관련해서 다른 분 질문에 답글이 위처럼 달린 것을 확인했는데요.. 큐에 보관했다가 일괄적으로 처리를 할 수 있다고 했는데 그럼 컨슈머 listener 에 언제 카프카에 있는 이벤트를 받아서 처리할 것인지에 대한 설정을 할 수 있는 것인가요?시스템 부하가 적은 시기나 자원이 충분할 때가 언제인지 어떻게 알고 처리를 하는지 궁금합니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
강의 수강 후 질문
안녕하세요 강의를 모두 수강후 , 강의를 통해 MSA관련 배운부분을 적용시키기 위해 프로젝트를 진행했습니다!Spring Cloud를 활용하고 서비스를 모두 개발완료 후 배포를 진행중입니다. 현재 마이크로 채팅 서비스가 ec2 a인스턴스에있고 gateway는 b인스턴스에 있고 모두 docker로 배포를 진행했습니다. eureka서버는 b인스턴스에 있고 gateway를 비롯해서 모든 마이크로서비스가 정상적으로 등록된 상태입니다.그러던중에, 아래와 같은 에러가 gateway 로그에 찍혔습니다. ec2 보안그룹도 정상적으로 포트를 열어줬고, 외부에서 접근이 가능하게 docker로 포트 매핑도 진행했고 , 방화벽도 문제가없는데 원래 gateway에서 ==> 다른 인스턴스에 있는 마이크로서비스를 연결을 하지못하는걸까요? 구글링을 많이해봤지만, 비슷한 내용이 없고 해결하기가 어려워 질문 남깁니다ㅜㅜ+)추가로 라우팅도 정상적으로 되어 매칭이 됬다는 로그도 나오는 상태에서 finishConnect(..) failed: Connection refused 에러가 생깁니다..a.w.r.e.AbstractErrorWebExceptionHandler : [1e416af9] 500 Server Error for HTTP GET "/plant-chat-service/chatroom/exist/seller?tradeBoardNo=2&memberNo=1" io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: /172.18.0.5:46183 Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ org.springframework.boot.actuate.web.trace.reactive.HttpTraceWebFilter [DefaultWebFilterChain] *__checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain] *__checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain] *__checkpoint ⇢ HTTP GET "/plant-chat-service/chatroom/exist/seller?tradeBoardNo=2&memberNo=1" [ExceptionHandlingWebHandler] Original Stack Trace: Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused at io.netty.channel.unix.Errors.newConnectException0(Errors.java:155) ~[netty-transport-native-unix-common-4.1.85.Final.jar!/:4.1.85.Final] at io.netty.channel.unix.Errors.handleConnectErrno(Errors.java:128) ~[netty-transport-native-unix-common-4.1.85.Final.jar!/:4.1.85.Final] at io.netty.channel.unix.Socket.finishConnect(Socket.java:359) ~[netty-transport-native-unix-common-4.1.85.Final.jar!/:4.1.85.Final] at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:710) ~[netty-transport-classes-epoll-4.1.85.Final.jar!/:4.1.85.Final] at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:687) ~[netty-transport-classes-epoll-4.1.85.Final.jar!/:4.1.85.Final] at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:567) ~[netty-transport-classes-epoll-4.1.85.Final.jar!/:4.1.85.Final] at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:489) ~[netty-transport-classes-epoll-4.1.85.Final.jar!/:4.1.85.Final] at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397) ~[netty-transport-classes-epoll-4.1.85.Final.jar!/:4.1.85.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.85.Final.jar!/:4.1.85.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.85.Final.jar!/:4.1.85.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.85.Final.jar!/:4.1.85.Final] at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
-
미해결실습으로 배우는 선착순 이벤트 시스템
docker compose up 실행 오류
version: '2' services: zookeeper: image: wurstmeister/zookeeper container_name: zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka:2.12-2.5.0 container_name: kafka ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /var/run/docker.sock:/var/run/docker.sock 강의자료 내용대로 작성하고 실행했는데 다음과 같은 오류가 계속 발생하는데 원인을 모르겠습니다. 도커 로그인을 했는데도 계속 발생하네요. 오류 원인이 뭘까요? $ docker-compose up -d kafka Pulling zookeeper Pulling kafka Error zookeeper ErrorError response from daemon: pull access denied for wurstmeister/kafka, repository does not exist or may require 'docker login': denied: requested access to the resource is denied
-
미해결카프카 완벽 가이드 - 코어편
OrderDBHandler 에서 데이터가 다 안들어가고 누락이 됩니다.
원인을 찾아보니insertOrders 메소드에서pstmt.executeUpdate();이 부분이 문제입니다.대신에pstmt.executeBatch();이렇게 하니누락된 데이터가 없이다 들어갑니다.
-
미해결실습으로 배우는 선착순 이벤트 시스템
consumer 셋팅 숫자 출력이 안됩니다.
docker exec -it kafka kafka-console-consumer.sh --topic coupon_create --bootstrap-server localhost:9092 --key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" --value-deserializer "org.apache.kafka.common.serialization.LongDeserializer"라고 터미너에 입력한 후에 테스트 코드package com.example.coupon_server.service; import com.example.coupon_server.repository.CouponRepository; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @SpringBootTest public class ApplyServiceTest { @Autowired private ApplyService applyService; @Autowired private CouponRepository couponRepository; @Test @DisplayName("쿠폰 한개 적용 테스트") public void applyOneCoupon() { applyService.applyCoupon(1L); long count = couponRepository.count(); assertThat(count).isEqualTo(1); } @Test @DisplayName("쿠폰 여러개 적용 테스트") public void applyMultiCoupon() throws InterruptedException { int threadCount = 1000; ExecutorService executorService = Executors.newFixedThreadPool(32); CountDownLatch latch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { long userId = i; executorService.submit(() -> { try { applyService.applyCoupon(userId); } finally { latch.countDown(); } }); } latch.await(); Thread.sleep(10000); long count = couponRepository.count(); assertThat(count).isEqualTo(100); } @Test @DisplayName("한명당 한개의 쿠폰만 발급") public void applyOneCouponPerUser() throws InterruptedException { int threadCount = 1000; ExecutorService executorService = Executors.newFixedThreadPool(32); CountDownLatch latch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { long userId = i; executorService.submit(() -> { try { applyService.applyCoupon(1L); } finally { latch.countDown(); } }); } latch.await(); Thread.sleep(10000); long count = couponRepository.count(); assertThat(count).isEqualTo(1); } }로 작성했지만 터미널에서 강사님과 같이 숫자들이 찍히자 않습니다.그리고 터미널 안의 글씨 너무 작은것 같아요
-
미해결실습으로 배우는 선착순 이벤트 시스템
멀티 모듈
프로젝트 나누신것 같은데그냥 최상단 경로에서 프로젝트 하나 더 생성하고 난뒤에최상단에서 열면 되는건가요 ?
-
미해결카프카 완벽 가이드 - 코어편
Producer의 acks 설정 관련 실습 질문
안녕하세요 선생님 질문이 있습니다.acks = -1 이 All이라 하셨는데예제에서 erorr를 따로 던지지 않는 이유가 min_insync_replicas 가 기본값이 0 이라서 그냥 성공하는것인가요?
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
상품 목록조회를 하려고 하는데 데이터가 안불러와져요
db는 mysql로 바꿔서 하고 있습니다.데이터는 있는데 안불러와지는데 어떤게 문제 일까요?
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
CustomFilter 의 비동기 방식 request / response 메시지 관련
return (exchange, chain) -> { ServerHttpRequest request = exchange.getRequest(); ServerHttpResponse response = exchange.getResponse(); log.info("Custom PRE filter: request id -> {}", request.getId() ); // Custom Post Filter return chain.filter(exchange).then(Mono.fromRunnable(() -> { log.info("Custom POST filter: response code -> {}", response.getStatusCode()); })); }; //return null;강사님. 안녕하세요. 좋은 강의 잘 듣고 있어요. 이전에 devops 강의 너무 좋아서 이 강의도 듣게 되었습니다. 다름이 아니라 위의 람다코드는 getId 이후에 statusCode 부분을 비동기 방식으로 찍는 것 같은데 서버에서 Filter 적용시 비동기로 찍는 것은 아무래도 성능 때문인지 궁금하여 글 남깁니다. 감사합니다. !!!
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
Spring Cloud Gateway - Load Balancer 1강 보완 요청
일단 제 환경은 윈도우구요apigateway-service 내의 application.yml 파일의 url를 일반 ip 어드레스가 아닌 loadbalancer 방식으로 변경하여 테스트하면 진행되지 않습니다;트러블슈팅에 장시간이 걸렸는데 혹시나 해서 제 피씨 네임을 인식하지 못하나 해서 hosts 파일에 본인 pc 네임과 127.0.0.1을 추가해주면 됩니다. 만약 네 pc가 '뽀로로'인 경우뽀로로 127.0.0.1을 'C:\Windows\System32\drivers\etc'의 파일에 추가해줍니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
안녕하세요 질문이 있습니다.
여러 서비스가 Kafka에 메시지를 전달해서 이를 단일 서버에서 저장하는 것은 이해가 갑니다! 그렇다면 조회할 때는 어떤 식으로 흐름이 발생하는건가요? 기존에 어플리케이션이 직접 DB와 커넥션이 되어있는 상태라면 조회문 날리면 그만인데 조회의 경우엔 조회 이벤트가 발생할 때마다 조회 이벤트를 보내면 DB가 카프카에 쿼리를 실행한 결과값을 저장하고, 어플리케이션에서는 또 컨슈머를 만들어서 그값을 가져오는 형태인가요?
-
미해결카프카 완벽 가이드 - 코어편
카프카 브로커와 레플리카 수 관계
안녕하세요 카프카 브로커 수와 토픽의 레플리카 팩터 수 관계가 궁금합니다! 토픽 레플리카 팩터를 3으로 할꺼면브로커는 최소 3개를 가져가야하는지 궁금합니다. 토픽 레플리카 팩터와 브로커 수가 연관이 있는지요!
-
미해결실습으로 배우는 선착순 이벤트 시스템
kafka 사용 이유
안녕하세요,궁금한 내용이 생겨 기존에 질문들을 살펴보고 제가 이해한 것이 맞는지 확인차 질문드립니다. redis의 싱글스레드 특성으로 100개 발급에 대한 race condition 해결 --> 그러나 insert 시 DB 처리량에 부하가 발생할 수 있음kafka 미들웨어를 통해 100개의 쿠폰 저장 이벤트를 보관해두었다가 컨슈머에서 원할때 꺼내어 처리할 수 있는 여지를 주어 처리량 부하를 분산시킬 수 있음ex) 쿠폰 발급은 당장 안해도 되니, kafka에 이벤트만 잘 발행되어 있다면 DB 작업량이 적은 새벽에 꺼내서 저장해도 됨 이렇게 이해했는데, 맞을까요?