FastAPI 프로덕션 튜닝(성능·연결·동시성)

  • ASGI: gunicorn + uvicorn.workers / uvloop + httptools, --limit-concurrency, --keep-alive 조정
  • DB: SQLAlchemy 2.0 async + asyncpg + PgBouncer, 풀/타임아웃/재시도 표준화, N+1 제거
  • 외부연동: httpx AsyncClient 전역 풀 + 시간제한 + 백오프 + 회로차단
  • 응답: ORJSONResponse 기본, ETag/Cache-Control·서버 캐시 병행
  • 리밋/백프레셔: Redis 토큰버킷/슬라이딩윈도우
  • 가시성: 구조화 로그 + OTEL 트레이스 + 프로메테우스 메트릭
  • 안정성: 그레이스풀 종료, startup/shutdown에서 커넥션 정리

1) 실행기(ASGI) & 프로세스 모델 튜닝

권장 실행 명령(일반적인 I/O 바운드 서비스)

gunicorn app.main:app \
  -k uvicorn.workers.UvicornWorker \
  -w $((CPU*2+1)) \
  --threads 1 \
  --bind 0.0.0.0:8000 \
  --timeout 60 --graceful-timeout 30 \
  --keep-alive 5 \
  --max-requests 5000 --max-requests-jitter 500 \
  --log-level info --access-logfile '-' --error-logfile '-'

Uvicorn 옵션(내장/로컬 실행 시)

uvicorn app.main:app \
  --loop uvloop --http httptools \
  --host 0.0.0.0 --port 8000 \
  --timeout-keep-alive 5 \
  --limit-concurrency 1000 \
  --backlog 2048

  • CPU 바운드면 워커 수 줄이고(예: CPU+1), I/O 바운드면 위 공식을 기본값으로 시작.
  • --limit-concurrency로 이벤트루프 과다 점유 방지(백프레셔 핵심).

2) FastAPI 부팅 스켈레톤(성능 플래그 포함)

# app/main.py
import os
from fastapi import FastAPI, Request, Response, HTTPException, Depends
from fastapi.responses import ORJSONResponse
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.gzip import GZipMiddleware

app = FastAPI(default_response_class=ORJSONResponse)

# 정확히 필요한 Origin만 허용
app.add_middleware(
    CORSMiddleware,
    allow_origins=os.getenv("CORS_ALLOW_ORIGINS","https://example.com").split(","),
    allow_credentials=True,
    allow_methods=["GET","POST","PUT","PATCH","DELETE","OPTIONS"],
    allow_headers=["Authorization","Content-Type","X-Request-ID"],
)

# 전송량 큰 엔드포인트에는 CDN/Brotli 권장, 앱 레벨은 GZip 최소화
app.add_middleware(GZipMiddleware, minimum_size=1024)

@app.middleware("http")
async def add_request_id(request: Request, call_next):
    rid = request.headers.get("X-Request-ID") or os.urandom(8).hex()
    response: Response = await call_next(request)
    response.headers["X-Request-ID"] = rid
    return response

@app.get("/healthz")
async def healthz():
    return {"ok": True}

@app.get("/readyz")
async def readyz():
    # 여기서 DB/Redis ping 등 실제 점검 로직 호출 권장 (타임아웃 필수)
    return {"db": "ok", "cache": "ok"}

3) 데이터베이스 레이어(Async SQLAlchemy + PgBouncer)

엔진/세션 팩토리

# app/db.py
import os
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

DB_URL = os.getenv("DB_URL")  # e.g. postgresql+asyncpg://user:pw@host:5432/db

engine = create_async_engine(
    DB_URL,
    pool_size=int(os.getenv("DB_POOL_SIZE", "20")),
    max_overflow=int(os.getenv("DB_MAX_OVERFLOW", "20")),
    pool_timeout=10,              # 풀 대기 최대 10s
    pool_recycle=1800,            # 커넥션 재활용
    pool_pre_ping=True,           # 죽은 커넥션 사전 감지
)

