断开连接重试机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Connector::retry(int sockfd)
{
sockets::close(sockfd);
setState(kDisconnected);
if (connect_)
{
loop_->runAfter(retryDelayMs_ / 1000.0,std::bind(&Connector::startInLoop, shared_from_this()));
retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
}
else
{
LOG_DEBUG << "do not connect";
}
}

muduo在尝试重连时,并不是立刻进行连接,而是创建一个定时任务,并且,这个定时任务的间隔时间越来越长,通过翻倍的方式进行,知道最大间隔时长kMaxRetryDelayMs,避免频繁的尝试重连对服务器造成压力。

muduo 中 TcpClient 从开始连接到关闭重试的完整生命周期

阶段一:客户端初始化

在创建一个 TcpClient 对象时,主要完成了以下工作:

  1. 构造函数会保存 EventLoop 指针、服务器地址 InetAddress 和客户端名称。
  2. TcpClient 自身不处理连接的细节,而是将这个任务委托给一个内部的 Connector 对象。Connector 的核心职责就是与服务器建立连接
  3. TcpClient 会向 Connector 注册一个回调函数 TcpClient::newConnection。这个回调函数会在 Connector 成功建立连接后被调用。
1
2
3
4
5
6
7
8
9
10
11
12
TcpClient::TcpClient(EventLoop* loop, const InetAddress& serverAddr, const string& nameArg)
: loop_(CHECK_NOTNULL(loop)),
connector_(new Connector(loop, serverAddr)),
name_(nameArg),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
retry_(false),
connect_(true),
nextConnId_(1)
{
connector_->setNewConnectionCallback(std::bind(&TcpClient::newConnection, this, _1));//设置成功连接的回调函数
}

阶段二:发起连接

  1. 调用TcpClient::connect函数连接服务器

    1
    2
    3
    4
    5
    void TcpClient::connect()
    {
    connect_ = true;
    connector_->start();
    }
  2. 启动connector,调用connector::start()函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    void Connector::start()
    {
    connect_ = true;
    loop_->runInLoop(std::bind(&Connector::startInLoop, this)); // FIXME: unsafe
    }

    void Connector::startInLoop()
    {
    loop_->assertInLoopThread();
    assert(state_ == kDisconnected);
    if (connect_)
    {
    connect();
    }
    else
    {
    LOG_DEBUG << "do not connect";
    }
    }

    void Connector::connect()
    {
    int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
    int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
    int savedErrno = (ret == 0) ? 0 : errno;
    switch (savedErrno)
    {
    case 0:
    case EINPROGRESS:
    case EINTR:
    case EISCONN:
    connecting(sockfd);
    break;

    case EAGAIN:
    case EADDRINUSE:
    case EADDRNOTAVAIL:
    case ECONNREFUSED:
    case ENETUNREACH:
    retry(sockfd);
    break;

    case EACCES:
    case EPERM:
    case EAFNOSUPPORT:
    case EALREADY:
    case EBADF:
    case EFAULT:
    case ENOTSOCK:
    LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
    sockets::close(sockfd);
    break;

    default:
    LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
    sockets::close(sockfd);
    // connectErrorCallback_();
    break;
    }
    }

  3. start函数内部调用将Connector::startInLoop任务放到反应堆中执行,然后执行connect();函数,这个函数是非阻塞的,所以立即返回

    • 如果返回 0,表示连接立即成功(通常发生在连接本地地址时)。
    • 如果返回 -1errnoEINPROGRESS,表示连接正在进行中。这是最常见的情况。
    • 其他错误则表示连接失败,可能会触发重试逻辑。

    调用connecting函数,关注它的写事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    void Connector::connecting(int sockfd)
    {
    setState(kConnecting);
    assert(!channel_);
    channel_.reset(new Channel(loop_, sockfd));
    channel_->setWriteCallback(std::bind(&Connector::handleWrite, this));
    channel_->setErrorCallback(std::bind(&Connector::handleError, this));

    channel_->enableWriting();
    }

阶段三:连接建立成功

  1. 当建立连接成功时,poller触发时间,调用Connector::handleWrite方法,在这个方法中,会检验是否真的连接成功了,如果连接成功,调用newConnectionCallback_方法,也就是TcpClient::newConnection方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    void Connector::handleWrite()
    {
    LOG_TRACE << "Connector::handleWrite " << state_;

    if (state_ == kConnecting)
    {
    int sockfd = removeAndResetChannel();
    // 获取socket的错误码,判断是否真的连接成功了,0表示连接成功,非0表示连接失败
    int err = sockets::getSocketError(sockfd);
    if (err)
    {
    LOG_WARN << "Connector::handleWrite - SO_ERROR = " << err << " " << strerror_tl(err);
    // 尝试重连
    retry(sockfd);
    }
    // 判断是否是自连接,如果是自连接,则尝试重连
    else if (sockets::isSelfConnect(sockfd))
    {
    LOG_WARN << "Connector::handleWrite - Self connect";
    retry(sockfd);
    }
    // 连接成功,设置状态为已连接,并调用回调函数
    else
    {
    setState(kConnected);
    if (connect_)
    {
    newConnectionCallback_(sockfd);
    }
    else
    {
    sockets::close(sockfd);
    }
    }
    }
    else
    {
    // what happened?
    assert(state_ == kDisconnected);
    }
    }

    void TcpClient::newConnection(int sockfd)
    {
    loop_->assertInLoopThread();
    // 获取对端地址
    InetAddress peerAddr(sockets::getPeerAddr(sockfd));
    char buf[32];
    // 格式化连接名称
    snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);
    ++nextConnId_;
    // 连接名称
    string connName = name_ + buf;

    // 获取本地地址
    InetAddress localAddr(sockets::getLocalAddr(sockfd));
    // 创建TcpConnection对象
    TcpConnectionPtr conn(new TcpConnection(loop_, connName, sockfd, localAddr, peerAddr));

    // 设置回调函数
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    // 设置写完成回调函数
    conn->setWriteCompleteCallback(writeCompleteCallback_);
    conn->setCloseCallback(std::bind(&TcpClient::removeConnection, this, _1));
    // 设置连接
    {
    MutexLockGuard lock(mutex_);
    connection_ = conn;
    }
    conn->connectEstablished();
    }

    void TcpConnection::connectEstablished()
    {
    loop_->assertInLoopThread();
    assert(state_ == kConnecting);
    setState(kConnected);
    channel_->tie(shared_from_this());
    channel_->enableReading();

    connectionCallback_(shared_from_this());
    }

    TcpClient::newConnection会设置各种回调,然后调用TcpConnection::connectEstablished方法,开始监听读事件,然后调用connectionCallback_提示建立连接成功

阶段四:连接断开与触发重试

连接可能因为多种原因断开:客户端主动断开、服务器断开、网络故障等。

检测到断开

  • 对端关闭TcpConnection::handleReadread() 时返回 0,表示对端关闭了连接。
  • 发生错误handleRead 读取时出错,或 handleError 被调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}

void TcpConnection::handleClose()
{
loop_->assertInLoopThread();
LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
assert(state_ == kConnected || state_ == kDisconnecting);

setState(kDisconnected);
channel_->disableAll();

TcpConnectionPtr guardThis(shared_from_this());
connectionCallback_(guardThis);

closeCallback_(guardThis);
}
  1. 将连接状态设置为 kDisconnected
  2. Poller 中移除所有事件监听。
  3. 调用用户的 ConnectionCallback,通知连接已断开。
  4. 调用内部设置的 closeCallback_,也就是 TcpClient::removeConnection
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void TcpClient::removeConnection(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();
assert(loop_ == conn->getLoop());

{
MutexLockGuard lock(mutex_);
assert(connection_ == conn);
connection_.reset();
}

loop_->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
if (retry_ && connect_)
{
connector_->restart();
}
}

void TcpConnection::connectDestroyed()
{
loop_->assertInLoopThread();
if (state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll();

connectionCallback_(shared_from_this());
}
channel_->remove();
}
  1. 释放对 TcpConnection 对象的引用。TcpConnection 对象会在其所在的 I/O 线程中被安全地销毁。
  2. 检查retry_和connect_标志判断是否重试

阶段五:执行重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void Connector::restart()
{
loop_->assertInLoopThread();
setState(kDisconnected);
retryDelayMs_ = kInitRetryDelayMs;
connect_ = true;
startInLoop();
}

void Connector::startInLoop()
{
loop_->assertInLoopThread();
assert(state_ == kDisconnected);
if (connect_)
{
connect();
}
else
{
LOG_DEBUG << "do not connect";
}
}

void Connector::retry(int sockfd)
{
sockets::close(sockfd);
setState(kDisconnected);
if (connect_)
{
loop_->runAfter(retryDelayMs_ / 1000.0,
std::bind(&Connector::startInLoop, shared_from_this()));
retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
}
else
{
LOG_DEBUG << "do not connect";
}
}

然后重试连接失败,调用retry(sockfd);函数重试,这个过程会一直循环,直到连接成功或用户调用 stop()

阶段六:客户端关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void Connector::stop()
{
connect_ = false;
loop_->queueInLoop(std::bind(&Connector::stopInLoop, this));
}

void Connector::stopInLoop()
{
loop_->assertInLoopThread();
if (state_ == kConnecting)
{
setState(kDisconnected);
int sockfd = removeAndResetChannel();
retry(sockfd);
}
}

int Connector::removeAndResetChannel()
{
//取消监听事件
channel_->disableAll();
//删除poller上的监听
channel_->remove();
int sockfd = channel_->fd();
loop_->queueInLoop(std::bind(&Connector::resetChannel, this));
return sockfd;
}

void Connector::resetChannel()
{
channel_.reset();
}

TcpClient 对象析构时,它会确保 Connector 被停止,并且如果还存在 TcpConnection,会通过 forceClose() 强制关闭它,保证所有资源被正确释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
TcpClient::~TcpClient()
{
LOG_INFO << "TcpClient::~TcpClient[" << name_ << "] - connector " << get_pointer(connector_);
TcpConnectionPtr conn;
bool unique = false;
{
MutexLockGuard lock(mutex_);
unique = connection_.unique();
conn = connection_;
}
if (conn)
{
assert(loop_ == conn->getLoop());
// FIXME: not 100% safe, if we are in different thread
CloseCallback cb = std::bind(&detail::removeConnection, loop_, _1);
loop_->runInLoop(std::bind(&TcpConnection::setCloseCallback, conn, cb));
if (unique)
{
conn->forceClose();
}
}
else
{
connector_->stop();
// FIXME: HACK
loop_->runAfter(1, std::bind(&detail::removeConnector, connector_));
}
}

muduo库的AsyncLogging异步日志记录使用双缓冲+任务队列实现,同时,在日志内容过多时,主动丢弃部分日志,确保系统高效的运行,防止被日志阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
BufferPtr currentBuffer_ GUARDED_BY(mutex_);//当前缓冲区
BufferPtr nextBuffer_ GUARDED_BY(mutex_);//备用缓冲区
BufferVector buffers_ GUARDED_BY(mutex_);//待写入队列


