해결된 질문
작성
·
172
0
안녕하세요 강사님!
dags_python_with_templates.py 파일에서 show_templates 함수를 아무 인자도 넣지 않고 실행을 시키는데요, dag 실행시 출력되는 키워드 인자들은 에어플로우 태스크 객체에 기본으로 입력되는 값들인가요? 정확한 원리가 궁금하여 여쭤봅니다
답변 2
1
1
안녕하세요 방효석님!
맞습니다. 정확히는 @task 데커레이터가 넣어준다고 보시면 됩니다.
@task 데커레이터가 어떻게 넣어주는지 이해하기 전에 먼저 PythonOperator를 사용할 때를 생각해봅시다.
with DAG(...
) as dag:
def print_context(**context):
print('context start')
print(context)
print('context end')
t1 = PythonOperator(
task_id="print_context_from_python_operator",
python_callable=print_context
)
이렇게 작성해서 한번 돌려보시면 @task 데커레이터를 쓰지 않아도
context 에 값이 저장되어 print 되는 것을 볼 수 있습니다.
결국 PythonOperator 가 여러 기본 변수들을 context에 넣어준다고 볼 수 있겠죠.
그럼 이제 @task 데커레이터가 어떻게 넣어주는지 알려면 소스코드를 봐야 하는데, 좀 복잡합니다.
먼저 @task 키워드를 쓸때 사용되는 task는 task_decorator_factory 함수를 리턴하는 함수입니다.
def python_task(
python_callable: Callable | None = None,
multiple_outputs: bool | None = None,
**kwargs,
) -> TaskDecorator:
"""
Wrap a function into an Airflow operator.
Accepts kwargs for operator kwarg. Can be reused in a single DAG.
:param python_callable: Function to decorate
:param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
"""
return task_decorator_factory(
python_callable=python_callable,
multiple_outputs=multiple_outputs,
decorated_operator_class=_PythonDecoratedOperator,
**kwargs,
)
(https://github.com/apache/airflow/blob/main/airflow/decorators/python.py)
그리고 실습시 작성했던 아래 코드를 다시 생각해보면,
@task(task_id='python_task')
def show_templates(**kwargs):
from pprint import pprint
pprint(kwargs)
show_templates()
show_templates() 실행시 task 데커레이터의 outer function에 해당하는 python_task 가 실행되게 됩니다.
위에서 python_task는 task_decorator_factory 함수를 리턴한다고 말씀드렸죠?
task_decorator_factory 함수는 아래처럼 생겼습니다. 좀 복잡하죠?
def task_decorator_factory(
python_callable: Callable | None = None,
*,
multiple_outputs: bool | None = None,
decorated_operator_class: type[BaseOperator],
**kwargs,
) -> TaskDecorator:
"""Generate a wrapper that wraps a function into an Airflow operator.
Can be reused in a single DAG.
:param python_callable: Function to decorate.
:param multiple_outputs: If set to True, the decorated function's return
value will be unrolled to multiple XCom values. Dict will unroll to XCom
values with its keys as XCom keys. If set to False (default), only at
most one XCom value is pushed.
:param decorated_operator_class: The operator that executes the logic needed
to run the python function in the correct environment.
Other kwargs are directly forwarded to the underlying operator class when
it's instantiated.
"""
if multiple_outputs is None:
multiple_outputs = cast(bool, attr.NOTHING)
if python_callable:
decorator = _TaskDecorator(
function=python_callable,
multiple_outputs=multiple_outputs,
operator_class=decorated_operator_class,
kwargs=kwargs,
)
return cast(TaskDecorator, decorator)
elif python_callable is not None:
raise TypeError("No args allowed while using @task, use kwargs instead")
def decorator_factory(python_callable):
return _TaskDecorator(
function=python_callable,
multiple_outputs=multiple_outputs,
operator_class=decorated_operator_class,
kwargs=kwargs,
)
return cast(TaskDecorator, decorator_factory)
(https://github.com/apache/airflow/blob/main/airflow/decorators/base.py)
참고로 task_decorator_factory는 데커레이터를 쉽게 만들게 해주는 함수입니다. task_decorator_factory를 이용하면 custom 한 데커레이터도 만들 수 있습니다.
어쨌든 task_decorator_factory를 이해하는게 중요한데 이 함수의 파라미터 중 decorated_operator_class 파라미터는 어떤 오퍼레이터를 실행하게 할 것인지를 결정합니다. 다시 위의 python_task 함수를 살펴보면 task_decorator_factory 를 리턴할 때 파라미터 중 decorated_operator_class=_PythonDecoratedOperator 를 전달하고 있습니다.
_PythonDecoratedOperator 클래스를 보면 DecoratedOperator와 PythonOperator를 다중 상속받고 있습니다.
class _PythonDecoratedOperator(DecoratedOperator, PythonOperator):
"""
Wraps a Python callable and captures args/kwargs when called for execution.
:param python_callable: A reference to an object that is callable
:param op_kwargs: a dictionary of keyword arguments that will get unpacked
in your function (templated)
:param op_args: a list of positional arguments that will get unpacked when
calling your callable (templated)
:param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
"""
(https://github.com/apache/airflow/blob/main/airflow/decorators/python.py)
쉽게 생각하면 PythonOperator + DecoratedOperator 라는 두 개의 부모를 상속한 _PythonDecoratedOperator가 정의돼있는데 우리가 @task 데커레이터를 쓰면
_PythonDecoratedOperator가 실행되는 구조입니다. 이 과정에서 결과적으로 PythonOperator를 실행하는 것과 같고 이 과정중에 context가 **kwargs에 전달된다고 보시면 됩니다.
코드가 좀 복잡한데 결국 @task 데커레이터는 PythonOperator를 실행하는 것과 같고 PythonOperator를 썼을 때도 기본 변수를 **kwargs 로 밀어넣어주고 있으므로 원리는 같다. 라고 생각하시면 됩니다.