Framing Composed Operations

프로토콜 파서(Framing) + 합성 async 연산(Composed Operations)
핵심은 길이-프리픽스(Varint) 프레이밍과 이를 async_compose 로 감싼 async_read_frame / async_write_frame 입니다. (콜백·use_awaitable·use_future 모두 대응)


A. 길이-프리픽스 프레이밍 + 합성 연산 템플릿

  • 프레임 포맷: VARINT(length) | payload[length]
  • 읽기: 스트림에서 헤더(Varint) → 본문 정확 길이 만큼 읽고 std::string 반환
  • 쓰기: 헤더(Varint) + 페이로드를 zero-copy(이중 버퍼)로 async_write
  • 합성 연산: boost::asio::async_compose 를 사용해 토큰 불문(콜백/코루틴/퓨처) 인터페이스
// framed_proto_demo.cpp
// g++ -std=c++20 -O2 framed_proto_demo.cpp -lboost_system -lpthread -o framed_proto_demo
// Windows(MSVC+vcpkg): cl /std:c++20 framed_proto_demo.cpp /EHsc /I <vcpkg>\installed\<triplet>\include /link /LIBPATH:<vcpkg>\installed\<triplet>\lib

#include <boost/asio.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/async_result.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/beast/core/flat_buffer.hpp>
#include <algorithm>
#include <array>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

namespace asio = boost::asio;
using asio::awaitable;
using asio::use_awaitable;
using tcp = asio::ip::tcp;

// ---------- Varint (uint32) ----------
static inline std::size_t encode_varint32(uint32_t v, std::uint8_t out[5]) {
  std::size_t i = 0;
  while (v >= 0x80) { out[i++] = (std::uint8_t)(v | 0x80); v >>= 7; }
  out[i++] = (std::uint8_t)v;
  return i; // 1..5
}
static inline bool try_decode_varint32(const std::uint8_t* p, std::size_t n, uint32_t& val, std::size_t& used) {
  // 최대 5바이트. n이 부족하면 false
  uint32_t result = 0; int shift = 0; used = 0;
  for (; used < n && used < 5; ++used) {
    uint32_t byte = p[used];
    result |= (byte & 0x7fu) << shift;
    if ((byte & 0x80u) == 0) { val = result; return true; }
    shift += 7;
  }
  return false; // 더 필요
}

// ---------- async_write_frame: (stream, payload) -> (ec, bytes) ----------
template <class AsyncWriteStream, class CompletionToken>
auto async_write_frame(AsyncWriteStream& stream,
                       std::string_view payload,
                       CompletionToken&& token)
{
  struct Op {
    AsyncWriteStream& s;
    std::string_view  pl;
    std::array<std::uint8_t, 5> hdr{};
    std::size_t hdr_len = 0;

    template <class Self>
    void operator()(Self& self, const boost::system::error_code& ec = {}, std::size_t = 0) {
      if (ec) { self.complete(ec, 0); return; }
      hdr_len = encode_varint32(static_cast<uint32_t>(pl.size()), hdr.data());
      std::array<asio::const_buffer, 2> bufs{
        asio::buffer(hdr.data(), hdr_len),
        asio::buffer(pl.data(),  pl.size())
      };
      asio::async_write(s, bufs, std::move(self));
    }
  };
  return asio::async_compose<CompletionToken, void(boost::system::error_code, std::size_t)>(
      Op{stream, payload}, token, stream);
}

