인프런 영문 브랜드 로고
인프런 영문 브랜드 로고

인프런 커뮤니티 질문&답변

style12385님의 프로필 이미지
style12385

작성한 질문수

실리콘밸리 엔지니어와 함께하는 Apache Airflow

Provider(프로바이더)에 대해 알아보기

airflow와 postgres간의 connection 오류

작성

·

525

·

수정됨

1

airflow와 postgres 간의 connection 오류 문제입니다.

  1. airflow UI -> admin-> connections에서 postgres 연결설정

  2. docker-compose.yaml 설정

     

  3. dag 코드

  4. 입력 airflow tasks test postgres_loader execute_sql_query 2023-01-01 시에 오류가 뜹니다ㅠ

  5. [2024-06-21T15:40:45.514+0900] {dagbag.py:545} INFO - Filling up the DagBag from /home/kim/airflow/dags [2024-06-21T15:40:45.805+0900] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: postgres_loader.execute_sql_query __airflow_temporary_run_2024-06-21T06:40:45.755970+00:00__ [None]> [2024-06-21T15:40:45.811+0900] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: postgres_loader.execute_sql_query __airflow_temporary_run_2024-06-21T06:40:45.755970+00:00__ [None]> [2024-06-21T15:40:45.812+0900] {taskinstance.py:2306} INFO - Starting attempt 1 of 1 [2024-06-21T15:40:45.812+0900] {taskinstance.py:2388} WARNING - cannot record queued_duration for task execute_sql_query because previous state change time has not been saved [2024-06-21T15:40:45.813+0900] {taskinstance.py:2330} INFO - Executing <Task(PostgresOperator): execute_sql_query> on 2023-01-01 00:00:00+00:00 [2024-06-21T15:40:45.855+0900] {taskinstance.py:2648} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='postgres_loader' AIRFLOW_CTX_TASK_ID='execute_sql_query' AIRFLOW_CTX_EXECUTION_DATE='2023-01-01T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2024-06-21T06:40:45.755970+00:00__' [2024-06-21T15:40:45.858+0900] {taskinstance.py:430} INFO - ::endgroup:: [2024-06-21T15:40:45.870+0900] {sql.py:276} INFO - Executing: INSERT INTO sample_table (key, value) VALUES ('hello', 'world') [2024-06-21T15:40:45.875+0900] {taskinstance.py:441} INFO - ::group::Post task execution logs [2024-06-21T15:40:45.875+0900] {taskinstance.py:2905} ERROR - Task failed with exception Traceback (most recent call last): File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task result = _execute_callable(context=context, **execute_callable_kwargs) File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable return execute_callable(context=context, **execute_callable_kwargs) File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 401, in wrapper return func(self, *args, **kwargs) File "/home/kim/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 277, in execute hook = self.get_db_hook() File "/home/kim/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 188, in get_db_hook return self._hook File "/usr/lib/python3.10/functools.py", line 981, in __get__ val = self.func(instance) File "/home/kim/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 150, in _hook conn = BaseHook.get_connection(conn_id) File "/home/kim/.local/lib/python3.10/site-packages/airflow/hooks/base.py", line 83, in get_connection conn = Connection.get_connection_from_secrets(conn_id) File "/home/kim/.local/lib/python3.10/site-packages/airflow/models/connection.py", line 519, in get_connection_from_secrets raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined") airflow.exceptions.AirflowNotFoundException: The conn_id `my_postgres_connection` isn't defined

     

답변 4

0

style12385님의 프로필 이미지
style12385
질문자

airflow의 postgres와 따로 구성한 postgres 두 개가 존재하여 airflow의 postgres를 사용하기로 했고 docker-compose.yml파일을 설정하여 postgres 설정을 바꿨습니다. 그래도 connection을 못 찾네요ㅜㅜ 어떻게 해야 할까요

미쿡엔지니어님의 프로필 이미지
미쿡엔지니어
지식공유자

제가 설정을 봐야 확실히 알 것 같은데, 중요한 점은 https://github.com/dimz119/learn-apache-airflow/blob/main/docker-compose.yaml#L58-L61 요 부분을 보셔서 모두 postgres로 포인팅이 되어 있다면, docker ps 를 치셔서 혹시라도 다른 postgres가 돌고 있는지 확인해 보세요. 혹시 다른 postgres가 돌고 있다면 다 지우셔야 됩니다.

