해결된 질문
작성
·
387
·
수정됨
0
안녕하세요. 좋은 강의 감사합니다.
강의를 기반으로 제 방식대로 서버를 구성하다가 알 수 없는 에러에 빠졌습니다.
서버는 정상 구동은 되고, postman으로 root url인 localhost:8000/
에 request를 날리면 정상적으로 결과값을 반환 받는데localhost:8000/api/recommend?userId=1
만 호출하면 바로 에러메세지 없이 500만 응답으로 받고 있습니다.
print(1)
도 서버 로그에 찍히지 않고 서버 로그는 아예 나오질 않네요.localhost:8000/
에서도 동일하게 서버 로그는 찍히지 않습니다
우선 의도는 BaseRepository 클래스를 만들어서 find_by_id
같은 중복 코드를 하나로 관리해보려고 했습니다.
의존성 주입 부분은 지피티의 도움을 받아서 위치를 조정했습니다.
도저히 어디서 문제가 난건지 알 수 없어서 도움 요청 드립니다 ㅜㅜ
# main.py
from typing import Dict
from dotenv import load_dotenv
from fastapi import FastAPI
from src.app.app import create_app
load_dotenv()
app: FastAPI = create_app()
@app.get("/")
async def health_check_handler() -> Dict[str, str]:
return {"statusMsg": "good"}
# app.py
from contextlib import asynccontextmanager
from typing import AsyncGenerator
import anyio
from fastapi import FastAPI
from src.app.endpoints.recommend import router
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
limiter = anyio.to_thread.current_default_thread_limiter()
limiter.total_tokens = 200
yield
def create_app() -> FastAPI:
app = FastAPI(lifespan=lifespan)
app.include_router(router, prefix="/api")
# 다른 설정들(예: 미들웨어, 이벤트 핸들러 등)을 추가할 수 있습니다.
return app
# connection.py
import os
import urllib
from typing import AsyncGenerator
from dotenv import load_dotenv
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
load_dotenv()
DB_USERNAME: str = os.getenv("DB_USERNAME", "root")
DB_PASSWORD: str = os.getenv("DB_PASSWORD", "root")
DB_HOST: str = os.getenv("DB_HOST", "127.0.0.1")
DB_NAME: str = os.getenv("DB_NAME", "")
DB_PORT: str = os.getenv("DB_PORT", "3306")
DB_ECHO: bool = os.getenv("DB_ECHO", "true").lower() == "true"
if not DB_NAME:
raise ValueError("DB_NAME 환경변수가 설정되지 않았습니다.")
# 비밀번호 특수문자 허용
encoded_password = urllib.parse.quote_plus(DB_PASSWORD)
DATABASE_URL: str = f"mysql+asyncmy://{DB_USERNAME}:{encoded_password}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine: AsyncEngine = create_async_engine(
DATABASE_URL,
echo=DB_ECHO,
pool_size=10,
max_overflow=0,
pool_timeout=30, # second
pool_recycle=60, # second
pool_pre_ping=True,
)
SessionFactory = async_sessionmaker(autocommit=False, autoflush=False, bind=engine)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
session = SessionFactory()
try:
yield session
finally:
await session.close()
# user_route.py
from typing import Dict
from fastapi import APIRouter, Depends, status
from src.app.dependency.query_param_denpendency import snake_case_query
from src.core.common_type import V
from src.core.exception.not_found_exceptions import UserNotFoundExceiption
from src.db.connection import get_db
from src.entity.user import UserEntity
from src.repository.user import UserRepository, get_user_repository
from src.dto.response.user_response import UserResponse
router = APIRouter(prefix="/recommend")
@router.get(path="", status_code=status.HTTP_200_OK, response_model=UserResponse)
async def get_recommend_schedule(
params: Dict[str, V]=Depends(snake_case_query),
user_repo: UserRepository=Depends(get_user_repository)
):
print(1)
user_id: int = int(params.get("user_id", None))
user: UserEntity | None = await user_repo.get_user_by_id(user_id)
if not user:
raise UserNotFoundExceiption()
user = UserResponse.model_validate(user)
return user
# base_repository.py
from typing import Type
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from src.core.common_type import E
class BaseRepository:
def __init__(self, db: AsyncSession):
self.db = db
async def get_entity_by_id(self, model: Type[E], entity_id: int) -> E | None:
print(2)
entity: E | None = await self.db.execute(select(model).where(model.id==entity_id))
return entity.scalars().first()
# user_repo.py
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from src.db.connection import get_db
from src.entity.user import UserEntity
from src.repository.base_repository import BaseRepository
def get_user_repository(db: AsyncSession = Depends(get_db)) -> "UserRepository":
return UserRepository(db)
class UserRepository(BaseRepository):
async def get_user_by_id(self, user_id: int) -> UserEntity | None:
print(3)
return await self.get_entity_by_id(UserEntity, user_id)
# user_response.py
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
class UserResponse(BaseModel):
id: int
name: str
nickname: str
email: str
phone: str
join_date: datetime
updated_at: Optional[datetime] = None
deleted_at: Optional[datetime] = None
class Config:
from_attributes = True
도대체 어디서 문제가 생긴걸까요...
답변 5
1
0
아하 위 문제도 해결했습니다.
모든 entity 클래스에 정역참조 관계 선언에다가 lazy="joined"를 넣어놨더니 무한순환에 빠졌던 것 같습니다. 적절하게 필요한 것만 넣고 나머지는 빼니까 잘 되네요
0
안녕하세요. 위 에러는 해결됐는데, 새로운 문제가 생겼습니다 ㅜㅜ
from typing import Tuple, Type
from fastapi import Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncResult, AsyncSession
from src.core.type.common_type import E
from src.db.connection import get_db
class BaseRepository:
def __init__(self, db: AsyncSession=Depends(get_db)):
self.db = db
async def get_entity_by_id(self, model: Type[E], entity_id: int) -> E | None:
async with self.db as db:
result: AsyncResult[Tuple[E]] = await db.execute(select(model).where(model.id == entity_id)) # 문제가 생긴 부분
entity: E | None = result.scalar_one_or_none()
return entity
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from src.db.connection import get_db
from src.entity.user import UserEntity
from src.repository.base_repository import BaseRepository
class UserRepository(BaseRepository):
async def get_user_by_id(self, user_id: int) -> UserEntity | None:
return await self.get_entity_by_id(UserEntity, user_id)
위 코드 중 BaseRepository의 result를 기다리는 중에 db lock인지 무한 대기 상태인지 서버가 멈춰버리는 현상이 있습니다.
db에 쿼리문이 날라가는건 확인했습니다만 다음 코드로 넘어가질 않네요.
제가 놓친게 있을까요?
0
0
안녕하세요, 인프런 AI 인턴입니다.
윤서준님께서 겪고 계신 문제는 FastAPI 서버가 요청에 대해 500 에러를 반환하나 에러 메시지가 없어서 정확한 원인을 파악하기 어려운 상황입니다. 이런 경우 여러 가지 원인이 있을 수 있지만, 주로 다음과 같은 이유로 발생할 수 있습니다:
환경 변수 문제: .env
파일이 제대로 로드되지 않아 데이터베이스 연결에 필요한 환경 변수가 누락되었을 가능성이 있습니다. DB_NAME
환경 변수가 설정되지 않았다고 하셨으니, .env
파일 내의 변수가 올바르게 설정되어 있는지 다시 확인해보세요.
데이터베이스 연결 문제: 데이터베이스 URL이 잘못되었거나, 데이터베이스 서버에 문제가 있을 수 있습니다. 데이터베이스 연결이 올바르게 이루어지고 있는지 확인하세요. 데이터베이스 서버가 실행 중인지, mysql+asyncmy
를 사용하여 연결되고 있는지 점검해보세요.
의존성 주입 문제: 의존성 주입 설정이 잘못되어 있을 경우, 주입된 객체가 구현되지 않아 요청이 실패하는 상황이 발생할 수 있습니다. get_user_repository
의 의존성 주입 부분이나 엔티티 클래스의 정의가 올바른지 확인해보세요.
UserRepository 클래스 문제: get_user_by_id
함수가 모델 엔티티를 올바르게 가져오지 못하고 있을 가능성이 있습니다. 디버깅을 위해 print()
같은 함수를 사용하여 함수가 어디까지 실행되고 있는지, 어디서 문제가 발생하는지를 파악해보는 것도 좋습니다.
SQLAlchemy 쿼리 문제: SQLAlchemy 쿼리 실행 시 예기치 않은 오류가 발생할 수 있습니다. 쿼리가 올바르게 작성되었는지, UserEntity
에서 ID 필드가 올바르게 지정되어 있는지 확인해주세요.
Pydantic 모델 문제: Pydantic 모델에서 제공하는 from_attributes = True
설정이 FastAPI와의 연동에서 충돌을 일으킬 수도 있습니다. 설정이 올바른지 확인하시고, 필요한 경우 디버깅하여 Pydantic 모델의 동작을 확인해보세요.
아래는 FastAPI와 관련된 서버 에러를 확인할 수 있는 유사한 질문 링크입니다:
- FastAPI 오류
위의 사항들을 점검해 보시고, 여전히 해결되지 않는 부분이 있으면, 구체적인 서버 로그 또는 에러 메시지를 함께 공유해 주시면 추가 도움을 드릴 수 있습니다.
저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다. 현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏 추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.
import re from typing import Dict, Type from fastapi import Request from src.core.type.common_type import V # 쿼리 스트링 전체를 camel case에서 snake cakse로 변환하는 함수 def convert_query_params_from_camel_to_snake(query_params: Dict[str, Type[V]]) -> Dict[str, Type[V]]: return {re.sub(r'(?<!^)(?=[A-Z])', '_', k).lower(): v for k, v in query_params.items()} def snake_case_query(request: Request) -> Dict[str, Type[V]]: return convert_query_params_from_camel_to_snake(request.query_params)
이런 식으로 카멜케이스를 스네이크 케이스로 바꾸는 코드입니다.