묻고 답해요
141만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
kafka sink 설정 시 테이블 생성이 안됩니다
{ "name": "my-sink-connect", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mariadb://localhost:3306/mydb", "connection.user": "root", "connection.password": "test1234", "mode": "incrementing", "incrementing.column.name": "id", "auto.create": "true", "auto.evolve": "true", "delete.enabled": "false", "tasks.max": "1", "topic": "my_topic_users", "table.whitelist": "mydb.users" } }confluent-community-connect-7.5.0-zOS confluentinc-kafka-connect-jdbc-10.7.4이렇게 사용중이고 모드를 빼면 에러가 발생하네요커넨터 로그엔 에러가 없어요 [2023-12-14 00:57:25,395] INFO SourceConnectorConfig values: config.action.reload = restart connector.class = io.confluent.connect.jdbc.JdbcSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none exactly.once.support = requested header.converter = null key.converter = null name = my-sink-connect offsets.storage.topic = null predicates = [] tasks.max = 1 topic.creation.groups = [] transaction.boundary = poll transaction.boundary.interval.ms = null transforms = [] value.converter = null (org.apache.kafka.connect.runtime.SourceConnectorConfig:369)[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Validating JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:171)[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Validated JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:174)[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Using JDBC dialect MySql (io.confluent.connect.jdbc.source.JdbcSourceTask:138)[2023-12-14 00:57:25,396] INFO EnrichedConnectorConfig values: config.action.reload = restart connector.class = io.confluent.connect.jdbc.JdbcSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none exactly.once.support = requested header.converter = null key.converter = null name = my-sink-connect offsets.storage.topic = null predicates = [] tasks.max = 1 topic.creation.groups = [] transaction.boundary = poll transaction.boundary.interval.ms = null transforms = [] value.converter = null (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:369)[2023-12-14 00:57:25,397] INFO [my-sink-connect|task-0] [Producer clientId=connector-producer-my-sink-connect-0] Cluster ID: 61ETmEcJQASp3yeJGdTmPw (org.apache.kafka.clients.Metadata:287)[2023-12-14 00:57:25,413] INFO [my-sink-connect|task-0] Found offset {{table=users}=null, {protocol=1, table=mydb.users}={incrementing=17}} for partition {protocol=1, table=mydb.users} (io.confluent.connect.jdbc.source.JdbcSourceTask:234)[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:307)[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] WorkerSourceTask{id=my-sink-connect-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask:275)[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] Begin using SQL query: SELECT * FROM mydb.`users` WHERE mydb.`users`.`id` > ? ORDER BY mydb.`users`.`id` ASC (io.confluent.connect.jdbc.source.TableQuerier:182)
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
스프링 시큐리티 최신버전 코드 있을까요?
제일 최근에 질문한 글 참고해도 적용이 안되네요...혹시 현재 버전에서 적용 가능한 코드가 있을까요?아니면 참고할 수 있는 자료라도 있을까요? https://start.spring.io/ 에서 gradle로 생성해서 사용중입니다. 스프링 3.2버전 사용하고있어요.
-
미해결실습으로 배우는 선착순 이벤트 시스템
예제 프로젝트 상에서의 Kafka 사용시 궁금한점
강의 잘 듣고 있습니다. 질문사항이 두개 있습니다.1.4강의 [문제점] 영상에서 쿠폰생성 10000개 요청으로 인해 mysql이 1분에 100개의 insert가 가능하다고 가정할 시 '주문생성/회원가입요청이 타임아웃 또는 10분뒤에 실행' 된다고 하셨는데요.예제로 사용하신 Kafka 사용 예제에서는 Consumer 프로젝트도 어차피 API프로젝트와 같은 DB를 바라보고 있으므로, 어차피 Kafka를 사용하여도 '주문생성/회원가입요청이 타임아웃 또는 10분뒤에 실행'되지 않나요? 왜 여쭤보냐면, 강의 내에서 Kafka 미사용시 주문생성/회원가입요청의 타임아웃 및 10분뒤 실행에 대한 해결책을 Kafka로 사용하셔서 문의드립니다.2.5강의 [Consumer 사용하기] 영상을 보면 API 프로젝트 Consumer 프로젝트가 별개로 존재합니다.그러므로 API프로젝트의 테스트 케이스가 종료되어도 Consumer 프로젝트는 이미 Kafka로 100개의 데이터가 스트림으로 들어오는 상태이므로, 테스트케이스가 종료되어도(즉, API프로젝트가 종료되어도) Cunsumer 프로젝트는 종료가 되지 않은 상태이므로 100개의 쿠폰이 DB에 생성이 되어야 하는게 아닌지요?왜 여쭤보냐면, 강의 내에서는 API프로젝트가 종료되면 Consumer 프로젝트도 작업이 멈추는 현상이 있어서 문의드립니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
SpringBoot 3점대 버전 Spring Security 설정
Spring Security 가 3점대 버전으로 오면서 상당한 변화가 있습니다. 강의 내용을 따라 하다보니 순환참조나, 현재는 지원하지 않는 기능이 상당수 존재하였습니다. 현재 작업한 코드가 문제 해결에 많은 도움이 되면 좋겠어서 글을 첨부합니다. SecurityConfig.class 입니다.@Configuration @EnableWebSecurity @RequiredArgsConstructor public class SecurityConfig{ private final CustomAuthenticationManager customAuthenticationManager; private final UserFindPort userFindPort; private final Environment environment; @Bean public WebSecurityCustomizer webSecurityCustomizer() { return (web) -> web.ignoring(). requestMatchers(new AntPathRequestMatcher("/h2-console/**")) .requestMatchers(new AntPathRequestMatcher( "/favicon.ico")) .requestMatchers(new AntPathRequestMatcher( "/css/**")) .requestMatchers(new AntPathRequestMatcher( "/js/**")) .requestMatchers(new AntPathRequestMatcher( "/img/**")) .requestMatchers(new AntPathRequestMatcher( "/lib/**")); } @Bean protected SecurityFilterChain filterChain(HttpSecurity http, HandlerMappingIntrospector introspector) throws Exception { http.csrf(AbstractHttpConfigurer::disable); http.authorizeHttpRequests(authorize -> authorize.requestMatchers(new MvcRequestMatcher(introspector, "/**")).permitAll() // requestMatchers(new MvcRequestMatcher.Builder(introspector).pattern(HttpMethod.GET, "/users/**")).permitAll() // .requestMatchers(new MvcRequestMatcher(introspector, "/greeting")).permitAll() // .requestMatchers(new MvcRequestMatcher(introspector, "/welcome")).permitAll() // .requestMatchers(new MvcRequestMatcher(introspector, "/health-check")).permitAll() // .requestMatchers(new MvcRequestMatcher.Builder(introspector).pattern(HttpMethod.POST, "/users")).permitAll() .anyRequest() .authenticated()) .addFilter(getAuthenticationFilter()) .httpBasic(Customizer.withDefaults()); return http.build(); } private AuthenticationFilter getAuthenticationFilter() { return new AuthenticationFilter(customAuthenticationManager, userFindPort, environment); } }requestMatcher에서 AntPathRequestMatcher, MvcRequestMatcher에 관한 설명은부족하지만 https://velog.io/@dktlsk6/Spring-Security-RequestMatcher에서 확인 가능하십니다. CustomUserDetailService.class 입니다. 순환참조 문제가 발생하여 강의와 달리 새로 CustomService를 생성하여 implements 하였습니다.@Component @RequiredArgsConstructor public class CustomUserDetailService implements UserDetailsService { private final UserFindPort userFindPort; @Override public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException { UserDto userByEmail = userFindPort.findUserByEmail(username); if (userByEmail == null) { throw new UsernameNotFoundException("User Not Found"); } return new User(userByEmail.getEmail(), userByEmail.getEncPasswd(), true, true, true, true, new ArrayList<>()); } } CustomAuthenticationManager.class 입니다. AuthenticationFilter의 AuthenticationManager로 사용할 것입니다.@Component @RequiredArgsConstructor @Slf4j public class CustomAuthenticationManager implements AuthenticationManager { private final CustomUserDetailService customUserDetailService; @Bean protected PasswordEncoder passwordEncoder() { return new BCryptPasswordEncoder(); } @Override public Authentication authenticate(Authentication authentication) throws AuthenticationException { UserDetails userDetails = customUserDetailService.loadUserByUsername(authentication.getName()); if (!passwordEncoder().matches(authentication.getCredentials().toString(), userDetails.getPassword())) { throw new BadCredentialsException("Wrong password"); } return new UsernamePasswordAuthenticationToken(userDetails.getUsername(), userDetails.getPassword(), userDetails.getAuthorities()); } } AuthenticationFilter.class 입니다. 해당 부분은 강의와 차이점이 없습니다.@Slf4j public class AuthenticationFilter extends UsernamePasswordAuthenticationFilter { private final UserFindPort userFindPort; private final Environment environment; public AuthenticationFilter(AuthenticationManager authenticationManager, UserFindPort userFindPort, Environment environment) { super.setAuthenticationManager(authenticationManager); this.userFindPort = userFindPort; this.environment = environment; } @Override public Authentication attemptAuthentication(HttpServletRequest request, HttpServletResponse response) throws AuthenticationException { try { LoginRequestDto creds = new ObjectMapper().readValue(request.getInputStream(), LoginRequestDto.class); UsernamePasswordAuthenticationToken token = new UsernamePasswordAuthenticationToken(creds.getEmail(), creds.getPassword(), new ArrayList<>()); return getAuthenticationManager().authenticate(token); } catch (IOException e) { throw new RuntimeException(e); } } @Override protected void successfulAuthentication(HttpServletRequest request, HttpServletResponse response, FilterChain chain, Authentication authResult) throws IOException, ServletException { String username = authResult.getName(); UserDto user = userFindPort.findUserByEmail(username); if (user == null) { throw new UsernameNotFoundException(username); } log.debug("user id {}", user.getUserId()); String token = Jwts.builder() .setSubject(user.getUserId()) .setExpiration(new Date(System.currentTimeMillis() + Long.parseLong(environment.getProperty("token.expiration.time")))) .signWith(SignatureAlgorithm.HS512, environment.getProperty("token.secret")) .compact(); response.addHeader("token", token); response.addHeader("userId", user.getUserId()); } } 아래는 실제 결과입니다.404가 뜨는 이유는 login 성공시 redirect url을 설정해주지 않아서 /(루트) 경로로 이동해서입니다. 해당 경로와 매핑되는 resource나 api가 없기 때문에 해당 오류가 발생한것이므로 정상작동으로 생각하시면 됩니다.아래는 잘못된 정보를 기입하여 실패 테스트 입니다. 추후 강의를 들으며 업데이트 하도록 하겠습니다.
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
컨슈머 랙 모니터링 아키텍처 관련 질문
안녕하세요. 좋은 강의 잘 보고 있습니다. 컨슈머 랙 모니터링 아키텍처 관련 질문이 있습니다.카프카 버로우, 텔레그래프 application에 대해서 각각의 노드에서 구성하는 것이 일반적인지 아니면 카프카 버로우, 텔레그래프를 하나의 노드에서 동작시켜도 무방한 건지에 대한 부분이 궁금합니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
섹션14 Prometheus와 Grafana 설치 강의 내용중 문의 드립니다
강의 내용대로 진행하였는데http://localhost:8000/user-service/actuator/prometheushttp://localhost:8000/order-service/actuator/prometheus둘다 아래 이미지와같은 에러가 발생합니다 apigateway application.yml 설정- id: user-service uri: lb://USER-SERVICE predicates: - Path=/user-service/actuator/** - Method=GET,POST filters: - RemoveRequestHeader=Cookie - RewritePath=/user-service/(?<segment>.*), /$\{segment} - id: order-service uri: lb://ORDER-SERVICE predicates: - Path=/order-service/actuator/** - Method=GET filters: - RemoveRequestHeader=Cookie - RewritePath=/order-service/(?<segment>.*), /$\{segment} 프로메테우스.yml 설정static_configs: - targets: ["localhost:9090"] - job_name: 'user-service' scrape_interval: 15s metrics_path: '/user-service/actuator/prometheus' static_configs: - targets: ['localhost:8000'] - job_name: 'order-service' scrape_interval: 15s metrics_path: '/order-service/actuator/prometheus' static_configs: - targets: ['localhost:8000'] - job_name: 'apigateway-service' scrape_interval: 15s metrics_path: '/actuator/prometheus' static_configs: - targets: ['localhost:8000'] apigateway 의 AuthorizationHeaderFilter.java// 절차 // login -> token반환받음 -> client에서 apigateway로 정보 요청 시 (토큰정보를 가지고 요청함) -> 서버에서는 토큰정보 검증 (header 안에 토큰이 포함됨) 38줄 @Override public GatewayFilter apply(Config config) { return ((exchange, chain) -> { ServerHttpRequest request = exchange.getRequest(); if(!request.getHeaders().containsKey(HttpHeaders.AUTHORIZATION)) { return onError(exchange, "no authorization header", HttpStatus.UNAUTHORIZED); } String authorizationHeader = request.getHeaders().get(HttpHeaders.AUTHORIZATION).get(0); // 반환값은 list배열이기에 0 String jwt = authorizationHeader.replace("Bearer", ""); if(!isJwtValid(jwt)) { return onError(exchange, "JWT token is not valid", HttpStatus.UNAUTHORIZED); } return chain.filter(exchange); }); }여기서 지속적으로 아래 오류가 발생중입니다2023-09-10 01:15:35.241 INFO 31372 --- [tor-http-nio-10] c.e.a.filter.GlobalFilter : Global Filter baseMessage: Spring Cloud Gateway Global Filter2023-09-10 01:15:35.241 INFO 31372 --- [tor-http-nio-10] c.e.a.filter.GlobalFilter : Global Filter Start: request id -> a5f590c1-10982023-09-10 01:15:35.270 INFO 31372 --- [tor-http-nio-10] c.e.a.filter.GlobalFilter : Global Filter End: response code -> 404 NOT_FOUND2023-09-10 01:15:35.528 INFO 31372 --- [tor-http-nio-12] c.e.a.filter.GlobalFilter : Global Filter baseMessage: Spring Cloud Gateway Global Filter2023-09-10 01:15:35.528 INFO 31372 --- [tor-http-nio-12] c.e.a.filter.GlobalFilter : Global Filter Start: request id -> d5aec101-10992023-09-10 01:15:35.528 ERROR 31372 --- [tor-http-nio-12] c.e.a.filter.AuthorizationHeaderFilter : no authorization header2023-09-10 01:15:35.528 INFO 31372 --- [tor-http-nio-12] c.e.a.filter.GlobalFilter : Global Filter End: response code -> 401 UNAUTHORIZED 이 필터관련 수정한건 없는것으로 기억하는데 제가 놓친게있을까요답답하네요 ㅠ
-
미해결실습으로 배우는 선착순 이벤트 시스템
쿠폰 카운트를 Redis에 의존하고 있는데요
만약 Redis에 장애가 발생한다면 2차 장치로 DB Count에 의존할 수 밖에 없는걸까요?실무에선 어떻게 대응하시는지 궁금합니다!
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
KStream, KTable 조인 스트림즈 애플리케이션에서 에러가 발생하고 있습니다.
... [order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN [order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43-StreamThread-1] Shutting down [order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions [order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. [order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD [order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43] State transition from REBALANCING to ERROR [order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43] All stream threads have died. The instance will be in error state and should be closed. [order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43-StreamThread-1] Shutdown complete Exception in thread "order-join-application-05b24bf4-65d2-4fda-83be-a754a4988a43-StreamThread-1" java.lang.UnsatisfiedLinkError: /private/var/folders/16/xqv9hsq91sn7glvzckc__r100000gn/T/librocksdbjni3612565276450787735.jnilib: dlopen(/private/var/folders/16/xqv9hsq91sn7glvzckc__r100000gn/T/librocksdbjni3612565276450787735.jnilib, 0x0001): tried: '/private/var/folders/16/xqv9hsq91sn7glvzckc__r100000gn/T/librocksdbjni3612565276450787735.jnilib' (mach-o file, but is an incompatible architecture (have 'x86_64', need 'arm64e'))샘플 코드 실행 시 해당 에러가 계속 발생하고 있습니다.원인을 알 수 있을까요?
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
프로듀서를 통해 카프카 클러스터에 레코드를 보낼 때 항상 1개의 배치로만 tcp 통신이 이루어지나요?
카프카 프로듀서 소개 강의를 보다 질문드립니다.프로듀서에서 레코드를 send하면 Accumulator에서 배치로 묶는 과정을 한다고 하셨는데요.실제 Sender가 발생하는 시점에 프로듀서 애플리케이션에서 항상 1개의 배치 단위로 tcp 통신이 발생하나요?아니면 내부적으로 여러개의 배치를 한번의 tcp 통신으로 통신할까요?
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
2023.08.10 기준 Spring Boot 3버전 대 Security Config 설정 파일 공유합니다.
@Configuration @EnableWebSecurity @RequiredArgsConstructor public class WebSecurity { private final UserService userService; private final BCryptPasswordEncoder bCryptPasswordEncoder; private final Environment environment; @Bean public SecurityFilterChain filterChain(HttpSecurity http) throws Exception { http.csrf(AbstractHttpConfigurer::disable) .authorizeHttpRequests(request ->{ request.requestMatchers(antMatcher("/actuator/**")).permitAll(); request.requestMatchers(antMatcher("/**")).permitAll();}) // .headers(header -> header.frameOptions( // frameOptionsConfig -> frameOptionsConfig.disable())) .apply(new MyCustomSecurity()); return http.build(); } public class MyCustomSecurity extends AbstractHttpConfigurer<MyCustomSecurity, HttpSecurity> { @Override public void configure(HttpSecurity http) throws Exception { AuthenticationManager authenticationManager = http.getSharedObject( AuthenticationManager.class); AuthenticationFilter authenticationFilter = new AuthenticationFilter(authenticationManager, userService, environment); http.addFilter(authenticationFilter); } protected void configure2(AuthenticationManagerBuilder auth) throws Exception { auth.userDetailsService(userService).passwordEncoder(bCryptPasswordEncoder); } } }해당 코드는 앞 내용까지 포함하여 설정이 적용된 버전입니다. 저같은 경우는 처음부터 mysql로 진행하여서 h2 콘솔을 사용하지 않아 frameOptions를 주석처리 하였으나, 혹여나 h2 콘솔 사용하시는 분은 주석 해제 후 사용하시면 되고, 23.08.10 기준으로 hasIpAddress는 사용 불가능합니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
spring boot 3버전대에서 metrics에서 정보 안나오는 문제 해결
@Configuration @EnableAspectJAutoProxy public class MetricsConfig { @Bean public TimedAspect timedAspect(MeterRegistry registry) { return new TimedAspect(registry); } }해당 config 파일을 추가해주세요.
-
해결됨Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
(필수정보) 레거시 bootstrap 을 사용하지 않는 방법
* bootstrap 라이브러리를 추가하고 아래와 같이 application.yml 파일만으로 설정하면 적용이 안됩니다.dependency에 bootstrap을 의존성 추가하지 않습니다.그 후 application.yml파일에 아래와 같이 설정합니다.spring: cloud: config: name: ecommerce # yml 파일명 앞부분 config: import: optional:configserver:http://localhost:8888 # 구성정보 설정강의에서 나오는 bootstrap.yml 설정 정보를 Spring Boot 2.4버전 이후부터는 application.yml 설정 정보에서 사용할 수 있습니다. (공식홈페이지)위 방법은 공식 홈페이지 목차에서 Spring Cloud Config Client - Spring Boot Config Data Import 부분에 나와있습니다.만약 bootstrap 라이브러리를 사용한 구성설정을 하고 싶다면 공식홈페이지 목차에서 Spring Cloud Config Client - Config First Bootstrap 부분을 살펴보시면 되겠습니다.
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
카프카 브로커 cpu 사용률 관련입니다.
안녕하세요, 강의 잘 봤습니다.카프카의 실제 운영관점의 질문을 드리고자 합니다. 상황에 따라 다르겠지만, 카프카 브로커의 권장 cpu 스펙이 있을까요? 카프카 브로커의 cpu사용률은 무엇에 크게 좌우될지 궁금합니다.예를들면, 토픽 및 파티션의 수에 비례한다든지, 메시지 사이즈에 비례한다든지 질문드리는 이유는 카프카를 신규 구성 예정인데요, 내부적으로 테스트 해봤을때 업무량에 크게 좌우되지는 않는 것 같았는데, 파티션 수가 많은 경우에 튀어 보이긴 합니다. 참고로 압축기능은 사용하지 않습니다.
-
미해결카프카 완벽 가이드 - 커넥트(Connect) 편
Debezium Source 에서 topic에 저장되는 UTC시간대 질문
안녕하세요. topic에 UTC 시간대로 저장되는 문제가 있습니다.해결 접근방법에 조언을 듣고 싶습니다.Sink를 적용했을때 customers, products,order_items는 문제없이 적용되었으나 orders테이블의 timestamp타입의 order_datetime컬럼에 문제가 발생하여 SMT 옵션을 추가하다가 발견한 문제입니다.결론적으로 mysql_cdc_oc_sink_orders_01.json에 "transforms": "ConvertDateTimeType", "transforms.ConvertDateTimeType.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.ConvertDateTimeType.target.type": "Timestamp", "transforms.ConvertDateTimeType.field": "order_datetime", "transforms.ConvertDateTimeType.format": "yyyy-MM-dd'T'HH:mm:ss'Z'", "transforms.ConvertDateTimeType.timezone": "Asia/Seoul"위 옵션을 추가하여 sink로 저장을 해결하였으나 topic에 저장되는 시간이 다르게 저장되는것을 발견했습니다.source 데이터베이스에서는 2023-06-20 13:56:40 에 저장하였으나sink 데이터베이스에서는 2023-06-20 04:56:40으로 저장되고 있었습니다.이에 topic을 확인해보니 저장되는 시간대가 2023-06-20 04:56:40으로 topic에서부터 저장되는 값이 다른 것을 알 수 있었습니다.따라서 source설정쪽이 문제일 것 같은데 "database.connectionTimeZone": "Asia/Seoul"옵션을 넣었음에도 UTC로 적용되고있어 질문드립니다. 감사합니다. mysql_cdc_oc_source_01.json{ "name": "mysql_cdc_oc_source_01", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "192.168.56.101", "database.port": "3306", "database.user": "connect_dev", "database.password": "connect_dev", "database.server.id": "10001", "database.server.name": "mysql01", "database.include.list": "oc", "table.include.list": "oc.customers, oc.products, oc.orders, oc.order_items", "database.history.kafka.bootstrap.servers": "192.168.56.101:9092", "database.history.kafka.topic": "schema-changes.mysql.oc", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "database.connectionTimeZone": "Asia/Seoul" } }MYSQL의 TIME_ZONE은 한국시간대입니다.mysql> select @@system_time_zone; +--------------------+ | @@system_time_zone | +--------------------+ | KST | +--------------------+
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
No suitable driver 나오시는 분들
https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar제 경우 mariadb 커넥터 대신 mysql 커넥터 사용해서 해결했습니다.(... confluentXXX/share/java/kafka 폴더에 위치)정말.. 하루종일 아래 예외로 애먹었네여..Error while starting connector ...No suitable driver found for jdbc:mysql://localhost:3307/mydb mariadb 버전별 커넥터를 5개정도 해봐도 안되어서자포자기한 채로 mysql 커넥터를 사용하니 되었습니다 ㅠㅠ자세한 버전 정보입니다. (윈도우)kafka : 2.13-3.4.0kafka-connect-jdbc : 10.7.2confluent : 7.4.0mariaDB : 10.11java : 17 설정 정보입니다.confluent/etc/kafka/connect-distributed.properties89번 행 근처 plugin.path=\C:\\Work\\confluentinc-kafka-connect-jdbc-10.7.2\\lib confluent/bin/windows/kafka-run-class.bat97번 행 근처 rem classpath addition for LSB style path if exist "%BASE_DIR%\share\java\kafka\*" ( call :concat "%BASE_DIR%\share\java\kafka\*" ) (...) 122번 행 근처 rem Log4j settings IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] ( if exist %~dp0../../etc/kafka/tools-log4j.properties ( set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../etc/kafka/tools-log4j.properties ) else ( set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/etc/kafka/tools-log4j.properties ) ) ELSE ( rem create logs directory IF not exist "%LOG_DIR%" ( mkdir "%LOG_DIR%" ) set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../etc/kafka/tools-log4j.properties ) confluent/bin/windows/connect-distributed.bat28번 행 근처 rem Log4j settings IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] ( set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/etc/kafka/connect-log4j.properties -Dlog4j.config.dir=%BASE_DIR%/etc/kafka )POST Request{ "name":"my-source-connect", "config":{ "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url":"jdbc:mysql://localhost:3307/mydb", "connection.user":"root", "connection.password":"1234", "mode":"incrementing", "incrementing.column.name":"id", "table.whitelist":"users", "topic.prefix":"my_topic_", "tasks.max":"1" } }
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
브로커에 연결이 안됩니다
원영님 강의를 보고 현재 스프링 프레임워크 기반의 회사 솔루션에 카프카 도입을 구현하고 있습니다. 터미널 상에서 컨슈머와 프로듀서는 잘 주고 받는데 IDE로 넘어와 적용하면 에러가 발생합니다브로커의 로그를 확인해보니 Processing automatic preferred replica leader election (kafka.controller.KafkaController)Checking need to trigger auto leader balancing (kafka.controller.KafkaController)Topics not in preferred replica for broker 0 Map() (kafka.controller.KafkaController)이런 로그들이 찍히는데 구글링해도 이렇다 할 해결방법을 못찾고 있습니다 컨슈머 프로퍼티에 부트스트랩 서버도 localhost:9092 프로듀서 프로퍼티의 부트스트랩 서버도 동일하며 서버 프로퍼티의 애드버타이즈드 리스너도 localhost:9092로 설정해뒀습니다혹시 제가 빠뜨린 부분이 있을까요? 도움 주시면 감사하겠습니다..
-
해결됨Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
강사님 Schema에 대해 궁금점이 있습니다.
좋은 강의 감사드립니다.덕분에 많은 걸 배워서 마음이 든든해 집니다.그런데, 하나 궁금한 점이 있어서 여쭤보고 싶습니다.우리 개발자들은 사실 귀찮은 걸 싫어하기 때문에 기술이 나날이 발전해 가는 것이라 생각이 되는데요, 강사님이 설명주신 부분처럼 Schema나 Payload를 계속해서 수동으로 만드는 것은 개발자 입장에서는 매우 비효율적일거라 생각이 됩니다.따라서, 이미 저 부분을 해결하기 위한 많은 부분들이 고려되었을 것으로 예상이 되는데요, 혹시 저 부분을 편하게 등록할 수 있는 다른 방법이 있을까요? 키워드라도 주시면 찾아서 공부해 나가는데 큰 도움이 될 것 같습니다. 감사합니다.
-
미해결15일간의 빅데이터 파일럿 프로젝트
오라클 Virtualbox 설치 후 서버 실행시 오류입니다.
VM Name: Server01Failed to open/create the internal network 'HostInterfaceNetworking-VirtualBox Host-Only Ethernet Adapter' (VERR_INTNET_FLT_IF_NOT_FOUND).Failed to attach the network LUN (VERR_INTNET_FLT_IF_NOT_FOUND).Result Code:E_FAIL (0X80004005)Component:ConsoleWrapInterface:IConsole {6ac83d89-6ee7-4e33-8ae6-b257b2e81be8} 이런 오류가 뜨는데 어떻게 해결하면 좋을까요?? 구글링 해보니 host-only-ethernet을 새로만들고 다시 해보라고 하던데 그 방법도 되지 않습니다.Virtualbox 버전은 7.0.8 최신버전입니다.
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
카프카 이해
제가 카프카에 대한 구조와 사용법을 이해하지 못해서 학습 재미가 떨어지는데요.카프카를 이렇게 써야 하는지를 맞는지가 궁금합니다. 많은 영상을 봤지만서도 사용법이 잘 이해가 안되서 질문드립니다
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
bootstrap.yml 파일의 "encrypt.key-store.password" 암호화
spring config를 적용하면 설정 값들에 암호화를 수행하여 "{cipher}..." 구문으로 적용할 수 있는데, bootstrap.yml 파일의 "encrypt.key-store.password", "encrypt.key-store.secret" 항목의 값은 암호화를 어떻게 할 수 있는지 방법 부탁드립니다.