SessionLocal = async_sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession)

async def get_db() -> AsyncSession:
    async with SessionLocal() as session:
        yield session

N+1 방지(관계 로딩 전략)

# app/repo.py
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from .models import Order, OrderItem

async def get_order_with_items(db):
    stmt = select(Order).options(selectinload(Order.items))  # ✅ 한번에 가져오기
    result = await db.execute(stmt)
    return result.scalars().all()

  • PgBouncer transaction pooling 모드 + statement_timeout(예: 3~5s) 설정.
  • 느린 쿼리 로그 활성화 후 Top N 튜닝(인덱스, 커버링, 파티셔닝).

4) Redis(캐시/레이트리밋/락)

# app/redis.py
import os, redis.asyncio as redis
REDIS_URL = os.getenv("REDIS_URL","redis://localhost:6379/0")
redis_cli = redis.from_url(REDIS_URL, encoding="utf-8", decode_responses=True)

간단 토큰버킷(고정 윈도우 변형)

# app/rate_limit.py
from fastapi import Request, HTTPException
from .redis import redis_cli

async def rate_limit(request: Request, key_prefix: str, limit: int, window_sec: int):
    key = f"rl:{key_prefix}:{request.client.host}"
    cur = await redis_cli.incr(key)
    if cur == 1:
        await redis_cli.expire(key, window_sec)
    if cur > limit:
        raise HTTPException(status_code=429, detail="Too Many Requests")

사용:

@app.get("/heavy")
async def heavy_endpoint(request: Request):
    await rate_limit(request, "heavy", limit=100, window_sec=60)
    return {"ok": True}

5) 외부 HTTP 연동(풀/타임아웃/재시도/회로차단)

# app/http.py
import httpx, os, asyncio
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

LIMITS = httpx.Limits(max_connections=200, max_keepalive_connections=50)
TIMEOUT = httpx.Timeout(connect=2.0, read=3.0, write=3.0, pool=2.0)
client = httpx.AsyncClient(limits=LIMITS, timeout=TIMEOUT)

class UpstreamError(Exception): pass

@retry(
    reraise=True,
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=0.2, min=0.2, max=1.0),
    retry=retry_if_exception_type((httpx.HTTPError, UpstreamError)),
)
async def call_upstream(url: str):
    r = await client.get(url)
    if r.status_code >= 500:
        raise UpstreamError(f"upstream {r.status_code}")
    return r.json()

  • 고가용 서비스면 회로차단기(aiobreaker 등) 추가(실패율·반개방·스냅백).

6) 응답 최적화 & 캐싱

ORJSON 기본 + ETag/Cache-Control

from fastapi import Header
from fastapi.responses import Response
import hashlib, json

def make_etag(payload: dict) -> str:
    raw = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode()
    return hashlib.md5(raw).hexdigest()

@app.get("/articles/{id}")
async def get_article(id: int, if_none_match: str | None = Header(default=None)):
    # DB 또는 캐시에서 조회
    data = {"id": id, "title": "Hello"}
    etag = make_etag(data)
    if if_none_match == etag:
        return Response(status_code=304)
    resp = ORJSONResponse(data)
    resp.headers["ETag"] = etag
    resp.headers["Cache-Control"] = "public, max-age=60"
    return resp

SSR/ISR로 가능한 건 Next.js 쪽에서 CDN 캐시 활용(엣지로 밀기).


7) Pydantic v2 성능 팁

from pydantic import BaseModel, ConfigDict

class Article(BaseModel):
    model_config = ConfigDict(from_attributes=True, ser_json_tuples=True)
    id: int
    title: str

# 직렬화 시
payload = Article(id=1, title="x").model_dump(exclude_none=True)
  • DTO는 필드 최소화·exclude_none·중첩 모델 과다 사용 지양
  • 대량 응답은 스트리밍 또는 페이징 고정

8) 구조화 로깅 & 메트릭 & 트레이싱

JSON 로그(표준 라이브러리)

