작성
·
213
·
수정됨
0
선생님 안녕하세요
좋은 강의 감사합니다.
section 9 dags_external_task_sensor 에서, task b 가 fail로 뜨지 않고 계속 running 인 오류가 나는 데 이유를 모르겠습니다.
dags_branch_python_operator는 아래와 같습니다.
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator
with DAG(
dag_id='dags_branch_python_operator',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
schedule='0 1 * * *',
catchup=False
) as dag:
def select_random():
import random
item_lst = ['A','B','C']
selected_item = random.choice(item_lst)
# 만약 실행해야 하는 task가 하나라면 task_id를 str 으로 하나만 넣는다.
# 만약 실행해야 하는 task가 두개 이상이라면 list of str을 넣는다.
if selected_item == 'A':
return 'task_a'
elif selected_item in ['B','C']:
return ['task_b','task_c']
python_branch_task = BranchPythonOperator(
task_id='python_branch_task',
python_callable=select_random
)
def common_func(**kwargs):
print(kwargs['selected'])
task_a = PythonOperator(
task_id='task_a',
python_callable=common_func,
op_kwargs={'selected':'A'}
)
task_b = PythonOperator(
task_id='task_b',
python_callable=common_func,
op_kwargs={'selected':'B'}
)
task_c = PythonOperator(
task_id='task_c',
python_callable=common_func,
op_kwargs={'selected':'C'}
)
python_branch_task >> [task_a, task_b, task_c]
마지막으로 돌린 기록은 a를 선택하고, b,c 는 skipped 된 상태입니다.
dags_external_task_sensor 는 아래와 같고요
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
import pendulum
from datetime import timedelta
from airflow.utils.state import State
with DAG(
dag_id='dags_external_task_sensor',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
schedule='0 7 * * *',
catchup=False
) as dag:
external_task_sensor_a = ExternalTaskSensor(
task_id='external_task_sensor_a',
external_dag_id = 'dags_branch_python_operator',
external_task_id='task_a',
allowed_states=[State.SKIPPED], # task_a 가 skipped로 되면 sensor_a task는 success로 표시된다는 뜻
# allowed states 조건을 만족하지 못하면 계속 실행된다. 10초마다
execution_delta=timedelta(hours=6),
poke_interval=10 # 10초
)
external_task_sensor_b = ExternalTaskSensor(
task_id='external_task_sensor_b',
external_dag_id = 'dags_branch_python_operator',
external_task_id='task_b',
failed_states=[State.SKIPPED], # task_b 가 skipped로 되면 sensor_b task는 failed로 표시된다는 뜻
execution_delta=timedelta(hours=6),
poke_interval=10
)
external_task_sensor_c = ExternalTaskSensor(
task_id='external_task_sensor_c',
external_dag_id = 'dags_branch_python_operator',
external_task_id='task_c',
allowed_states=[State.SUCCESS], # task_c 가 success로 되면 sensor_c task는 success로 표시된다는 뜻
# success가 뜰때까지 꼐속 시도를 한다.
execution_delta=timedelta(hours=6),
poke_interval=10
)
이대로라면 강의에서 나온것 처럼 , b만 fail로 뜨고 a,c는 계속 running 이어야 하는데요, 셋다 running 이 나옵니다. log를 보면 계속 b를 poke만 하고 있더라고요
혹시 무엇이 문제일까요..?ㅠ
답변 1
0
안녕하세요 Nathan님
Task 3개가 모두 running으로 뜨고 있는것은 모니터링할 대상 DAG의 시간대를 못찾았기 때문입니다.
못 찾고있는 이유는 dags_external_task_sensor DAG을 Manual 로 실행시켰기 때문이에요.
01/07 13:39 분에 manual로 dags_external_task_sensor DAG을 실행시켰고 RUN_ID 는 manual__2024-01-07T04:39:13.768757+00:00 로 나옵니다. (manual__2024-01-07T13:39:13.768757+09:00)
모니터링할 대상 (dags_branch_python_operator)의 task 를 찾을 때는 위 RUN_ID 에 있는 시간보다 6시간 앞선 것을 찾으려 합니다.
그러므로 dags_branch_python_operator DAG 에서 data_interval_start가 01/06 07:39:13 (KST)에 수행된 것을 찾으려고 합니다.
그런데 dags_branch_python_operator DAG에는 그 시간에 수행된 Job이 없을테니 dags_external_task_sensor의 3개 task 모두 무한 대기 하는 것입니다.
external task sensor는 manual 로 수행했을 때 저런 문제가 있어요. 일단 실습을 제대로 해보려면 dags_external_task_sensor의 수행 이력 중 RUN_ID가 scheduled__2024-01-06T22:00:00+00:00인 RUN이 있을 거에요. 해당 Run을 선택하시고 clear 버튼을 눌러 재수행해보시겠어요? 그럼 sensing 대상 DAG인 dags_branch_python_operator DAG의 01/07 01:00 (KST)에 수행된 task를 sensing하게 될겁니다.
(물론 저 시간대에 수행된 이력이 있어야 합니다)