묻고 답해요
141만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결Airflow 마스터 클래스
외부 파이썬 함수 수행하기 관련 질문 드립니다.
안녕하세요. 좋은 강의 감사히 잘 들었습니다.강의에서 궁금한 점이 있어서 질문 드립니다.1) 첫 번째 질문강의 제목 : 외부 파이썬 함수 수행하기환경 구성 : macOS, Docker, Airflow 2.7문의 내용 : 강의 초반에 언급한 환경변수로 설정하는 방법상세 내용 : 강의 후반에 알려주신 내용은 방법을 이해했습니다. 다만 초반에 알려주신 항목 중 sys.path 는 명시적 구현이라 쉽게 가능했으나, 환경변수 쪽은 찾아봐도 쉽게 이해가 안 돼서 질문하게 됐습니다. 복습하면서 여러 방법을 알면 좋을 것 같아서 질문 드립니다.2) 두 번째 질문강의 제목 : 없음환경 구성 : Docker 설치 시 생성하는 디렉터리 이외에 추가 디렉터리 생성 후 볼륨 마운트. 예를 들어 utils 라는 디렉터리를 생성 후 docker-compose.yaml 의 volumes 에 ${AIRFLOW_PROJ_DIR:-.}/utils:/opt/airflow/utils 를 추가 할 경우문의 내용 : 추가로 디렉터리 생성할 경우 plugins 디렉터리 외에서 py 파일을 읽어야 하는 경우상세 내용 : 첫 번째 질문과 유사하게 환경변수에 관한 질문일 것 같습니다. 새로운 디렉터리를 생성해서 작업할 때 경로를 인식 시켜야 하는데, sys.path 로만 하는 것은 번거로운 것 같아서 어떤 방식으로 접근을 하면 좋을지 조언을 듣고 싶습니다. 간략하게 정리하면 plugins 디렉터리 이외의 환경도 같이 사용을 하는 방법이 궁금합니다.3) 세 번째 질문강의 제목 : 없음환경 구성 : 1, 2와 동일문의 내용 : 새로운 패키지 설치 할 때 설치 방법 및 운영 관리 노하우상세 내용 : docker compose 할 때 yaml 에 PIPADDITIONAL_REQUIREMENTS 부분에 설치할 패키지 목록을 입력해서 설치를 하고 있습니다. 이렇게 해도 되는 것 같긴한데, 나중에 더 많은 패키지를 설치하게 될 경우 좋지 않은 형태라고 생각이 들었습니다. 실제로 현업에서 업무를 하실 때 다양한 패키지를 어떤 식으로 설치 및 운영 관리 하시는지 궁금합니다.감사합니다. 다른 강의도 기대하겠습니다.
-
미해결Airflow 마스터 클래스
Bind for 0.0.0.0:8080 failed: port is already allocated
아무것도 변경한게 없는데 아래 에러가 나옵니다Error response from daemon: driver failed programming external connectivity on endpointBind for 0.0.0.0:8080 failed: port is already allocated 컴퓨터를 재시작해도 나옵니다. 도커 데스크탑이 깔려있긴 하지만 종료한 상태입니다. 설마 도커 데스크탑 설치했다고 이러는 건 아니겠죠..?
-
해결됨Airflow 마스터 클래스
DB volume 설정 - 컨테이너 재기동시 DBeaver 테이블 사라지는 경우
안녕하세요, docker volume 설정에 관해서 궁금한 점이 있어 질문드립니다. (상황설명)postgres 컨테이너 올리는 내용을 참고하여 mariadb 컨테이너를 올리고자 하는 상황입니다. (docker-compose.yaml에서 mariadb 컨테이너 내용 추가 --> docker compose up --> dbeaver 연결) (질문)도커의 경우 컨테이너를 내리면 데이터가 모두 사라지기 때문에, 데이터를 영속적으로 저장하고 있기 위해서 볼륨을 설정한다고 이해했습니다. 제가 이해한 바에 따르면 DB 컨테이너에서 볼륨을 지정했을 때엔, 도커를 재기동해도 데이터가 남아있게 되는 것인데 --> 도커 재기동시 dbeaver mariadb의 테이블과 데이터가 사라지게 되는 상황이 맞는 걸까요..? 도커 재기동시에도 dbeaver 데이터를 남겨두고 싶은데 제가 볼륨 설정을 잘못한 것인지? 혹은 제가 볼륨에 대한 이해한 것이 잘못 되었다면 어떻게 설정해야 재기동시에도 dbeaver mariadb 데이터가 남아있을 수 있을지? 여쭤봅니다. *도커 재기동 => docker compose down / docker compose up -d(참고내용)#docker-compose.yamlservices: mariadb: image: mariadb:10 container_name: mariadb-container environment: MYSQL_USER: user MYSQL_PASSWORD: passwd MYSQL_ROOT_PASSWORD: root_pw MYSQL_DATABASE: mariadb TZ: Asia/Seoul volumes: - mariadb-db-volume:/var/lib/mysql/data restart: always ports: - 3307:3306 networks: network_custom: ipv4_address: 172.28.0.2 ... volumes: postgres-db-volume: mariadb-db-volume: networks: network_custom: driver: bridge ipam: driver: default config: - subnet: 172.28.0.0/16 gateway: 172.28.0.1#sudo docker volume ls#sudo docker inspect dhkim_mariadb-db-volume#volume mount directorycd /var/lib/docker/volumes/dhkim_mariadb-db-volume/_datals -al#도커재기동시 dbeaver 화면(table 사라짐)
-
미해결Airflow 마스터 클래스
trigger rule 설정 질문
안녕하세요. 트리거룰 기능 관련해서 질문드립니다. 강의에는 1개 이상 스킵, 컴플리트, 모두 컴플리트이런 식의 조건만 소개되어 있는데, 특정 테스크를 지정해서 설정하는 방법은 없나요?예를 들어, 5개 상위 테스크 중에 2,4번 테스크가 완료되는 경우에만 실행한다, 이런 식의 조건이 가능한지 궁금합니다. 별개로 airflow2 강의도 계획 중이신지 궁금합니다!
-
미해결Airflow 마스터 클래스
test 버튼 비활성화
SimpleHttp 오퍼레이터로 서울시 공공데이터 API 받아오기위해 커넥션 작성 중 test버튼이 비활성화 되어있습니다.구글링을 통해 해당 도커 airflow-webserver 의 ariflow.cfg 상태변수 test_connection = Enabled로 변경 후 도커를 내렸다 다시 올렸는데도 그대로 test 버튼이 비활성화 되어있습니다. 해결 방법이 궁금합니다.참고로 저는 unbuntu 22 버전에 실습 중 입니다.
-
해결됨Airflow 마스터 클래스
HDFS, Hive new Connection : apt-get update
HDFS, Hive를 위한 Connection 추가 과정에서 이미지를 빌드하는 부분에서 에러가 발생합니다.아래 명령 실행 후 에러가 발생하며, 에러 부분은 이미지(참고2)로 첨부하였습니다.sudo docker build -t airflow_custom . (참고 - Dockerfile 내용 / airflow 2.8.0 version 설치) (참고2 - 에러로그)
-
미해결Airflow 마스터 클래스
python 오퍼레이터 실행되지 않음
아래 코드 작동을 안합니다 어디가 잘못된 걸까요 operator 생성된게 보이지 않습니다 git pull까지 다한 상태입니다 from airflow import DAG import pendulum import datetime from airflow.operators.python import PythonOperator import random with DAG( dag_id="dags_python_operator", schedule="30 6 * *", start_date=pendulum.datetime(2024, 1, 9, tz="Asia/Seoul"), catchup=False, ) as dag: def select_fruit(): fruit = ['APPLE', 'BANANA', 'ORANGE', 'AVOCADO'] rand_int = random.randint(0,3) print(fruit[rand_int]) py_t1 = PythonOperator( task_id = 'py_t1', python_callable=select_fruit ) py_t1
-
미해결Airflow 마스터 클래스
dags_external_task_sensor 오류 질문
선생님 안녕하세요좋은 강의 감사합니다. section 9 dags_external_task_sensor 에서, task b 가 fail로 뜨지 않고 계속 running 인 오류가 나는 데 이유를 모르겠습니다. dags_branch_python_operator는 아래와 같습니다. from airflow import DAG import pendulum from airflow.operators.python import PythonOperator from airflow.operators.python import BranchPythonOperator with DAG( dag_id='dags_branch_python_operator', start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'), schedule='0 1 * * *', catchup=False ) as dag: def select_random(): import random item_lst = ['A','B','C'] selected_item = random.choice(item_lst) # 만약 실행해야 하는 task가 하나라면 task_id를 str 으로 하나만 넣는다. # 만약 실행해야 하는 task가 두개 이상이라면 list of str을 넣는다. if selected_item == 'A': return 'task_a' elif selected_item in ['B','C']: return ['task_b','task_c'] python_branch_task = BranchPythonOperator( task_id='python_branch_task', python_callable=select_random ) def common_func(**kwargs): print(kwargs['selected']) task_a = PythonOperator( task_id='task_a', python_callable=common_func, op_kwargs={'selected':'A'} ) task_b = PythonOperator( task_id='task_b', python_callable=common_func, op_kwargs={'selected':'B'} ) task_c = PythonOperator( task_id='task_c', python_callable=common_func, op_kwargs={'selected':'C'} ) python_branch_task >> [task_a, task_b, task_c]마지막으로 돌린 기록은 a를 선택하고, b,c 는 skipped 된 상태입니다. dags_external_task_sensor 는 아래와 같고요from airflow import DAG from airflow.sensors.external_task import ExternalTaskSensor import pendulum from datetime import timedelta from airflow.utils.state import State with DAG( dag_id='dags_external_task_sensor', start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'), schedule='0 7 * * *', catchup=False ) as dag: external_task_sensor_a = ExternalTaskSensor( task_id='external_task_sensor_a', external_dag_id = 'dags_branch_python_operator', external_task_id='task_a', allowed_states=[State.SKIPPED], # task_a 가 skipped로 되면 sensor_a task는 success로 표시된다는 뜻 # allowed states 조건을 만족하지 못하면 계속 실행된다. 10초마다 execution_delta=timedelta(hours=6), poke_interval=10 # 10초 ) external_task_sensor_b = ExternalTaskSensor( task_id='external_task_sensor_b', external_dag_id = 'dags_branch_python_operator', external_task_id='task_b', failed_states=[State.SKIPPED], # task_b 가 skipped로 되면 sensor_b task는 failed로 표시된다는 뜻 execution_delta=timedelta(hours=6), poke_interval=10 ) external_task_sensor_c = ExternalTaskSensor( task_id='external_task_sensor_c', external_dag_id = 'dags_branch_python_operator', external_task_id='task_c', allowed_states=[State.SUCCESS], # task_c 가 success로 되면 sensor_c task는 success로 표시된다는 뜻 # success가 뜰때까지 꼐속 시도를 한다. execution_delta=timedelta(hours=6), poke_interval=10 ) 이대로라면 강의에서 나온것 처럼 , b만 fail로 뜨고 a,c는 계속 running 이어야 하는데요, 셋다 running 이 나옵니다. log를 보면 계속 b를 poke만 하고 있더라고요 혹시 무엇이 문제일까요..?ㅠ
-
미해결Airflow 마스터 클래스
시각화 관련 질문
선생님 안녕하세요좋은 강의 감사합니다. 강의 내용에 대한 질문은 아니고요. 혹시 실습1에서 Rshiny를 사용하신 이유가 따로 있으실까요?Airflow는 파이썬이고, Rshiny를 처음 보는 것이어서 파이썬으로 된 시각화 라이브러리를 왜 사용하지 않는지 궁금합니다.plotly의 dash 같은 것은 airflow와 연동이 안되는건가요?
-
미해결Airflow 마스터 클래스
맥으로 에어플로우 라이브러리 설치가 안됩니다.
1 error generated. error: command '/usr/bin/gcc' failed with exit code 1 [end of output] note: This error originates from a subprocess, and is likely not a problem with pip. ERROR: Failed building wheel for google-re2Failed to build google-re2ERROR: Could not build wheels for google-re2, which is required to install pyproject.toml-based projects이렇게 뜨는데 뭐가 문젤까요 검색해도 해결방법이 안뜨네요.
-
해결됨실리콘밸리 엔지니어와 함께하는 Apache Airflow
병렬처리 질문드립니다.
안녕하세요 선생님 🙂 airflow 실습중에 airflow의 병렬처리에서 메시지 큐가 어떻게 처리되는지 궁금하여 질문드립니다!celery와 k8s를 병렬처리에 사용함에 있어서 메시지 큐를 별도로 설정하지 않는것 같은데요. 이 둘은 메시지 큐를 알아서 처리해주는건거요? celery와 k8s를 사용한 병렬 처리방식은 이해못해서 일단은 concurrent 패키지의 ThreadPoolExecutor 사용하여 병렬 처리를 하였습니다. airflow에서 병렬처리시 일반적으로 threadPool을 사용하는지도 궁금합니다. threadPool이 일반적이지 않다면 어떤 방식으로 병렬 처리를 하는지 궁금합니다!항상 감사합니다! 🙂
-
해결됨Airflow 마스터 클래스
색션8 postgres
안녕하세요 선생님색션8 2장에서 docker-compose.yaml파일을 수정 하고sudo docker compose up 하니 docker-compose.yaml: services.airflow-scheduler.depends_on.networks condition is required라는 오류가 납니다. 코드는# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL. # # WARNING: This configuration is for local development. Do not use it in a production deployment. # # This configuration supports basic configuration using environment variables or an .env file # The following variables are supported: # # AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow. # Default: apache/airflow:2.7.3 # AIRFLOW_UID - User ID in Airflow containers # Default: 50000 # AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed. # Default: . # Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode # # _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested). # Default: airflow # _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested). # Default: airflow # _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers. # Use this option ONLY for quick checks. Installing requirements at container # startup is done EVERY TIME the service is started. # A better way is to build a custom image or extend the official image # as described in https://airflow.apache.org/docs/docker-stack/build.html. # Default: '' # # Feel free to modify this file to suit your needs. --- version: '3.8' x-airflow-common: &airflow-common # In order to add custom dependencies or upgrade provider packages you can use your extended image. # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml # and uncomment the "build" line below, Then run `docker-compose build` to build the images. image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.7.3} # build: . environment: &airflow-common-env AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow # For backward compatibility, with Airflow <2.3 AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 AIRFLOW__CORE__FERNET_KEY: '' AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' AIRFLOW__CORE__LOAD_EXAMPLES: 'true' AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' # yamllint disable rule:line-length # Use simple http server on scheduler for health checks # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server # yamllint enable rule:line-length AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true' # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks # for other purpose (development, test and especially production usage) build/extend Airflow image. _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com' AIRFLOW__SMTP__SMTP_USER: '' AIRFLOW__SMTP__SMTP_PASSWORD: '' AIRFLOW__SMTP__SMTP_PORT: 587 AIRFLOW__SMTP__SMTP_MAIL_FROM: '' volumes: - ${AIRFLOW_PROJ_DIR:-.}/airflow/dags:/opt/airflow/dags - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config - ${AIRFLOW_PROJ_DIR:-.}/airflow/plugins:/opt/airflow/plugins - ${AIRFLOW_PROJ_DIR:-.}/airflow/files:/opt/airflow/files user: "${AIRFLOW_UID:-50000}:0" depends_on: &airflow-common-depends-on redis: condition: service_healthy postgres: condition: service_healthy services: postgres_custom: image: postgres:13 environment: POSTGRES_USER: userbbs POSTGRES_PASSWORD: userbbs POSGRES_DB: userbbs TZ: Asia/Seoul volumes: - postgres-custom-db-volume:/var/lib/postgresql/data ports: - 5432:5432 networks: network_custom: ipv4_address: 172.28.0.3 postgres: image: postgres:13 environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow volumes: - postgres-db-volume:/var/lib/postgresql/data healthcheck: test: ["CMD", "pg_isready", "-U", "airflow"] interval: 10s retries: 5 start_period: 5s restart: always ports: - 5431:5432 networks: network_custom: ipv4_address: 172.28.0.4 redis: image: redis:latest expose: - 6379 healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 10s timeout: 30s retries: 50 start_period: 30s restart: always networks: network_custom: ipv4_address: 172.28.0.5 airflow-webserver: <<: *airflow-common command: webserver ports: - "8080:8080" healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] interval: 30s timeout: 10s retries: 5 start_period: 30s restart: always depends_on: <<: *airflow-common-depends-on airflow-init: condition: service_completed_successfully networks: network_custom: ipv4_address: 172.28.0.6 airflow-scheduler: <<: *airflow-common command: scheduler healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:8974/health"] interval: 30s timeout: 10s retries: 5 start_period: 30s restart: always depends_on: <<: *airflow-common-depends-on airflow-init: condition: service_completed_successfully networks: network_custom: ipv4_address: 172.28.0.7 airflow-worker: <<: *airflow-common command: celery worker healthcheck: # yamllint disable rule:line-length test: - "CMD-SHELL" - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' interval: 30s timeout: 10s retries: 5 start_period: 30s environment: <<: *airflow-common-env # Required to handle warm shutdown of the celery workers properly # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation DUMB_INIT_SETSID: "0" restart: always depends_on: <<: *airflow-common-depends-on airflow-init: condition: service_completed_successfully networks: network_custom: ipv4_address: 172.28.0.8 airflow-triggerer: <<: *airflow-common command: triggerer healthcheck: test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] interval: 30s timeout: 10s retries: 5 start_period: 30s restart: always depends_on: <<: *airflow-common-depends-on airflow-init: condition: service_completed_successfully networks: network_custom: ipv4_address: 172.28.0.9 airflow-init: <<: *airflow-common entrypoint: /bin/bash # yamllint disable rule:line-length command: - -c - | function ver() { printf "%04d%04d%04d%04d" $${1//./ } } airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version) airflow_version_comparable=$$(ver $${airflow_version}) min_airflow_version=2.2.0 min_airflow_version_comparable=$$(ver $${min_airflow_version}) if (( airflow_version_comparable < min_airflow_version_comparable )); then echo echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m" echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!" echo exit 1 fi if [[ -z " 입니다. 어디서 잘못 된걸까요?
-
미해결Airflow 마스터 클래스
postgres port 설정 질문있습니다
안녕하세요 선생님!지금 DBeaver에 DB 연결하는 과정 진행중에있는데, 제가 기존에 5432포트를 사용하고있어서 그런지 아래와같이 에러가 발생합니다.이런 경우에는 yaml 파일에서 포트를 임의로 수정해도 괜찮을까요? 임의로 수정했을때 혹시 이후의 과정에서 문제가 없는지 궁금합니다.그리고 동일번호의 포트는 DBeaver에서 2개 이상 쓸수없는게 맞는지도 궁금합니다
-
해결됨Airflow 마스터 클래스
section 8 postgres 연결 안됨
선생님 안녕하세요좋은 강의 감사합니다. section 8, connection 과 hook 강의 9분 대에서 처음으로 postgres 서버와 연결하는 부분에서 연결이 안된다고 에러가 뜨는데요 에러내용은 아래와 같습니다.참고로 저는 5432 포트 대신 5429 포트를 사용했고, dbeaver 상에 postgres도 제대로 연결이 되었고, py_opr_drct_insrt 테이블도 제대로 만들어졌습니다.오류 내용 중에 이런 내용이 있던데conn = _connect(dsn, connection_factory=connection_factory, **kwasync)psycopg2.OperationalError: connection to server at "172.28.0.3", port 5429 failed: Connection refusedIs the server running on that host and accepting TCP/IP connections? postgres 설정에 무슨 문제가 있는 것일까요? 891c9ef91a84*** Found local files:*** * /opt/airflow/logs/dag_id=dags_python_with_postgres/run_id=manual__2024-01-01T05:58:04.315802+00:00/task_id=insrt_postgres/attempt=1.log[2024-01-01, 14:58:06 KST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dags_python_with_postgres.insrt_postgres manual__2024-01-01T05:58:04.315802+00:00 [queued]>[2024-01-01, 14:58:06 KST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dags_python_with_postgres.insrt_postgres manual__2024-01-01T05:58:04.315802+00:00 [queued]>[2024-01-01, 14:58:06 KST] {taskinstance.py:1308} INFO - Starting attempt 1 of 1[2024-01-01, 14:58:06 KST] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): insrt_postgres> on 2024-01-01 05:58:04.315802+00:00[2024-01-01, 14:58:07 KST] {standard_task_runner.py:57} INFO - Started process 80 to run task[2024-01-01, 14:58:07 KST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'dags_python_with_postgres', 'insrt_postgres', 'manual__2024-01-01T05:58:04.315802+00:00', '--job-id', '447', '--raw', '--subdir', 'DAGS_FOLDER/9. dags_python_with_postgres.py', '--cfg-path', '/tmp/tmp7_5f243h'][2024-01-01, 14:58:07 KST] {standard_task_runner.py:85} INFO - Job 447: Subtask insrt_postgres[2024-01-01, 14:58:07 KST] {task_command.py:410} INFO - Running <TaskInstance: dags_python_with_postgres.insrt_postgres manual__2024-01-01T05:58:04.315802+00:00 [running]> on host 891c9ef91a84[2024-01-01, 14:58:07 KST] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='dags_python_with_postgres' AIRFLOW_CTX_TASK_ID='insrt_postgres' AIRFLOW_CTX_EXECUTION_DATE='2024-01-01T05:58:04.315802+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-01-01T05:58:04.315802+00:00'[2024-01-01, 14:58:07 KST] {taskinstance.py:1824} ERROR - Task failed with exceptionTraceback (most recent call last):File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 181, in executereturn_value = self.execute_callable()File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 198, in execute_callablereturn self.python_callable(*self.op_args, **self.op_kwargs)File "/opt/airflow/dags/9. dags_python_with_postgres.py", line 16, in insrt_postgreswith closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn:File "/home/airflow/.local/lib/python3.7/site-packages/psycopg2/__init__.py", line 122, in connectconn = _connect(dsn, connection_factory=connection_factory, **kwasync)psycopg2.OperationalError: connection to server at "172.28.0.3", port 5429 failed: Connection refusedIs the server running on that host and accepting TCP/IP connections?[2024-01-01, 14:58:07 KST] {taskinstance.py:1350} INFO - Marking task as FAILED. dag_id=dags_python_with_postgres, task_id=insrt_postgres, execution_date=20240101T055804, start_date=20240101T055806, end_date=20240101T055807[2024-01-01, 14:58:07 KST] {standard_task_runner.py:109} ERROR - Failed to execute job 447 for task insrt_postgres (connection to server at "172.28.0.3", port 5429 failed: Connection refusedIs the server running on that host and accepting TCP/IP connections?; 80)[2024-01-01, 14:58:07 KST] {local_task_job_runner.py:225} INFO - Task exited with return code 1[2024-01-01, 14:58:07 KST] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check 같은 강의 15:00 부분 connection 부분 실습에서 test를 누르면 아래와 같은 에러가 뜹니다. 아마 같은게 아닐까 싶습니다connection to server at "172.28.0.3", port 5429 failed: Connection refused Is the server running on that host and accepting TCP/IP connections? dags_python_with_postgres 파일에서 구간마다 print 를 해서 실행되는지 여부를 확인해본 결과 print('실행1')with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn:print('conn', conn) '실행1' 은 프린트가 되는데 conn은 프린트가 안됩니다.그런데 ip, port, dbname, user, passwd, **kwargs 모두 정확하게 들어갔거든요 ㅠ
-
해결됨Airflow 마스터 클래스
스케줄링과 관련된 질문입니다.
섹션3, 첫번째 강의에서 스케줄에 관한 질문입니다. 강의에서, start_date = 2023.1.1schedule = "30 6 * * *"next_run = 2023 03 18 6:30그리고 현재 날짜는 2023 03 19 인것으로 봤는데, next_run이 현재날짜보다 뒤에 있으니 pause -> unpause 를 하게되면 자동실행이 안되어야 하는거 아닌가요? 강의에서는 자동으로 실행되는것으로 봐서요. 감사합니다.
-
해결됨Airflow 마스터 클래스
섹션 7 custom operator 실습 질문 (variable)
선생님 안녕하세요좋은 강의 감사드립니다. 섹션 7 custom operator 실습 강의에서 계속 오류가 나서, 이런 저런 시도를 해보던 도중에 variable 에 문제가 있다는 것을 알고 수정해서 해결을 했는데요. 강의대로 했는데 안되는 이유를 모르겠습니다. 아래는 저의seoul_api_to_csv_operator.py 파일인데요 class SeoulApiToCsvOperator(BaseOperator): template_fields = ('endpoint', 'path','file_name','base_dt') def init(self, dataset_nm, path, file_name, base_dt=None, **kwargs): super().__init__(**kwargs) self.http_conn_id = 'openapi.seoul.go.kr' self.path = path self.file_name = file_name self.endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + dataset_nm self.base_dt = base_dt self.endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + dataset_nm 부분을self.endpoint = 'api key 직접 입력/json' + dataset_nm 으로 바꾸니까 코드가 정상적으로 실행이 되었습니다. 그런데 이해가 안되는것이 airflow admin --> variable 에 들어가서 강의에 나오는대로 그대로 입력을 했거든요. Val 부분에 정확한 api key가 입력된 것도 수차례 확인했습니다. 어디에서 문제가 있는걸까요??
-
해결됨Airflow 마스터 클래스
docker 폴더 관련 질문
안녕하세요좋은 강의 감사드립니다. 섹션 7 custom operator 실습(마지막 강의) 16:11 부분에 보시면요폴더 구조를 아래와 같이 만들어놓으셨더라고요 조금 이해가 안되는게, 왜 dags 폴더를 airflow 바깥에도 만들어놓시고 안에도 만들어놓으신거죠?logs는 또 airflow 폴더 밖에 있고요. 강의 16:16 에 보면, dags는 airflow 안에 있는 것을 사용하고 계신것 같은데, (2) airflow 밖에 있는 dags 폴더를 만드신 이유, 그리고 (2) 왜 logs 폴더는 dags, plugins 와 다르게 airflow 밖에 만드셨는지 가 궁금합니다.
-
해결됨실리콘밸리 엔지니어와 함께하는 Apache Airflow
connection 정보 이전 방법 질문드립니다.
안녕하세요 선생님 🙂 connection 정보를 새로운 환경에 이식 방법 질문드립니다.stackoverflow 등을 찾아보니 csv파일 또는 sh 파일로 구축하여 사용하는 것같은데요이 같은 방법은 보안에 취약할수있겠다는 생각이 들어서 어떤 방식으로 이식가능한 connection 정보를 저장하는것이 좋은지 질문드립니다. stackover reference - https://stackoverflow.com/questions/55626195/export-all-airflow-connections-to-new-environment즐거운 크리스마스 연휴되세요!감사합니다! 🙂
-
미해결Airflow 마스터 클래스
scheduled slot의 개수도 제한이 있나요?
예전에 airflow를 쓰면서 DAG의 task가 스케쥴에 따라서 실행되지 않았던 적이 있습니다. 실행될 시간인데 scheduled상태로도 들어가지 않고 아예 사라졌다가 한참 후에 다시 scheduled되어서 중간에 실행되지 않는 시간이 많이 생겼습니다. 당시에는 pool의 slot수를 늘리니 해결이 되긴 했습니다. 그런데 강의에 따르면 slot수에 scheduled는 영향을 주지 않는 것으로 이해가 됐습니다.혹시 scheduled도 따로 slot개수를 가지게 되는건가요? p.s. 그때는 몰랐지만 airflow가 너무 자주 실행하는 경우에는 적합하지 않다는 거는 알게된 경험이었습니다. 당시 스케쥴은 1분마다 실행시키는 DAG들을 많이 만들었거든요
-
해결됨실리콘밸리 엔지니어와 함께하는 Apache Airflow
PostgresOperator로 대량의 데이터 업로드 방법 질문드립니다.
안녕하세요 선생님 🙂 PostgresOperator 질문 드립니다. DB table에 데이터를 갱신하는 task를 혼자 만들어보고 있는데요.PostgresOperator는 executemany와 같은 기능을 지원하지 않는 것으로 확인했습니다. airflow에서 대량의 데이터를 insert / update 하는 방법이 있을까요..?