- 패턴: Outbox + 워커(at-least-once) → 아이템포턴시 + 디듀프로 실무적 exactly-once에 근접
- 큐 선택: 단순 작업=SQS/Rabbit/Celery, 대량 스트림/순서·재처리=Kafka
- 필수: 재시도(backoff), DLQ, 지연 큐, 배치 처리, 백프레셔, 오토스케일(KEDA)
- 가시성: 큐 지연(랙), 처리 시간, 실패율, 재시도 횟수 대시보드 고정
- 프런트: 워커 완료→ 웹훅/웹소켓/SSE or Next.js revalidateTag로 UI 갱신
1) 언제 비동기로 보내나?
- 외부 API/메일/알림/결제 후크/영상·이미지 처리/검색 색인/분석 집계
- “요청-응답 SLA”를 지키기 위해 API는 작업 접수만 하고 바로 202/200 반환
2) 기술 스택 한눈에
- 작업 큐: Celery(브로커: Redis/SQS/Rabbit), Dramatiq/RQ
- 스트림: Kafka/OpenSearch Ingest → 이벤트 소싱/재처리/순서 보장
- 스케일러: KEDA(큐 길이/컨슈머 랙 기반 오토스케일)
- 스케줄: Celery beat / Cloud Scheduler / Argo Workflows
선택 가이드(간단):
- SQS(FIFO): 중복제거 키, 순서 보장, 관리형
- RabbitMQ: 라우팅 유연, 지연 큐 플러그인
- Kafka: 파티션/순서·재처리·대량 로그/분석 파이프라인
3) 트랜잭셔널 Outbox(핵심 패턴)
동일 트랜잭션에서 도메인 데이터 + outbox 이벤트 기록 → 워커가 안전히 발행.
테이블
CREATE TABLE outbox_events (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
topic text NOT NULL, -- ex) "user.created"
aggregate_id uuid NOT NULL,
event_type text NOT NULL, -- "created"|"updated"|...
payload jsonb NOT NULL,
dedupe_key text UNIQUE, -- 생산자 아이템포턴시 키
created_at timestamptz NOT NULL DEFAULT now(),
published_at timestamptz
);
CREATE INDEX ON outbox_events (published_at NULLS FIRST);
API 내 쓰기(동일 트랜잭션)
async with db.begin():
db.add(User(...))
db.add(OutboxEvent(
topic="user.created",
aggregate_id=user.id,
event_type="created",
payload={"id": str(user.id), "email": user.email},
dedupe_key=f"user.created:{user.id}",
))
퍼블리셔 워커(POSTGRES → SQS/카프카)
-- 하나 집어오고 잠금
WITH picked AS (
SELECT id FROM outbox_events
WHERE published_at IS NULL
ORDER BY created_at
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE outbox_events o
SET published_at = now()
FROM picked p
WHERE o.id = p.id
RETURNING o.id, o.topic, o.payload, o.dedupe_key;
Python에서 RETURNING 결과를 받아 발행 실패 시 published_at을 되돌리거나(트랜잭션 롤백) 그대로 두고 재시도.
4) Celery(간단·강력한 작업 큐)
설정
# celery_app.py
from celery import Celery
app = Celery(
"exapp",
broker="redis://redis:6379/0", # 또는 sqs://, amqp://
backend="redis://redis:6379/1",
)
app.conf.update(
task_acks_late=True, # 처리 후 ack (중복 대비는 디듀프 필수)
worker_prefetch_multiplier=1, # 공정성
task_time_limit=120, task_soft_time_limit=110,
broker_transport_options={"visibility_timeout": 3600}, # SQS일 때
)
작업 정의(재시도·백오프)
# tasks.py
from celery_app import app
from time import sleep
@app.task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_backoff_max=60, max_retries=5)
def send_welcome_email(self, user_id: str):
# 외부 API 호출… 실패 시 자동 재시도(지수 백오프)
...
FastAPI에서 작업 큐에 넣기
@app.post("/users")
async def create_user(dto: UserIn, db=Depends(get_db)):
user = await svc_create_user(dto, db) # 트랜잭션 내 outbox 기록
# 또는 단순 작업이면 직접 Celery로:
send_welcome_email.delay(str(user.id))
return {"id": str(user.id)}
5) Kafka(순서·재처리·대량)
컨슈머(배치·수동 커밋·그레이스풀)
import asyncio, signal, json
from aiokafka import AIOKafkaConsumer
async def consume():
consumer = AIOKafkaConsumer(
"user.created",
bootstrap_servers="kafka:9092",
enable_auto_commit=False,
group_id="exapp-user",
max_poll_records=200,
)
await consumer.start()
try:
while True:
msgs = await consumer.getmany(timeout_ms=500, max_records=500)
for tp, batch in msgs.items():
# 배치 처리 후 커밋
for m in batch:
data = json.loads(m.value)
await handle_user_created(data)
await consumer.commit()
finally:
await consumer.stop()
asyncio.run(consume())
- 파티션 키:
user_id등으로 같은 엔터티 순서 보장 - DLQ: 실패 메시지를
user.created.dlq로 별도 발행
6) 아이템포턴시 & 디듀프(실무 Exactly-once)
- 생산자:
dedupe_key(자연키) 저장 → 같은 키 중복 발행 방지 - 소비자: 처리 전 디듀프 테이블/캐시 확인
CREATE TABLE processed_events (key text PRIMARY KEY, processed_at timestamptz default now());
async def process_once(key: str):
try:
await db.execute("INSERT INTO processed_events(key) VALUES (:k)", {"k": key})
return True
except UniqueViolation:
return False
7) 실패 전략: 재시도·DLQ·지연 큐
- 재시도: 지수 백오프(초기 200~500ms) + 최대 재시도 횟수
- DLQ: 한계 초과/영구 실패 → DLQ로 이동, 대시보드·알림
- 지연 큐: SQS DelaySeconds / Rabbit 지연 플러그인 / Kafka 타임드 레코드
8) 백프레셔·오토스케일(KEDA)
- 생산자: 큐 길이↑ → 레이트리밋/서킷브레이커로 완화
- 소비자: KEDA 트리거
- SQS:
queueLength기준 - Kafka:
lagThreshold기준
- SQS:
예시(개념):
# ScaledObject (Kafka)
triggers:
- type: kafka
metadata:
bootstrapServers: kafka:9092
consumerGroup: exapp-user
topic: user.created
lagThreshold: "1000"
9) 이벤트 스키마·버전관리
- 스키마 강제: Pydantic/JSON Schema로 validate
- 버전 필드:
schema_version: 2 - 하위 호환 이벤트만 배포 → 소비자 점진적 업그레이드
class UserCreated(BaseModel):
schema_version: int = 1
id: UUID
email: EmailStr
10) 트레이싱/로깅(엔드-투-엔드)
- Trace Context 전파: 헤더 → 메시지 메타로 저장(
traceparent) - 워커에서 컨텍스트 복원 → API↔큐↔워커 단일 트레이스 연결
- 로그에 event_id, dedupe_key, retry_count, duration_ms 포함
11) 프런트 업데이트(Next.js)
- 워커가 완료 시:
- 웹훅으로 Next.js API route 호출 →
revalidateTag('user') - 사용자 세션 대상이면 WebSocket/SSE로 푸시
- 폴링이 필요하면
stale-while-revalidate로 체감 지연 최소화
- 웹훅으로 Next.js API route 호출 →
// app/api/revalidate/route.ts
import { revalidateTag } from 'next/cache';
export async function POST(req: Request) {
const { tag } = await req.json();
revalidateTag(tag);
return new Response('ok');
}
12) 보안·컴플라이언스 요점
- PII 최소화: 이벤트에 불필요한 개인정보 금지(토큰화/키 참조)
- 암호화: 전송(TLS), 저장(KMS/SSE)
- 보존: 이벤트 TTL/보관 정책(GDPR/국내법) 준수
13) 48시간 액션 플랜
- Outbox 테이블 추가, API 쓰기 트랜잭션에 outbox 기록
- 퍼블리셔 워커(SKIP LOCKED) 1개 가동 → SQS/Kafka 발행
- 소비자 워커에 아이템포턴시 디듀프 테이블 도입
- 재시도+DLQ 표준화(백오프 파라미터 고정)
- 대시보드: 큐 길이/랙, 처리 시간 p95, 실패율, DLQ 건수
- KEDA(또는 서버리스 스케일)로 오토스케일 초기 설정
- 워커 완료 → Next.js 태그 리밸리데이트 웹훅 연결
14) “복붙” 스니펫 모음
A. SQS FIFO 발행(중복 제거)
import boto3, json, os
sqs = boto3.client("sqs", region_name=os.getenv("AWS_REGION"))
def publish_fifo(queue_url, group_id, dedupe_key, payload):
sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(payload, separators=(",",":")),
MessageGroupId=group_id, # 순서 그룹
MessageDeduplicationId=dedupe_key, # 중복 제거
)
B. 지연 재시도(지연 큐 없이 SQS Delay)
def retry_later(queue_url, body, delay_sec=30):
sqs.send_message(QueueUrl=queue_url, MessageBody=body, DelaySeconds=delay_sec)
C. Postgres 작업 큐(간단 Pull)
DELETE FROM job_queue
WHERE id IN (
SELECT id FROM job_queue
WHERE run_at <= now()
ORDER BY priority DESC, run_at
FOR UPDATE SKIP LOCKED
LIMIT 100
)
RETURNING *;
마무리
- 핵심은 **“요청은 가볍게, 무거운 일은 안전하게 뒤로”**입니다. Outbox+워커, 재시도+DLQ, 아이템포턴시+디듀프로 안정성을 확보하세요.
- 다음 5편은 **인증/인가(OAuth2/OIDC, JWT/세션, RBAC/ABAC, 토큰 로테이션)**입니다.
답글 남기기