void AsyncLogging::append(const char* logline, int len)
{
muduo::MutexLockGuard lock(mutex_);
// 如果当前缓冲区有足够的空间,则直接追加到当前缓冲区
if (currentBuffer_->avail() > len)
{
currentBuffer_->append(logline, len);
}
else
{
// 将当前缓冲区放入待写入队列或者说待写入缓冲区
buffers_.push_back(std::move(currentBuffer_));
// 为 currentBuffer_ 获取一个新的空缓冲区
if (nextBuffer_) // 如果备用缓冲区存在
{
// 将备用缓冲区设置为当前缓冲区
currentBuffer_ = std::move(nextBuffer_);
}
else
{
// 在备用缓冲区不存在的情况下,为 currentBuffer_ 分配一个新的空缓冲区
currentBuffer_.reset(new Buffer);
}
// 将新的日志消息写入新的 currentBuffer_
currentBuffer_->append(logline, len);
// 通知等待的线程开始写入数据(当前缓冲区已满,批量写入)
cond_.notify();
}
}

后端一直循环写入日志,看起来比较绕,currentBuffer,nextBuffer,buffers,newBuffer1,newBuffer2,buffersToWrite,一共六个buffer

  1. currentBuffer:前端写入日志的buffer
  2. nextBuffer:前端备用buffer
  3. buffers:前端待写入日志队列
  4. newBuffer1:后端备用buffer
  5. newBuffer2:后端备用buffer
  6. bufferToWrite:真正要写入日志的buffer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
void AsyncLogging::threadFunc()
{
// 确认后端日志线程已启动
assert(running_ == true);
// 使用 CountDownLatch 通知前端线程:后端线程已成功启动,可以开始记录日志了
latch_.countDown();

// 创建一个 LogFile 对象,这是日志最终写入的目标文件。
// 第三个参数 threadSafe 设置为 false,因为所有写操作都在这一个后端线程内完成,无需加锁。
LogFile output(basename_, rollSize_, false);

// 这两个缓冲区用于后续和前端的缓冲区进行交换,避免在后端线程中频繁分配新内存
BufferPtr newBuffer1(new Buffer);
BufferPtr newBuffer2(new Buffer);
newBuffer1->bzero();
newBuffer2->bzero();

// 准备一个 vector,用于存放从前端(append函数)收集到的、待写入文件的缓冲区指针
BufferVector buffersToWrite;
buffersToWrite.reserve(16);

// 后端日志线程主循环
while (running_)
{
// 在循环开始时,断言两个空闲缓冲区都是空的,并且待写入区也是空的
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());

{
muduo::MutexLockGuard lock(mutex_);

// 等待可能被两种情况唤醒:
// 1. 前端线程写满一个 buffer 后调用 cond_.notify() 唤醒。
// 2. 超时时间到达,即使没有数据也要进行一次日志刷盘。
if (buffers_.empty()) // unusual usage!
{
cond_.waitForSeconds(flushInterval_);
}

// 无论 cond_ 是如何被唤醒的,我们都把前端的 currentBuffer_ 移到待写入队列中。
// 这样可以确保即使在超时的情况下,当前缓冲区里未满的日志也能被收集到。
buffers_.push_back(std::move(currentBuffer_));

// 将之前准备好的空闲缓冲区 newBuffer1 设置为新的 currentBuffer_
currentBuffer_ = std::move(newBuffer1);

// 将前端的整个待写入队列 buffers_ 和后端的 buffersToWrite 进行交换。
// 交换后,buffers_ 变为空,前端可以继续无锁地向其中添加写满的 buffer。
buffersToWrite.swap(buffers_);

// 如果前端把备用缓冲区 nextBuffer_ 也用掉了,那么就把另一个空闲缓冲区 newBuffer2 补上。
// currentbuffer写满了,执行currentbuffer = move(nextbuffer),此后nextbuffer为空
if (!nextBuffer_)
{
nextBuffer_ = std::move(newBuffer2);
}
}

// 断言我们确实拿到了待写入的数据
assert(!buffersToWrite.empty());

// 如果待写入的缓冲区数量过多(超过25个),说明前端日志产生速度远超后端写入速度,
// 为了防止内存无限增长,这里会丢弃掉一部分日志。
if (buffersToWrite.size() > 25)
{
char buf[256];
snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2);
fputs(buf, stderr);
output.append(buf, static_cast<int>(strlen(buf)));
// 只保留前两个 buffer 的日志,其他的丢弃
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
}

// 遍历所有待写入的 buffer,将它们的内容追加到 LogFile 对象中
for (const auto& buffer : buffersToWrite)
{
output.append(buffer->data(), buffer->length());
}
if (buffersToWrite.size() > 2)
{
// 丢弃多余的buffer,只保留两个,用于回收利用,避免内存持有过多
buffersToWrite.resize(2);
}

// 从处理完的 buffersToWrite 中回收 buffer 作为下一个空闲缓冲区(这个一定执行)
if (!newBuffer1)
{
assert(!buffersToWrite.empty());
newBuffer1 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer1->reset(); // 清空 buffer
}
//可能执行,如果是超时进来的,就不会执行,因为没有执行currentbuffer = move(nextbuffer),nextbuffer没有为空那么上面的交换
//就没有执行,所以newBUffer2就不会为空,反之为空
if (!newBuffer2)
{
assert(!buffersToWrite.empty());
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->reset(); // 清空 buffer
}

// 清空待写入队列,并强制将 LogFile 缓冲区的数据刷到磁盘
buffersToWrite.clear();
output.flush();
}
// 线程退出前,最后一次将 LogFile 缓冲区的数据刷到磁盘
output.flush();
}

如何使用:

这个AsyncLogging需要配合Logger类使用

  1. LOG_INFO 宏创建 Logger 对象
1
2
3
4
#define LOG_INFO if (muduo::Logger::logLevel() <= muduo::Logger::INFO) \
muduo::Logger(__FILE__, __LINE__).stream()

LOG_INFO << "hello world";

日志会创建一个临时对象,然后这个对象在结束的时候通过g_output将内容添加到currentBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Logger::~Logger()
{
impl_.finish();
const LogStream::Buffer& buf(stream().buffer());
g_output(buf.data(), buf.length());
if (impl_.level_ == FATAL)
{
g_flush();
abort();
}
}
Logger::OutputFunc g_output = defaultOutput;

void defaultOutput(const char* msg, int len)
{
size_t n = fwrite(msg, 1, len, stdout);
// FIXME check n
(void)n;
}

从这里可以看出默认的输出是输出到终端,所以我们在使用的时候需要自定义输出器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <muduo/base/CurrentThread.h>
#include <muduo/base/AsyncLogging.h>
#include <muduo/base/Logging.h>
#include <muduo/base/Thread.h>

//定义全局AsyncLogging指针
muduo::AsyncLogging* g_asyncLogging = nullptr;

//定义异步输出函数
void asyncOutput(const char* msg, int len)
{
g_asyncLogging->append(msg, len);
}

int main()
{
muduo::AsyncLogging log("log.txt", 0);
g_asyncLogging = &log;

//设置日志输出函数
muduo::Logger::setOutput(asyncOutput);
//启动异步日志线程
log.start();
//输出日志
LOG_INFO << "hello world";
muduo::CurrentThread::sleepUsec(1000 * 1000);
return 0;
}

清华源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
[base]
name=CentOS-7.9.2009 - Base - tsinghua.com
#mirrorlist=
baseurl=https://mirrors.tuna.tsinghua.edu.cn/centos-vault/7.9.2009/os/$basearch/
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7

#released updates
[updates]
name=CentOS-7.9.2009 - Updates - tsinghua.com
#mirrorlist=
baseurl=https://mirrors.tuna.tsinghua.edu.cn/centos-vault/7.9.2009/updates/$basearch/
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7
#additional packages that may be useful
[extras]
name=CentOS-7.9.2009 - Extras - tsinghua.com
#mirrorlist=
baseurl=https://mirrors.tuna.tsinghua.edu.cn/centos-vault/7.9.2009/extras/$basearch/
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7
#additional packages that extend functionality of existing packages
[centosplus]
name=CentOS-7.9.2009 - Plus - tsinghua.com
baseurl=https://mirrors.tuna.tsinghua.edu.cn/centos-vault/7.9.2009/centosplus/$basearch/
gpgcheck=1
enabled=0
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7

官方源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[base]
name=CentOS-7.9.2009 - Base
baseurl=https://vault.centos.org/7.9.2009/os/$basearch/
gpgcheck=1
gpgkey=https://vault.centos.org/RPM-GPG-KEY-CentOS-7
enabled=1

[updates]
name=CentOS-7.9.2009 - Updates
baseurl=https://vault.centos.org/7.9.2009/updates/$basearch/
gpgcheck=1
gpgkey=https://vault.centos.org/RPM-GPG-KEY-CentOS-7
enabled=1

[extras]
name=CentOS-7.9.2009 - Extras
baseurl=https://vault.centos.org/7.9.2009/extras/$basearch/
gpgcheck=1
gpgkey=https://vault.centos.org/RPM-GPG-KEY-CentOS-7
enabled=1

[centosplus]
name=CentOS-7.9.2009 - Plus
baseurl=https://vault.centos.org/7.9.2009/centosplus/$basearch/
gpgcheck=1
gpgkey=https://vault.centos.org/RPM-GPG-KEY-CentOS-7
enabled=0

一、配置静态IP

修改Address,Netmask,Gateway,DNS,修改后点击Apply并重启网络

image-20250719140135106

二、代理环境

软件下载:clash for windows linux

image-20250719160231349

如果是root用户,需要添加--no-sandbox运行

1
./cfw --no-sandbox

添加环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
vim ~/.bashrc
# 添加如下信息
export http_proxy="http://127.0.0.1:7890"
export https_proxy="http://127.0.0.1:7890"
export HTTP_PR0XY="http://127.0.0.1:7890"
export HTTPS_PR0XY="http://127.0.0.1:7890"
export no_proxy="localhost,127.0.0.1""
source ~/.bashrc

sudo vim /etc/profile
export http_proxy="http://127.0.0.1:7890"
export https_proxy="http://127.0.0.1:7890"
export HTTP_PR0XY="http://127.0.0.1:7890"
export HTTPS_PR0XY="http://127.0.0.1:7890"
export no_proxy="localhost,127.0.0.1""
source /etc/profile

vim /etc/environment
export http_proxy="http://127.0.0.1:7890"
export https_proxy="http://127.0.0.1:7890"
export HTTP_PR0XY="http://127.0.0.1:7890"
export HTTPS_PR0XY="http://127.0.0.1:7890"
export no_proxy="localhost,127.0.0.1""
source /etc/environment

sudo vim /etc/yum.conf
export http_proxy="http://127.0.0.1:7890"
export https_proxy="http://127.0.0.1:7890"
export HTTP_PR0XY="http://127.0.0.1:7890"
export HTTPS_PR0XY="http://127.0.0.1:7890"
export no_proxy="localhost,127.0.0.1""

sudo vim /etc/wgetrc
http_proxy = http://127.0.0.1:7890
https_proxy = http://127.0.0.1:7890
HTTP_PR0XY = http://127.0.0.1:7890
HTTPS_PR0XY = http://127.0.0.1:7890
no_proxy = localhost,127.0.0.1

curl -I http://www.baidu.com

一键脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/bin/bash
# =================================================================
# 一键为 CentOS 设置/取消 127.0.0.1:7890 代理 (优化版)
# 用法:
# sudo ./set_proxy.sh on # 启用代理
# sudo ./set_proxy.sh off # 关闭代理
# =================================================================

