原文链接: C++编程:利用环形缓冲区优化 TCP 发送流程,避免 Short Write 问题

前置阅读: Linux编程:解析 EAGAIN 错误 Resource temporarily unavailable

相关: SPSC 无锁环形缓冲区设计剖析 | ARM-Linux 网络性能优化


1. 问题域: Short Write 与 EAGAIN

1.1 什么是 TCP Short Write

非阻塞模式下调用 send() / write(),内核 TCP 发送缓冲区空间不足时,系统调用只写入部分字节并返回实际写入数,errno 置为 EAGAIN / EWOULDBLOCK。这就是 short write。

应用层: send(fd, buf, 4096)
        ↓ 内核 TCP 发送缓冲区只剩 1500 字节
        返回 1500 (而非 4096)
        剩余 2596 字节需要应用层自行处理

阻塞模式下 send() 会等待直到全部写完,但阻塞会导致线程挂起,在 epoll 事件循环中不可接受。

1.2 EAGAIN: 不是错误,是"稍后重试"

EAGAIN (errno 11) 是 POSIX 标准定义的瞬态错误码,表示"资源暂时不可用"。在非阻塞 I/O 中,它的含义是:

  • send() 返回 -1, errno=EAGAIN: 内核 TCP 发送缓冲区已满,一个字节也写不进去。不是连接错误,稍后重试可能成功。
  • send() 返回 N (0 < N < len): 写入了部分字节 (short write)。不一定触发 EAGAIN,但后续 send 可能触发。

关键区分:

返回值errno含义处理方式
N > 0-成功写入 N 字节推进指针,继续写剩余
-1EAGAIN / EWOULDBLOCK内核缓冲满等待 EPOLLOUT 后重试
-1EINTR被信号中断立即重试
-1EPIPE / ECONNRESET连接已断关闭连接
0-对端关闭关闭连接

一个常见的错误是把 EAGAIN 当作致命错误直接断开连接 – 这会导致在网络拥塞或接收端处理缓慢时产生不必要的连接断开。

1.3 正确的处理策略

  1. 同步重试 (简单场景): 循环调用 send(),遇到 EAGAIN 时 yield() + 重试,超过上限则报告失败。适合低频场景。
  2. 用户态发送缓冲 + EPOLLOUT (高吞吐场景): 数据先写入环形缓冲区,由 EPOLLOUT 事件驱动异步刷写。不阻塞调用线程。

本文重点讨论方案 2。


2. 原始方案分析

原文实现了一个 LockFreeBytesBuffer (SPSC 字节环形缓冲) + SocketContext (epoll 事件驱动)。核心思路正确,但代码存在 5 个工程问题。

2.1 原始代码 (关键部分)

class LockFreeBytesBuffer {
 public:
  static const std::size_t kBufferSize = 10240U;

  bool append(const char* data, std::size_t length) noexcept {
    const std::size_t current_write = writer_index_.load(std::memory_order_relaxed);
    const std::size_t current_read = reader_index_.load(std::memory_order_acquire);
    const std::size_t free_space =
        (current_read + kBufferSize - current_write - 1U) % kBufferSize;
    if (length > free_space) return false;

    const std::size_t pos = current_write % kBufferSize;
    const std::size_t first_part = std::min(length, kBufferSize - pos);
    std::memcpy(&buffer_[pos], data, first_part);
    std::memcpy(&buffer_[0], data + first_part, length - first_part);
    writer_index_.store(current_write + length, std::memory_order_release);
    return true;
  }

  std::size_t beginRead(const char** target) noexcept {
    const std::size_t current_read = reader_index_.load(std::memory_order_relaxed);
    const std::size_t current_write = writer_index_.load(std::memory_order_acquire);
    const std::size_t available = (current_write - current_read) % kBufferSize;
    if (available == 0U) return 0U;

    const std::size_t pos = current_read % kBufferSize;
    *target = &buffer_[pos];
    return std::min(available, kBufferSize - pos);
  }

  void endRead(std::size_t length) noexcept {
    const std::size_t current_read = reader_index_.load(std::memory_order_relaxed);
    reader_index_.store(current_read + length, std::memory_order_release);
  }

 private:
  char buffer_[kBufferSize];
  std::atomic<std::size_t> reader_index_{0};
  std::atomic<std::size_t> writer_index_{0};
};

2.2 问题 1: 缓冲区大小不是 2 的幂

static const std::size_t kBufferSize = 10240U;  // 不是 2 的幂

const std::size_t pos = current_write % kBufferSize;  // 除法取模

