Acceptor类是用于接受新连接的类,它负责监听一个端口,当有新的连接到来时,它负责接受这个连接,并调用回调函数处理这个连接。包含一个Socket对象,一个Channel对象,一个NewConnectionCallback对象。Socket 对底层的socket文件描述符(sockfd)进行了面向对象的封装,提供了 bind, listen, accept等接口。
一、类的初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport) : loop_(loop), acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())), acceptChannel_(loop, acceptSocket_.fd()), listening_(false), idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)) { assert(idleFd_ >= 0); acceptSocket_.setReuseAddr(true); acceptSocket_.setReusePort(reuseport); acceptSocket_.bindAddress(listenAddr); acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this)); }
void Acceptor::handleRead() { loop_->assertInLoopThread(); InetAddress peerAddr; int connfd = acceptSocket_.accept(&peerAddr); if (connfd >= 0) { if (newConnectionCallback_) { newConnectionCallback_(connfd, peerAddr); } else { sockets::close(connfd); } } else { if (errno == EMFILE) { ::close(idleFd_); idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL); ::close(idleFd_); idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); } } }
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) { loop_->assertInLoopThread(); EventLoop* ioLoop = threadPool_->getNextLoop(); char buf[64]; snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_); ++nextConnId_; string connName = name_ + buf;
InetAddress localAddr(sockets::getLocalAddr(sockfd)); TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr)); connections_[connName] = conn; conn->setConnectionCallback(connectionCallback_); conn->setMessageCallback(messageCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn)); }
|
宏观上看,Acceptor这个类并不复杂,主要的任务就是负责初始化监听描述符,并设置监听描述符的回调函数handleRead,在新连接到来时,Acceptor会调用Acceptor会调用headleRead建立连接,并调用newConnection回调函数。
1 2 3 4 5 6 7
| if (errno == EMFILE) { ::close(idleFd_); idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL); ::close(idleFd_); idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); }
|
当服务器并发连接数非常高时,可能会耗尽进程可用的文件描述符(fd)。这时,accept() 会失败并返回 EMFILE 错误。如果不处理,服务器将无法接受任何新连接,相当于“假死”。
muduo 的解决方案:
Acceptor 在构造时,就预先打开一个指向 /dev/null 的文件描述符 idleFd_。它就像一个“备用座位”。
- 当
accept 因 EMFILE 失败时,Acceptor 会立即关闭这个备用的 idleFd_,从而释放出一个文件描述符名额。
- 有了这个名额,
Acceptor 就能成功 accept() 那个等待中的新连接。
- 为了避免新连接因为没有被处理而丢失,
Acceptor 会立即 close() 这个刚刚接受的连接。这虽然拒绝了客户端,但保证了服务器自身不会卡死,并且向客户端发出了一个明确的拒绝信号(RST),客户端可以稍后重试。
- 最后,
Acceptor 会再次打开 /dev/null 来重新占用 idleFd_,为下一次 EMFILE 危机做好准备。
这个技巧确保了即使在 fd 耗尽的极端情况下,Acceptor 所在的 EventLoop 也能正常运转,不会因为 accept 不断失败而陷入死循环
SO_RESUCEADDR和SO_RESUCEPORT的区别
| 特性 |
SO_REUSEADDR (地址复用) |
SO_REUSEPORT (端口复用) |
| 核心目的 |
服务器快速重启 |
性能扩展 (负载均衡) |
| 解决问题 |
允许新启动的服务器立即绑定一个处于 TIME_WAIT 状态的端口 |
允许多个独立的监听套接字绑定到完全相同的 IP 和端口。 |
| 工作模式 |
一个端口在同一时间仍然只能被一个监听套接字绑定。 |
一个端口可以被多个监听套接字同时绑定。 |
| 适用场景 |
几乎所有服务器程序都应该开启,用于开发和运维中的快速迭代和重启。 |
面向高性能、高并发连接的服务器,用于在多核 CPU 上扩展 accept 的处理能力。 |
| 在 muduo 中 |
默认开启 |
默认关闭,需要显式开启。 |
在标准的 TcpServer 模型中,只有一个主线程负责 accept 所有新连接,但当服务器面临极高的连接建立速率时就会成为整个系统的性能瓶颈。acceptSocket_.setReusePort(reuseport); ,也就是SO_RESUCEPORT ,这个选项允许多个线程或进程创建各自的监听套接字,并全部绑定到同一个 IP 和端口上。当新连接到来时,内核会负责进行负载均衡,将这个连接请求“派发”给其中一个监听套接字。这样一来,accept 的工作就被均匀地分摊到了多个 CPU 核心上。
更多细节,见:https://stackoverflow.com/questions/14388706/how-do-so-reuseaddr-and-so-reuseport-differ
仔细看的话,还能发现建立连接的是一个acceptSocket_,而不是系统的accept函数,这是muduo对系统调用的封装,提高扩展性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| Socket acceptSocket_;
class Socket : noncopyable { public: explicit Socket(int sockfd) : sockfd_(sockfd) { } ~Socket(); int fd() const { return sockfd_; } bool getTcpInfo(struct tcp_info*) const; bool getTcpInfoString(char* buf, int len) const; void bindAddress(const InetAddress& localaddr); void listen(); int accept(InetAddress* peeraddr); void shutdownWrite(); void setTcpNoDelay(bool on); void setReuseAddr(bool on); void setReusePort(bool on); void setKeepAlive(bool on);
private: const int sockfd_; };
Socket::~Socket() { sockets::close(sockfd_); }
void Socket::listen() { sockets::listenOrDie(sockfd_); }
void sockets::listenOrDie(int sockfd) { int ret = ::listen(sockfd, SOMAXCONN); if (ret < 0) { LOG_SYSFATAL << "sockets::listenOrDie"; } }
|
可以看见,Socket内部的实现都放在一个名为sockets的作用域中,设计思想:Socket作为一个抽象层,我们在使用的时候不需要考虑不同平台的调用方式,底层的sockets负责实现阔平台逻辑
二、挂载EventLoop
我们发现,Acceptor没有调用listen函数进行监听,实际上开始监听的操作是在TcpServer的start函数实现的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| void TcpServer::start() { if (started_.getAndSet(1) == 0) { threadPool_->start(threadInitCallback_);
assert(!acceptor_->listening()); loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_))); } }
void EventLoop::runInLoop(Functor cb) { if (isInLoopThread()) { cb(); } else { queueInLoop(std::move(cb)); } }
void Acceptor::listen() { loop_->assertInLoopThread(); listening_ = true; acceptSocket_.listen(); acceptChannel_.enableReading(); }
|
在TcpServer::start函数启动后,loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));会将Acceptor::listen函数启动开始监听,并将文件描述符放到主线程的epoll树上监听读实现,当事件触发时,会调用文件描述符的读回调,也就是headleRead函数处理连接。。