인프런 커뮤니티 질문&답변

nathan님의 프로필 이미지
nathan

작성한 질문수

Airflow 마스터 클래스

dags_external_task_sensor 오류 질문

작성

·

213

·

수정됨

0

선생님 안녕하세요

좋은 강의 감사합니다.

 

section 9 dags_external_task_sensor 에서, task b 가 fail로 뜨지 않고 계속 running 인 오류가 나는 데 이유를 모르겠습니다.

 

 

dags_branch_python_operator는 아래와 같습니다.

 

from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator

with DAG(
    dag_id='dags_branch_python_operator',
    start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'), 
    schedule='0 1 * * *',
    catchup=False
) as dag:
    

    def select_random():
        import random

        item_lst = ['A','B','C']
        selected_item = random.choice(item_lst)
        # 만약 실행해야 하는 task가 하나라면 task_id를 str 으로 하나만 넣는다.
        # 만약 실행해야 하는 task가 두개 이상이라면 list of str을 넣는다.

        if selected_item == 'A':
            return 'task_a'
        elif selected_item in ['B','C']:
            return ['task_b','task_c']


    python_branch_task = BranchPythonOperator(
        task_id='python_branch_task',
        python_callable=select_random
    )
    
    def common_func(**kwargs):
        print(kwargs['selected'])



    task_a = PythonOperator(
        task_id='task_a',
        python_callable=common_func,
        op_kwargs={'selected':'A'}
    )

    task_b = PythonOperator(
        task_id='task_b',
        python_callable=common_func,
        op_kwargs={'selected':'B'}
    )

    task_c = PythonOperator(
        task_id='task_c',
        python_callable=common_func,
        op_kwargs={'selected':'C'}
    )

    python_branch_task >> [task_a, task_b, task_c]

마지막으로 돌린 기록은 a를 선택하고, b,c 는 skipped 된 상태입니다.

aaa.PNG

 

 

dags_external_task_sensor 는 아래와 같고요

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
import pendulum
from datetime import timedelta
from airflow.utils.state import State 

with DAG(
    dag_id='dags_external_task_sensor',
    start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
    schedule='0 7 * * *',
    catchup=False
) as dag:

    external_task_sensor_a = ExternalTaskSensor(
        task_id='external_task_sensor_a',
        external_dag_id = 'dags_branch_python_operator',
        external_task_id='task_a',
        allowed_states=[State.SKIPPED], # task_a 가 skipped로 되면 sensor_a task는 success로 표시된다는 뜻
        # allowed states 조건을 만족하지 못하면 계속 실행된다. 10초마다 
        execution_delta=timedelta(hours=6),
        poke_interval=10 # 10초
    )


    external_task_sensor_b = ExternalTaskSensor(
        task_id='external_task_sensor_b',
        external_dag_id = 'dags_branch_python_operator',
        external_task_id='task_b',
        failed_states=[State.SKIPPED], # task_b 가 skipped로 되면 sensor_b task는 failed로 표시된다는 뜻
        execution_delta=timedelta(hours=6),
        poke_interval=10
    )

    external_task_sensor_c = ExternalTaskSensor(
        task_id='external_task_sensor_c',
        external_dag_id = 'dags_branch_python_operator',
        external_task_id='task_c',
        allowed_states=[State.SUCCESS], # task_c 가 success로 되면 sensor_c task는 success로 표시된다는 뜻
        # success가 뜰때까지 꼐속 시도를 한다. 
        execution_delta=timedelta(hours=6),
        poke_interval=10
    )

 

이대로라면 강의에서 나온것 처럼 , b만 fail로 뜨고 a,c는 계속 running 이어야 하는데요, 셋다 running 이 나옵니다. log를 보면 계속 b를 poke만 하고 있더라고요

bbb.PNG

 

 

혹시 무엇이 문제일까요..?ㅠ

 

 

 

답변 1

0

김현진님의 프로필 이미지
김현진
지식공유자

안녕하세요 Nathan님

Task 3개가 모두 running으로 뜨고 있는것은 모니터링할 대상 DAG의 시간대를 못찾았기 때문입니다.

못 찾고있는 이유는 dags_external_task_sensor DAG을 Manual 로 실행시켰기 때문이에요.

  • 01/07 13:39 분에 manual로 dags_external_task_sensor DAG을 실행시켰고 RUN_ID 는 manual__2024-01-07T04:39:13.768757+00:00 로 나옵니다. (manual__2024-01-07T13:39:13.768757+09:00)

  • 모니터링할 대상 (dags_branch_python_operator)의 task 를 찾을 때는 위 RUN_ID 에 있는 시간보다 6시간 앞선 것을 찾으려 합니다.

  • 그러므로 dags_branch_python_operator DAG 에서 data_interval_start가 01/06 07:39:13 (KST)에 수행된 것을 찾으려고 합니다.

  • 그런데 dags_branch_python_operator DAG에는 그 시간에 수행된 Job이 없을테니 dags_external_task_sensor의 3개 task 모두 무한 대기 하는 것입니다.

 

external task sensor는 manual 로 수행했을 때 저런 문제가 있어요. 일단 실습을 제대로 해보려면 dags_external_task_sensor의 수행 이력 중 RUN_ID가 scheduled__2024-01-06T22:00:00+00:00인 RUN이 있을 거에요. 해당 Run을 선택하시고 clear 버튼을 눌러 재수행해보시겠어요? 그럼 sensing 대상 DAG인 dags_branch_python_operator DAG의 01/07 01:00 (KST)에 수행된 task를 sensing하게 될겁니다.

(물론 저 시간대에 수행된 이력이 있어야 합니다)

nathan님의 프로필 이미지
nathan

작성한 질문수

질문하기