10240 不是 2 的幂,% 10240 编译器无法优化为位掩码 & (N-1),在 ARM Cortex-A53 上一次除法需要 ~20 个时钟周期,位与只需 1 个。

每次 append()beginRead() 各有 2 次取模,单次 I/O 操作多出 ~60 ns 的无谓开销。

修正: 缓冲区大小用 2 的幂,取模改为位与:

static constexpr std::size_t kBufferSize = 8192U;  // 2^13
static constexpr std::size_t kMask = kBufferSize - 1U;

const std::size_t pos = current_write & kMask;  // 1 条 AND 指令

2.3 问题 2: 无界索引 + 非 2 幂 = 溢出隐患

writer_index_reader_index_ 是无界递增的 size_t。如果 kBufferSize 是 2 的幂,无符号溢出后 (write - read) 的差值仍然正确 (利用无符号算术的回绕性质)。但 kBufferSize = 10240 下:

std::size_t available = (current_write - current_read) % kBufferSize;

write - read 接近 SIZE_MAX 时,% 10240 的结果不等于实际有效数据量。虽然在实践中 size_t 的回绕周期极长 (64-bit 下约 1.8 x 10^19),但设计上不应依赖此假设。

修正: 使用 2 的幂后,索引差值天然正确:

// 2 的幂下,无符号差值 & kMask 始终正确
std::size_t available = (current_write - current_read);  // 无需 % 或 &
// available 直接表示有效数据量,因为 write 永远 >= read

2.4 问题 3: 测试程序内存泄漏

// 原始代码: unique_ptr 创建后未存储
std::unique_ptr<SocketContext> client =
    std::make_unique<SocketContext>(epoll_fd, client_fd);

ev.data.ptr = client.get();
// client 在此作用域结束后析构,data.ptr 变成悬空指针

unique_ptr 在栈上创建,离开 if 块后立即析构,epoll_event.data.ptr 指向已释放内存。后续 EPOLLOUT 事件触发时解引用这个悬空指针,行为未定义。

此外,addFd() 在构造函数中已经 EPOLL_CTL_ADD 了一次,main() 中又加了一次,造成重复注册。

2.5 问题 4: doSend 不处理部分发送

int doSend() {
  const char* pdata = nullptr;
  std::size_t data_size = buffer_.beginRead(&pdata);
  if (data_size == 0) return 0;

  int send_size = send(sock_fd_, pdata, static_cast<int>(data_size), MSG_DONTWAIT);
  if (send_size > 0) {
    buffer_.endRead(static_cast<std::size_t>(send_size));
  }
  return send_size;
}

send() 可能只发送了 data_size 的一部分 (short write)。此时 endRead(send_size) 正确推进了读指针,但没有重新注册 EPOLLOUT 来触发下一次刷写。在 EPOLLET (边缘触发) 模式下,如果不重新 MOD 事件,剩余数据将永远不会被发送。

LT (水平触发) 模式下问题较轻,因为只要发送缓冲区可写,EPOLLOUT 会持续触发。但原文使用 EPOLLONESHOT,每次事件后必须重新注册。

修正: doSend() 返回后检查缓冲区是否还有数据,有则重新注册 EPOLLOUT:

int doSend() {
  // ... send logic ...
  if (send_size > 0) {
    buffer_.endRead(static_cast<std::size_t>(send_size));
  }
  // 缓冲区非空,继续注册 EPOLLOUT
  if (buffer_.available() > 0) {
    modifyEvent(true, true);  // EPOLLIN + EPOLLOUT
  } else {
    modifyEvent(true, false);  // 只保留 EPOLLIN
  }
  return send_size;
}

2.6 问题 5: EAGAIN 处理不完整

if (send_size == -1 && errno != EAGAIN) {
  fprintf(stderr, "send failed, error: %s\n", strerror(errno));
}

两个问题:

  • 缺少 EWOULDBLOCK 检查 (POSIX 允许 EAGAIN != EWOULDBLOCK,虽然 Linux 上相等)
  • EINTR (被信号中断) 也应当重试,而非静默忽略

3. 工程级改进方案

3.1 SendBuffer: 2 的幂字节环形缓冲

/// @brief SPSC 字节环形缓冲区,用于 TCP 非阻塞发送缓冲
/// @tparam SizeLog2 缓冲区大小的 log2 值 (默认 13 = 8KB)
template <uint32_t SizeLog2 = 13>
class SendBuffer {
 public:
  static constexpr uint32_t kSize = 1U << SizeLog2;
  static constexpr uint32_t kMask = kSize - 1U;