마지막으로 docker compose down 하셔서 다커가 안 돌고 있는 상태에서도 한번 DB 연결 테스트를 해보세요. 혹시라도 다커가 아닌 로컬에서 설치를 하셨다면 그 데이타 베이스가 돌고 있을 수 있습니다.

style12385님의 프로필 이미지
style12385
질문자

일단 선생님께서 말씀해주신 알려주신 방법을 확인해본 결과 "요 부분" 설정을 변경했습니다. 또한 두 번째와 세 번째 말씀해준 신 것 확인 결과 이상 없었습니다. 그래도 오류가 발생했습니다.

"connection to server at "localhost" (::1), port 5432"(아래 로그 첨부했습니다)

위 오류처럼 로컬 pc에서 listening 하고 있지 않은지 확인해보기 위해 확인해 본 결과

image.png

잘 실행되고 있습니다.

현재 docker 위에 ubuntu 환경으로 docker-compose를 구성했습니다.

  • 추가 확인이 필요한 설정이 있으면 말씀해주세요 ㅠ

    image.png
image.png

[오류 로그]

b0ec75311c2c

*** Found local files:

*** * /opt/airflow/logs/dag_id=postgres_loader/run_id=manual__2024-06-27T06:34:54.878565+00:00/task_id=execute_sql_query/attempt=1.log

[2024-06-27T06:34:57.200+0000] {local_task_job_runner.py:120} INFO - ::group::Pre task execution logs

[2024-06-27T06:34:57.232+0000] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: postgres_loader.execute_sql_query manual__2024-06-27T06:34:54.878565+00:00 [queued]>

[2024-06-27T06:34:57.241+0000] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: postgres_loader.execute_sql_query manual__2024-06-27T06:34:54.878565+00:00 [queued]>

[2024-06-27T06:34:57.242+0000] {taskinstance.py:2306} INFO - Starting attempt 1 of 1

[2024-06-27T06:34:57.260+0000] {taskinstance.py:2330} INFO - Executing <Task(PostgresOperator): execute_sql_query> on 2024-06-27 06:34:54.878565+00:00

[2024-06-27T06:34:57.267+0000] {warnings.py:112} WARNING - /home/***/.local/lib/python3.12/site-packages/***/task/task_runner/standard_task_runner.py:61: DeprecationWarning: This process (pid=131) is multi-threaded, use of fork() may lead to deadlocks in the child.

pid = os.fork()

[2024-06-27T06:34:57.268+0000] {standard_task_runner.py:63} INFO - Started process 133 to run task

[2024-06-27T06:34:57.269+0000] {standard_task_runner.py:90} INFO - Running: ['***', 'tasks', 'run', 'postgres_loader', 'execute_sql_query', 'manual__2024-06-27T06:34:54.878565+00:00', '--job-id', '143', '--raw', '--subdir', 'DAGS_FOLDER/postgres_loader.py', '--cfg-path', '/tmp/tmpehr9vin2']

[2024-06-27T06:34:57.272+0000] {standard_task_runner.py:91} INFO - Job 143: Subtask execute_sql_query

[2024-06-27T06:34:57.330+0000] {task_command.py:426} INFO - Running <TaskInstance: postgres_loader.execute_sql_query manual__2024-06-27T06:34:54.878565+00:00 [running]> on host b0ec75311c2c

[2024-06-27T06:34:57.450+0000] {taskinstance.py:2648} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='postgres_loader' AIRFLOW_CTX_TASK_ID='execute_sql_query' AIRFLOW_CTX_EXECUTION_DATE='2024-06-27T06:34:54.878565+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-06-27T06:34:54.878565+00:00'

[2024-06-27T06:34:57.453+0000] {taskinstance.py:430} INFO - ::endgroup::

[2024-06-27T06:34:57.454+0000] {sql.py:276} INFO - Executing:

INSERT INTO sample_table (key, value)

VALUES ('hello', 'world')

[2024-06-27T06:34:57.465+0000] {base.py:84} INFO - Using connection ID 'my_postgres_connection' for task execution.

[2024-06-27T06:34:57.478+0000] {base.py:84} INFO - Using connection ID 'my_postgres_connection' for task execution.

