묻고 답해요
141만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결처음하는 MongoDB(몽고DB) 와 NoSQL(빅데이터) 데이터베이스 부트캠프 [입문부터 활용까지] (업데이트)
stduio 3t 설치 완료후 처음 실행하고 Login진행 시키면 웹 화면에서 계속 멈춰있습니다 더이상 진행도 안됩니다.
- 본 강의 영상 학습 관련 문의에 대해 답변을 드립니다. (어떤 챕터 몇분 몇초를 꼭 기재부탁드립니다)- 이외의 문의등은 평생강의이므로 양해를 부탁드립니다- 현업과 병행하는 관계로 주말/휴가 제외 최대한 3일내로 답변을 드리려 노력하고 있습니다- 잠깐! 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요.
-
미해결스파크 머신러닝 완벽 가이드 - Part 1
MLOps 관련 MLFLow 및 Databricks 모델 서빙
안녕하세요 강사님, 좋은 강의 감사드립니다! 요즘 MLOps 관련 model deploy 및 serving 하는 것이 더 중요해지고 있는데 관련 강의를 준비하고 계신지 궁금합니다. 처음에는 이 강의에 추가로 강의를 올리시는 것도 계획 중이신지 여쭙고 싶었으나, 해당 강의와 model을 deploy하고 serving 하는 것은 강의의 범위에서 벗어나는 것도 같네요 ㅎㅎ Databricks에서도 MLFlow 이용해서 experiment를 하는 것이 꽤 잘 되어 있는 것 같던데 혹시라도 관련 강의 준비하고 계시다면 너무 기대됩니다!
-
해결됨Airflow 마스터 클래스
docker-compose up 이후 웹서버가 뜨지 않습니다
안녕하세요 도커&에어플로우 설치 부분 강사님 강의보며 실습하던 중에 크롬창에서 웹서버 접속이 되지 않아 문의드립니다.환경은 mac os 입니다.sudo docker-compose up airflow-initsudo docker-compose up 명령어 사용해서 진행했습니다.로그 중에 permission denied가 나는 부분이 있는데 이 부분 때문일까요?airflow-init-1 | chown: changing ownership of '/sources/logs': Permission denied airflow-init-1 | chown: changing ownership of '/sources/dags': Permission denied airflow-init-1 | chown: changing ownership of '/sources/plugins': Permission denied 로그 전문 공유드립니다.(airflow-dockercompose) jeremy 😹 ~/dev/airflow-dockercompose sudo docker-compose up airflow-init Password: [+] Running 44/3 ✔ postgres 13 layers [⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿] 0B/0B Pulled 72.8s ✔ redis 6 layers [⣿⣿⣿⣿⣿⣿] 0B/0B Pulled 84.6s ✔ airflow-init 22 layers [⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿] 0B/0B Pulled 68.5s [+] Running 3/3 ✔ Container airflow-dockercompose-redis-1 Created 0.3s ✔ Container airflow-dockercompose-postgres-1 Created 0.3s ✔ Container airflow-dockercompose-airflow-init-1 Created 0.0s Attaching to airflow-init-1 airflow-init-1 | chown: changing ownership of '/sources/logs': Permission denied airflow-init-1 | chown: changing ownership of '/sources/dags': Permission denied airflow-init-1 | chown: changing ownership of '/sources/plugins': Permission denied airflow-init-1 | The container is run as root user. For security, consider using a regular user account. airflow-init-1 | airflow-init-1 | DB: postgresql+psycopg2://airflow:***@postgres/airflow airflow-init-1 | Performing upgrade to the metadata database postgresql+psycopg2://airflow:***@postgres/airflow airflow-init-1 | [2024-02-13T12:36:11.330+0000] {migration.py:216} INFO - Context impl PostgresqlImpl. airflow-init-1 | [2024-02-13T12:36:11.332+0000] {migration.py:219} INFO - Will assume transactional DDL. airflow-init-1 | [2024-02-13T12:36:11.344+0000] {db.py:1616} INFO - Creating tables airflow-init-1 | INFO [alembic.runtime.migration] Context impl PostgresqlImpl. airflow-init-1 | INFO [alembic.runtime.migration] Will assume transactional DDL. airflow-init-1 | OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k airflow-init-1 | Database migrating done! airflow-init-1 | /home/airflow/.local/lib/python3.8/site-packages/flask_limiter/extension.py:336 UserWarning: Using the in-memory storage for tracking rate limits as no storage was explicitly specified. This is not recommended for production use. See: https://flask-limiter.readthedocs.io#configuring-a-storage-backend for documentation about configuring the storage backend. airflow-init-1 | OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k airflow-init-1 | airflow already exist in the db airflow-init-1 | 2.8.1 airflow-init-1 exited with code 0 (airflow-dockercompose) jeremy 😹 ~/dev/airflow-dockercompose sudo docker-compose up [+] Running 7/7 ✔ Container airflow-dockercompose-postgres-1 Running 0.0s ✔ Container airflow-dockercompose-redis-1 Running 0.0s ✔ Container airflow-dockercompose-airflow-init-1 Created 0.0s ✔ Container airflow-dockercompose-airflow-worker-1 Created 0.1s ✔ Container airflow-dockercompose-airflow-scheduler-1 Created 0.1s ✔ Container airflow-dockercompose-airflow-triggerer-1 Created 0.1s ✔ Container airflow-dockercompose-airflow-webserver-1 Created 0.1s Attaching to airflow-init-1, airflow-scheduler-1, airflow-triggerer-1, airflow-webserver-1, airflow-worker-1, postgres-1, redis-1 airflow-init-1 | chown: changing ownership of '/sources/logs': Permission denied airflow-init-1 | chown: changing ownership of '/sources/dags': Permission denied airflow-init-1 | chown: changing ownership of '/sources/plugins': Permission denied airflow-init-1 | The container is run as root user. For security, consider using a regular user account. airflow-init-1 | airflow-init-1 | DB: postgresql+psycopg2://airflow:***@postgres/airflow airflow-init-1 | Performing upgrade to the metadata database postgresql+psycopg2://airflow:***@postgres/airflow airflow-init-1 | [2024-02-13T12:37:00.495+0000] {migration.py:216} INFO - Context impl PostgresqlImpl. airflow-init-1 | [2024-02-13T12:37:00.496+0000] {migration.py:219} INFO - Will assume transactional DDL. airflow-init-1 | [2024-02-13T12:37:00.509+0000] {db.py:1616} INFO - Creating tables airflow-init-1 | INFO [alembic.runtime.migration] Context impl PostgresqlImpl. airflow-init-1 | INFO [alembic.runtime.migration] Will assume transactional DDL. airflow-init-1 | OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k airflow-init-1 | Database migrating done! airflow-init-1 | /home/airflow/.local/lib/python3.8/site-packages/flask_limiter/extension.py:336 UserWarning: Using the in-memory storage for tracking rate limits as no storage was explicitly specified. This is not recommended for production use. See: https://flask-limiter.readthedocs.io#configuring-a-storage-backend for documentation about configuring the storage backend. airflow-init-1 | OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k airflow-init-1 | airflow already exist in the db airflow-init-1 | 2.8.1 airflow-init-1 exited with code 0 airflow-triggerer-1 | airflow-worker-1 | airflow-webserver-1 | airflow-scheduler-1 | airflow-webserver-1 | [2024-02-13T12:37:28.080+0000] {configuration.py:2065} INFO - Creating new FAB webserver config file in: /opt/airflow/webserver_config.py airflow-triggerer-1 | ____________ _____________ airflow-triggerer-1 | ____ |__( )_________ __/__ /________ __ airflow-triggerer-1 | ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / airflow-triggerer-1 | ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / airflow-triggerer-1 | _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
-
미해결RDBMS Modeling 실습
시 군 구로 나눈 이유가 궁금합니다.
서울특별시 무슨무슨구 무슨무슨 동이면하나의 컬럼이아니라 여러개의 컬럼으로 나누어서 sd,sgg.. 등의 컬럼을 만들어서 하나의 테이블로 가능할것 같은데 이름 하나 때문에 테이블이 3개가 더 생기는 것 같은데 이유가 있을까요?
-
미해결15일간의 빅데이터 파일럿 프로젝트
클라우데라 ERR_CONNECTION_REFUSED 문제
안녕하세요 VM 통합 환경 구성 중입니다.현재 인텔 MAC 사용중이고 ,HOST,NAC 설정 까지 다 해주었는데 연결이 안돼서 진행을 못하고 있습니다 . ㅠ추가로 putty 접속도 안되네요.. 원인이 뭘까요??..확인 한번 부탁드립니다..++ 수정네트워크를 다음과 같이 변경후 서버 재시작하였더니이제 refused는 뜨지 않지만 time out 에러가 뜨네요 ㅠputty도 마찬가지입니다. ++ 수정 server02 는 현재 ssh 접속이 가능합니다..!정확하게 host정보를 입력한거 같은데 server01은 접속이 안되네요 ! ++ server 01 에서 바로 서비스체크 해보았습니다.클라우데라 매니저 잘 작동 중이고 ,, 리스타도 해보았는데여전히 http://server01.hadoop.com/ 치고 들어가면 refused 뜨네요 ㅜㅜ
-
미해결mongoDB 기초부터 실무까지(feat. Node.js)
섹션6의 3번째 강의 faker파일이 어디있나요?
전 강의에 올려놓으셨다고 하셨는데 Mongoose v6 업데이트 관련 공지만 있고 faker 관련 코드를 못 찾겠어서 질문 남깁니다.
-
미해결실리콘밸리 엔지니어와 함께하는 Apache Airflow
메타데이터 의미
강사님 안녕하세요메타데이터는 테이터의 설명 즉, 테이블에 대한 명세서 라고 정의 하는것을 봤습니다. 그런데 또 다른 의미로 원천 데이터 라는 의미로도 사용될 수 있을까요? 예를들어 overview강의에서 6:24초에 "다른 메타 데이터와 합성해서"라고 말씀 해주셔서요! 사소하지만 조금 헷갈려서 문의 드립니다!강의 잘 듣고 있습니다. 감사합니다!
-
미해결카프카 완벽 가이드 - 코어편
nocommit 관련 질문
package com.example; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Properties; public class ConsumerPartitionAssignSeek { public static final Logger logger = LoggerFactory.getLogger(ConsumerPartitionAssignSeek.class.getName()); public static void main(String[] args) { String topicName = "pizza-topic"; Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_pizza_assign_seek_v001"); //props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "6000"); props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props); TopicPartition topicPartition = new TopicPartition(topicName, 0); //kafkaConsumer.subscribe(List.of(topicName)); kafkaConsumer.assign(Arrays.asList(topicPartition)); kafkaConsumer.seek(topicPartition, 5L); //main thread Thread mainThread = Thread.currentThread(); //main thread 종료시 별도의 thread로 KafkaConsumer wakeup()메소드를 호출하게 함. Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { logger.info(" main program starts to exit by calling wakeup"); kafkaConsumer.wakeup(); try { mainThread.join(); } catch(InterruptedException e) { e.printStackTrace();} } }); //kafkaConsumer.close(); //pollAutoCommit(kafkaConsumer); //pollCommitSync(kafkaConsumer); //pollCommitAsync(kafkaConsumer); pollNoCommit(kafkaConsumer); } private static void pollNoCommit(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }catch(Exception e) { logger.error(e.getMessage()); }finally { logger.info("finally consumer is closing"); kafkaConsumer.close(); } } private static void pollCommitAsync(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } kafkaConsumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if(exception != null) { logger.error("offsets {} is not completed, error:{}", offsets, exception.getMessage()); } } }); } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }catch(Exception e) { logger.error(e.getMessage()); }finally { logger.info("##### commit sync before closing"); kafkaConsumer.commitSync(); logger.info("finally consumer is closing"); kafkaConsumer.close(); } } private static void pollCommitSync(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } try { if(consumerRecords.count() > 0 ) { kafkaConsumer.commitSync(); logger.info("commit sync has been called"); } } catch(CommitFailedException e) { logger.error(e.getMessage()); } } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }catch(Exception e) { logger.error(e.getMessage()); }finally { logger.info("finally consumer is closing"); kafkaConsumer.close(); } } public static void pollAutoCommit(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } try { logger.info("main thread is sleeping {} ms during while loop", 10000); Thread.sleep(10000); }catch(InterruptedException e) { e.printStackTrace(); } } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }finally { logger.info("finally consumer is closing"); kafkaConsumer.close(); } } } 해당 코드에 문제가 없는 것으로 보입니다. java.lang.IllegalStateException: This consumer has already been closed. at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2456) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) at com.example.ConsumerPartitionAssignSeek.pollNoCommit(ConsumerPartitionAssignSeek.java:63) at com.example.ConsumerPartitionAssignSeek.main(ConsumerPartitionAssignSeek.java:53) 해당하는 에러가 띄는데 이유를 알 수 있을까요? git 코드는 잘돌아가는 것을 확인했습니다. 차이점이 알 수가 없어서 질문드립니다,
-
미해결카프카 완벽 가이드 - ksqlDB
stream format 관련 질문
안녕하세요, stream의 key.format, value.format 관련 질문있습니다.이 두 옵션이 직렬화에 대한 옵션이라고 하셨는데, select 시에는 consumer가 동작하고 insert에는 producer가 동작한다고 하셨던 것과 조금 혼동이 되어서 질문드립니다. insert 시에 stream옵션으로 직렬화가 되면 producer의 직렬화 옵션은 어떻게 되는건가요?그리고 select시 에는 topic 메시지를 consumer가 역직렬화해서 가져온 것을 다시 직렬화 하는건가요? stream의 직렬화 시점에 대해 조금 이해가 되지 않습니다. 감사합니다.
-
미해결15일간의 빅데이터 파일럿 프로젝트
managed_smartcar_status_info 테이블
안녕하세요.제가 최초 학습/실습했을 때도, 겪었던 문제인데, 계속 해결을 하지 못해 해당 문제를 여쭙습니다. [현재 문제상황]먼저 문제는, Oozie를 강제 실행했을 때, managed_smartcar_status_info 테이블은 생성되지만, 빈 테이블로, 내부에 아무 데이터가 조회되지 않는 것입니다. 제가 오타가 있을까 싶어, 테이블 삭제에 여러 날을 데이터 생성부터 재시도를 4..5번 정도 했는데, 모두 동일하게 데이터가 조회되지 않았습니다. 우선, 제가 세팅한 값과 생성되어 cat-batch-log 에 저장된 데이터의 이미지를 보여드리겠습니다.제가 작업을 실행한 것은 금일(2024-02-08)이고, 데이터 생성 시, 입력한 일자는 2022-03-22입니다. 따라서, 강의에서 말씀하신 용어를 기준으로, working_date와 biz_date는 각각, 20240208, 20220322입니다. Oozie에 task를 할당할 때는, download한 쿼리를 그대로 복사해서 붙여서 task를 정의했습니다. 이는 오타가 발생할 수 없고, 여러번 시도했었던 부분이기 때문에 해당 부분에는 에러가 없을 것으로 기대하고 있습니다. task에 대한 scheduling은 아래와 같이 구성하였습니다.Oozie를 강제 실행하였을 때, working_date에는 오늘 일자인 20240208을 입력하고, [저장] - [실행]을 했습니다. 실행 과정에서 에러는 발생하지 않았고, 로그에도 별다른 특이사항은 없었습니다. 이 후, 새로운 테이블 managed_smartcar_status_info이 생성되었음을 refresh를 통해 확인하였으나, 해당 테이블을 조회하면, 데이터가 조회되지 않는 것이 ... 일주일동안 해결하지 못하는 트러블 ...입니다.테이블을 조건절(where) 없이 조회했을 때도, 조회된 데이터가 마찬가지로 없었기 때문에 이 또한, 조건절 오타로 인해 조회되지 않는 것은 아닌 것으로 추측됩니다. [현재 확인된 부분]hdfs 쿼리를 통해 정상적으로 파일이 생성되었음을 확인했습니다.redis에서 데이터가 저장되어있음을 확인했습니다.hue를 통한 hbase 브라우저에 DriveCarInfo 내 key값을 이용하여 데이터를 조회했을 때, 조회되는 부분을 확인하였습니다. [개인적인 의견]HiveQL/그림-6.71.hql 내 작성된 쿼리를 통해 현재 제가 겪고 있는 문제를 생각해보았는데,현재 아래 두 쿼리는 조회 값이 NULL이 아니고, 충분히 많은 데이터가 조회됩니다. 그런데,select distinct car_number from smartCar_master_over18; select distinct car_number from SmartCar_Status_Info where wrk_date = '20240208';그런데, insert 내 select 부분만 추출하여 실행하면,select t1.car_number, t1.sex, t1.age, t1.marriage, t1.region, t1.job, t1.car_capacity, t1.car_year, t1.car_model, t2.tire_fl, t2.tire_fr, t2.tire_bl, t2.tire_br, t2.light_fl, t2.light_fr, t2.light_bl, t2.light_br, t2.engine, t2.break, t2.battery, t2.reg_date, substring(t2.reg_date, 0, 8) as biz_date from SmartCar_Master_Over18 t1 join SmartCar_Status_Info t2 on t1.car_number = t2.car_number and t2.wrk_date = '20240208';조회가 되지 않는다는 것을 알게 되었습니다. 위 쿼리를 조금 더 간략히 하면,select distinct t.car_number from smartcar_master_over18 t join smartcar_status_info son t.car_number = s.car_number where s.wrk_date = '20240208'; 가 되고, 이 또한 조회된 값이 없습니다. 현재 제가 학습을 하면서 이해를 하기론, smartcar_status_info 데이터는 flume에 의해 입수된 로그 데이터이고, smartcar_master_over18는 특정 조건에 의해 redis에 저장된 일부 데이터로 알고 있는데, 이렇게 되면,smartcar_master_over18 이 smartcar_status_info 에 포함되는 관계가 필연적으로 되어야하는게 아닌가하는게 제 생각입니다. 만약 제 생각이 맞다면, 로그가 잘못 생성되었다는 뜻인데, 혹시 강사님께선 제가 생각하는 것 외에 확인해야할 부분이 있다고 생각하신다면, 조언해주시길 부탁드립니다. 항상 친절한 가르침 감사합니다.즐거운 연휴 보내시고, 새해 복 많이 받으시길 바랍니다.감사합니다. """(마음의 소리)많이 바쁘시겠지만, 가능하다면 오늘 해결 방법 및 조언을 듣고, 긴 연휴.. 프로젝트를 마무리 및 정리를 하고 싶습니다... !! ㅠㅠ """
-
미해결카프카 완벽 가이드 - ksqlDB
전통적 분석 시스템 한계에 대해 질문있습니다.
안녕하세요, 실시간 분석 시스템 아키텍처에 ksqlDB 사용 명분(?)을 좀 더 확실히 하고 싶어서 질문드립니다. 제가 이해한 것은 전통적 분석 시스템은 운영 DB 부하로 분석 시스템을 직접 붙일 수 없고, DW/Batch을 분석용으로 따로 두는 것으로 이해했습니다.운영계에서 DW로 데이터를 전송하는 주기가 하루 주기인 것도 마찬가지로 운영계 I/O 부하 문제인걸까요? 또한 실시간 분석 시스템의 경우 CDC를 통해 일단위 데이터 전송에서 실시간으로 전송이 가능한 것으로 보이는데 이것은 redo dump file 전송은 DB에 직접적인 부하를 주지 않기 때문에 가능한 것인가요? 마지막으로 CDC 기반으로 실시간 데이터 전송을 했을 때 타겟 DBMS가 좋은 퍼포먼스를 가져야 함은 실시간 데이터에 대한 부하를 견딜 수 있어야 하기 때문인건가요? 질문이 많네요.. 늘 좋은 강의 감사드립니다.
-
미해결카프카 완벽 가이드 - 커넥트(Connect) 편
해결하지 못한 에러가 발생 하였습니다.
안녕하세요 개발자님 에러를 해결 하지 못해 도움을 받고 싶습니다.ksqldb 마지막 강의를 마치고 실습을 하던중 mysql 테이블을topic으로 connector(avro)를 한후 ksqldb에서 table을 만드는 과정에서 타입 변환 문제가 발생 하였습니다. avro를 통해 register에 스키마를 저장 하고 사용 하고자 하였습니다.강의 해주신 .properties 설정은 하였구요.topic에 데이터 들어온느거 확인스키마 확인sqldb 테이블 생성은 되지만 검색시 밑에와 같은 에러가 발생합니다. source, sink connector 실습은 잘 되었으며, ksqldb 거치지 않고 ELK에 데이터도 잘 보내 집니다. ksqldb에서 table 생성 과정에서 PRIMARY KEY설정을 하고 생성이 됩니다. 하지만 검색을 하면 밑에와 같은 에러가 납니다.PRIMARY KEY없이 table을 생성하면 key값이 보내면 Json형태의 키로 배출됩니다. {CUSTOMER_ID=1}key 타입을 INTEGER, bigint, int 타입 해보았습니다.mysql table도 다른걸로 만들어보고 했습니다.혹시 네가 노친것이 무엇인가요?어떻게 해야 할까여? register 실행 로그를 보니 WARNING: A provider io.confluent.kafka.schemaregistry.rest.resources.SchemasResource regi stered in SERVER runtime does not implement any provider interfaces applicable in the SER VER runtime. Due to constraint configuration problems the provider io.confluent.kafka.sch emaregistry.rest.resources.SchemasResource will be ignored.Feb. 08, 2024 5:59:52 A.M. org.glassfish.jersey.internal.inject.Providers checkProviderRu ntime있습니다. 어떻게 해야 하나요? [2024-02-08 00:47:43,983] ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Error deserializing message from topic: mysqlavro022.oc.customers","recordB64":null,"cause":["Cannot deserialize type struct as type int32 for path: "],"topic":"mysqlavro022.oc.customers"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.transient_OC_CUSTOMER_3798951142359913405.KsqlTopic.Source.deserializer:44)[2024-02-08 00:47:43,988] WARN stream-thread [_confluent-ksql-default_transient_transient_OC_CUSTOMER_3798951142359913405_1707320860130-b2b59a3e-3875-4eab-ad2a-185533cf65bc-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[mysqlavro022.oc.customers] partition=[0] offset=[0] (org.apache.kafka.streams.processor.internals.RecordDeserializer:89)org.apache.kafka.common.errors.SerializationException: Error deserializing message from topic: mysqlavro022.oc.customersat io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:55)at io.confluent.ksql.serde.tls.ThreadLocalDeserializer.deserialize(ThreadLocalDeserializer.java:37)at io.confluent.ksql.serde.unwrapped.UnwrappedDeserializer.deserialize(UnwrappedDeserializer.java:47)at io.confluent.ksql.serde.unwrapped.UnwrappedDeserializer.deserialize(UnwrappedDeserializer.java:26)at io.confluent.ksql.serde.GenericDeserializer.deserialize(GenericDeserializer.java:59)at io.confluent.ksql.logging.processing.LoggingDeserializer.tryDeserialize(LoggingDeserializer.java:61)at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:48)at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)at org.apache.kafka.streams.processor.internals.SourceNode.deserializeKey(SourceNode.java:54)at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:65)at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:178)at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:304)at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:968)at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1068)at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:962)at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:751)at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)Caused by: org.apache.kafka.connect.errors.DataException: Cannot deserialize type struct as type int32 for path:at io.confluent.ksql.serde.connect.ConnectDataTranslator.throwTypeMismatchException(ConnectDataTranslator.java:71)at io.confluent.ksql.serde.connect.ConnectDataTranslator.validateType(ConnectDataTranslator.java:90)at io.confluent.ksql.serde.connect.ConnectDataTranslator.validateSchema(ConnectDataTranslator.java:154)at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlValue(ConnectDataTranslator.java:200)at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlRow(ConnectDataTranslator.java:54)at io.confluent.ksql.serde.avro.AvroDataTranslator.toKsqlRow(AvroDataTranslator.java:67)at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:51)... 18 more user01@ubuntu-20:~/kafka/connector_configs/cdc_source_mysql$ register_connector cdc_source_mysql/mysql_cdc_ops_source_avro_01.jsonHTTP/1.1 201 CreatedContent-Length: 1007Content-Type: application/jsonDate: Wed, 07 Feb 2024 15:42:52 GMTLocation: http://localhost:8083/connectors/mysql_cdc_ops_source_avro_03Server: Jetty(9.4.44.v20210927){"config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","database.connectionTimezone": "Asia/Seoul","database.history.kafka.bootstrap.servers": "localhost:9092","database.history.kafka.topic": "schema-changes.mysql.oc","database.hostname": "192.168.0.26","database.include.list": "oc","database.password": "1234","database.port": "3306","database.server.id": "31002","database.server.name": "mysqlavro022","database.user": "cnt_dev","key.converter": "io.confluent.connect.avro.AvroConverter","key.converter.schema.registry.url": "http://localhost:8081","name": "mysql_cdc_ops_source_avro_03","table.include.list": "oc.customers","tasks.max": "1","time.precision.mode": "connect","transforms": "unwrap","transforms.unwrap.drop.tombstones": "false","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","value.converter": "io.confluent.connect.avro.AvroConverter","value.converter.schema.registry.url": "http://localhost:8081"},"name": "mysql_cdc_ops_source_avro_03","tasks": [],"type": "source"} user01@ubuntu-20:~/kafka/data/kafka-logs$ show_topic_messages avro mysqlavro022.oc.customers{"customer_id": 1}{"customer_id": 1,"email_address": "test","full_name": "test"}user01@ubuntu-20:~/kafka$ http GET http://localhost:8081/schemas{"id": 23,"schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"mysqlavro022.oc.customers\",\"fields\":[{\"name\":\"customer_id\",\"type\":\"int\"}],\"connect.name\":\"mysqlavro022.oc.customers.Key\"}","subject": "mysqlavro022.oc.customers-key","version": 1},{"id": 24,"schema": "{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"mysqlavro022.oc.customers\",\"fields\":[{\"name\":\"customer_id\",\"type\":\"int\"},{\"name\":\"email_address\",\"type\":\"string\"},{\"name\":\"full_name\",\"type\":\"string\"}],\"connect.name\":\"mysqlavro022.oc.customers.Value\"}","subject": "mysqlavro022.oc.customers-value","version": 1}, CREATE TABLE oc_customer (customer_id int PRIMARY KEY,email_address varchar,full_name varchar) WITH (KAFKA_TOPIC = 'mysqlavro022.oc.customers',KEY_FORMAT = 'AVRO',VALUE_FORMAT = 'AVRO'); ksql> describe oc_customer extended;Name : OC_CUSTOMERType : TABLETimestamp field : Not set - using <ROWTIME>Key format : AVROValue format : AVROKafka topic : mysqlavro022.oc.customers (partitions: 1, replication: 1)Statement : CREATE TABLE OC_CUSTOMER (CUSTOMER_ID INTEGER PRIMARY KEY, EMAIL_ADDRESS STRING, FULL_NAME STRING) WITH (KAFKA_TOPIC='mysqlavro022.oc.customers', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO');Field | Type------------------------------------------------CUSTOMER_ID | INTEGER (primary key)EMAIL_ADDRESS | VARCHAR(STRING)FULL_NAME | VARCHAR(STRING)------------------------------------------------Local runtime statistics------------------------
-
미해결다양한 사례로 익히는 SQL 데이터 분석
캐글데이터 Postgresql 사용
안녕하세요! 강의 정말 잘 보고 있습니다!.강의 참고하면서, Kaggle에서 괜찮은 데이터로 매출분석, 퍼널분석 프로젝트해서 포트폴리오 만들어 보려고 하고 있습니다.그런데 Kaggle csv데이터를 Postgresql에서 쉽게 로드할 수 있는 방법이 있을까요?검색해보니 pgAdmin이라는 툴도 있는 것 같던데, DBeaver하고는 사용하는 법이 많이 다른 것 같아서요.DBeaver에서 csv 파일 DB에 적재하려면, 어떻게 해야할까요?? 감사합니다!
-
미해결카프카 완벽 가이드 - 코어편
Confluent Kafka 라이센스
Confluent Kafka의 커뮤니티 버전은아파치 카프카처럼 회사에서도 사용해도 문제없나요?
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
Kafka Offset 및 LAG 관련 질문
안녕하세요. 강의 수강 중 궁금한점이 있어 문의드립니다. [구성]파티션 - 5개컨슈머 - 5개 [문의]1. 컨슈머 offset과 lag 관련 문의프로듀서에서 메시지를 생성해서 Kafka로 전송하면 Offset값이 2씩 증가합니다.CURRENT-OFFSET - 2증가 (7 -> 9)LOG-END-OFFSET - 2증가 (8 -> 10)메시지가 소비되고 나면 LAG은 항상 1이 유지되고 있는데 어떤 부분을 체크해 봐야할까요? (추가확인 : LAG이 항상 1이 유지되는데 kafka-console.consumer.sh로 확인해보면 메시지는 없는데 LAG이 0으로 변경됩니다.) 2. 소비된 메시지가 다시 소비되는 현상프로듀서로 메시지 생성 후 컨슈머에서 메시지를 소비하였는데 한참 시간이 지난 후 새벽시간(12시간 이후)에 이미 처리된 메시지가 컨슈머에서 다시 처리되는 현상이 발생하는데 설정값에 따라 발생할 수 있는 현상일까요? 3. 이중화 (Active-Active) 구성일 경우 컨슈머 설정이중화 구성이 되어 있는 경우 컨슈머를 @KafkaListener( concurrency = "2")로 설정하면 컨슈머는 총 4개로 운영되는 구조가 맞는지 궁금합니다. 감사합니다.
-
미해결15일간의 빅데이터 파일럿 프로젝트
Cloudera에서 식별되는 HDFS, HBase 에러
안녕하세요지난 2월 1일, Ooize를 통해 table을 create, alter, insert에 대한 task 정의하고, workflow를 실행하고, 아래와 같은 쿼리를 실행했습니다.SELECT * FROM MANAGED_SMARTCAR_STATUS_INFO WHERE BIZ_DATE = '20220201' LIMIT 10;,그러나, 중간의 파일명이 꼬였는지, 조회값이 나오지 않았습니다(제가 자투리 시간을 이용하여 공부하다보니, 20220130, 20220131, 20220201에 대한 log 파일이 모두 존재한 상황이었습니다. 해서, 맨 처음 flume 단계부터 다시 실행하기 위해 실행되는 모든 service를 종료하고, cloudera를 다시 실행하였더니, 첨부된 사진과 같이 HDFS와 HBase에 에러가 감지 되었습니다. 현재 shell을 통해 확인하니, HDFS는 에러가 있었음에도 정상적으로 파일을 저장하였지만, HBase는http://server02.hadoop.com:8888/에 접속이 되지 않아, 이후 공부가 진행을 하지 못하는 상황입니다... 무엇이 문제인지와 해결방법에 대해 알려주시면, 감사하겠습니다. P.S. 요즘 너무 많은 에러를 겪고, 이걸 스스로 해결을 하지 못하는 경우가 너무 많아 의지가 많이 약해지네요.. ㅠㅠ 주말에 리프레시 하고.. 다음주 월요일부터 다시 화이팅하겠습니다 ..
-
해결됨카프카 완벽 가이드 - 코어편
Kafka 클라이언트 To VM kafka 연결 질문
안녕하세요.강의 잘 보고 있습니다. 제가 회사에서 강의를 보고 있어서 그런데 강의 세팅과 조금 다르게 진행해서 연결에서 막힙니다. 일단 저는, 개인 PC로 IP - 192.168.100.170 인 서버 컴퓨터로 원격 연결을 하고그 안에서 VB로 ubuntu VM을 생성했습니다.VM의 고정 IP는 192.168.88.111로 설정했습니다.이후 편한 환경을 위해 putty같은 프로그램으로 ssh 연결을 했습니다.VM의 Port Forwarding으로ssh는 192.168.100.170:27722 -> 192.168.88.111:22192.168.100.170:29092 -> 192.168.88.111:9092 으로 진행했고 성공했습니다.이후 개인 PC에서 Intelij로 SimpleProducer 실습을 진행하는데, props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.100.170:29092");로 나름 머리를 써서 작성했습니다. 물론, VM의 server.properties에서 외부 연결을 허용하도록 했습니다만, 정확한지 확신은 없습니다. 이후, 코드를 실행했더니, socket timeout 에러가 나오고 카프카 컨슈머에 들어오지 않았습니다. Log를 자세히 보니 분명히 kafka topicId를 인지하는 걸 보니 연결은 된 것 같은데 뭐가 문제인지 모르겠습니다.Starting Gradle Daemon... Gradle Daemon started in 1 s 324 ms > Task :producers:compileJava UP-TO-DATE > Task :producers:processResources NO-SOURCE > Task :producers:classes UP-TO-DATE > Task :producers:SimpleProducer.main() [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: acks = -1 batch.size = 16384 bootstrap.servers = [192.168.100.170:39092] buffer.memory = 33554432 client.dns.lookup = use_all_dns_ips client.id = producer-1 compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 120000 enable.idempotence = true interceptor.classes = [] key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.connect.timeout.ms = null sasl.login.read.timeout.ms = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.login.retry.backoff.max.ms = 10000 sasl.login.retry.backoff.ms = 100 sasl.mechanism = GSSAPI sasl.oauthbearer.clock.skew.seconds = 30 sasl.oauthbearer.expected.audience = null sasl.oauthbearer.expected.issuer = null sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000 sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000 sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100 sasl.oauthbearer.jwks.endpoint.url = null sasl.oauthbearer.scope.claim.name = scope sasl.oauthbearer.sub.claim.name = sub sasl.oauthbearer.token.endpoint.url = null security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1706742127571 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition test-topic-0 to 0 since the associated topicId changed from null to jRkpHnfwT8mfWJ3PB9HHmg [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: ysNHdh2DQTKvR3X0yruxdg [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Disconnecting from node 0 due to socket connection setup timeout. The timeout value is 9728 ms. [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Disconnecting from node 0 due to socket connection setup timeout. The timeout value is 18153 ms. [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node 0 disconnected. [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 0 (/192.168.88.111:9092) could not be established. Broker may not be available. [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node 0 disconnected. [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 0 (/192.168.88.111:9092) could not be established. Broker may not be available. [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node 0 disconnected. [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 0 (/192.168.88.111:9092) could not be established. Broker may not be available. [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node 0 disconnected. [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 0 (/192.168.88.111:9092) could not be established. Broker may not be available. [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed [main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed [main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered Deprecated Gradle features were used in this build, making it incompatible with Gradle 9.0. You can use '--warning-mode all' to show the individual deprecation warnings and determine if they come from your own scripts or plugins. For more on this, please refer to https://docs.gradle.org/8.4/userguide/command_line_interface.html#sec:command_line_warnings in the Gradle documentation. BUILD SUCCESSFUL in 2m 5s 2 actionable tasks: 1 executed, 1 up-to-date 오전 8:04:07: Execution finished ':producers:SimpleProducer.main()'. [Producer clientId=producer-1] Resetting the last seen epoch of partition test-topic-0 to 0 since the associated topicId changed from null to jRkpHnfwT8mfWJ3PB9HHmg이 부분을 보아하니 토픽은 인지하는 것 같은데 말이죠..감사합니다.
-
미해결데이터 분석을 위한 파이썬: 입문부터 꿀팁까지
If 윤년판별기 질문
해답지와 다른데 elif 로 해도 되나요? 오류가 나는 이유도 잘 모르겠습니다 ㅠㅠ 그리고, 해답지에 if가 3번 나오고 나서 else 가 3번 나온 구조도 신기합니다. 왜 이렇게 하는 건가요? 첫번째 if 랑 첫번째 else 랑 이런식으로 짝꿍인데 그냥 띄어서 쓴건가요?if year % 4 == 0: if year % 100 == 0: if year % 400 == 0: print(year, "년도는 윤년입니다") else: print(year, "년도는 평년입니다") else: print(year, "년도는 윤년입니다") else: print(year, "년도는 평년입니다")
-
미해결15일간의 빅데이터 파일럿 프로젝트
데이터 저장 공간 관련 문의 드립니다.
안녕하세요 하둡 에코시스템은 여러가지 소프트웨어로 구성이 되어 있는거 같은데요기존 시스템의 경우 데이터가 저장되는 공간은 DB로만 보면 되었던거 같은데하둡에서는 수집된 데이터가 저장되는 공간이 HDFS, HIVE, Hbase 등 여러공간에 다 저장이 되어 있다고 봐야 하나요?그럼 컴플라이언스 관점에서 저장시 암호화 요건을 갖추려면 모든 저장공간에 암호화한 상태로 보관을 해야 하는건지 문의 드립니다.
-
미해결카프카 완벽 가이드 - 커넥트(Connect) 편
디비지움 sink connector 사용에 대해 질문있습니다.
디비지움은 source connector로만 사용하고 sink는 jdbc connector를 사용한다고 하셨는데요,그 이유가 sink connector를 디비지움 커넥터로 구축 시 소스 DB의 DDL 변경을 타겟 DB에서 반영하기 어렵다는 것인가요? 만약 맞다면 JDBC sink에서는 DDL에 대해서 잘 반영하는 것인지 궁금합니다.