인프런 커뮤니티 질문&답변

휘뚜루마뚜루님의 프로필 이미지
휘뚜루마뚜루

작성한 질문수

Airflow 마스터 클래스

Custom operator 개발 실습

[ERROR] Max retries exceeded with url

해결된 질문

작성

·

5.7K

0

task실행 하면 api호출이 안되고 requests에서 에러가 나옵니다

requests.exceptions.ConnectionError: HTTPConnectionPool(host='http', port=80): Max retries exceeded with url: //openapi.seoul.go.kr:8088/***/json/TbCorona19CountStatus/1/1000/?Content-Type=application%2Fjson&charset=utf-8&Accept=%2A%2F%2A (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fbc7dac3fa0>: Failed to establish a new connection: [Errno -2] Name or service not known'))

그런데 postman이든 브라우저든 다른 곳에서는 정상척으로 호출하고 이전에 SimpleHttpOperator도 정상적으로 모두 호출합니다.

왜 연결이 안되고 Max retries가 뜨는지 잘 모르겠어요 ㅠ

혹시 while문에서 너무 빨리호출하나 싶어서 time.sleep도 넣어봤지만 차이 없네요

답변 1

0

김현진님의 프로필 이미지
김현진
지식공유자

안녕하세요 휘뚜루마뚜루 님

왠지 내용 중 아래의 host 부분이 잘못 들어간것 같습니다.

HTTPConnectionPool(host='http', port=80)

그런데 이걸로는 원인을 알기 어려워서

DAG 작성 내용이랑 operator 작성한거랑 connection 등록한 내용도 같이 올려주시겠어요?

# operator
from airflow.models.baseoperator import BaseOperator
from airflow.hooks.base import BaseHook
import pandas as pd
import time

class SeoulApiToCsvOperator(BaseOperator):
    template_fields = ('endpoint', 'path', 'file_name', 'base_dt')

    def __init__(self, dataset_nm, path, file_name, base_dt=None, **kwargs):
        super().__init__(**kwargs)
        self.http_conn_id = 'openapi.seoul.go.kr'
        self.path = path
        self.file_name = file_name
        self.endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + dataset_nm
        self.base_dt = base_dt

    def execute(self, context):
        import os

        connection = BaseHook.get_connection(self.http_conn_id)
        self.base_url = f'http://{connection.host}:{connection.port}/{self.endpoint}'

        total_row_df = pd.DataFrame()
        start_row = 1
        end_row = 1000
        while True:
            time.sleep(3)
            self.log.info(f'시작:{start_row}')
            self.log.info(f'끝:{end_row}')
            row_df = self._call_api(self.base_url, start_row, end_row)
            total_row_df = pd.concat([total_row_df, row_df])
            if len(row_df) < 1000:
                break
            else:
                start_row = end_row + 1
                end_row += 1000

        if not os.path.exists(self.path):
            os.system(f'mkdir -p {self.path}')
        total_row_df.to_csv(self.path + '/' + self.file_name, encoding='utf-8', index=False)

    def _call_api(self, base_url, start_row, end_row):
        import requests
        import json

        headers = {'Content-Type': 'application/json',
                   'charset': 'utf-8',
                   'Accept': '*/*'
                   }

        request_url = f'{base_url}/{start_row}/{end_row}/'
        if self.base_dt is not None:
            request_url = f'{base_url}/{start_row}/{end_row}/{self.base_dt}'
        response = requests.get(request_url, headers)
        contents = json.loads(response.text)

        key_nm = list(contents.keys())[0]
        row_data = contents.get(key_nm).get('row')
        row_df = pd.DataFrame(row_data)

        return row_df
# DAG 
from operators.seoul_api_to_csv_operator import SeoulApiToCsvOperator
from airflow import DAG
import pendulum

with DAG(
    dag_id="dags_seoul_api",
    schedule="0 7 * * *",
    start_date=pendulum.datetime(2023, 11, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    tb_corona19_count_status = SeoulApiToCsvOperator(
        task_id='tb_corona19_count_status',
        dataset_nm='TbCorona19CountStatus',
        path='/opt/airflow/files/TbCorona19CountStatus/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}',
        file_name='TbCorona19CountStatus.csv'
    )

    tb_corona19_vaccine_stat_new = SeoulApiToCsvOperator(
        task_id="tv_corona19_vaccine_stat_new",
        dataset_nm="tvCorona19VaccinestatNew",
        path="/opt/airflow/files/tvCorona19VaccinestatNew/{{ data_interval_end.in_timezone('Asia/Seoul') | ds_nodash }}",
        file_name="tvCorona19VaccinestatNew.csv"
    )

    tb_corona19_count_status >> tb_corona19_vaccine_stat_new

 

image

에러에 나오는 URL에는 8088로 잘 나오고 그 URL에 api key만 바꿔서 찌르면 응답이 잘오는데 operator로 실행할 때만 안되는게 이상하네요

우선 호출은 해결했습니다. base_url을 만드는 부분에서 http://가 들어가고 connections의 host에서 http:// 가 중복되어 api 호출을 하지 못하는 것이었습니다.

 

다만 테이블 저장하면 6줄만 나오네요

이 부분은 while 내부의 문제일테니 다시 확인해보겠습니다 ㅎㅎ

김현진님의 프로필 이미지
김현진
지식공유자

휘뚜루마뚜루님

일단 커넥션 문제는 해결되서 다행이네요.

코로나 백신 데이터셋으로 6줄만 저장되세요?

저장되는 날짜값 보시고 start_row, end_row 부분이 잘못들어가진 않았는지 확인해보셔야 할거에요. 트러블 슈팅 해보시고 안되면 또 남겨주세요 ^^

start_row / end_row 문제가 맞았습니다 ㅎㅎ

수정하고나니 다 잘되네요!

휘뚜루마뚜루님의 프로필 이미지
휘뚜루마뚜루

작성한 질문수

질문하기