// ---------- async_read_frame: (stream, flat_buffer&) -> (ec, std::string) ----------
template <class AsyncReadStream, class CompletionToken>
auto async_read_frame(AsyncReadStream& stream,
                      boost::beast::flat_buffer& fb,
                      CompletionToken&& token)
{
  struct Op {
    AsyncReadStream& s;
    boost::beast::flat_buffer& fb;
    bool have_len = false;
    uint32_t need = 0; // payload bytes needed
    std::size_t header_used = 0;

    template <class Self>
    void operator()(Self& self, boost::system::error_code ec = {}, std::size_t = 0) {
      if (ec) { self.complete(ec, std::string{}); return; }

      for (;;) {
        // 1) 헤더(Varint) 파싱 시도
        if (!have_len) {
          auto cds = fb.cdata(); // const_buffer_sequence
          auto p   = static_cast<const std::uint8_t*>(cds.data());
          auto n   = cds.size();
          if (n == 0) {
            // 한 바이트라도 더 읽기
            asio::async_read(s, fb, asio::transfer_at_least(1), std::move(self));
            return;
          }
          uint32_t len = 0; std::size_t used = 0;
          if (!try_decode_varint32(p, n, len, used)) {
            // 헤더가 아직 불완전 → 최소 1바이트 더
            asio::async_read(s, fb, asio::transfer_at_least(1), std::move(self));
            return;
          }
          have_len = true;
          need = len;
          header_used = used;
          fb.consume(header_used); // 헤더 제거
        }

        // 2) payload 가 충분한지 확인
        if (fb.size() < need) {
          // 정확히 부족분만큼 읽기
          std::size_t deficit = need - fb.size();
          asio::async_read(s, fb, asio::transfer_exactly(deficit), std::move(self));
          return;
        }

        // 3) payload 추출
        std::string out; out.resize(need);
        auto seq = fb.data();
        std::size_t copied = 0;
        for (auto it = asio::buffer_sequence_begin(seq);
             it != asio::buffer_sequence_end(seq) && copied < need; ++it) {
          auto b = *it;
          auto m = std::min<std::size_t>(need - copied, asio::buffer_size(b));
          std::memcpy(out.data() + copied, asio::buffer_cast<const void*>(b), m);
          copied += m;
        }
        fb.consume(need);
        self.complete({}, std::move(out));
        return;
      }
    }
  };
  return asio::async_compose<CompletionToken, void(boost::system::error_code, std::string)>(
    Op{stream, fb}, token, stream);
}

// ---------- (선택) 라인-프레이밍 헬퍼 (CRLF, 최대 길이 보호) ----------
template <class AsyncReadStream, class CompletionToken>
auto async_read_line(AsyncReadStream& s,
                     boost::beast::flat_buffer& fb,
                     std::size_t max_line,
                     CompletionToken&& token)
{
  struct Op {
    AsyncReadStream& s; boost::beast::flat_buffer& fb; std::size_t cap;
    template <class Self>
    void operator()(Self& self, boost::system::error_code ec = {}, std::size_t = 0) {
      if (ec) { self.complete(ec, std::string{}); return; }
      if (fb.size() > cap) { self.complete(make_error_code(std::errc::message_size), std::string{}); return; }
      asio::async_read_until(s, fb, "\r\n", std::move(self));
    }
  };
  return asio::async_compose<CompletionToken, void(boost::system::error_code, std::string)>(
    Op{s, fb, max_line}, token, s);
}

// ---------- Demo: 에코 서버(프레임) + 간단 클라 ----------
awaitable<void> session(tcp::socket sock) {
  boost::beast::flat_buffer fb;
  try {
    for (;;) {
      auto msg = co_await async_read_frame(sock, fb, use_awaitable);
      // 예시: 대문자 변환 후 회신
      std::string up = msg; std::transform(up.begin(), up.end(), up.begin(), [](unsigned char c){return std::toupper(c);});
      co_await async_write_frame(sock, up, use_awaitable);
    }
  } catch (std::exception&) {
    // 연결 종료/에러 → 세션 종료
  }
  co_return;
}

awaitable<void> server(unsigned short port) {
  tcp::acceptor acc(co_await asio::this_coro::executor, {tcp::v4(), port});
  for (;;) {
    auto sock = co_await acc.async_accept(use_awaitable);
    asio::co_spawn(acc.get_executor(), session(std::move(sock)), asio::detached);
  }
}

awaitable<void> client(std::string host, unsigned short port) {
  tcp::resolver res(co_await asio::this_coro::executor);
  tcp::socket   s(co_await asio::this_coro::executor);
  auto eps = co_await res.async_resolve(host, std::to_string(port), use_awaitable);
  co_await asio::async_connect(s, eps, use_awaitable);

  // 요청들
  for (std::string q : {"hello", "MixedCase", "asio framed io"}) {
    co_await async_write_frame(s, q, use_awaitable);
    boost::beast::flat_buffer fb;
    auto reply = co_await async_read_frame(s, fb, use_awaitable);
    std::cout << "REQ=" << q << "  REP=" << reply << "\n";
  }
  s.close();
  co_return;
}

int main(int argc, char** argv) {
  try {
    if (argc < 2) {
      std::cout << "Usage:\n  server <port>\n  client <host> <port>\n";
      return 0;
    }

    asio::io_context io;
    asio::signal_set sig(io, SIGINT
#if !defined(_WIN32)
      , SIGTERM
#endif
    );
    sig.async_wait([&](auto, auto){ io.stop(); });

    std::string mode = argv[1];
    if (mode == "server") {
      if (argc < 3) { std::cerr << "server <port>\n"; return 1; }
      unsigned short port = static_cast<unsigned short>(std::stoi(argv[2]));
      asio::co_spawn(io, server(port), asio::detached);
    } else if (mode == "client") {
      if (argc < 4) { std::cerr << "client <host> <port>\n"; return 1; }
      std::string host = argv[2];
      unsigned short port = static_cast<unsigned short>(std::stoi(argv[3]));
      asio::co_spawn(io, client(host, port), asio::detached);
    } else {
      std::cerr << "unknown mode\n"; return 1;
    }

    io.run();
  } catch (std::exception& e) {
    std::cerr << "fatal: " << e.what() << "\n";
    return 1;
  }
}

사용 포인트

  • async_read_frame/async_write_frameCompletionToken 추상화로,
    콜백/코루틴/퓨처 스타일 모두 동일 함수로 사용됩니다.
  • flat_buffer호출자 소유로 두면 다중 프레임 읽기에서 남은 바이트 재사용이 됩니다.
  • 라인 기반이 필요하면 async_read_until 기반(위 async_read_line)처럼 최대 길이로 DoS를 방지하세요.

B. 더 깊은 확장 포인트(프레이밍/파서)

  1. 헤더 확장
    • MAGIC(2) | FLAGS(1) | VARINT(length) | TYPE(1) | payload | [CRC32(4)]
    • FLAGS 로 압축/암호화/치환 여부, TYPE 으로 메시지 타입 식별.
    • CRC/AEAD 무결성은 UDPTLS-없음 구간에서 유용.
  2. 증분 파서(Incremental Parser)
    • 현재는 단일 프레임 read. 더 빠르게 하려면 루프 내에서 여러 프레임을 한 번에 파싱(coalesced TCP 세그먼트 흡수) → 콜백 호출을 줄여 CPU/락 경합 감소.
  3. 합성 “트랜잭션”
    • async_call(stream, request) = write_frameread_frame 까지 묶은 한 번의 합성 연산으로 제공 → 상위 레이어에서 시간 제한/취소동시 호출 개수(세마포어) 제어가 쉬움.
  4. 백프레셔
    • 송신 큐 적체 시 async_write 완료까지 inflight write 개수 제한(bounded queue) + strand 직렬화.
    • 수신은 최대 프레임 길이 상한 + flat_buffer capacity cap 권장.
  5. 취소/타임아웃
    • 상위에서 steady_timer + cancel() 패턴(①에서 소개)으로 read_frame/call타임아웃 래핑.
  6. 프로토콜 테스팅
    • fuzz(바이트 뒤섞기), 파셜 read/write, 역순/중복 프레임 주입용 테스트 하네스.
    • Property-based(빠르게 랜덤 길이/내용) + PCAP 재생으로 회귀 테스트.

C. “지난 답변 중 빠진 것” 정리 (깊게)

아래는 ①~③ 템플릿에서 의도적으로 단순화했거나 실전에서 자주 보강되는 항목들입니다. 확장/적용 시 바로 넣으면 좋아요.

