묻고 답해요
141만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결Airflow 마스터 클래스
SSHOperator, BashOperator차이
강의에서는 BashOperator을 이용해서 DAG를 만들었는데,제 실무에서는 SSHOperator을 사용하더라구요.어떤 차이가 있는지 궁금합니다.
-
미해결Airflow 마스터 클래스
EmailOperator OSError
안녕하세요. EmailOperator 실습과정에서 오류가 발생해서 질문을 남깁니다!설정을 한 후 실행을 하니. OSError: [Errno 99] Cannot assign requested address 가 발생해서 도움이 필요합니다..!docker-compose.yaml 파일에 이렇게 설정을 하였고, 에러는 다음과 같습니다!
-
미해결Airflow 마스터 클래스
실무에서는 깃헙말고 뭘통해 업로드하나요?
실무는 어떻게하는지 궁금해요
-
미해결Airflow 마스터 클래스
8장 postgres 에 insert 하는 Dag 수행오류
안녕하세요. 8장에서 postgres 에 insert하는 dag를 실행하면 아래와 같은 오류로 대기하다가 실패로 끝납니다.구글에 찾아보면 'secret_key'를 모두 동일하게 적용해주라고 하는데, 어떻게 적용해야 되는지 찾지는 못해서 이렇게 문의하게 되었습니다. 아래 소스도 git에 올라와 있는 소스 그대로 올려서 수행했기 때문에 따로 올리지는 않았습니다.docker-compose.yamldags_python_with_postgres 위에 상태에서 최종 오류로 떨어지면 오류 메세지를 확인할 수 있었습니다 .ㅜ사실 원인은 위에 메세지가 아닌것 같아요.ERROR - Failed to execute job 1119 for task insrt_postgres (connection to server at "172" (0.0.0.172), port 5432 failed: Connection timed out Is the server running on that host and accepting TCP/IP connections? connection to server at "28.0.3" (28.0.0.3), port 5432 failed: Connection timed out Is the server running on that host and accepting TCP/IP connections?그리고, 저의 docker-compose.yaml 에서 postgres_custom, postgres의 설정부분입니다.그리고, airflow의 connection 연결할 때도 포트는 5432로 했습니다. 사실 질문답변을 검색해보면 저와 비슷한 오류가 1개 있어서 확인을 해봤는데요그 오류는 5429에서 5432로 변경해서 해결됐다고 나온 것 같고 저는 5432인데 해결이 안돼서제가 실수한게 있는지 확인부탁드립니다. ㅜ
-
미해결Airflow 마스터 클래스
EmailOperator 수업 중 RemovedInAirflow3Warning 오류
아래와 같이 dag 작성하고 smtp 설정을 해주었는데 오류가 발생했습니다.RemovedInAirflow3Warning: Fetching SMTP credentials from configuration variables will be deprecated in a future release. Please set credentials using a connection instead. send_mime_email(e_from=mail_from, e_to=recipients, mime_msg=msg, conn_id=conn_id, dryrun=dryrun) [2024-07-22, 06:53:07 UTC] {configuration.py:1053} WARNING - section/key [smtp/smtp_user] not found in config [2024-07-22, 06:53:07 UTC] {email.py:271} INFO - Email alerting: attempt 1오류내용은 위와같습니다!
-
미해결Airflow 마스터 클래스
Bash operator 만들기, log 확인하는법
섹션2 Bash operator 만들기 수업에서 bash_t1, bash_t2 task 모두 실행은 됐는데log탭이 보이질 않습니다.현재 Airflow (2.9.3 ver) 화면입니다. Auditlog 에서도 볼 수 없는데 어디서 볼수있나요?
-
미해결Airflow 마스터 클래스
.env 파일 변수를 PythonOperator에 적용할 경우 질문드립니다!
안녕하세요! 강사님서울시 공공데이터 API를 이용해 데이터를 추출하는 것을 보고저도 넥슨 오픈 API를 가지고 데이터를 출력해보려고 DAG과 PythonOperator를 만들어봤는데요, 보통 민감정보는 .env파일에다 적어놓고 가져와서 사용하기에 이렇게 짰는데,.env파일은 gitignore에 들어가니 dag실행시 읽어올 수가 없는데이런 경우는 어떻게 할까요...? 강의에서 보여주신 것처럼 서울시 공공 api에서 simplehttpoperator를 사용하고, airflow웹에서 variable 변수를 사용했듯이 이것도 이렇게 해야할까요..?!
-
해결됨Airflow 마스터 클래스
permission denied 오류 관련 문의드립니다!
안녕하세요! 강사님 질문이 하나 있는데요,제가 중간에 docker를 삭제했다가 다시 설치해서 하고 있는데다른 것들은 잘 되는데, 서울시 공공 데이터 API를 가지고 Custome Operator를 하는 과정에서 dags_seoul_api_corona.py 이 실행이 안되어서 문의드려요!처음에 directory가 없다는 오류가 떠서 생성하는 코드를 추가해봤는데, 추가한 이후에 dag 실행해도 실패해서 로그를 확인해보니 permission 거부 문제로 오류가 발생한 것을 확인하였습니다.docker-compose.yaml에서 volumes는 잘 설정되었는데, 혹시 제가 AIRFLOW_CORE_EXECUTOR를 LocalExecutor로 바꾼 게 문제가 되었을까요?databricks와 airflow를 연동하면서 dns를 설정하고, command를 좀 수정하였는데 이 부분이 문제였을까요...? ChatGPT에 물어보니 Webserver로 들어가서 Docker 컨테이너 내에서 이 디렉토리의 소유자를 airflow 사용자로 변경하면 된다고 하는데, [sudo] password for default: 부분에서 비밀번호 에러가 나네요...!저는 따로 default에 대한 비밀번호를 설정하지 않아서 해당 비밀번호를 잘 모르는 상태입니다..! docker를 재설치해야할 것 같은데... 하기 전에 강사님께 문의 드립니다!편하게 답변 주시면 감사하겠습니다!!
-
미해결Airflow 마스터 클래스
섹션3-2 외부 파이썬 함수 수행하기 에러코드 질문드립니다.
아래와 같은 에러가 발생했는데, common 파일을 못찾는 것 같습니다. 혹시 해결방법이 있을까요?
-
미해결Airflow 마스터 클래스
localhost:8080 에서 로그인이 안됩니다.
위 상황에서 인터넷창에 localhost:8080 후 진입하면 우선 선생님과 같은 로그인 화면이 아닌 다른 UI가 나옵니다. 그리고 초기 아이디/비번으로 설정된 airflow/airflow 로 시도해도 로그인이 안되고 있어서 해결방법을 알수있을까요?
-
해결됨Airflow 마스터 클래스
task 동시성 제한 및 중복 호출 방지
안녕하세요, 수업을 대부분 수강하고 실제 현업에서 사용중에 있는데 문의사항이 있어서 질문 드립니다. 현재 상황은 이렇습니다.DAG 구성- 5분단위 스케줄링 -4개의 task - task1 >> task2 >> task3 >> task4 - 각 task 별 timeout =5분 문제 상황은 task 2번이 한달에 한번씩 data 가 많아지면 5분까지 타임아웃이 걸릴때가 있는 것인데요,이때 그다음 Dag run 이 수행되면서 task 2 번이 동시에 수행 되는 시간이 조금 있는데 그때 데이터 처리가 중복으로 처리되는 현상이 발생하게 됩니다. 그래서 가능하면 task2 을 동시에 돌리는걸 막고 싶었는데요,처음 생각해낸 방법은 task_concurrency 옵션을 task 에 주어서 1개만 돌수 있게 바꾸고 timeout 을 조금더 넉넉하게 주려고 했으나, 만에하나 해당 task 가 10분이상 걸린다면 dag run 이 수행되고있는것 제외 2개가 더 웨이팅을 하는것이 되고, 이게 누적이 될수도 있을것으로 보여서 문제로 인지 했습니다.서비스 적으로 5분내에 돌수 있게 하거나, 아니면 5분 스케줄링을 변경하는 방법을 고려해야 하지만 해당 고려 없이 혹시 airflow 단에서 할수 있는 작업이 있을까요?ex . runninng 중인 task 와 대기중인 task 가 하나정도 있다면 해당 task 는 스킵하는 옵션 등입니다..
-
미해결실리콘밸리 엔지니어와 함께하는 Apache Airflow
airflow와 postgres간의 connection 오류
airflow와 postgres 간의 connection 오류 문제입니다.airflow UI -> admin-> connections에서 postgres 연결설정docker-compose.yaml 설정 dag 코드입력 airflow tasks test postgres_loader execute_sql_query 2023-01-01 시에 오류가 뜹니다ㅠ[2024-06-21T15:40:45.514+0900] {dagbag.py:545} INFO - Filling up the DagBag from /home/kim/airflow/dags [2024-06-21T15:40:45.805+0900] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: postgres_loader.execute_sql_query __airflow_temporary_run_2024-06-21T06:40:45.755970+00:00__ [None]> [2024-06-21T15:40:45.811+0900] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: postgres_loader.execute_sql_query __airflow_temporary_run_2024-06-21T06:40:45.755970+00:00__ [None]> [2024-06-21T15:40:45.812+0900] {taskinstance.py:2306} INFO - Starting attempt 1 of 1 [2024-06-21T15:40:45.812+0900] {taskinstance.py:2388} WARNING - cannot record queued_duration for task execute_sql_query because previous state change time has not been saved [2024-06-21T15:40:45.813+0900] {taskinstance.py:2330} INFO - Executing <Task(PostgresOperator): execute_sql_query> on 2023-01-01 00:00:00+00:00 [2024-06-21T15:40:45.855+0900] {taskinstance.py:2648} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='postgres_loader' AIRFLOW_CTX_TASK_ID='execute_sql_query' AIRFLOW_CTX_EXECUTION_DATE='2023-01-01T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2024-06-21T06:40:45.755970+00:00__' [2024-06-21T15:40:45.858+0900] {taskinstance.py:430} INFO - ::endgroup:: [2024-06-21T15:40:45.870+0900] {sql.py:276} INFO - Executing: INSERT INTO sample_table (key, value) VALUES ('hello', 'world') [2024-06-21T15:40:45.875+0900] {taskinstance.py:441} INFO - ::group::Post task execution logs [2024-06-21T15:40:45.875+0900] {taskinstance.py:2905} ERROR - Task failed with exception Traceback (most recent call last): File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task result = _execute_callable(context=context, **execute_callable_kwargs) File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable return execute_callable(context=context, **execute_callable_kwargs) File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 401, in wrapper return func(self, *args, **kwargs) File "/home/kim/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 277, in execute hook = self.get_db_hook() File "/home/kim/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 188, in get_db_hook return self._hook File "/usr/lib/python3.10/functools.py", line 981, in __get__ val = self.func(instance) File "/home/kim/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 150, in _hook conn = BaseHook.get_connection(conn_id) File "/home/kim/.local/lib/python3.10/site-packages/airflow/hooks/base.py", line 83, in get_connection conn = Connection.get_connection_from_secrets(conn_id) File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/connection.py", line 519, in get_connection_from_secrets raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined") airflow.exceptions.AirflowNotFoundException: The conn_id `my_postgres_connection` isn't defined
-
미해결Airflow 마스터 클래스
도커를 사용하지 않는 방법
안녕하세요 HPC를 사용하고 있는데 도커가 사용 불가능한 HPC라 우선은 구글링하여 airflow를 설치하고 강의를 듣고 있습니다. 아직 1강인데, 혹시 차후에 도커가 없어서 강의를 못따라가는 상황이 생길까요? 수강신청전에 미리 확인해봤어야 했는데 죄송합니다 ㅜㅜ!
-
미해결실리콘밸리 엔지니어와 함께하는 Apache Airflow
from airflow.sensors.sql import SqlSensor에 대해 질문 있습니다.
선생님이 4:21초에 from airflow.sensors.sql import SqlSensor는 provider에 있는게 아니라 core에 있는 sensor라고 알려주셨는데 airflow 버전 2.9.1에서는 SqlSensor가 apache-airflow-providers-common-sql 패키지에 포함되어 있다고 하는데 그러면 버전 2.9.1에서는 airflow core에 있는 sensor를 사용하지 못하는 건가요??
-
미해결Airflow 마스터 클래스
dag 파일 오류수정 적용방법
안녕하세요 실습중에 오류 발생해서 질문드립니다. dag파일 잘못 작성하여 dag import error가 발생한 경우,dag파일을 수정하고 적용시키기 위해서는 매번 docker compose down 후, compose up으로 재기동 해줘야하나요? 간단한 오탈자 수정하는데도 재기동이 필요한지 궁금합니다.
-
미해결Airflow 마스터 클래스
powershell 이 없어 질문드립니다.
안녕하세요, wsl을 이용한 linux설치 강의를 따라하고 있었는데 시작버튼을 눌러 powershell 을 검색하면windows powershell ISE 만 나오고windows powershell 은 나오지 않습니다.별도의 windows powershell 설치가 필요할까요?구글링을 해봐도 아직 방법을 못찾았기에 질문드립니다.어떻게 진행하면 될까요 ?
-
미해결Airflow 마스터 클래스
데이터 엔지니어 업무 초보자가 궁금한 점 질문드립니다..
선생님 안녕하세요! 덕분에 Airflow에 대해 깊이 있게 공부하고 있습니다! 감사합니다. 다름이 아니라 Airflow를 잘 쓰고자 하는 마음에 질문드립니다!제가 지금 구축해야하는 환경이 Google Cloud 기반에서 DataLake와 Warehouse를 구축 해야 하는 상황에서 Airflow 강의를 참고해 도입 예정에 있습니다. 사 내 인프라 팀은 잘 갖춰져 있으나 데이터팀은 아직 미약한 상태에요.. 구글링 해서 살펴보았을 때 Airflow의 전처리의 대부분이 BigQuery의 SQL을 통해원하는 데이터를 가져와 전처리하는 로직으로 구성되어 있는거 같더라구요. 저는 Pandas라는 라이브러리가 익숙한 것도 있고 SQL 쿼리로 관리하기보다Pandas 코드로 관리하고자하는 마음에 Airflow와 Pandas의 조합은 어떻게 쓰면 좋다라거나 참고 블로그에 대해 알고싶고 또 선생님 조언을 들어보고 싶습니다 ㅠㅠ 아직 Airflow를 완전히 이해하지 못했지만 걱정되는 점은Pandas 사용 시 데이터를 읽었을 때 인메모리에 많은 양의 데이터가 올라가주의하지 않으면 구축하려는 Cloud Composer의 스펙이 오버될거 같은 느낌이 들어서요.. 또 다른 궁금한 점은 전처리 구간이 많을 수록 BigQuery에 저장하면서 불러들이는 식으로 작업하시는 지도 궁금합니다!! 장애 발생 시 어떤 구간에서 발생했으며 Retry 시 저장하면서 가야 정확한 에러 구간에 대해 모니터링이 가능해보여서요.. 마지막으로.. dags를 관리하는 아키텍쳐? 방안에 대해서 유행하거나 픽스된 방법론이 있는 지도 궁금해요백엔드의 디자인패턴과 유사한.. 질문이 많죠.. 백엔드하다 데이터 엔지니어 업무가 처음이다 보니 궁금한게 많네요.. 다시 정리를 하면 질문은 아래와 같습니다. 긴 글 읽어주셔서 감사합니다 ( _ _ )Airflow와 Pandas 조합을 사용하고자 할 때 선생님의 조언이 궁금합니다.전처리 구간 마다 생기는 View Table이 데이터 양이 많을 때 저장하는 지 궁금합니다.git에서 dags를 관리하는 방법론이 궁금합니다.
-
미해결Airflow 마스터 클래스
강의 내용 정리
안녕하세요! 에어플로우를 사용하고 싶어서 강의를 수강하게 되었는데 수업을 너무 잘 해주셔서 많은 도움을 받고 있습니다.다름이 아니라 제가 개인적인 공부를 목적으로 개인 블로그를 하고 있는데, 에어플로우에 관해 강의 자료를 토대로 정리해서 게시물을 작성해도 될지.. 여쭤보고 싶습니다..! 강의 자료를 그대로 사용하지는 않고 내용을 바탕으로 정리해서 작성할 계획입니다..!
-
해결됨Airflow 마스터 클래스
파이참에서 외부 파이썬 함수 수행하기
안녕하세요. 외부 파이썬 함수 수행하기가 안되어서 문의드리게 되었습니다. 저는 Pycharm이 익숙해서 Pycharm으로 하고 있었는데,Pycharm의 경우 .env파일이 인식이 안되는 걸까요..?common 모듈을 발견하지 못하네요....env파일을 아래와 같이 설정하였고dags_python_import_func.py에서도Enable EnvFile에 체크표시를 하였는데여전히 해당 모듈을 읽지 못하네요...ㅠㅠ혹시 Pycharm의 경우 .env파일을 다르게 설정해야하는 걸까요...?
-
미해결Airflow 마스터 클래스
xcom_pull 메서드 사용 질문
안녕하세요강의에서 xcom_pull 메서드 사용시 task_ids를 지정하지 않으면 가장 마지막 태스크의 키값을 가져온다고 하셨는데요, 실습코드를 돌려보니 먼저 실행되었던 태스크의 밸류값을 리턴합니다. from airflow import DAG import pendulum from airflow.decorators import task with DAG( dag_id="dags_python_with_xcom_eg1", schedule="30 6 * * *", start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"), catchup=False, ) as dag: @task(task_id="python_xcom_push_task1") def xcom_push1(**kwargs): ti = kwargs["ti"] ti.xcom_push(key="result1", value="value_1") ti.xcom_push(key="result2", value=[1, 2, 3, 4]) @task(task_id="python_xcom_push_task2") def xcom_push2(**kwargs): ti = kwargs["ti"] ti.xcom_push(key="reuslt1", value="value_2") ti.xcom_push(key="reuslt2", value=[1, 2, 3]) @task(task_id="python_xcom_pull_task") def xcom_pull(**kwargs): ti = kwargs["ti"] value1 = ti.xcom_pull(key="result2") # [1, 2, 3] value2 = ti.xcom_pull( key="result1", task_ids="python_xcom_push_task1" ) # value_1 print(value1) print(value2) xcom_push1() >> xcom_push2() >> xcom_pull() dag는 위와 같이 작성했고요, [1, 2, 3]이 반환될 것으로 기대했던 부분에서 [1, 2, 3, 4]가 반환되어서 혹시 제가 코드를 잘못 작성한건지 궁금해서 여쭤봅니다