# --- 配置区 ---
PROXY_URL="http://127.0.0.1:7890"
NO_PROXY="localhost,127.0.0.1,::1,192.168.0.0/16,10.0.0.0/8"

# --- 标记,用于安全地添加和删除配置 ---
BEGIN_MARKER="# BEGIN PROXY CONFIG - Managed by script"
END_MARKER="# END PROXY CONFIG - Managed by script"

# --- 获取真正的用户家目录 ---
# 如果是通过 sudo 执行,SUDO_USER 变量会包含原用户名
if [[ -n "$SUDO_USER" ]]; then
USER_HOME=$(getent passwd "$SUDO_USER" | cut -d: -f6)
else
# 如果是 root 直接执行,则使用 root 的家目录
USER_HOME=$HOME
fi

# 需要写入的环境变量文件列表
# 注意:/etc/environment 的语法和其他文件不同,没有 export
declare -a ENV_FILES=("$USER_HOME/.bashrc" "/etc/profile")
ENV_FILE_SYSTEM="/etc/environment"

# --- 函数定义 ---

# 安全地写入或清除代理配置
handle_proxy_files() {
local action="$1"

# 构造配置内容
local proxy_content_export="
export http_proxy=\"$PROXY_URL\"
export https_proxy=\"$PROXY_URL\"
export HTTP_PROXY=\"$PROXY_URL\"
export HTTPS_PROXY=\"$PROXY_URL\"
export no_proxy=\"$NO_PROXY\"
export NO_PROXY=\"$NO_PROXY\""

local proxy_content_plain="
http_proxy=\"$PROXY_URL\"
https_proxy=\"$PROXY_URL\"
HTTP_PROXY=\"$PROXY_URL\"
HTTPS_PROXY=\"$PROXY_URL\"
no_proxy=\"$NO_PROXY\"
NO_PROXY=\"$NO_PROXY\""

# 清理函数:使用标记来精确删除
clear_proxy() {
local file="$1"
if [ -f "$file" ]; then
# 使用 sed 删除从 BEGIN_MARKER 到 END_MARKER 之间的所有行
sed -i "/^${BEGIN_MARKER}$/,/^${END_MARKER}$/d" "$file"
fi
}

# 写入函数
write_proxy() {
local file="$1"
local content="$2"
# 写入前先清理,防止重复
clear_proxy "$file"
# 使用 cat 和 EOF 来写入整个块
cat >> "$file" <<EOF

${BEGIN_MARKER}
${content}
${END_MARKER}
EOF
}

# 遍历文件列表进行操作
for f in "${ENV_FILES[@]}"; do
if [[ "$action" == "on" ]]; then
write_proxy "$f" "$proxy_content_export"
else
clear_proxy "$f"
fi
done

# 单独处理 /etc/environment
if [[ "$action" == "on" ]]; then
write_proxy "$ENV_FILE_SYSTEM" "$proxy_content_plain"
else
clear_proxy "$ENV_FILE_SYSTEM"
fi
}

# YUM/DNF 配置
handle_package_manager() {
local conf_file="/etc/yum.conf"
if [ ! -f "$conf_file" ]; then
conf_file="/etc/dnf/dnf.conf" # 兼容新的 CentOS/RHEL
fi

# 先删除旧的 proxy 设置行,避免重复
sed -i '/^proxy=/d' "$conf_file"

if [[ "$1" == "on" ]]; then
# 在 [main] 部分追加配置
# 如果[main]不存在,则直接追加到文件末尾
if grep -q "\[main\]" "$conf_file"; then
sed -i "/\[main\]/a proxy=$PROXY_URL" "$conf_file"
else
echo "proxy=$PROXY_URL" >> "$conf_file"
fi
fi
}

# --- 主流程 ---
case "$1" in
on)
echo ">>> 正在为系统配置代理: $PROXY_URL"
handle_proxy_files on
handle_package_manager on
echo ">>> 代理配置写入成功!"
echo
echo "========================= 重要提示 ========================="
echo "请执行以下命令使配置在当前终端立即生效:"
echo " source ${USER_HOME}/.bashrc"
echo "或者,请重新打开一个新的终端窗口。"
echo "=========================================================="
;;
off)
echo ">>> 正在清除系统代理配置..."
handle_proxy_files off
handle_package_manager off
echo ">>> 代理配置已清除!"
echo
echo "========================= 重要提示 ========================="
echo "请执行以下命令使变更在当前终端立即生效:"
echo " source ${USER_HOME}/.bashrc"
echo "或者,请重新打开一个新的终端窗口。"
echo "=========================================================="
;;
*)
echo "用法: sudo $0 {on|off}"
exit 1
;;
esac

exit 0

使用方法

  1. 把脚本保存为 set_proxy.sh ,并赋予可执行权限
1
chmod +x set_proxy.sh
  1. 启用代理
1
sudo ./set_proxy.sh on
  1. 关闭代理
1
sudo ./set_proxy.sh off

一、配置编译套件

百度网盘:[devtoolset-11](通过网盘分享的文件:devtoolset-11_0717.tar.gz
链接: https://pan.baidu.com/s/1w9GIWchuaBNbjeJrusrbSQ?pwd=0228 提取码: 0228
–来自百度网盘超级会员v4的分享)

下载后执行如下命令:

1
2
3
cp devtoolset-11_0717.tar.gz /opt/rh/
cd /opt/rh/
tar -zxvf devtoolset-11_0717.tar.gz

这样就配置好了,如果要激活环境,可以使用如下命令(每次使用都要激活)

1
2
3
source /opt/rh/devtoolset-11/enable
或者
scl enable devtoolset-11 bash

如果觉得麻烦,可以添加到环境变量,以后都不需要手动激活(不建议,貌似会把环境搞乱)

1
2
3
4
vim ~/.bashrc
#添加如下命令
source /opt/rh/devtoolset-11/enable
g++ --version

注意,如果没有添加到环境变量,使用vscode进行ssh连接时,cmake插件是查询不到devtoolset-11里面的编译套件的!

百度网盘:[devtoolset-12](通过网盘分享的文件:devtoolset-12.tar.gz
链接: https://pan.baidu.com/s/1p7hJgtuKmDqHvtM2elAx6A?pwd=0228 提取码: 0228
–来自百度网盘超级会员v4的分享),如果这个不行,可以通过 如下方式获取,添加软件源到/etc/yum.repos.d/CentOS-Base.repo

1
2
3
4
5
6
7
8
9
10
[copr:copr.fedorainfracloud.org:mlampe:devtoolset-12]
name=Copr repo for devtoolset-12 owned by mlampe
baseurl=https://download.copr.fedorainfracloud.org/results/mlampe/devtoolset-12/epel-7-$basearch/
type=rpm-md
skip_if_unavailable=True
gpgcheck=1
gpgkey=https://download.copr.fedorainfracloud.org/results/mlampe/devtoolset-12/pubkey.gpg
repo_gpgcheck=0
enabled=1
enabled_metadata=1

然后通过如下命令下载

1
yum install devtoolset-12

二、cmake安装

百度网盘:[cmake 3.28](通过网盘分享的文件:cmake-3.28.0-linux-x86_64.tar.gz
链接: https://pan.baidu.com/s/1cVjYv8z3n-uWDzl1bBitig?pwd=0228 提取码: 0228
–来自百度网盘超级会员v4的分享)

下载后执行如下命令