  /// @brief 写入数据到缓冲区 (生产者线程调用)
  /// @return 实际写入的字节数 (可能小于 len,表示缓冲区满)
  uint32_t Write(const uint8_t* data, uint32_t len) noexcept {
    const uint32_t w = write_idx_.load(std::memory_order_relaxed);
    const uint32_t r = read_idx_.load(std::memory_order_acquire);
    const uint32_t free = kSize - (w - r);  // 无符号差值在 2 的幂下天然正确
    const uint32_t to_write = (len < free) ? len : free;
    if (to_write == 0) return 0;

    const uint32_t pos = w & kMask;
    const uint32_t first = (kSize - pos < to_write) ? (kSize - pos) : to_write;
    std::memcpy(&buf_[pos], data, first);
    if (first < to_write) {
      std::memcpy(&buf_[0], data + first, to_write - first);
    }
    write_idx_.store(w + to_write, std::memory_order_release);
    return to_write;
  }

  /// @brief 获取可读数据的连续区间指针 (消费者线程调用)
  /// @param[out] ptr 指向缓冲区内数据起始位置 (零拷贝)
  /// @return 连续可读字节数 (不跨环形边界)
  uint32_t Peek(const uint8_t** ptr) noexcept {
    const uint32_t r = read_idx_.load(std::memory_order_relaxed);
    const uint32_t w = write_idx_.load(std::memory_order_acquire);
    const uint32_t avail = w - r;
    if (avail == 0) return 0;

    const uint32_t pos = r & kMask;
    *ptr = &buf_[pos];
    const uint32_t contig = kSize - pos;
    return (avail < contig) ? avail : contig;
  }

  /// @brief 消费者确认已读取 len 字节
  void Consume(uint32_t len) noexcept {
    read_idx_.fetch_add(len, std::memory_order_release);
  }

  /// @brief 查询缓冲区内待发送数据量
  uint32_t Pending() const noexcept {
    return write_idx_.load(std::memory_order_acquire)
         - read_idx_.load(std::memory_order_relaxed);
  }

  bool IsEmpty() const noexcept { return Pending() == 0; }

 private:
  alignas(64) std::atomic<uint32_t> write_idx_{0};
  alignas(64) std::atomic<uint32_t> read_idx_{0};
  alignas(64) uint8_t buf_[kSize]{};
};

与原始 LockFreeBytesBuffer 的设计差异:

设计点原始方案改进方案
缓冲区大小10240 (非 2 幂)1 << SizeLog2 (编译期保证)
索引取模% kBufferSize (除法)& kMask (1 条指令)
可用空间计算(r + N - w - 1) % NN - (w - r) (无符号差值)
缓存行对齐alignas(64) 消除伪共享
API 设计beginRead/endRead 分离Peek/Consume (更明确语义)
索引类型size_t (8 字节)uint32_t (4 字节, 嵌入式友好)

3.2 AsyncSocket: 事件驱动异步发送

/// @brief 非阻塞 TCP socket,内置发送缓冲区
class AsyncSocket {
 public:
  AsyncSocket(int epoll_fd, int sock_fd) noexcept
      : epoll_fd_(epoll_fd), fd_(sock_fd) {
    // 设置非阻塞
    int flags = ::fcntl(fd_, F_GETFL, 0);
    ::fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
  }

  ~AsyncSocket() {
    if (fd_ >= 0) {
      ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd_, nullptr);
      ::close(fd_);
    }
  }

  /// @brief 异步发送: 数据先入缓冲区,由 EPOLLOUT 驱动实际发送
  /// @return 实际入队的字节数 (< len 表示缓冲区满,应用层需处理背压)
  uint32_t AsyncSend(const uint8_t* data, uint32_t len) noexcept {
    uint32_t written = send_buf_.Write(data, len);
    if (written > 0 && !epollout_armed_) {
      ArmEpollout();
    }
    return written;
  }

  /// @brief EPOLLOUT 事件回调: 将缓冲区数据刷入内核
  /// @return >0 实际发送字节数, 0 缓冲区空, <0 连接错误
  int FlushSendBuffer() noexcept {
    int total_sent = 0;
    for (;;) {
      const uint8_t* ptr = nullptr;
      uint32_t avail = send_buf_.Peek(&ptr);
      if (avail == 0) break;

      ssize_t n = ::send(fd_, ptr, avail, MSG_DONTWAIT | MSG_NOSIGNAL);
      if (n > 0) {
        send_buf_.Consume(static_cast<uint32_t>(n));
        total_sent += static_cast<int>(n);
        continue;  // 尝试继续发送 (边界跨回环可能还有数据)
      }
      if (n < 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
          break;  // 内核缓冲区满,等下一次 EPOLLOUT
        }
        if (errno == EINTR) continue;  // 被信号中断,重试
        return -1;  // 真正的错误 (EPIPE, ECONNRESET 等)
      }
      // n == 0: 对端关闭
      return -1;
    }

    // 更新 epoll 注册状态
    if (send_buf_.IsEmpty()) {
      DisarmEpollout();
    }
    return total_sent;
  }

 private:
  void ArmEpollout() noexcept {
    struct epoll_event ev{};
    ev.data.ptr = this;
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd_, &ev);
    epollout_armed_ = true;
  }

  void DisarmEpollout() noexcept {
    struct epoll_event ev{};
    ev.data.ptr = this;
    ev.events = EPOLLIN | EPOLLET;
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd_, &ev);
    epollout_armed_ = false;
  }

  int epoll_fd_;
  int fd_;
  SendBuffer<13> send_buf_;       // 8 KB 发送缓冲
  bool epollout_armed_ = false;
};

