해결된 질문
작성
·
74
0
with DAG(
dag_id="shedule_3_hour",
schedule="3 * * * *",
start_date=pendulum.datetime(2023, 3, 1),
catchup=False
) as dag:
def select_fruit():
fruit = ['APPLE','BANANA','ORANGE','AVOCADO']
rand_int = random.randint(0,3)
time.sleep(10)
print(fruit[rand_int])
py_t1 = PythonOperator(
task_id='py_t1',
python_callable=select_fruit
)
py_t1
처음 DAG을 시작(PAUSE)했을때는 2024-10-06, 06:01:47 UTC입니다.
DAG이 2024-10-06, 06:03:00 UTC에 처음 시작될거라 예상했는데 왜 처음 DAG을 시작(PAUSE)한 시간인 2024-10-06, 06:01:47 UTC에 처음 실행하는지 궁금합니다.
참고로 처음 실행되었던 작업에서도 오퍼레이터가 정상 실행되었습니다.
물론 약 2분 후 2024-10-06, 06:03:00 UTC에 작업도 실행되었습니다.
답변 3
0
답변 감사합니다! 그림과 함께 설명해주셔서 한번에 이해할 수 있었습니다. 감사합니다.
추가 질문 사항입니다.
"Airflow DAG이 처음 parsing되어 올라올 때나 pause 후 unpause했을 때 Airflow는 마지막으로 돌았어어야 할 run을 수행합니다." 라고 해주셨습니다.
혹시 "Airflow DAG이 처음 parsing되어 올라올 때나 pause 후 unpause했을 때" Airflow는 마지막으로 돌았어어야 할 run을 수행하지 않고 이후에 실행될 run만 실행되도록 하는 방법이 있을까요?
예를 들어 질문의 예시에서, DAG을 시작(PAUSE)한 시간이 아니고 스케줄 시간인 2024-10-06, 06:03:00 UTC에 작업이 처음 실행되도록이요!
0
안녕하세요 가보자고님!
Airflow DAG이 처음 parsing되어 올라올 때나 pause 후 unpause했을 때
Airflow는 마지막으로 돌았어어야 할 run을 수행합니다.
아래 그림을 보시면 pause --> unpause 했을 때 마지막으로 수행되었어야 할 RUN은 빨간색으로 표시한 RUN3 입니다.
다시 가보자고님의 상황을 보면
6시 1분에 DAG을 재개한 것 같은데, 그러면 가장 마지막으로 수행되었어야 할 RUN은
5시 3분에 수행되었어야 할 job입니다.
그 Job의 data_interval_start 는 4시 3분입니다.
그래서 두 번째 캡처하신 그림에서 RUN ID가 scheduled__2024-10-06T04:03:00+00:00
으로 표시된 겁니다.
(RUN ID는 시간 부분을 data_interval_start 값으로 표현합니다)
그리고 2분이 지나 6시 3분이 되자 본 스케줄이 작동한 것이고 이 RUN의
data_interval_start의 시간은 5시 3분이 됩니다.
그래서 마지막 캡처하신 그림의 RUN ID가 scheduled__2024-10-06T05:03:00+00:00
으로 표시된 겁니다.
제가 그린 그림에서 RUN4에 해당하는 스케줄이 동작한 겁니다.
이해되셨는지요?
0
안녕하세요, 인프런 AI 인턴입니다.
질문 주신 내용을 바탕으로 DAG가 첫 실행 시점에 예상보다 빠르게 실행된 부분에 대해 답변을 드리겠습니다.
Airflow에서 DAG을 처음 시작(PAUSE 해제)한 경우, 특정 상황에 따라 예상 일정보다 빨리 DAG가 실행되는 경우가 있습니다. 이 현상은 주로 다음과 같은 이유로 발생합니다:
DAG 시작 시점과 스케줄링 시점 간의 오차: Airflow는 DAG이 활성화되면 현재 시점에 따라 가장 가까운 실행 시점을 계산하여 실행합니다. DAG이 PAUSE 해제와 동시에 등록되면서 직접 실행되었을 가능성이 있습니다.
DAG의 catchup=False 설정: catchup 파라미터가 False로 설정된 경우, 할당된 시작 날짜에 대해 과거의 실행을 채워주지 않습니다. 이런 경우 최신 상태를 기준으로 즉시 시작할 수 있습니다.
일정 변경 및 재시작: DAG의 schedule은 매분마다 실행되는 스케줄로 설정되어 있으며, 이는 Airflow가 설정된 매 시각 혹은 3분 단계마다 실행될 수 있습니다. 그러나 실질적으로 해당 시점의 서버 로드 상태나 기타 조건들에 의해 실제로는 조금 빠르게 시작될 수 있습니다.
말씀해주신 이미지를 보면, 해당 DAG은 처음 실행(PAUSE 해제) 시점에 자동으로 실행된 후 정상적으로 그 다음 예정된 시각(2024-10-06, 06:03:00 UTC)에 한 번 더 실행된 것으로 보입니다. 이는 Airflow의 기본적인 스케줄러 작동 방식 때문일 수 있습니다.
이와 관련하여 추가적인 내용이나 직접적인 설정을 확인하려면 아래와 같은 게시물을 참고해 보실 수 있습니다:
저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.
아쉽게도 airflow에 아직 그런 기능은 없는 듯 합니다.
꼭 그렇게 해야한다면 DAG 파라미터 중 start_date 값을 이용할 수 있습니다.
어제 제가 올렸던 그림에서 unpause 했을 때 RUN3 (빨간색)이 실행되는 상황을 보면,
RUN3이 실행될 수 있는 조건은 DAG의 start_date 값이 RUN3의 data_interval_start 값보다 이전이여야 합니다.
따라서 1시간 마다 도는 DAG의 경우 start_date 값을 아래처럼 수정해보세요.
pendulum.now("Asia/Seoul").subtract(hours=1)
그럼 unpause 했을 때 마지막으로 돌았어야 할 RUN은 수행되지 않습니다.
다만 이 방법은 나중에 backfill 등 작업시 영향을 줄 수 있어서 권장하진 않습니다.