1
2
3
4
5
tar -zxvf cmake-3.28.0-linux-x86_64.tar.gz
cd cmake-3.28.0-linux-x86_64
sudo cp -r /root/tools/cmake-3.28.0-linux-x86_64/bin/* /usr/local/bin/
sudo cp -r /root/tools/cmake-3.28.0-linux-x86_64/share/cmake-3.28 /usr/local/share/
cmake --version

其他版本见:https://cmake.org/files/

三、clangd安装

采用conda安装clangd

1
2
3
4
5
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh 
bash Miniconda3-latest-Linux-x86_64.sh
source ~/.bashrc
conda install -c conda-forge clang clangxx clang-tools
clangd --version

一、线程安全注解

充分利用编译器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
EventLoop* loop_ GUARDED_BY(mutex_);

class SCOPED_CAPABILITY MutexLockGuard : noncopyable
{
public:
explicit MutexLockGuard(MutexLock& mutex) ACQUIRE(mutex)
: mutex_(mutex)
{
mutex_.lock();
}

~MutexLockGuard() RELEASE()
{
mutex_.unlock();
}

private:

MutexLock& mutex_;
};
#define MutexLockGuard(x) error "Missing guard object name"

EventLoop* loop_ GUARDED_BY(mutex_); 是一种线程安全注解,通常用于标注某个成员变量的访问需要特定的锁保护。在这个例子中,loop_ 的访问需要由 mutex_ 互斥锁保护。

在编译时,静态分析工具会检查代码中是否正确地在访问 loop_ 时持有 mutex_ 锁。如果没有持有锁,工具会发出警告或错误提示。GUARDED_BY 是一种编译期的注解,对运行时行为没有直接影响。它不会生成额外的代码,也不会影响程序的性能。为开发者提供明确的线程安全约定,提醒其他人维护代码时遵守这些规则。

MutexLockGuard(x)是一种防止用法错误的技巧宏,目的是防止你写出如下代码:

1
MutexLockGuard(mutex_);

这样写会创建一个临时的 MutexLockGuard 对象,它在这一行代码结束后就被销毁,锁也会立即释放,根本起不到加锁保护作用!宏定义把 MutexLockGuard(x) 替换成 error “Missing guard object name”。如果你写了 MutexLockGuard(mutex_);,编译器会报错:“Missing guard object name”。这样强制你必须写变量名,防止误用。

SCOPED_CAPABILITY,ACQUIRE,RELEASE都是配合Clang编译器检查使用的

SCOPED_CAPABILITY 是一个线程安全注解宏,这个类是一个“作用域锁”,构造函数是“加锁”,析构函数是“解锁”。

ACQUIRE(mutex) 也是线程安全注解宏,函数会“获取”某个锁(mutex)。

RELEASE()也是线程安全注解宏,函数会“释放”锁(mutex)。

通过Clang编译时检查,见:Thread Safety Analysis: https://clang.llvm.org/docs/ThreadSafetyAnalysis.html

二、事件处理模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
while (!quit_)
{
activeChannels_.clear();
//activeChannels_是一个传入传出参数,保存着有事件发生的Channel
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
// TODO sort channel by priority
eventHandling_ = true;
//遍历activeChannels_,调用Channel的handleEvent方法
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}
// epoll监听
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
LOG_TRACE << "fd total count " << channels_.size();
int numEvents = ::epoll_wait(epollfd_,
&*events_.begin(),
static_cast<int>(events_.size()),
timeoutMs);
int savedErrno = errno;
Timestamp now(Timestamp::now());
if (numEvents > 0)
{
LOG_TRACE << numEvents << " events happened";
//将监听到的全部事件都转移到activeChannels,单一职责,poller只负责 I/O 复用,事件处理是EventLoop的职责
fillActiveChannels(numEvents, activeChannels);
if (implicit_cast<size_t>(numEvents) == events_.size())
{
events_.resize(events_.size()*2);
}
}
else if (numEvents == 0)
{
LOG_TRACE << "nothing happened";
}
else
{
// error happens, log uncommon ones
if (savedErrno != EINTR)
{
errno = savedErrno;
LOG_SYSERR << "EPollPoller::poll()";
}
}
return now;
}

void EPollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
assert(implicit_cast<size_t>(numEvents) <= events_.size());
for (int i = 0; i < numEvents; ++i)
{
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
#ifndef NDEBUG
int fd = channel->fd();
ChannelMap::const_iterator it = channels_.find(fd);
assert(it != channels_.end());
assert(it->second == channel);
#endif
channel->set_revents(events_[i].events);
activeChannels->push_back(channel);
}
}

Poller 的职责是 I/O 复用,而 EventLoop 的职责是事件分发。通过activeChannels_将两者分开,可以使系统结构更清晰、耦合度更低、更易于扩展。

1
2
3
4
if (implicit_cast<size_t>(numEvents) == events_.size())
{
events_.resize(events_.size()*2);
}

epoll返回的数量等于我们传入的最大数量,说明epoll内部其实有更多的事件触发,只不过受限于我们传入的static_cast<int>(events_.size())无法全部传出,所以需要扩容方便后续处理更多事件

1
2
3
4
5
6
7
8
9
eventHandling_ = true;
//遍历activeChannels_,调用Channel的handleEvent方法
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;

eventHandling作为一个状态标志,防止在处理activeChannels_中的事件时,修改activeChannels_中的信息导致出差。不能在遍历一个容器的同时修改它

如在removeChannel函数中

1
2
3
4
5
6
7
8
9
10
11
12
void EventLoop::removeChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
if (eventHandling_)
{
assert(currentActiveChannel_ == channel ||
std::find(activeChannels_.begin(), activeChannels_.end(), channel) ==
activeChannels_.end());
}
poller_->removeChannel(channel);
}

在取出一个channel时,必须要确保当前活跃的channel和要删除channel一致或者不在activeChannels_中的chennel,如果两个channel不一致且存在activeChannels_中,就可能会发生异常情况,在

1
2
3
4
5
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}

中执行到被删除的channel时就可能会发生错误(空指针)

三、职责单一原则

muduo强调类职责单一原则,其他类通过runInLoop添加到EventLoop中的pendingFunctors_,这个**pendingFunctors_**在EventLoop处理完全部epoll事件后统一处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
while (!quit_)
{
activeChannels_.clear();
// activeChannels_是一个传入传出参数,保存着有事件发生的Channel
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
eventHandling_ = true;
// 遍历activeChannels_,调用Channel的handleEvent方法
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}

void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;

{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}

for (const Functor& functor : functors)
{
functor();
}
callingPendingFunctors_ = false;
}

如将监听描述符上树添加到EventLoop

1
2
//将 Acceptor::listen() 的调用任务放入EventLoop 的待执行队列中,这确保了所有和 EventLoop 相关的操作都在同一个I/O线程中执行,避免了锁竞争
loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));

如关闭连接等等

1
2
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));

所有的事件操作都需要在EventLoop中执行

四、唤醒EventLoop线程

1
2
int wakeupFd_;
std::unique_ptr<Channel> wakeupChannel_;

muduo将wakeupChannel_添加到每一个EventLoop中,这样在有事件发生时,能够及时的唤醒对应的线程,防止阻塞

五、定时任务

muduo库还支持定时任务,在EventLoop初始化的时候,初始化一个timerQueue_,这个容器记录能够记录着各个定时任务,在TimerQueue初始化的时候,也会创建一个timerfd_放到EventLoop的监听当中,当最近的定时任务到期时,timerfd 会变为可读,EventLooppoll 调用中被唤醒,并将 timerfdChannel_ 作为一个活跃事件进行处理,最终调用其读回调,也就是 TimerQueue::handleRead,从而执行到期的定时任务。将时间事件转换为一个文件描述符的 I/O 事件,典型的应用有:定时发送心跳,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
//TimeQueue.h
private:
// 按到期时间排序的 set,用于快速查找下一个要到期的定时器
typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;

// 按Timer* 地址排序的 set,用于快速取消(删除)一个定时器
typedef std::pair<Timer*, int64_t> ActiveTimer;
typedef std::set<ActiveTimer> ActiveTimerSet;

TimerList timers_;//待办事项
ActiveTimerSet activeTimers_;

//TimeQueue::Insert
bool TimerQueue::insert(Timer* timer)
{
loop_->assertInLoopThread();
assert(timers_.size() == activeTimers_.size());
bool earliestChanged = false;
Timestamp when = timer->expiration();
TimerList::iterator it = timers_.begin();
// 检查新插入的定时器是否会成为新的“最早到期”的定时器
if (it == timers_.end() || when < it->first)
{
earliestChanged = true;
}

// 同时插入到两个 set 中
timers_.insert(Entry(when, timer));
activeTimers_.insert(ActiveTimer(timer, timer->sequence()));

assert(timers_.size() == activeTimers_.size());
return earliestChanged;
}

//当 timerfd 触发,handleRead 被调用。
void TimerQueue::handleRead()
{
// 当前时间
Timestamp now(Timestamp::now());
readTimerfd(timerfd_, now); // 清除事件

// 1. 获取所有已到期的定时器(为什么不是一个,而是全部?这是因为在处理其他IO事件时,时间仍在流失,可能有多个任务都到期了)
std::vector<Entry> expired = getExpired(now);

callingExpiredTimers_ = true;
cancelingTimers_.clear();

// 2. 执行回调
for (const Entry& it : expired)
{
it.second->run();
}
callingExpiredTimers_ = false;

// 3. 重置周期性任务
reset(expired, now);
}

std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
std::vector<Entry> expired;
// 构造一个哨兵值,时间为 now,指针为一个最大值
Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
// lower_bound 会 O(log N) 找到第一个到期时间 > now 的迭代器
TimerList::iterator end = timers_.lower_bound(sentry);

// 从头到 end 迭代器之间的所有元素都是已到期的
std::copy(timers_.begin(), end, back_inserter(expired));
timers_.erase(timers_.begin(), end);

// 从 activeTimers_ 中也移除这些定时器
for (const Entry& it : expired)
{
ActiveTimer timer(it.second, it.second->sequence());
size_t n = activeTimers_.erase(timer);
assert(n == 1); (void)n;
}
return expired;
}

void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
Timestamp nextExpire;

// 遍历过期的定时器,如果定时器是重复的,则重启定时器,否则删除定时器
for (const Entry& it : expired)
{
ActiveTimer timer(it.second, it.second->sequence());
// 如果定时器是重复的,则重启定时器,否则删除定时器
if (it.second->repeat()
&& cancelingTimers_.find(timer) == cancelingTimers_.end())
{
it.second->restart(now);
insert(it.second);
}
// 如果定时器不是重复的,则删除定时器
else
{
delete it.second;
}
}

// 如果处理后的定时任务不为空,则设置下一次超时时间
if (!timers_.empty())
{
nextExpire = timers_.begin()->second->expiration();
}

//确保下一次超时时间有效
if (nextExpire.valid())
{
// 设置下一次超时时间
resetTimerfd(timerfd_, nextExpire);
}
}

//设置下一次到期时间
void resetTimerfd(int timerfd, Timestamp expiration)
{
// wake up loop by timerfd_settime()
struct itimerspec newValue;
struct itimerspec oldValue;
memZero(&newValue, sizeof newValue);
memZero(&oldValue, sizeof oldValue);
newValue.it_value = howMuchTimeFromNow(expiration);
//调用 timerfd_settime 系统调用,让 timerfd_ 在 expiration 这个时刻变为可读状态
int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
if (ret)
{
LOG_SYSERR << "timerfd_settime()";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//EventLoop.h
std::unique_ptr<TimerQueue> timerQueue_;

timerQueue_(new TimerQueue(this)),

TimerQueue::TimerQueue(EventLoop* loop)
: loop_(loop),
timerfd_(createTimerfd()),
timerfdChannel_(loop, timerfd_),
timers_(),
callingExpiredTimers_(false)
{
timerfdChannel_.setReadCallback(
std::bind(&TimerQueue::handleRead, this));
timerfdChannel_.enableReading();
}

六、任务安全处理

muduo在执行任务时非常注意安全问题,通常都会使用一个原子变量标注,防止在执行任务的时候其他类修改任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//EventLoop.cc
eventHandling_(false),
callingPendingFunctors_(false),
//执行前设置为true
eventHandling_ = true;
// 遍历activeChannels_,调用Channel的handleEvent方法
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
//结束后设置为false
eventHandling_ = false;


//EventLoop.cc
//开始前设置为ture
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}

for (const Functor& functor : functors)
{
functor();
}
//结束时设置为false
callingPendingFunctors_ = false;


//定时任务处理,TimerQueue.cc
callingExpiredTimers_ = true;
cancelingTimers_.clear();
// safe to callback outside critical section
for (const Entry& it : expired)
{
it.second->run();
}
callingExpiredTimers_ = false;

一、Buffer缓冲区设计

1
2
3
4
5
6
+-------------------+------------------+------------------+
| prependable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= size
  1. prependable bytes:长度字段固定8字节
  2. readable bytes:可读缓冲区
  3. writable bytes:可写缓冲区
1
2
3
4
5
6
7
8
9
10
11
12
static const size_t kCheapPrepend = 8;
static const size_t kInitialSize = 1024;

explicit Buffer(size_t initialSize = kInitialSize)
: buffer_(kCheapPrepend + initialSize),
readerIndex_(kCheapPrepend),
writerIndex_(kCheapPrepend)
{
assert(readableBytes() == 0);
assert(writableBytes() == initialSize);
assert(prependableBytes() == kCheapPrepend);
}

在buffer初始化阶段,buffer的默认大小是8+1024,将可读指针和可写指针移动到同一个位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//NOTE 用最少的 read 系统调用次数,读取尽可能多的数据,以减少用户态和内核态之间的切换开销
ssize_t Buffer::readFd(int fd, int* savedErrno)
{
char extrabuf[65536];
struct iovec vec[2];
// writableBytes() 返回当前 inputBuffer_ 内部 std::vector<char> 中 writerIndex_
// 之后剩余的空闲空间大小
const size_t writable = writableBytes();
// 第一块缓冲区:指向 inputBuffer_ 内部的可写空间 (begin() + writerIndex_),长度为
// writableBytes()。
vec[0].iov_base = begin() + writerIndex_;
vec[0].iov_len = writable;
// 第二块缓冲区:指向一个在栈上临时分配的、大小为 64KB 的备用缓冲区 (extrabuf)。
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
// 如果 inputBuffer_ 内部的可写空间足够大,则只使用第一块缓冲区,否则会同时使用第二块缓冲区
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
// readv 会尝试一次性把 socket 接收缓冲区的数据同时读到这两块内存中。
const ssize_t n = sockets::readv(fd, vec, iovcnt);
if (n < 0)
{
*savedErrno = errno;
}
else if (implicit_cast<size_t>(n) <= writable)
{
writerIndex_ += n;
}
else
{
writerIndex_ = buffer_.size();
append(extrabuf, n - writable);
}
return n;
}

在从缓冲区中读数据时,使用readv函数,这个函数可以配置两个缓冲区,当第一个缓冲区写满时,会写到第二个缓冲区。

写入数据处理,以HttpResponse::appendToBuffer为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
void HttpResponse::appendToBuffer(Buffer* output) const
{
char buf[32];
snprintf(buf, sizeof buf, "HTTP/1.1 %d ", statusCode_);
output->append(buf);
output->append(statusMessage_);
output->append("\r\n");

if (closeConnection_)
{
output->append("Connection: close\r\n");
}
else
{
snprintf(buf, sizeof buf, "Content-Length: %zd\r\n", body_.size());
output->append(buf);
output->append("Connection: Keep-Alive\r\n");
}

for (const auto& header : headers_)
{
output->append(header.first);
output->append(": ");
output->append(header.second);
output->append("\r\n");
}

output->append("\r\n");
output->append(body_);
}


void append(const StringPiece& str)
{
append(str.data(), str.size());
}

void append(const char* /*restrict*/ data, size_t len)
{
ensureWritableBytes(len);
std::copy(data, data + len, beginWrite());
hasWritten(len);
}

