muduo(7)-AsyncLogging

muduo库的AsyncLogging异步日志记录使用双缓冲+任务队列实现,同时,在日志内容过多时,主动丢弃部分日志,确保系统高效的运行,防止被日志阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
BufferPtr currentBuffer_ GUARDED_BY(mutex_);//当前缓冲区
BufferPtr nextBuffer_ GUARDED_BY(mutex_);//备用缓冲区
BufferVector buffers_ GUARDED_BY(mutex_);//待写入队列


void AsyncLogging::append(const char* logline, int len)
{
muduo::MutexLockGuard lock(mutex_);
// 如果当前缓冲区有足够的空间,则直接追加到当前缓冲区
if (currentBuffer_->avail() > len)
{
currentBuffer_->append(logline, len);
}
else
{
// 将当前缓冲区放入待写入队列或者说待写入缓冲区
buffers_.push_back(std::move(currentBuffer_));
// 为 currentBuffer_ 获取一个新的空缓冲区
if (nextBuffer_) // 如果备用缓冲区存在
{
// 将备用缓冲区设置为当前缓冲区
currentBuffer_ = std::move(nextBuffer_);
}
else
{
// 在备用缓冲区不存在的情况下,为 currentBuffer_ 分配一个新的空缓冲区
currentBuffer_.reset(new Buffer);
}
// 将新的日志消息写入新的 currentBuffer_
currentBuffer_->append(logline, len);
// 通知等待的线程开始写入数据(当前缓冲区已满,批量写入)
cond_.notify();
}
}

后端一直循环写入日志,看起来比较绕,currentBuffer,nextBuffer,buffers,newBuffer1,newBuffer2,buffersToWrite,一共六个buffer

  1. currentBuffer:前端写入日志的buffer
  2. nextBuffer:前端备用buffer
  3. buffers:前端待写入日志队列
  4. newBuffer1:后端备用buffer
  5. newBuffer2:后端备用buffer
  6. bufferToWrite:真正要写入日志的buffer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
void AsyncLogging::threadFunc()
{
// 确认后端日志线程已启动
assert(running_ == true);
// 使用 CountDownLatch 通知前端线程:后端线程已成功启动,可以开始记录日志了
latch_.countDown();

// 创建一个 LogFile 对象,这是日志最终写入的目标文件。
// 第三个参数 threadSafe 设置为 false,因为所有写操作都在这一个后端线程内完成,无需加锁。
LogFile output(basename_, rollSize_, false);

// 这两个缓冲区用于后续和前端的缓冲区进行交换,避免在后端线程中频繁分配新内存
BufferPtr newBuffer1(new Buffer);
BufferPtr newBuffer2(new Buffer);
newBuffer1->bzero();
newBuffer2->bzero();

// 准备一个 vector,用于存放从前端(append函数)收集到的、待写入文件的缓冲区指针
BufferVector buffersToWrite;
buffersToWrite.reserve(16);

// 后端日志线程主循环
while (running_)
{
// 在循环开始时,断言两个空闲缓冲区都是空的,并且待写入区也是空的
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());

{
muduo::MutexLockGuard lock(mutex_);

// 等待可能被两种情况唤醒:
// 1. 前端线程写满一个 buffer 后调用 cond_.notify() 唤醒。
// 2. 超时时间到达,即使没有数据也要进行一次日志刷盘。
if (buffers_.empty()) // unusual usage!
{
cond_.waitForSeconds(flushInterval_);
}

// 无论 cond_ 是如何被唤醒的,我们都把前端的 currentBuffer_ 移到待写入队列中。
// 这样可以确保即使在超时的情况下,当前缓冲区里未满的日志也能被收集到。
buffers_.push_back(std::move(currentBuffer_));

// 将之前准备好的空闲缓冲区 newBuffer1 设置为新的 currentBuffer_
currentBuffer_ = std::move(newBuffer1);

// 将前端的整个待写入队列 buffers_ 和后端的 buffersToWrite 进行交换。
// 交换后,buffers_ 变为空,前端可以继续无锁地向其中添加写满的 buffer。
buffersToWrite.swap(buffers_);

// 如果前端把备用缓冲区 nextBuffer_ 也用掉了,那么就把另一个空闲缓冲区 newBuffer2 补上。
// currentbuffer写满了,执行currentbuffer = move(nextbuffer),此后nextbuffer为空
if (!nextBuffer_)
{
nextBuffer_ = std::move(newBuffer2);
}
}

// 断言我们确实拿到了待写入的数据
assert(!buffersToWrite.empty());

// 如果待写入的缓冲区数量过多(超过25个),说明前端日志产生速度远超后端写入速度,
// 为了防止内存无限增长,这里会丢弃掉一部分日志。
if (buffersToWrite.size() > 25)
{
char buf[256];
snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2);
fputs(buf, stderr);
output.append(buf, static_cast<int>(strlen(buf)));
// 只保留前两个 buffer 的日志,其他的丢弃
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
}

// 遍历所有待写入的 buffer,将它们的内容追加到 LogFile 对象中
for (const auto& buffer : buffersToWrite)
{
output.append(buffer->data(), buffer->length());
}
if (buffersToWrite.size() > 2)
{
// 丢弃多余的buffer,只保留两个,用于回收利用,避免内存持有过多
buffersToWrite.resize(2);
}

// 从处理完的 buffersToWrite 中回收 buffer 作为下一个空闲缓冲区(这个一定执行)
if (!newBuffer1)
{
assert(!buffersToWrite.empty());
newBuffer1 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer1->reset(); // 清空 buffer
}
//可能执行,如果是超时进来的,就不会执行,因为没有执行currentbuffer = move(nextbuffer),nextbuffer没有为空那么上面的交换
//就没有执行,所以newBUffer2就不会为空,反之为空
if (!newBuffer2)
{
assert(!buffersToWrite.empty());
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->reset(); // 清空 buffer
}

// 清空待写入队列,并强制将 LogFile 缓冲区的数据刷到磁盘
buffersToWrite.clear();
output.flush();
}
// 线程退出前,最后一次将 LogFile 缓冲区的数据刷到磁盘
output.flush();
}

如何使用:

这个AsyncLogging需要配合Logger类使用

  1. LOG_INFO 宏创建 Logger 对象
1
2
3
4
#define LOG_INFO if (muduo::Logger::logLevel() <= muduo::Logger::INFO) \
muduo::Logger(__FILE__, __LINE__).stream()

LOG_INFO << "hello world";

日志会创建一个临时对象,然后这个对象在结束的时候通过g_output将内容添加到currentBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Logger::~Logger()
{
impl_.finish();
const LogStream::Buffer& buf(stream().buffer());
g_output(buf.data(), buf.length());
if (impl_.level_ == FATAL)
{
g_flush();
abort();
}
}
Logger::OutputFunc g_output = defaultOutput;

void defaultOutput(const char* msg, int len)
{
size_t n = fwrite(msg, 1, len, stdout);
// FIXME check n
(void)n;
}

从这里可以看出默认的输出是输出到终端,所以我们在使用的时候需要自定义输出器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <muduo/base/CurrentThread.h>
#include <muduo/base/AsyncLogging.h>
#include <muduo/base/Logging.h>
#include <muduo/base/Thread.h>

//定义全局AsyncLogging指针
muduo::AsyncLogging* g_asyncLogging = nullptr;

//定义异步输出函数
void asyncOutput(const char* msg, int len)
{
g_asyncLogging->append(msg, len);
}

int main()
{
muduo::AsyncLogging log("log.txt", 0);
g_asyncLogging = &log;

//设置日志输出函数
muduo::Logger::setOutput(asyncOutput);
//启动异步日志线程
log.start();
//输出日志
LOG_INFO << "hello world";
muduo::CurrentThread::sleepUsec(1000 * 1000);
return 0;
}

CentOS yum软件源

清华源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
[base]
name=CentOS-7.9.2009 - Base - tsinghua.com
#mirrorlist=
baseurl=https://mirrors.tuna.tsinghua.edu.cn/centos-vault/7.9.2009/os/$basearch/
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7

#released updates
[updates]
name=CentOS-7.9.2009 - Updates - tsinghua.com
#mirrorlist=
baseurl=https://mirrors.tuna.tsinghua.edu.cn/centos-vault/7.9.2009/updates/$basearch/
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7
#additional packages that may be useful
[extras]
name=CentOS-7.9.2009 - Extras - tsinghua.com
#mirrorlist=
baseurl=https://mirrors.tuna.tsinghua.edu.cn/centos-vault/7.9.2009/extras/$basearch/
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7
#additional packages that extend functionality of existing packages
[centosplus]
name=CentOS-7.9.2009 - Plus - tsinghua.com
baseurl=https://mirrors.tuna.tsinghua.edu.cn/centos-vault/7.9.2009/centosplus/$basearch/
gpgcheck=1
enabled=0
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7

官方源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[base]
name=CentOS-7.9.2009 - Base
baseurl=https://vault.centos.org/7.9.2009/os/$basearch/
gpgcheck=1
gpgkey=https://vault.centos.org/RPM-GPG-KEY-CentOS-7
enabled=1

[updates]
name=CentOS-7.9.2009 - Updates
baseurl=https://vault.centos.org/7.9.2009/updates/$basearch/
gpgcheck=1
gpgkey=https://vault.centos.org/RPM-GPG-KEY-CentOS-7
enabled=1

[extras]
name=CentOS-7.9.2009 - Extras
baseurl=https://vault.centos.org/7.9.2009/extras/$basearch/
gpgcheck=1
gpgkey=https://vault.centos.org/RPM-GPG-KEY-CentOS-7
enabled=1

[centosplus]
name=CentOS-7.9.2009 - Plus
baseurl=https://vault.centos.org/7.9.2009/centosplus/$basearch/
gpgcheck=1
gpgkey=https://vault.centos.org/RPM-GPG-KEY-CentOS-7
enabled=0

CentOS 网络环境配置

一、配置静态IP

修改Address,Netmask,Gateway,DNS,修改后点击Apply并重启网络

image-20250719140135106

二、代理环境

软件下载:clash for windows linux

image-20250719160231349

如果是root用户,需要添加--no-sandbox运行

1
./cfw --no-sandbox

添加环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
vim ~/.bashrc
# 添加如下信息
export http_proxy="http://127.0.0.1:7890"
export https_proxy="http://127.0.0.1:7890"
export HTTP_PR0XY="http://127.0.0.1:7890"
export HTTPS_PR0XY="http://127.0.0.1:7890"
export no_proxy="localhost,127.0.0.1""
source ~/.bashrc

sudo vim /etc/profile
export http_proxy="http://127.0.0.1:7890"
export https_proxy="http://127.0.0.1:7890"
export HTTP_PR0XY="http://127.0.0.1:7890"
export HTTPS_PR0XY="http://127.0.0.1:7890"
export no_proxy="localhost,127.0.0.1""
source /etc/profile

vim /etc/environment
export http_proxy="http://127.0.0.1:7890"
export https_proxy="http://127.0.0.1:7890"
export HTTP_PR0XY="http://127.0.0.1:7890"
export HTTPS_PR0XY="http://127.0.0.1:7890"
export no_proxy="localhost,127.0.0.1""
source /etc/environment

sudo vim /etc/yum.conf
export http_proxy="http://127.0.0.1:7890"
export https_proxy="http://127.0.0.1:7890"
export HTTP_PR0XY="http://127.0.0.1:7890"
export HTTPS_PR0XY="http://127.0.0.1:7890"
export no_proxy="localhost,127.0.0.1""