① 단일 연결 제한/레이트 리미트 TCP 서버

  • Inbound 레이트 리미트: 예시에는 송신(Outbound)만 토큰 버킷 적용. 수신 바이트/초도 제한해야 폭주 클라 방어 가능.
  • 쓰레드 안전/Fairness: 다수 세션이 있을 때 per-connection 토큰 버킷 + 전역 토큰(대역폭 상한) 을 조합하고, 라운드로빈 또는 WFQ(가중 공정 큐잉) 로 공정화.
  • 백프레셔/큐 상한: async_write 대기 큐를 bounded 로 두고, 초과 시 드롭/에러 정책.
  • 프레이밍 상한: async_read_until 사용 시 최대 라인 길이(예: 8KB)로 DoS 방지.
  • 네트워크 옵션: tcp::no_delay, reuse_address, backlog 튜닝, SO_RCVBUF/SO_SNDBUF 조절.
  • Graceful 종료: 서버 stop 시 acceptor.close() 후, 세션에 Drain 윈도우(N초) 제공.
  • 관측성: 구조화 로깅(요청ID), 메트릭(접속수/에러/타임아웃/처리량) Prometheus 노출.
  • 보안: 인증/인가(토큰·mTLS), IP당 연결/초 제한, slowloris 대응(헤더/바디 타임아웃).

② TLS(Boost.Beast/SNI) HTTP/WS

  • 검증 모드(클라이언트): 데모에선 verify_peer 비활성. 실서비스는 CA 신뢰 스토어 설정 + 호스트네임 검증 필수.
  • ALPN/HTTP2: SSL_CTX_set_alpn_select_cb 로 h2 협상(필요시).
  • OCSP Stapling: 체인 상태 실시간 확인(대규모 배포 시 중요).
  • 세션 재사용: 세션 티켓/캐시, 키 롤링 전략.
  • 핸드셰이크 타임아웃: 느린 클라 방어. IP당 동시 핸드셰이크 제한 같이.
  • 인증서 재로딩: SIGHUP/관리 API로 무중단 로테이션(SNI 맵 스왑은 RCU/락).
  • 암호군/Cipher Policy: 현대 곡선(ECDHE)·AES-GCM/CHACHA20 선호, TLS1.2/1.3 정책 문서화.

③ UDP Coalesce/Fragment

  • 무결성/보안: CRC32/Adler-32 또는 AEAD(ChaCha20-Poly1305) 를 프레임에 적용.
  • 신뢰성 계층: ACK/NACK + 재전송 + 슬라이딩 윈도우 + 중복 억제.
  • 혼잡 제어: 응답 RTT/손실 기반 송신 윈도우 조절(최소한 전송 속도 상한).
  • 배치 I/O: 리눅스 recvmmsg/sendmmsg(Asio 커스텀 익스텐션)로 syscall 감소.
  • 소켓 옵션: SO_RCVBUF/SO_SNDBUF 확대, ECN/DSCP 마킹, IP_MTU_DISCOVER/PMTUD.
  • 멀티큐/RSS: 고부하 환경에서 reuse_port + 코어 바인딩으로 확장.
  • NAT 유지/키핑: 주기적 keepalive 프레임, 연결ID(CID) 부여.
  • 메모리 보호: 재조립 테이블 총 메모리 상한 + LRU + 메시지별 타임아웃/최대 조각 수 제한.

④ 프레이밍/합성 연산(본 템플릿)

  • 여러 프레임 한 번에 파싱: 현재는 1프레임 반환. 고성능에선 루프 내 다중 프레임 파싱 + 콜백/채널로 배출.
  • 에러 분류: message_too_big, bad_varint, crc_mismatch구체 에러 코드 설계.
  • 프로토콜 버전 협상: 첫 프레임에 버전/기능 비트 교환, 다운그레이드 정책.
  • 콜 합성: async_call(write→read) 을 별도 합성 연산으로 제공하고 타임아웃/취소 토큰을 인자화.
  • 스키마: JSON/CBOR/Cap’n Proto/FlatBuffers 등과 결합 시 메시지 경량 검증 추가.

“프로덕션 강화판”

위 프레이밍에 CRC32/AEAD, async_call() 합성 연산, 콜당 타임아웃/취소 토큰, bounded write-queue + strand 를 바로 얹어야 함.

코멘트

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다