7.7 发送/接收缓冲区设计要点
# 7.7.1 为什么需要发送/接收缓冲区
上一节我们介绍了正确的收发数据的方法,网络层在发送数据的过程中,由于 TCP 窗口太小,导致数据无法发送出去,而上层可能不断产生新的数据,此时我们需要将数据先存储起来以便等 socket 可写时再次发送,这个存储数据的地方,我们称之为发送缓冲区。
对于接收缓冲区也是一样的道理,当收到数据以后,我们可以直接进行解包,但是这样并不好,理由有三:
理由一:除非一些约定俗成的协议格式(例如 http 协议),大多数服务器的业务的协议都是不同的,也就是说一个数据包里面的数据格式的解读应该是业务层的事情。不同的业务一般有不同的协议格式,协议格式与具体的业务有关,网络通信层一般不知道也不需要知道上层协议中的数据具体格式,为了让网络层更加通用,网络通信层应该与业务层解耦。
理由二:即使知道协议格式,由于 TCP 协议是流式协议,我们某一次收到的数据长度不一定够一个完整的包大小,此时我们需要一个地方将这些不完整的数据缓存起来,以便等数据足够一个包时再处理。
理由三:即便接收到的数据足够一个包,出于一些特殊的业务逻辑要求,我们需要将收到的数据暂时缓存起来,等满足一定的条件时再取出来处理。
鉴于以上三点,我们的网络层确实需要一个接收缓冲区,将收取到的数据放到该缓冲区里面去,并由专门的业务线程或者业务逻辑去从接收缓冲区中取出数据,并解包处理业务。
# 7.7.2 如何设计发送/接收缓冲区
根据前面的描述,无论是接收还是发送缓冲区一般我们建议将其设计成一个内存连续的存储容器。
当然,你也可以将其设计为不连续的,如链表结构,每个链表的节点是一块存储数据的内存块。这种设计在存储数据时相对来说比较麻烦。
一般的发送缓冲区和接收缓冲区根据功能至少需要提供两类接口,即存数据和取数据的接口。对于发送缓冲区,由于上层交给网络层的数据是有序的,某次需要发送的数据未发完,其剩余的数据一定排在后续产生的数据前面;对于接收缓冲区,不断从 socket 上读取数据,后面读到的数据一定排在前面读到的数据的后面。
另外,发送和接收缓冲区其容量应该设置成多大?这是一个老生常谈的问题了:预分配的内存太小不够用,太大的话可能会造成浪费。怎么办呢?答案就是像 std::string、vector 一样,设计出一个可以动态增长的缓冲区,按需分配,不够还可以扩展。
既然是用于收发数据的缓冲区,我们可能需要向其中写入或者读取各种数据类型,如 char、short、int32、int64、string 等,所以这是我们在设计缓冲区对象时需要考虑的情形。
对于接收缓冲区,我们可能需要从接收缓冲区中寻找特殊的标志,例如在某个业务的数据包以 \n 为结束标志的,我们需要在其中寻找 \n 标志以确定缓冲区中的数据长度是否至少足够一个包的长度。
我们这里介绍一个缓冲区结构的例子,这个缓冲区创建时,会自动分配一块固定大小的内容,其结构如下:
这个结构中,内存是连续的,内存由两块构成:预留空间和存储数据的空间,其中预留空间可来做一些特殊用途,例如存储一些元数据信息等,预留空间大小可以设置为 0,不是必需的;存储数据的空间顾名思义,即用于存储数据的空间,由于该空间同时提供读数据和写数据的功能,分别使用一个读指针和写指针来标明读写位置。当缓冲区是空的时候,读写指针位置相同。当写入一段数据后,该缓冲区结构变成如下图所示:
此时该缓冲区中已存储数据的范围是读指针位置~ 写指针位置的区间,下一次读数据从缓冲区读指针位置开始读,一共可以读取的数据长度即写指针位置减去读指针位置的长度。假设我们读取了 n 个字节,n 小于可读数据的长度。该缓冲区结构变为如下图所示:
下一次写数据会从写指针位置开始继续写,此时会存在多种情形,为了更清楚的说明问题,我们假设预留的空间大小为 prependableBytes,读指针使用 readerIndex,写指针使用 writerIndex,总缓冲区长度为 size,假设我们现在需要写入 m 个字节,有以下几种情形:
当 m <= size - writerIndex 时,即即将写入的数据长度小于等于写指针位置到缓冲区结束位置,此时可以直接在 writerIndex 位置写入;
当 m > size - writerIndex 时,此时从 writerIndex 位置一直到缓冲区的结束位置的内容已经不够写入了,可以对当前缓冲区的未充分利用的内存进行整理,哪里有未充分使用的内存呢?从预留空间结束位置到读指针之前的位置,这段空间的内存的数据已经被读取了。 将读指针位置挪到预留空间结束处(即起始位置),然后将原来读指针之后的数据也挪到起始位置,然后将写指针向前挪到数据结束的位置。挪动后的缓冲区结构如下:
挪动之后,如果剩余空间足够写入 m 字节的数据,则在新的 writerIndex 位置写入;如果挪动之后,剩余空间仍然不够写入 m 个字节,则需要重新扩展缓冲区了,即新建一个更大的缓冲区将现有的缓冲区结构和数据复制过去。扩容示意图如下:
上述逻辑在代码上实现起来很简单,我们甚至可以基于 stl 提供的 std::string、std::vector 等现成的类来实现,这里就不再贴出具体的实现代码了。
关于接收和发送缓冲区,我需要另外强调几点:
- 对于服务器端程序来说,由于需要同时服务多个客户端,而每一路连接都会有一个接收和发送缓冲区,即所谓的 per-socket buffer,也就是说不同连接的从 socket 上读取出来但还没被业务处理的数据会放在自己对应的接收缓冲区中,因 TCP 窗口太小,暂时发不出去的数据,也会存放在自己所属连接的发送缓冲区中。
- 上述例子中,我们假定的缓冲区是可以无限扩展的,实际开发中,尤其对于服务器开发,由于存在多个连接,一般接收和发送缓冲区的初始容量都设置的不大,然后按需扩展,但一定有一个上限,且这个上限一般不会太大。如果在较长时间内,发送缓冲区中的数据一直发不出去,我们可以认为这路连接出问题了,可以将其缓冲区回收并关闭连接。同样的道理,如果接收缓冲区的数据一直滞留、甚至积压,我们就要好好的检查一下我们处理数据的逻辑是否有问题,为何不能及时处理数据。当然,实际开发中还会遇到这样一种情况,即对端短时间内给服务器端发送大量数据,此时我们需要做一个限制策略,例如在 1 秒内,某路连接的接收缓冲区中的数据已经达到 30 M,我们可以设置一个标志,不再从该路连接的 socket 上继续读取数据,直到接收缓冲区的数据被处理掉再清除该标志以便继续从该连接的 socket 上收取数据。
# 7.7.3 服务器端发数据时,如果对端一直不收,怎么办?
这类问题一般出现在跨部门尤其是与外部开发人员合作的时候。假设现在有这样一种情况,我们的服务器提供对外的服务,指定好了协议,然后对外提供服务,客户端由外部人员去开发,由于存在太多的不确定性,如果我们在给对端(客户端)发送数据时,对端因为一些问题(可能是逻辑 bug 或者其他的一些问题)一直不从 socket 系统缓冲区中收取数据,而服务器端可能定期产生一些数据需要发送给客户端,再发了一段时间后,由于 TCP 窗口太小,导致数据发送不出去,这样待发送的数据会在服务器端对应的连接的发送缓冲区中积压,如果我们不做任何处理,很快系统就会因为缓冲区过大内存耗尽,导致服务被系统杀死。
对于这种情况,我们一般建议从以下几个方面来增加一些防御措施:
设置每路连接的发送缓冲区大小上限(如 2 M,或者小于这个值),当某路连接上的数据发送不出去的时候,即将数据存入发送缓冲区时,先判断一下缓冲区最大剩余空间,如果剩余空间已经小于我们要放入的数据大小,也就是说缓冲区中数据大小超过了我们规定的上限,则认为该连接出现了问题,关闭该路连接并回收相应的资源(如清空缓冲区、回收套接字资源等)。示例代码如下:
//outputBuffer_为发送缓冲区对象 size_t remainingLen = outputBuffer_.remainingBytes(); //如果加入到缓冲区中的数据长度超出了发送缓冲区最大剩余量 if (remainingLen < dataToAppend.length()) { //关闭连接 forceClose(); return; } outputBuffer_.append(static_cast<const char*>(dataToAppend.c_str()), dataToAppend.length());
1
2
3
4
5
6
7
8
9
10
11还有另外一种场景,当有一部分数据已经积压在发送缓冲区了,此后服务器端未产生新的待发送的数据,此时如果不做任何处理,发送缓冲区的数据会一直积压,但是发送缓冲区的数据容量也不会超过上限。如果不做任何处理的话,该数据会一直在缓冲区中积压,白白浪费系统资源。对于这种情况一般我们会设置一个定时器,每隔一段时间(如 6 秒)去检查一下各路连接的发送缓冲区中是否还有数据未发送出去,也就是说如果一个连接超过一定时间内还存在未发送出去的数据,我们也认为该连接出现了问题,我们可以关闭该路连接并回收相应的资源(如清空缓冲区、回收套接字资源等)。示例代码如下:
//每3秒检测一次 const int SESSION_CHECK_INTERVAL = 6000; SetTimer(SESSION_CHECK_TIMER_ID, SESSION_CHECK_INTERVAL); void CSessionManager::OnTimer() { for (auto iter = m_mapSession.begin(); iter != m_mapSession.end(); ++iter) { if (!CheckSession(iter->value)) { //关闭session,回收相关的资源 iter->value->ForceClose(); iter = m_mapSession.erase(iter); } } } //检测相应连接的发送缓冲区中是否还有未发送数据 void CSessionManager::CheckSession(CSession* pSession) { return pSession->GetConnection().OutputBuffer.IsEmpty(); }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24上述代码,每隔 6 秒检测所有的 Session 的对应的 Connection 对象,如果发现发送缓冲区非空,说明该连接中发送缓冲区中数据已经驻留 6 秒了,将该连接关闭并清理资源。