해결된 질문
24.06.22 23:27 작성
·
110
·
수정됨
0
안녕하세요, 수업을 대부분 수강하고 실제 현업에서 사용중에 있는데 문의사항이 있어서 질문 드립니다.
현재 상황은 이렇습니다.
DAG 구성
- 5분단위 스케줄링
-4개의 task
- task1 >> task2 >> task3 >> task4
- 각 task 별 timeout =5분
문제 상황은 task 2번이 한달에 한번씩 data 가 많아지면 5분까지 타임아웃이 걸릴때가 있는 것인데요,
이때 그다음 Dag run 이 수행되면서 task 2 번이 동시에 수행 되는 시간이 조금 있는데 그때 데이터 처리가 중복으로 처리되는 현상이 발생하게 됩니다. 그래서 가능하면 task2 을 동시에 돌리는걸 막고 싶었는데요,
처음 생각해낸 방법은 task_concurrency 옵션을 task 에 주어서 1개만 돌수 있게 바꾸고 timeout 을 조금더 넉넉하게 주려고 했으나, 만에하나 해당 task 가 10분이상 걸린다면 dag run 이 수행되고있는것 제외 2개가 더 웨이팅을 하는것이 되고, 이게 누적이 될수도 있을것으로 보여서 문제로 인지 했습니다.
서비스 적으로 5분내에 돌수 있게 하거나, 아니면 5분 스케줄링을 변경하는 방법을 고려해야 하지만 해당 고려 없이 혹시 airflow 단에서 할수 있는 작업이 있을까요?
ex . runninng 중인 task 와 대기중인 task 가 하나정도 있다면 해당 task 는 스킵하는 옵션 등입니다..
답변 1
0
2024. 06. 23. 21:36
안녕하세요 qkswl6253님
현업에서 잘 사용중이신거 같아 저도 기분이 좋습니다 ^^
우선 위 같은 상황에서 일반적으로 task에 max_active_tis_per_dag=1 옵션을 줄 수 있습니다. 참고로 언급하신 task_concurrency 옵션은 향후 deprecated될 예정이고 사실 max_activive_tis_per_dag 옵션과 같습니다.
그런데 이 옵션을 적용하면 말씀하신 바와 같이 다른 DagRun의 task는 모두 queue에 대기하게 됩니다. 동시에 꼭 하나의 task만 수행되도록 해야한다면 제 생각에 branch operator를 사용해보는게 어떨까 합니다.
1. BranchPythonOperator
2. @task.branch
3. BranchOperator 상속하여 재정의
branch operator 만드는 방법은 위 3가지 중 하나를 이용하고요, 이 branch operator를 이용해 만든 task를 task_branch라 해볼께요.
task_branch의 로직을 어떻게 작성하냐가 어려운데, 방법은 여러 가지가 있지만
task2가 시작/종료되면 시작되었다는 정보를 어딘가에 저장해주면 좋을 듯 합니다. (variable에 특정 key/value로 저장해줘도 좋습니다, 또는 특정 공유 디렉토리에 파일로 기입해도 좋습니다)
그리고 task_branch는 그 정보를 참고해서 task2가 아직 수행중인지 여부를 판단한 후 그 결과에 따라 task branch는 아래처럼 로직을 분기해줄 수 있습니다.
if 'task_2가 미수행중이면':
return 'task_2'
else:
return 'task_empty'
그리고 EmptyOperator를 하나 이용한다면 (task_empty라 명명) task 연결을 아래처럼 해줄 수 있습니다.
task1 >> task_branch >> [task2, task_empty] >> task3 >> task4
다만 task3의 trigger_rule='one_success'로 주어야 합니다. 그래야 task2가 skip되든, task_empty가 skip되든 task3가 수행됩니다. 좀 복잡하긴 하지만 이렇게 작성하면 DagRun의 task2 수행여부에 따라 분기처리가 가능할 듯 합니다.
잘 해결되면 좋겠네요 ^^