- 패턴: Outbox + 워커(at-least-once) → 아이템포턴시 + 디듀프로 실무적 exactly-once에 근접
- 큐 선택: 단순 작업=SQS/Rabbit/Celery, 대량 스트림/순서·재처리=Kafka
- 필수: 재시도(backoff), DLQ, 지연 큐, 배치 처리, 백프레셔, 오토스케일(KEDA)
- 가시성: 큐 지연(랙), 처리 시간, 실패율, 재시도 횟수 대시보드 고정
- 프런트: 워커 완료→ 웹훅/웹소켓/SSE or Next.js revalidateTag로 UI 갱신
1) 언제 비동기로 보내나?
- 외부 API/메일/알림/결제 후크/영상·이미지 처리/검색 색인/분석 집계
- “요청-응답 SLA”를 지키기 위해 API는 작업 접수만 하고 바로 202/200 반환
2) 기술 스택 한눈에
- 작업 큐: Celery(브로커: Redis/SQS/Rabbit), Dramatiq/RQ
- 스트림: Kafka/OpenSearch Ingest → 이벤트 소싱/재처리/순서 보장
- 스케일러: KEDA(큐 길이/컨슈머 랙 기반 오토스케일)
- 스케줄: Celery beat / Cloud Scheduler / Argo Workflows
선택 가이드(간단):
- SQS(FIFO): 중복제거 키, 순서 보장, 관리형
- RabbitMQ: 라우팅 유연, 지연 큐 플러그인
- Kafka: 파티션/순서·재처리·대량 로그/분석 파이프라인
3) 트랜잭셔널 Outbox(핵심 패턴)
동일 트랜잭션에서 도메인 데이터 + outbox 이벤트 기록 → 워커가 안전히 발행.
테이블
CREATE TABLE outbox_events (
  id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
  topic text NOT NULL,              -- ex) "user.created"
  aggregate_id uuid NOT NULL,
  event_type text NOT NULL,         -- "created"|"updated"|...
  payload jsonb NOT NULL,
  dedupe_key text UNIQUE,           -- 생산자 아이템포턴시 키
  created_at timestamptz NOT NULL DEFAULT now(),
  published_at timestamptz
);
CREATE INDEX ON outbox_events (published_at NULLS FIRST);
API 내 쓰기(동일 트랜잭션)
async with db.begin():
    db.add(User(...))
    db.add(OutboxEvent(
        topic="user.created",
        aggregate_id=user.id,
        event_type="created",
        payload={"id": str(user.id), "email": user.email},
        dedupe_key=f"user.created:{user.id}",
    ))
퍼블리셔 워커(POSTGRES → SQS/카프카)
-- 하나 집어오고 잠금
WITH picked AS (
  SELECT id FROM outbox_events
  WHERE published_at IS NULL
  ORDER BY created_at
  FOR UPDATE SKIP LOCKED
  LIMIT 1
)
UPDATE outbox_events o
SET published_at = now()
FROM picked p
WHERE o.id = p.id
RETURNING o.id, o.topic, o.payload, o.dedupe_key;
Python에서 RETURNING 결과를 받아 발행 실패 시 published_at을 되돌리거나(트랜잭션 롤백) 그대로 두고 재시도.
4) Celery(간단·강력한 작업 큐)
설정
# celery_app.py
from celery import Celery
app = Celery(
    "exapp",
    broker="redis://redis:6379/0",            # 또는 sqs://, amqp://
    backend="redis://redis:6379/1",
)
app.conf.update(
    task_acks_late=True,                      # 처리 후 ack (중복 대비는 디듀프 필수)
    worker_prefetch_multiplier=1,             # 공정성
    task_time_limit=120, task_soft_time_limit=110,
    broker_transport_options={"visibility_timeout": 3600}, # SQS일 때
)
작업 정의(재시도·백오프)
# tasks.py
from celery_app import app
from time import sleep
@app.task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_backoff_max=60, max_retries=5)
def send_welcome_email(self, user_id: str):
    # 외부 API 호출… 실패 시 자동 재시도(지수 백오프)
    ...
FastAPI에서 작업 큐에 넣기
@app.post("/users")
async def create_user(dto: UserIn, db=Depends(get_db)):
    user = await svc_create_user(dto, db)  # 트랜잭션 내 outbox 기록
    # 또는 단순 작업이면 직접 Celery로:
    send_welcome_email.delay(str(user.id))
    return {"id": str(user.id)}
5) Kafka(순서·재처리·대량)
컨슈머(배치·수동 커밋·그레이스풀)
import asyncio, signal, json
from aiokafka import AIOKafkaConsumer
async def consume():
    consumer = AIOKafkaConsumer(
        "user.created",
        bootstrap_servers="kafka:9092",
        enable_auto_commit=False,
        group_id="exapp-user",
        max_poll_records=200,
    )
    await consumer.start()
    try:
        while True:
            msgs = await consumer.getmany(timeout_ms=500, max_records=500)
            for tp, batch in msgs.items():
                # 배치 처리 후 커밋
                for m in batch:
                    data = json.loads(m.value)
                    await handle_user_created(data)
                await consumer.commit()
    finally:
        await consumer.stop()
