Full error log:
8fbc4f9f281a
*** Found local files:
*** * /opt/airflow/logs/dag_id=dags_python_with_custom_hook_bulk_load/run_id=manual__2024-03-06T03:55:34.102649+00:00/task_id=insrt_postgres/attempt=1.log
[2024-03-06T03:55:35.452+0000] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dags_python_with_custom_hook_bulk_load.insrt_postgres manual__2024-03-06T03:55:34.102649+00:00 [queued]>
[2024-03-06T03:55:35.456+0000] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dags_python_with_custom_hook_bulk_load.insrt_postgres manual__2024-03-06T03:55:34.102649+00:00 [queued]>
[2024-03-06T03:55:35.456+0000] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-03-06T03:55:35.464+0000] {taskinstance.py:2214} INFO - Executing <Task(PythonOperator): insrt_postgres> on 2024-03-06 03:55:34.102649+00:00
[2024-03-06T03:55:35.469+0000] {standard_task_runner.py:60} INFO - Started process 262 to run task
[2024-03-06T03:55:35.471+0000] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'dags_python_with_custom_hook_bulk_load', 'insrt_postgres', 'manual__2024-03-06T03:55:34.102649+00:00', '--job-id', '620', '--raw', '--subdir', 'DAGS_FOLDER/dags_python_with custom_hook_bulk_load.py', '--cfg-path', '/tmp/tmp0zav_1t7']
[2024-03-06T03:55:35.472+0000] {standard_task_runner.py:88} INFO - Job 620: Subtask insrt_postgres
[2024-03-06T03:55:35.504+0000] {task_command.py:423} INFO - Running <TaskInstance: dags_python_with_custom_hook_bulk_load.insrt_postgres manual__2024-03-06T03:55:34.102649+00:00 [running]> on host 8fbc4f9f281a
[2024-03-06T03:55:35.555+0000] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='dags_python_with_custom_hook_bulk_load' AIRFLOW_CTX_TASK_ID='insrt_postgres' AIRFLOW_CTX_EXECUTION_DATE='2024-03-06T03:55:34.102649+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-03-06T03:55:34.102649+00:00'
[2024-03-06T03:55:35.556+0000] {custom_postgres_hook.py:24} INFO - 적재 대상 파일: /opt/***/files/TbCorona19CountStatus/20240306/TbCorona19CountStatus.csv
[2024-03-06T03:55:35.556+0000] {custom_postgres_hook.py:25} INFO - 테이블 :TbCorona19CountStatus_bulk2
[2024-03-06T03:55:35.562+0000] {base.py:83} INFO - Using connection ID 'conn-db-postgres-custom' for task execution.
[2024-03-06T03:55:35.582+0000] {custom_postgres_hook.py:34} INFO - TbCorona19CountStatus_bulk2.S_DT: 개행문자 제거
[2024-03-06T03:55:35.583+0000] {custom_postgres_hook.py:34} INFO - TbCorona19CountStatus_bulk2.T_DT: 개행문자 제거
[2024-03-06T03:55:35.583+0000] {custom_postgres_hook.py:38} INFO - 적재 건수:1212
[2024-03-06T03:55:35.583+0000] {custom_postgres_hook.py:40} INFO - postgresql://***:***@172.28.0.3/***
[2024-03-06T03:55:35.587+0000] {warnings.py:110} WARNING - /opt/***/plugins/hooks/custom_postgres_hook.py:43: UserWarning: pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.
file_df.to_sql(name = table_name,
[2024-03-06T03:55:35.589+0000] {taskinstance.py:2728} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 444, in executetask
result = executecallable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 414, in executecallable
return execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 200, in execute
return_value = self.execute_callable()
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 217, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/dags/dags_python_with custom_hook_bulk_load.py", line 14, in insrt_postgres
custom_postgres_hook.bulk_load(table_name=tbl_nm, file_name=file_nm, delimiter=',',
File "/opt/airflow/plugins/hooks/custom_postgres_hook.py", line 43, in bulk_load
file_df.to_sql(name = table_name,
File "/home/airflow/.local/lib/python3.11/site-packages/pandas/util/_decorators.py", line 333, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/pandas/core/generic.py", line 3084, in to_sql
return sql.to_sql(
^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 842, in to_sql
return pandas_sql.to_sql(
^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 2848, in to_sql
table.create()
File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 984, in create
if self.exists():
^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 970, in exists
return self.pd_sql.has_table(self.name, self.schema)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 2863, in has_table
return len(self.execute(query, [name]).fetchall()) > 0
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/pandas/io/sql.py", line 2670, in execute
cur = self.con.cursor()
^^^^^^^^^^^^^^^
AttributeError: 'Engine' object has no attribute 'cursor'
[2024-03-06T03:55:35.597+0000] {taskinstance.py:1149} INFO - Marking task as FAILED. dag_id=dags_python_with_custom_hook_bulk_load, task_id=insrt_postgres, execution_date=20240306T035534, start_date=20240306T035535, end_date=20240306T035535
[2024-03-06T03:55:35.605+0000] {standard_task_runner.py:107} ERROR - Failed to execute job 620 for task insrt_postgres ('Engine' object has no attribute 'cursor'; 262)
[2024-03-06T03:55:35.643+0000] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-03-06T03:55:35.655+0000] {taskinstance.py:3309} INFO - 0 downstream tasks scheduled from follow-on schedule check
As a result, the task fails with an error saying that 'cursor' is not an attribute.
The rest of my original post has been deleted because of the character limit.
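For context, the call that fails is the to_sql call inside the hook's bulk_load method. Roughly reconstructed from the traceback above (since the actual file did not fit here, the engine setup, the parameter list, and the read_csv options are assumptions, not the exact code):

import pandas as pd
from sqlalchemy import create_engine

# In the real code this is a method of CustomPostgresHook; shown standalone here.
def bulk_load(table_name, file_name, delimiter):
    # Engine built from the connection URI that is logged just before the failure
    # (custom_postgres_hook.py line 40); placeholder credentials here.
    engine = create_engine('postgresql://user:password@172.28.0.3/dbname')
    # CSV file written by the upstream task.
    file_df = pd.read_csv(file_name, sep=delimiter)
    # The call at custom_postgres_hook.py line 43 that raises
    # AttributeError: 'Engine' object has no attribute 'cursor'
    file_df.to_sql(name=table_name, con=engine, if_exists='append', index=False)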
3 answers
Hello! First of all, thank you for the kind and quick feedback.
>>> import pandas as pd
>>> import sqlalchemy
>>> pd.__version__
'2.2.1'
>>> sqlalchemy.__version__
'1.4.51'
The installed versions are as shown above, and I have added the full error log to the original post.
I'll change the version combination and try again!!
Hello, 김유민님!
Your environment is exactly the one from the lecture, right?
First, could you attach the entire error log that occurred, and also post the full contents of the CustomPostgresHook file you wrote?
AttributeError: 'Engine' object has no attribute 'cursor'
In any case, this is an error I'm seeing for the first time too, but from searching it looks like it could be a version conflict between pandas and sqlalchemy. Please go into the container inside WSL and check the pandas and sqlalchemy versions as shown below.
(You can find the container ID with the sudo docker ps command.)
$ sudo docker exec -it {container_id} bash
Once you're inside the container, open a Python shell and run the lines below; each version will be printed.
>>> import pandas as pd
>>> import sqlalchemy
>>> pd.__version__
>>> sqlalchemy.__version__
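If it's more convenient, a non-interactive one-liner along these lines (same container ID as above) should print both versions at once; this is just an alternative to the Python shell:
$ sudo docker exec -it {container_id} python -c "import pandas, sqlalchemy; print(pandas.__version__, sqlalchemy.__version__)"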
Could you check the versions and post them here?
For reference, the versions I'm using with airflow are
pandas==2.0.3
sqlalchemy==1.4.50
and with that combination my tests ran without any problems.
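If your container turns out to have a newer pandas, one quick way to get back to that combination is to pin the versions inside the running container (a temporary fix; if you are on the official docker-compose setup, adding the pins to the image, for example via _PIP_ADDITIONAL_REQUIREMENTS, is the more durable option):
$ sudo docker exec -it {container_id} pip install "pandas==2.0.3" "sqlalchemy==1.4.50"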
It looks like newer pandas versions conflict with sqlalchemy.
That seems to match what comes up when googling it.
(https://stackoverflow.com/questions/38332787/pandas-to-sql-to-sqlite-returns-engine-object-has-no-attribute-cursor)
Anyway, I'm glad it's resolved.
Happy studying! ^^