묻고 답해요
141만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
해결됨카프카 완벽 가이드 - 코어편
[섹션2] 메세지 비동기 전송 부분에 기본적인 질문인데요
카프카 관련 질문이라기 보다는.. 자바에 익숙하지 않아서 자바에 관한 질문입니다. kafkaProducer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { logger.info("partiion: " + metadata.partition()); logger.info("offset: " + metadata.offset()); logger.info("timestamp: " + metadata.timestamp()); } else { logger.error("exception error from broker: " + exception.getMessage()); } } });여기서 sendThread에서 callback에 대한 부분을 호출할때, 이런식으로 동작하는것으로 생각했습니다. 그래서 sendThread에서 broker에서 response를받아 callback에 해당하는 부분을 채워넣을때 이와 같이 동작한다고 생각합니다. (java에 익숙하지 않아서... python코드로 그냥 이해한대로 적어보겠씁니다.. ) def responseCallback(record, callback) { callback.onCompletion(record, exception) }이런식으로 callback 객체의 onCompletion 메서드를 호출하고 받은 정보를 parameter로 넘기는것으로 이해했는데요. 근데, lambda형식으로 바꾸게 되면, kafkaProducer.send(producerRecord, (metadata, exception) -> { if (exception == null) { logger.info("partiion: " + metadata.partition()); logger.info("offset: " + metadata.offset()); logger.info("timestamp: " + metadata.timestamp()); } else { logger.error("exception error from broker: " + exception.getMessage()); } } });이렇게 코드를 작성되는데, 이렇게 되면 callback 함수를 호출할때, onCompletion 메서드를 호출을 안하게 되는건가요?callback(metadata, exception)이와같이 호출을 하는건가요?? lambda에서의 호출방법으로 호출하는건지, 기존의 callback 객체를 호출하는 방식이 맞는건지.. 어떠한 부분이 맞는건지 궁금합니다.
-
미해결[2024 리뉴얼] 처음하는 SQL과 데이터베이스(MySQL) 부트캠프 [입문부터 활용까지]
SAKILA 폴더에서 가져온 SCHEMA와 DATA가 정확히 뭘까요
SCHEMA는 관계 정보고 (DESC명령어로 보는)DATA는 관계에 들어가는 데이터(SELECT 명령어로 보는)인가요?DB정보를 넘길때는 보통 이 두개 파일을 넘기게 되나요?
-
미해결Airflow 마스터 클래스
task가 실행되지 않습니다.
task가 제대로 수행되지 않습니다작업의 로그를 확인하면아래와 같은 메세지만 나타나구요 *** Could not read served logs: Request URL is missing an 'http://' or 'https://' protocol. 아래에서 저와 같은 증상을 겪은 사람을 찾았습니다만링크메모리 사용량을 늘려서 해결했다고만 나타납니다 저는 이미 WSL의 메모리를 8GB에 swap도 2GB도 준 상태구요... airflow의 worker 자체의 에러로그를 찾아보았더니 아래와 같이 권한 문제가 나타납니다[2023-10-31 15:51:22,188: ERROR/ForkPoolWorker-15] [4c24a6e8-1133-46a5-99ac-5fd6bdb3c730] Failed to execute task [Errno 13] Permission denied: '/opt/airflow/logs/dag_id=dags_bash_operator'.zz5414-airflow-worker-1 | Traceback (most recent call last):zz5414-airflow-worker-1 | File "/usr/local/lib/python3.8/pathlib.py", line 1288, in mkdirzz5414-airflow-worker-1 | self._accessor.mkdir(self, mode)zz5414-airflow-worker-1 | FileNotFoundError: [Errno 2] No such file or directory: '/opt/airflow/logs/dag_id=dags_bash_operator/run_id=manual__2023-10-31T15:51:20.844476+00:00/task_id=bash_t1' worker 컨테이너안의 /opt/airflow/logs/dag_id=dags_bash_operator라는 파일에 권한이 없어서 발생하는 문제로 보입니다. 컨테이너 내부 파일의 권한은 어떻게 설정하는지를 모르겠습니다 제가 만든 dag말고도 example_bash_operator도 마찬가지로 실행되지 않고 같은 에러입니다. 해결해주실 수 있을까요?정말 airflow 열심히 배워보려고 했거든요 ㅠㅠ
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
스트림즈dsl의 state.dir에 대해
state.dir을 설명하시다가 /tmp의 생명주기가 다르다고 하셨는데 os 마다 /tmp의 데이터가 삭제되는 조건들이 다르다는 말씀인가요?
-
미해결따라하며 배우는 도커와 CI환경 [2023.11 업데이트]
docker build ./ 했는데 이미지 ID가 안나옵니다
mac os m2 모델을 사용하는데, 질문게시판에 있는대로 buildkit 부분을 건드리려고 설정에서 Docker Engine에 들어가니, 해당부분이 저는 없더라구요. 찾아봐도 이에 대한 언급은 없는데, m1/m2 mac silicon 도커에서는 이미지 Id를 다른 방식으로 찾아야하나요?
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
MSA에서 카프카 사용
학습 목적으로 카프카를 사용 중인데, MSA 구조에서의 카프카 프로듀서, 컨슈머 개념이 잘 이해가 가지 않습니다 ㅠspring boot로 MSA 구조를 구축한 상태입니다. 각 서비스별로 스프링 부트 서버가 존재합니다. 각 서비스가 하나의 데이터베이스 (MySQL 혹은 MongoDB)를 공유하여 사용하려고 합니다. 이 때 스프링 부트가 카프카 토픽에 데이터를 저장하고, 토픽에 있는 데이터를 DB에 저장하여 MSA 환경에서 DB의 일관성을 유지하고자 하는데 이 경우에 카프카를 사용하는 것이 적합할까요?또한 스프링 부트 서버에서 카프카 토픽에 데이터를 주고받을 프로듀서와 컨슈머, MySQL에 토픽의 데이터를 넣고 빼올 프로듀서와 컨슈머 이런식으로 한 서버 당 최소 4개씩을 각각 모두 설정해야하나요?
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
rebalancing 관리 관련 질문이 있습니다
안녕하세요 강사님rebalancing에 질문이 있습니다 commit이 Fail하여 rebalanced나 assigned partitions 같은 에러가 나올떄는보통 어떻게 관리를 하나요?rebalance가 안날순 없다고 알고 있습니다.보통 어떻게 이런 오류를 관리하고 처리하는지 알고싶습니다.따로 consumer를 restart하는 방법도 있나요? 그러면 문제가 될 게 있는지도 궁금합니다.
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
조인관련해서 실행하려면 오류가 뜹니다
jnilib가 없다는 에러가 뜨는데 뭔가 설정을 해야하는 걸까요?Exception in thread "global-table-join-application-76f56ff6-212f-4940-b4fb-fd8379e83d55-GlobalStreamThread" java.lang.UnsatisfiedLinkError: Can't load library: /var/folders/zy/b19yps9j095601_vkghc25wh0000gn/T/librocksdbjni17187958455810980136.jnilib
-
해결됨실리콘밸리 엔지니어와 함께하는 Apache Airflow
섹션1 apache airflow 설치하기 질문
강의 3:17 에서"그대로 카피하셔서 설치하면" 이라고 하셨는데 이게 무슨뜻이죠? 구체적인 방법을 알려주시면 감사하겠습니다.카피해서 터미널에 붙여넣기를 하면 오류가 떠서요
-
미해결Airflow 마스터 클래스
WSL 설치
WSL 설치시 하위시스템이 이미 설치되어있습니다.라고 나오는데 Ubuntu 22.04.1 LTS는 없네요.이미지 다운 어떻게 받아서 설치하나요?
-
해결됨mongoDB 기초부터 실무까지(feat. Node.js)
updateMany에서 user._id를 못찾는 상황
강좌대로 Blog.updateMany({ "user._id": userId }, { "user.name": name })로 하니 블로그 데이터 유저 정보가 변경이 계속 안되고 있습니다 ㅠㅠ 유저 데이터는 정상적으로 변경이 되었는데요 문제점을 모르겠습니다.해당 코드 깃허브 : https://github.com/alinfanclub/KimDevlogServer/blob/updateMany/src/routes/userRouter.js userRouter.put("/:userId", async (req, res) => { try { const { userId } = req.params; if (!mongoose.isValidObjectId(userId)) return res.status(400).send({ err: "invalid userId" }); const { age, name } = req.body; if (!age && !name) return res.status(400).send({ err: "age or name is required" }); if (age && typeof age !== "number") return res.status(400).send({ err: "age must be a number" }); if (name && typeof name.first !== "string" && typeof name.last !== "string") return res.status(400).send({ err: "first and last name are strings" }); // let updateBody = {}; // if(age) updateBody.age = age; // if(name) updateBody.name = name; // const user = await User.findByIdAndUpdate(userId, updateBody, { new: true }); let user = await User.findById(userId); if (age) user.age = age; if (name) { user.name = name; await Blog.updateMany({ "user._id": userId }, { "user.name": name }); } await user.save(); return res.send({ user }); } catch (err) { console.log(err); return res.status(500).send({ err: err.message }); } });
-
미해결데이터베이스 중급(Modeling)
PK에 임의의 식별자(정수형 시퀀스값)부여에 관한 질문드립니다.
영상 마지막에 나온것처럼 PK에 해당하는 칼럼의 값을 프로그래머를 위해 넘겨주어야 한다고 말씀하신거처럼.클라이언트 화면에서는 임의의 식별자 데이터는 렌더링하지는 않지만 사용자(클라이언트 프로그램)가 어떤 데이터를 요청할 때 클라이언트는 해당 데이터(레코드)에 해당하는 PK의 값을 서버에 전달. 과 같은 방식일까요? 질문이 조금 매끄럽지가 않은것같아 좀 더 말해보면에로들어 도서 관리 DB의 도서(Book)테이블에 PK가 도서 번호(1,2,3,4..)이며 나머지 속성은 책 이름, 출판사 등의 속성을 가지고 있고, 책 테이블의 도서 번호를 참조한 자식 관계를 가진 대여 기록 테이블이 있을 때 사용자가 'RDBMS Modeling 기초'라는 책의 대여기록을 보고싶어서 해당 책이름을 클릭하면 내부 코드에서는 클라이언트 코드에서는 해당 책의 PK인 책번호를 서버에게 전달 후 서버는 해당 책번호를 통해 대여기록 테이블과 JOIN하여 클라이언트에 응답. 과 같은 방식이 일반적인지 궁금합니다
-
해결됨카프카 완벽 가이드 - 코어편
브로커가 추가될 때 파티션 재분배
안녕하세요 선생님! 완강 후에 정리하며 이것저것 테스트를 하는 와중에 궁금한게 생겨 질문드립니다. 이미 특정 토픽의 파티션이 브로커들에게 분배된 상태에서, 새로운 브로커가 추가됐을 때 새로운 브로커는 특정 토픽의 파티션을 가질 수 있는 대상으로 선정되지 않는 것 같습니다.새로운 브로커가 추가 됐을 때 새 브로커에도 기존의 토픽의 파티션 재분배를 하는 방법이 있나요?불가능 하다면, 이런 모델을 가지는 이유가 있을까요? 테스트 과정 공유드립니다.broker #1, #2 총 2개 띄운 상태에서 partition 3개, replication 2개의 토픽 생성 (topic-p3r2)Topic: topic-p3r2 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline: Topic: topic-p3r2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline: Topic: topic-p3r2 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline:broker #3 추가 후 topic-p3r2 토픽 상태Topic: topic-p3r2 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline: Topic: topic-p3r2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline: Topic: topic-p3r2 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline:브로커#3은 후보에도 오르지 않았습니다. 제가 예상했던 건 아래와 같이 브로커#3이 추가되었을 때 브로커#3도 토픽의 파티션을 갖는 것이었습니다.(아래 로그는 제가 상상한 것을 임의로 만든 것입니다)Topic: topic-p3r2 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline: Topic: topic-p3r2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline: Topic: topic-p3r2 Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1 Offline: 감사합니다!!
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
강사님 오류 관련하여 질문이 있습니다.
저는 지금 AIOkafka를 사용하고 있는데 commit()을 해주면 종종commit cannot be completed since the group has already rebalanced이 에러가 나오더라구요찾아보니 aio는 자동으로 리밸런싱 해서 그렇다는데 그렇다면 commit을 어떻게 써야 중복도 안되고 자동 리밸런싱으로 오류도 안생길까요?
-
미해결[2024 리뉴얼] 처음하는 SQL과 데이터베이스(MySQL) 부트캠프 [입문부터 활용까지]
pymysql import 시 찾을 수 없는 모듈이라 나옵니다
pip install 시 설치가 이미 되어있다고 나오는데 import 시에는 찾을 수 없다고 나옵니다...
-
미해결다양한 사례로 익히는 SQL 데이터 분석
사용자별 월별 세션 접속 횟수의 구간별 분포 집계 SQL - where절
선생님 안녕하세요?강의 잘 보고 있습니다.월 말일 기준으로 2일전에 생성한 user를 제외하고, session 수를 카운트 하기 위해서 where 절 안에 아래와 같이 수업시간에 말씀주셨었는데요. select a.user_id, date_trunc('month', visit_stime)::date as month, count(*) as monthly_user_cntfrom ga.ga_sess ajoin ga.ga_users b on a.user_id = b.user_idwhere b.create_time <= (date_trunc('month', b.create_time) +interval '1 month' - interval '1 day')::date -2group by a.user_id, date_trunc('month', visit_stime)::date 이 부분에 의하면, 말일 기준으로 2일 전부터 말일까지 create된 user의 경우, 모든 month에서 session 데이터가 필터링 되는데 의도하신 바가 맞으신지요? 어떤 user가 9월 29일에 create 하고, 9월 30일에 session 기록이 있으면, 이건 count되지 않고,10월 3일의 session 기록은 10월에 count하는 것이 의도하신 것이 아닌지요?그럴경우에는 아래와 같이 where 절을 수정해야 count가 될 것 같아요.where b.create_time <= (date_trunc('month', a.visit_stime)+interval '1 month' - interval '1 day')::date -2
-
미해결Airflow 마스터 클래스
"Python Operator에서 Xcom 사용" 강의 질문
안녕하세요.강의 잘 듣고 있습니다. "Python Operator에서 Xcom 사용" 강의에서 task flow가python_xcom_push_by_return = xcom_push_result() xcom_pull_2(python_xcom_push_by_return) python_xcom_push_by_return >> xcom_pull_1()이렇게 되고, 그래프가 python_xcom_push_by_return -> xcom_pull_2python_xcom_push_by_return -> xcom_pull_1 이렇게 그려지는데, 이 경우 처리도 병렬적으로 되나요?아니면 그래프만 저렇게 나오고 실제 동작은파이썬 동작하는 것처럼 윗줄이 먼저 실행되어서 실제로는python_xcom_push_by_return -> xcom_pull_2 -> xcom_pull_1이렇게 실행이 되나요??
-
미해결따라하며 배우는 도커와 CI환경 [2023.11 업데이트]
강의내용 따라가고있는데 에러가 발생되었습니다. 혹시 몰라 깃헙 코드도 가져와서 해봤는데 동일한 오류이고 js:818에 대한 레퍼런스도 부족해서 문의 남깁니다 ㅠㅠ
imsang-gyu@limsanggyu-MacBookPro nodejs-docker-app % docker run -p 5000:8080 limsanggyu/nodejsinternal/modules/cjs/loader.js:818 throw err; ^Error: Cannot find module '/nodemon' at Function.Module._resolveFilename (internal/modules/cjs/loader.js:815:15) at Function.Module._load (internal/modules/cjs/loader.js:667:27) at Function.executeUserEntryPoint [as runMain] (internal/modules/run_main.js:60:12) at internal/main/run_main_module.js:17:47 { code: 'MODULE_NOT_FOUND', requireStack: []}
-
해결됨15일간의 빅데이터 파일럿 프로젝트
휴 설치 에러 yum install scl-utils
yum install centos-release-scl 까지는 설치가 됐는데utils 부분에서 에러가 나네요 ㅠㅠ 어떻게 해결하나요? 그런데 python27은 정상적으로 설치가 됐어요.. 문제없는건가요? 추가로 휴 설치 이후 HBase 탭에서 DriverCarInfo에 들어왔는데 실시간 적재 데이터가 하나도 없는데 실시간 적재 데이터를 다시 수행해야 하는 건지 아니면 데이터가 날라간건지 궁금합니다 .. ㅠㅠ
-
해결됨카프카 완벽 가이드 - 코어편
토픽에 데이터가 없을 때 offset이 0이 되는 현상 문의
- 학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요! - 먼저 유사한 질문이 있었는지 검색해보세요. - 서로 예의를 지키며 존중하는 문화를 만들어가요. - 잠깐! 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요. 선생님 안녕하십니까!강의를 열심히 듣고 있는 수강생입니다. 다름이아니라, 토픽에 데이터가 없을 때 offset이 0이 된다는 로그는 어디서부터 기인하는 것인지 궁금하여 질문 드립니다. 제가 이해하기로 AUTO_COMMIT false가 되어있으면 명시적으로 commit을 호출하지 않으면 commit이 되지 않는 것으로 알고있습니다 (테스트도 해보았습니다.) 하지만, 강의 6:00경 시나리오.즉, 토픽에 아무런 데이터가 없는 상태에서 컨슈머만 켠 케이스에서 저는 커밋(commitSync)을 하지 않았는데 poll을 몇번 돌다보면 offset이 0이 되었다는 로그가 발생합니다.(실제로 __consumer_offsets-* 파일에는 offset이 기록되진 않고, 그래서 컨슈머를 껐다 켜면 offset 0부터 읽습니다) 저는 commit을 한적이 없으므로 실제로 __consumer_offsets에는 0으로 기록되지 않는데, 저 로그에 있는 offset이 0은 어디서부터 오는 것일까요? +) 코드상으로는 poll 시점에 updateAssignmentMetadataIfNeeded (maybeSeekUnvalidated) 에서 해당 로그가 찍히는데요, 이건 무슨 정책일까요? 관련 자료가 있으면 공유주셔도 감사합니다. // offset=0 설정이 되어버림 [main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-group-pizza-assign-seek-maintenance-6-1, groupId=group-pizza-assign-seek-maintenance-6] Fetch position FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}} is out of range for partition pizza-topic-0, resetting offset [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group-pizza-assign-seek-maintenance-6-1, groupId=group-pizza-assign-seek-maintenance-6] Resetting offset for partition pizza-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}. 테스트한 코드 일부 공유드립니다.(필요할 설정들을 했으며, 정말 커밋이 안되고 있는지 확인하기 위해 records.count()>100일 때만 명시적으로 커밋을 한 코드입니다)props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); TopicPartition topicPartition = new TopicPartition(topicName, 0); kafkaConsumer.assign(List.of(topicPartition)); kafkaConsumer.seek(topicPartition, 10L);while(true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1_000L)); for(ConsumerRecord<String, String> record: records) { log.info("record key: {}, partition: {}, recordOffset: {}", record.key(), record.partition(), record.offset()); } if (records.count() > 100 ){ kafkaConsumer.commitSync(); log.info("commit sync called"); } } 감사합니다 ^^