关键设计:

  1. EPOLLOUT 按需注册: 只在缓冲区有数据时注册 EPOLLOUT,避免空转唤醒
  2. FlushSendBuffer 循环刷写: 一次 EPOLLOUT 事件尽量多地发送数据 (EPOLLET 要求)
  3. EAGAIN/EINTR 正确处理: EAGAIN 等待下次事件,EINTR 立即重试,其他错误断开
  4. 背压感知: AsyncSend() 返回实际入队字节数,应用层可据此控制生产速率

3.3 事件循环集成

// epoll 事件循环
struct epoll_event events[64];
int n = ::epoll_wait(epoll_fd, events, 64, -1);

for (int i = 0; i < n; ++i) {
  auto* sock = static_cast<AsyncSocket*>(events[i].data.ptr);

  if (events[i].events & (EPOLLERR | EPOLLHUP)) {
    // 连接错误或对端挂断
    delete sock;
    continue;
  }
  if (events[i].events & EPOLLIN) {
    // 读取数据...
    sock->OnReadable();
  }
  if (events[i].events & EPOLLOUT) {
    int ret = sock->FlushSendBuffer();
    if (ret < 0) {
      // 发送失败,关闭连接
      delete sock;
      continue;
    }
  }
}

对比原始方案缺失的错误处理:

  • EPOLLERR / EPOLLHUP 事件现在被正确检测
  • 连接关闭时清理资源 (原始方案的 unique_ptr 生命周期管理有缺陷)

3.4 数据流时序

生产者线程                      I/O 线程 (epoll)
    |                               |
    | AsyncSend(data, 4096)         |
    |   → Write 4096B to SendBuffer |
    |   → ArmEpollout()            |
    |                               |
    |                          EPOLLOUT 触发
    |                               |
    |                          FlushSendBuffer()
    |                            → Peek() 获取连续区间
    |                            → send(fd, ptr, avail)
    |                            → 内核接受 1500B (short write)
    |                            → Consume(1500)
    |                            → send(fd, ptr, avail) 再次尝试
    |                            → EAGAIN (内核缓冲满)
    |                            → 等待下一次 EPOLLOUT
    |                               |
    |                          EPOLLOUT 再次触发
    |                            → 发送剩余 2596B
    |                            → 缓冲区空
    |                            → DisarmEpollout()

4. 与 newosp 基础设施的对比

newosp 的 SpscRingbuffer<T, N> 和本文的 SendBuffer 解决不同层面的问题:

4.1 SpscRingbuffer: 类型化元素队列

// newosp: 传递结构化帧 (类型安全)
using RecvRing = osp::SpscRingbuffer<RecvFrameSlot, 32>;

RecvFrameSlot slot;
slot.header = ...;
std::memcpy(slot.payload, data, len);
ring.Push(slot);  // 整帧入队
  • 用途: 接收线程 → 处理线程的帧传递
  • 元素: 固定大小结构体 (RecvFrameSlot ~4KB)
  • 操作粒度: 整帧 Push/Pop

4.2 SendBuffer: 字节流缓冲

// 本文: 字节流发送缓冲 (面向 TCP)
SendBuffer<13> buf;

buf.Write(header_bytes, 14);   // 帧头
buf.Write(payload, 4096);      // 载荷
// 由 EPOLLOUT 驱动 Peek() + send() + Consume()
  • 用途: 应用层 → TCP 发送的字节流暂存
  • 元素: 原始字节 (uint8_t)
  • 操作粒度: 可变长度字节块