sudo vim /etc/wgetrc
http_proxy = http://127.0.0.1:7890
https_proxy = http://127.0.0.1:7890
HTTP_PR0XY = http://127.0.0.1:7890
HTTPS_PR0XY = http://127.0.0.1:7890
no_proxy = localhost,127.0.0.1

curl -I http://www.baidu.com

一键脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/bin/bash
# =================================================================
# 一键为 CentOS 设置/取消 127.0.0.1:7890 代理 (优化版)
# 用法:
# sudo ./set_proxy.sh on # 启用代理
# sudo ./set_proxy.sh off # 关闭代理
# =================================================================

# --- 配置区 ---
PROXY_URL="http://127.0.0.1:7890"
NO_PROXY="localhost,127.0.0.1,::1,192.168.0.0/16,10.0.0.0/8"

# --- 标记,用于安全地添加和删除配置 ---
BEGIN_MARKER="# BEGIN PROXY CONFIG - Managed by script"
END_MARKER="# END PROXY CONFIG - Managed by script"

# --- 获取真正的用户家目录 ---
# 如果是通过 sudo 执行,SUDO_USER 变量会包含原用户名
if [[ -n "$SUDO_USER" ]]; then
USER_HOME=$(getent passwd "$SUDO_USER" | cut -d: -f6)
else
# 如果是 root 直接执行,则使用 root 的家目录
USER_HOME=$HOME
fi

# 需要写入的环境变量文件列表
# 注意:/etc/environment 的语法和其他文件不同,没有 export
declare -a ENV_FILES=("$USER_HOME/.bashrc" "/etc/profile")
ENV_FILE_SYSTEM="/etc/environment"

# --- 函数定义 ---

# 安全地写入或清除代理配置
handle_proxy_files() {
local action="$1"

# 构造配置内容
local proxy_content_export="
export http_proxy=\"$PROXY_URL\"
export https_proxy=\"$PROXY_URL\"
export HTTP_PROXY=\"$PROXY_URL\"
export HTTPS_PROXY=\"$PROXY_URL\"
export no_proxy=\"$NO_PROXY\"
export NO_PROXY=\"$NO_PROXY\""

local proxy_content_plain="
http_proxy=\"$PROXY_URL\"
https_proxy=\"$PROXY_URL\"
HTTP_PROXY=\"$PROXY_URL\"
HTTPS_PROXY=\"$PROXY_URL\"
no_proxy=\"$NO_PROXY\"
NO_PROXY=\"$NO_PROXY\""

# 清理函数:使用标记来精确删除
clear_proxy() {
local file="$1"
if [ -f "$file" ]; then
# 使用 sed 删除从 BEGIN_MARKER 到 END_MARKER 之间的所有行
sed -i "/^${BEGIN_MARKER}$/,/^${END_MARKER}$/d" "$file"
fi
}

# 写入函数
write_proxy() {
local file="$1"
local content="$2"
# 写入前先清理,防止重复
clear_proxy "$file"
# 使用 cat 和 EOF 来写入整个块
cat >> "$file" <<EOF

${BEGIN_MARKER}
${content}
${END_MARKER}
EOF
}

# 遍历文件列表进行操作
for f in "${ENV_FILES[@]}"; do
if [[ "$action" == "on" ]]; then
write_proxy "$f" "$proxy_content_export"
else
clear_proxy "$f"
fi
done

# 单独处理 /etc/environment
if [[ "$action" == "on" ]]; then
write_proxy "$ENV_FILE_SYSTEM" "$proxy_content_plain"
else
clear_proxy "$ENV_FILE_SYSTEM"
fi
}

# YUM/DNF 配置
handle_package_manager() {
local conf_file="/etc/yum.conf"
if [ ! -f "$conf_file" ]; then
conf_file="/etc/dnf/dnf.conf" # 兼容新的 CentOS/RHEL
fi

# 先删除旧的 proxy 设置行,避免重复
sed -i '/^proxy=/d' "$conf_file"

if [[ "$1" == "on" ]]; then
# 在 [main] 部分追加配置
# 如果[main]不存在,则直接追加到文件末尾
if grep -q "\[main\]" "$conf_file"; then
sed -i "/\[main\]/a proxy=$PROXY_URL" "$conf_file"
else
echo "proxy=$PROXY_URL" >> "$conf_file"
fi
fi
}

# --- 主流程 ---
case "$1" in
on)
echo ">>> 正在为系统配置代理: $PROXY_URL"
handle_proxy_files on
handle_package_manager on
echo ">>> 代理配置写入成功!"
echo
echo "========================= 重要提示 ========================="
echo "请执行以下命令使配置在当前终端立即生效:"
echo " source ${USER_HOME}/.bashrc"
echo "或者,请重新打开一个新的终端窗口。"
echo "=========================================================="
;;
off)
echo ">>> 正在清除系统代理配置..."
handle_proxy_files off
handle_package_manager off
echo ">>> 代理配置已清除!"
echo
echo "========================= 重要提示 ========================="
echo "请执行以下命令使变更在当前终端立即生效:"
echo " source ${USER_HOME}/.bashrc"
echo "或者,请重新打开一个新的终端窗口。"
echo "=========================================================="
;;
*)
echo "用法: sudo $0 {on|off}"
exit 1
;;
esac

exit 0

使用方法

  1. 把脚本保存为 set_proxy.sh ,并赋予可执行权限
1
chmod +x set_proxy.sh
  1. 启用代理
1
sudo ./set_proxy.sh on
  1. 关闭代理
1
sudo ./set_proxy.sh off

CentOS C++环境搭建

一、配置编译套件

百度网盘:[devtoolset-11](通过网盘分享的文件:devtoolset-11_0717.tar.gz
链接: https://pan.baidu.com/s/1w9GIWchuaBNbjeJrusrbSQ?pwd=0228 提取码: 0228
–来自百度网盘超级会员v4的分享)

下载后执行如下命令:

1
2
3
cp devtoolset-11_0717.tar.gz /opt/rh/
cd /opt/rh/
tar -zxvf devtoolset-11_0717.tar.gz

这样就配置好了,如果要激活环境,可以使用如下命令(每次使用都要激活)

1
2
3
source /opt/rh/devtoolset-11/enable
或者
scl enable devtoolset-11 bash

如果觉得麻烦,可以添加到环境变量,以后都不需要手动激活(不建议,貌似会把环境搞乱)

1
2
3
4
vim ~/.bashrc
#添加如下命令
source /opt/rh/devtoolset-11/enable
g++ --version

注意,如果没有添加到环境变量,使用vscode进行ssh连接时,cmake插件是查询不到devtoolset-11里面的编译套件的!

百度网盘:[devtoolset-12](通过网盘分享的文件:devtoolset-12.tar.gz
链接: https://pan.baidu.com/s/1p7hJgtuKmDqHvtM2elAx6A?pwd=0228 提取码: 0228
–来自百度网盘超级会员v4的分享),如果这个不行,可以通过 如下方式获取,添加软件源到/etc/yum.repos.d/CentOS-Base.repo

1
2
3
4
5
6
7
8
9
10
[copr:copr.fedorainfracloud.org:mlampe:devtoolset-12]
name=Copr repo for devtoolset-12 owned by mlampe
baseurl=https://download.copr.fedorainfracloud.org/results/mlampe/devtoolset-12/epel-7-$basearch/
type=rpm-md
skip_if_unavailable=True
gpgcheck=1
gpgkey=https://download.copr.fedorainfracloud.org/results/mlampe/devtoolset-12/pubkey.gpg
repo_gpgcheck=0
enabled=1
enabled_metadata=1

然后通过如下命令下载

1
yum install devtoolset-12

二、cmake安装

百度网盘:[cmake 3.28](通过网盘分享的文件:cmake-3.28.0-linux-x86_64.tar.gz
链接: https://pan.baidu.com/s/1cVjYv8z3n-uWDzl1bBitig?pwd=0228 提取码: 0228
–来自百度网盘超级会员v4的分享)

下载后执行如下命令

