묻고 답해요
141만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결Airflow 마스터 클래스
localhost:8080 에서 로그인이 안됩니다.
위 상황에서 인터넷창에 localhost:8080 후 진입하면 우선 선생님과 같은 로그인 화면이 아닌 다른 UI가 나옵니다. 그리고 초기 아이디/비번으로 설정된 airflow/airflow 로 시도해도 로그인이 안되고 있어서 해결방법을 알수있을까요?
-
해결됨Airflow 마스터 클래스
task 동시성 제한 및 중복 호출 방지
안녕하세요, 수업을 대부분 수강하고 실제 현업에서 사용중에 있는데 문의사항이 있어서 질문 드립니다. 현재 상황은 이렇습니다.DAG 구성- 5분단위 스케줄링 -4개의 task - task1 >> task2 >> task3 >> task4 - 각 task 별 timeout =5분 문제 상황은 task 2번이 한달에 한번씩 data 가 많아지면 5분까지 타임아웃이 걸릴때가 있는 것인데요,이때 그다음 Dag run 이 수행되면서 task 2 번이 동시에 수행 되는 시간이 조금 있는데 그때 데이터 처리가 중복으로 처리되는 현상이 발생하게 됩니다. 그래서 가능하면 task2 을 동시에 돌리는걸 막고 싶었는데요,처음 생각해낸 방법은 task_concurrency 옵션을 task 에 주어서 1개만 돌수 있게 바꾸고 timeout 을 조금더 넉넉하게 주려고 했으나, 만에하나 해당 task 가 10분이상 걸린다면 dag run 이 수행되고있는것 제외 2개가 더 웨이팅을 하는것이 되고, 이게 누적이 될수도 있을것으로 보여서 문제로 인지 했습니다.서비스 적으로 5분내에 돌수 있게 하거나, 아니면 5분 스케줄링을 변경하는 방법을 고려해야 하지만 해당 고려 없이 혹시 airflow 단에서 할수 있는 작업이 있을까요?ex . runninng 중인 task 와 대기중인 task 가 하나정도 있다면 해당 task 는 스킵하는 옵션 등입니다..
-
미해결실리콘밸리 엔지니어와 함께하는 Apache Airflow
airflow와 postgres간의 connection 오류
airflow와 postgres 간의 connection 오류 문제입니다.airflow UI -> admin-> connections에서 postgres 연결설정docker-compose.yaml 설정 dag 코드입력 airflow tasks test postgres_loader execute_sql_query 2023-01-01 시에 오류가 뜹니다ㅠ[2024-06-21T15:40:45.514+0900] {dagbag.py:545} INFO - Filling up the DagBag from /home/kim/airflow/dags [2024-06-21T15:40:45.805+0900] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: postgres_loader.execute_sql_query __airflow_temporary_run_2024-06-21T06:40:45.755970+00:00__ [None]> [2024-06-21T15:40:45.811+0900] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: postgres_loader.execute_sql_query __airflow_temporary_run_2024-06-21T06:40:45.755970+00:00__ [None]> [2024-06-21T15:40:45.812+0900] {taskinstance.py:2306} INFO - Starting attempt 1 of 1 [2024-06-21T15:40:45.812+0900] {taskinstance.py:2388} WARNING - cannot record queued_duration for task execute_sql_query because previous state change time has not been saved [2024-06-21T15:40:45.813+0900] {taskinstance.py:2330} INFO - Executing <Task(PostgresOperator): execute_sql_query> on 2023-01-01 00:00:00+00:00 [2024-06-21T15:40:45.855+0900] {taskinstance.py:2648} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='postgres_loader' AIRFLOW_CTX_TASK_ID='execute_sql_query' AIRFLOW_CTX_EXECUTION_DATE='2023-01-01T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2024-06-21T06:40:45.755970+00:00__' [2024-06-21T15:40:45.858+0900] {taskinstance.py:430} INFO - ::endgroup:: [2024-06-21T15:40:45.870+0900] {sql.py:276} INFO - Executing: INSERT INTO sample_table (key, value) VALUES ('hello', 'world') [2024-06-21T15:40:45.875+0900] {taskinstance.py:441} INFO - ::group::Post task execution logs [2024-06-21T15:40:45.875+0900] {taskinstance.py:2905} ERROR - Task failed with exception Traceback (most recent call last): File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task result = _execute_callable(context=context, **execute_callable_kwargs) File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable return execute_callable(context=context, **execute_callable_kwargs) File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 401, in wrapper return func(self, *args, **kwargs) File "/home/kim/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 277, in execute hook = self.get_db_hook() File "/home/kim/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 188, in get_db_hook return self._hook File "/usr/lib/python3.10/functools.py", line 981, in __get__ val = self.func(instance) File "/home/kim/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 150, in _hook conn = BaseHook.get_connection(conn_id) File "/home/kim/.local/lib/python3.10/site-packages/airflow/hooks/base.py", line 83, in get_connection conn = Connection.get_connection_from_secrets(conn_id) File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/connection.py", line 519, in get_connection_from_secrets raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined") airflow.exceptions.AirflowNotFoundException: The conn_id `my_postgres_connection` isn't defined
-
미해결카프카 완벽 가이드 - 커넥트(Connect) 편
안녕하세요 질문이 있어 문의드립니다.
이미 Kafka Core편은 들었고 현재 Kafka Connect부분중 Sink Connector듣기 전입니다.이 강의를 듣게된 동기이기도 한데요.. DBToDB방식으로 해결하지 않고 Source Connector가 없이 Topic를 만들고 Topic에 Produce하는 방법이 가능한지요? Topic에 생성된 메시지는 Sink Connector 로 DB에 적재되는 구조입니다. Topic의 메시지 구조는 Source Connector에서 생성된 Message 체계로 Topic에 메시지를 적재하는 식으로 처리할려고 합니다.Kafka core에서 배웠던 Produce하는 방법을 이용하는경우입니다. 정리) Source Connector없이 Topic에 메시지를 보내고 이를 SinkConnect로 DB에 적재하는게 가능한지요?감사합니다.
-
미해결카프카 완벽 가이드 - ksqlDB
table의 데이터가 실시간으로 topic에 담기지 않습니다
mysql에 debezium source connector로 topic에 가져온 데이터를 받는 stream을 만들고그 stream을 기반으로 하여CREATE TABLE timeout WITH (KAFKA_TOPIC='timeout' , KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=1) AS > SELECT > order_id -> order_id AS order_id, > TIMESTAMPADD(MILLISECONDS, 9 * 3600 * 1000, PARSE_TIMESTAMP(LATEST_BY_OFFSET(order_datetime), 'yyyy-MM-dd''T''HH:mm:ssX')) AS last_log_time > FROM orders > GROUP BY order_id -> order_id > HAVING ((UNIX_TIMESTAMP(CONVERT_TZ(FROM_UNIXTIME(UNIX_TIMESTAMP()), 'UTC', 'Asia/Seoul')) - UNIX_TIMESTAMP(TIMESTAMPADD(MILLISECONDS, 9 * 3600 * 1000, PARSE_TIMESTAMP(LATEST_BY_OFFSET(order_datetime), 'yyyy-MM-dd''T''HH:mm:ssX')))) / 1000 > 600) > EMIT CHANGES;이런식으로 id별로 마지막 로그 시간이 오고 10분 이상이 지나면 table에 담기도록 만들었습니다처음에 이미 10분이 지난 데이터를 넣으면 table에도 들어가고 topic에도 잘 들어가는데현재시간의 데이터를 넣고 10분이 지나면 table에는 들어가는데 topic에는 들어가지 않습니다table에도 담기고 topic에도 담기려면 어떻게 해야하나요? 아니면 원래 불가능한건가요?기반한 stream은 데이터를 넣으면 곧 바로 stream과 토픽에 잘 들어갑니다.|ORDER_ID |CALCULATED_TIME |LAST_LOG_TIME | +------------------------------------------+------------------------------------------+------------------------------------------+ |1 |78088 |2024-06-16T12:30:00.000 | |2 |69988 |2024-06-16T14:45:00.000 | |3 |72088 |2024-06-16T14:10:00.000 | |4 |32739088 |2023-06-04T12:00:00.000 | |5 |32637088 |2023-06-05T16:20:00.000 | |6 |32567788 |2023-06-06T11:35:00.000 | |7 |69058 |2024-06-16T15:00:30.000 | |8 |68698 |2024-06-16T15:06:30.000 | |9 |66958 |2024-06-16T15:35:30.000 | |10 |65698 |2024-06-16T15:56:30.000 | |11 |66298 |2024-06-16T15:46:30.000 | |12 |4258 |2024-06-17T09:00:30.000 | |13 |3418 |2024-06-17T09:14:30.000 | |14 |1918 |2024-06-17T09:39:30.000 | |15 |2429 |2024-06-17T09:30:59.000 | Query terminated ksql> print result7777; Key format: AVRO or KAFKA_STRING Value format: AVRO rowtime: 2024/06/16 04:23:23.878 Z, key: 1, value: {"CALCULATED_TIME": 12183, "LAST_LOG_TIME": 1718541000000}, partition: 0 rowtime: 2024/06/16 04:23:23.879 Z, key: 2, value: {"CALCULATED_TIME": 4083, "LAST_LOG_TIME": 1718549100000}, partition: 0 rowtime: 2024/06/16 05:10:08.498 Z, key: 3, value: {"CALCULATED_TIME": 6183, "LAST_LOG_TIME": 1718547000000}, partition: 0 rowtime: 2024/06/16 06:06:52.365 Z, key: 4, value: {"CALCULATED_TIME": 32673183, "LAST_LOG_TIME": 1685880000000}, partition: 0 rowtime: 2024/06/16 06:06:52.373 Z, key: 5, value: {"CALCULATED_TIME": 32571183, "LAST_LOG_TIME": 1685982000000}, partition: 0 rowtime: 2024/06/16 06:06:52.377 Z, key: 6, value: {"CALCULATED_TIME": 32501883, "LAST_LOG_TIME": 1686051300000}, partition: 0 rowtime: 2024/06/16 06:09:36.530 Z, key: 7, value: {"CALCULATED_TIME": 3153, "LAST_LOG_TIME": 1718550030000}, partition: 0 rowtime: 2024/06/16 06:15:08.351 Z, key: 8, value: {"CALCULATED_TIME": 2793, "LAST_LOG_TIME": 1718550390000}, partition: 0 rowtime: 2024/06/16 06:41:28.920 Z, key: 9, value: {"CALCULATED_TIME": 1053, "LAST_LOG_TIME": 1718552130000}, partition: 0 rowtime: 2024/06/17 00:23:09.442 Z, key: 12, value: {"CALCULATED_TIME": 1372, "LAST_LOG_TIME": 1718614830000}, partition: 01-9, 12 이미 10분이 지난 데이터 // 그 외 = 데이터가 mysql에 담기고 10분이 지나 table에 담긴 데이터
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
auto_commit_interval_ms_config 질문
- 카프카를 공부하시면서 생긴 질문들을 남겨주세요. 상세히 작성하면 더 좋아요! - 먼저 유사한 질문이 있었는지 검색해보세요. - 서로 예의를 지키며 존중하는 문화를 만들어가요. - 잠깐! 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요. 안녕하세요~auto_commit_interval_ms_config 에 대해 궁금한 점이 있는데요 찾아봐도 모호해서 질문드립니다.auto_commit_interval_ms_config = 60이라고 가정했을때위 옵션은 poll() 호출 여부와 관계없이 60초 마다 자동으로 커밋을 해주는건가요?아니면 마지막 자동커밋 발생하고 60초 이후에 poll() 이 호출될때 커밋을 해준다는건가요?
-
미해결Database - SQL
MySQL 실습
CUSTOMER ID FORMATTING- 1: 00001- 2: 00002- 13: 00013SELECT CUSTOMERID, CONCAT(REPEAT('0', 5-LENGTH(CUSTOMERID)), CUSTOMERID)FROM CUSTOMER;MySQL은 문자열 + 하기 연산이 없어서CONCAT 함수를 사용해야 합니다
-
미해결카프카 완벽 가이드 - 코어편
강의 질문
제가 EXCEL파일에 있는 데이터를 카프카를 통해 db에 저장을 해야하는데 이 강의를 들으면 할 수 있을까요?
-
해결됨카프카 완벽 가이드 - ksqlDB
debezium에서 ksqldb로
제가 구상하고있는 구조가 mysql에서 debezium source connector가 topic에 넘기고 ksqldb의 streams나 table로 재구성하여 다른 topic으로 넘긴 후mysql sink database에서 받는다. 라는걸 구상중인데요 ksqldb에서 직접 insert를 하면 json 형식이 아니라서 sink connector가 읽지 못하는거 같습니다.-- debezium.json --{ "name": "debezium", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "localhost", "database.port": "3306", "database.user": "root", "database.password": "1234", "database.allowPublicKeyRetrieval": "true", "database.server.id": "10777", "database.server.name": "debe01", "database.include.list": "debe", "table.include.list": "debe.user", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "schema-changesde.mysql.oc", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "database.connectionTimeZone": "Asia/Seoul", "time.precision.mode": "connect", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false" } } 그래서 debezium으로 mysql의 data를 읽어서 topic으로 가져왔는데ksqldb에서 그 data를 읽는 부분에서 막혔습니다강의에 나온거처럼 ksqldb와 debezium을 연동을 해야 가능한건가요?아니면 어떤 방법이 있을까요?
-
미해결따라하며 배우는 도커와 CI환경 [2023.11 업데이트]
Docker Volume 오류(reference, lowercase)
#ERROR1repository 이름이 소문자여야 한다는 에러입니다. 제가 폴더 명을 "Docker"로 해놨었더니 이런 오류가 뜨더라고요. 폴더명을 "docker"로 바꿨더니 해결되었습니다.kim-yaegun@gim-yegeons-MacBook-Air Docker % docker run -p 5001:8080 -v /usr/src/app/node_modules -v $(pwd):/usr/src/app yaegun/nodedocker: invalid reference format: repository name (Docker) must be lowercase.See 'docker run --help'.#ERROR2레퍼런스가 유효하지 않다는 에러입니다.kim-yaegun@gim-yegeons-MacBook-Air docker % docker run -p 5001:8000 -v /usr/src/app/node_modules -v $(pwd):/usr/src/app yaegunkim/nodedocker: invalid reference format.See 'docker run --help'.이건 $(pwd)를"$(pwd)"로 바꾸어 주니 해결되었습니다.kim-yaegun@gim-yegeons-MacBook-Air docker % docker run -p 5001:8000 -v /usr/src/app/node_modules -v "$(pwd)":/usr/src/app yaegunkim/nodeSuccessful
-
미해결따라하며 배우는 도커와 CI환경 [2023.11 업데이트]
[섹션3 - 내가 만든 이미지 기억하기 쉬운 이름 주기] 네이밍/태그 에러
"docker build -t YaegunKim/hello:latest ./"로 빌드를 잘 한 것 같은데 아래와 같은 에러가 나오더라고요.#COMMANDdocker run -it YaegunKim/hello#ERRORUnable to find image 'YaegunKim/hello:latest' locallydocker: Error response from daemon: Get "https://YaegunKim/v2/": dialing YaegunKim:443 container via direct connection because has no HTTPS proxy: resolving host YaegunKim: lookup YaegunKim: no such host.See 'docker run --help'.근데 또 아이디로 run을 하면 잘 되고...#PROBLEM-SOLVING이유는 버전은 "latest"가 아닌 "lastest"로 해서 에러가 났던 것이었습니다. 다시 빌드 하고 실행해보니 hello가 잘 출력되네요 ㅎㅎ
-
미해결Airflow 마스터 클래스
도커를 사용하지 않는 방법
안녕하세요 HPC를 사용하고 있는데 도커가 사용 불가능한 HPC라 우선은 구글링하여 airflow를 설치하고 강의를 듣고 있습니다. 아직 1강인데, 혹시 차후에 도커가 없어서 강의를 못따라가는 상황이 생길까요? 수강신청전에 미리 확인해봤어야 했는데 죄송합니다 ㅜㅜ!
-
미해결실리콘밸리 엔지니어와 함께하는 Apache Airflow
from airflow.sensors.sql import SqlSensor에 대해 질문 있습니다.
선생님이 4:21초에 from airflow.sensors.sql import SqlSensor는 provider에 있는게 아니라 core에 있는 sensor라고 알려주셨는데 airflow 버전 2.9.1에서는 SqlSensor가 apache-airflow-providers-common-sql 패키지에 포함되어 있다고 하는데 그러면 버전 2.9.1에서는 airflow core에 있는 sensor를 사용하지 못하는 건가요??
-
미해결15일간의 빅데이터 파일럿 프로젝트
tail -f flume-cmf-flume-AGENT-server02.hadoop.com.log 오류
tail -f flume-cmf-flume-AGENT-server02.hadoop.com.log 했을때 Creating이나 강의에 말씀한 내용 나오지않고, 아래처럼 나오기만 하는데 Flume Config파일도 정상적이고 재시동도 해봤는데 안되는데 또 조치해야할게 있을까요?
-
해결됨카프카 완벽 가이드 - ksqlDB
CLI로 실행과 코드로 실행하면 결과가 다르게 나옵니다
CREATE STREAM add_stream WITH (KAFKA_TOPIC='column_stream_topic', VALUE_FORMAT='JSON') AS SELECT *, CAST(NULL AS INT) AS new_column FROM test_stream EMIT CHANGES;이렇게 기존 test_stream에서 column을 추가한 add_stream을 만들려고 CLI문을 실행시키면원래 test_stream에 담겨있는 data가 담아져서 나오는데package com.example.service; import io.confluent.ksql.api.client.Client; import io.confluent.ksql.api.client.ClientOptions; import io.confluent.ksql.api.client.ExecuteStatementResult; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.concurrent.ExecutionException; @Service public class streamPracticeAdd { @Value("${ksqldb.server.host}") private String ksqlDbHost; private int ksqlDbPort; private Client client; @PostConstruct public void init() { ClientOptions options = ClientOptions.create() .setHost(ksqlDbHost) .setPort(ksqlDbPort); client = Client.create(options); } public void streamsAdd(String columnName, String dataType) { String createStreamKsql = "CREATE STREAM add_stream WITH (KAFKA_TOPIC='column_stream_topic', VALUE_FORMAT='JSON') AS SELECT *, CAST(NULL AS " + dataType + ") AS " + columnName + " FROM test_stream EMIT CHANGES;"; try { ExecuteStatementResult result = client.executeStatement(createStreamKsql).get(); System.out.println("Stream created and data inserted into new topic: " + result.queryId()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }/kafka/addColumn/new_column/INT 인 API 요청을 줘서 새 stream을 만드는 코드인데실행시키면 기존 column에 새 column까지 추가는 되는데 기존 data가 하나도 들어오지 않습니다.검색을 해봤는데도 잘 안나와서 질문 남깁니다 감사합니다.
-
미해결데이터베이스 중급(Modeling)
선생님 도메인이란게 뭔가요???
데이터베이스에서 말하는 도메인이란 어떤것을 뜻하나요??구글링을 해봐도 이해를 잘 못하겠어요 ㅠㅠ
-
미해결RDBMS Modeling 실습
Sales 테이블에 복합키로 기본키를 구성하는 이유가 무엇인가요?
복합키로 기본키를 구성하는 이유가 궁금합니다.seq 하나만 기본키로 잡았을때 장점이 더 많지 않나요?
-
미해결카프카 완벽 가이드 - 코어편
컨슈머 리벨런싱 도중 에러시 session.timeout.ms 관련 질문
컨슈머 리벨런싱 도중에 cpu 과부하 같은 오류가 발생해서 리벨런싱이 완료되지 않은 경우heartbeat.interval.ms 설정은 컨슈머가 안붙었기 때문에 보내지 못하고 session.timeout.ms 설정 값을 통해서 그룹 코디네이터가 컨슈머가 죽었다고 판단해서 리벨런싱이 일어나야 할 것 같은데 --describe 컨슈머 그룹 상태를 확인했을 때 계속 Consumer group '그룹이름' has no active members. 같이 컨슈머가 안붙는다면 서버 재시작을 통해서 강제 리벨런싱을 일어나게 하면 다시 컨슈머가 붙어서 해결이 가능하지만 이 방법을 떠나서 session.timeout.ms 같은 값을 통해서 자동 리벨런싱이 일어나지 않을 수도 있는건지 궁금합니다!
-
미해결카프카 완벽 가이드 - 커넥트(Connect) 편
재설치 후 커넥트 실행 오류
안녕하세요. 저번에 카프카 재기동 오류에 관해 질문을 올렸던 수강생입니다. 카프카 실행에 관한 오류는 해결하였습니다만, 이번에는 connector가 실행되지 않는 오류가 발생하여 강사님 말씀대로 카프카 삭제 후 강의와 동일한 버전으로 재설치해보았습니다. 하지만 커넥터는 계속 똑같이 실행되지 않고 있습니다.. 아래는 connect_start_log.sh 입력 후 출력되는 로그 내용입니다.... 이 뒤로는 Retrying to fetch metadata만 반복적으로 출력됩니다. 끝없는 오류의 굴레에서 벗어나고 싶습니다.. 제발 도와주세요
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
카프카 스트림즈 관련해 질문있습니다.
안녕하세요 강의 막마지 수강중인대요topic에서 데이터를 가져와 가공을 한 뒤 다른 topic에(싱크프로세서) 데이터를 저장할 때 스트림즈를 사용하면 좋다고 강의를 통해 알게는 되었는대요 궁금한점은 topic 데이터를 가져와 처리후 다른 토픽이 아닌 db같은 곳에 데이터를 저장할 수도 있잖아요?그런경우는 스트림즈를 사용하지 않나요?위와 같은 흐름이 필요한대 그럴경우 어떻게 구성하는게 좋은가요? 스트림즈로 데이터를 처리해 다른 토픽에 넣고커넥트를 사용해 그 토픽에서 db로 저장하도록 구현하는게 좋을지 스트림을 통해 db까지 저장할 수 있는건지 궁금합니다 추가로, 데이터 가공시에 스트림즈에서 외부 db에서 데이터를 조회해서 가공하도록 해도 괜찮은가요?