//关键时makeSpace函数
void ensureWritableBytes(size_t len)
{
if (writableBytes() < len)
{
makeSpace(len);
}
assert(writableBytes() >= len);
}

void makeSpace(size_t len)
{
//如果剩余空间不足,则重新分配内存
if (writableBytes() + prependableBytes() < len + kCheapPrepend)
{
buffer_.resize(writerIndex_ + len);
}
else
{
// 将可读数据从当前位置移动到 buffer 的起始位置,为新数据腾出空间。
assert(kCheapPrepend < readerIndex_);
size_t readable = readableBytes();
// 将可读数据从当前位置移动到 buffer 的起始位置,为新数据腾出空间。
std::copy(begin() + readerIndex_, begin() + writerIndex_, begin() + kCheapPrepend);
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_ + readable;
assert(readable == readableBytes());
}
}

// 返回可写字节数
size_t writableBytes() const
{
return buffer_.size() - writerIndex_;
}
// 返回当前读取位置到buffer起始位置的距离
size_t prependableBytes() const
{
return readerIndex_;
}

muduo将buffer中的内存设计为可移动的,即长度字段固定,但可读和可写缓冲区大小可以调节,原因是这样的,readerIndex_在读取数据的时候是会往右边移动的,readerIndex_和writerIndex之间的可读区域其实是一个滑动窗口,在向右移动的过程中,左边就会空出一部分内存,也就是kCheapPrepend到readerIndex_之间的那片内存,这个是可利用的,所以当发现writerIndex到末尾的内存不够用时,会左边检查空出的那部分内存,如果两个内存加起来够用,就将可读区域的内存往左边移,这样右边就能空出更多内存,这样就能插入数据了。如果内存真的不够,就buffer_.resize(writerIndex_ + len);重新分配内存,通过内存复用避免频繁创建新的内存

二、零拷贝添加长度

在实现网络协议是,通常需要在数据包内容前面添加长度字段,经典的协议为:[4字节长度][消息体]

常规的做法是:先序列化消息体,得到长度 N,然后申请 4+N 的空间,先把长度 N 写进去,再把消息体拷贝进去。这个过程至少需要一次额外的拷贝。

muduo 的做法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void ProtobufCodecLite::fillEmptyBuffer(muduo::net::Buffer* buf,
const google::protobuf::Message& message)
{
assert(buf->readableBytes() == 0);

buf->append(tag_);

int byte_size = serializeToBuffer(message, buf);

int32_t checkSum = checksum(buf->peek(), static_cast<int>(buf->readableBytes()));
buf->appendInt32(checkSum);
assert(buf->readableBytes() == tag_.size() + byte_size + kChecksumLen);
(void)byte_size;
int32_t len = sockets::hostToNetwork32(static_cast<int32_t>(buf->readableBytes()));
//上面是填入响应数据,prepend是在长度字段填入响应数据长度
buf->prepend(&len, sizeof len);
}
// 将转换成网络字节序之后的4个字节,添加到 Buffer 的最前端
void prepend(const void* /*restrict*/ data, size_t len)
{
assert(len <= prependableBytes());
readerIndex_ -= len;
const char* d = static_cast<const char*>(data);
std::copy(d, d + len, begin() + readerIndex_);
}
  1. 直接在 writerIndex_ 处(writable 区域)序列化消息体。
  2. 得到消息体长度 N 后,利用 prependable 空间,在 readerIndex_ 之前写入4字节的长度 N,然后将 readerIndex_ 向左移动4个字节。
  3. 整个过程没有 memmove 或额外的内存拷贝

三、数据操作零拷贝

当上层逻辑(比如协议解析)需要检查缓冲区中的数据时,它会调用 peek() 方法。这个方法直接返回 begin() + readerIndex_ 的指针,让用户可以直接访问内部存储。

如在Http协议解析的时候:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
if (state_ == kExpectRequestLine)
{
const char* crlf = buf->findCRLF();
if (crlf)
{
ok = processRequestLine(buf->peek(), crlf);
if (ok)
{
request_.setReceiveTime(receiveTime);
buf->retrieveUntil(crlf + 2);
state_ = kExpectHeaders;
}
else
{
hasMore = false;
}
}
else
{
hasMore = false;
}
}


bool HttpContext::processRequestLine(const char* begin, const char* end)
{
bool succeed = false;
const char* start = begin;
const char* space = std::find(start, end, ' ');
if (space != end && request_.setMethod(start, space))
{
start = space+1;
space = std::find(start, end, ' ');
if (space != end)
{
const char* question = std::find(start, space, '?');
if (question != space)
{
request_.setPath(start, question);
request_.setQuery(question, space);
}
else
{
request_.setPath(start, space);
}
start = space+1;
succeed = end-start == 8 && std::equal(start, end-1, "HTTP/1.");
if (succeed)
{
if (*(end-1) == '1')
{
request_.setVersion(HttpRequest::kHttp11);
}
else if (*(end-1) == '0')
{
request_.setVersion(HttpRequest::kHttp10);
}
else
{
succeed = false;
}
}
}
}
return succeed;
}

processRequestLine 在分割出 method, path 等部分后,传递给 HttpRequestset 方法的是一对指向 Buffer 内部内存的 const char* 指针HttpRequestset 方法内部才根据这对指针创建 std::string。这样做的好处是,解析本身是零拷贝的,只有在确认需要存储时才发生一次内存分配。这在性能上通常优于在解析过程中创建多个临时 std::string 对象。

Bufferreadable 区域的数据已经被上层逻辑完全处理或转发后,就会调用 retrieve() 来更新缓冲区的状态,以便后续的内存复用。**利用retrieve移动指针,减少删除和清理任何内存的消耗。**如:buf->retrieveUntil(crlf + 2);,

TcpConnection类是用于管理TCP连接的类,它负责管理一个TCP连接的读写事件, 并提供接口来设置读写回调、关闭回调、错误回调等。包含一个EventLoop对象,一个文件描述符,一个事件类型,一个回调函数。

一、数据发送策略

muduoTcpConnection 采用两种发送方式结合的策略。首先尝试直接发送,如果 outputBuffer_ 为空,就直接将数据写入 socket 内核缓冲区,避免了不必要的内存拷贝。当直接发送无法一次性完成时(通常因为内核缓冲区已满),剩余的数据会被存入应用层的 outputBuffer_。此时的发送流程从“主动发送”切换到了“事件驱动”模式。因为在使用outputBuffer时,会调用 channel_->enableWriting();关注写事件,后面通过handleWrite函数处理outputBuffer中的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
void TcpConnection::sendInLoop(const void* data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;
size_t remaining = len;
bool faultError = false;
if (state_ == kDisconnected)
{
LOG_WARN << "disconnected, give up writing";
return;
}
// 如果输出缓冲区为空,并且 channel 没有在监听可写事件,尝试直接发送
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote;
// 如果数据全部写入,则调用写完成回调
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
// 否则就是写入失败
else
{
nwrote = 0;
// 判断是否是缓冲区写满了,如果不是缓冲区写满,则记录错误
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE || errno == ECONNRESET)
{
faultError = true;
}
}
}
}
//如果数据没有一次写完,说明缓冲区满了,需要将数据追加到输出队列中
assert(remaining <= len);
if (!faultError && remaining > 0)
{
// 计算已经写入的数据长度
size_t oldLen = outputBuffer_.readableBytes();
// 如果已经写入的和剩余的字符串长度大于等于高水位,并且已经写入的小于最高水位线,则调用高水位线回调
if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ &&
highWaterMarkCallback_)
{
// 调用高水位线回调,处理高水位线事件
loop_->queueInLoop(
std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
// 将剩余数据放入 outputBuffer_
outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);
// 如果当前没有写事件,则开启写事件
if (!channel_->isWriting())
{
//当 outputBuffer_ 中有数据积压,并且内核的发送缓冲区有可用空间时,EventLoop 会触发 Channel 的可写事件,最终调用 handleWrite()
channel_->enableWriting();
}
}
}

void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
// 将 outputBuffer_ 中的数据写入到 socket 中
ssize_t n =
sockets::write(channel_->fd(), outputBuffer_.peek(), outputBuffer_.readableBytes());
if (n > 0)
{
// 从 outputBuffer_ 中移除已经写入的数据
outputBuffer_.retrieve(n);
// 如果 outputBuffer_ 中没有数据了,则关闭写事件
if (outputBuffer_.readableBytes() == 0)
{
channel_->disableWriting();
// 如果写完成回调不为空,则调用写完成回调
if (writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
// 如果连接状态为 kDisconnecting,则关闭连接
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd() << " is down, no more writing";
}
}

在数据存入 outputBuffer_ 时,会检查缓冲区的水位。如果“已有数据 + 新增数据”的总量超过了设定的高水位线 highWaterMark_,就会触发 highWaterMarkCallback_ 回调,但是这个回调仅仅是通知muduo 库本身不会擅自主张的丢弃数据,还是需要存入outputBuffer中,确保数据的可靠发送。具体的流量控制,需要在highWaterMarkCallback_中配置。

值得注意的是,moduo库的epoll并没有使用ET模式,而是使用LT模式,在触发回调执行handlewrite函数时,并没有使用循环,而是只发送一次,然后在全部发送后调用channel_->disableWriting();取消写事件,如果outputbuffer一次性没有写完,epoll仍会关注这个可写事件,因为没有调用disableWriting,所以会在下一次epoll->wait函数的时候再次触发,知道数据全部写完并调用disableWriting,这样设计的原因是通过 EventLoop 实现了公平调度,避免了单个连接长时间霸占 I/O 线程,导致系统“卡死”

二、Boost::any

boost::any context_ 的作用是:允许用户将任意类型的、自定义的数据附加到一个 TcpConnection 对象上,作为一个与该连接绑定的“上下文”或“状态管理器”。boost::any (在 C++17 中已被标准化为 std::any) 是一个可以持有任意类型单个值的类型安全容器。你可以把它想象成一个“万能盒子”,什么都能装,但在取出来的时候,你必须明确知道里面装的是什么类型,否则会抛出异常。这比使用不安全的 void* 指针要好得多

muduo 作为一个通用的网络库,它只负责管理 TCP 连接、收发字节流这些底层事务。它完全不知道上层的业务逻辑是什么。

  • 对于一个 HTTP 服务器,每个连接可能需要维护一个 HttpContext 对象,用来解析 HTTP 请求的状态。
  • 对于一个 RPC 服务器,每个连接可能需要维护一个 RpcChannel 对象,用来处理 RPC 调用。
  • 对于一个游戏服务器,每个连接可能需要关联一个 PlayerSession 对象,来存储玩家信息。

如果 muduo 要为每一种应用都去修改 TcpConnection 类,添加 HttpContext* httpContext_Player* player_ 这样的成员,那这个库就失去了通用性。

context_ 就是为了解决这个问题而生的。它提供了一个统一的、非侵入式的接口,让用户可以把自己的业务对象“挂”在 TcpConnection 上。

在 HTTP 服务器中 (HttpServer.cc):

当一个新的 TCP 连接建立时,HttpServer 会创建一个 HttpContext 对象,并通过 conn->setContext() 将它存入连接的 context_ 中。在后续的 onMessage 回调中,服务器会通过 conn->getMutableContext() 取出这个 HttpContext 对象,用它来持续解析同一个连接上发来的数据流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void HttpServer::onConnection(const TcpConnectionPtr& conn)
{
if (conn->connected())
{
// 新连接建立时,创建一个 HttpContext 并附加到连接上
conn->setContext(HttpContext());
}
}

void HttpServer::onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp receiveTime)
{
// 从连接中取出之前存入的 HttpContext
HttpContext* context = boost::any_cast<HttpContext>(conn->getMutableContext());

if (!context->parseRequest(buf, receiveTime))
{
// ...
}
// ...
}