# app/logging.py
import json, logging, sys

class JsonFormatter(logging.Formatter):
    def format(self, record):
        base = {"level": record.levelname, "msg": record.getMessage(), "logger": record.name}
        if record.exc_info:
            base["exc"] = self.formatException(record.exc_info)
        return json.dumps(base, ensure_ascii=False)

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter())
logging.getLogger().handlers = [handler]
logging.getLogger().setLevel(logging.INFO)

프로메테우스(요청/지연/에러 자동계측)

# app/metrics.py
from prometheus_fastapi_instrumentator import Instrumentator

def setup_metrics(app):
    Instrumentator().instrument(app).expose(app, endpoint="/metrics")

OTEL(간단 셋업)

# app/otel.py
import os
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor

def setup_tracing(app):
    provider = TracerProvider(resource=Resource.create({"service.name":"api"}))
    exporter = OTLPSpanExporter(endpoint=os.getenv("OTLP_ENDPOINT","http://otel-collector:4317"), insecure=True)
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    FastAPIInstrumentor.instrument_app(app, excluded_urls="/healthz,/metrics")

앱 결합:

# app/main.py (일부)
from .metrics import setup_metrics
from .otel import setup_tracing
setup_metrics(app)
setup_tracing(app)

9) 업로드/바디 보호(대용량 방지 & 보안)

# app/mw_bodylimit.py
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi import Request, HTTPException

class BodySizeLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, max_body: int = 5*1024*1024):
        super().__init__(app); self.max_body = max_body
    async def dispatch(self, request: Request, call_next):
        cl = request.headers.get("content-length")
        if cl and int(cl) > self.max_body:
            raise HTTPException(413, "Payload Too Large")
        return await call_next(request)

app.add_middleware(BodySizeLimitMiddleware, max_body=5*1024*1024)

10) 그레이스풀 시작·종료

# app/lifecycle.py
from fastapi import FastAPI
from .db import engine
from .redis import redis_cli

def mount_lifecycle(app: FastAPI):
    @app.on_event("startup")
    async def startup():
        # 프리워밍/헬스 체크 등
        pass

    @app.on_event("shutdown")
    async def shutdown():
        await redis_cli.close()
        await engine.dispose()
# main.py 연결
from .lifecycle import mount_lifecycle
mount_lifecycle(app)

11) 성능 진단 빠른 루틴

  1. k6/locust로 p95·에러율 기준선 재측정
  2. 느린 엔드포인트 Top 10 → DB 쿼리/외부연동 분해
  3. 캐시 후보(핫리드, 파생조회) 선정 → Redis·ISR 적용
  4. 풀/동시성 한도(--limit-concurrency, DB/HTTP 풀) 상향/하향 A/B
  5. max-requests로 워커 메모리 누수 방지
  6. 트레이스 샘플링 5~10% → 병목 지점 시각화

12) 패키지 설치 요약

pip install "fastapi[all]" uvicorn[standard] gunicorn orjson \
            sqlalchemy[asyncio] asyncpg \
            redis tenacity httpx \
            prometheus-fastapi-instrumentator \
            opentelemetry-sdk opentelemetry-exporter-otlp opentelemetry-instrumentation-fastapi

마무리: 2주 액션 플랜(템플릿)

  • gunicorn+uvicorn 프로필 도입(--limit-concurrency, keep-alive 튜닝)
  • DB 풀/타임아웃 표준화 + PgBouncer 적용 + 느린쿼리 로그
  • httpx 전역 AsyncClient + 재시도/타임아웃/풀 제한 + 회로차단
  • Redis 리밋·캐시 적용(핵심 조회/쓰기 엔드포인트 3개)
  • ORJSONResponse + ETag/Cache-Control 적용
  • 구조화 로그 + /metrics + OTEL 수집 배선
  • 그레이스풀 종료/헬스체크 고도화

다음은 3편. 데이터 레이어(Postgres/Redis/Search)와 캐싱 전략으로 간다.

코멘트

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다