asyncio.run(consume())
- 파티션 키: user_id등으로 같은 엔터티 순서 보장
- DLQ: 실패 메시지를 user.created.dlq로 별도 발행
6) 아이템포턴시 & 디듀프(실무 Exactly-once)
- 생산자: dedupe_key(자연키) 저장 → 같은 키 중복 발행 방지
- 소비자: 처리 전 디듀프 테이블/캐시 확인
CREATE TABLE processed_events (key text PRIMARY KEY, processed_at timestamptz default now());
async def process_once(key: str):
    try:
        await db.execute("INSERT INTO processed_events(key) VALUES (:k)", {"k": key})
        return True
    except UniqueViolation:
        return False
7) 실패 전략: 재시도·DLQ·지연 큐
- 재시도: 지수 백오프(초기 200~500ms) + 최대 재시도 횟수
- DLQ: 한계 초과/영구 실패 → DLQ로 이동, 대시보드·알림
- 지연 큐: SQS DelaySeconds / Rabbit 지연 플러그인 / Kafka 타임드 레코드
8) 백프레셔·오토스케일(KEDA)
- 생산자: 큐 길이↑ → 레이트리밋/서킷브레이커로 완화
- 소비자: KEDA 트리거
- SQS: queueLength기준
- Kafka: lagThreshold기준
 
- SQS: 
예시(개념):
# ScaledObject (Kafka)
triggers:
- type: kafka
  metadata:
    bootstrapServers: kafka:9092
    consumerGroup: exapp-user
    topic: user.created
    lagThreshold: "1000"
9) 이벤트 스키마·버전관리
- 스키마 강제: Pydantic/JSON Schema로 validate
- 버전 필드: schema_version: 2
- 하위 호환 이벤트만 배포 → 소비자 점진적 업그레이드
class UserCreated(BaseModel):
    schema_version: int = 1
    id: UUID
    email: EmailStr
10) 트레이싱/로깅(엔드-투-엔드)
- Trace Context 전파: 헤더 → 메시지 메타로 저장(traceparent)
- 워커에서 컨텍스트 복원 → API↔큐↔워커 단일 트레이스 연결
- 로그에 event_id, dedupe_key, retry_count, duration_ms 포함
11) 프런트 업데이트(Next.js)
- 워커가 완료 시:
- 웹훅으로 Next.js API route 호출 → revalidateTag('user')
- 사용자 세션 대상이면 WebSocket/SSE로 푸시
- 폴링이 필요하면 stale-while-revalidate로 체감 지연 최소화
 
- 웹훅으로 Next.js API route 호출 → 
// app/api/revalidate/route.ts
import { revalidateTag } from 'next/cache';
export async function POST(req: Request) {
  const { tag } = await req.json();
  revalidateTag(tag);
  return new Response('ok');
}
12) 보안·컴플라이언스 요점
- PII 최소화: 이벤트에 불필요한 개인정보 금지(토큰화/키 참조)
- 암호화: 전송(TLS), 저장(KMS/SSE)
- 보존: 이벤트 TTL/보관 정책(GDPR/국내법) 준수
13) 48시간 액션 플랜
- Outbox 테이블 추가, API 쓰기 트랜잭션에 outbox 기록
- 퍼블리셔 워커(SKIP LOCKED) 1개 가동 → SQS/Kafka 발행
- 소비자 워커에 아이템포턴시 디듀프 테이블 도입
- 재시도+DLQ 표준화(백오프 파라미터 고정)
- 대시보드: 큐 길이/랙, 처리 시간 p95, 실패율, DLQ 건수
- KEDA(또는 서버리스 스케일)로 오토스케일 초기 설정
- 워커 완료 → Next.js 태그 리밸리데이트 웹훅 연결
14) “복붙” 스니펫 모음
A. SQS FIFO 발행(중복 제거)
import boto3, json, os
sqs = boto3.client("sqs", region_name=os.getenv("AWS_REGION"))
def publish_fifo(queue_url, group_id, dedupe_key, payload):
    sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(payload, separators=(",",":")),
        MessageGroupId=group_id,              # 순서 그룹
        MessageDeduplicationId=dedupe_key,    # 중복 제거
    )
B. 지연 재시도(지연 큐 없이 SQS Delay)
def retry_later(queue_url, body, delay_sec=30):
    sqs.send_message(QueueUrl=queue_url, MessageBody=body, DelaySeconds=delay_sec)
C. Postgres 작업 큐(간단 Pull)
DELETE FROM job_queue
WHERE id IN (
  SELECT id FROM job_queue
  WHERE run_at <= now()
  ORDER BY priority DESC, run_at
  FOR UPDATE SKIP LOCKED
  LIMIT 100
)
RETURNING *;
마무리
- 핵심은 **“요청은 가볍게, 무거운 일은 안전하게 뒤로”**입니다. Outbox+워커, 재시도+DLQ, 아이템포턴시+디듀프로 안정성을 확보하세요.
- 다음 5편은 **인증/인가(OAuth2/OIDC, JWT/세션, RBAC/ABAC, 토큰 로테이션)**입니다.