묻고 답해요
141만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결Airflow 마스터 클래스
"Python Operator에서 Xcom 사용" 강의 질문
안녕하세요.강의 잘 듣고 있습니다. "Python Operator에서 Xcom 사용" 강의에서 task flow가python_xcom_push_by_return = xcom_push_result() xcom_pull_2(python_xcom_push_by_return) python_xcom_push_by_return >> xcom_pull_1()이렇게 되고, 그래프가 python_xcom_push_by_return -> xcom_pull_2python_xcom_push_by_return -> xcom_pull_1 이렇게 그려지는데, 이 경우 처리도 병렬적으로 되나요?아니면 그래프만 저렇게 나오고 실제 동작은파이썬 동작하는 것처럼 윗줄이 먼저 실행되어서 실제로는python_xcom_push_by_return -> xcom_pull_2 -> xcom_pull_1이렇게 실행이 되나요??
-
미해결Airflow 마스터 클래스
Chapter 16 스케줄러 부하 줄이기에 관한 질문입니다.
멀티 스케줄러 환경에서, 각 스케줄러는 Dag ParsingDB에 해당정보를 기록Scheduler Loop의 과정을 거친다고 이해했습니다. 여기서, Dag Parsing에 관한 부분은 DB에 관한 정보를 참고해서 하는것이 아닌 코드만을 가지고서 돌려보는 것으로 이해하고 있는데, 그렇다면 1번 과정(Dag Parsing)은 멀티 스케줄러 환경을 구성한다고 해서 시간 단축의 효과를 얻을 수 없는것인가요?? DAG이 10개가 있고, 스케줄러가 두개가 있다면,1번 스케줄러가 첫번째 DAG를 돌렸고, Lock을 걸은 후, 해당 작업이 큐까지 들어가게 되면 DB에 관한 정보의 Lock을 해제 하나요? 그렇다면 만약 다른 스케줄러가 해당 DAG의 정보를 검색했을때, 해당 DAG에 관한 Scheduler Loop를 돌것이고, 그렇다면 이 상황에서는 멀티스케줄러의 이점을 얻을 수 없는것인지가 궁금합니다. 감사합니다.
-
해결됨Airflow 마스터 클래스
Task 실행관련 질문입니다.
안녕하세요! 'Bash Operator & 외부 쉘파일 수행하기' 강의를 듣고 실습하던 중 에러가 생겨서, 해결을 시도했는데 계속해서 실패해서 질문을 남겨봅니다!airflow를 실행하고 태스크를 수행하면 fail이 뜨는데, 로그를 확인하면 파일을 찾을 수 없다고 합니다. [2023-10-12, 14:12:25 KST] {subprocess.py:93} INFO - /bin/bash: line 1: /opt/***_log/plugins/shell/select_fruit.sh: No such file or directory [2023-10-12, 14:12:25 KST] {subprocess.py:97} INFO - Command exited with return code 127 [2023-10-12, 14:12:25 KST] {taskinstance.py:1935} ERROR - Task failed with exception Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/bash.py", line 210, in execute raise AirflowException( airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code 127. 저는 wsl에 디렉토리는 아래 경로처럼 설정했습니다.(select_fruit.sh파일에 실행 권한도 주었습니다.)/airflow_log/plugins/shell/select_fruit.sh 이후 docker-compose.yaml에 volumes 항목을 아래와 같이 설정을 했습니다. ${AIRFLOW_PROJ_DIR:-.}/airflow_log/plugins:/opt/airflow/plugins Vscdoe에서 경로는 아래와 같이 설정을 했습니다.t1_orange = BashOperator( task_id="t1_orange", bash_command="/opt/airflow_log/plugins/shell/select_fruit.sh ORANGE", ) t2_banana = BashOperator( task_id="t2_banana", bash_command="/opt/airflow_log/plugins/shell/select_fruit.sh BANANA", )경로 문제인지, 아니면 다른 문제인지.. 도움 요청해봅니다.저에게 너무 필요한 강의라서 잘 듣고 있습니다! 앞으로도 좋은 강의 부탁드리겠습니다!
-
해결됨Airflow 마스터 클래스
dags 경로 설정 질문 있습니다.
안녕하세요! 현재 Bash operator DAG 만들기 & DAG 디렉토리 셋팅 강의를 수강하고 있습니다.19분 50초 즈음 dags 폴더의 경로를 설정하는 과정에서 git repo 내의 dags를 지정하는 것에 궁금증이 생겨 질문을 작성하게 되었습니다. 만약 제가 다른 repo를 파서 새로운 dags 폴더를 만들게 되면, 그 때는 저 환경 설정을 다시 해야한다고 생각하는데, 이게 맞는지 궁금합니다. 다시 설정을 하지 않으려면, 설정한 dags 안에 모든 dag가 들어가야 하는데, 나중에 헷갈리지는 않을까 걱정이 되어 질문 드립니다. 덕분에 airflow 공부 하는 것이 기대가 됩니다. 좋은 강의 해주셔서 정말 감사합니다.
-
미해결Airflow 마스터 클래스
auto refresh
안녕하세요.강의 잘 듣고 있습니다. 저는 Mac환경에서 수강중이고,airflow 2.7.1을 사용중입니다. 강의에서 dag을 pull하고 2~3분 정도 기다리면 auto-refresh되어 compose를 재실행하지 않아도 된다고 하셨는데,제 경우엔 auto-refresh가 된 이후에도 새로 추가된 dag이 리스트에 나타나지 않더라구요. 그래서 매번 compose를 재실행하고 있는데,혹시 의심가는 원인이 있다면 말씀해주실 수 있으실까요?구글링을 해봐도 해결방법을 찾기 쉽지 않아서 질문 남깁니다ㅜㅜ
-
미해결Airflow 마스터 클래스
mac 환경에서 실습할 경우
안녕하세요 mac 환경에서 강의를 수강하려고 합니다.이러한 경우 wsl 설치 없이 다음 강의로 넘어가면 될까요??
-
미해결실리콘밸리 엔지니어와 함께하는 Apache Airflow
my_first_dag.py 파일 질문 입니다
Standalone 환경을 처음 세팅하게 되면 dags 디렉토리가 별도로 없는데 사용자가 직접 만들어주면 되는지 질문드립니다. 그리고 standalone 으로 airflow 서버를 작동시킨 이후 다시 exit 할 경우 자꾸 localhost에 서버가 남아있어 재 실행이 안되서 컴퓨터 리붓을 해야하는데 혹시 다른 방법이 있을까요?아래와 같은 방식으로 PID 서버를 kill 하지만 그래도 안될 경우가 많은 것 같습니다.(수정) 아래 방법은 작동하는 것 같네요ps -ef | grep airflowsudo kill -9 [PID port]
-
미해결Airflow 마스터 클래스
1년에 한 번 돌리는 스케줄도 에어플로우로 관리하면 좋을까요?
안녕하세요.수업 잘 듣고 있습니다! 년 단위의 데이터를 가져오는 경우엔1년에 딱 한 번만 코드를 작동하면 되는데,이런 경우에도 에어플로우로 관리를 해주는 게 좋을까요?태스크에 비해 너무 자원을 많이 쓰는 건지,그래도 프로세스를 보기 편하게 관리할 수 있다는 점에도 활용하는 게 좋을지고민이 많이 되네요. 강사님의 의견이 궁금합니다!
-
미해결Airflow 마스터 클래스
맥북으로 실습 진행이 가능한지 궁금합니다.
안녕하세요, 현재 M2 macbook을 사용하고 있습니다.강의에서 윈도우 환경에 WSL을 설치한 후 그 위에 airflow를 설치하여 실습을 진행하는 것으로 이해했습니다.구글링을 좀 해보니 맥북에서도 airflow 설치는 충분히 가능한 것으로 보이는데, 만약 맥북에서 설치했다고 하더라도 뒷부분 실습과정에서 많은 차이가 있는지 궁금합니다!
-
미해결실리콘밸리 엔지니어와 함께하는 Apache Airflow
Docker 에서 airflow 사용시 질문드립니다
강의에서는 강사님께서 airflow standalone 을 사용하셨는데,docker image로 airflow 서버 생성한 다음 postgres 서버와 연결하려고 하니 테스트 fail이 나고 DAG도 돌아가지 않는 것 같습니다. 혹시 docker image 상에서 DB connect 하는 방법 가이드 부탁가능하실까요?Airflow - connection settingDB connection info
-
미해결실리콘밸리 엔지니어와 함께하는 Apache Airflow
airflow tasks test error
안녕하세요.강의 실습중에 'airflow tasks test postgres_loader execute_sql_query 2023-01-01 ' 터미널에서 실행후 아래와 같은 오류가 발생해서 여쭤보고자 질문드립니다.Error 본문DB 연결 정보DB Table 생성 정보Airflow UI실행 환경Mac 14 proPython 3.11.5pip freeze aiohttp==3.8.5 aiosignal==1.3.1 alembic==1.11.3 anyio==3.7.1 apache-airflow==2.7.0 apache-airflow-providers-common-sql==1.7.0 apache-airflow-providers-ftp==3.5.0 apache-airflow-providers-http==4.5.0 apache-airflow-providers-imap==3.3.0 apache-airflow-providers-postgres==5.6.0 apache-airflow-providers-sqlite==3.4.3 apispec==6.3.0 argcomplete==3.1.1 asgiref==3.7.2 async-timeout==4.0.3 attrs==23.1.0 autobahn==23.1.2 Automat==22.10.0 Babel==2.12.1 backoff==1.10.0 blinker==1.6.2 cachelib==0.9.0 cachetools==5.3.1 cattrs==23.1.2 certifi==2023.7.22 cffi==1.15.1 channels==4.0.0 charset-normalizer==3.2.0 click==8.1.7 clickclick==20.10.2 colorama==0.4.6 colorlog==4.8.0 ConfigUpdater==3.1.1 connexion==2.14.2 constantly==15.1.0 cron-descriptor==1.4.0 croniter==1.4.1 cryptography==41.0.3 daphne==4.0.0 Deprecated==1.2.14 dill==0.3.1.1 distlib==0.3.5 Django==4.1.4 djangorestframework==3.11.2 dnspython==2.4.2 docutils==0.20.1 email-validator==1.3.1 exceptiongroup==1.1.3 filelock==3.8.0 Flask==2.2.5 Flask-AppBuilder==4.3.3 Flask-Babel==2.0.0 Flask-Caching==2.0.2 Flask-JWT-Extended==4.5.2 Flask-Limiter==3.3.1 Flask-Login==0.6.2 Flask-Session==0.5.0 Flask-SQLAlchemy==2.5.1 Flask-WTF==1.1.1 frozenlist==1.4.0 google-auth==2.22.0 google-re2==1.1 googleapis-common-protos==1.60.0 graphviz==0.20.1 grpcio==1.57.0 gunicorn==21.2.0 h11==0.14.0 httpcore==0.16.3 httpx==0.23.3 hyperlink==21.0.0 idna==3.4 importlib-resources==6.0.1 incremental==22.10.0 inflection==0.5.1 itsdangerous==2.1.2 Jinja2==3.1.2 jsonschema==4.19.0 jsonschema-specifications==2023.7.1 kubernetes==27.2.0 lazy-object-proxy==1.9.0 limits==3.5.0 linkify-it-py==2.0.2 lockfile==0.12.2 Mako==1.2.4 Markdown==3.4.4 markdown-it-py==3.0.0 MarkupSafe==2.1.3 marshmallow==3.20.1 marshmallow-oneofschema==3.0.1 marshmallow-sqlalchemy==0.26.1 mdit-py-plugins==0.4.0 mdurl==0.1.2 multidict==6.0.4 oauthlib==3.2.2 opentelemetry-api==1.15.0 opentelemetry-exporter-otlp==1.15.0 opentelemetry-exporter-otlp-proto-grpc==1.15.0 opentelemetry-exporter-otlp-proto-http==1.15.0 opentelemetry-proto==1.15.0 opentelemetry-sdk==1.15.0 opentelemetry-semantic-conventions==0.36b0 ordered-set==4.1.0 packaging==23.1 pathspec==0.11.2 pendulum==2.1.2 platformdirs==2.5.2 pluggy==1.2.0 prison==0.2.1 protobuf==4.21.12 psutil==5.9.5 psycopg2-binary==2.9.7 pyasn1==0.4.8 pyasn1-modules==0.2.8 pycparser==2.21 pydantic==1.10.12 Pygments==2.16.1 PyJWT==2.8.0 pyOpenSSL==23.0.0 python-daemon==3.0.1 python-dateutil==2.8.2 python-nvd3==0.15.0 python-slugify==8.0.1 pytz==2023.3 pytzdata==2020.1 PyYAML==6.0.1 referencing==0.30.2 requests==2.31.0 requests-oauthlib==1.3.1 requests-toolbelt==1.0.0 rfc3339-validator==0.1.4 rfc3986==1.5.0 rich==13.5.2 rich-argparse==1.2.0 rpds-py==0.9.2 rsa==4.9 service-identity==21.1.0 setproctitle==1.3.2 six==1.16.0 sniffio==1.3.0 SQLAlchemy==1.4.49 SQLAlchemy-JSONField==1.0.1.post0 SQLAlchemy-Utils==0.41.1 sqlparse==0.4.4 tabulate==0.9.0 tenacity==8.2.3 termcolor==2.3.0 text-unidecode==1.3 Twisted==22.10.0 txaio==23.1.1 typing_extensions==4.7.1 uc-micro-py==1.0.2 unicodecsv==0.14.1 urllib3==1.26.16 virtualenv==20.16.3 websocket-client==1.6.2 Werkzeug==2.2.3 wrapt==1.15.0 WTForms==3.0.1 yarl==1.9.2 zope.interface==5.5.2제가 생각하는 에러 원인오류 본문을 보고 DB 연결 문제라 생각이 들어 DB를 재생성 하여지만 아직도 원인은 똑같았습니다. 제가 작성한 코드는 아래 github에 올렸습니다.https://github.com/saohwan/learn-airflow
-
미해결Airflow 마스터 클래스
pythonoperator 를 이용한 unzip 오류
python operator를 이용한 파일 unzip 시 permision denied 오류 발생했습니다.airflow 는 wsl을 이용하여 설치하였고, unzip을 이용하여 네트워크 드라이브에 압축을 풀어 저장하고자 합니다.이때[2023-09-01, 18:12:33 KST] {taskinstance.py:1824} ERROR - Task failed with exception Traceback (most recent call last): File "/home/xxx/airflow_env/lib/python3.10/site-packages/airflow/operators/python.py", line 181, in execute return_value = self.execute_callable() File "/home/xxx/airflow_env/lib/python3.10/site-packages/airflow/operators/python.py", line 198, in execute_callable return self.python_callable(*self.op_args, **self.op_kwargs) File "/home/xxx/airflow/dags/Answersheet_dp_2.py", line 66, in unzipfiles zip_ref.extractall(extract_path) File "/usr/lib/python3.10/zipfile.py", line 1647, in extractall self._extract_member(zipinfo, path, pwd) File "/usr/lib/python3.10/zipfile.py", line 1701, in extractmember open(targetpath, "wb") as target: PermissionError: [Errno 13] Permission denied: '/mnt/s/04.xxxxx/'와 같은 오류가 발생하였습니다.위 문제를 해결하려면 어떻게 해야하나요?
-
미해결실리콘밸리 엔지니어와 함께하는 Apache Airflow
블로그에 글을 올려도 되나요?
강의 내용들을 정리하여 까먹지 않고 앞으로도 찾아서 사용할 수 있게 블로그에 올리고 싶은데 가능할까요?가능하다면 적절한 정도가 어느정도 될지, 그이외에 주의할 점으로 링크등을 남긴다는 점등이 필요할지가 궁금합니다감사합니다
-
해결됨Airflow 마스터 클래스
섹션7: CustomOperator 개발 실습부분 pandas에 관하여
안녕하세요. 강사님 코드를 참조하며 공부를 하다보니, 어느새 새벽시간대가 되었네요다름이 아니라, 방금 전 쯤에 질문 드린 xmltodict는 import 에러가 발생했는데, (비주얼스튜디오 환경에서 pip install 및 wsl2환경에서 pip install 했음에도 불구하고)섹션7: CustomOperator 개발 실습 부근에서 pandas를 사용하는데, 비주얼 스튜디오 코드 가상환경(venv)에서 pandas가 설치되어있지 않아서 제가 pip install pandas로 설치를 하고 진행을 했는데, airflow 상에서 import error 가 나지 않는 점이 궁금해서 질문을 드리게 되었습니다. 추가)xml 데이터가 자꾸 안되어서 다른 데이터를 사용해보자는 마음으로 일단 json 양식까지 지원되는 open api를 물색해봤는데, 잘 동작하네요.---서울시 전기차 충전소 정보(서울시 설치)서울시 전기차 급속충전기 정보 현황(서울시 설치)서울시 자동차 전용도로 위치 정보(좌표계: GS80) 데이터 사이언티스트 쪽에서 데이터 엔지니어로 직무를 바꾸게 되었는데, 아무래도 정보가 정말 없다보니, 뭐부터 시작을 해야할지 막막했었습니다.아직도 신입으로 취업하기 위해서는 구체적으로 어떤 것을 완성해야할지 모르지만, 인프런에서 좋은 강의를 알게 되어서 강사님께 항상 감사하게 생각하고 있습니다.
-
해결됨Airflow 마스터 클래스
섹션7: SimpleHttpOperator import error문제
안녕하세요. 서울시 공공데이터 포털에 있는 실시간 데이터를 바탕으로, 진행해보고 싶어서 강사님께서 짜신 코드와 조금 다르게 구성을 했는데,패키지 에러 문제가 나타났습니다. 에러 사항은 import xmltodict error문제 입니다.(1) 데이터 수집https://data.seoul.go.kr/dataList/OA-21285/A/1/datasetView.do샘플 데이터 양식: http://openapi.seoul.go.kr:8088/{API_KEY}/xml/citydata/{startnum}/{endnum}/POI{num}'POI{num}의 경우, 해당 num은 숫자가 아닌 str형태입니다. (001 ~ 113)(저는 여기서, 전기차 충전소와 충전기 데이터 분석을 통해, 급속 충전기 최적 입지 데이터 분석을 해보고 싶어 크롤링과 MYSQL에 데이터를 수집은 해놓았습니다. )(강사님께 airflow를 배우며, airflow를 통해 배치 기반으로 데이터 수집을 자동화하고 싶습니다.)[충전소 데이터][충전기 데이터]--- (2) 에러 발생!(3) 코드 개요 from airflow import DAG from airflow.operators.bash import BashOperator from airflow.decorators import task from airflow.providers.http.operators.http import SimpleHttpOperator # import common.poi_func as poi_array# poi nums import pendulum with DAG( dag_id="dags_simple_http_operators", start_date=pendulum.datetime(2023, 8, 1, tz="Asia/Seoul"), catchup=False, schedule="0 18-20 * * *" #매일 오후 6시~8시 1시간 간격으로 실행 ) as dag: """ 서울시 실시간 전기차 충전소/충전기 데이터 정보 """ # http_conn_id = 'openapi.seoul.go.kr' #array = poi_array() #for poi_number in array: tb_electric_station_info = SimpleHttpOperator( task_id="tb_electric_station_info", http_conn_id='openapi.seoul.go.kr', endpoint= '{{var.value.apikey_openapi_seoul_go_kr}}/xml/citydata/1/1/광화문·덕수궁', method='GET', headers={"Content-Type": "application/xml"} ) @task(task_id='python_1') def python_1(**kwargs): ti = kwargs['ti'] rslt = ti.xcom_pull(task_ids='tb_electric_station_info') from pprint import pprint import xmltodict response_dict = xmltodict.parse(rslt) pprint(response_dict) tb_electric_station_info >> python_1() 강사님께서는 json.load()를 사용했지만, 해당 실시간 데이터는 xml 형식으로만 받아올 수 있어서, 출력을 하기 위해 xmltodict 를 import 해와서, 잘 받아오는지 테스트를 해보고자 하였습니다. 하나의 페이지에 무수히 많은 태그들이 있기 때문에, 페이지는 1페이지로만 설정을 했습니다. 해당 에러를 해결하기 위해서, pip install xmltodict를 진행하였습니다.그럼에도, 해결이 되지 않아서, git이 연동되어있는 쪽에,pip install xmltodict로 패키지를 설치해주었습니다.혹시 몰라서, docker compose down을 시킨 뒤,다시 docker_compose.yaml 파일이 있는 곳에서 docker compose up을 실행하여 airflow 환경에 접속을 했는데,ModuleNotFoundError: No module named 'xmltodict' [2023-08-15, 23:47:23 KST] {taskinstance.py:1350} INFO - Marking task as FAILED. dag_id=dags_simple_http_operators, task_id=python_1, execution_date=20230815T144715, start_date=20230815T144723, end_date=20230815T144723 [2023-08-15, 23:47:23 KST] {standard_task_runner.py:109} ERROR - Failed to execute job 206 for task python_1 (No module named 'xmltodict'; 77)여전히 해당 모듈이 없다는 에러가 뜨네요 ..어떻게 해결해야할까요? ---[추가1]xcom에는 그래도 데이터가 잘 받아와지는 걸 확인할 수 있었습니다. 해당 데이터를 파싱하는 과정에서 에러가 난 것 같습니다.[추가2]나름대로 방법을 찾아봤는데, https://stackoverflow.com/questions/67851351/cannot-install-additional-requirements-to-apache-airflowdocker_compose.yaml파일을 수정해봤는데도...종료가 되네요..
-
미해결Airflow 마스터 클래스
강의 소스코드 문의
강의에서 진행된 소스코드는 공유안해주시나요?
-
해결됨Airflow 마스터 클래스
Bash Operator with Macros 수업에서 날짜에 대해 올바르게 해석했는지 질문드리고 싶습니다.
안녕하세요. airflow에 관한 질 높은 수업을 정말 잘 듣고 있습니다. 다른 공부보다 airflow 공부하는 게 재밌어서, 시간 가는 줄 모르고 노션에 정리하며 실습을 해보고 있습니다. 다름이 아니라, airflow의 날짜 개념에 대해 제가 올바르게 이해했는지 확인을 하고 싶어서 문의를 드리게 되었습니다.강의 내용은 Bash Operator with macros부분이며dags_bash_with_macros_eg1과 dags_bash_with_macros_eg2에 대한 airflow log 값에 대한 부분입니다 START_DATE와 END_DATE에 대해서 올바르게 이해했는지 여쭈어보고 싶습니다.