Inflearn Community Q&A

김유민


Airflow Master Class

Creating a Custom Hook and using bulk_load

The object created with sqlalchemy.create_engine has no cursor.


Full error log:

8fbc4f9f281a

*** Found local files:

*** * /opt/airflow/logs/dag_id=dags_python_with_custom_hook_bulk_load/run_id=manual__2024-03-06T03:55:34.102649+00:00/task_id=insrt_postgres/attempt=1.log

[2024-03-06T03:55:35.452+0000] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dags_python_with_custom_hook_bulk_load.insrt_postgres manual__2024-03-06T03:55:34.102649+00:00 [queued]>

[2024-03-06T03:55:35.456+0000] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dags_python_with_custom_hook_bulk_load.insrt_postgres manual__2024-03-06T03:55:34.102649+00:00 [queued]>

[2024-03-06T03:55:35.456+0000] {taskinstance.py:2193} INFO - Starting attempt 1 of 1

[2024-03-06T03:55:35.464+0000] {taskinstance.py:2214} INFO - Executing <Task(PythonOperator): insrt_postgres> on 2024-03-06 03:55:34.102649+00:00

[2024-03-06T03:55:35.469+0000] {standard_task_runner.py:60} INFO - Started process 262 to run task

[2024-03-06T03:55:35.471+0000] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'dags_python_with_custom_hook_bulk_load', 'insrt_postgres', 'manual__2024-03-06T03:55:34.102649+00:00', '--job-id', '620', '--raw', '--subdir', 'DAGS_FOLDER/dags_python_with custom_hook_bulk_load.py', '--cfg-path', '/tmp/tmp0zav_1t7']

[2024-03-06T03:55:35.472+0000] {standard_task_runner.py:88} INFO - Job 620: Subtask insrt_postgres

[2024-03-06T03:55:35.504+0000] {task_command.py:423} INFO - Running <TaskInstance: dags_python_with_custom_hook_bulk_load.insrt_postgres manual__2024-03-06T03:55:34.102649+00:00 [running]> on host 8fbc4f9f281a

[2024-03-06T03:55:35.555+0000] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='dags_python_with_custom_hook_bulk_load' AIRFLOW_CTX_TASK_ID='insrt_postgres' AIRFLOW_CTX_EXECUTION_DATE='2024-03-06T03:55:34.102649+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-03-06T03:55:34.102649+00:00'

[2024-03-06T03:55:35.556+0000] {custom_postgres_hook.py:24} INFO - File to load: /opt/***/files/TbCorona19CountStatus/20240306/TbCorona19CountStatus.csv

[2024-03-06T03:55:35.556+0000] {custom_postgres_hook.py:25} INFO - Table: TbCorona19CountStatus_bulk2

[2024-03-06T03:55:35.562+0000] {base.py:83} INFO - Using connection ID 'conn-db-postgres-custom' for task execution.

[2024-03-06T03:55:35.582+0000] {custom_postgres_hook.py:34} INFO - TbCorona19CountStatus_bulk2.S_DT: newline characters removed

[2024-03-06T03:55:35.583+0000] {custom_postgres_hook.py:34} INFO - TbCorona19CountStatus_bulk2.T_DT: newline characters removed

[2024-03-06T03:55:35.583+0000] {custom_postgres_hook.py:38} INFO - Rows to load: 1212

[2024-03-06T03:55:35.583+0000] {custom_postgres_hook.py:40} INFO - postgresql://***:***@172.28.0.3/***

[2024-03-06T03:55:35.587+0000] {warnings.py:110} WARNING - /opt/***/plugins/hooks/custom_postgres_hook.py:43: UserWarning: pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.

file_df.to_sql(name = table_name,

[2024-03-06T03:55:35.589+0000] {taskinstance.py:2728} ERROR - Task failed with exception

Traceback (most recent call last):

File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task

result = _execute_callable(context=context, **execute_callable_kwargs)

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable

return execute_callable(context=context, **execute_callable_kwargs)

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 200, in execute

return_value = self.execute_callable()

^^^^^^^^^^^^^^^^^^^^^^^

File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 217, in execute_callable

return self.python_callable(*self.op_args, **self.op_kwargs)

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/opt/airflow/dags/dags_python_with custom_hook_bulk_load.py", line 14, in insrt_postgres

custom_postgres_hook.bulk_load(table_name=tbl_nm, file_name=file_nm, delimiter=',',

File "/opt/airflow/plugins/hooks/custom_postgres_hook.py", line 43, in bulk_load

file_df.to_sql(name = table_name,

File "/home/airflow/.local/lib/python3.11/site-packages/pandas/util/_decorators.py", line 333, in wrapper

return func(*args, **kwargs)

^^^^^^^^^^^^^^^^^^^^^

File "/home/airflow/.local/lib/python3.11/site-packages/pandas/core/generic.py", line 3084, in to_sql

return sql.to_sql(

^^^^^^^^^^^

File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 842, in to_sql

return pandas_sql.to_sql(

^^^^^^^^^^^^^^^^^^

File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 2848, in to_sql

table.create()

File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 984, in create

if self.exists():

^^^^^^^^^^^^^

File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 970, in exists

return self.pd_sql.has_table(self.name, self.schema)

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 2863, in has_table

return len(self.execute(query, [name]).fetchall()) > 0

^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 2670, in execute

cur = self.con.cursor()

^^^^^^^^^^^^^^^

AttributeError: 'Engine' object has no attribute 'cursor'

[2024-03-06T03:55:35.597+0000] {taskinstance.py:1149} INFO - Marking task as FAILED. dag_id=dags_python_with_custom_hook_bulk_load, task_id=insrt_postgres, execution_date=20240306T035534, start_date=20240306T035535, end_date=20240306T035535

[2024-03-06T03:55:35.605+0000] {standard_task_runner.py:107} ERROR - Failed to execute job 620 for task insrt_postgres ('Engine' object has no attribute 'cursor'; 262)

[2024-03-06T03:55:35.643+0000] {local_task_job_runner.py:234} INFO - Task exited with return code 1

[2024-03-06T03:55:35.655+0000] {taskinstance.py:3309} INFO - 0 downstream tasks scheduled from follow-on schedule check

As a result, the task fails with an error saying that the object has no cursor attribute.

 

The rest of the original post was removed because of the character limit.

3 Answers

김유민
Question author

It worked after I changed the versions!!!

So it was a version problem...

Thank you!!

김현진
Instructor

It looks like the newer pandas version conflicts with sqlalchemy.

That matches what I found when I searched for the error.

 

(https://stackoverflow.com/questions/38332787/pandas-to-sql-to-sqlite-returns-engine-object-has-no-attribute-cursor)
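One way to read the traceback, offered as a sketch and not as the actual pandas source: the warning in the log suggests pandas did not recognize the object passed to to_sql as an SQLAlchemy connectable, so it handled it as a raw DBAPI2 (sqlite3-style) connection, and that code path calls con.cursor(). An SQLAlchemy Engine has no cursor() method, which would produce exactly this AttributeError. FakeEngine and has_table_dbapi below are hypothetical stand-ins written for illustration; they are not pandas or SQLAlchemy APIs.

```python
import sqlite3


class FakeEngine:
    """Hypothetical stand-in for an engine: it offers connect() but no cursor()."""

    def connect(self):
        return sqlite3.connect(":memory:")


def has_table_dbapi(con, table_name):
    """Check for a table the way a DBAPI2-style code path would, via con.cursor()."""
    cur = con.cursor()  # raises AttributeError if con is not a DBAPI2 connection
    cur.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
        [table_name],
    )
    return len(cur.fetchall()) > 0


# A real DBAPI2 connection has cursor(), so the lookup works.
raw_con = sqlite3.connect(":memory:")
raw_con.execute("CREATE TABLE demo (id INTEGER)")
found = has_table_dbapi(raw_con, "demo")

# An engine-like object has no cursor(), so the same call fails.
try:
    has_table_dbapi(FakeEngine(), "demo")
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(found)
print(error_message)
```

If this reading is right, the Engine itself is fine; the problem is that pandas took the DBAPI2 path, which is consistent with a version mismatch between the two libraries.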


Anyway, I'm glad it's resolved.

Happy studying ^^

김유민
Question author

Hello. First of all, thank you for the kind and quick feedback.

>>> import pandas as pd
>>> import sqlalchemy
>>> pd.__version__
'2.2.1'
>>> sqlalchemy.__version__
'1.4.51'

The installed versions are shown above, and I have added the full error log to the original post.

I will change the version combination and try again!!

김현진
Instructor

Hello, 김유민!

Is your environment exactly the one used in the course?

First, could you attach the full log for the error below, and also post the full contents of the CustomPostgresHook file you wrote?

 

AttributeError: 'Engine' object has no attribute 'cursor'

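I have not seen this error before either, but from a quick search it may be a version conflict between pandas and sqlalchemy. Please go into the container inside WSL and check the pandas and sqlalchemy versions as shown here.

(You can find the container ID with the sudo docker ps command.)

$ sudo docker exec -it {CONTAINER_ID} bash

Once inside the container, start the python interpreter and enter the lines below; the last two print the versions.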

>>> import pandas as pd
>>> import sqlalchemy
>>> pd.__version__
>>> sqlalchemy.__version__

Could you check the versions and post them here?
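As an alternative to typing into the interpreter, the same check could be done with a short script. This is a minimal sketch using the standard library's importlib.metadata; the helper name installed_version is hypothetical and chosen for illustration.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package_name):
    """Return the installed version string of a package, or None if it is absent."""
    try:
        return version(package_name)
    except PackageNotFoundError:
        return None


# Print each package in requirements-file style, e.g. "pandas==<version>".
for name in ("pandas", "SQLAlchemy"):
    print(f"{name}=={installed_version(name)}")
```

Because it reads package metadata without importing the libraries, this also reports a package that is missing entirely (as None) instead of failing on import.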

For reference, the versions I am using with airflow are

pandas==2.0.3

sqlalchemy==1.4.50

I tested with this combination and had no problems.
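The two version pairs mentioned in this thread can be compared with a small check. The rule encoded below is an assumption and not something confirmed in the thread: to my understanding, pandas 2.2 raised its minimum supported SQLAlchemy version to 2.0, which would explain why 2.2.1 with 1.4.51 failed while 2.0.3 with 1.4.50 works. Please verify the threshold against the pandas release notes. The function names are hypothetical.

```python
def parse_version(text):
    """Turn a plain 'X.Y.Z' string into a tuple of ints, e.g. '2.2.1' -> (2, 2, 1)."""
    return tuple(int(part) for part in text.split(".")[:3])


def likely_compatible(pandas_version, sqlalchemy_version):
    """Assumed rule: pandas >= 2.2 needs SQLAlchemy >= 2.0.

    For older pandas this sketch checks nothing and simply returns True.
    """
    if parse_version(pandas_version) >= (2, 2):
        return parse_version(sqlalchemy_version) >= (2, 0)
    return True


question_combo = likely_compatible("2.2.1", "1.4.51")    # versions from the question
instructor_combo = likely_compatible("2.0.3", "1.4.50")  # versions the instructor tested

print(question_combo)
print(instructor_combo)
```

Under that assumed rule, the check flags the combination from the question and accepts the instructor's combination, which matches what was observed here.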
