작성
·
525
·
수정됨
1
airflow와 postgres 간의 connection 오류 문제입니다.
airflow UI -> admin-> connections에서 postgres 연결설정
docker-compose.yaml 설정
dag 코드
입력 airflow tasks test postgres_loader execute_sql_query 2023-01-01 시에 오류가 뜹니다ㅠ
[2024-06-21T15:40:45.514+0900] {dagbag.py:545} INFO - Filling up the DagBag from /home/kim/airflow/dags [2024-06-21T15:40:45.805+0900] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: postgres_loader.execute_sql_query __airflow_temporary_run_2024-06-21T06:40:45.755970+00:00__ [None]> [2024-06-21T15:40:45.811+0900] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: postgres_loader.execute_sql_query __airflow_temporary_run_2024-06-21T06:40:45.755970+00:00__ [None]> [2024-06-21T15:40:45.812+0900] {taskinstance.py:2306} INFO - Starting attempt 1 of 1 [2024-06-21T15:40:45.812+0900] {taskinstance.py:2388} WARNING - cannot record queued_duration for task execute_sql_query because previous state change time has not been saved [2024-06-21T15:40:45.813+0900] {taskinstance.py:2330} INFO - Executing <Task(PostgresOperator): execute_sql_query> on 2023-01-01 00:00:00+00:00 [2024-06-21T15:40:45.855+0900] {taskinstance.py:2648} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='postgres_loader' AIRFLOW_CTX_TASK_ID='execute_sql_query' AIRFLOW_CTX_EXECUTION_DATE='2023-01-01T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2024-06-21T06:40:45.755970+00:00__' [2024-06-21T15:40:45.858+0900] {taskinstance.py:430} INFO - ::endgroup:: [2024-06-21T15:40:45.870+0900] {sql.py:276} INFO - Executing: INSERT INTO sample_table (key, value) VALUES ('hello', 'world') [2024-06-21T15:40:45.875+0900] {taskinstance.py:441} INFO - ::group::Post task execution logs [2024-06-21T15:40:45.875+0900] {taskinstance.py:2905} ERROR - Task failed with exception Traceback (most recent call last): File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task result = _execute_callable(context=context, **execute_callable_kwargs) File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable return execute_callable(context=context, **execute_callable_kwargs) File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 401, in wrapper return func(self, *args, **kwargs) File "/home/kim/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 277, in execute hook = self.get_db_hook() File "/home/kim/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 188, in get_db_hook return self._hook File "/usr/lib/python3.10/functools.py", line 981, in __get__ val = self.func(instance) File "/home/kim/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 150, in _hook conn = BaseHook.get_connection(conn_id) File "/home/kim/.local/lib/python3.10/site-packages/airflow/hooks/base.py", line 83, in get_connection conn = Connection.get_connection_from_secrets(conn_id) File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/connection.py", line 519, in get_connection_from_secrets raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined") airflow.exceptions.AirflowNotFoundException: The conn_id `my_postgres_connection` isn't defined
답변 4
0
airflow의 postgres와 따로 구성한 postgres 두 개가 존재하여 airflow의 postgres를 사용하기로 했고 docker-compose.yml파일을 설정하여 postgres 설정을 바꿨습니다. 그래도 connection을 못 찾네요ㅜㅜ 어떻게 해야 할까요
일단 선생님께서 말씀해주신 알려주신 방법을 확인해본 결과 "요 부분" 설정을 변경했습니다. 또한 두 번째와 세 번째 말씀해준 신 것 확인 결과 이상 없었습니다. 그래도 오류가 발생했습니다.
"connection to server at "localhost" (::1), port 5432"(아래 로그 첨부했습니다)
위 오류처럼 로컬 pc에서 listening 하고 있지 않은지 확인해보기 위해 확인해 본 결과
잘 실행되고 있습니다.
현재 docker 위에 ubuntu 환경으로 docker-compose를 구성했습니다.
추가 확인이 필요한 설정이 있으면 말씀해주세요 ㅠ
[오류 로그]
b0ec75311c2c
*** Found local files:
*** * /opt/airflow/logs/dag_id=postgres_loader/run_id=manual__2024-06-27T06:34:54.878565+00:00/task_id=execute_sql_query/attempt=1.log
[2024-06-27T06:34:57.200+0000] {local_task_job_runner.py:120} INFO - ::group::Pre task execution logs
[2024-06-27T06:34:57.232+0000] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: postgres_loader.execute_sql_query manual__2024-06-27T06:34:54.878565+00:00 [queued]>
[2024-06-27T06:34:57.241+0000] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: postgres_loader.execute_sql_query manual__2024-06-27T06:34:54.878565+00:00 [queued]>
[2024-06-27T06:34:57.242+0000] {taskinstance.py:2306} INFO - Starting attempt 1 of 1
[2024-06-27T06:34:57.260+0000] {taskinstance.py:2330} INFO - Executing <Task(PostgresOperator): execute_sql_query> on 2024-06-27 06:34:54.878565+00:00
[2024-06-27T06:34:57.267+0000] {warnings.py:112} WARNING - /home/***/.local/lib/python3.12/site-packages/***/task/task_runner/standard_task_runner.py:61: DeprecationWarning: This process (pid=131) is multi-threaded, use of fork() may lead to deadlocks in the child.
pid = os.fork()
[2024-06-27T06:34:57.268+0000] {standard_task_runner.py:63} INFO - Started process 133 to run task
[2024-06-27T06:34:57.269+0000] {standard_task_runner.py:90} INFO - Running: ['***', 'tasks', 'run', 'postgres_loader', 'execute_sql_query', 'manual__2024-06-27T06:34:54.878565+00:00', '--job-id', '143', '--raw', '--subdir', 'DAGS_FOLDER/postgres_loader.py', '--cfg-path', '/tmp/tmpehr9vin2']
[2024-06-27T06:34:57.272+0000] {standard_task_runner.py:91} INFO - Job 143: Subtask execute_sql_query
[2024-06-27T06:34:57.330+0000] {task_command.py:426} INFO - Running <TaskInstance: postgres_loader.execute_sql_query manual__2024-06-27T06:34:54.878565+00:00 [running]> on host b0ec75311c2c
[2024-06-27T06:34:57.450+0000] {taskinstance.py:2648} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='postgres_loader' AIRFLOW_CTX_TASK_ID='execute_sql_query' AIRFLOW_CTX_EXECUTION_DATE='2024-06-27T06:34:54.878565+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-06-27T06:34:54.878565+00:00'
[2024-06-27T06:34:57.453+0000] {taskinstance.py:430} INFO - ::endgroup::
[2024-06-27T06:34:57.454+0000] {sql.py:276} INFO - Executing:
INSERT INTO sample_table (key, value)
VALUES ('hello', 'world')
[2024-06-27T06:34:57.465+0000] {base.py:84} INFO - Using connection ID 'my_postgres_connection' for task execution.
[2024-06-27T06:34:57.478+0000] {base.py:84} INFO - Using connection ID 'my_postgres_connection' for task execution.
[2024-06-27T06:34:57.479+0000] {taskinstance.py:441} INFO - ::group::Post task execution logs
[2024-06-27T06:34:57.480+0000] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
return execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/common/sql/operators/sql.py", line 282, in execute
output = hook.run(
^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/common/sql/hooks/sql.py", line 431, in run
with self._create_autocommit_connection(autocommit) as conn:
File "/usr/local/lib/python3.12/contextlib.py", line 137, in __enter__
return next(self.gen)
^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/common/sql/hooks/sql.py", line 551, in _create_autocommit_connection
with closing(self.get_conn()) as conn:
^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/postgres/hooks/postgres.py", line 175, in get_conn
self.conn = psycopg2.connect(**conn_args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/psycopg2/__init__.py", line 122, in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
psycopg2.OperationalError: connection to server at "localhost" (::1), port 5432 failed: Connection refused
Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused
Is the server running on that host and accepting TCP/IP connections?
[2024-06-27T06:34:57.493+0000] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=postgres_loader, task_id=execute_sql_query, run_id=manual__2024-06-27T06:34:54.878565+00:00, execution_date=20240627T063454, start_date=20240627T063457, end_date=20240627T063457
[2024-06-27T06:34:57.509+0000] {standard_task_runner.py:110} ERROR - Failed to execute job 143 for task execute_sql_query (connection to server at "localhost" (::1), port 5432 failed: Connection refused
Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused
Is the server running on that host and accepting TCP/IP connections?
; 133)
[2024-06-27T06:34:57.523+0000] {local_task_job_runner.py:240} INFO - Task exited with return code 1
[2024-06-27T06:34:57.536+0000] {warnings.py:112} WARNING - /home/***/.local/lib/python3.12/site-packages/***/models/baseoperator.py:1297: AirflowProviderDeprecationWarning: Call to deprecated class PostgresOperator. (Please use `***.providers.common.sql.operators.sql.SQLExecuteQueryOperator`.Also, you can provide `hook_params={'schema': <database>}`.)
result = cls.__new__(cls)
[2024-06-27T06:34:57.556+0000] {taskinstance.py:3503} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2024-06-27T06:34:57.561+0000] {local_task_job_runner.py:222} INFO - ::endgroup::
오류가 난 로그입니다.
윈도우의 postgres 가 아닌 docker위에 postgres를 사용하는 걸로 알고 있는데 이게 맞나요? 윈도우에는 DBeaver만 깔고 postgres는 깔지 않았습니다
0
안녕하세요 style12385님,
개인적인 사정으로 인해 답변이 늦었습니다. 에러 메세지를 보면 "The conn_id my_postgres_connection
isn't defined"가 문제인 것 같은데, 이 말은 Airflow가 my_postgres_connection
라는 정의를 못 찾은 것 같은데, 이말은 Airlfow UI 데이타베이스와 Airflow worker가 서로 다른 곳을 액세스하는 것 같아 보입니다.
connection이 잘 저장 되었는지, Airflow UI가 저장하는 곳이 Airflow worker가 접근하는 데이터 베이스가 같은지 확인을 해 보셔야겠습니다.
도움이 되셨길 바라겠습니다.
0
0
안녕하세요, 인프런 AI 인턴입니다.
style12385님의 질문과 관련하여, Docker 컨테이너에서 Airflow와 PostgreSQL 사이의 연결 설정에 문제가 발생한 것 같습니다. 질문 내용을 통해, Airflow에서 Postgres 연결 설정을 하고 docker-compose 파일을 사용하여 서비스를 구성하였으나, airflow tasks test
명령을 실행할 때 my_postgres_connection
에 대한 AirflowNotFoundException 오류가 발생했음을 확인할 수 있습니다. 이는 Airflow가 my_postgres_connection
연결 구성을 찾을 수 없다는 것을 의미합니다.
유사한 문제를 다룬 기존 답변 링크를 아래와 같이 추천드립니다. 이 링크들은 Docker와 Airflow 환경에서의 PostgreSQL 연결 설정 문제에 대한 해결책을 제공할 수 있습니다:
Docker 에서 airflow 사용시 질문드립니다: 이 게시물에서는 Docker 이미지를 사용하여 Airflow 서버를 만들고 PostgreSQL 서버와의 연결 문제에 대해 설명합니다. 제시된 해결책 중 하나는 docker-compose.yaml
파일에서 host.docker.internal
을 사용하여 Docker 내부에서 호스트의 localhost에 접근하는 방법을 설명합니다.
airflow tasks test error: 이 논의에서는 airflow tasks test
를 실행할 때 발생하는 오류와 관련하여, Airflow의 연결 설정을 확인하고, 필요한 경우 UI를 통해 올바르게 설정하도록 권장합니다.
위의 링크들은 Airflow와 PostgreSQL 간의 연결 문제 해결에 도움을 줄 수 있는 정보를 제공하는데, style12385님의 경우, my_postgres_connection
이 Airflow 시스템 내에서 올바르게 정의되었는지 확인하는 것이 중요합니다. Airflow Admin UI에서 Connections
섹션에서 이 연결 설정이 올바르게 등록되어 있는지 확인하십시오. 또한, 연결의 호스트, 포트, 사용자 이름 및 암호가 PostgreSQL 서비스에 정의된 값과 일치하는지 확인해야 합니다.
이 과정에서, docker-compose 파일, Airflow의 DAG 정의 및 my_postgres_connection
설정을 검토하여 문제의 원인을 파악하는 것이 중요합니다. 설정의 일관성과 정확성을 확인하십시오.
저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.
제가 설정을 봐야 확실히 알 것 같은데, 중요한 점은 https://github.com/dimz119/learn-apache-airflow/blob/main/docker-compose.yaml#L58-L61 요 부분을 보셔서 모두
postgres
로 포인팅이 되어 있다면,docker ps
를 치셔서 혹시라도 다른 postgres가 돌고 있는지 확인해 보세요. 혹시 다른 postgres가 돌고 있다면 다 지우셔야 됩니다.마지막으로
docker compose down
하셔서 다커가 안 돌고 있는 상태에서도 한번 DB 연결 테스트를 해보세요. 혹시라도 다커가 아닌 로컬에서 설치를 하셨다면 그 데이타 베이스가 돌고 있을 수 있습니다.