在 RPC 服务器中 (RpcServer.cc):

同样,当新连接建立时,RpcServer 会创建一个 RpcChannel 对象,并将其存入 context_

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void RpcServer::onConnection(const TcpConnectionPtr& conn)
{
// ...
if (conn->connected())
{
RpcChannelPtr channel(new RpcChannel(conn));
channel->setServices(&services_);
conn->setMessageCallback(
std::bind(&RpcChannel::onMessage, get_pointer(channel), _1, _2, _3));
// 将 RpcChannelPtr 附加到连接上
conn->setContext(channel);
}
// ...
}

TcpConnection 类不需要知道任何关于上层业务(HTTP, RPC, Game…)的细节。它只负责提供一个“插座”。用户不需要为了添加自定义状态而去继承 TcpConnection 或修改库源码。使用 boost::any_cast 来获取数据,如果在运行时类型不匹配,会抛出异常,这比使用 void* 进行不安全的 static_cast 要健壮得多

具体使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#include "muduo/base/Logging.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/http/HttpRequest.h"
#include "muduo/net/http/HttpResponse.h"
#include "muduo/net/http/HttpServer.h"

#include <iostream>
#include <map>

using namespace muduo;
using namespace muduo::net;

extern char favicon[555];
bool benchmark = false;

void onRequest(const HttpRequest& req, HttpResponse* resp)
{
std::cout << "Headers " << req.methodString() << " " << req.path() << std::endl;
if (!benchmark)
{
const std::map<string, string>& headers = req.headers();
for (const auto& header : headers)
{
std::cout << header.first << ": " << header.second << std::endl;
}
}

if (req.path() == "/")
{
resp->setStatusCode(HttpResponse::k200Ok);
resp->setStatusMessage("OK");
resp->setContentType("text/html");
resp->addHeader("Server", "Muduo");
string now = Timestamp::now().toFormattedString();
resp->setBody("<html><head><title>This is title</title></head>"
"<body><h1>Hello</h1>Now is " +
now + "</body></html>");
}
else if (req.path() == "/favicon.ico")
{
resp->setStatusCode(HttpResponse::k200Ok);
resp->setStatusMessage("OK");
resp->setContentType("image/png");
resp->setBody(string(favicon, sizeof favicon));
}
else if (req.path() == "/hello")
{
resp->setStatusCode(HttpResponse::k200Ok);
resp->setStatusMessage("OK");
resp->setContentType("text/plain");
resp->addHeader("Server", "Muduo");
resp->setBody("hello, world!\n");
}
else
{
resp->setStatusCode(HttpResponse::k404NotFound);
resp->setStatusMessage("Not Found");
resp->setCloseConnection(true);
}
}

int main(int argc, char* argv[])
{
int numThreads = 0;
if (argc > 1)
{
benchmark = true;
Logger::setLogLevel(Logger::WARN);
numThreads = atoi(argv[1]);
}
EventLoop loop;
HttpServer server(&loop, InetAddress(8000), "dummy");
server.setHttpCallback(onRequest);
server.setThreadNum(numThreads);
server.start();
loop.loop();
}

char favicon[555] = {
'\x89', 'P', 'N', 'G', '\xD', '\xA', '\x1A', '\xA', '\x0', '\x0', '\x0', '\xD',
'I', 'H', 'D', 'R', '\x0', '\x0', '\x0', '\x10', '\x0', '\x0', '\x0', '\x10',
'\x8', '\x6', '\x0', '\x0', '\x0', '\x1F', '\xF3', '\xFF', 'a', '\x0', '\x0', '\x0',
'\x19', 't', 'E', 'X', 't', 'S', 'o', 'f', 't', 'w', 'a', 'r',
'e', '\x0', 'A', 'd', 'o', 'b', 'e', '\x20', 'I', 'm', 'a', 'g',
'e', 'R', 'e', 'a', 'd', 'y', 'q', '\xC9', 'e', '\x3C', '\x0', '\x0',
'\x1', '\xCD', 'I', 'D', 'A', 'T', 'x', '\xDA', '\x94', '\x93', '9', 'H',
'\x3', 'A', '\x14', '\x86', '\xFF', '\x5D', 'b', '\xA7', '\x4', 'R', '\xC4', 'm',
'\x22', '\x1E', '\xA0', 'F', '\x24', '\x8', '\x16', '\x16', 'v', '\xA', '6', '\xBA',
'J', '\x9A', '\x80', '\x8', 'A', '\xB4', 'q', '\x85', 'X', '\x89', 'G', '\xB0',
'I', '\xA9', 'Q', '\x24', '\xCD', '\xA6', '\x8', '\xA4', 'H', 'c', '\x91', 'B',
'\xB', '\xAF', 'V', '\xC1', 'F', '\xB4', '\x15', '\xCF', '\x22', 'X', '\x98', '\xB',
'T', 'H', '\x8A', 'd', '\x93', '\x8D', '\xFB', 'F', 'g', '\xC9', '\x1A', '\x14',
'\x7D', '\xF0', 'f', 'v', 'f', '\xDF', '\x7C', '\xEF', '\xE7', 'g', 'F', '\xA8',
'\xD5', 'j', 'H', '\x24', '\x12', '\x2A', '\x0', '\x5', '\xBF', 'G', '\xD4', '\xEF',
'\xF7', '\x2F', '6', '\xEC', '\x12', '\x20', '\x1E', '\x8F', '\xD7', '\xAA', '\xD5', '\xEA',
'\xAF', 'I', '5', 'F', '\xAA', 'T', '\x5F', '\x9F', '\x22', 'A', '\x2A', '\x95',
'\xA', '\x83', '\xE5', 'r', '9', 'd', '\xB3', 'Y', '\x96', '\x99', 'L', '\x6',
'\xE9', 't', '\x9A', '\x25', '\x85', '\x2C', '\xCB', 'T', '\xA7', '\xC4', 'b', '1',
'\xB5', '\x5E', '\x0', '\x3', 'h', '\x9A', '\xC6', '\x16', '\x82', '\x20', 'X', 'R',
'\x14', 'E', '6', 'S', '\x94', '\xCB', 'e', 'x', '\xBD', '\x5E', '\xAA', 'U',
'T', '\x23', 'L', '\xC0', '\xE0', '\xE2', '\xC1', '\x8F', '\x0', '\x9E', '\xBC', '\x9',
'A', '\x7C', '\x3E', '\x1F', '\x83', 'D', '\x22', '\x11', '\xD5', 'T', '\x40', '\x3F',
'8', '\x80', 'w', '\xE5', '3', '\x7', '\xB8', '\x5C', '\x2E', 'H', '\x92', '\x4',
'\x87', '\xC3', '\x81', '\x40', '\x20', '\x40', 'g', '\x98', '\xE9', '6', '\x1A', '\xA6',
'g', '\x15', '\x4', '\xE3', '\xD7', '\xC8', '\xBD', '\x15', '\xE1', 'i', '\xB7', 'C',
'\xAB', '\xEA', 'x', '\x2F', 'j', 'X', '\x92', '\xBB', '\x18', '\x20', '\x9F', '\xCF',
'3', '\xC3', '\xB8', '\xE9', 'N', '\xA7', '\xD3', 'l', 'J', '\x0', 'i', '6',
'\x7C', '\x8E', '\xE1', '\xFE', 'V', '\x84', '\xE7', '\x3C', '\x9F', 'r', '\x2B', '\x3A',
'B', '\x7B', '7', 'f', 'w', '\xAE', '\x8E', '\xE', '\xF3', '\xBD', 'R', '\xA9',
'd', '\x2', 'B', '\xAF', '\x85', '2', 'f', 'F', '\xBA', '\xC', '\xD9', '\x9F',
'\x1D', '\x9A', 'l', '\x22', '\xE6', '\xC7', '\x3A', '\x2C', '\x80', '\xEF', '\xC1', '\x15',
'\x90', '\x7', '\x93', '\xA2', '\x28', '\xA0', 'S', 'j', '\xB1', '\xB8', '\xDF', '\x29',
'5', 'C', '\xE', '\x3F', 'X', '\xFC', '\x98', '\xDA', 'y', 'j', 'P', '\x40',
'\x0', '\x87', '\xAE', '\x1B', '\x17', 'B', '\xB4', '\x3A', '\x3F', '\xBE', 'y', '\xC7',
'\xA', '\x26', '\xB6', '\xEE', '\xD9', '\x9A', '\x60', '\x14', '\x93', '\xDB', '\x8F', '\xD',
'\xA', '\x2E', '\xE9', '\x23', '\x95', '\x29', 'X', '\x0', '\x27', '\xEB', 'n', 'V',
'p', '\xBC', '\xD6', '\xCB', '\xD6', 'G', '\xAB', '\x3D', 'l', '\x7D', '\xB8', '\xD2',
'\xDD', '\xA0', '\x60', '\x83', '\xBA', '\xEF', '\x5F', '\xA4', '\xEA', '\xCC', '\x2', 'N',
'\xAE', '\x5E', 'p', '\x1A', '\xEC', '\xB3', '\x40', '9', '\xAC', '\xFE', '\xF2', '\x91',
'\x89', 'g', '\x91', '\x85', '\x21', '\xA8', '\x87', '\xB7', 'X', '\x7E', '\x7E', '\x85',
'\xBB', '\xCD', 'N', 'N', 'b', 't', '\x40', '\xFA', '\x93', '\x89', '\xEC', '\x1E',
'\xEC', '\x86', '\x2', 'H', '\x26', '\x93', '\xD0', 'u', '\x1D', '\x7F', '\x9', '2',
'\x95', '\xBF', '\x1F', '\xDB', '\xD7', 'c', '\x8A', '\x1A', '\xF7', '\x5C', '\xC1', '\xFF',
'\x22', 'J', '\xC3', '\x87', '\x0', '\x3', '\x0', 'K', '\xBB', '\xF8', '\xD6', '\x2A',
'v', '\x98', 'I', '\x0', '\x0', '\x0', '\x0', 'I', 'E', 'N', 'D', '\xAE',
'B', '\x60', '\x82',
};

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
HttpServer::HttpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& name,
TcpServer::Option option)
: server_(loop, listenAddr, name, option),
httpCallback_(detail::defaultHttpCallback)
{
server_.setConnectionCallback(
std::bind(&HttpServer::onConnection, this, _1));
server_.setMessageCallback(
std::bind(&HttpServer::onMessage, this, _1, _2, _3));
}