4.3 核心共性

两者共享相同的底层设计原则:

设计原则SpscRingbufferSendBuffer
2 的幂大小 + 位掩码static_assert(IsPowerOf2)1 << SizeLog2
无界递增索引head_/tail_ 无符号递增write_idx_/read_idx_
acquire-release 配对AcquireOrder()/ReleaseOrder()acquire/release
缓存行对齐消除伪共享alignas(kCacheLineSize)alignas(64)
SPSC 约束 (不可多线程写)文档约定 + API 分离同上
零堆分配栈上 std::array栈上 uint8_t[]

4.4 newosp transport 的 short write 处理

newosp transport.hpp 中的 SendAll() 处理 TCP short write。v0.4.1 新增了 EAGAIN 区分:

// newosp v0.4.1 transport.hpp SendAll():
while (remaining > 0) {
  auto r = socket_.Send(ptr, remaining);
  if (!r.has_value()) {
    if (r.get_error() == SocketError::kWouldBlock) {
      // EAGAIN: 有限重试 (yield + 最多 16 次)
      if (++eagain_count > kMaxEagainRetries) {
        return TransportError::kWouldBlock;  // 不断开连接
      }
      std::this_thread::yield();
      continue;
    }
    // 致命错误 (EPIPE, ECONNRESET)
    connected_ = false;
    return TransportError::kSendFailed;
  }
  ptr += sent;
  remaining -= sent;
  eagain_count = 0;  // 有进展则重置
}

这是同步有限重试方案 – 比 v0.4.0 的直接判定失败改进了对瞬态 EAGAIN 的容忍度,且 EAGAIN 重试耗尽后返回 kWouldBlock 而不断开连接,允许调用方决策是否重试。

对于 newosp 的目标场景 (同机 shm_transport 优先,TCP 仅作远程备选),同步方案是合理的选择。高吞吐 TCP 场景应引入本文的 SendBuffer + EPOLLOUT 异步方案。


5. 内存序细节

5.1 为什么 Write 侧 load 自己的 write_idx 用 relaxed

const uint32_t w = write_idx_.load(std::memory_order_relaxed);  // 只有自己写
const uint32_t r = read_idx_.load(std::memory_order_acquire);   // 对方写,需 acquire

SPSC 模型中,write_idx_ 只由生产者线程修改,read_idx_ 只由消费者线程修改。加载自己拥有的索引不需要同步 (值一定是上次 store 的值),加载对方的索引需要 acquire 来保证看到对方的最新值以及相关的数据写入。

5.2 store 用 release 的含义

write_idx_.store(w + to_write, std::memory_order_release);

release 保证: 在 store 之前的所有 memcpy(数据写入) 对另一个线程的 acquire load 可见。这是 SPSC 无锁正确性的核心 – 消费者 acquire load 到新的 write_idx 后,数据一定已经就位。

5.3 ARM 上的实际代价

ARM (非 TSO 架构) 上:

  • relaxed load/store: 普通 ldr/str 指令
  • acquire load: ldar 指令 (ARM v8) 或 ldr + dmb ishld (ARM v7)
  • release store: stlr 指令 (ARM v8) 或 dmb ish + str (ARM v7)

每次 Write() 只有 1 次 acquire + 1 次 release,开销可控。这也是 newosp SpscRingbuffer 提供 FakeTSO 模式的原因 – 单核 MCU 上所有 acquire/release 可降级为 relaxed + compiler fence,进一步消除 barrier 开销。


6. 总结

  1. TCP short write 在非阻塞 + EPOLLET 模式下必须处理。正确方案是用户态发送缓冲 + EPOLLOUT 事件驱动刷写,而非阻塞重试。

  2. 字节环形缓冲的工程要求: 2 的幂位掩码 (非除法取模)、无界递增无符号索引、精确 acquire-release 内存序、缓存行对齐消除伪共享。这些要求与结构化 SPSC 队列完全一致。

  3. EPOLLOUT 管理关键: 按需注册 (有数据时 arm,空时 disarm),EPOLLET 模式下一次事件循环内尽量多发送,区分 EAGAIN (等待) / EINTR (重试) / 其他 (断开)。

  4. 同步 vs 异步 short write 处理: 同步方案 (循环 send() 直到写完) 适合低吞吐场景,实现简单但阻塞调用线程; 异步方案 (缓冲 + EPOLLOUT) 适合高吞吐场景,不阻塞但需要管理缓冲区生命周期和背压。


参考