작성
·
69
·
수정됨
0
안녕하세요,
인강 수강후 현업에서 Airflow 를 통해서 Dag 을 수행하고 있는데요,
서비스 PM 등의 이유로 수시간(3~5시간) 정도 pause 를 진행하고 다시 unpause 를 하게 되는 경우가 있습니다.
이럴때 unpasue 를 클릭하게 된다면 비록 Dag 에 catchup = false 로 지정해 놓더라도, 가장 최근에 수행 되었어야 할 Dag Run 은 수행되게 되는데요
Ex )
schedule_interval = 10,20,30 1,2,3 * * * *
Dag.catchup = False
pause = 01:15
unpause = 03:25
수행되는 Dag run 의 data_interval_end = 03:20
그런데 여기서 3시 20 분 dag_run 을 수행시키지 않기 위해서 생각을 하다가, Dag 의 default_args 의 start_date 값을 datetime.now 로 주는것에 대해서 생각을 해봤는데요, 당장 간단하게 테스트를 했을때는 큰 문제가 없었는데, 혹시 해당 케이스가 문제가 되는 케이스가 있을까요?
그리고 start_date 를 now 로 하는것 말고도 다른 방안이 있다면 좋은 방안 부탁 드립니다
감사합니다!
답변 1
0
안녕하세요 qkswl6263님
음 start_date 값을 now로 주면 dag이 수행되나요? dag의 data_interval_end 가 도래하여 dag을 수행되려면 start_date 가 최소 한 주기보다 뒤에 있어야 수행되어야 할텐데 start_date가 계속 이동하기 때문에 dag이 제대로 수행되지 않을 것 같아서요.
테스트하실 때 큰 문제 없다고 하셨는데, 혹시 주기를 몇 분으로 하셨나요?
주기를 10분 정도로 늘려서 테스트 한번 해보세요.
해결방안으로는 생각나는 것 중 하나는 dag의 마지막 dag_run 일자를 가져오는 방법이 있습니다.
마지막 수행된 run의 data_interval_end 값(=last_data_interval_end)이 이번에 수행되는 스케줄의 data_interval_start 와 같다면 실행하도록 하고 아니면 종료시키는 방법은 어떤지요?
Airflow 기본 오퍼레이터 중 ShortCircuitOperator 오퍼레이터가 있습니다. (강의에서는 짧게 소개만 하고 실제 실습하지는 않았습니다)
이 오퍼레이터는 python 오퍼레이터에서 파생된건데, 조건이 맞으면 하위 태스크를 실행시키지 않고 dag 수행을 종료시키는 오퍼레이터입니다.
이 오퍼레이터를 사용하고, python_callable 함수에는 위에서 설명한 last_data_interval_end 값과 이번 스케줄의 data_interval_start 를 비교, True or False를 반환해주도록 하면 될 것 같습니다.
from airflow.models import DagRun
def short_circuit(**kwargs):
dag_runs = DagRun.find(dag_id='dag id')
이렇게 하면 dagRun 인스턴스를 가져오는데, dag_runs는 현 스케줄을 포함하여 지금까지 수행한 RUN 정보를 담고있습니다. (리스트로 출력됨)
그래서 아래처럼 dag_runs를 data_interval_end 기준 역순 정렬하고 두 번째 값을 꺼내면 마지막으로 수행됐던 RUN을 가져올 수 있습니다.
dag_runs.sort(key=lambda x: x.data_interval_end, reverse=True)
# dag_runs[0]: 현재 스케줄 시간
# dag_runs[1]: 이전 스케줄 시간
last_data_interval_end = dag_runs[1].data_interval_end
남은건 본 스케줄의 data_interval_start or end 값과 위에서 얻은 last_data_interval_end 와 비교하여 건너띄어 실행된 스케줄이라면 False, 이어서 실행된 스케줄이라면 True를 리턴하도록 작성하시면 될 것 같습니다.
이해 되시는지요?
안녕하세요! 상세한 코드와 답변 감사합니다.
일단, 테스트했던 start_date = datetime.now() 는.. 안 되는게 맞네요.. 테스트 할 때 Dag 반영이 제대로 되지 않은 상태에서 테스트 한 것 같습니다.
먼저 말씀 주신 내용은 이해 했고, 관련해서 조금 생각을 해보았는데요,
제가 기존에 예시를 들었던 3시 20분 배치의 경우에는 가운데에 여러 배치가 있기 때문에 돌지 않고 그 다음 배치부터 제대로 이전 data_interval_end = data_interval_start 조건을 충족 시킬 것으로 보이는데요,
다만, 만약 가운데 낀 배치가 1개만 있는 경우에는 해당 작업이 큰 의미를 가지지 못할 것으로 보입니다 .
Ex )
schedule_interval = 10,20,30 1,2,3
Dag.catchup = False
pause = 01:15
unpause = 01:25
이전 Dag 의 data_interval_end = 01:10
현재 Dag 의 data_interval_start = 01:10
현재 Dag 의 data_interval_end = 01:20 -> 수행
이런 느낌으로 갈것 같아서요. ..
현재 실무가 인강을 들을 때처럼 파이프라인을 위한 배치 보다는 스케줄링에 초점이 맞춰 수행되다 보니, 저런것 하나도 재수행을 하지 않거나, 재수행을 하더라도 실 로직을 타지 않았으면 하는 생각이 있는데, 그게 쉽지 않네요..
제가 좀 더 찾아 보고 적용할수 있도록 하겠습니다. ㅎㅎ
빠른 시간 , 자세한 답변 감사드립니다!