void HttpServer::start()
{
LOG_WARN << "HttpServer[" << server_.name()
<< "] starts listening on " << server_.ipPort();
server_.start();
}

void HttpServer::onConnection(const TcpConnectionPtr& conn)
{
if (conn->connected())
{
conn->setContext(HttpContext());
}
}

void HttpServer::onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp receiveTime)
{
HttpContext* context = boost::any_cast<HttpContext>(conn->getMutableContext());

if (!context->parseRequest(buf, receiveTime))
{
conn->send("HTTP/1.1 400 Bad Request\r\n\r\n");
conn->shutdown();
}

if (context->gotAll())
{
onRequest(conn, context->request());
context->reset();
}
}

void HttpServer::onRequest(const TcpConnectionPtr& conn, const HttpRequest& req)
{
const string& connection = req.getHeader("Connection");
bool close = connection == "close" ||
(req.getVersion() == HttpRequest::kHttp10 && connection != "Keep-Alive");
HttpResponse response(close);
httpCallback_(req, &response);
Buffer buf;
response.appendToBuffer(&buf);
conn->send(&buf);
if (response.closeConnection())
{
conn->shutdown();
}
}


  1. 在HttpServer初始化的时候,在TcpServer中设置了自定义的onConnection和onMessage的回调函数,在onConnection中设置了 conn>setContext(HttpContext());上下文对象,在onMessage中,设置了自定义的解析函数

  2. 在新连接建立后,会触发设置好的onConnection函数

    1
    2
    3
    4
    5
    6
    7
    void HttpServer::onConnection(const TcpConnectionPtr& conn)
    {
    if (conn->connected())
    {
    conn->setContext(HttpContext());
    }
    }

    将httpContext上下文设置到TcpConnection的context中

  3. 在请求消息到来时,会触发epoll树上的读事件

  4. 这个读事件会触发设置好的读回调,也就是TcpConnection::handleRead(Timestamp receiveTime)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    {
    loop_->assertInLoopThread();
    int savedErrno = 0;
    ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
    if (n > 0)
    {
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);//如果正确读取数据,会调用我们设置的自定义messageCallback_
    }
    else if (n == 0)
    {
    handleClose();
    }
    else
    {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
    }
    }

    如果正确读取到数据,会调用预先设置到的messageCallback_,将读取到的内容通过buf传进来

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    void HttpServer::onMessage(const TcpConnectionPtr& conn,
    Buffer* buf,
    Timestamp receiveTime)
    {
    HttpContext* context = boost::any_cast<HttpContext>(conn->getMutableContext());

    if (!context->parseRequest(buf, receiveTime))
    {
    conn->send("HTTP/1.1 400 Bad Request\r\n\r\n");
    conn->shutdown();
    }

    if (context->gotAll())
    {
    onRequest(conn, context->request());
    context->reset();
    }
    }

    然后获取实现设置好的context对象,解析http请求,然后调用onRequest函数,调用httpCallback_函数返回数据。

有没有好奇一个问题,那就是在onConnection中传入一个context,然后在onMessage中取出context,但是你会发现,在TcpConnection中,没有对context做任何操作,那为什么还需要传入context呢?buf和receiveTime直接传过来,然后Context使用局部变量不就好了吗?

  **原因是TCP是一个流式传输协议**

在发送数据过来是,TCP 并不保证你每次收到的数据都恰好是一个完整的应用层消息(比如一个完整的 HTTP 请求),一个完整的 HTTP 请求可能会被拆分成多个 TCP 包进行传输。这意味着 `onMessage` 回调可能会被触发多次,每次 `buf` 里只包含请求的一部分,例如:

1. 第一次 `onMessage`,`buf` 里是 `"GET /index.html HTTP/1.1\r\n"`
2. 第二次 `onMessage`,`buf` 里是 `"Host: www.example.com\r\n"`
3. 第三次 `onMessage`,`buf` 里是 `"\r\n"`

为了效率,TCP 也可能将多个小的数据包合并在一起发送。在 HTTP 的 `Keep-Alive` 模式下,客户端可能会连续发送多个请求。这意味着你的一次 `onMessage` 回调中,`buf` 里可能包含一个半、甚至两个或更多的 HTTP 请求。
如果`onMessage` 每次都创建一个新的、局部的 `HttpContext` 对象:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 假设使用局部变量
void HttpServer::onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp receiveTime)
{
HttpContext local_context; // 每次都创建一个新的 context

// 场景1:请求被拆分
// 第一次回调,buf 里只有 "GET /index.html...",不完整。
// parseRequest 会解析一部分,然后返回。local_context 随函数结束而被销毁。
// 第二次回调,buf 里是剩下的部分 "Host: ...\r\n\r\n"。
// 此时又创建了一个全新的 local_context,它完全不知道之前已经解析过请求行了。
// 它会尝试把 "Host: ..." 当作一个新的请求行来解析,这必然会导致解析失败。
local_context.parseRequest(buf, receiveTime);

if (local_context.gotAll()) {
// ...
}
}
**局部变量是无状态的**。它无法“记住”上一次 `onMessage` 回调时解析到了哪里。 `context` 的真正作用:为每个连接维持状态,内部维护一个解析的状态机,记录着请求的解析状态。当一个不完整的 HTTP 请求到达时,`HttpContext` 会解析它所能解析的部分,并记录下当前的状态。当这个连接的下一个数据包到达时,`onMessage` 通过 `conn->getMutableContext()` 获取到的是**同一个 `HttpContext` 实例**,然后继续从上次的状态开始解析。 `HttpServer` 可能同时处理成千上万个连接,每个连接的 HTTP 请求解析进度都不同。`context` 机制确保了每个连接的解析状态都独立存储,互不干扰。所以看起来传进去啥都没干,实际上是确保了数据的正确解析,将TcpConnecton和应用层解析状态绑定到了一起

Channel类是用于管理文件描述符的类,它负责管理一个文件描述符的读写事件,*并提供接口来设置读写回调、关闭回调、错误回调等。包含一个EventLoop对象,一个文件描述符,一个事件类型,一个回调函数。是TcpConnection和EventLoop交流的桥梁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Channel : noncopyable
{
private:
static string eventsToString(int fd, int ev);

void update();
void handleEventWithGuard(Timestamp receiveTime);

static const int kNoneEvent; // 表示不关注任何事件,即不关注读写事件
static const int kReadEvent; // 表示关注读事件,即POLLIN | POLLPRI
static const int kWriteEvent; // 表示关注写事件,即POLLOUT

EventLoop* loop_; // 指向所属的 EventLoop
const int fd_; // 封装的文件描述符
int events_; // 感兴趣的事件类型
int revents_; // 实际发生的事件类型,Channel::handleEvent 方法会根据 revents_ 的值来决定调用哪个具体的回调函数。
int index_; // 记录该 Channel 在 Poller 中的状态,避免了不必要的系统调用
bool logHup_; // 是否记录HUP事件日志,连接断开时会触发HUP事件

std::weak_ptr<void> tie_;
bool tied_;
bool eventHandling_;//一个状态标志,当 Channel 正在执行 handleEvent 方法时,这个值设置为 true,结束时设为false,防止在事件处理过程中发生重入
bool addedToLoop_;//一个状态标志,表示这个 `Channel` 是否已经被添加到 `EventLoop` 的 `Poller` 中
ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;
};

index_

​ 通过index_实现一种状态记录机制,这种机制决定着poller中的行为,以epoll为例的话,就是决定ADD,DELETE,MOD等操作,如果没有这个index_的话,在每一个事件触发时,epoll不知道要执行什么操作,他不知道这个fd是否在epoll树上。在查看muduo如何实现之前,我们看一下下面的实现,存在哪些问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
void TcpServer::run()
{
this->m_thread_pool->run();
auto *channel = new Channel(this->m_lfd, FDEvent::ReadEvent, TcpServer::accept_connection,
nullptr, nullptr, this);
this->m_main_loop->add_task(channel, ElementType::ADD);//将监听描述符放到epoll树上
this->m_main_loop->run();
}

struct ChannelElement
{
// 处理节点的类型,ADD,DELETE,MODIFY
ElementType type;
Channel *channel;
};

int EventLoop::add_task(Channel *channel, const ElementType type)
{
this->m_mutex.lock();
auto *node = new class ChannelElement();//包装一个任务
node->channel = channel;
node->type = type;
this->m_task_q.push(node);
this->m_mutex.unlock();

if (this->m_thread_id == std::this_thread::get_id())
{
std::cout << "\n--------------子线程进入(evLoop->threadId == pthread_self()),threadName = "<< this->m_thread_name
+ ", threadID = " << this->m_thread_id << std::endl;
process_taskQ();
} else
{
printf("\n----------主线程进入task_wake_up,threadID = %lu\n\n", pthread_self());
task_wake_up();
}
return 0;
}

void EventLoop::process_taskQ()
{

while (!this->m_task_q.empty())
{
this->m_mutex.lock();
const auto node = m_task_q.front();
m_task_q.pop();
this->m_mutex.unlock();
if (node->type == ElementType::ADD)//根据任务类型中的定义匹配对应的实现
{
add(node->channel);
} else if (node->type == ElementType::DELETE)
{
remove(node->channel);
} else
{
modify(node->channel);
}
delete node;
}
}

上面这个例子存一个问题,那就是效率低,每次添加一个任务,都需要执行一次:new ChannelElement(),m_task_q.push(node)和m_task_q.pop();,同时还要获取互斥锁。是一种命令机制

而muduo的做法是一种状态机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
void EPollPoller::updateChannel(Channel* channel)
{
Poller::assertInLoopThread();
const int index = channel->index();
LOG_TRACE << "fd = " << channel->fd()
<< " events = " << channel->events() << " index = " << index;

if (index == kNew || index == kDeleted)
{
// a new one, add with EPOLL_CTL_ADD
int fd = channel->fd();
if (index == kNew)
{
assert(channels_.find(fd) == channels_.end());
//如果是新的,将它添加到 Poller 的 map 中进行管理
channels_[fd] = channel;
}
else // index == kDeleted
{
assert(channels_.find(fd) != channels_.end());
//原本就存在于 map 中,只不过之前被停用了,现在重新激活
assert(channels_[fd] == channel);
}
// 更新状态为“已添加”,并调用 epoll_ctl(ADD) 将其加入内核监听
channel->set_index(kAdded);
update(EPOLL_CTL_ADD, channel);
}
else// 否则就是KAdded状态,即已经关注了该事件
{
// 如果 Channel 不再关心任何事件
int fd = channel->fd();
(void)fd;
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
assert(index == kAdded);
// 如果该事件不关注了,调用 epoll_ctl(DEL) 从内核监听中移除
if (channel->isNoneEvent())
{
update(EPOLL_CTL_DEL, channel);
channel->set_index(kDeleted);
}
else
{
// Channel 关心的事件类型发生了变化
update(EPOLL_CTL_MOD, channel);
}
}
}