1
2
3
4
5
tar -zxvf cmake-3.28.0-linux-x86_64.tar.gz
cd cmake-3.28.0-linux-x86_64
sudo cp -r /root/tools/cmake-3.28.0-linux-x86_64/bin/* /usr/local/bin/
sudo cp -r /root/tools/cmake-3.28.0-linux-x86_64/share/cmake-3.28 /usr/local/share/
cmake --version

其他版本见:https://cmake.org/files/

三、clangd安装

采用conda安装clangd

1
2
3
4
5
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh 
bash Miniconda3-latest-Linux-x86_64.sh
source ~/.bashrc
conda install -c conda-forge clang clangxx clang-tools
clangd --version

muduo(6)-EventLoop

一、线程安全注解

充分利用编译器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
EventLoop* loop_ GUARDED_BY(mutex_);

class SCOPED_CAPABILITY MutexLockGuard : noncopyable
{
public:
explicit MutexLockGuard(MutexLock& mutex) ACQUIRE(mutex)
: mutex_(mutex)
{
mutex_.lock();
}

~MutexLockGuard() RELEASE()
{
mutex_.unlock();
}

private:

MutexLock& mutex_;
};
#define MutexLockGuard(x) error "Missing guard object name"

EventLoop* loop_ GUARDED_BY(mutex_); 是一种线程安全注解,通常用于标注某个成员变量的访问需要特定的锁保护。在这个例子中,loop_ 的访问需要由 mutex_ 互斥锁保护。

在编译时,静态分析工具会检查代码中是否正确地在访问 loop_ 时持有 mutex_ 锁。如果没有持有锁,工具会发出警告或错误提示。GUARDED_BY 是一种编译期的注解,对运行时行为没有直接影响。它不会生成额外的代码,也不会影响程序的性能。为开发者提供明确的线程安全约定,提醒其他人维护代码时遵守这些规则。

MutexLockGuard(x)是一种防止用法错误的技巧宏,目的是防止你写出如下代码:

1
MutexLockGuard(mutex_);

这样写会创建一个临时的 MutexLockGuard 对象,它在这一行代码结束后就被销毁,锁也会立即释放,根本起不到加锁保护作用!宏定义把 MutexLockGuard(x) 替换成 error “Missing guard object name”。如果你写了 MutexLockGuard(mutex_);,编译器会报错:“Missing guard object name”。这样强制你必须写变量名,防止误用。

SCOPED_CAPABILITY,ACQUIRE,RELEASE都是配合Clang编译器检查使用的

SCOPED_CAPABILITY 是一个线程安全注解宏,这个类是一个“作用域锁”,构造函数是“加锁”,析构函数是“解锁”。

ACQUIRE(mutex) 也是线程安全注解宏,函数会“获取”某个锁(mutex)。

RELEASE()也是线程安全注解宏,函数会“释放”锁(mutex)。

通过Clang编译时检查,见:Thread Safety Analysis: https://clang.llvm.org/docs/ThreadSafetyAnalysis.html

二、事件处理模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
while (!quit_)
{
activeChannels_.clear();
//activeChannels_是一个传入传出参数,保存着有事件发生的Channel
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
// TODO sort channel by priority
eventHandling_ = true;
//遍历activeChannels_,调用Channel的handleEvent方法
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}
// epoll监听
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
LOG_TRACE << "fd total count " << channels_.size();
int numEvents = ::epoll_wait(epollfd_,
&*events_.begin(),
static_cast<int>(events_.size()),
timeoutMs);
int savedErrno = errno;
Timestamp now(Timestamp::now());
if (numEvents > 0)
{
LOG_TRACE << numEvents << " events happened";
//将监听到的全部事件都转移到activeChannels,单一职责,poller只负责 I/O 复用,事件处理是EventLoop的职责
fillActiveChannels(numEvents, activeChannels);
if (implicit_cast<size_t>(numEvents) == events_.size())
{
events_.resize(events_.size()*2);
}
}
else if (numEvents == 0)
{
LOG_TRACE << "nothing happened";
}
else
{
// error happens, log uncommon ones
if (savedErrno != EINTR)
{
errno = savedErrno;
LOG_SYSERR << "EPollPoller::poll()";
}
}
return now;
}

void EPollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
assert(implicit_cast<size_t>(numEvents) <= events_.size());
for (int i = 0; i < numEvents; ++i)
{
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
#ifndef NDEBUG
int fd = channel->fd();
ChannelMap::const_iterator it = channels_.find(fd);
assert(it != channels_.end());
assert(it->second == channel);
#endif
channel->set_revents(events_[i].events);
activeChannels->push_back(channel);
}
}

Poller 的职责是 I/O 复用,而 EventLoop 的职责是事件分发。通过activeChannels_将两者分开,可以使系统结构更清晰、耦合度更低、更易于扩展。

1
2
3
4
if (implicit_cast<size_t>(numEvents) == events_.size())
{
events_.resize(events_.size()*2);
}

epoll返回的数量等于我们传入的最大数量,说明epoll内部其实有更多的事件触发,只不过受限于我们传入的static_cast<int>(events_.size())无法全部传出,所以需要扩容方便后续处理更多事件

1
2
3
4
5
6
7
8
9
eventHandling_ = true;
//遍历activeChannels_,调用Channel的handleEvent方法
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;

eventHandling作为一个状态标志,防止在处理activeChannels_中的事件时,修改activeChannels_中的信息导致出差。不能在遍历一个容器的同时修改它

如在removeChannel函数中

1
2
3
4
5
6
7
8
9
10
11
12
void EventLoop::removeChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
if (eventHandling_)
{
assert(currentActiveChannel_ == channel ||
std::find(activeChannels_.begin(), activeChannels_.end(), channel) ==
activeChannels_.end());
}
poller_->removeChannel(channel);
}

在取出一个channel时,必须要确保当前活跃的channel和要删除channel一致或者不在activeChannels_中的chennel,如果两个channel不一致且存在activeChannels_中,就可能会发生异常情况,在

1
2
3
4
5
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}

中执行到被删除的channel时就可能会发生错误(空指针)

三、职责单一原则

muduo强调类职责单一原则,其他类通过runInLoop添加到EventLoop中的pendingFunctors_,这个**pendingFunctors_**在EventLoop处理完全部epoll事件后统一处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
while (!quit_)
{
activeChannels_.clear();
// activeChannels_是一个传入传出参数,保存着有事件发生的Channel
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
eventHandling_ = true;
// 遍历activeChannels_,调用Channel的handleEvent方法
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}

void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;

{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}

for (const Functor& functor : functors)
{
functor();
}
callingPendingFunctors_ = false;
}

如将监听描述符上树添加到EventLoop

1
2
//将 Acceptor::listen() 的调用任务放入EventLoop 的待执行队列中,这确保了所有和 EventLoop 相关的操作都在同一个I/O线程中执行,避免了锁竞争
loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));

如关闭连接等等

1
2
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));

所有的事件操作都需要在EventLoop中执行

四、唤醒EventLoop线程

1
2
int wakeupFd_;
std::unique_ptr<Channel> wakeupChannel_;

muduo将wakeupChannel_添加到每一个EventLoop中,这样在有事件发生时,能够及时的唤醒对应的线程,防止阻塞

五、定时任务

muduo库还支持定时任务,在EventLoop初始化的时候,初始化一个timerQueue_,这个容器记录能够记录着各个定时任务,在TimerQueue初始化的时候,也会创建一个timerfd_放到EventLoop的监听当中,当最近的定时任务到期时,timerfd 会变为可读,EventLooppoll 调用中被唤醒,并将 timerfdChannel_ 作为一个活跃事件进行处理,最终调用其读回调,也就是 TimerQueue::handleRead,从而执行到期的定时任务。将时间事件转换为一个文件描述符的 I/O 事件,典型的应用有:定时发送心跳,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
//TimeQueue.h
private:
// 按到期时间排序的 set,用于快速查找下一个要到期的定时器
typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;

// 按Timer* 地址排序的 set,用于快速取消(删除)一个定时器
typedef std::pair<Timer*, int64_t> ActiveTimer;
typedef std::set<ActiveTimer> ActiveTimerSet;

TimerList timers_;//待办事项
ActiveTimerSet activeTimers_;

//TimeQueue::Insert
bool TimerQueue::insert(Timer* timer)
{
loop_->assertInLoopThread();
assert(timers_.size() == activeTimers_.size());
bool earliestChanged = false;
Timestamp when = timer->expiration();
TimerList::iterator it = timers_.begin();
// 检查新插入的定时器是否会成为新的“最早到期”的定时器
if (it == timers_.end() || when < it->first)
{
earliestChanged = true;
}

// 同时插入到两个 set 中
timers_.insert(Entry(when, timer));
activeTimers_.insert(ActiveTimer(timer, timer->sequence()));

assert(timers_.size() == activeTimers_.size());
return earliestChanged;
}

//当 timerfd 触发,handleRead 被调用。
void TimerQueue::handleRead()
{
// 当前时间
Timestamp now(Timestamp::now());
readTimerfd(timerfd_, now); // 清除事件

// 1. 获取所有已到期的定时器(为什么不是一个,而是全部?这是因为在处理其他IO事件时,时间仍在流失,可能有多个任务都到期了)
std::vector<Entry> expired = getExpired(now);

callingExpiredTimers_ = true;
cancelingTimers_.clear();

// 2. 执行回调
for (const Entry& it : expired)
{
it.second->run();
}
callingExpiredTimers_ = false;

// 3. 重置周期性任务
reset(expired, now);
}

std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
std::vector<Entry> expired;
// 构造一个哨兵值,时间为 now,指针为一个最大值
Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
// lower_bound 会 O(log N) 找到第一个到期时间 > now 的迭代器
TimerList::iterator end = timers_.lower_bound(sentry);

// 从头到 end 迭代器之间的所有元素都是已到期的
std::copy(timers_.begin(), end, back_inserter(expired));
timers_.erase(timers_.begin(), end);

// 从 activeTimers_ 中也移除这些定时器
for (const Entry& it : expired)
{
ActiveTimer timer(it.second, it.second->sequence());
size_t n = activeTimers_.erase(timer);
assert(n == 1); (void)n;
}
return expired;
}

void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
Timestamp nextExpire;

// 遍历过期的定时器,如果定时器是重复的,则重启定时器,否则删除定时器
for (const Entry& it : expired)
{
ActiveTimer timer(it.second, it.second->sequence());
// 如果定时器是重复的,则重启定时器,否则删除定时器
if (it.second->repeat()
&& cancelingTimers_.find(timer) == cancelingTimers_.end())
{
it.second->restart(now);
insert(it.second);
}
// 如果定时器不是重复的,则删除定时器
else
{
delete it.second;
}
}

// 如果处理后的定时任务不为空,则设置下一次超时时间
if (!timers_.empty())
{
nextExpire = timers_.begin()->second->expiration();
}

//确保下一次超时时间有效
if (nextExpire.valid())
{
// 设置下一次超时时间
resetTimerfd(timerfd_, nextExpire);
}
}

//设置下一次到期时间
void resetTimerfd(int timerfd, Timestamp expiration)
{
// wake up loop by timerfd_settime()
struct itimerspec newValue;
struct itimerspec oldValue;
memZero(&newValue, sizeof newValue);
memZero(&oldValue, sizeof oldValue);
newValue.it_value = howMuchTimeFromNow(expiration);
//调用 timerfd_settime 系统调用,让 timerfd_ 在 expiration 这个时刻变为可读状态
int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
if (ret)
{
LOG_SYSERR << "timerfd_settime()";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//EventLoop.h
std::unique_ptr<TimerQueue> timerQueue_;

timerQueue_(new TimerQueue(this)),

TimerQueue::TimerQueue(EventLoop* loop)
: loop_(loop),
timerfd_(createTimerfd()),
timerfdChannel_(loop, timerfd_),
timers_(),
callingExpiredTimers_(false)
{
timerfdChannel_.setReadCallback(
std::bind(&TimerQueue::handleRead, this));
timerfdChannel_.enableReading();
}

六、任务安全处理

muduo在执行任务时非常注意安全问题,通常都会使用一个原子变量标注,防止在执行任务的时候其他类修改任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//EventLoop.cc
eventHandling_(false),
callingPendingFunctors_(false),
//执行前设置为true
eventHandling_ = true;
// 遍历activeChannels_,调用Channel的handleEvent方法
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
//结束后设置为false
eventHandling_ = false;


//EventLoop.cc
//开始前设置为ture
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}

for (const Functor& functor : functors)
{
functor();
}
//结束时设置为false
callingPendingFunctors_ = false;


//定时任务处理,TimerQueue.cc
callingExpiredTimers_ = true;
cancelingTimers_.clear();
// safe to callback outside critical section
for (const Entry& it : expired)
{
it.second->run();
}
callingExpiredTimers_ = false;

muduo(5)-Buffer

一、Buffer缓冲区设计

1
2
3
4
5
6
+-------------------+------------------+------------------+
| prependable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= size
  1. prependable bytes:长度字段固定8字节
  2. readable bytes:可读缓冲区
  3. writable bytes:可写缓冲区
1
2
3
4
5
6
7
8
9
10
11
12
static const size_t kCheapPrepend = 8;
static const size_t kInitialSize = 1024;

explicit Buffer(size_t initialSize = kInitialSize)
: buffer_(kCheapPrepend + initialSize),
readerIndex_(kCheapPrepend),
writerIndex_(kCheapPrepend)
{
assert(readableBytes() == 0);
assert(writableBytes() == initialSize);
assert(prependableBytes() == kCheapPrepend);
}

在buffer初始化阶段,buffer的默认大小是8+1024,将可读指针和可写指针移动到同一个位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//NOTE 用最少的 read 系统调用次数,读取尽可能多的数据,以减少用户态和内核态之间的切换开销
ssize_t Buffer::readFd(int fd, int* savedErrno)
{
char extrabuf[65536];
struct iovec vec[2];
// writableBytes() 返回当前 inputBuffer_ 内部 std::vector<char> 中 writerIndex_
// 之后剩余的空闲空间大小
const size_t writable = writableBytes();
// 第一块缓冲区:指向 inputBuffer_ 内部的可写空间 (begin() + writerIndex_),长度为
// writableBytes()。
vec[0].iov_base = begin() + writerIndex_;
vec[0].iov_len = writable;
// 第二块缓冲区:指向一个在栈上临时分配的、大小为 64KB 的备用缓冲区 (extrabuf)。
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
// 如果 inputBuffer_ 内部的可写空间足够大,则只使用第一块缓冲区,否则会同时使用第二块缓冲区
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
// readv 会尝试一次性把 socket 接收缓冲区的数据同时读到这两块内存中。
const ssize_t n = sockets::readv(fd, vec, iovcnt);
if (n < 0)
{
*savedErrno = errno;
}
else if (implicit_cast<size_t>(n) <= writable)
{
writerIndex_ += n;
}
else
{
writerIndex_ = buffer_.size();
append(extrabuf, n - writable);
}
return n;
}

在从缓冲区中读数据时,使用readv函数,这个函数可以配置两个缓冲区,当第一个缓冲区写满时,会写到第二个缓冲区。

写入数据处理,以HttpResponse::appendToBuffer为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
void HttpResponse::appendToBuffer(Buffer* output) const
{
char buf[32];
snprintf(buf, sizeof buf, "HTTP/1.1 %d ", statusCode_);
output->append(buf);
output->append(statusMessage_);
output->append("\r\n");

if (closeConnection_)
{
output->append("Connection: close\r\n");
}
else
{
snprintf(buf, sizeof buf, "Content-Length: %zd\r\n", body_.size());
output->append(buf);
output->append("Connection: Keep-Alive\r\n");
}

for (const auto& header : headers_)
{
output->append(header.first);
output->append(": ");
output->append(header.second);
output->append("\r\n");
}

output->append("\r\n");
output->append(body_);
}


void append(const StringPiece& str)
{
append(str.data(), str.size());
}

void append(const char* /*restrict*/ data, size_t len)
{
ensureWritableBytes(len);
std::copy(data, data + len, beginWrite());
hasWritten(len);
}

