TcpConnection类是用于管理TCP连接的类,它负责管理一个TCP连接的读写事件, 并提供接口来设置读写回调、关闭回调、错误回调等。包含一个EventLoop对象,一个文件描述符,一个事件类型,一个回调函数。
一、数据发送策略
muduo 的 TcpConnection 采用两种发送方式结合的策略。首先尝试直接发送,如果 outputBuffer_ 为空,就直接将数据写入 socket 内核缓冲区,避免了不必要的内存拷贝。当直接发送无法一次性完成时(通常因为内核缓冲区已满),剩余的数据会被存入应用层的 outputBuffer_。此时的发送流程从“主动发送”切换到了“事件驱动”模式。因为在使用outputBuffer时,会调用 channel_->enableWriting();关注写事件,后面通过handleWrite函数处理outputBuffer中的数据。
1 | void TcpConnection::sendInLoop(const void* data, size_t len) |
在数据存入 outputBuffer_ 时,会检查缓冲区的水位。如果“已有数据 + 新增数据”的总量超过了设定的高水位线 highWaterMark_,就会触发 highWaterMarkCallback_ 回调,但是这个回调仅仅是通知,muduo 库本身不会擅自主张的丢弃数据,还是需要存入outputBuffer中,确保数据的可靠发送。具体的流量控制,需要在highWaterMarkCallback_中配置。
值得注意的是,moduo库的epoll并没有使用ET模式,而是使用LT模式,在触发回调执行handlewrite函数时,并没有使用循环,而是只发送一次,然后在全部发送后调用channel_->disableWriting();取消写事件,如果outputbuffer一次性没有写完,epoll仍会关注这个可写事件,因为没有调用disableWriting,所以会在下一次epoll->wait函数的时候再次触发,知道数据全部写完并调用disableWriting,这样设计的原因是通过 EventLoop 实现了公平调度,避免了单个连接长时间霸占 I/O 线程,导致系统“卡死”
二、Boost::any
boost::any context_ 的作用是:允许用户将任意类型的、自定义的数据附加到一个 TcpConnection 对象上,作为一个与该连接绑定的“上下文”或“状态管理器”。boost::any (在 C++17 中已被标准化为 std::any) 是一个可以持有任意类型单个值的类型安全容器。你可以把它想象成一个“万能盒子”,什么都能装,但在取出来的时候,你必须明确知道里面装的是什么类型,否则会抛出异常。这比使用不安全的 void* 指针要好得多
muduo 作为一个通用的网络库,它只负责管理 TCP 连接、收发字节流这些底层事务。它完全不知道上层的业务逻辑是什么。
- 对于一个 HTTP 服务器,每个连接可能需要维护一个
HttpContext对象,用来解析 HTTP 请求的状态。 - 对于一个 RPC 服务器,每个连接可能需要维护一个
RpcChannel对象,用来处理 RPC 调用。 - 对于一个游戏服务器,每个连接可能需要关联一个
Player或Session对象,来存储玩家信息。
如果 muduo 要为每一种应用都去修改 TcpConnection 类,添加 HttpContext* httpContext_ 或 Player* player_ 这样的成员,那这个库就失去了通用性。
context_ 就是为了解决这个问题而生的。它提供了一个统一的、非侵入式的接口,让用户可以把自己的业务对象“挂”在 TcpConnection 上。
在 HTTP 服务器中 (HttpServer.cc):
当一个新的 TCP 连接建立时,HttpServer 会创建一个 HttpContext 对象,并通过 conn->setContext() 将它存入连接的 context_ 中。在后续的 onMessage 回调中,服务器会通过 conn->getMutableContext() 取出这个 HttpContext 对象,用它来持续解析同一个连接上发来的数据流。
1 | void HttpServer::onConnection(const TcpConnectionPtr& conn) |
在 RPC 服务器中 (RpcServer.cc):
同样,当新连接建立时,RpcServer 会创建一个 RpcChannel 对象,并将其存入 context_。
1 | void RpcServer::onConnection(const TcpConnectionPtr& conn) |
TcpConnection 类不需要知道任何关于上层业务(HTTP, RPC, Game…)的细节。它只负责提供一个“插座”。用户不需要为了添加自定义状态而去继承 TcpConnection 或修改库源码。使用 boost::any_cast 来获取数据,如果在运行时类型不匹配,会抛出异常,这比使用 void* 进行不安全的 static_cast 要健壮得多
具体使用示例:
1 |
|
1 | HttpServer::HttpServer(EventLoop* loop, |
在HttpServer初始化的时候,在TcpServer中设置了自定义的onConnection和onMessage的回调函数,在onConnection中设置了
conn>setContext(HttpContext());上下文对象,在onMessage中,设置了自定义的解析函数在新连接建立后,会触发设置好的onConnection函数
1
2
3
4
5
6
7void HttpServer::onConnection(const TcpConnectionPtr& conn)
{
if (conn->connected())
{
conn->setContext(HttpContext());
}
}将httpContext上下文设置到TcpConnection的context中
在请求消息到来时,会触发epoll树上的读事件
这个读事件会触发设置好的读回调,也就是TcpConnection::handleRead(Timestamp receiveTime)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);//如果正确读取数据,会调用我们设置的自定义messageCallback_
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}如果正确读取到数据,会调用预先设置到的messageCallback_,将读取到的内容通过buf传进来
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18void HttpServer::onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp receiveTime)
{
HttpContext* context = boost::any_cast<HttpContext>(conn->getMutableContext());
if (!context->parseRequest(buf, receiveTime))
{
conn->send("HTTP/1.1 400 Bad Request\r\n\r\n");
conn->shutdown();
}
if (context->gotAll())
{
onRequest(conn, context->request());
context->reset();
}
}然后获取实现设置好的context对象,解析http请求,然后调用onRequest函数,调用httpCallback_函数返回数据。
有没有好奇一个问题,那就是在onConnection中传入一个context,然后在onMessage中取出context,但是你会发现,在TcpConnection中,没有对context做任何操作,那为什么还需要传入context呢?buf和receiveTime直接传过来,然后Context使用局部变量不就好了吗?
**原因是TCP是一个流式传输协议**
在发送数据过来是,TCP 并不保证你每次收到的数据都恰好是一个完整的应用层消息(比如一个完整的 HTTP 请求),一个完整的 HTTP 请求可能会被拆分成多个 TCP 包进行传输。这意味着 `onMessage` 回调可能会被触发多次,每次 `buf` 里只包含请求的一部分,例如:
1. 第一次 `onMessage`,`buf` 里是 `"GET /index.html HTTP/1.1\r\n"`
2. 第二次 `onMessage`,`buf` 里是 `"Host: www.example.com\r\n"`
3. 第三次 `onMessage`,`buf` 里是 `"\r\n"`
为了效率,TCP 也可能将多个小的数据包合并在一起发送。在 HTTP 的 `Keep-Alive` 模式下,客户端可能会连续发送多个请求。这意味着你的一次 `onMessage` 回调中,`buf` 里可能包含一个半、甚至两个或更多的 HTTP 请求。
如果`onMessage` 每次都创建一个新的、局部的 `HttpContext` 对象:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 假设使用局部变量
void HttpServer::onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp receiveTime)
{
HttpContext local_context; // 每次都创建一个新的 context
// 场景1:请求被拆分
// 第一次回调,buf 里只有 "GET /index.html...",不完整。
// parseRequest 会解析一部分,然后返回。local_context 随函数结束而被销毁。
// 第二次回调,buf 里是剩下的部分 "Host: ...\r\n\r\n"。
// 此时又创建了一个全新的 local_context,它完全不知道之前已经解析过请求行了。
// 它会尝试把 "Host: ..." 当作一个新的请求行来解析,这必然会导致解析失败。
local_context.parseRequest(buf, receiveTime);
if (local_context.gotAll()) {
// ...
}
}
**局部变量是无状态的**。它无法“记住”上一次 `onMessage` 回调时解析到了哪里。
`context` 的真正作用:为每个连接维持状态,内部维护一个解析的状态机,记录着请求的解析状态。当一个不完整的 HTTP 请求到达时,`HttpContext` 会解析它所能解析的部分,并记录下当前的状态。当这个连接的下一个数据包到达时,`onMessage` 通过 `conn->getMutableContext()` 获取到的是**同一个 `HttpContext` 实例**,然后继续从上次的状态开始解析。
`HttpServer` 可能同时处理成千上万个连接,每个连接的 HTTP 请求解析进度都不同。`context` 机制确保了每个连接的解析状态都独立存储,互不干扰。所以看起来传进去啥都没干,实际上是确保了数据的正确解析,将TcpConnecton和应用层解析状态绑定到了一起