一、线程安全注解
充分利用编译器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| EventLoop* loop_ GUARDED_BY(mutex_);
class SCOPED_CAPABILITY MutexLockGuard : noncopyable { public: explicit MutexLockGuard(MutexLock& mutex) ACQUIRE(mutex) : mutex_(mutex) { mutex_.lock(); }
~MutexLockGuard() RELEASE() { mutex_.unlock(); }
private:
MutexLock& mutex_; }; #define MutexLockGuard(x) error "Missing guard object name"
|
EventLoop* loop_ GUARDED_BY(mutex_); 是一种线程安全注解,通常用于标注某个成员变量的访问需要特定的锁保护。在这个例子中,loop_ 的访问需要由 mutex_ 互斥锁保护。
在编译时,静态分析工具会检查代码中是否正确地在访问 loop_ 时持有 mutex_ 锁。如果没有持有锁,工具会发出警告或错误提示。GUARDED_BY 是一种编译期的注解,对运行时行为没有直接影响。它不会生成额外的代码,也不会影响程序的性能。为开发者提供明确的线程安全约定,提醒其他人维护代码时遵守这些规则。
MutexLockGuard(x)是一种防止用法错误的技巧宏,目的是防止你写出如下代码:
这样写会创建一个临时的 MutexLockGuard 对象,它在这一行代码结束后就被销毁,锁也会立即释放,根本起不到加锁保护作用!宏定义把 MutexLockGuard(x) 替换成 error “Missing guard object name”。如果你写了 MutexLockGuard(mutex_);,编译器会报错:“Missing guard object name”。这样强制你必须写变量名,防止误用。
SCOPED_CAPABILITY,ACQUIRE,RELEASE都是配合Clang编译器检查使用的
SCOPED_CAPABILITY 是一个线程安全注解宏,这个类是一个“作用域锁”,构造函数是“加锁”,析构函数是“解锁”。
ACQUIRE(mutex) 也是线程安全注解宏,函数会“获取”某个锁(mutex)。
RELEASE()也是线程安全注解宏,函数会“释放”锁(mutex)。
通过Clang编译时检查,见:Thread Safety Analysis: https://clang.llvm.org/docs/ThreadSafetyAnalysis.html
二、事件处理模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| while (!quit_) { activeChannels_.clear(); pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); ++iteration_; eventHandling_ = true; for (Channel* channel : activeChannels_) { currentActiveChannel_ = channel; currentActiveChannel_->handleEvent(pollReturnTime_); } currentActiveChannel_ = NULL; eventHandling_ = false; doPendingFunctors(); }
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels) { LOG_TRACE << "fd total count " << channels_.size(); int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs); int savedErrno = errno; Timestamp now(Timestamp::now()); if (numEvents > 0) { LOG_TRACE << numEvents << " events happened"; fillActiveChannels(numEvents, activeChannels); if (implicit_cast<size_t>(numEvents) == events_.size()) { events_.resize(events_.size()*2); } } else if (numEvents == 0) { LOG_TRACE << "nothing happened"; } else { if (savedErrno != EINTR) { errno = savedErrno; LOG_SYSERR << "EPollPoller::poll()"; } } return now; }
void EPollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const { assert(implicit_cast<size_t>(numEvents) <= events_.size()); for (int i = 0; i < numEvents; ++i) { Channel* channel = static_cast<Channel*>(events_[i].data.ptr); #ifndef NDEBUG int fd = channel->fd(); ChannelMap::const_iterator it = channels_.find(fd); assert(it != channels_.end()); assert(it->second == channel); #endif channel->set_revents(events_[i].events); activeChannels->push_back(channel); } }
|
Poller 的职责是 I/O 复用,而 EventLoop 的职责是事件分发。通过activeChannels_将两者分开,可以使系统结构更清晰、耦合度更低、更易于扩展。
1 2 3 4
| if (implicit_cast<size_t>(numEvents) == events_.size()) { events_.resize(events_.size()*2); }
|
epoll返回的数量等于我们传入的最大数量,说明epoll内部其实有更多的事件触发,只不过受限于我们传入的static_cast<int>(events_.size())无法全部传出,所以需要扩容方便后续处理更多事件
1 2 3 4 5 6 7 8 9
| eventHandling_ = true;
for (Channel* channel : activeChannels_) { currentActiveChannel_ = channel; currentActiveChannel_->handleEvent(pollReturnTime_); } currentActiveChannel_ = NULL; eventHandling_ = false;
|
eventHandling作为一个状态标志,防止在处理activeChannels_中的事件时,修改activeChannels_中的信息导致出差。不能在遍历一个容器的同时修改它。
如在removeChannel函数中
1 2 3 4 5 6 7 8 9 10 11 12
| void EventLoop::removeChannel(Channel* channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); if (eventHandling_) { assert(currentActiveChannel_ == channel || std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end()); } poller_->removeChannel(channel); }
|
在取出一个channel时,必须要确保当前活跃的channel和要删除channel一致或者不在activeChannels_中的chennel,如果两个channel不一致且存在activeChannels_中,就可能会发生异常情况,在
1 2 3 4 5
| for (Channel* channel : activeChannels_) { currentActiveChannel_ = channel; currentActiveChannel_->handleEvent(pollReturnTime_); }
|
中执行到被删除的channel时就可能会发生错误(空指针)
三、职责单一原则
muduo强调类职责单一原则,其他类通过runInLoop添加到EventLoop中的pendingFunctors_,这个**pendingFunctors_**在EventLoop处理完全部epoll事件后统一处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| while (!quit_) { activeChannels_.clear(); pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); ++iteration_; if (Logger::logLevel() <= Logger::TRACE) { printActiveChannels(); } eventHandling_ = true; for (Channel* channel : activeChannels_) { currentActiveChannel_ = channel; currentActiveChannel_->handleEvent(pollReturnTime_); } currentActiveChannel_ = NULL; eventHandling_ = false; doPendingFunctors(); }
void EventLoop::doPendingFunctors() { std::vector<Functor> functors; callingPendingFunctors_ = true;
{ MutexLockGuard lock(mutex_); functors.swap(pendingFunctors_); }
for (const Functor& functor : functors) { functor(); } callingPendingFunctors_ = false; }
|
如将监听描述符上树添加到EventLoop
1 2
| loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));
|
如关闭连接等等
1 2
| loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn)); loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
|
所有的事件操作都需要在EventLoop中执行
四、唤醒EventLoop线程
1 2
| int wakeupFd_; std::unique_ptr<Channel> wakeupChannel_;
|
muduo将wakeupChannel_添加到每一个EventLoop中,这样在有事件发生时,能够及时的唤醒对应的线程,防止阻塞
五、定时任务
muduo库还支持定时任务,在EventLoop初始化的时候,初始化一个timerQueue_,这个容器记录能够记录着各个定时任务,在TimerQueue初始化的时候,也会创建一个timerfd_放到EventLoop的监听当中,当最近的定时任务到期时,timerfd 会变为可读,EventLoop 从 poll 调用中被唤醒,并将 timerfdChannel_ 作为一个活跃事件进行处理,最终调用其读回调,也就是 TimerQueue::handleRead,从而执行到期的定时任务。将时间事件转换为一个文件描述符的 I/O 事件,典型的应用有:定时发送心跳,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
| private: typedef std::pair<Timestamp, Timer*> Entry; typedef std::set<Entry> TimerList;
typedef std::pair<Timer*, int64_t> ActiveTimer; typedef std::set<ActiveTimer> ActiveTimerSet;
TimerList timers_; ActiveTimerSet activeTimers_;
bool TimerQueue::insert(Timer* timer) { loop_->assertInLoopThread(); assert(timers_.size() == activeTimers_.size()); bool earliestChanged = false; Timestamp when = timer->expiration(); TimerList::iterator it = timers_.begin(); if (it == timers_.end() || when < it->first) { earliestChanged = true; }
timers_.insert(Entry(when, timer)); activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
assert(timers_.size() == activeTimers_.size()); return earliestChanged; }
void TimerQueue::handleRead() { Timestamp now(Timestamp::now()); readTimerfd(timerfd_, now);
std::vector<Entry> expired = getExpired(now);
callingExpiredTimers_ = true; cancelingTimers_.clear();
for (const Entry& it : expired) { it.second->run(); } callingExpiredTimers_ = false;
reset(expired, now); }
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now) { std::vector<Entry> expired; Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX)); TimerList::iterator end = timers_.lower_bound(sentry);
std::copy(timers_.begin(), end, back_inserter(expired)); timers_.erase(timers_.begin(), end);
for (const Entry& it : expired) { ActiveTimer timer(it.second, it.second->sequence()); size_t n = activeTimers_.erase(timer); assert(n == 1); (void)n; } return expired; }
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now) { Timestamp nextExpire;
for (const Entry& it : expired) { ActiveTimer timer(it.second, it.second->sequence()); if (it.second->repeat() && cancelingTimers_.find(timer) == cancelingTimers_.end()) { it.second->restart(now); insert(it.second); } else { delete it.second; } }
if (!timers_.empty()) { nextExpire = timers_.begin()->second->expiration(); }
if (nextExpire.valid()) { resetTimerfd(timerfd_, nextExpire); } }
void resetTimerfd(int timerfd, Timestamp expiration) { struct itimerspec newValue; struct itimerspec oldValue; memZero(&newValue, sizeof newValue); memZero(&oldValue, sizeof oldValue); newValue.it_value = howMuchTimeFromNow(expiration); int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue); if (ret) { LOG_SYSERR << "timerfd_settime()"; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| std::unique_ptr<TimerQueue> timerQueue_;
timerQueue_(new TimerQueue(this)),
TimerQueue::TimerQueue(EventLoop* loop) : loop_(loop), timerfd_(createTimerfd()), timerfdChannel_(loop, timerfd_), timers_(), callingExpiredTimers_(false) { timerfdChannel_.setReadCallback( std::bind(&TimerQueue::handleRead, this)); timerfdChannel_.enableReading(); }
|
六、任务安全处理
muduo在执行任务时非常注意安全问题,通常都会使用一个原子变量标注,防止在执行任务的时候其他类修改任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| eventHandling_(false), callingPendingFunctors_(false),
eventHandling_ = true;
for (Channel* channel : activeChannels_) { currentActiveChannel_ = channel; currentActiveChannel_->handleEvent(pollReturnTime_); } currentActiveChannel_ = NULL;
eventHandling_ = false;
callingPendingFunctors_ = true; { MutexLockGuard lock(mutex_); functors.swap(pendingFunctors_); }
for (const Functor& functor : functors) { functor(); }
callingPendingFunctors_ = false;
callingExpiredTimers_ = true; cancelingTimers_.clear();
for (const Entry& it : expired) { it.second->run(); } callingExpiredTimers_ = false;
|