Channel类是用于管理文件描述符的类,它负责管理一个文件描述符的读写事件,*并提供接口来设置读写回调、关闭回调、错误回调等。包含一个EventLoop对象,一个文件描述符,一个事件类型,一个回调函数。是TcpConnection和EventLoop交流的桥梁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| class Channel : noncopyable { private: static string eventsToString(int fd, int ev);
void update(); void handleEventWithGuard(Timestamp receiveTime);
static const int kNoneEvent; static const int kReadEvent; static const int kWriteEvent;
EventLoop* loop_; const int fd_; int events_; int revents_; int index_; bool logHup_;
std::weak_ptr<void> tie_; bool tied_; bool eventHandling_; bool addedToLoop_; ReadEventCallback readCallback_; EventCallback writeCallback_; EventCallback closeCallback_; EventCallback errorCallback_; };
|
index_
通过index_实现一种状态记录机制,这种机制决定着poller中的行为,以epoll为例的话,就是决定ADD,DELETE,MOD等操作,如果没有这个index_的话,在每一个事件触发时,epoll不知道要执行什么操作,他不知道这个fd是否在epoll树上。在查看muduo如何实现之前,我们看一下下面的实现,存在哪些问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| void TcpServer::run() { this->m_thread_pool->run(); auto *channel = new Channel(this->m_lfd, FDEvent::ReadEvent, TcpServer::accept_connection, nullptr, nullptr, this); this->m_main_loop->add_task(channel, ElementType::ADD); this->m_main_loop->run(); }
struct ChannelElement { ElementType type; Channel *channel; };
int EventLoop::add_task(Channel *channel, const ElementType type) { this->m_mutex.lock(); auto *node = new class ChannelElement(); node->channel = channel; node->type = type; this->m_task_q.push(node); this->m_mutex.unlock();
if (this->m_thread_id == std::this_thread::get_id()) { std::cout << "\n--------------子线程进入(evLoop->threadId == pthread_self()),threadName = "<< this->m_thread_name + ", threadID = " << this->m_thread_id << std::endl; process_taskQ(); } else { printf("\n----------主线程进入task_wake_up,threadID = %lu\n\n", pthread_self()); task_wake_up(); } return 0; }
void EventLoop::process_taskQ() {
while (!this->m_task_q.empty()) { this->m_mutex.lock(); const auto node = m_task_q.front(); m_task_q.pop(); this->m_mutex.unlock(); if (node->type == ElementType::ADD) { add(node->channel); } else if (node->type == ElementType::DELETE) { remove(node->channel); } else { modify(node->channel); } delete node; } }
|
上面这个例子存一个问题,那就是效率低,每次添加一个任务,都需要执行一次:new ChannelElement(),m_task_q.push(node)和m_task_q.pop();,同时还要获取互斥锁。是一种命令机制
而muduo的做法是一种状态机制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| void EPollPoller::updateChannel(Channel* channel) { Poller::assertInLoopThread(); const int index = channel->index(); LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events() << " index = " << index; if (index == kNew || index == kDeleted) { int fd = channel->fd(); if (index == kNew) { assert(channels_.find(fd) == channels_.end()); channels_[fd] = channel; } else { assert(channels_.find(fd) != channels_.end()); assert(channels_[fd] == channel); } channel->set_index(kAdded); update(EPOLL_CTL_ADD, channel); } else { int fd = channel->fd(); (void)fd; assert(channels_.find(fd) != channels_.end()); assert(channels_[fd] == channel); assert(index == kAdded); if (channel->isNoneEvent()) { update(EPOLL_CTL_DEL, channel); channel->set_index(kDeleted); } else { update(EPOLL_CTL_MOD, channel); } } }
|
通过一个标志位index_记录channel的处理状态,不需要每次都new 出一个类似的ChannelElement,即使是push到activeChannels,其速度也是非常快的,因为push的是一个本就存在的指针
| 对比 |
activeChannels.push_back(channel) |
task_queue.push(new ...) |
| 操作对象 |
拷贝一个裸指针 |
拷贝一个裸指针 |
| 内存管理 |
无 (操作的是已存在的对象) |
有 (必须先 new,后 delete) |
| 性能开销 |
极低 (一次指针拷贝) |
高 (一次堆分配 + 一次堆释放) |
tie_和tied_
解决 TcpConnection 的生命周期安全问题。TcpConnection 是通过 std::shared_ptr 管理的, Channel 内部的回调函数如果直接捕获 shared_ptr 会导致循环引用。tie_ 用于保存一个指向 TcpConnection 的 weak_ptr。在 handleEvent 执行回调前,会尝试将 weak_ptr提升为 shared_ptr。如果提升成功,说明 TcpConnection 对象还活着(通过 tie_.lock() 在事件处理期间”锁住”对象,确保其不会被销毁),就安全地执行回调;如果失败,说明 TcpConnection 已经被销毁了,就不再执行回调,从而避免了对悬空指针的访问。tied_ 只是一个标志,表示是否启用了这个机制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| void TcpConnection::connectEstablished() { channel_->tie(shared_from_this()); }
void Channel::tie(const std::shared_ptr<void>& obj) { tie_ = obj; tied_ = true; }
void Channel::handleEvent(Timestamp receiveTime) { std::shared_ptr<void> guard; if (tied_) { guard = tie_.lock(); if (guard) { handleEventWithGuard(receiveTime); } } else { handleEventWithGuard(receiveTime); } }
|
shared_from_this():用于在对象内部安全地获取指向自身的 shared_ptr,这里为什么不能直接使用 this 指针呢?
1 2 3 4 5 6 7 8 9 10 11
| class BadExample { public: std::shared_ptr<BadExample> getSharedPtr() { return std::shared_ptr<BadExample>(this); } };
auto ptr1 = std::make_shared<BadExample>(); auto ptr2 = ptr1->getSharedPtr();
|
TcpConnection 通过继承 std::enable_shared_from_this 来解决这个问题。