通过一个标志位index_记录channel的处理状态,不需要每次都new 出一个类似的ChannelElement,即使是push到activeChannels,其速度也是非常快的,因为push的是一个本就存在的指针

对比 activeChannels.push_back(channel) task_queue.push(new ...)
操作对象 拷贝一个裸指针 拷贝一个裸指针
内存管理 (操作的是已存在的对象) (必须先 new,后 delete)
性能开销 极低 (一次指针拷贝) (一次堆分配 + 一次堆释放)

tie_和tied_

解决 TcpConnection 的生命周期安全问题TcpConnection 是通过 std::shared_ptr 管理的, Channel 内部的回调函数如果直接捕获 shared_ptr 会导致循环引用。tie_ 用于保存一个指向 TcpConnectionweak_ptr。在 handleEvent 执行回调前,会尝试将 weak_ptr提升为 shared_ptr。如果提升成功,说明 TcpConnection 对象还活着(通过 tie_.lock() 在事件处理期间”锁住”对象,确保其不会被销毁),就安全地执行回调;如果失败,说明 TcpConnection 已经被销毁了,就不再执行回调,从而避免了对悬空指针的访问。tied_ 只是一个标志,表示是否启用了这个机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//1
void TcpConnection::connectEstablished()
{
// ...
channel_->tie(shared_from_this()); // 将TcpConnection的shared_ptr绑定到Channel
// ...
}
//2
void Channel::tie(const std::shared_ptr<void>& obj)
{
tie_ = obj; // 以weak_ptr形式存储,不增加引用计数
tied_ = true; // 标记已绑定
}
//3
void Channel::handleEvent(Timestamp receiveTime)
{
std::shared_ptr<void> guard;
if (tied_)
{
guard = tie_.lock(); // 尝试将weak_ptr转换为shared_ptr
if (guard)
{
handleEventWithGuard(receiveTime); // 有guard保护下处理事件
}
// 如果guard为空,说明对象已销毁,直接跳过处理
}
else
{
handleEventWithGuard(receiveTime); // 未绑定时直接处理
}
}


shared_from_this():用于在对象内部安全地获取指向自身的 shared_ptr,这里为什么不能直接使用 this 指针呢?

1
2
3
4
5
6
7
8
9
10
11
class BadExample {
public:
std::shared_ptr<BadExample> getSharedPtr() {
return std::shared_ptr<BadExample>(this);
}
};

// 使用时会出现问题
auto ptr1 = std::make_shared<BadExample>();
auto ptr2 = ptr1->getSharedPtr(); // 创建了两个独立的控制块!
// 当 ptr1 和 ptr2 都析构时,对象会被删除两次 -> 崩溃

TcpConnection 通过继承 std::enable_shared_from_this 来解决这个问题。

Acceptor类是用于接受新连接的类,它负责监听一个端口,当有新的连接到来时,它负责接受这个连接,并调用回调函数处理这个连接。包含一个Socket对象,一个Channel对象,一个NewConnectionCallback对象。Socket 对底层的socket文件描述符(sockfd)进行了面向对象的封装,提供了 bind, listen, accept等接口。

一、类的初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())), // 1. 创建Socket
acceptChannel_(loop, acceptSocket_.fd()), // 创建Channel
listening_(false),
idleFd_(::open("/dev/null",
O_RDONLY |
O_CLOEXEC)) // 预留一个fd,当accept失败时,释放预留的fd,并重新接受新连接
{
assert(idleFd_ >= 0);
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(reuseport); // 设置端口复用
acceptSocket_.bindAddress(listenAddr); // 绑定地址
// 为 acceptChannel_ 设置一个读回调函数 Acceptor::handleRead。当有新连接到达时,EventLoop
// 会调用这个函数,handleRead 内部会调用 accept() 来接受连接。
acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

// 当有新连接到来时,epoll会返回监听描述符,然后EventLoop会通过之前设置好的Channel来处理这个监听描述符,会调用channel的回调函数,也就是这里的handleRead进行处理,执行accept,并调用TcpServer中的newConnetion函数。
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr;
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
// 在TcpServer的构造函数中设置好了newConnectionCallback_,当有新连接到来时,会调用TcpServer的newConnection函数
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr);
}
else
{
sockets::close(connfd);
}
}
else
{
if (errno == EMFILE)
{
::close(idleFd_); // 1. 释放预留的fd
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL); // 2. 接受新连接
::close(idleFd_); // 3. 立即关闭它
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); // 4. 重新打开一个fd
}
}
}

//Acceptor建立新连接时,调用TcpServer的newConnection函数,将新连接封装成一个TcpConnectionPtr
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop();//获取一个EventLoop线程,将TcpConnection对象放入该线程中
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;

InetAddress localAddr(sockets::getLocalAddr(sockfd));
TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
//设置TcpConnection的回调函数
conn->setConnectionCallback(connectionCallback_);// 连接回调
conn->setMessageCallback(messageCallback_);// 消息回调
conn->setWriteCompleteCallback(writeCompleteCallback_);// 写完成回调
conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // 关闭回调
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));//将TcpConnection对象的连接建立事件放入EventLoop线程中执行
}

宏观上看,Acceptor这个类并不复杂,主要的任务就是负责初始化监听描述符,并设置监听描述符的回调函数handleRead,在新连接到来时,Acceptor会调用Acceptor会调用headleRead建立连接,并调用newConnection回调函数。

1
2
3
4
5
6
7
if (errno == EMFILE)
{
::close(idleFd_); // 1. 释放预留的fd
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL); // 2. 接受新连接
::close(idleFd_); // 3. 立即关闭它
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); // 4. 重新打开一个fd
}

当服务器并发连接数非常高时,可能会耗尽进程可用的文件描述符(fd)。这时,accept() 会失败并返回 EMFILE 错误。如果不处理,服务器将无法接受任何新连接,相当于“假死”。

muduo 的解决方案:

  1. Acceptor 在构造时,就预先打开一个指向 /dev/null 的文件描述符 idleFd_。它就像一个“备用座位”。
  2. acceptEMFILE 失败时,Acceptor立即关闭这个备用的 idleFd_,从而释放出一个文件描述符名额。
  3. 有了这个名额,Acceptor 就能成功 accept() 那个等待中的新连接。
  4. 为了避免新连接因为没有被处理而丢失,Acceptor 会立即 close() 这个刚刚接受的连接。这虽然拒绝了客户端,但保证了服务器自身不会卡死,并且向客户端发出了一个明确的拒绝信号(RST),客户端可以稍后重试。
  5. 最后,Acceptor 会再次打开 /dev/null 来重新占用 idleFd_,为下一次 EMFILE 危机做好准备。

这个技巧确保了即使在 fd 耗尽的极端情况下,Acceptor 所在的 EventLoop 也能正常运转,不会因为 accept 不断失败而陷入死循环

SO_RESUCEADDR和SO_RESUCEPORT的区别

特性 SO_REUSEADDR (地址复用) SO_REUSEPORT (端口复用)
核心目的 服务器快速重启 性能扩展 (负载均衡)
解决问题 允许新启动的服务器立即绑定一个处于 TIME_WAIT 状态的端口 允许多个独立的监听套接字绑定到完全相同的 IP 和端口。
工作模式 一个端口在同一时间仍然只能被一个监听套接字绑定。 一个端口可以被多个监听套接字同时绑定。
适用场景 几乎所有服务器程序都应该开启,用于开发和运维中的快速迭代和重启。 面向高性能、高并发连接的服务器,用于在多核 CPU 上扩展 accept 的处理能力。
在 muduo 中 默认开启 默认关闭,需要显式开启。

在标准的 TcpServer 模型中,只有一个主线程负责 accept 所有新连接,但当服务器面临极高的连接建立速率时就会成为整个系统的性能瓶颈acceptSocket_.setReusePort(reuseport); ,也就是SO_RESUCEPORT ,这个选项允许多个线程或进程创建各自的监听套接字,并全部绑定到同一个 IP 和端口上。当新连接到来时,内核会负责进行负载均衡,将这个连接请求“派发”给其中一个监听套接字。这样一来,accept 的工作就被均匀地分摊到了多个 CPU 核心上。

更多细节,见:https://stackoverflow.com/questions/14388706/how-do-so-reuseaddr-and-so-reuseport-differ

仔细看的话,还能发现建立连接的是一个acceptSocket_,而不是系统的accept函数,这是muduo对系统调用的封装,提高扩展性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Socket acceptSocket_;
/// 对底层的socket文件描述符(sockfd)进行了面向对象的封装,提供了 bind, listen, accept 等接口。
class Socket : noncopyable
{
public:
explicit Socket(int sockfd) : sockfd_(sockfd)
{
}
~Socket();
int fd() const
{
return sockfd_;
}
bool getTcpInfo(struct tcp_info*) const;
bool getTcpInfoString(char* buf, int len) const;
void bindAddress(const InetAddress& localaddr);
void listen();
int accept(InetAddress* peeraddr);
void shutdownWrite();
void setTcpNoDelay(bool on);
void setReuseAddr(bool on);
void setReusePort(bool on);
void setKeepAlive(bool on);

private:
const int sockfd_;
};
//Socket封装socket,利用RAII思想关闭文件描述符,避免忘记关闭
Socket::~Socket()
{
sockets::close(sockfd_);
}
//Socket
void Socket::listen()
{
sockets::listenOrDie(sockfd_);
}
//sockets
void sockets::listenOrDie(int sockfd)
{
//设置等待连接队列的最大长度(SOMAXCONN)。
int ret = ::listen(sockfd, SOMAXCONN);
if (ret < 0)
{
LOG_SYSFATAL << "sockets::listenOrDie";
}
}

可以看见,Socket内部的实现都放在一个名为sockets的作用域中,设计思想:Socket作为一个抽象层,我们在使用的时候不需要考虑不同平台的调用方式,底层的sockets负责实现阔平台逻辑

二、挂载EventLoop

我们发现,Acceptor没有调用listen函数进行监听,实际上开始监听的操作是在TcpServer的start函数实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void TcpServer::start()
{
if (started_.getAndSet(1) == 0)
{
//启动EventLoop线程池
threadPool_->start(threadInitCallback_);

assert(!acceptor_->listening());
//将 Acceptor::listen() 的调用任务放入EventLoop 的待执行队列中,这确保了所有和 EventLoop 相关的操作都在同一个I/O线程中执行,避免了锁竞争
loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}

void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(std::move(cb));
}
}

void Acceptor::listen()
{
loop_->assertInLoopThread();
listening_ = true;
// 将socket设置为监听状态,并设置等待连接队列的最大长度(SOMAXCONN)。
acceptSocket_.listen();
// 放到epoll树上,注册可读事件
acceptChannel_.enableReading();
}


在TcpServer::start函数启动后,loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));会将Acceptor::listen函数启动开始监听,并将文件描述符放到主线程的epoll树上监听读实现,当事件触发时,会调用文件描述符的读回调,也就是headleRead函数处理连接。。

0%