6.7 包分片
我们这里说的包分片,指的是应用层的对包的拆分。当一个包的数据较大,超过一个包的最长长度时,我们需要对包进行分片。有的读者可能会有疑问:分成多个包就行了,为什么要对包进行分片?在实际应用中,一般会根据业务需求对包的类型进行编号,例如使用一个 wCmd 表示业务号,但某些业务类型某次携带的数据可能比较大,超过了单个包的最大长度,这个时候我们需要将该数据拆分成多个包片,但其业务号隶属于同一个包,这就是所谓的”包分片“。
在理解了包分片的原理后,设计包分片功能也很简单了。这里提供两种包分片的思路。
设置分片标志
在包头部分设置一个字段表示当前包是否属于某个大包的分片,分片标志字段一般有 4 种取值类型:无分片标志、包的第一个分片标志、包的最后一个分片标志、第一个分片与最后一个分片之间的包分片标志。
每个包分片的包头部分有该包的总分片数目和当前分片编号。
对于 TCP 协议来说,由于其数据传输本身是有序的,因此多个分片,只要我们一端按顺序依次发送,另外一端收包时一定会按发送的顺序收到。因此,我们不用考虑包分片的顺序问题。
我们来看一个具体的包分片的例子:
假设现在有如下协议头定义:
//与客户端交互协议包头
#pragma pack(push, 1)
typedef struct tagNtPkgHead
{
unsigned char bStartFlag; //协议包起始标志 0xFF
unsigned char bVer; //版本号
unsigned char bEncryptFlag; //加密标志(如果不加密,则为0)
unsigned char bFrag; //是否有包分片(1 有包分片 0 无包分片)
unsigned short wLen; //总包长
unsigned short wCmd; //命令号
unsigned short wSeq; //包的序列号,业务使用
unsigned short wCrc; //Crc16校验码
unsigned int dwSID; //会话ID
unsigned short wTotal; //有包分片时,分片总数
unsigned short wCurSeq; //有包分片时,分片序号,从0开始,无分片时也为0
} NtPkgHead, *PNtPkgHead;
#pragma pack(pop)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
对端在处理包分片的逻辑伪码如下:
UINT CSocketClient::RecvDataThreadProc(LPVOID lpParam)
{
LOG_NORMAL("Start recv data thread.");
DWORD dwWaitResult;
std::string strPkg;
//临时存放一个完整的包数据的变量
std::string strTotalPkg;
unsigned short uPkgLen = 0;
unsigned int uBodyLen = 0;
unsigned int uTotalPkgLen = 0;
unsigned int uCmd = 0;
NtPkgHead pkgHead;
unsigned short uTotal = 0;
//记录上一次的包分片序号,包分片序号从0开始
unsigned short uCurSeq = 0;
int nWaitTimeout = 1;
CSocketClient* pSocketClient = (CSocketClient*)lpParam;
while (!m_bExit)
{
//检测是否有数据
if (!pSocketClient->CheckReceivedData())
{
//休眠10豪秒
Sleep(10);
continue;
}
//接收数据,并放入pSocketClient->m_strRecvBuf中
if (!pSocketClient->Recv())
{
LOG_ERROR("Recv data error");
//收数据出错,清空接收缓冲区,可以做一些关闭连接、重连等动作,
pSocketClient->m_strRecvBuf.clear();
Reconnect();
continue;
}
//一定要放在一个循环里面解包,因为当前缓冲区中可能存在多个数据包
while (true)
{
//判断当前收到的数据是否够一个包头大小
if (pSocketClient->m_strRecvBuf.length() < sizeof(NtPkgHead))
break;
memset(&pkgHead, 0, sizeof(pkgHead));
memcpy_s(&pkgHead, sizeof(pkgHead), pSocketClient->m_strRecvBuf.c_str(), sizeof(pkgHead));
//对包消息头检验
if (!CheckPkgHead(&pkgHead))
{
//如果包头检验不通过,缓冲区里面的数据已经是脏数据了,直接清空掉,
//可以做一些关闭连接并重连的动作
LOG_ERROR("Check package head error, discard data %d bytes", (int)pSocketClient->m_strRecvBuf.length());
pSocketClient->m_strRecvBuf.clear();
Reconnect();
break;
}
//判断当前数据是否够一个整包的大小
uPkgLen = ntohs(pkgHead.wLen);
if (pSocketClient->m_strRecvBuf.length() < uPkgLen)
break;
strPkg.clear();
strPkg.append(pSocketClient->m_strRecvBuf.c_str(), uPkgLen);
//从收取缓冲区中移除已经处理的数据部分
pSocketClient->m_strRecvBuf.erase(0, uPkgLen);
uTotal = ::ntohs(pkgHead.wTotal);
uCurSeq = ::ntohs(pkgHead.wCurSeq);
//无分片或第一个分片
if (uCurSeq == 0)
{
strTotalPkg.clear();
uTotalPkgLen = 0;
}
uBodyLen = uPkgLen - sizeof(NtPkgHead);
uTotalPkgLen += uBodyLen;
strTotalPkg.append(strPkg.data() + sizeof(NtPkgHead), uBodyLen);
//无分包 或 分包的最后一个包 则将组装后的包发送出去
if (uTotal == 0 || (uTotal != 0 && uTotal == uCurSeq + 1))
{
uCmd = ::ntohs(pkgHead.wCmd);
//ProxyPackage是解析出来的业务包定义
ProxyPackage proxyPackage;
//拷贝业务号
proxyPackage.nCmd = uCmd;
//拷贝包长度
proxyPackage.nLength = uTotalPkgLen;
//拷贝包体内容
proxyPackage.pszJson = new char[uTotalPkgLen];
memset(proxyPackage.pszJson, 0, uTotalPkgLen * sizeof(char));
memcpy_s(proxyPackage.pszJson, uTotalPkgLen, strTotalPkg.c_str(), strTotalPkg.length());
//将一个完整的包交给业务处理
pSocketClient->m_pNetProxy->AddPackage((const char*)&proxyPackage, sizeof(proxyPackage));
}
}// end inner-while-loop
}// end outer-while-loop
LOG_NORMAL("Exit recv data thread.");
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
上述代码在一个网络数据收取线程中,先检测是否有可读数据,如果有可读数据,则从 socket 上读取该数据存入接收缓冲区 pSocketClient->m_strRecvBuf 中,然后判断收到的数据是否够一个包头的大小(sizeof(NtPkgHead)),如果不够,退出当前循环等待后续数据到来;如果够,对包头数据进行校验后,从包头中得到整包的大小(ntohs(pkgHead.wLen))(这里表示整包的大小的字段 wLen 使用了网络字节序,我们调用 ntohs() 函数得到本机字节序);然后判断收到的数据是否够一个整包的大小,如果不够,退出当前循环等待后续数据到来;如果够,则根据记录当前包分片序号的变量 uCurSeq (uCurSeq = ::ntohs(pkgHead.wCurSeq))来确定该包是否是某个分片,uCurSeq 等于 0 时说明此次从一个新的包片或完整的包开始的;从接收缓冲区中将当前包片或者完整包的数据放入变量 strTotalPkg 中存储起来(注意 pkgHead.wTotal 和 pkgHead.wCurSeq 均使用了网络字节序,需要转换成本地字节序)。接着,根据包头字段 pkgHead.wTotal 和 pkgHead.wCurSeq 转换成本机字节序的值判断这是否是一个完整的包(当 uTotal == 0 时)或者是最后一个包分片(当 uTotal != 0 && uTotal == uCurSeq + 1 时),此时 strTotalPkg 存放的就是一个完整的包数据了,接着将其拷贝出来(这里是拷贝至 ProxyPackage 结构中),进行业务逻辑处理。如果当前包片只是一个大包的中间包片,则继续进行下一轮数据的处理。strTotalPkg 中存放的数据达到一个完整的包时会在业务处理后、下一轮循环存入新的包片数据前清空掉(代码第 81 行)。
上述流程可用如下流程图表示: