- ASGI: gunicorn + uvicorn.workers / uvloop + httptools,
--limit-concurrency,--keep-alive조정 - DB: SQLAlchemy 2.0 async + asyncpg + PgBouncer, 풀/타임아웃/재시도 표준화, N+1 제거
- 외부연동: httpx AsyncClient 전역 풀 + 시간제한 + 백오프 + 회로차단
- 응답: ORJSONResponse 기본, ETag/Cache-Control·서버 캐시 병행
- 리밋/백프레셔: Redis 토큰버킷/슬라이딩윈도우
- 가시성: 구조화 로그 + OTEL 트레이스 + 프로메테우스 메트릭
- 안정성: 그레이스풀 종료,
startup/shutdown에서 커넥션 정리
1) 실행기(ASGI) & 프로세스 모델 튜닝
권장 실행 명령(일반적인 I/O 바운드 서비스)
gunicorn app.main:app \
-k uvicorn.workers.UvicornWorker \
-w $((CPU*2+1)) \
--threads 1 \
--bind 0.0.0.0:8000 \
--timeout 60 --graceful-timeout 30 \
--keep-alive 5 \
--max-requests 5000 --max-requests-jitter 500 \
--log-level info --access-logfile '-' --error-logfile '-'
Uvicorn 옵션(내장/로컬 실행 시)
uvicorn app.main:app \
--loop uvloop --http httptools \
--host 0.0.0.0 --port 8000 \
--timeout-keep-alive 5 \
--limit-concurrency 1000 \
--backlog 2048
팁
- CPU 바운드면 워커 수 줄이고(예: CPU+1), I/O 바운드면 위 공식을 기본값으로 시작.
--limit-concurrency로 이벤트루프 과다 점유 방지(백프레셔 핵심).
2) FastAPI 부팅 스켈레톤(성능 플래그 포함)
# app/main.py
import os
from fastapi import FastAPI, Request, Response, HTTPException, Depends
from fastapi.responses import ORJSONResponse
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.gzip import GZipMiddleware
app = FastAPI(default_response_class=ORJSONResponse)
# 정확히 필요한 Origin만 허용
app.add_middleware(
CORSMiddleware,
allow_origins=os.getenv("CORS_ALLOW_ORIGINS","https://example.com").split(","),
allow_credentials=True,
allow_methods=["GET","POST","PUT","PATCH","DELETE","OPTIONS"],
allow_headers=["Authorization","Content-Type","X-Request-ID"],
)
# 전송량 큰 엔드포인트에는 CDN/Brotli 권장, 앱 레벨은 GZip 최소화
app.add_middleware(GZipMiddleware, minimum_size=1024)
@app.middleware("http")
async def add_request_id(request: Request, call_next):
rid = request.headers.get("X-Request-ID") or os.urandom(8).hex()
response: Response = await call_next(request)
response.headers["X-Request-ID"] = rid
return response
@app.get("/healthz")
async def healthz():
return {"ok": True}
@app.get("/readyz")
async def readyz():
# 여기서 DB/Redis ping 등 실제 점검 로직 호출 권장 (타임아웃 필수)
return {"db": "ok", "cache": "ok"}
3) 데이터베이스 레이어(Async SQLAlchemy + PgBouncer)
엔진/세션 팩토리
# app/db.py
import os
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
DB_URL = os.getenv("DB_URL") # e.g. postgresql+asyncpg://user:pw@host:5432/db
engine = create_async_engine(
DB_URL,
pool_size=int(os.getenv("DB_POOL_SIZE", "20")),
max_overflow=int(os.getenv("DB_MAX_OVERFLOW", "20")),
pool_timeout=10, # 풀 대기 최대 10s
pool_recycle=1800, # 커넥션 재활용
pool_pre_ping=True, # 죽은 커넥션 사전 감지
)
SessionLocal = async_sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession)
async def get_db() -> AsyncSession:
async with SessionLocal() as session:
yield session
N+1 방지(관계 로딩 전략)
# app/repo.py
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from .models import Order, OrderItem
async def get_order_with_items(db):
stmt = select(Order).options(selectinload(Order.items)) # ✅ 한번에 가져오기
result = await db.execute(stmt)
return result.scalars().all()
팁
- PgBouncer transaction pooling 모드 +
statement_timeout(예: 3~5s) 설정.- 느린 쿼리 로그 활성화 후 Top N 튜닝(인덱스, 커버링, 파티셔닝).
4) Redis(캐시/레이트리밋/락)
# app/redis.py
import os, redis.asyncio as redis
REDIS_URL = os.getenv("REDIS_URL","redis://localhost:6379/0")
redis_cli = redis.from_url(REDIS_URL, encoding="utf-8", decode_responses=True)
간단 토큰버킷(고정 윈도우 변형)
# app/rate_limit.py
from fastapi import Request, HTTPException
from .redis import redis_cli
async def rate_limit(request: Request, key_prefix: str, limit: int, window_sec: int):
key = f"rl:{key_prefix}:{request.client.host}"
cur = await redis_cli.incr(key)
if cur == 1:
await redis_cli.expire(key, window_sec)
if cur > limit:
raise HTTPException(status_code=429, detail="Too Many Requests")
사용:
@app.get("/heavy")
async def heavy_endpoint(request: Request):
await rate_limit(request, "heavy", limit=100, window_sec=60)
return {"ok": True}
5) 외부 HTTP 연동(풀/타임아웃/재시도/회로차단)
# app/http.py
import httpx, os, asyncio
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
LIMITS = httpx.Limits(max_connections=200, max_keepalive_connections=50)
TIMEOUT = httpx.Timeout(connect=2.0, read=3.0, write=3.0, pool=2.0)
client = httpx.AsyncClient(limits=LIMITS, timeout=TIMEOUT)
class UpstreamError(Exception): pass
@retry(
reraise=True,
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=0.2, min=0.2, max=1.0),
retry=retry_if_exception_type((httpx.HTTPError, UpstreamError)),
)
async def call_upstream(url: str):
r = await client.get(url)
if r.status_code >= 500:
raise UpstreamError(f"upstream {r.status_code}")
return r.json()
팁
- 고가용 서비스면 회로차단기(aiobreaker 등) 추가(실패율·반개방·스냅백).
6) 응답 최적화 & 캐싱
ORJSON 기본 + ETag/Cache-Control
from fastapi import Header
from fastapi.responses import Response
import hashlib, json
def make_etag(payload: dict) -> str:
raw = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode()
return hashlib.md5(raw).hexdigest()
@app.get("/articles/{id}")
async def get_article(id: int, if_none_match: str | None = Header(default=None)):
# DB 또는 캐시에서 조회
data = {"id": id, "title": "Hello"}
etag = make_etag(data)
if if_none_match == etag:
return Response(status_code=304)
resp = ORJSONResponse(data)
resp.headers["ETag"] = etag
resp.headers["Cache-Control"] = "public, max-age=60"
return resp
SSR/ISR로 가능한 건 Next.js 쪽에서 CDN 캐시 활용(엣지로 밀기).
7) Pydantic v2 성능 팁
from pydantic import BaseModel, ConfigDict
class Article(BaseModel):
model_config = ConfigDict(from_attributes=True, ser_json_tuples=True)
id: int
title: str
# 직렬화 시
payload = Article(id=1, title="x").model_dump(exclude_none=True)
- DTO는 필드 최소화·
exclude_none·중첩 모델 과다 사용 지양 - 대량 응답은 스트리밍 또는 페이징 고정
8) 구조화 로깅 & 메트릭 & 트레이싱
JSON 로그(표준 라이브러리)
# app/logging.py
import json, logging, sys
class JsonFormatter(logging.Formatter):
def format(self, record):
base = {"level": record.levelname, "msg": record.getMessage(), "logger": record.name}
if record.exc_info:
base["exc"] = self.formatException(record.exc_info)
return json.dumps(base, ensure_ascii=False)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter())
logging.getLogger().handlers = [handler]
logging.getLogger().setLevel(logging.INFO)
프로메테우스(요청/지연/에러 자동계측)
# app/metrics.py
from prometheus_fastapi_instrumentator import Instrumentator
def setup_metrics(app):
Instrumentator().instrument(app).expose(app, endpoint="/metrics")
OTEL(간단 셋업)
# app/otel.py
import os
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
def setup_tracing(app):
provider = TracerProvider(resource=Resource.create({"service.name":"api"}))
exporter = OTLPSpanExporter(endpoint=os.getenv("OTLP_ENDPOINT","http://otel-collector:4317"), insecure=True)
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
FastAPIInstrumentor.instrument_app(app, excluded_urls="/healthz,/metrics")
앱 결합:
# app/main.py (일부)
from .metrics import setup_metrics
from .otel import setup_tracing
setup_metrics(app)
setup_tracing(app)
9) 업로드/바디 보호(대용량 방지 & 보안)
# app/mw_bodylimit.py
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi import Request, HTTPException
class BodySizeLimitMiddleware(BaseHTTPMiddleware):
def __init__(self, app, max_body: int = 5*1024*1024):
super().__init__(app); self.max_body = max_body
async def dispatch(self, request: Request, call_next):
cl = request.headers.get("content-length")
if cl and int(cl) > self.max_body:
raise HTTPException(413, "Payload Too Large")
return await call_next(request)
app.add_middleware(BodySizeLimitMiddleware, max_body=5*1024*1024)
10) 그레이스풀 시작·종료
# app/lifecycle.py
from fastapi import FastAPI
from .db import engine
from .redis import redis_cli
def mount_lifecycle(app: FastAPI):
@app.on_event("startup")
async def startup():
# 프리워밍/헬스 체크 등
pass
@app.on_event("shutdown")
async def shutdown():
await redis_cli.close()
await engine.dispose()
# main.py 연결
from .lifecycle import mount_lifecycle
mount_lifecycle(app)
11) 성능 진단 빠른 루틴
- k6/locust로 p95·에러율 기준선 재측정
- 느린 엔드포인트 Top 10 → DB 쿼리/외부연동 분해
- 캐시 후보(핫리드, 파생조회) 선정 → Redis·ISR 적용
- 풀/동시성 한도(
--limit-concurrency, DB/HTTP 풀) 상향/하향 A/B max-requests로 워커 메모리 누수 방지- 트레이스 샘플링 5~10% → 병목 지점 시각화
12) 패키지 설치 요약
pip install "fastapi[all]" uvicorn[standard] gunicorn orjson \
sqlalchemy[asyncio] asyncpg \
redis tenacity httpx \
prometheus-fastapi-instrumentator \
opentelemetry-sdk opentelemetry-exporter-otlp opentelemetry-instrumentation-fastapi
마무리: 2주 액션 플랜(템플릿)
- gunicorn+uvicorn 프로필 도입(
--limit-concurrency, keep-alive 튜닝) - DB 풀/타임아웃 표준화 + PgBouncer 적용 + 느린쿼리 로그
- httpx 전역 AsyncClient + 재시도/타임아웃/풀 제한 + 회로차단
- Redis 리밋·캐시 적용(핵심 조회/쓰기 엔드포인트 3개)
- ORJSONResponse + ETag/Cache-Control 적용
- 구조화 로그 + /metrics + OTEL 수집 배선
- 그레이스풀 종료/헬스체크 고도화
다음은 3편. 데이터 레이어(Postgres/Redis/Search)와 캐싱 전략으로 간다.