//关键时makeSpace函数
void ensureWritableBytes(size_t len)
{
if (writableBytes() < len)
{
makeSpace(len);
}
assert(writableBytes() >= len);
}

void makeSpace(size_t len)
{
//如果剩余空间不足,则重新分配内存
if (writableBytes() + prependableBytes() < len + kCheapPrepend)
{
buffer_.resize(writerIndex_ + len);
}
else
{
// 将可读数据从当前位置移动到 buffer 的起始位置,为新数据腾出空间。
assert(kCheapPrepend < readerIndex_);
size_t readable = readableBytes();
// 将可读数据从当前位置移动到 buffer 的起始位置,为新数据腾出空间。
std::copy(begin() + readerIndex_, begin() + writerIndex_, begin() + kCheapPrepend);
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_ + readable;
assert(readable == readableBytes());
}
}

// 返回可写字节数
size_t writableBytes() const
{
return buffer_.size() - writerIndex_;
}
// 返回当前读取位置到buffer起始位置的距离
size_t prependableBytes() const
{
return readerIndex_;
}

muduo将buffer中的内存设计为可移动的,即长度字段固定,但可读和可写缓冲区大小可以调节,原因是这样的,readerIndex_在读取数据的时候是会往右边移动的,readerIndex_和writerIndex之间的可读区域其实是一个滑动窗口,在向右移动的过程中,左边就会空出一部分内存,也就是kCheapPrepend到readerIndex_之间的那片内存,这个是可利用的,所以当发现writerIndex到末尾的内存不够用时,会左边检查空出的那部分内存,如果两个内存加起来够用,就将可读区域的内存往左边移,这样右边就能空出更多内存,这样就能插入数据了。如果内存真的不够,就buffer_.resize(writerIndex_ + len);重新分配内存,通过内存复用避免频繁创建新的内存

二、零拷贝添加长度

在实现网络协议是,通常需要在数据包内容前面添加长度字段,经典的协议为:[4字节长度][消息体]

常规的做法是:先序列化消息体,得到长度 N,然后申请 4+N 的空间,先把长度 N 写进去,再把消息体拷贝进去。这个过程至少需要一次额外的拷贝。

muduo 的做法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void ProtobufCodecLite::fillEmptyBuffer(muduo::net::Buffer* buf,
const google::protobuf::Message& message)
{
assert(buf->readableBytes() == 0);

buf->append(tag_);

int byte_size = serializeToBuffer(message, buf);

int32_t checkSum = checksum(buf->peek(), static_cast<int>(buf->readableBytes()));
buf->appendInt32(checkSum);
assert(buf->readableBytes() == tag_.size() + byte_size + kChecksumLen);
(void)byte_size;
int32_t len = sockets::hostToNetwork32(static_cast<int32_t>(buf->readableBytes()));
//上面是填入响应数据,prepend是在长度字段填入响应数据长度
buf->prepend(&len, sizeof len);
}
// 将转换成网络字节序之后的4个字节,添加到 Buffer 的最前端
void prepend(const void* /*restrict*/ data, size_t len)
{
assert(len <= prependableBytes());
readerIndex_ -= len;
const char* d = static_cast<const char*>(data);
std::copy(d, d + len, begin() + readerIndex_);
}
  1. 直接在 writerIndex_ 处(writable 区域)序列化消息体。
  2. 得到消息体长度 N 后,利用 prependable 空间,在 readerIndex_ 之前写入4字节的长度 N,然后将 readerIndex_ 向左移动4个字节。
  3. 整个过程没有 memmove 或额外的内存拷贝

三、数据操作零拷贝

当上层逻辑(比如协议解析)需要检查缓冲区中的数据时,它会调用 peek() 方法。这个方法直接返回 begin() + readerIndex_ 的指针,让用户可以直接访问内部存储。

如在Http协议解析的时候:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
if (state_ == kExpectRequestLine)
{
const char* crlf = buf->findCRLF();
if (crlf)
{
ok = processRequestLine(buf->peek(), crlf);
if (ok)
{
request_.setReceiveTime(receiveTime);
buf->retrieveUntil(crlf + 2);
state_ = kExpectHeaders;
}
else
{
hasMore = false;
}
}
else
{
hasMore = false;
}
}


bool HttpContext::processRequestLine(const char* begin, const char* end)
{
bool succeed = false;
const char* start = begin;
const char* space = std::find(start, end, ' ');
if (space != end && request_.setMethod(start, space))
{
start = space+1;
space = std::find(start, end, ' ');
if (space != end)
{
const char* question = std::find(start, space, '?');
if (question != space)
{
request_.setPath(start, question);
request_.setQuery(question, space);
}
else
{
request_.setPath(start, space);
}
start = space+1;
succeed = end-start == 8 && std::equal(start, end-1, "HTTP/1.");
if (succeed)
{
if (*(end-1) == '1')
{
request_.setVersion(HttpRequest::kHttp11);
}
else if (*(end-1) == '0')
{
request_.setVersion(HttpRequest::kHttp10);
}
else
{
succeed = false;
}
}
}
}
return succeed;
}

processRequestLine 在分割出 method, path 等部分后,传递给 HttpRequestset 方法的是一对指向 Buffer 内部内存的 const char* 指针HttpRequestset 方法内部才根据这对指针创建 std::string。这样做的好处是,解析本身是零拷贝的,只有在确认需要存储时才发生一次内存分配。这在性能上通常优于在解析过程中创建多个临时 std::string 对象。

Bufferreadable 区域的数据已经被上层逻辑完全处理或转发后,就会调用 retrieve() 来更新缓冲区的状态,以便后续的内存复用。**利用retrieve移动指针,减少删除和清理任何内存的消耗。**如:buf->retrieveUntil(crlf + 2);,

muduo(4)-TcpConnection

TcpConnection类是用于管理TCP连接的类,它负责管理一个TCP连接的读写事件, 并提供接口来设置读写回调、关闭回调、错误回调等。包含一个EventLoop对象,一个文件描述符,一个事件类型,一个回调函数。

一、数据发送策略

muduoTcpConnection 采用两种发送方式结合的策略。首先尝试直接发送,如果 outputBuffer_ 为空,就直接将数据写入 socket 内核缓冲区,避免了不必要的内存拷贝。当直接发送无法一次性完成时(通常因为内核缓冲区已满),剩余的数据会被存入应用层的 outputBuffer_。此时的发送流程从“主动发送”切换到了“事件驱动”模式。因为在使用outputBuffer时,会调用 channel_->enableWriting();关注写事件,后面通过handleWrite函数处理outputBuffer中的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
void TcpConnection::sendInLoop(const void* data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;
size_t remaining = len;
bool faultError = false;
if (state_ == kDisconnected)
{
LOG_WARN << "disconnected, give up writing";
return;
}
// 如果输出缓冲区为空,并且 channel 没有在监听可写事件,尝试直接发送
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote;
// 如果数据全部写入,则调用写完成回调
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
// 否则就是写入失败
else
{
nwrote = 0;
// 判断是否是缓冲区写满了,如果不是缓冲区写满,则记录错误
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE || errno == ECONNRESET)
{
faultError = true;
}
}
}
}
//如果数据没有一次写完,说明缓冲区满了,需要将数据追加到输出队列中
assert(remaining <= len);
if (!faultError && remaining > 0)
{
// 计算已经写入的数据长度
size_t oldLen = outputBuffer_.readableBytes();
// 如果已经写入的和剩余的字符串长度大于等于高水位,并且已经写入的小于最高水位线,则调用高水位线回调
if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ &&
highWaterMarkCallback_)
{
// 调用高水位线回调,处理高水位线事件
loop_->queueInLoop(
std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
// 将剩余数据放入 outputBuffer_
outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);
// 如果当前没有写事件,则开启写事件
if (!channel_->isWriting())
{
//当 outputBuffer_ 中有数据积压,并且内核的发送缓冲区有可用空间时,EventLoop 会触发 Channel 的可写事件,最终调用 handleWrite()
channel_->enableWriting();
}
}
}

void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
// 将 outputBuffer_ 中的数据写入到 socket 中
ssize_t n =
sockets::write(channel_->fd(), outputBuffer_.peek(), outputBuffer_.readableBytes());
if (n > 0)
{
// 从 outputBuffer_ 中移除已经写入的数据
outputBuffer_.retrieve(n);
// 如果 outputBuffer_ 中没有数据了,则关闭写事件
if (outputBuffer_.readableBytes() == 0)
{
channel_->disableWriting();
// 如果写完成回调不为空,则调用写完成回调
if (writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
// 如果连接状态为 kDisconnecting,则关闭连接
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd() << " is down, no more writing";
}
}

在数据存入 outputBuffer_ 时,会检查缓冲区的水位。如果“已有数据 + 新增数据”的总量超过了设定的高水位线 highWaterMark_,就会触发 highWaterMarkCallback_ 回调,但是这个回调仅仅是通知muduo 库本身不会擅自主张的丢弃数据,还是需要存入outputBuffer中,确保数据的可靠发送。具体的流量控制,需要在highWaterMarkCallback_中配置。

值得注意的是,moduo库的epoll并没有使用ET模式,而是使用LT模式,在触发回调执行handlewrite函数时,并没有使用循环,而是只发送一次,然后在全部发送后调用channel_->disableWriting();取消写事件,如果outputbuffer一次性没有写完,epoll仍会关注这个可写事件,因为没有调用disableWriting,所以会在下一次epoll->wait函数的时候再次触发,知道数据全部写完并调用disableWriting,这样设计的原因是通过 EventLoop 实现了公平调度,避免了单个连接长时间霸占 I/O 线程,导致系统“卡死”

二、Boost::any

boost::any context_ 的作用是:允许用户将任意类型的、自定义的数据附加到一个 TcpConnection 对象上,作为一个与该连接绑定的“上下文”或“状态管理器”。boost::any (在 C++17 中已被标准化为 std::any) 是一个可以持有任意类型单个值的类型安全容器。你可以把它想象成一个“万能盒子”,什么都能装,但在取出来的时候,你必须明确知道里面装的是什么类型,否则会抛出异常。这比使用不安全的 void* 指针要好得多

muduo 作为一个通用的网络库,它只负责管理 TCP 连接、收发字节流这些底层事务。它完全不知道上层的业务逻辑是什么。

  • 对于一个 HTTP 服务器,每个连接可能需要维护一个 HttpContext 对象,用来解析 HTTP 请求的状态。
  • 对于一个 RPC 服务器,每个连接可能需要维护一个 RpcChannel 对象,用来处理 RPC 调用。
  • 对于一个游戏服务器,每个连接可能需要关联一个 PlayerSession 对象,来存储玩家信息。

如果 muduo 要为每一种应用都去修改 TcpConnection 类,添加 HttpContext* httpContext_Player* player_ 这样的成员,那这个库就失去了通用性。

context_ 就是为了解决这个问题而生的。它提供了一个统一的、非侵入式的接口,让用户可以把自己的业务对象“挂”在 TcpConnection 上。

在 HTTP 服务器中 (HttpServer.cc):

当一个新的 TCP 连接建立时,HttpServer 会创建一个 HttpContext 对象,并通过 conn->setContext() 将它存入连接的 context_ 中。在后续的 onMessage 回调中,服务器会通过 conn->getMutableContext() 取出这个 HttpContext 对象,用它来持续解析同一个连接上发来的数据流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void HttpServer::onConnection(const TcpConnectionPtr& conn)
{
if (conn->connected())
{
// 新连接建立时,创建一个 HttpContext 并附加到连接上
conn->setContext(HttpContext());
}
}

void HttpServer::onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp receiveTime)
{
// 从连接中取出之前存入的 HttpContext
HttpContext* context = boost::any_cast<HttpContext>(conn->getMutableContext());

if (!context->parseRequest(buf, receiveTime))
{
// ...
}
// ...
}

在 RPC 服务器中 (RpcServer.cc):

同样,当新连接建立时,RpcServer 会创建一个 RpcChannel 对象,并将其存入 context_

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void RpcServer::onConnection(const TcpConnectionPtr& conn)
{
// ...
if (conn->connected())
{
RpcChannelPtr channel(new RpcChannel(conn));
channel->setServices(&services_);
conn->setMessageCallback(
std::bind(&RpcChannel::onMessage, get_pointer(channel), _1, _2, _3));
// 将 RpcChannelPtr 附加到连接上
conn->setContext(channel);
}
// ...
}

TcpConnection 类不需要知道任何关于上层业务(HTTP, RPC, Game…)的细节。它只负责提供一个“插座”。用户不需要为了添加自定义状态而去继承 TcpConnection 或修改库源码。使用 boost::any_cast 来获取数据,如果在运行时类型不匹配,会抛出异常,这比使用 void* 进行不安全的 static_cast 要健壮得多

具体使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#include "muduo/base/Logging.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/http/HttpRequest.h"
#include "muduo/net/http/HttpResponse.h"
#include "muduo/net/http/HttpServer.h"

#include <iostream>
#include <map>

using namespace muduo;
using namespace muduo::net;

extern char favicon[555];
bool benchmark = false;

void onRequest(const HttpRequest& req, HttpResponse* resp)
{
std::cout << "Headers " << req.methodString() << " " << req.path() << std::endl;
if (!benchmark)
{
const std::map<string, string>& headers = req.headers();
for (const auto& header : headers)
{
std::cout << header.first << ": " << header.second << std::endl;
}
}

if (req.path() == "/")
{
resp->setStatusCode(HttpResponse::k200Ok);
resp->setStatusMessage("OK");
resp->setContentType("text/html");
resp->addHeader("Server", "Muduo");
string now = Timestamp::now().toFormattedString();
resp->setBody("<html><head><title>This is title</title></head>"
"<body><h1>Hello</h1>Now is " +
now + "</body></html>");
}
else if (req.path() == "/favicon.ico")
{
resp->setStatusCode(HttpResponse::k200Ok);
resp->setStatusMessage("OK");
resp->setContentType("image/png");
resp->setBody(string(favicon, sizeof favicon));
}
else if (req.path() == "/hello")
{
resp->setStatusCode(HttpResponse::k200Ok);
resp->setStatusMessage("OK");
resp->setContentType("text/plain");
resp->addHeader("Server", "Muduo");
resp->setBody("hello, world!\n");
}
else
{
resp->setStatusCode(HttpResponse::k404NotFound);
resp->setStatusMessage("Not Found");
resp->setCloseConnection(true);
}
}

int main(int argc, char* argv[])
{
int numThreads = 0;
if (argc > 1)
{
benchmark = true;
Logger::setLogLevel(Logger::WARN);
numThreads = atoi(argv[1]);
}
EventLoop loop;
HttpServer server(&loop, InetAddress(8000), "dummy");
server.setHttpCallback(onRequest);
server.setThreadNum(numThreads);
server.start();
loop.loop();
}

char favicon[555] = {
'\x89', 'P', 'N', 'G', '\xD', '\xA', '\x1A', '\xA', '\x0', '\x0', '\x0', '\xD',
'I', 'H', 'D', 'R', '\x0', '\x0', '\x0', '\x10', '\x0', '\x0', '\x0', '\x10',
'\x8', '\x6', '\x0', '\x0', '\x0', '\x1F', '\xF3', '\xFF', 'a', '\x0', '\x0', '\x0',
'\x19', 't', 'E', 'X', 't', 'S', 'o', 'f', 't', 'w', 'a', 'r',
'e', '\x0', 'A', 'd', 'o', 'b', 'e', '\x20', 'I', 'm', 'a', 'g',
'e', 'R', 'e', 'a', 'd', 'y', 'q', '\xC9', 'e', '\x3C', '\x0', '\x0',
'\x1', '\xCD', 'I', 'D', 'A', 'T', 'x', '\xDA', '\x94', '\x93', '9', 'H',
'\x3', 'A', '\x14', '\x86', '\xFF', '\x5D', 'b', '\xA7', '\x4', 'R', '\xC4', 'm',
'\x22', '\x1E', '\xA0', 'F', '\x24', '\x8', '\x16', '\x16', 'v', '\xA', '6', '\xBA',
'J', '\x9A', '\x80', '\x8', 'A', '\xB4', 'q', '\x85', 'X', '\x89', 'G', '\xB0',
'I', '\xA9', 'Q', '\x24', '\xCD', '\xA6', '\x8', '\xA4', 'H', 'c', '\x91', 'B',
'\xB', '\xAF', 'V', '\xC1', 'F', '\xB4', '\x15', '\xCF', '\x22', 'X', '\x98', '\xB',
'T', 'H', '\x8A', 'd', '\x93', '\x8D', '\xFB', 'F', 'g', '\xC9', '\x1A', '\x14',
'\x7D', '\xF0', 'f', 'v', 'f', '\xDF', '\x7C', '\xEF', '\xE7', 'g', 'F', '\xA8',
'\xD5', 'j', 'H', '\x24', '\x12', '\x2A', '\x0', '\x5', '\xBF', 'G', '\xD4', '\xEF',
'\xF7', '\x2F', '6', '\xEC', '\x12', '\x20', '\x1E', '\x8F', '\xD7', '\xAA', '\xD5', '\xEA',
'\xAF', 'I', '5', 'F', '\xAA', 'T', '\x5F', '\x9F', '\x22', 'A', '\x2A', '\x95',
'\xA', '\x83', '\xE5', 'r', '9', 'd', '\xB3', 'Y', '\x96', '\x99', 'L', '\x6',
'\xE9', 't', '\x9A', '\x25', '\x85', '\x2C', '\xCB', 'T', '\xA7', '\xC4', 'b', '1',
'\xB5', '\x5E', '\x0', '\x3', 'h', '\x9A', '\xC6', '\x16', '\x82', '\x20', 'X', 'R',
'\x14', 'E', '6', 'S', '\x94', '\xCB', 'e', 'x', '\xBD', '\x5E', '\xAA', 'U',
'T', '\x23', 'L', '\xC0', '\xE0', '\xE2', '\xC1', '\x8F', '\x0', '\x9E', '\xBC', '\x9',
'A', '\x7C', '\x3E', '\x1F', '\x83', 'D', '\x22', '\x11', '\xD5', 'T', '\x40', '\x3F',
'8', '\x80', 'w', '\xE5', '3', '\x7', '\xB8', '\x5C', '\x2E', 'H', '\x92', '\x4',
'\x87', '\xC3', '\x81', '\x40', '\x20', '\x40', 'g', '\x98', '\xE9', '6', '\x1A', '\xA6',
'g', '\x15', '\x4', '\xE3', '\xD7', '\xC8', '\xBD', '\x15', '\xE1', 'i', '\xB7', 'C',
'\xAB', '\xEA', 'x', '\x2F', 'j', 'X', '\x92', '\xBB', '\x18', '\x20', '\x9F', '\xCF',
'3', '\xC3', '\xB8', '\xE9', 'N', '\xA7', '\xD3', 'l', 'J', '\x0', 'i', '6',
'\x7C', '\x8E', '\xE1', '\xFE', 'V', '\x84', '\xE7', '\x3C', '\x9F', 'r', '\x2B', '\x3A',
'B', '\x7B', '7', 'f', 'w', '\xAE', '\x8E', '\xE', '\xF3', '\xBD', 'R', '\xA9',
'd', '\x2', 'B', '\xAF', '\x85', '2', 'f', 'F', '\xBA', '\xC', '\xD9', '\x9F',
'\x1D', '\x9A', 'l', '\x22', '\xE6', '\xC7', '\x3A', '\x2C', '\x80', '\xEF', '\xC1', '\x15',
'\x90', '\x7', '\x93', '\xA2', '\x28', '\xA0', 'S', 'j', '\xB1', '\xB8', '\xDF', '\x29',
'5', 'C', '\xE', '\x3F', 'X', '\xFC', '\x98', '\xDA', 'y', 'j', 'P', '\x40',
'\x0', '\x87', '\xAE', '\x1B', '\x17', 'B', '\xB4', '\x3A', '\x3F', '\xBE', 'y', '\xC7',
'\xA', '\x26', '\xB6', '\xEE', '\xD9', '\x9A', '\x60', '\x14', '\x93', '\xDB', '\x8F', '\xD',
'\xA', '\x2E', '\xE9', '\x23', '\x95', '\x29', 'X', '\x0', '\x27', '\xEB', 'n', 'V',
'p', '\xBC', '\xD6', '\xCB', '\xD6', 'G', '\xAB', '\x3D', 'l', '\x7D', '\xB8', '\xD2',
'\xDD', '\xA0', '\x60', '\x83', '\xBA', '\xEF', '\x5F', '\xA4', '\xEA', '\xCC', '\x2', 'N',
'\xAE', '\x5E', 'p', '\x1A', '\xEC', '\xB3', '\x40', '9', '\xAC', '\xFE', '\xF2', '\x91',
'\x89', 'g', '\x91', '\x85', '\x21', '\xA8', '\x87', '\xB7', 'X', '\x7E', '\x7E', '\x85',
'\xBB', '\xCD', 'N', 'N', 'b', 't', '\x40', '\xFA', '\x93', '\x89', '\xEC', '\x1E',
'\xEC', '\x86', '\x2', 'H', '\x26', '\x93', '\xD0', 'u', '\x1D', '\x7F', '\x9', '2',
'\x95', '\xBF', '\x1F', '\xDB', '\xD7', 'c', '\x8A', '\x1A', '\xF7', '\x5C', '\xC1', '\xFF',
'\x22', 'J', '\xC3', '\x87', '\x0', '\x3', '\x0', 'K', '\xBB', '\xF8', '\xD6', '\x2A',
'v', '\x98', 'I', '\x0', '\x0', '\x0', '\x0', 'I', 'E', 'N', 'D', '\xAE',
'B', '\x60', '\x82',
};

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
HttpServer::HttpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& name,
TcpServer::Option option)
: server_(loop, listenAddr, name, option),
httpCallback_(detail::defaultHttpCallback)
{
server_.setConnectionCallback(
std::bind(&HttpServer::onConnection, this, _1));
server_.setMessageCallback(
std::bind(&HttpServer::onMessage, this, _1, _2, _3));
}

void HttpServer::start()
{
LOG_WARN << "HttpServer[" << server_.name()
<< "] starts listening on " << server_.ipPort();
server_.start();
}

void HttpServer::onConnection(const TcpConnectionPtr& conn)
{
if (conn->connected())
{
conn->setContext(HttpContext());
}
}

void HttpServer::onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp receiveTime)
{
HttpContext* context = boost::any_cast<HttpContext>(conn->getMutableContext());

if (!context->parseRequest(buf, receiveTime))
{
conn->send("HTTP/1.1 400 Bad Request\r\n\r\n");
conn->shutdown();
}

if (context->gotAll())
{
onRequest(conn, context->request());
context->reset();
}
}

void HttpServer::onRequest(const TcpConnectionPtr& conn, const HttpRequest& req)
{
const string& connection = req.getHeader("Connection");
bool close = connection == "close" ||
(req.getVersion() == HttpRequest::kHttp10 && connection != "Keep-Alive");
HttpResponse response(close);
httpCallback_(req, &response);
Buffer buf;
response.appendToBuffer(&buf);
conn->send(&buf);
if (response.closeConnection())
{
conn->shutdown();
}
}


  1. 在HttpServer初始化的时候,在TcpServer中设置了自定义的onConnection和onMessage的回调函数,在onConnection中设置了 conn>setContext(HttpContext());上下文对象,在onMessage中,设置了自定义的解析函数

  2. 在新连接建立后,会触发设置好的onConnection函数

    1
    2
    3
    4
    5
    6
    7
    void HttpServer::onConnection(const TcpConnectionPtr& conn)
    {
    if (conn->connected())
    {
    conn->setContext(HttpContext());
    }
    }

    将httpContext上下文设置到TcpConnection的context中

  3. 在请求消息到来时,会触发epoll树上的读事件

  4. 这个读事件会触发设置好的读回调,也就是TcpConnection::handleRead(Timestamp receiveTime)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    {
    loop_->assertInLoopThread();
    int savedErrno = 0;
    ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
    if (n > 0)
    {
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);//如果正确读取数据,会调用我们设置的自定义messageCallback_
    }
    else if (n == 0)
    {
    handleClose();
    }
    else
    {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
    }
    }

    如果正确读取到数据,会调用预先设置到的messageCallback_,将读取到的内容通过buf传进来

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    void HttpServer::onMessage(const TcpConnectionPtr& conn,
    Buffer* buf,
    Timestamp receiveTime)
    {
    HttpContext* context = boost::any_cast<HttpContext>(conn->getMutableContext());

    if (!context->parseRequest(buf, receiveTime))
    {
    conn->send("HTTP/1.1 400 Bad Request\r\n\r\n");
    conn->shutdown();
    }

    if (context->gotAll())
    {
    onRequest(conn, context->request());
    context->reset();
    }
    }

    然后获取实现设置好的context对象,解析http请求,然后调用onRequest函数,调用httpCallback_函数返回数据。

有没有好奇一个问题,那就是在onConnection中传入一个context,然后在onMessage中取出context,但是你会发现,在TcpConnection中,没有对context做任何操作,那为什么还需要传入context呢?buf和receiveTime直接传过来,然后Context使用局部变量不就好了吗?

  **原因是TCP是一个流式传输协议**

在发送数据过来是,TCP 并不保证你每次收到的数据都恰好是一个完整的应用层消息(比如一个完整的 HTTP 请求),一个完整的 HTTP 请求可能会被拆分成多个 TCP 包进行传输。这意味着 `onMessage` 回调可能会被触发多次,每次 `buf` 里只包含请求的一部分,例如:

1. 第一次 `onMessage`,`buf` 里是 `"GET /index.html HTTP/1.1\r\n"`
2. 第二次 `onMessage`,`buf` 里是 `"Host: www.example.com\r\n"`
3. 第三次 `onMessage`,`buf` 里是 `"\r\n"`

为了效率,TCP 也可能将多个小的数据包合并在一起发送。在 HTTP 的 `Keep-Alive` 模式下,客户端可能会连续发送多个请求。这意味着你的一次 `onMessage` 回调中,`buf` 里可能包含一个半、甚至两个或更多的 HTTP 请求。
如果`onMessage` 每次都创建一个新的、局部的 `HttpContext` 对象:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 假设使用局部变量
void HttpServer::onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp receiveTime)
{
HttpContext local_context; // 每次都创建一个新的 context

// 场景1:请求被拆分
// 第一次回调,buf 里只有 "GET /index.html...",不完整。
// parseRequest 会解析一部分,然后返回。local_context 随函数结束而被销毁。
// 第二次回调,buf 里是剩下的部分 "Host: ...\r\n\r\n"。
// 此时又创建了一个全新的 local_context,它完全不知道之前已经解析过请求行了。
// 它会尝试把 "Host: ..." 当作一个新的请求行来解析,这必然会导致解析失败。
local_context.parseRequest(buf, receiveTime);

if (local_context.gotAll()) {
// ...
}
}
**局部变量是无状态的**。它无法“记住”上一次 `onMessage` 回调时解析到了哪里。 `context` 的真正作用:为每个连接维持状态,内部维护一个解析的状态机,记录着请求的解析状态。当一个不完整的 HTTP 请求到达时,`HttpContext` 会解析它所能解析的部分,并记录下当前的状态。当这个连接的下一个数据包到达时,`onMessage` 通过 `conn->getMutableContext()` 获取到的是**同一个 `HttpContext` 实例**,然后继续从上次的状态开始解析。 `HttpServer` 可能同时处理成千上万个连接,每个连接的 HTTP 请求解析进度都不同。`context` 机制确保了每个连接的解析状态都独立存储,互不干扰。所以看起来传进去啥都没干,实际上是确保了数据的正确解析,将TcpConnecton和应用层解析状态绑定到了一起

muduo(3)-Channel

Channel类是用于管理文件描述符的类,它负责管理一个文件描述符的读写事件,*并提供接口来设置读写回调、关闭回调、错误回调等。包含一个EventLoop对象,一个文件描述符,一个事件类型,一个回调函数。是TcpConnection和EventLoop交流的桥梁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Channel : noncopyable
{
private:
static string eventsToString(int fd, int ev);

void update();
void handleEventWithGuard(Timestamp receiveTime);

static const int kNoneEvent; // 表示不关注任何事件,即不关注读写事件
static const int kReadEvent; // 表示关注读事件,即POLLIN | POLLPRI
static const int kWriteEvent; // 表示关注写事件,即POLLOUT

EventLoop* loop_; // 指向所属的 EventLoop
const int fd_; // 封装的文件描述符
int events_; // 感兴趣的事件类型
int revents_; // 实际发生的事件类型,Channel::handleEvent 方法会根据 revents_ 的值来决定调用哪个具体的回调函数。
int index_; // 记录该 Channel 在 Poller 中的状态,避免了不必要的系统调用
bool logHup_; // 是否记录HUP事件日志,连接断开时会触发HUP事件

std::weak_ptr<void> tie_;
bool tied_;
bool eventHandling_;//一个状态标志,当 Channel 正在执行 handleEvent 方法时,这个值设置为 true,结束时设为false,防止在事件处理过程中发生重入
bool addedToLoop_;//一个状态标志,表示这个 `Channel` 是否已经被添加到 `EventLoop` 的 `Poller` 中
ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;
};

index_

​ 通过index_实现一种状态记录机制,这种机制决定着poller中的行为,以epoll为例的话,就是决定ADD,DELETE,MOD等操作,如果没有这个index_的话,在每一个事件触发时,epoll不知道要执行什么操作,他不知道这个fd是否在epoll树上。在查看muduo如何实现之前,我们看一下下面的实现,存在哪些问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
void TcpServer::run()
{
this->m_thread_pool->run();
auto *channel = new Channel(this->m_lfd, FDEvent::ReadEvent, TcpServer::accept_connection,
nullptr, nullptr, this);
this->m_main_loop->add_task(channel, ElementType::ADD);//将监听描述符放到epoll树上
this->m_main_loop->run();
}

struct ChannelElement
{
// 处理节点的类型,ADD,DELETE,MODIFY
ElementType type;
Channel *channel;
};

int EventLoop::add_task(Channel *channel, const ElementType type)
{
this->m_mutex.lock();
auto *node = new class ChannelElement();//包装一个任务
node->channel = channel;
node->type = type;
this->m_task_q.push(node);
this->m_mutex.unlock();

if (this->m_thread_id == std::this_thread::get_id())
{
std::cout << "\n--------------子线程进入(evLoop->threadId == pthread_self()),threadName = "<< this->m_thread_name
+ ", threadID = " << this->m_thread_id << std::endl;
process_taskQ();
} else
{
printf("\n----------主线程进入task_wake_up,threadID = %lu\n\n", pthread_self());
task_wake_up();
}
return 0;
}

void EventLoop::process_taskQ()
{

while (!this->m_task_q.empty())
{
this->m_mutex.lock();
const auto node = m_task_q.front();
m_task_q.pop();
this->m_mutex.unlock();
if (node->type == ElementType::ADD)//根据任务类型中的定义匹配对应的实现
{
add(node->channel);
} else if (node->type == ElementType::DELETE)
{
remove(node->channel);
} else
{
modify(node->channel);
}
delete node;
}
}

上面这个例子存一个问题,那就是效率低,每次添加一个任务,都需要执行一次:new ChannelElement(),m_task_q.push(node)和m_task_q.pop();,同时还要获取互斥锁。是一种命令机制

而muduo的做法是一种状态机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
void EPollPoller::updateChannel(Channel* channel)
{
Poller::assertInLoopThread();
const int index = channel->index();
LOG_TRACE << "fd = " << channel->fd()
<< " events = " << channel->events() << " index = " << index;

if (index == kNew || index == kDeleted)
{
// a new one, add with EPOLL_CTL_ADD
int fd = channel->fd();
if (index == kNew)
{
assert(channels_.find(fd) == channels_.end());
//如果是新的,将它添加到 Poller 的 map 中进行管理
channels_[fd] = channel;
}
else // index == kDeleted
{
assert(channels_.find(fd) != channels_.end());
//原本就存在于 map 中,只不过之前被停用了,现在重新激活
assert(channels_[fd] == channel);
}
// 更新状态为“已添加”,并调用 epoll_ctl(ADD) 将其加入内核监听
channel->set_index(kAdded);
update(EPOLL_CTL_ADD, channel);
}
else// 否则就是KAdded状态,即已经关注了该事件
{
// 如果 Channel 不再关心任何事件
int fd = channel->fd();
(void)fd;
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
assert(index == kAdded);
// 如果该事件不关注了,调用 epoll_ctl(DEL) 从内核监听中移除
if (channel->isNoneEvent())
{
update(EPOLL_CTL_DEL, channel);
channel->set_index(kDeleted);
}
else
{
// Channel 关心的事件类型发生了变化
update(EPOLL_CTL_MOD, channel);
}
}
}

