묻고 답해요
141만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
/actuator/metrics 에서 names 값에 대한 질문
스프링 부트 버전 : 3.2.2JDK : 17 spring cloud gateway 를 사용하여127.0.0.1:8000/user-service/welcome, 127.0.0.1:8000/user-service/heath_check 주소를 호출했습니다.http://127.0.0.1:8000/user-service/actuator/prometheus 를 호출결과 중 일부입니다.# HELP http_server_requests_seconds # TYPE http_server_requests_seconds summary http_server_requests_seconds_count{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/heath_check",} 5.0 http_server_requests_seconds_sum{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/heath_check",} 0.020372399 http_server_requests_seconds_count{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/actuator/prometheus",} 1.0 http_server_requests_seconds_sum{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/actuator/prometheus",} 0.8039679 http_server_requests_seconds_count{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/actuator/health",} 1.0 http_server_requests_seconds_sum{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/actuator/health",} 0.0116107 http_server_requests_seconds_count{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/welcome",} 3.0 http_server_requests_seconds_sum{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/welcome",} 3.827017999 http_server_requests_seconds_count{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/actuator/metrics",} 8.0 http_server_requests_seconds_sum{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/actuator/metrics",} 0.039744298 http_server_requests_seconds_count{error="none",exception="none",method="GET",outcome="CLIENT_ERROR",status="404",uri="/**",} 1.0 http_server_requests_seconds_sum{error="none",exception="none",method="GET",outcome="CLIENT_ERROR",status="404",uri="/**",} 0.014729801 http_server_requests_seconds_count{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/actuator/info",} 1.0 http_server_requests_seconds_sum{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/actuator/info",} 0.090424499 # HELP http_server_requests_seconds_max # TYPE http_server_requests_seconds_max gauge http_server_requests_seconds_max{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/heath_check",} 0.003873199 http_server_requests_seconds_max{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/actuator/prometheus",} 0.8039679 http_server_requests_seconds_max{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/actuator/health",} 0.0 http_server_requests_seconds_max{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/welcome",} 0.0 http_server_requests_seconds_max{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/actuator/metrics",} 0.0 http_server_requests_seconds_max{error="none",exception="none",method="GET",outcome="CLIENT_ERROR",status="404",uri="/**",} 0.0 http_server_requests_seconds_max{error="none",exception="none",method="GET",outcome="SUCCESS",status="200",uri="/actuator/info",} 0.0http://localhost:8000/user-service/actuator/metrics 를 호출결과 names 배열값에강의화면과 다르게 users.welcome, users.status 값이 없습니다. pom.xml<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-tracing-bridge-brave</artifactId> </dependency> <dependency> <groupId>io.zipkin.reporter2</groupId> <artifactId>zipkin-reporter-brave</artifactId> </dependency> <dependency> <groupId>io.github.openfeign</groupId> <artifactId>feign-micrometer</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-zipkin</artifactId> <version>2.2.8.RELEASE</version> </dependency> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-prometheus</artifactId> </dependency> application.ymlmanagement: endpoints: web: exposure: include: refresh, health, beans, busrefresh, info, metrics, prometheus tracing: sampling: probability: 1.0 propagation: consume: b3 produce: b3 zipkin: tracing: endpoint: "http://localhost:9411/api/v2/spans" controller.javapackage com.example.userservice.controller; import com.example.userservice.dto.UserDto; import com.example.userservice.jpa.UserEntity; import com.example.userservice.service.UserService; import com.example.userservice.vo.Greeting; import com.example.userservice.vo.RequestUser; import com.example.userservice.vo.ResponseUser; import io.micrometer.core.annotation.Timed; import org.modelmapper.ModelMapper; import org.modelmapper.convention.MatchingStrategies; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; import java.util.ArrayList; import java.util.List; @RestController public class UserController { private Environment env; private UserService userService; @Autowired private Greeting greeting; public UserController(Environment env, UserService userService) { this.env = env; this.userService = userService; } @GetMapping("/heath_check") @Timed(value = "users.status", longTask = true) public String status() { return String.format("It's Working in User Service " + ", port(local.server.port)=" + env.getProperty("local.server.port") + ", port(server.port)=" + env.getProperty("server.port") + ", port(token.secret)=" + env.getProperty("token.secret") + ", port(token.expiration_time)=" + env.getProperty("token.expiration_time") ); } @GetMapping("/welcome") @Timed(value = "users.welcome", longTask = true) public String welcome() { // return env.getProperty("greeting.message"); return greeting.getMessage(); } @PostMapping("/users") public ResponseEntity<ResponseUser> createUser(@RequestBody RequestUser user) { ModelMapper mapper = new ModelMapper(); mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT); UserDto userDto = mapper.map(user, UserDto.class); userService.createUser(userDto); ResponseUser responseUser = mapper.map(userDto, ResponseUser.class); return ResponseEntity.status(HttpStatus.CREATED).body(responseUser); } @GetMapping("/users") public ResponseEntity<List<ResponseUser>> getUsers() { Iterable<UserEntity> userList = userService.getUserByAll(); List<ResponseUser> result = new ArrayList<>(); userList.forEach(v -> { result.add(new ModelMapper().map(v, ResponseUser.class)); }); return ResponseEntity.status(HttpStatus.OK).body(result); } @GetMapping("/users/{userId}") public ResponseEntity<ResponseUser> getUser(@PathVariable String userId) { UserDto userDto = userService.getUserById(userId); ResponseUser returnValue = new ModelMapper().map(userDto, ResponseUser.class); return ResponseEntity.status(HttpStatus.OK).body(returnValue); } }
-
미해결15일간의 빅데이터 파일럿 프로젝트
파일럿 프로젝트 pc 환경 구성 질문입니다.
혹시 docker를 활용해서 실습환경을 구축하여도 문제가 있을까요?
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
RabbitMq 에 대해
현재 강의에서 RabbitMq를 사용하여 어떻게 각각의 서버로 변경된 데이터를 전달하는지 아키텍처가 궁금합니다. 각각의 서버는 RabbitMq서버를 구독하고있는건가요?푸시방식이라고 했는데 각각의 서버가 RabbitMq의 큐에서 메시지를 가져가는 폴링방식이 아닌가요?구독하고있는 각 서버로 브로드캐스트하여 푸시하는 방식이지 폴링방식이 아니라는건가요?busrefresh를 통하여 큐에 메시지를 발행하고, 각 서버는 큐에서 메시지를 꺼내어가져가는건가요(폴링)? 아니면 큐에서 메시지를 RabbitMq가 직접 각각의 서버로 전송하는 푸시방식인가요? 푸시방식이 맞다면 AMQP프로토콜로 서버들이 실행할때마다 이미 연결이 되어있는 상태를 유지하고 있기 때문에 푸시가 가능한건가요?원리가 궁금합니다..
-
미해결카프카 완벽 가이드 - 코어편
강사님 인텔리제이 출력메시지 줄이는 방법 한번만 다시 부탁드립니다.
지난번 강의중에 intellj 메시지 줄이는 방법을 알려주셨는데제가 깜빡 잊고 설정을 하지 않았습니다.다시 그 강의를 찾아보려니 엄두가 나지 않아서 번거로우겠지만 다시 부탁드려도 되는지요?
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
같은 서비스간에 api 통신
강의에서는 다른 서비스 간에 통신을 위해 open feign 이나 rest template을 사용하고 있는데 같은 서비스 안에서 어떤 api가 다른 api를 호출할때도 마찬가지로 둘중 하나를 사용하나요? 그리고 그때는 @FeignClient url 주소값을 자기 자신으로 넣으면 되나요?
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
Remote Git Repository
제목 : Remote Git Repository 수업에서컨피그 설정 레포지토리의 브랜치가 main 이면http://127.0.0.1:8888/ecommerce/dev접속하여도 정보를 가져오지못하는데..무조건 브랜치가 master 이어야 하나요?master 이면 정보를 가져옵니다..
-
미해결카프카 완벽 가이드 - 코어편
멀티 노드구성 관련 질문입니다
안녕하세요!좋은 강의 잘 듣고 있는 수강생입니다.강의에 나온 예제는 하나의 VM에 멀티 노드를 구성하는 예제인데! 예를 들어 VM3대에 각각 한 개의 브로커를 띄우고 VM3대를 하나의 클러스터로 구성하려면 강의에서 보여주신 설정이외에 추가적인 설정이 필요한건지 궁금해 질문을 남깁니다. (전제: VM3대가 네트워크로 연결되어있다) (추측이지만, 실제로 멀티 브로커를 구성하는 상황이라면 여러대의 VM에 브로커를 구성할 거 같다고 생각했기 때문입니다)
-
미해결카프카 완벽 가이드 - 코어편
클러스터 관련 질문입니다!
안녕하세요! 카프카를 멀티 브로커로 구성하는 기준이 있을까요?예를 들면 어느정도 이상부터 멀티 브로커로 구성을 하는지에 대한 best practice가 있을까요?아니면, 단일 노드로 구성하고 scale out하는 방식으로 해야 하는 걸까요?
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
h2-console 왼쪽 메뉴에 테이블이 안보입니다.
빨간색 영역에 생성된 테이블이 있어야 하는데 안보입니다.refresh 버튼을 누르거나 로그아웃 후 로그인해도 안보입니다.show tables; 명령어를 실행하면 테이블은 존재합니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
hasIpAddress 설정 403
왜 게이트웨이로 접근이 안될가요?hasipaddress값에 제 컴 아이피를 적고http://220.86.33.96:9295/welcome > 접근가능한데 http://220.86.33.96:8000/user-service/welcome > 접근불가떠요..이해가안됩니다 게이트웨이서버건 유저 서버건모두 제 컴퓨터고 제 서버 아이피가 220.86.33.96인데..hasIpAddress값을 127.0.0.1로 주고 접근하면 모든곳에서 접근이 됩니다..
-
해결됨15일간의 빅데이터 파일럿 프로젝트
고사양 server03 이미지
안녕하세요고사양 pc에서는 server01, 02, 03 이 필요한 것으로 알고 있는데 혹시 server03은 어디서 받을 수 있을까요? 섹션 1~2에 업로드된 첨부파일 확인했는데 server03 이미지를 찾지 못하여 문의 드립니다!
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
강의 업데이트 시기를 알 수 있을까요?
제목이 보채는것 같이보여 죄송합니다.최대한 소심? 하게 물어보고 싶은데 단어가 안떠오르네요. 아무튼 저번 업데이트된 1편 너무 잘 봤습니다.혹시 2편 강의 내용 업데이트 날자를 대충이라도(6월 전 이렇게...) 알 수 있을까요..?
-
해결됨실습으로 배우는 선착순 이벤트 시스템
kafka를 왜 사용하는지가 잘 이해가 안가서 질문 남깁니다!
안녕하세요! 강의 잘 듣고 있습니다. 감사합니다. https://www.inflearn.com/course/lecture?courseSlug=%EC%84%A0%EC%B0%A9%EC%88%9C-%EC%9D%B4%EB%B2%A4%ED%8A%B8-%EC%8B%9C%EC%8A%A4%ED%85%9C-%EC%8B%A4%EC%8A%B5&unitId=156125&category=questionDetail&tab=community&q=1029856해당 질문과 답변을 보고 추가 질문 드리려고 합니다. 제가 kafka나 redis, 분산서버 등에 대해 이해도가 낮은 점 양해 부탁드립니다! 1.Kafka 미사용시 주문생성/회원가입요청의 타임아웃 및 10분뒤 실행에 대한 해결책으로 Kafka 를 선택한 이유는 배압조절(back pressure) 때문입니다.이렇게 말씀을 해주셨는데요,답변에서 말씀하신 예시에서 처럼 10000개 요청이 있고,카프카를 사용한다면,요청 100개가 쌓일때마다 db에 insert를 하고, 다시 요청 100개가 쌓일때까지 기다렸다가 insert 하기를 반복한다는 것으로 이해하면 될까요?2. 그게 맞다면, 강의에서 구현한 apply 메서드에서 100개의 요청이 왔는지 확인하지 않고, kafka를 사용해서 다른곳에 전달하여 처리하는 이유는 무엇인가요?예시로 apply 메서드 안에서 redis의 incr 값을 체크하면 요청이 몇개가 쌓였는지 알 수 있을테고, 데이터를 임시저장하다가 100개마다 처리할 수 있을거란 생각이 들었습니다. kafka로 다른 모듈로 전달하는 것과의 차이점이 무엇인가 궁금합니다.3.실제로 consumer에서 100개의 작업이 완료되었는지는 일반적으로 어떻게 확인하는 걸까요? db에 저장하기 전에 100개의 데이터는 어디에 임시저장을 하나요?
-
해결됨15일간의 빅데이터 파일럿 프로젝트
VirtualBox-5.0.40 실행불가
VirtualBox-5.0.40윈도우에서는 실행이 안된다고 에러메시지가 뜹니다. 다른분이 질문하신 게시글에서 버츄얼박스는 '흰색남자'님의 OS(Windows 10)에 최적화된 버젼을 설치해 사용 하시면 됩니다. 라고 하셨는데 그 글도 삭제된 것 같아서요 VirtualBox-5.0.40-115130-Win다른 방안을 알려주시면 감사하겠습니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
인증서, 공개키 파일은 언제 사용하나요?
안녕하세요. 비대칭키 암호화 강의 영상을 시청했습니다.강의를 들으면서 keystore 폴더에 keytool을 사용하여 3가지 파일을 생성했습니다.비공개키 파일: apiEncryptionKey.jks인증서 파일: trustServer.cer공개키 파일: publicKey.jks spring cloud config 프로젝트에서 다음과 같이 코드를 작성했습니다.pom.xml <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bootstrap</artifactId> </dependency> ConfigServiceApplication.java@SpringBootApplication @EnableConfigServer public class ConfigServiceApplication { public static void main(String[] args) { SpringApplication.run(ConfigServiceApplication.class, args); } } bootstrap.yml encrypt: # key: abcdefghijklmnopqrstuvwxyz0123456789 key-store: location: file:///${user.home}/Desktop/keystore/apiEncryptionKey.jks password: test1234 alias: apiEncryptionKey 강의를 다 듣고 난 후 궁금한점이 생겨서 테스트를 진행했습니다.keystore 폴더에서 인증서 파일, 공개키 파일을 삭제했습니다. http://127.0.0.1:8888/encrypthttp://127.0.0.1:8888/decrypthttp://localhost:8888/ecommerce/default강의영상에 나온 위 api 호출을 다시 시도해봤습니다.마치 대칭키 암호화 방식처럼 비공개키 하나로 암호화, 복호화를 하고 있습니다. 질문1: 인증서 파일, 공개키 파일은 언제 사용하는건지 궁금합니다.질문2: spring cloud config 프로젝트에서 비대칭키 암호화를 사용하고 있는건지 궁금합니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
Windows에서 mysql.server 수행 안 됨
"Orders Microservice에서 MariaDB 연동" 강의에서 MariaDB만 설치한 채로 mysql.server start 명령을 수행하시는데 저는 해당 명령이 없다고 나옵니다. MariaDB 바이너리 설치 경로에서 수행해도 마찬가지입니다. 설치한 버전은 강의에서 사용한 버전과 동일합니다.해당 폴더를 보면 mariadb.exe, mariadbd.exe, mysql.exe, mysqld.exe 등의 파일이 있긴 합니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
UsernamePasswordAuthenticatioToken
안녕하세요, 문의 사항이 있어 글 납깁니다...당장에 이해가 안되어서요,,,,UserDetailService . loadUserByUsername()은 password활용 없이 ID만을 통해 find하게 짜도 문제 없나요,,? 어디선가 password를 따로 비교해주나요???....?어떤 컬럼인지 어떻게 알고 비교해주지,,,ㅁ 질문드립니다
-
미해결15일간의 빅데이터 파일럿 프로젝트
클라우데라 ERR_CONNECTION_REFUSED 문제
안녕하세요 VM 통합 환경 구성 중입니다.현재 인텔 MAC 사용중이고 ,HOST,NAC 설정 까지 다 해주었는데 연결이 안돼서 진행을 못하고 있습니다 . ㅠ추가로 putty 접속도 안되네요.. 원인이 뭘까요??..확인 한번 부탁드립니다..++ 수정네트워크를 다음과 같이 변경후 서버 재시작하였더니이제 refused는 뜨지 않지만 time out 에러가 뜨네요 ㅠputty도 마찬가지입니다. ++ 수정 server02 는 현재 ssh 접속이 가능합니다..!정확하게 host정보를 입력한거 같은데 server01은 접속이 안되네요 ! ++ server 01 에서 바로 서비스체크 해보았습니다.클라우데라 매니저 잘 작동 중이고 ,, 리스타도 해보았는데여전히 http://server01.hadoop.com/ 치고 들어가면 refused 뜨네요 ㅜㅜ
-
해결됨실습으로 배우는 선착순 이벤트 시스템
consumer에서 숫자가 출력되지 않습니다ㅠ
안녕하세요제가 누락된 곳이 있는건지 테스를 시작하면터미널에서 컨슈머쪽에 숫자가 찍히지 않는데 무슨 문제일까요....?아예 실행자체가 안되는 것 같은데 혹시 터미널로 Producer테스트 코드 여쭤볼 수 있을까요..혹시 아니면 프로젝트 코드 문제가 있을 것 같아서주소 첨부드립니다..감사합니다 https://github.com/KMSKang/coupon-system [터미널에 입력한 consumer 명령어]docker exec -it kafka kafka-console-consumer.sh --topic coupon_create --bootstrap-server localhost:9092 --key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" --value-deserializer "org.apache.kafka.common.serialization.LongDeserializer"ProducerConfig values: acks = -1 batch.size = 16384 bootstrap.servers = [localhost:9092] buffer.memory = 33554432 client.dns.lookup = use_all_dns_ips client.id = producer-100 compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 120000 enable.idempotence = true interceptor.classes = [] key.serializer = class com.fasterxml.jackson.databind.ser.std.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.adaptive.partitioning.enable = true partitioner.availability.timeout.ms = 0 partitioner.class = null partitioner.ignore.keys = false receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.connect.timeout.ms = null sasl.login.read.timeout.ms = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.login.retry.backoff.max.ms = 10000 sasl.login.retry.backoff.ms = 100 sasl.mechanism = GSSAPI sasl.oauthbearer.clock.skew.seconds = 30 sasl.oauthbearer.expected.audience = null sasl.oauthbearer.expected.issuer = null sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000 sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000 sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100 sasl.oauthbearer.jwks.endpoint.url = null sasl.oauthbearer.scope.claim.name = scope sasl.oauthbearer.sub.claim.name = sub sasl.oauthbearer.token.endpoint.url = null security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.LongSerializer
-
미해결카프카 완벽 가이드 - 코어편
nocommit 관련 질문
package com.example; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Properties; public class ConsumerPartitionAssignSeek { public static final Logger logger = LoggerFactory.getLogger(ConsumerPartitionAssignSeek.class.getName()); public static void main(String[] args) { String topicName = "pizza-topic"; Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_pizza_assign_seek_v001"); //props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "6000"); props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props); TopicPartition topicPartition = new TopicPartition(topicName, 0); //kafkaConsumer.subscribe(List.of(topicName)); kafkaConsumer.assign(Arrays.asList(topicPartition)); kafkaConsumer.seek(topicPartition, 5L); //main thread Thread mainThread = Thread.currentThread(); //main thread 종료시 별도의 thread로 KafkaConsumer wakeup()메소드를 호출하게 함. Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { logger.info(" main program starts to exit by calling wakeup"); kafkaConsumer.wakeup(); try { mainThread.join(); } catch(InterruptedException e) { e.printStackTrace();} } }); //kafkaConsumer.close(); //pollAutoCommit(kafkaConsumer); //pollCommitSync(kafkaConsumer); //pollCommitAsync(kafkaConsumer); pollNoCommit(kafkaConsumer); } private static void pollNoCommit(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }catch(Exception e) { logger.error(e.getMessage()); }finally { logger.info("finally consumer is closing"); kafkaConsumer.close(); } } private static void pollCommitAsync(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } kafkaConsumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if(exception != null) { logger.error("offsets {} is not completed, error:{}", offsets, exception.getMessage()); } } }); } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }catch(Exception e) { logger.error(e.getMessage()); }finally { logger.info("##### commit sync before closing"); kafkaConsumer.commitSync(); logger.info("finally consumer is closing"); kafkaConsumer.close(); } } private static void pollCommitSync(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } try { if(consumerRecords.count() > 0 ) { kafkaConsumer.commitSync(); logger.info("commit sync has been called"); } } catch(CommitFailedException e) { logger.error(e.getMessage()); } } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }catch(Exception e) { logger.error(e.getMessage()); }finally { logger.info("finally consumer is closing"); kafkaConsumer.close(); } } public static void pollAutoCommit(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } try { logger.info("main thread is sleeping {} ms during while loop", 10000); Thread.sleep(10000); }catch(InterruptedException e) { e.printStackTrace(); } } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }finally { logger.info("finally consumer is closing"); kafkaConsumer.close(); } } } 해당 코드에 문제가 없는 것으로 보입니다. java.lang.IllegalStateException: This consumer has already been closed. at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2456) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) at com.example.ConsumerPartitionAssignSeek.pollNoCommit(ConsumerPartitionAssignSeek.java:63) at com.example.ConsumerPartitionAssignSeek.main(ConsumerPartitionAssignSeek.java:53) 해당하는 에러가 띄는데 이유를 알 수 있을까요? git 코드는 잘돌아가는 것을 확인했습니다. 차이점이 알 수가 없어서 질문드립니다,