[카테고리:] 미분류

  • 비동기·이벤트 처리(큐/스트림/워크플로)

    • 패턴: Outbox + 워커(at-least-once) → 아이템포턴시 + 디듀프로 실무적 exactly-once에 근접
    • 큐 선택: 단순 작업=SQS/Rabbit/Celery, 대량 스트림/순서·재처리=Kafka
    • 필수: 재시도(backoff), DLQ, 지연 큐, 배치 처리, 백프레셔, 오토스케일(KEDA)
    • 가시성: 큐 지연(랙), 처리 시간, 실패율, 재시도 횟수 대시보드 고정
    • 프런트: 워커 완료→ 웹훅/웹소켓/SSE or Next.js revalidateTag로 UI 갱신

    1) 언제 비동기로 보내나?

    • 외부 API/메일/알림/결제 후크/영상·이미지 처리/검색 색인/분석 집계
    • “요청-응답 SLA”를 지키기 위해 API는 작업 접수만 하고 바로 202/200 반환

    2) 기술 스택 한눈에

    • 작업 큐: Celery(브로커: Redis/SQS/Rabbit), Dramatiq/RQ
    • 스트림: Kafka/OpenSearch Ingest → 이벤트 소싱/재처리/순서 보장
    • 스케일러: KEDA(큐 길이/컨슈머 랙 기반 오토스케일)
    • 스케줄: Celery beat / Cloud Scheduler / Argo Workflows

    선택 가이드(간단):

    • SQS(FIFO): 중복제거 키, 순서 보장, 관리형
    • RabbitMQ: 라우팅 유연, 지연 큐 플러그인
    • Kafka: 파티션/순서·재처리·대량 로그/분석 파이프라인

    3) 트랜잭셔널 Outbox(핵심 패턴)

    동일 트랜잭션에서 도메인 데이터 + outbox 이벤트 기록 → 워커가 안전히 발행.

    테이블

    CREATE TABLE outbox_events (
      id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
      topic text NOT NULL,              -- ex) "user.created"
      aggregate_id uuid NOT NULL,
      event_type text NOT NULL,         -- "created"|"updated"|...
      payload jsonb NOT NULL,
      dedupe_key text UNIQUE,           -- 생산자 아이템포턴시 키
      created_at timestamptz NOT NULL DEFAULT now(),
      published_at timestamptz
    );
    CREATE INDEX ON outbox_events (published_at NULLS FIRST);
    

    API 내 쓰기(동일 트랜잭션)

    async with db.begin():
        db.add(User(...))
        db.add(OutboxEvent(
            topic="user.created",
            aggregate_id=user.id,
            event_type="created",
            payload={"id": str(user.id), "email": user.email},
            dedupe_key=f"user.created:{user.id}",
        ))
    

    퍼블리셔 워커(POSTGRES → SQS/카프카)

    -- 하나 집어오고 잠금
    WITH picked AS (
      SELECT id FROM outbox_events
      WHERE published_at IS NULL
      ORDER BY created_at
      FOR UPDATE SKIP LOCKED
      LIMIT 1
    )
    UPDATE outbox_events o
    SET published_at = now()
    FROM picked p
    WHERE o.id = p.id
    RETURNING o.id, o.topic, o.payload, o.dedupe_key;
    

    Python에서 RETURNING 결과를 받아 발행 실패 시 published_at을 되돌리거나(트랜잭션 롤백) 그대로 두고 재시도.


    4) Celery(간단·강력한 작업 큐)

    설정

    # celery_app.py
    from celery import Celery
    app = Celery(
        "exapp",
        broker="redis://redis:6379/0",            # 또는 sqs://, amqp://
        backend="redis://redis:6379/1",
    )
    app.conf.update(
        task_acks_late=True,                      # 처리 후 ack (중복 대비는 디듀프 필수)
        worker_prefetch_multiplier=1,             # 공정성
        task_time_limit=120, task_soft_time_limit=110,
        broker_transport_options={"visibility_timeout": 3600}, # SQS일 때
    )
    

    작업 정의(재시도·백오프)

    # tasks.py
    from celery_app import app
    from time import sleep
    
    @app.task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_backoff_max=60, max_retries=5)
    def send_welcome_email(self, user_id: str):
        # 외부 API 호출… 실패 시 자동 재시도(지수 백오프)
        ...
    

    FastAPI에서 작업 큐에 넣기

    @app.post("/users")
    async def create_user(dto: UserIn, db=Depends(get_db)):
        user = await svc_create_user(dto, db)  # 트랜잭션 내 outbox 기록
        # 또는 단순 작업이면 직접 Celery로:
        send_welcome_email.delay(str(user.id))
        return {"id": str(user.id)}
    

    5) Kafka(순서·재처리·대량)

    컨슈머(배치·수동 커밋·그레이스풀)

    import asyncio, signal, json
    from aiokafka import AIOKafkaConsumer
    
    async def consume():
        consumer = AIOKafkaConsumer(
            "user.created",
            bootstrap_servers="kafka:9092",
            enable_auto_commit=False,
            group_id="exapp-user",
            max_poll_records=200,
        )
        await consumer.start()
        try:
            while True:
                msgs = await consumer.getmany(timeout_ms=500, max_records=500)
                for tp, batch in msgs.items():
                    # 배치 처리 후 커밋
                    for m in batch:
                        data = json.loads(m.value)
                        await handle_user_created(data)
                    await consumer.commit()
        finally:
            await consumer.stop()
    
    asyncio.run(consume())
    
    • 파티션 키: user_id 등으로 같은 엔터티 순서 보장
    • DLQ: 실패 메시지를 user.created.dlq로 별도 발행

    6) 아이템포턴시 & 디듀프(실무 Exactly-once)

    • 생산자: dedupe_key(자연키) 저장 → 같은 키 중복 발행 방지
    • 소비자: 처리 전 디듀프 테이블/캐시 확인
    CREATE TABLE processed_events (key text PRIMARY KEY, processed_at timestamptz default now());
    
    async def process_once(key: str):
        try:
            await db.execute("INSERT INTO processed_events(key) VALUES (:k)", {"k": key})
            return True
        except UniqueViolation:
            return False
    

    7) 실패 전략: 재시도·DLQ·지연 큐

    • 재시도: 지수 백오프(초기 200~500ms) + 최대 재시도 횟수
    • DLQ: 한계 초과/영구 실패 → DLQ로 이동, 대시보드·알림
    • 지연 큐: SQS DelaySeconds / Rabbit 지연 플러그인 / Kafka 타임드 레코드

    8) 백프레셔·오토스케일(KEDA)

    • 생산자: 큐 길이↑ → 레이트리밋/서킷브레이커로 완화
    • 소비자: KEDA 트리거
      • SQS: queueLength 기준
      • Kafka: lagThreshold 기준

    예시(개념):

    # ScaledObject (Kafka)
    triggers:
    - type: kafka
      metadata:
        bootstrapServers: kafka:9092
        consumerGroup: exapp-user
        topic: user.created
        lagThreshold: "1000"
    

    9) 이벤트 스키마·버전관리

    • 스키마 강제: Pydantic/JSON Schema로 validate
    • 버전 필드: schema_version: 2
    • 하위 호환 이벤트만 배포 → 소비자 점진적 업그레이드
    class UserCreated(BaseModel):
        schema_version: int = 1
        id: UUID
        email: EmailStr
    

    10) 트레이싱/로깅(엔드-투-엔드)

    • Trace Context 전파: 헤더 → 메시지 메타로 저장(traceparent)
    • 워커에서 컨텍스트 복원 → API↔큐↔워커 단일 트레이스 연결
    • 로그에 event_id, dedupe_key, retry_count, duration_ms 포함

    11) 프런트 업데이트(Next.js)

    • 워커가 완료 시:
      1. 웹훅으로 Next.js API route 호출 → revalidateTag('user')
      2. 사용자 세션 대상이면 WebSocket/SSE로 푸시
      3. 폴링이 필요하면 stale-while-revalidate로 체감 지연 최소화
    // app/api/revalidate/route.ts
    import { revalidateTag } from 'next/cache';
    export async function POST(req: Request) {
      const { tag } = await req.json();
      revalidateTag(tag);
      return new Response('ok');
    }
    

    12) 보안·컴플라이언스 요점

    • PII 최소화: 이벤트에 불필요한 개인정보 금지(토큰화/키 참조)
    • 암호화: 전송(TLS), 저장(KMS/SSE)
    • 보존: 이벤트 TTL/보관 정책(GDPR/국내법) 준수

    13) 48시간 액션 플랜

    1. Outbox 테이블 추가, API 쓰기 트랜잭션에 outbox 기록
    2. 퍼블리셔 워커(SKIP LOCKED) 1개 가동 → SQS/Kafka 발행
    3. 소비자 워커에 아이템포턴시 디듀프 테이블 도입
    4. 재시도+DLQ 표준화(백오프 파라미터 고정)
    5. 대시보드: 큐 길이/랙, 처리 시간 p95, 실패율, DLQ 건수
    6. KEDA(또는 서버리스 스케일)로 오토스케일 초기 설정
    7. 워커 완료 → Next.js 태그 리밸리데이트 웹훅 연결

    14) “복붙” 스니펫 모음

    A. SQS FIFO 발행(중복 제거)

    import boto3, json, os
    sqs = boto3.client("sqs", region_name=os.getenv("AWS_REGION"))
    def publish_fifo(queue_url, group_id, dedupe_key, payload):
        sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(payload, separators=(",",":")),
            MessageGroupId=group_id,              # 순서 그룹
            MessageDeduplicationId=dedupe_key,    # 중복 제거
        )
    

    B. 지연 재시도(지연 큐 없이 SQS Delay)

    def retry_later(queue_url, body, delay_sec=30):
        sqs.send_message(QueueUrl=queue_url, MessageBody=body, DelaySeconds=delay_sec)
    

    C. Postgres 작업 큐(간단 Pull)

    DELETE FROM job_queue
    WHERE id IN (
      SELECT id FROM job_queue
      WHERE run_at <= now()
      ORDER BY priority DESC, run_at
      FOR UPDATE SKIP LOCKED
      LIMIT 100
    )
    RETURNING *;
    

    마무리

    • 핵심은 **“요청은 가볍게, 무거운 일은 안전하게 뒤로”**입니다. Outbox+워커, 재시도+DLQ, 아이템포턴시+디듀프로 안정성을 확보하세요.
    • 다음 5편은 **인증/인가(OAuth2/OIDC, JWT/세션, RBAC/ABAC, 토큰 로테이션)**입니다.