[2024-06-27T06:34:57.479+0000] {taskinstance.py:441} INFO - ::group::Post task execution logs

[2024-06-27T06:34:57.480+0000] {taskinstance.py:2905} ERROR - Task failed with exception

Traceback (most recent call last):

File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task

result = _execute_callable(context=context, **execute_callable_kwargs)

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable

return execute_callable(context=context, **execute_callable_kwargs)

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 401, in wrapper

return func(self, *args, **kwargs)

^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/common/sql/operators/sql.py", line 282, in execute

output = hook.run(

^^^^^^^^^

File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/common/sql/hooks/sql.py", line 431, in run

with self._create_autocommit_connection(autocommit) as conn:

File "/usr/local/lib/python3.12/contextlib.py", line 137, in __enter__

return next(self.gen)

^^^^^^^^^^^^^^

File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/common/sql/hooks/sql.py", line 551, in _create_autocommit_connection

with closing(self.get_conn()) as conn:

^^^^^^^^^^^^^^^

File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/postgres/hooks/postgres.py", line 175, in get_conn

self.conn = psycopg2.connect(**conn_args)

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/home/airflow/.local/lib/python3.12/site-packages/psycopg2/__init__.py", line 122, in connect

conn = _connect(dsn, connection_factory=connection_factory, **kwasync)

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

psycopg2.OperationalError: connection to server at "localhost" (::1), port 5432 failed: Connection refused

Is the server running on that host and accepting TCP/IP connections?

connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused

Is the server running on that host and accepting TCP/IP connections?

[2024-06-27T06:34:57.493+0000] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=postgres_loader, task_id=execute_sql_query, run_id=manual__2024-06-27T06:34:54.878565+00:00, execution_date=20240627T063454, start_date=20240627T063457, end_date=20240627T063457

[2024-06-27T06:34:57.509+0000] {standard_task_runner.py:110} ERROR - Failed to execute job 143 for task execute_sql_query (connection to server at "localhost" (::1), port 5432 failed: Connection refused

Is the server running on that host and accepting TCP/IP connections?

connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused

Is the server running on that host and accepting TCP/IP connections?

; 133)

[2024-06-27T06:34:57.523+0000] {local_task_job_runner.py:240} INFO - Task exited with return code 1

[2024-06-27T06:34:57.536+0000] {warnings.py:112} WARNING - /home/***/.local/lib/python3.12/site-packages/***/models/baseoperator.py:1297: AirflowProviderDeprecationWarning: Call to deprecated class PostgresOperator. (Please use `***.providers.common.sql.operators.sql.SQLExecuteQueryOperator`.Also, you can provide `hook_params={'schema': <database>}`.)

result = cls.__new__(cls)

[2024-06-27T06:34:57.556+0000] {taskinstance.py:3503} INFO - 0 downstream tasks scheduled from follow-on schedule check

[2024-06-27T06:34:57.561+0000] {local_task_job_runner.py:222} INFO - ::endgroup::

오류가 난 로그입니다.

미쿡엔지니어님의 프로필 이미지
미쿡엔지니어
지식공유자

앗.......... 윈도우군요...... ㅠㅠ

아마 윈도우에서 postgres 접근을 이 IP주소로 못하는 것 같네요. 뭔가 방화벽 문제인 것 같은데...

connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused

위에 에러가 중요 문제입니다.

 

근데... 이 질문 조회 수 왜 이렇게 많죠? ㅎㅎㅎ

style12385님의 프로필 이미지
style12385
질문자

그러면 docker-compose 실행하고 psql로 접속해서 conf 파일을 만들어야 하는 건가요?

style12385님의 프로필 이미지
style12385
질문자

윈도우의 postgres 가 아닌 docker위에 postgres를 사용하는 걸로 알고 있는데 이게 맞나요? 윈도우에는 DBeaver만 깔고 postgres는 깔지 않았습니다

미쿡엔지니어님의 프로필 이미지
미쿡엔지니어
지식공유자

흐음 한번 localhost 대신에 postgres라고 넣어보실래요? 다커컴포즈 내에서는 postgres로 서비스 이름이 설정 되어 있으실 겁니다. UI에서 postgres로 넣어보세요 ㅎㅎㅎ

style12385님의 프로필 이미지
style12385
질문자

드디어 다른 로그가 나오네요!!!! 인증 오류입니다. postgres 의 패스워드를 postgres로 해봤는데 오류네요.

image.png

미쿡엔지니어님의 프로필 이미지
미쿡엔지니어
지식공유자

제가 지하철이라 기억은 잘 나지 않지만 다커컴포즈 보시면 유저명과 비밀번호 있을 겁니다. 거의 다 오셨네요

style12385님의 프로필 이미지
style12385
질문자

docker-compose.yaml 파일 확인해 보니 있습니다..! 선생님 도움 덕분에 드디어 해결했습니다! 감사합니다

미쿡엔지니어님의 프로필 이미지
미쿡엔지니어
지식공유자

수고하셨어요 ㅎㅎ

0

미쿡엔지니어님의 프로필 이미지
미쿡엔지니어
지식공유자

안녕하세요 style12385님,

개인적인 사정으로 인해 답변이 늦었습니다. 에러 메세지를 보면 "The conn_id my_postgres_connection isn't defined"가 문제인 것 같은데, 이 말은 Airflow가 my_postgres_connection라는 정의를 못 찾은 것 같은데, 이말은 Airlfow UI 데이타베이스와 Airflow worker가 서로 다른 곳을 액세스하는 것 같아 보입니다.

connection이 잘 저장 되었는지, Airflow UI가 저장하는 곳이 Airflow worker가 접근하는 데이터 베이스가 같은지 확인을 해 보셔야겠습니다.

도움이 되셨길 바라겠습니다.

0

style12385님의 프로필 이미지
style12385
질문자

DBeaver에서 연결 확인, airflow에서 ps 명령어로 잘 작동하는지 확인했는데 위 명령어 치면 에러가 나네요. 다른 확인 방법이 있을까요

0

안녕하세요, 인프런 AI 인턴입니다.

style12385님의 질문과 관련하여, Docker 컨테이너에서 Airflow와 PostgreSQL 사이의 연결 설정에 문제가 발생한 것 같습니다. 질문 내용을 통해, Airflow에서 Postgres 연결 설정을 하고 docker-compose 파일을 사용하여 서비스를 구성하였으나, airflow tasks test 명령을 실행할 때 my_postgres_connection에 대한 AirflowNotFoundException 오류가 발생했음을 확인할 수 있습니다. 이는 Airflow가 my_postgres_connection 연결 구성을 찾을 수 없다는 것을 의미합니다.

유사한 문제를 다룬 기존 답변 링크를 아래와 같이 추천드립니다. 이 링크들은 Docker와 Airflow 환경에서의 PostgreSQL 연결 설정 문제에 대한 해결책을 제공할 수 있습니다:

  1. Docker 에서 airflow 사용시 질문드립니다: 이 게시물에서는 Docker 이미지를 사용하여 Airflow 서버를 만들고 PostgreSQL 서버와의 연결 문제에 대해 설명합니다. 제시된 해결책 중 하나는 docker-compose.yaml 파일에서 host.docker.internal을 사용하여 Docker 내부에서 호스트의 localhost에 접근하는 방법을 설명합니다.

  2. airflow tasks test error: 이 논의에서는 airflow tasks test를 실행할 때 발생하는 오류와 관련하여, Airflow의 연결 설정을 확인하고, 필요한 경우 UI를 통해 올바르게 설정하도록 권장합니다.

위의 링크들은 Airflow와 PostgreSQL 간의 연결 문제 해결에 도움을 줄 수 있는 정보를 제공하는데, style12385님의 경우, my_postgres_connection이 Airflow 시스템 내에서 올바르게 정의되었는지 확인하는 것이 중요합니다. Airflow Admin UI에서 Connections 섹션에서 이 연결 설정이 올바르게 등록되어 있는지 확인하십시오. 또한, 연결의 호스트, 포트, 사용자 이름 및 암호가 PostgreSQL 서비스에 정의된 값과 일치하는지 확인해야 합니다.

이 과정에서, docker-compose 파일, Airflow의 DAG 정의 및 my_postgres_connection 설정을 검토하여 문제의 원인을 파악하는 것이 중요합니다. 설정의 일관성과 정확성을 확인하십시오.

저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.

style12385님의 프로필 이미지
style12385

작성한 질문수

질문하기