7. Buffer
约 3781 字大约 13 分钟
2026-04-14
Buffer 简介
Buffer 类可以理解为 muduo 网络库中的动态缓冲区封装,它的作用是把 socket 上读写的数据暂存起来,方便上层按“消息”的方式处理,而不是直接和零散的字节流打交道。
在 TCP 通信中,数据到达的大小是不确定的:
- 可能一次只来一点点数据
- 也可能一次来了很多数据
- 也可能应用层一次只读走一部分数据
所以需要一个缓冲区来统一管理这些字节流。Buffer 类主要负责:
- 保存读缓冲区数据
- 保存写缓冲区数据
- 维护读写指针
- 从 fd 读取数据到缓冲区
- 把缓冲区数据写到 fd
- 支持自动扩容和空间整理
你可以把它理解为:
- Buffer:字节流仓库
- readerIndex_:仓库里“已经读到哪里了”
- writerIndex_:仓库里“已经写到哪里了”
Buffer 类重要的成员变量
1. 重要成员
static const size_t kCheapPrepend = 8:预留出来的前置空间大小。
这部分空间通常用于在消息前面追加一些协议头信息,避免频繁搬移数据。static const size_t kInitialSize = 1024:Buffer 的初始容量。
默认会先分配 1024 字节的可用空间。std::vector<char> buffer_:底层真正存储数据的容器。
Buffer 的所有数据都存在这个 vector 里。size_t readerIndex_:读指针。
表示当前可读数据从哪里开始。size_t writerIndex_:写指针。
表示当前可写区域的起点,也就是数据已经写到了哪里。
static const size_t kCheapPrepend = 8;//初始预留的prependabel空间大小
static const size_t kInitialSize = 1024;
std::vector<char> buffer_;
size_t readerIndex_;
size_t writerIndex_;Buffer 的空间布局
Buffer 的内部结构大致可以看成这样:
| prependable | readable | writable |
| kCheapPrepend | 数据区 | 空闲区 |
三个区域的含义
- prependable:位于最前面的预留空间,可以用来在消息前面插入头部信息。
- readable:当前已经写入、但还没有被应用层读取的数据。
- writable:还可以继续写入数据的空间。
提示
这三个区域的边界由谁决定?
由两个下标决定:
readerIndex_writerIndex_
Buffer 类重要的成员方法
1. 构造 / 初始化
explicit Buffer(size_t initalSize = kInitialSize)
: buffer_(kCheapPrepend + initalSize)
, readerIndex_(kCheapPrepend)
, writerIndex_(kCheapPrepend)
{
}构造函数会做三件事:
- 初始化
buffer_,大小为 kCheapPrepend(8) + initalSize(1024) - 把
readerIndex_设到 kCheapPrepend - 把
writerIndex_也设到 kCheapPrepend
提示
为什么读写指针都从 kCheapPrepend 开始?
因为前面先留出一段空白区域,方便将来在消息前面追加内容,而不用马上搬移整个缓冲区。
2. 查询缓冲区状态
size_t readableBytes() const { return writerIndex_ - readerIndex_; }
size_t writableBytes() const { return buffer_.size() - writerIndex_; }
size_t prependableBytes() const { return readerIndex_; }这三个函数是 Buffer 的核心状态查询接口。
readableBytes()
表示当前有多少字节可以被读取。
可读字节数 =
writerIndex_-readerIndex_
writableBytes()
表示当前还有多少字节可以继续写入。
可写字节数 =
buffer_.size()-writerIndex_
prependableBytes()
表示前面还有多少可用于前置追加的空间。
可前置空间 =
readerIndex_
3. 获取可读数据起始位置
const char *peek() const { return begin() + readerIndex_; }这个函数返回当前可读数据的起始地址。
上层如果想直接查看缓冲区里的数据,可以通过 peek() 拿到读区首地址,而不是自己去算指针偏移。
4. 读取数据后移动读指针
void retrieve(size_t len)
{
if (len < readableBytes())
{
readerIndex_ += len;
}
else
{
retrieveAll();
}
}这个函数表示:应用层已经读取了部分数据,现在要把这部分数据从可读区“消费掉”。
分两种情况
- 情况1:只读走一部分
如果 len < readableBytes(),说明只是消费了部分可读数据:readerIndex_ += len
- 情况2:全部读完了
如果 len >= readableBytes(),说明可读数据都被读完了:直接调用 retrieveAll()
5. 清空缓冲区
void retrieveAll()
{
readerIndex_ = kCheapPrepend;
writerIndex_ = kCheapPrepend;
}这个函数把缓冲区恢复到初始状态。
当缓冲区所有可读数据都被消费完之后,直接把读写指针重新设置到预留区起点即可,不需要真的释放内存。
6. 把整个可读区转成 string
std::string retrieveAllAsString() { return retrieveAsString(readableBytes()); }
std::string retrieveAsString(size_t len)
{
std::string result(peek(), len);
retrieve(len);
return result;
}这两个函数用于把缓冲区里的数据取出来并转换成 std::string。
retrieveAsString(len):先从 peek() 位置拷贝 len 个字节到 std::string,再调用 retrieve(len) 消费掉这些数据。
retrieveAllAsString():实际上就是把所有可读数据都转成字符串。
上层拿到消息后,既能方便处理文本协议,也能方便调试输出。
7. 保证写入空间足够
void ensureWritableBytes(size_t len)
{
if (writableBytes() < len)
{
makeSpace(len);
}
}这个函数保证当前缓冲区有足够的可写空间。
当你准备往 Buffer 里写入 len 字节数据时,如果空间不够,就调用 makeSpace(len):
要么扩容,要么整理已有数据,腾出更多空间
8. 向缓冲区追加数据
void append(const char *data, size_t len)
{
ensureWritableBytes(len);
std::copy(data, data+len, beginWrite());
writerIndex_ += len;
}这个函数用于把外部数据追加到 Buffer 的可写区。
先确保有足够空间,把 [data, data + len) 拷贝到写位置,更新 writerIndex_
它就是“把新数据写进缓冲区”。
9. 获取写入起始位置
char *beginWrite() { return begin() + writerIndex_; }
const char *beginWrite() const { return begin() + writerIndex_; }这个函数返回当前可写区的起始地址。
方便 append() 或其他写入操作直接把数据写入到缓冲区末尾。
10. 从 fd 读取数据到缓冲区
ssize_t readFd(int fd, int *saveErrno);ssize_t Buffer::readFd(int fd, int *saveErrno)
{
char extrabuf[65536] = {0};
struct iovec vec[2];
const size_t writable = writableBytes();
vec[0].iov_base = begin() + writerIndex_;
vec[0].iov_len = writable;
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof(extrabuf);
const int iovcnt = (writable < sizeof(extrabuf)) ? 2 : 1;
const ssize_t n = ::readv(fd, vec, iovcnt);
if (n < 0)
{
*saveErrno = errno;
}
else if (n <= writable)
{
writerIndex_ += n;
}
else
{
writerIndex_ = buffer_.size();
append(extrabuf, n - writable);
}
return n;
}这个函数用于把 socket fd 上的数据读入 Buffer。
相关信息
iovec = 输入 / 输出向量(I/O vector)是 Linux/Unix 系统提供的一个结构体,专门给 readv / writev 这两个分散 / 集中 I/O 函数用的。
作用:一次系统调用,读写多块不连续的内存。
struct iovec {
void *iov_base; // 内存块的起始地址
size_t iov_len; // 这块内存有多大(长度)
};readv 会做什么?
从 fd 读取数据,自动按顺序填满这两块内存!
先写满 vec [0],还有剩余数据 → 自动写进 vec [1]
提示
为什么不用单纯 read()?为什么要用 iovec + readv?
因为一次读到的数据大小不确定,而 Buffer 的可写空间也不一定够。所以这里用 readv() 做了“两段式读取”:
- 第一段:直接读到 Buffer 的可写区
- 第二段:如果 Buffer 空间不够,就先读到栈上的 extrabuf 中
如果不用 readv:Buffer 空间不够,你必须先把 Buffer 扩容,再 read,多一次系统调用 / 多一次内存拷贝。
用了 readv:一次 read 就把数据读进两块内存,不用提前扩容,零浪费、高效率,这就是分散读(scatter read)。
它的工作方式
- 情况1:Buffer 空间足够
n <= writable,说明数据都写进了 buffer_,只需要更新 writerIndex_ += n
- 情况2:Buffer 空间不够
n > writable,先把可写区写满,剩下的数据先落到 extrabuf,再调用 append(extrabuf, n - writable) 追加进 Buffer
这样设计的好处:减少系统调用开销,避免一开始就频繁扩容,兼顾高效和灵活
11. 向 fd 写出缓冲区数据
ssize_t writeFd(int fd, int *saveErrno);ssize_t Buffer::writeFd(int fd, int *saveErrno)
{
ssize_t n = ::write(fd, peek(), readableBytes());
if (n < 0)
{
*saveErrno = errno;
}
return n;
}这个函数用于把 Buffer 中的可读数据写到 fd。
通常用于输出缓冲区,把待发送的数据写到 socket 中。
它直接从 peek() 指向的可读区开始写,写入长度是 readableBytes()。
提示
为什么这里只用 write()?
因为写操作一般是把当前可发送的数据尽量往外发送。 如果一次没写完,通常会由上层逻辑保留剩余数据,等待下次可写事件继续发送。
Buffer 的私有辅助函数
1. begin()
char *begin() { return &*buffer_.begin(); }
const char *begin() const { return &*buffer_.begin(); }这个函数返回底层 vector<char> 的首地址。
方便通过指针偏移访问缓冲区内容。
2. makeSpace(size_t len)
void makeSpace(size_t len)
{
if (writableBytes() + prependableBytes() < len + kCheapPrepend)
{
buffer_.resize(writerIndex_ + len);
}
else
{
size_t readable = readableBytes();
std::copy(begin() + readerIndex_,
begin() + writerIndex_,
begin() + kCheapPrepend);
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_ + readable;
}
}这个函数用于在空间不足时腾出足够的可写空间。
它的逻辑分两种
- 情况1:空间实在不够,直接扩容
如果:writableBytes() + prependableBytes() < len + kCheapPrepend,说明就算把前面的可前置空间也利用起来,还是不够放下 len 个字节,那么就直接扩容:buffer_.resize(writerIndex_ + len);
- 情况2:空间够,只是数据分布不连续
如果总空间够,只是前面的读走空间被浪费了,那么就把可读数据搬到前面:将 [readerIndex_, writerIndex_) 拷贝到 kCheapPrepend 后面,重新设置 readerIndex_ 和 writerIndex_。
它是在做“整理仓库”:要么扩仓库,要么把已有货物往前挪一挪,腾出连续空间。
Buffer 的整体工作流程
1. 接收数据
当 socket 有数据可读时:
buffer.readFd(fd, &errno);
把数据读入 Buffer。
2. 读取消息
上层从 Buffer 中取数据:
buffer.retrieveAllAsString();
或者:
buffer.retrieve(len);
把已经消费的数据移除。
3. 发送数据
当需要向 socket 写数据时:
buffer.writeFd(fd, &errno);
把缓冲区里的内容写出去。
Buffer 就像一个“智能储物箱”:新数据先放进去,读的时候只移动读指针,写的时候只移动写指针 空间不够时自动整理或扩,它避免了每次处理 socket 数据都频繁分配内存、拷贝内存,是网络库里非常重要的基础组件。
总结
Buffer 的核心职责
- 管理一段连续内存
- 提供读写指针机制
- 支持动态扩容和空间整理
- 从 fd 高效读取数据
- 向 fd 写出数据
- 为上层协议解析提供稳定缓冲
Buffer 是 muduo 里用于承载 TCP 字节流的动态缓冲区,它通过 readerIndex_ 和 writerIndex_ 管理可读可写区域,并通过 readFd / writeFd 与 socket 进行高效数据交换。
Buffer 源码
#pragma once
#include <vector>
#include <string>
#include <algorithm>
#include <stddef.h>
// 网络库底层的缓冲区类型定义
class Buffer
{
public:
static const size_t kCheapPrepend = 8;//初始预留的prependabel空间大小
static const size_t kInitialSize = 1024;
explicit Buffer(size_t initalSize = kInitialSize)
: buffer_(kCheapPrepend + initalSize)
, readerIndex_(kCheapPrepend)
, writerIndex_(kCheapPrepend)
{
}
size_t readableBytes() const { return writerIndex_ - readerIndex_; }
size_t writableBytes() const { return buffer_.size() - writerIndex_; }
size_t prependableBytes() const { return readerIndex_; }
// 返回缓冲区中可读数据的起始地址
const char *peek() const { return begin() + readerIndex_; }
void retrieve(size_t len)
{
if (len < readableBytes())
{
readerIndex_ += len; // 说明应用只读取了可读缓冲区数据的一部分,就是len长度 还剩下readerIndex+=len到writerIndex_的数据未读
}
else // len == readableBytes()
{
retrieveAll();
}
}
void retrieveAll()
{
readerIndex_ = kCheapPrepend;
writerIndex_ = kCheapPrepend;
}
// 把onMessage函数上报的Buffer数据 转成string类型的数据返回
std::string retrieveAllAsString() { return retrieveAsString(readableBytes()); }
std::string retrieveAsString(size_t len)
{
std::string result(peek(), len);
retrieve(len); // 上面一句把缓冲区中可读的数据已经读取出来 这里肯定要对缓冲区进行复位操作
return result;
}
// buffer_.size - writerIndex_
void ensureWritableBytes(size_t len)
{
if (writableBytes() < len)
{
makeSpace(len); // 扩容
}
}
// 把[data, data+len]内存上的数据添加到writable缓冲区当中
void append(const char *data, size_t len)
{
ensureWritableBytes(len);
std::copy(data, data+len, beginWrite());
writerIndex_ += len;
}
char *beginWrite() { return begin() + writerIndex_; }
const char *beginWrite() const { return begin() + writerIndex_; }
// 从fd上读取数据
ssize_t readFd(int fd, int *saveErrno);
// 通过fd发送数据
ssize_t writeFd(int fd, int *saveErrno);
private:
// vector底层数组首元素的地址 也就是数组的起始地址
char *begin() { return &*buffer_.begin(); }
const char *begin() const { return &*buffer_.begin(); }
void makeSpace(size_t len)
{
/**
* | kCheapPrepend |xxx| reader | writer | // xxx标示reader中已读的部分
* | kCheapPrepend | reader | len |
**/
if (writableBytes() + prependableBytes() < len + kCheapPrepend) // 也就是说 len > xxx前面剩余的空间 + writer的部分
{
buffer_.resize(writerIndex_ + len);
}
else // 这里说明 len <= xxx + writer 把reader搬到从xxx开始 使得xxx后面是一段连续空间
{
size_t readable = readableBytes(); // readable = reader的长度
// 将当前缓冲区中从readerIndex_到writerIndex_的数据
// 拷贝到缓冲区起始位置kCheapPrepend处,以便腾出更多的可写空间
std::copy(begin() + readerIndex_,
begin() + writerIndex_,
begin() + kCheapPrepend);
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_ + readable;
}
}
std::vector<char> buffer_;
size_t readerIndex_;
size_t writerIndex_;
};#include <errno.h>
#include <sys/uio.h>
#include <unistd.h>
#include "Buffer.h"
/**
* 从fd上读取数据 Poller工作在LT模式
* Buffer缓冲区是有大小的! 但是从fd上读取数据的时候 却不知道tcp数据的最终大小
*
* @description: 从socket读到缓冲区的方法是使用readv先读至buffer_,
* Buffer_空间如果不够会读入到栈上65536个字节大小的空间,然后以append的
* 方式追加入buffer_。既考虑了避免系统调用带来开销,又不影响数据的接收。
**/
ssize_t Buffer::readFd(int fd, int *saveErrno)
{
// 栈额外空间,用于从套接字往出读时,当buffer_暂时不够用时暂存数据,待buffer_重新分配足够空间后,在把数据交换给buffer_。
char extrabuf[65536] = {0}; // 栈上内存空间 65536/1024 = 64KB
/*
struct iovec {
ptr_t iov_base; // iov_base指向的缓冲区存放的是readv所接收的数据或是writev将要发送的数据
size_t iov_len; // iov_len在各种情况下分别确定了接收的最大长度以及实际写入的长度
};
*/
// 使用iovec分配两个连续的缓冲区
struct iovec vec[2];
const size_t writable = writableBytes(); // 这是Buffer底层缓冲区剩余的可写空间大小 不一定能完全存储从fd读出的数据
// 第一块缓冲区,指向可写空间
vec[0].iov_base = begin() + writerIndex_;
vec[0].iov_len = writable;
// 第二块缓冲区,指向栈空间
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof(extrabuf);
// when there is enough space in this buffer, don't read into extrabuf.
// when extrabuf is used, we read 128k-1 bytes at most.
// 这里之所以说最多128k-1字节,是因为若writable为64k-1,那么需要两个缓冲区 第一个64k-1 第二个64k 所以做多128k-1
// 如果第一个缓冲区>=64k 那就只采用一个缓冲区 而不使用栈空间extrabuf[65536]的内容
const int iovcnt = (writable < sizeof(extrabuf)) ? 2 : 1;
const ssize_t n = ::readv(fd, vec, iovcnt);
if (n < 0)
{
*saveErrno = errno;
}
else if (n <= writable) // Buffer的可写缓冲区已经够存储读出来的数据了
{
writerIndex_ += n;
}
else // extrabuf里面也写入了n-writable长度的数据
{
writerIndex_ = buffer_.size();
append(extrabuf, n - writable); // 对buffer_扩容 并将extrabuf存储的另一部分数据追加至buffer_
}
return n;
}
// inputBuffer_.readFd表示将对端数据读到inputBuffer_中,移动writerIndex_指针
// outputBuffer_.writeFd标示将数据写入到outputBuffer_中,从readerIndex_开始,可以写readableBytes()个字节
ssize_t Buffer::writeFd(int fd, int *saveErrno)
{
ssize_t n = ::write(fd, peek(), readableBytes());
if (n < 0)
{
*saveErrno = errno;
}
return n;
}