4. EventLoop
约 4563 字大约 15 分钟
2026-04-07
EventLoop 简介
EventLoop 类可以理解为 Reactor 模型中的事件循环核心,它负责把 Poller 监听到的事件分发给对应的 Channel,并执行用户注册的回调函数。
如果把 muduo 的网络框架看成一套“工厂流水线”,那么:
Channel负责描述某个 fd 以及它关心的事件;Poller负责监听这些 fd 是否发生了事件;EventLoop负责不停地循环、等待、分发、执行回调。
它的核心作用可以概括为:
- 调用
Poller::poll()等待 IO 事件; - 收集发生事件的
Channel; - 调用这些
Channel的handleEvent(); - 执行线程间投递过来的任务回调;
- 通过
eventfd唤醒阻塞中的poll()。
EventLoop 类重要的成员变量
1. 重要成员
std::atomic_bool looping_:标识当前EventLoop是否正在执行loop()循环。
它是原子变量,用于保证多线程场景下状态修改的安全性。std::atomic_bool quit_:标识是否需要退出事件循环。
当调用quit()后,这个值会被设置为true,loop()就会结束。const pid_t threadId_:记录创建这个EventLoop的线程 ID。
用来判断当前调用者是否在EventLoop自己所属的线程中。Timestamp pollRetureTime_:保存Poller返回事件的时间点。
这个时间会传给Channel::handleEvent(),方便回调知道事件发生的时间。std::unique_ptr<Poller> poller_:指向当前EventLoop所使用的 Poller。EventLoop不直接操作 epoll,而是通过Poller接口统一管理。int wakeupFd_:用于唤醒EventLoop的eventfd。
当其他线程向这个EventLoop投递任务时,可以通过它打断阻塞的epoll_wait()。std::unique_ptr<Channel> wakeupChannel_:专门用于监听wakeupFd_的 Channel。
当wakeupFd_可读时,就说明有线程向当前EventLoop发了唤醒信号。ChannelList activeChannels_:保存当前Poller检测到的所有活跃Channel。poll()返回后,EventLoop会遍历这个列表并逐个处理。std::atomic_bool callingPendingFunctors_:标识当前是否正在执行pendingFunctors_中的任务。
这个状态会影响queueInLoop()是否需要唤醒事件循环。std::vector<Functor> pendingFunctors_:存储需要在EventLoop所在线程中执行的回调任务。
这些任务通常是其他线程投递过来的。std::mutex mutex_:保护pendingFunctors_的互斥锁。
因为多个线程可能会同时向pendingFunctors_中添加任务,所以需要加锁保证线程安全。
// 事件循环类 主要包含了两个大模块 Channel Poller(epoll的抽象)
std::atomic_bool looping_; // 原子操作 底层通过CAS实现
std::atomic_bool quit_; // 标识退出loop循环
const pid_t threadId_; // 记录当前EventLoop是被哪个线程id创建的 即标识了当前EventLoop的所属线程id
Timestamp pollRetureTime_; // Poller返回发生事件的Channels的时间点
std::unique_ptr<Poller> poller_;
int wakeupFd_; // 作用:当mainLoop获取一个新用户的Channel 需通过轮询算法选择一个subLoop 通过该成员唤醒subLoop处理Channel
std::unique_ptr<Channel> wakeupChannel_;
ChannelList activeChannels_; // 返回Poller检测到当前有事件发生的所有Channel列表
std::atomic_bool callingPendingFunctors_; // 标识当前loop是否有需要执行的回调操作
std::vector<Functor> pendingFunctors_; // 存储loop需要执行的所有回调操作
std::mutex mutex_; // 互斥锁 用来保护上面vector容器的线程安全操作EventLoop 类重要的成员方法
1. 构造 / 析构
EventLoop();
~EventLoop();EventLoop::EventLoop()
: looping_(false)
, quit_(false)
, callingPendingFunctors_(false)
, threadId_(CurrentThread::tid())
, poller_(Poller::newDefaultPoller(this))
, wakeupFd_(createEventfd())
, wakeupChannel_(new Channel(this, wakeupFd_))
{
LOG_DEBUG("EventLoop created %p in thread %d\n", this, threadId_);
if (t_loopInThisThread)
{
LOG_FATAL("Another EventLoop %p exists in this thread %d\n", t_loopInThisThread, threadId_);
}
else
{
t_loopInThisThread = this;
}
wakeupChannel_->setReadCallback(
std::bind(&EventLoop::handleRead, this)); // 设置wakeupfd的事件类型以及发生事件后的回调操作
wakeupChannel_->enableReading(); // 每一个EventLoop都将监听wakeupChannel_的EPOLL读事件了
}
EventLoop::~EventLoop()
{
wakeupChannel_->disableAll(); // 给Channel移除所有感兴趣的事件
wakeupChannel_->remove(); // 把Channel从EventLoop上删除掉
::close(wakeupFd_);
t_loopInThisThread = nullptr;
}构造函数主要做了几件事:
- 记录当前线程 ID
threadId_; - 创建默认的
Poller; - 创建
eventfd作为唤醒fd; - 创建专门监听
wakeupFd_的wakeupChannel_; - 为
wakeupChannel_绑定读回调handleRead(); - 让
wakeupChannel_开始监听读事件。
提示
为什么要有 t_loopInThisThread?
它是一个线程局部变量,用来保证 一个线程只能有一个 EventLoop。 如果同一个线程里创建了多个 EventLoop,就会直接报错。
析构函数则负责:
- 取消
wakeupChannel_的所有监听; - 从 EventLoop 中移除它;
- 关闭
wakeupFd_; - 清空线程局部的 EventLoop 指针。
2. loop()
void loop();void EventLoop::loop()
{
looping_ = true;
quit_ = false;
LOG_INFO("EventLoop %p start looping\n", this);
while (!quit_)
{
activeChannels_.clear();
pollRetureTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (Channel *channel : activeChannels_)
{
// Poller监听哪些channel发生了事件 然后上报给EventLoop 通知channel处理相应的事件
channel->handleEvent(pollRetureTime_);
}
/**
* 执行当前EventLoop事件循环需要处理的回调操作 对于线程数 >=2 的情况 IO线程 mainloop(mainReactor) 主要工作:
* accept接收连接 => 将accept返回的connfd打包为Channel => TcpServer::newConnection通过轮询将TcpConnection对象分配给subloop处理
*
* mainloop调用queueInLoop将回调加入subloop(该回调需要subloop执行 但subloop还在poller_->poll处阻塞) queueInLoop通过wakeup将subloop唤醒
**/
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop looping.\n", this);
looping_ = false;
}loop() 是整个 EventLoop 的主循环,也是 Reactor 的核心。
每一轮循环大致分成三步:
- 调用
poller_->poll()等待事件; - 遍历所有活跃的 Channel,调用它们的
handleEvent(); - 执行
pendingFunctors_中的任务。
这段逻辑的本质:loop() 就是一个不断重复的“事件分发器”:
- 没有事件时就阻塞等待;
- 有事件时就交给 Channel 处理;
- 有线程投递任务时就执行任务。
3. quit()
void quit();void EventLoop::quit()
{
quit_ = true;
if (!isInLoopThread())
{
wakeup();
}
}quit() 的作用是退出事件循环。
提示
为什么如果不是当前线程就要 wakeup()?
因为 EventLoop 可能正阻塞在 poller_->poll() 中。 如果其他线程调用 quit(),单纯把 quit_ = true 还不够,必须唤醒阻塞中的 epoll_wait(),让 loop() 能尽快检查到退出标志并结束循环。
4. runInLoop(Functor cb)
void runInLoop(Functor cb);void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread()) // 当前EventLoop中执行回调
{
cb();
}
else // 在非当前EventLoop线程中执行cb,就需要唤醒EventLoop所在线程执行cb
{
queueInLoop(cb);
}
}runInLoop() 的意思是:在 EventLoop 所在线程里执行这个回调。
分两种情况
- 如果当前就在这个 EventLoop 所属线程中,直接执行 cb();
- 如果不在,就把任务扔到
queueInLoop(),等待该 EventLoop 线程来执行。
这个函数的意义:它为上层提供了一个统一入口: 不管当前是不是 loop 线程,都能“确保任务最终在 loop 线程中执行”。
5. queueInLoop(Functor cb)
void queueInLoop(Functor cb);void EventLoop::queueInLoop(Functor cb)
{
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}
/**
* || callingPendingFunctors的意思是 当前loop正在执行回调中 但是loop的pendingFunctors_中又加入了新的回调 需要通过wakeup写事件
* 唤醒相应的需要执行上面回调操作的loop的线程 让loop()下一次poller_->poll()不再阻塞(阻塞的话会延迟前一次新加入的回调的执行),然后
* 继续执行pendingFunctors_中的回调函数
**/
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup(); // 唤醒loop所在线程
}
}queueInLoop() 用来把一个回调任务加入待执行队列中。
作用:
- 把任务放入
pendingFunctors_; - 如果当前线程不是 loop 线程,或者当前正在执行其他任务,就通过
wakeup()唤醒事件循环线程。
提示
为什么需要唤醒?
因为 EventLoop 可能正在 poll() 阻塞,如果不唤醒,新的任务就会被延迟执行。
6. wakeup()
void wakeup();void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof(one));
if (n != sizeof(one))
{
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8\n", n);
}
}wakeup() 的作用是向 wakeupFd_ 写入一个整数,从而让 wakeupChannel_ 变成可读状态。
它的本质:wakeupFd_ 是一个 eventfd,只要往里写数据,就会触发读事件,wakeupChannel_ 监听到读事件后,会调用 handleRead(),这样就能打断阻塞中的 epll_wait(),这是一种线程间通知机制。
7. handleRead()
void handleRead();void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof(one));
if (n != sizeof(one))
{
LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8\n", n);
}
}handleRead() 是 wakeupChannel_ 的读回调。
它从 wakeupFd_ 中读出 8 字节数据,清除 eventfd 的可读状态,让后续唤醒通知可以继续正常工作。
提示
为什么必须读掉?
因为如果不读,wakeupFd_ 会一直保持可读状态,epoll_wait() 会不断返回,造成空转。
8. updateChannel(Channel *channel)
void updateChannel(Channel *channel);void EventLoop::updateChannel(Channel *channel)
{
poller_->updateChannel(channel);
}EventLoop 本身不直接处理 epoll,它只是把工作转交给 Poller。
这个函数的作用就是: 把 Channel 的状态更新请求交给 Poller 去做。
9. removeChannel(Channel *channel)
void removeChannel(Channel *channel);void EventLoop::removeChannel(Channel *channel)
{
poller_->removeChannel(channel);
}和 updateChannel() 类似,removeChannel() 也是把删除 Channel 的动作交给 Poller 去完成。
10. hasChannel(Channel *channel)
bool hasChannel(Channel *channel);// bool EventLoop::hasChannel(Channel *channel)
// {
// return poller_->hasChannel(channel);
// }这个函数通常用于判断某个 Channel 是否已经被当前 EventLoop 管理。 它本质上会转发给 Poller::hasChannel()。
EventLoop 的线程判断
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }这个函数用于判断当前调用是否发生在 EventLoop 自己的线程中。
提示
为什么要判断线程?
因为 muduo 的设计是 one loop per thread,很多操作必须在所属线程中完成。
例如:
- 直接执行回调;
- 修改 Channel;
- 操作 Poller。
如果不在所属线程中,就需要用 queueInLoop() / wakeup() 来跨线程投递任务。
EventLoop 和 Poller、Channel 的关系
可以这样理解:
Channel 保存:
- fd
- 感兴趣事件
- 实际发生事件
- 回调函数
Poller 负责:
- 把 Channel 注册到内核事件监听器
- 等待事件发生
- 把活跃 Channel 收集回来
EventLoop 负责:
- 驱动
Poller::poll() - 遍历活跃 Channel
- 调用
Channel::handleEvent() - 执行跨线程投递的回调任务 EventLoop 的整体运行流程
1. 创建阶段
EventLoop loop;
会做这些事:
- 创建 Poller
- 创建 eventfd
- 创建 wakeupChannel_
- 注册 handleRead() 回调
- 开始监听 wakeupFd_
2. 循环阶段
调用:
loop.loop();
后会进入死循环:
poller_->poll()等待事件;- 收集活跃 Channel;
- 执行
channel->handleEvent(); - 执行
doPendingFunctors()。
3. 任务投递阶段
其他线程可以调用 loop.queueInLoop(cb); 把任务交给该 EventLoop 所在线程执行。
如果 EventLoop 正在阻塞,就通过 wakeup() 把它唤醒。
4. 退出阶段
调用 loop.quit(); 后,EventLoop 会在合适的时机退出 loop()。
总结
EventLoop 的核心职责
- 驱动事件循环
- 调用 Poller 等待事件
- 分发事件给 Channel
- 执行跨线程任务
- 提供线程安全的任务投递机制
- 使用 eventfd 实现线程唤醒
EventLoop 就像一个“总调度员”:
- Poller 负责把“发生了什么”告诉它;
- Channel 负责把“事件发生后怎么处理”准备好;
- EventLoop 负责把整个流程串起来;
如果别的线程想插队干活,就通过 queueInLoop() 和 wakeup() 通知它。
EventLoop 是 Reactor 的心脏,Poller 是耳朵,Channel 是手脚,wakeupFd_ 则是跨线程的“门铃”。
EventLoop 源码
#pragma once
#include <functional>
#include <vector>
#include <atomic>
#include <memory>
#include <mutex>
#include "noncopyable.h"
#include "Timestamp.h"
#include "CurrentThread.h"
class Channel;
class Poller;
// 事件循环类 主要包含了两个大模块 Channel Poller(epoll的抽象)
class EventLoop : noncopyable
{
public:
using Functor = std::function<void()>;
EventLoop();
~EventLoop();
// 开启事件循环
void loop();
// 退出事件循环
void quit();
Timestamp pollReturnTime() const { return pollRetureTime_; }
// 在当前loop中执行
void runInLoop(Functor cb);
// 把上层注册的回调函数cb放入队列中 唤醒loop所在的线程执行cb
void queueInLoop(Functor cb);
// 通过eventfd唤醒loop所在的线程
void wakeup();
// EventLoop的方法 => Poller的方法
void updateChannel(Channel *channel);
void removeChannel(Channel *channel);
bool hasChannel(Channel *channel);
// 判断EventLoop对象是否在自己的线程里
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); } // threadId_为EventLoop创建时的线程id CurrentThread::tid()为当前线程id
private:
void handleRead(); // 给eventfd返回的文件描述符wakeupFd_绑定的事件回调 当wakeup()时 即有事件发生时 调用handleRead()读wakeupFd_的8字节 同时唤醒阻塞的epoll_wait
void doPendingFunctors(); // 执行上层回调
using ChannelList = std::vector<Channel *>;
std::atomic_bool looping_; // 原子操作 底层通过CAS实现
std::atomic_bool quit_; // 标识退出loop循环
const pid_t threadId_; // 记录当前EventLoop是被哪个线程id创建的 即标识了当前EventLoop的所属线程id
Timestamp pollRetureTime_; // Poller返回发生事件的Channels的时间点
std::unique_ptr<Poller> poller_;
int wakeupFd_; // 作用:当mainLoop获取一个新用户的Channel 需通过轮询算法选择一个subLoop 通过该成员唤醒subLoop处理Channel
std::unique_ptr<Channel> wakeupChannel_;
ChannelList activeChannels_; // 返回Poller检测到当前有事件发生的所有Channel列表
std::atomic_bool callingPendingFunctors_; // 标识当前loop是否有需要执行的回调操作
std::vector<Functor> pendingFunctors_; // 存储loop需要执行的所有回调操作
std::mutex mutex_; // 互斥锁 用来保护上面vector容器的线程安全操作
};#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <memory>
#include "EventLoop.h"
#include "Logger.h"
#include "Channel.h"
#include "Poller.h"
// 防止一个线程创建多个EventLoop
__thread EventLoop *t_loopInThisThread = nullptr;
// 定义默认的Poller IO复用接口的超时时间
const int kPollTimeMs = 10000; // 10000毫秒 = 10秒钟
/* 创建线程之后主线程和子线程谁先运行是不确定的。
* 通过一个eventfd在线程之间传递数据的好处是多个线程无需上锁就可以实现同步。
* eventfd支持的最低内核版本为Linux 2.6.27,在2.6.26及之前的版本也可以使用eventfd,但是flags必须设置为0。
* 函数原型:
* #include <sys/eventfd.h>
* int eventfd(unsigned int initval, int flags);
* 参数说明:
* initval,初始化计数器的值。
* flags, EFD_NONBLOCK,设置socket为非阻塞。
* EFD_CLOEXEC,执行fork的时候,在父进程中的描述符会自动关闭,子进程中的描述符保留。
* 场景:
* eventfd可以用于同一个进程之中的线程之间的通信。
* eventfd还可以用于同亲缘关系的进程之间的通信。
* eventfd用于不同亲缘关系的进程之间通信的话需要把eventfd放在几个进程共享的共享内存中(没有测试过)。
*/
// 创建wakeupfd 用来notify唤醒subReactor处理新来的channel
int createEventfd()
{
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0)
{
LOG_FATAL("eventfd error:%d\n", errno);
}
return evtfd;
}
EventLoop::EventLoop()
: looping_(false)
, quit_(false)
, callingPendingFunctors_(false)
, threadId_(CurrentThread::tid())
, poller_(Poller::newDefaultPoller(this))
, wakeupFd_(createEventfd())
, wakeupChannel_(new Channel(this, wakeupFd_))
{
LOG_DEBUG("EventLoop created %p in thread %d\n", this, threadId_);
if (t_loopInThisThread)
{
LOG_FATAL("Another EventLoop %p exists in this thread %d\n", t_loopInThisThread, threadId_);
}
else
{
t_loopInThisThread = this;
}
wakeupChannel_->setReadCallback(
std::bind(&EventLoop::handleRead, this)); // 设置wakeupfd的事件类型以及发生事件后的回调操作
wakeupChannel_->enableReading(); // 每一个EventLoop都将监听wakeupChannel_的EPOLL读事件了
}
EventLoop::~EventLoop()
{
wakeupChannel_->disableAll(); // 给Channel移除所有感兴趣的事件
wakeupChannel_->remove(); // 把Channel从EventLoop上删除掉
::close(wakeupFd_);
t_loopInThisThread = nullptr;
}
// 开启事件循环
void EventLoop::loop()
{
looping_ = true;
quit_ = false;
LOG_INFO("EventLoop %p start looping\n", this);
while (!quit_)
{
activeChannels_.clear();
pollRetureTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (Channel *channel : activeChannels_)
{
// Poller监听哪些channel发生了事件 然后上报给EventLoop 通知channel处理相应的事件
channel->handleEvent(pollRetureTime_);
}
/**
* 执行当前EventLoop事件循环需要处理的回调操作 对于线程数 >=2 的情况 IO线程 mainloop(mainReactor) 主要工作:
* accept接收连接 => 将accept返回的connfd打包为Channel => TcpServer::newConnection通过轮询将TcpConnection对象分配给subloop处理
*
* mainloop调用queueInLoop将回调加入subloop(该回调需要subloop执行 但subloop还在poller_->poll处阻塞) queueInLoop通过wakeup将subloop唤醒
**/
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop looping.\n", this);
looping_ = false;
}
/**
* 退出事件循环
* 1. 如果loop在自己的线程中调用quit成功了 说明当前线程已经执行完毕了loop()函数的poller_->poll并退出
* 2. 如果不是当前EventLoop所属线程中调用quit退出EventLoop 需要唤醒EventLoop所属线程的epoll_wait
*
* 比如在一个subloop(worker)中调用mainloop(IO)的quit时 需要唤醒mainloop(IO)的poller_->poll 让其执行完loop()函数
*
* !!! 注意: 正常情况下 mainloop负责请求连接 将回调写入subloop中 通过生产者消费者模型即可实现线程安全的队列
* !!! 但是muduo通过wakeup()机制 使用eventfd创建的wakeupFd_ notify 使得mainloop和subloop之间能够进行通信
**/
void EventLoop::quit()
{
quit_ = true;
if (!isInLoopThread())
{
wakeup();
}
}
// 在当前loop中执行cb
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread()) // 当前EventLoop中执行回调
{
cb();
}
else // 在非当前EventLoop线程中执行cb,就需要唤醒EventLoop所在线程执行cb
{
queueInLoop(cb);
}
}
// 把cb放入队列中 唤醒loop所在的线程执行cb
void EventLoop::queueInLoop(Functor cb)
{
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}
/**
* || callingPendingFunctors的意思是 当前loop正在执行回调中 但是loop的pendingFunctors_中又加入了新的回调 需要通过wakeup写事件
* 唤醒相应的需要执行上面回调操作的loop的线程 让loop()下一次poller_->poll()不再阻塞(阻塞的话会延迟前一次新加入的回调的执行),然后
* 继续执行pendingFunctors_中的回调函数
**/
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup(); // 唤醒loop所在线程
}
}
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof(one));
if (n != sizeof(one))
{
LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8\n", n);
}
}
// 用来唤醒loop所在线程 向wakeupFd_写一个数据 wakeupChannel就发生读事件 当前loop线程就会被唤醒
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof(one));
if (n != sizeof(one))
{
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8\n", n);
}
}
// EventLoop的方法 => Poller的方法
void EventLoop::updateChannel(Channel *channel)
{
poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel *channel)
{
poller_->removeChannel(channel);
}
// bool EventLoop::hasChannel(Channel *channel)
// {
// return poller_->hasChannel(channel);
// }
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_); // 交换的方式减少了锁的临界区范围 提升效率 同时避免了死锁 如果执行functor()在临界区内 且functor()中调用queueInLoop()就会产生死锁
}
for (const Functor &functor : functors)
{
functor(); // 执行当前loop需要执行的回调操作
}
callingPendingFunctors_ = false;
}