[카테고리:] 미분류

  • FastAPI 프로덕션 튜닝(성능·연결·동시성)

    • ASGI: gunicorn + uvicorn.workers / uvloop + httptools, --limit-concurrency, --keep-alive 조정
    • DB: SQLAlchemy 2.0 async + asyncpg + PgBouncer, 풀/타임아웃/재시도 표준화, N+1 제거
    • 외부연동: httpx AsyncClient 전역 풀 + 시간제한 + 백오프 + 회로차단
    • 응답: ORJSONResponse 기본, ETag/Cache-Control·서버 캐시 병행
    • 리밋/백프레셔: Redis 토큰버킷/슬라이딩윈도우
    • 가시성: 구조화 로그 + OTEL 트레이스 + 프로메테우스 메트릭
    • 안정성: 그레이스풀 종료, startup/shutdown에서 커넥션 정리

    1) 실행기(ASGI) & 프로세스 모델 튜닝

    권장 실행 명령(일반적인 I/O 바운드 서비스)

    gunicorn app.main:app \
      -k uvicorn.workers.UvicornWorker \
      -w $((CPU*2+1)) \
      --threads 1 \
      --bind 0.0.0.0:8000 \
      --timeout 60 --graceful-timeout 30 \
      --keep-alive 5 \
      --max-requests 5000 --max-requests-jitter 500 \
      --log-level info --access-logfile '-' --error-logfile '-'
    

    Uvicorn 옵션(내장/로컬 실행 시)

    uvicorn app.main:app \
      --loop uvloop --http httptools \
      --host 0.0.0.0 --port 8000 \
      --timeout-keep-alive 5 \
      --limit-concurrency 1000 \
      --backlog 2048
    

    • CPU 바운드면 워커 수 줄이고(예: CPU+1), I/O 바운드면 위 공식을 기본값으로 시작.
    • --limit-concurrency로 이벤트루프 과다 점유 방지(백프레셔 핵심).

    2) FastAPI 부팅 스켈레톤(성능 플래그 포함)

    # app/main.py
    import os
    from fastapi import FastAPI, Request, Response, HTTPException, Depends
    from fastapi.responses import ORJSONResponse
    from starlette.middleware.cors import CORSMiddleware
    from starlette.middleware.gzip import GZipMiddleware
    
    app = FastAPI(default_response_class=ORJSONResponse)
    
    # 정확히 필요한 Origin만 허용
    app.add_middleware(
        CORSMiddleware,
        allow_origins=os.getenv("CORS_ALLOW_ORIGINS","https://example.com").split(","),
        allow_credentials=True,
        allow_methods=["GET","POST","PUT","PATCH","DELETE","OPTIONS"],
        allow_headers=["Authorization","Content-Type","X-Request-ID"],
    )
    
    # 전송량 큰 엔드포인트에는 CDN/Brotli 권장, 앱 레벨은 GZip 최소화
    app.add_middleware(GZipMiddleware, minimum_size=1024)
    
    @app.middleware("http")
    async def add_request_id(request: Request, call_next):
        rid = request.headers.get("X-Request-ID") or os.urandom(8).hex()
        response: Response = await call_next(request)
        response.headers["X-Request-ID"] = rid
        return response
    
    @app.get("/healthz")
    async def healthz():
        return {"ok": True}
    
    @app.get("/readyz")
    async def readyz():
        # 여기서 DB/Redis ping 등 실제 점검 로직 호출 권장 (타임아웃 필수)
        return {"db": "ok", "cache": "ok"}
    

    3) 데이터베이스 레이어(Async SQLAlchemy + PgBouncer)

    엔진/세션 팩토리

    # app/db.py
    import os
    from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
    
    DB_URL = os.getenv("DB_URL")  # e.g. postgresql+asyncpg://user:pw@host:5432/db
    
    engine = create_async_engine(
        DB_URL,
        pool_size=int(os.getenv("DB_POOL_SIZE", "20")),
        max_overflow=int(os.getenv("DB_MAX_OVERFLOW", "20")),
        pool_timeout=10,              # 풀 대기 최대 10s
        pool_recycle=1800,            # 커넥션 재활용
        pool_pre_ping=True,           # 죽은 커넥션 사전 감지
    )
    
    SessionLocal = async_sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession)
    
    async def get_db() -> AsyncSession:
        async with SessionLocal() as session:
            yield session
    

    N+1 방지(관계 로딩 전략)

    # app/repo.py
    from sqlalchemy import select
    from sqlalchemy.orm import selectinload
    from .models import Order, OrderItem
    
    async def get_order_with_items(db):
        stmt = select(Order).options(selectinload(Order.items))  # ✅ 한번에 가져오기
        result = await db.execute(stmt)
        return result.scalars().all()
    

    • PgBouncer transaction pooling 모드 + statement_timeout(예: 3~5s) 설정.
    • 느린 쿼리 로그 활성화 후 Top N 튜닝(인덱스, 커버링, 파티셔닝).

    4) Redis(캐시/레이트리밋/락)

    # app/redis.py
    import os, redis.asyncio as redis
    REDIS_URL = os.getenv("REDIS_URL","redis://localhost:6379/0")
    redis_cli = redis.from_url(REDIS_URL, encoding="utf-8", decode_responses=True)
    

    간단 토큰버킷(고정 윈도우 변형)

    # app/rate_limit.py
    from fastapi import Request, HTTPException
    from .redis import redis_cli
    
    async def rate_limit(request: Request, key_prefix: str, limit: int, window_sec: int):
        key = f"rl:{key_prefix}:{request.client.host}"
        cur = await redis_cli.incr(key)
        if cur == 1:
            await redis_cli.expire(key, window_sec)
        if cur > limit:
            raise HTTPException(status_code=429, detail="Too Many Requests")
    

    사용:

    @app.get("/heavy")
    async def heavy_endpoint(request: Request):
        await rate_limit(request, "heavy", limit=100, window_sec=60)
        return {"ok": True}
    

    5) 외부 HTTP 연동(풀/타임아웃/재시도/회로차단)

    # app/http.py
    import httpx, os, asyncio
    from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
    
    LIMITS = httpx.Limits(max_connections=200, max_keepalive_connections=50)
    TIMEOUT = httpx.Timeout(connect=2.0, read=3.0, write=3.0, pool=2.0)
    client = httpx.AsyncClient(limits=LIMITS, timeout=TIMEOUT)
    
    class UpstreamError(Exception): pass
    
    @retry(
        reraise=True,
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=0.2, min=0.2, max=1.0),
        retry=retry_if_exception_type((httpx.HTTPError, UpstreamError)),
    )
    async def call_upstream(url: str):
        r = await client.get(url)
        if r.status_code >= 500:
            raise UpstreamError(f"upstream {r.status_code}")
        return r.json()
    

    • 고가용 서비스면 회로차단기(aiobreaker 등) 추가(실패율·반개방·스냅백).

    6) 응답 최적화 & 캐싱

    ORJSON 기본 + ETag/Cache-Control

    from fastapi import Header
    from fastapi.responses import Response
    import hashlib, json
    
    def make_etag(payload: dict) -> str:
        raw = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode()
        return hashlib.md5(raw).hexdigest()
    
    @app.get("/articles/{id}")
    async def get_article(id: int, if_none_match: str | None = Header(default=None)):
        # DB 또는 캐시에서 조회
        data = {"id": id, "title": "Hello"}
        etag = make_etag(data)
        if if_none_match == etag:
            return Response(status_code=304)
        resp = ORJSONResponse(data)
        resp.headers["ETag"] = etag
        resp.headers["Cache-Control"] = "public, max-age=60"
        return resp
    

    SSR/ISR로 가능한 건 Next.js 쪽에서 CDN 캐시 활용(엣지로 밀기).


    7) Pydantic v2 성능 팁

    from pydantic import BaseModel, ConfigDict
    
    class Article(BaseModel):
        model_config = ConfigDict(from_attributes=True, ser_json_tuples=True)
        id: int
        title: str
    
    # 직렬화 시
    payload = Article(id=1, title="x").model_dump(exclude_none=True)
    
    • DTO는 필드 최소화·exclude_none·중첩 모델 과다 사용 지양
    • 대량 응답은 스트리밍 또는 페이징 고정

    8) 구조화 로깅 & 메트릭 & 트레이싱

    JSON 로그(표준 라이브러리)

    # app/logging.py
    import json, logging, sys
    
    class JsonFormatter(logging.Formatter):
        def format(self, record):
            base = {"level": record.levelname, "msg": record.getMessage(), "logger": record.name}
            if record.exc_info:
                base["exc"] = self.formatException(record.exc_info)
            return json.dumps(base, ensure_ascii=False)
    
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(JsonFormatter())
    logging.getLogger().handlers = [handler]
    logging.getLogger().setLevel(logging.INFO)
    

    프로메테우스(요청/지연/에러 자동계측)

    # app/metrics.py
    from prometheus_fastapi_instrumentator import Instrumentator
    
    def setup_metrics(app):
        Instrumentator().instrument(app).expose(app, endpoint="/metrics")
    

    OTEL(간단 셋업)

    # app/otel.py
    import os
    from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
    from opentelemetry.sdk.resources import Resource
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    
    def setup_tracing(app):
        provider = TracerProvider(resource=Resource.create({"service.name":"api"}))
        exporter = OTLPSpanExporter(endpoint=os.getenv("OTLP_ENDPOINT","http://otel-collector:4317"), insecure=True)
        provider.add_span_processor(BatchSpanProcessor(exporter))
        trace.set_tracer_provider(provider)
        FastAPIInstrumentor.instrument_app(app, excluded_urls="/healthz,/metrics")
    

    앱 결합:

    # app/main.py (일부)
    from .metrics import setup_metrics
    from .otel import setup_tracing
    setup_metrics(app)
    setup_tracing(app)
    

    9) 업로드/바디 보호(대용량 방지 & 보안)

    # app/mw_bodylimit.py
    from starlette.middleware.base import BaseHTTPMiddleware
    from fastapi import Request, HTTPException
    
    class BodySizeLimitMiddleware(BaseHTTPMiddleware):
        def __init__(self, app, max_body: int = 5*1024*1024):
            super().__init__(app); self.max_body = max_body
        async def dispatch(self, request: Request, call_next):
            cl = request.headers.get("content-length")
            if cl and int(cl) > self.max_body:
                raise HTTPException(413, "Payload Too Large")
            return await call_next(request)
    
    app.add_middleware(BodySizeLimitMiddleware, max_body=5*1024*1024)
    

    10) 그레이스풀 시작·종료

    # app/lifecycle.py
    from fastapi import FastAPI
    from .db import engine
    from .redis import redis_cli
    
    def mount_lifecycle(app: FastAPI):
        @app.on_event("startup")
        async def startup():
            # 프리워밍/헬스 체크 등
            pass
    
        @app.on_event("shutdown")
        async def shutdown():
            await redis_cli.close()
            await engine.dispose()
    
    # main.py 연결
    from .lifecycle import mount_lifecycle
    mount_lifecycle(app)
    

    11) 성능 진단 빠른 루틴

    1. k6/locust로 p95·에러율 기준선 재측정
    2. 느린 엔드포인트 Top 10 → DB 쿼리/외부연동 분해
    3. 캐시 후보(핫리드, 파생조회) 선정 → Redis·ISR 적용
    4. 풀/동시성 한도(--limit-concurrency, DB/HTTP 풀) 상향/하향 A/B
    5. max-requests로 워커 메모리 누수 방지
    6. 트레이스 샘플링 5~10% → 병목 지점 시각화

    12) 패키지 설치 요약

    pip install "fastapi[all]" uvicorn[standard] gunicorn orjson \
                sqlalchemy[asyncio] asyncpg \
                redis tenacity httpx \
                prometheus-fastapi-instrumentator \
                opentelemetry-sdk opentelemetry-exporter-otlp opentelemetry-instrumentation-fastapi
    

    마무리: 2주 액션 플랜(템플릿)

    • gunicorn+uvicorn 프로필 도입(--limit-concurrency, keep-alive 튜닝)
    • DB 풀/타임아웃 표준화 + PgBouncer 적용 + 느린쿼리 로그
    • httpx 전역 AsyncClient + 재시도/타임아웃/풀 제한 + 회로차단
    • Redis 리밋·캐시 적용(핵심 조회/쓰기 엔드포인트 3개)
    • ORJSONResponse + ETag/Cache-Control 적용
    • 구조화 로그 + /metrics + OTEL 수집 배선
    • 그레이스풀 종료/헬스체크 고도화

    다음은 3편. 데이터 레이어(Postgres/Redis/Search)와 캐싱 전략으로 간다.