通过一个标志位index_记录channel的处理状态,不需要每次都new 出一个类似的ChannelElement,即使是push到activeChannels,其速度也是非常快的,因为push的是一个本就存在的指针

对比 activeChannels.push_back(channel) task_queue.push(new ...)
操作对象 拷贝一个裸指针 拷贝一个裸指针
内存管理 (操作的是已存在的对象) (必须先 new,后 delete)
性能开销 极低 (一次指针拷贝) (一次堆分配 + 一次堆释放)

tie_和tied_

解决 TcpConnection 的生命周期安全问题TcpConnection 是通过 std::shared_ptr 管理的, Channel 内部的回调函数如果直接捕获 shared_ptr 会导致循环引用。tie_ 用于保存一个指向 TcpConnectionweak_ptr。在 handleEvent 执行回调前,会尝试将 weak_ptr提升为 shared_ptr。如果提升成功,说明 TcpConnection 对象还活着(通过 tie_.lock() 在事件处理期间”锁住”对象,确保其不会被销毁),就安全地执行回调;如果失败,说明 TcpConnection 已经被销毁了,就不再执行回调,从而避免了对悬空指针的访问。tied_ 只是一个标志,表示是否启用了这个机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//1
void TcpConnection::connectEstablished()
{
// ...
channel_->tie(shared_from_this()); // 将TcpConnection的shared_ptr绑定到Channel
// ...
}
//2
void Channel::tie(const std::shared_ptr<void>& obj)
{
tie_ = obj; // 以weak_ptr形式存储,不增加引用计数
tied_ = true; // 标记已绑定
}
//3
void Channel::handleEvent(Timestamp receiveTime)
{
std::shared_ptr<void> guard;
if (tied_)
{
guard = tie_.lock(); // 尝试将weak_ptr转换为shared_ptr
if (guard)
{
handleEventWithGuard(receiveTime); // 有guard保护下处理事件
}
// 如果guard为空,说明对象已销毁,直接跳过处理
}
else
{
handleEventWithGuard(receiveTime); // 未绑定时直接处理
}
}


shared_from_this():用于在对象内部安全地获取指向自身的 shared_ptr,这里为什么不能直接使用 this 指针呢?

1
2
3
4
5
6
7
8
9
10
11
class BadExample {
public:
std::shared_ptr<BadExample> getSharedPtr() {
return std::shared_ptr<BadExample>(this);
}
};

// 使用时会出现问题
auto ptr1 = std::make_shared<BadExample>();
auto ptr2 = ptr1->getSharedPtr(); // 创建了两个独立的控制块!
// 当 ptr1 和 ptr2 都析构时,对象会被删除两次 -> 崩溃

TcpConnection 通过继承 std::enable_shared_from_this 来解决这个问题。

muduo(2)-Acceptor

Acceptor类是用于接受新连接的类,它负责监听一个端口,当有新的连接到来时,它负责接受这个连接,并调用回调函数处理这个连接。包含一个Socket对象,一个Channel对象,一个NewConnectionCallback对象。Socket 对底层的socket文件描述符(sockfd)进行了面向对象的封装,提供了 bind, listen, accept等接口。

一、类的初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())), // 1. 创建Socket
acceptChannel_(loop, acceptSocket_.fd()), // 创建Channel
listening_(false),
idleFd_(::open("/dev/null",
O_RDONLY |
O_CLOEXEC)) // 预留一个fd,当accept失败时,释放预留的fd,并重新接受新连接
{
assert(idleFd_ >= 0);
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(reuseport); // 设置端口复用
acceptSocket_.bindAddress(listenAddr); // 绑定地址
// 为 acceptChannel_ 设置一个读回调函数 Acceptor::handleRead。当有新连接到达时,EventLoop
// 会调用这个函数,handleRead 内部会调用 accept() 来接受连接。
acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

// 当有新连接到来时,epoll会返回监听描述符,然后EventLoop会通过之前设置好的Channel来处理这个监听描述符,会调用channel的回调函数,也就是这里的handleRead进行处理,执行accept,并调用TcpServer中的newConnetion函数。
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr;
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
// 在TcpServer的构造函数中设置好了newConnectionCallback_,当有新连接到来时,会调用TcpServer的newConnection函数
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr);
}
else
{
sockets::close(connfd);
}
}
else
{
if (errno == EMFILE)
{
::close(idleFd_); // 1. 释放预留的fd
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL); // 2. 接受新连接
::close(idleFd_); // 3. 立即关闭它
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); // 4. 重新打开一个fd
}
}
}

//Acceptor建立新连接时,调用TcpServer的newConnection函数,将新连接封装成一个TcpConnectionPtr
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop();//获取一个EventLoop线程,将TcpConnection对象放入该线程中
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;

InetAddress localAddr(sockets::getLocalAddr(sockfd));
TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
//设置TcpConnection的回调函数
conn->setConnectionCallback(connectionCallback_);// 连接回调
conn->setMessageCallback(messageCallback_);// 消息回调
conn->setWriteCompleteCallback(writeCompleteCallback_);// 写完成回调
conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // 关闭回调
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));//将TcpConnection对象的连接建立事件放入EventLoop线程中执行
}

宏观上看,Acceptor这个类并不复杂,主要的任务就是负责初始化监听描述符,并设置监听描述符的回调函数handleRead,在新连接到来时,Acceptor会调用Acceptor会调用headleRead建立连接,并调用newConnection回调函数。

1
2
3
4
5
6
7
if (errno == EMFILE)
{
::close(idleFd_); // 1. 释放预留的fd
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL); // 2. 接受新连接
::close(idleFd_); // 3. 立即关闭它
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); // 4. 重新打开一个fd
}

当服务器并发连接数非常高时,可能会耗尽进程可用的文件描述符(fd)。这时,accept() 会失败并返回 EMFILE 错误。如果不处理,服务器将无法接受任何新连接,相当于“假死”。

muduo 的解决方案:

  1. Acceptor 在构造时,就预先打开一个指向 /dev/null 的文件描述符 idleFd_。它就像一个“备用座位”。
  2. acceptEMFILE 失败时,Acceptor立即关闭这个备用的 idleFd_,从而释放出一个文件描述符名额。
  3. 有了这个名额,Acceptor 就能成功 accept() 那个等待中的新连接。
  4. 为了避免新连接因为没有被处理而丢失,Acceptor 会立即 close() 这个刚刚接受的连接。这虽然拒绝了客户端,但保证了服务器自身不会卡死,并且向客户端发出了一个明确的拒绝信号(RST),客户端可以稍后重试。
  5. 最后,Acceptor 会再次打开 /dev/null 来重新占用 idleFd_,为下一次 EMFILE 危机做好准备。

这个技巧确保了即使在 fd 耗尽的极端情况下,Acceptor 所在的 EventLoop 也能正常运转,不会因为 accept 不断失败而陷入死循环

SO_RESUCEADDR和SO_RESUCEPORT的区别

特性 SO_REUSEADDR (地址复用) SO_REUSEPORT (端口复用)
核心目的 服务器快速重启 性能扩展 (负载均衡)
解决问题 允许新启动的服务器立即绑定一个处于 TIME_WAIT 状态的端口 允许多个独立的监听套接字绑定到完全相同的 IP 和端口。
工作模式 一个端口在同一时间仍然只能被一个监听套接字绑定。 一个端口可以被多个监听套接字同时绑定。
适用场景 几乎所有服务器程序都应该开启,用于开发和运维中的快速迭代和重启。 面向高性能、高并发连接的服务器,用于在多核 CPU 上扩展 accept 的处理能力。
在 muduo 中 默认开启 默认关闭,需要显式开启。

在标准的 TcpServer 模型中,只有一个主线程负责 accept 所有新连接,但当服务器面临极高的连接建立速率时就会成为整个系统的性能瓶颈acceptSocket_.setReusePort(reuseport); ,也就是SO_RESUCEPORT ,这个选项允许多个线程或进程创建各自的监听套接字,并全部绑定到同一个 IP 和端口上。当新连接到来时,内核会负责进行负载均衡,将这个连接请求“派发”给其中一个监听套接字。这样一来,accept 的工作就被均匀地分摊到了多个 CPU 核心上。

更多细节,见:https://stackoverflow.com/questions/14388706/how-do-so-reuseaddr-and-so-reuseport-differ

仔细看的话,还能发现建立连接的是一个acceptSocket_,而不是系统的accept函数,这是muduo对系统调用的封装,提高扩展性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Socket acceptSocket_;
/// 对底层的socket文件描述符(sockfd)进行了面向对象的封装,提供了 bind, listen, accept 等接口。
class Socket : noncopyable
{
public:
explicit Socket(int sockfd) : sockfd_(sockfd)
{
}
~Socket();
int fd() const
{
return sockfd_;
}
bool getTcpInfo(struct tcp_info*) const;
bool getTcpInfoString(char* buf, int len) const;
void bindAddress(const InetAddress& localaddr);
void listen();
int accept(InetAddress* peeraddr);
void shutdownWrite();
void setTcpNoDelay(bool on);
void setReuseAddr(bool on);
void setReusePort(bool on);
void setKeepAlive(bool on);

private:
const int sockfd_;
};
//Socket封装socket,利用RAII思想关闭文件描述符,避免忘记关闭
Socket::~Socket()
{
sockets::close(sockfd_);
}
//Socket
void Socket::listen()
{
sockets::listenOrDie(sockfd_);
}
//sockets
void sockets::listenOrDie(int sockfd)
{
//设置等待连接队列的最大长度(SOMAXCONN)。
int ret = ::listen(sockfd, SOMAXCONN);
if (ret < 0)
{
LOG_SYSFATAL << "sockets::listenOrDie";
}
}

可以看见,Socket内部的实现都放在一个名为sockets的作用域中,设计思想:Socket作为一个抽象层,我们在使用的时候不需要考虑不同平台的调用方式,底层的sockets负责实现阔平台逻辑

二、挂载EventLoop

我们发现,Acceptor没有调用listen函数进行监听,实际上开始监听的操作是在TcpServer的start函数实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void TcpServer::start()
{
if (started_.getAndSet(1) == 0)
{
//启动EventLoop线程池
threadPool_->start(threadInitCallback_);

assert(!acceptor_->listening());
//将 Acceptor::listen() 的调用任务放入EventLoop 的待执行队列中,这确保了所有和 EventLoop 相关的操作都在同一个I/O线程中执行,避免了锁竞争
loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}

void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(std::move(cb));
}
}

void Acceptor::listen()
{
loop_->assertInLoopThread();
listening_ = true;
// 将socket设置为监听状态,并设置等待连接队列的最大长度(SOMAXCONN)。
acceptSocket_.listen();
// 放到epoll树上,注册可读事件
acceptChannel_.enableReading();
}


在TcpServer::start函数启动后,loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));会将Acceptor::listen函数启动开始监听,并将文件描述符放到主线程的epoll树上监听读实现,当事件触发时,会调用文件描述符的读回调,也就是headleRead函数处理连接。。

muduo(1)-TcpServer

TcpServer类是用于管理TCP连接的类,它负责管理多个TcpConnection对象,并提供接口来设置连接回调、消息回调、写完成回调等。包含一个Acceptor对象,一个EventLoopThreadPool对象,一个ConnectionMap对象,Acceptor 负责监听新连接,EventLoopThreadPool 负责管理多个EventLoop线程,ConnectionMap 负责管理多个TcpConnection对象。

一、类的前置声明

不要直接将一个类作为另一个类的类成员,尽量使用指针,然后通过前置声明减少头文件依赖,加快编译速度

1
2
3
class Acceptor;
class EventLoop;
class EventLoopThreadPool;

作用:

  1. 减少头文件依赖:如果在头文件里只用到了指针或引用(比如 EventLoop* loop_;),只需要告诉编译器“有这么个类”,不需要知道它的具体实现。这样可以减少头文件之间的耦合,避免不必要的编译依赖
  2. 加快编译速度: 在C++中,#include 是一个简单粗暴的文本替换操作。如果在一个头文件 A.h#include "B.h",那么任何 #include "A.h" 的源文件 (.cc) 都会被迫把 B.h 的全部内容也包含进来。如果 B.h 文件有任何一丁点改动,所有包含了 A.h 的源文件,以及包含了那些源文件所生成头文件的其他源文件……全都需要重新编译。在一个大型项目中,这可能意味着一次小修改导致数十分钟甚至数小时的编译等待。
  3. 防止循环依赖:如果两个类互相包含对方的头文件,就会导致循环依赖,编译器会报错。前向声明可以避免这种情况。

这种设计是如何工作的:

​ 以std::unique_ptr acceptor_;为例,class Acceptor; 这行代码告诉编译器“有一个类叫Acceptor,你不需要管他怎么实现的”,编译器不需要知道 Acceptor 的完整定义就可以处理 std::unique_ptr<Acceptor>。为什么?因为无论 Acceptor 本身多复杂,一个指针(或智能指针)的大小是固定的(在64位系统上是8字节)。编译器知道如何为一个指针分配空间。

TcpServer 只有在它的实现文件 TcpServer.cc 中才需要真正地创建 Acceptor 对象 (new Acceptor(...)) 或者调用它的方法 (acceptor_->listen())。因此,#include "muduo/net/Acceptor.h" 这行代码被放在了 TcpServer.cc 的开头,而不是头文件中。

​ 现在,如果 Acceptor.hEventLoopThreadPool.h 的内部实现发生了任何改变,只要 TcpServer.h 的接口不变,就只有 TcpServer.cc 这一个文件需要重新编译。所有其他只包含了 TcpServer.h 的文件都安然无恙。编译时间从 O(N) 变成了 O(1)。同时,TcpServer 的使用者完全不知道 Acceptor 的存在,实现细节被完美地隐藏了起来。

什么时候使用头文件?

在头文件里用到了类的完整定义(比如作为成员变量直接存储对象,而不是指针/引用,或者需要调用成员函数),就必须 include 头文件。如果只是用指针、引用、声明参数类型、返回值类型,只需要前向声明。

二、禁用拷贝赋值

通过继承一个禁用掉拷贝和赋值运算符的类实现,避免重复代码,同时,在继承这个类时,可以充当文档告诉其他开发者,这个类时禁止拷贝或者赋值的

1
2
3
4
5
6
7
8
9
10
11
class noncopyable
{
public:
noncopyable(const noncopyable&) = delete;
void operator=(const noncopyable&) = delete;

protected:
noncopyable() = default;
~noncopyable() = default;
};
class TcpServer : noncopyable //继承noncopyable,class继承默认为private

原理:利用 C++ 的继承和访问控制机制,让编译器在你尝试拷贝或赋值时直接报错。流程:通过private继承noncopyable,意味着 noncopyable 的 public 和 protected 成员在 TcpServer 里是 private 的,外部无法访问。

1、构造和析构是 protected,这样只能被子类构造和析构,不能在外部直接创建 noncopyable 对象。

2、拷贝构造函数和赋值运算符被 = delete,外部和子类都不能访问。

编译器行为:

  • 当你写 TcpServer a; TcpServer b = a; 或 b = a; 时,编译器会去找 TcpServer 的拷贝构造和赋值运算符。

  • 由于 TcpServer 没有自己实现这两个函数,编译器会去基类 noncopyable 里找。

  • 发现基类的这两个函数被 = delete,所以编译器直接报错,禁止拷贝和赋值。

三、回调函数声明

1
2
3
4
5
6
7
8
9
//Callback.h
typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
typedef std::function<void (const TcpConnectionPtr&)> CloseCallback;
typedef std::function<void (const TcpConnectionPtr&)> WriteCompleteCallback;
//TcpServer.h
class TcpServer : noncopyable
{
public:
typedef std::function<void(EventLoop*)> ThreadInitCallback;

刚开始很奇怪,为什么ThreadInitCallback生命在TcpServer.h,而其他回调放在统一放在Callback.h文件中,以为是作者忘记了,后来询问ai,发现是有意为之

原因:

  1. ThreadInitCallback 这个类型只和 TcpServer 的线程池初始化有关,它的语义非常专用,只在 TcpServer 相关代码中用到。
  2. Callback.h 里一般放的是通用的回调类型,比如 ConnectionCallback、MessageCallback 这种会被很多类用到的回调。
  3. 如果把所有只在某个类用到的 typedef 都放到 Callback.h,会让 Callback.h 变得很臃肿、不清晰,反而降低了可维护性。
  4. ThreadInitCallback 依赖于 EventLoop 类型,如果放到 Callback.h,就会让 Callback.h 依赖 EventLoop.h,这样会让头文件之间的依赖变复杂,甚至可能引入循环依赖。
  5. 一般来说,只在某个类/模块内部用到的类型定义,直接放在对应的头文件里,而不是放到全局的 callback 头文件中。只有那种全局通用的回调类型,才会放到 Callback.h 这种公共头文件。

四、变量命名方式

封装,在muduo库中你几乎看不见直接使用底层API,至少都会都会经过一次封装,封装成一个类,同时,使用typedef重命名,明确每一个类的作用

1
2
3
4
5
6
7
typedef std::map<string, TcpConnectionPtr> ConnectionMap;
typedef std::function<void(EventLoop*)> ThreadInitCallback;
AtomicInt32 started_;
TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option = kNoReusePort);

目的:

​ 这不仅仅是为了隐藏底层细节,更是为了创造一个比底层API更强大、更安全、更易用的抽象层。如后面会看见的,poller将IO复用模型封装,随时能改变底层的IO复用模型,还有Socket同理,不直接使用linux底层的socket函数,这样在移植系统时,无论底层接口是什么,都能直接使用Socket而不需要关心底层,和poller一样,都能随时改变,这种设计的可扩展性就很高。后面还会继续展开muduo库在使用上的各种封装。

​ 还有typedef的使用,std::map<std::string, std::shared_ptr<muduo::net::TcpConnection>> 这样的类型声明又长又复杂。当您读到 ConnectionMap 时,您立刻就能明白它的意图——这是一个“存储连接的容器”,而不需要去想着这个复杂map到底是干什么的。

五、智能指针使用

在muduo库中,很少能看见裸指针的直接使用,智能指针管理“所有权”,裸指针表示“使用权”

1
2
3
EventLoop* loop_; 
std::unique_ptr<Acceptor> acceptor_;
std::shared_ptr<EventLoopThreadPool> threadPool_;

muduo中大量使用智能指针,其根本是RAII的编程思想,作者通过大量封装,充分的利用了RAII思想,如后面会介绍的Socket,在析构的时候关闭文件描述符,避免忘记关闭文件描述符,如MutexLockGuardMutexLockGuard 类在构造函数中调用 mutex_.lock(),在**析构函数中调用 mutex_.unlock()**等等。

虽然裸指针很少使用,但是可以看到,EventLoop就是一个裸指针

智能指针 (std::unique_ptr, std::shared_ptr):当你使用智能指针时,你是在声明:“我这个对象,对另一个对象的生命周期负有责任”。

  • std::unique_ptr<Acceptor> acceptor_ 意味着 TcpServer 独占 Acceptor,当 TcpServer 析构时,Acceptor 必须被销毁。
  • std::shared_ptr<TcpConnection> 意味着 TcpServer 和其他协作者(如回调函数)共同拥有 TcpConnection 的生命周期。

裸指针 (EventLoop*):当你使用裸指针时,你是在声明:“我需要使用那个对象,但我不管它的死活,它的生命周期由别人负责”。这是一种**非拥有(non-owning)**的、观察性的关系。

因为在 muduo 的设计中,EventLoop 的生命周期总是长于使用它的 TcpConnectionTcpServerEventLoop 对象是在一个线程的栈上创建的,它的生命周期与整个线程的事件循环 loop.loop() 绑定。只要这个线程在运行,EventLoop 对象就一直存活。用智能指针明确所有权和生命周期管理,用裸指针表示无所有权的、有生命周期保障的引用。

RAII思想:

将任何一种“资源”(文件描述符、内存、锁、线程、甚至是更复杂的对象)的生命周期,与一个栈上对象的生命周期绑定。通过 “对象离开作用域时析构函数必被调用”的特性,来实现资源的自动、安全、无遗漏的释放。

六、类初始化处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)),//主EventLoop
ipPort_(listenAddr.toIpPort()),//监听地址
name_(nameArg),//服务器名称
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),//Acceptor 负责监听新连接
threadPool_(new EventLoopThreadPool(loop, name_)),//EventLoopThreadPool 负责管理多个EventLoop线程
connectionCallback_(defaultConnectionCallback),//默认连接回调
messageCallback_(defaultMessageCallback),//默认消息回调
nextConnId_(1)//连接计数器,为每一个新到来的 TcpConnection 生成一个独一无二的、递增的ID号。
{
// 设置Acceptor的回调函数,当有新连接到来时,Acceptor会调用TcpServer的newConnection函数,将新连接封装成一个TcpConnectionPtr
acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, _1, _2));
}

//Acceptor建立新连接时,调用TcpServer的newConnection函数,将新连接封装成一个TcpConnectionPtr
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop();//获取一个EventLoop线程,将TcpConnection对象放入该线程中
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;//连接计数器,为每一个新到来的 TcpConnection 生成一个独一无二的、递增的ID号。
string connName = name_ + buf;

InetAddress localAddr(sockets::getLocalAddr(sockfd));
TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
//设置TcpConnection的回调函数
conn->setConnectionCallback(connectionCallback_);// 连接回调
conn->setMessageCallback(messageCallback_);// 消息回调
conn->setWriteCompleteCallback(writeCompleteCallback_);// 写完成回调
conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // 关闭回调
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn)); ////将TcpConnection对象的连接建立事件放入EventLoop线程中执行
}

TcpServer在建立连接阶段的作用:

开始的时候,TcpServer在构造函数那里初始化Acceptor,并设置Acceptor建立连接后的回调,这个回调用于将fd封装成一个TcpConnnection并放入到一个EventLoop中,由该EventLoop负责后续的监听。