6.10 一个自定义协议示例
本节我们结合前面的理论知识,演示如何自定义一个灵活的通信协议类。在我的开源即时通讯 Flamingo 中我自定义了一个协议格式,这个协议也分为包头和包体两部分,其中包头的定义如下:
#pragma pack(push, 1)
//协议头
struct chat_msg_header
{
char compressflag; //压缩标志,如果为1,则启用压缩,反之不启用压缩
int32_t originsize; //包体压缩前大小
int32_t compresssize; //包体压缩后大小
char reserved[16];
};
#pragma pack(pop)
2
3
4
5
6
7
8
9
10
包体的内容长度无论是否设置了 compressflag 压缩标志,最后其实际长度都是 originsize,在得到了包体内容后,我们可以按通信两端规定好的协议来解析一个个的业务字段。
假设是一个聊天内容协议,发送方示例代码如下:
//发送方组装包体的格式
std::string outbuf;
net::BinaryStreamWriter writeStream(&outbuf);
writeStream.WriteInt32(msg_type_chat);
writeStream.WriteInt32(m_seq);
//senderId
writeStream.Write(senderId);
//消息内容
writeStream.WriteString(chatMsg);
//receiverId
writeStream.WriteInt32(receiverId);
writeStream.Flush();
2
3
4
5
6
7
8
9
10
11
12
上述代码开始处定义一个自动扩展的字符串缓冲区 outbuf(这里使用了 std::string),然后依次写入如下信息:
字段标号 | 字段名 | 类型/字节数目 | 说明 |
---|---|---|---|
字段 1 | msgType | int32/4 字节 | 消息类型 |
字段 2 | seq | int32/4 字节 | 消息序号 |
字段 3 | senderId | int32/4 字节 | 发送者 id |
字段 4 | chatMsg | string/长度由消息内容定 | 聊天消息,可以定义成一个 json 字符串 |
字段 5 | receiverId | int32/4 字节 | 接收者 id |
写入上述几个字段后,消息体结构示意图如下:
上图中有一个小的细节,即对于 string 类型的消息,在写入实际的字符串内容之前会先写入这个字符串的长度,writeStream.WriteString(chatMsg); 函数的实现如下:
//WriteString实际上调用了WriteCString方法
bool BinaryStreamWriter::WriteString(const string& str)
{
return WriteCString(str.c_str(), str.length());
}
bool BinaryStreamWriter::WriteCString(const char* str, size_t len)
{
std::string buf;
write7BitEncoded(len, buf);
m_data->append(buf);
m_data->append(str, len);
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
这里的 m_data 是前面介绍的 outbuf 的指针,也就是说使用一个 std::string 来存放二进制流, BinaryStreamWriter::WriteCString() 方法会先将字符串的长度写入流中,再写入字符串本身的内容,对于字符串的长度会根据其长度值压缩成 1 ~ 5 个字节,这在前面章节《整型数值的压缩》已经介绍过了。
//将一个4字节的整型数值压缩成1~5个字节
void write7BitEncoded(uint32_t value, std::string& buf)
{
do
{
unsigned char c = (unsigned char)(value & 0x7F);
value >>= 7;
if (value)
c |= 0x80;
buf.append(1, c);
} while (value);
}
2
3
4
5
6
7
8
9
10
11
12
13
再写入上述 5 个字段后会调用 writeStream.Flush() 方法,该方法的实现如下:
void BinaryStreamWriter::Flush()
{
char* ptr = &(*m_data)[0];
unsigned int ulen = htonl(m_data->length());
memcpy(ptr, &ulen, sizeof(ulen));
}
2
3
4
5
6
这个函数的作用是在流的前 4 个字节处存放流数据的长度,存储长度使用的是网络字节序,这 4 个字节在创建 BinaryStreamWriter 对象时被预留出来。
std::string outbuf;
net::BinaryStreamWriter writeStream(&outbuf);
2
上述代码调用 BinaryStreamWriter 的构造函数:
enum
{
//4字节头长度
BINARY_PACKLEN_LEN_2 = 4,
CHECKSUM_LEN = 2,
};
BinaryStreamWriter::BinaryStreamWriter(string* data) :
m_data(data)
{
m_data->clear();
char str[BINARY_PACKLEN_LEN_2 + CHECKSUM_LEN];
m_data->append(str, sizeof(str));
}
2
3
4
5
6
7
8
9
10
11
12
13
14
实际上在 m_data 指向流起始处一共预留了 6 个字节,前 4 个字节是放置将来整个流数据的长度(网络字节序),后 2 个字节存放数据的校验和(checksum,这里未使用)。
因此调用 writeStream.Flush() 方法后,流对象的结构变成如下所示:
图中 4 字节的 streamLength 表示包体长度。
至此,这个二进制流虽然在我们这里的含义是包体部分,但是已经可以做到自我分界和解析了。在简单的业务中,我们先读取 4 个字节的 streamLength,然后根据 streamLength 转换成本机字节序后的长度来获取实际的内容长度。但是,我觉得这个不够方便,我没有在这个流的基础上继续扩展,而是选择在这个流的前面再加一个包头定义(即上述代码中的 chat_msg_header struct)。 chat_msg_header struct(包头)和这里的流(包体)组装成一个完整的包:
//p 即是包体流的指针
void TcpSession::sendPackage(const char* p, int32_t length)
{
string srcbuf(p, length);
string destbuf;
//按需压缩
if (m_bNeedCompress)
{
if (!ZlibUtil::compressBuf(srcbuf, destbuf))
{
LOGE("compress buf error");
return;
}
}
string strPackageData;
chat_msg_header header;
if (m_bNeedCompress)
{
//设置压缩标志
header.compressflag = PACKAGE_COMPRESSED;
//设置压缩后的包体大小
header.compresssize = destbuf.length();
}
else
header.compressflag = PACKAGE_UNCOMPRESSED;
//设置压缩前的包体大小
header.originsize = length;
//插入真正的包头
strPackageData.append((const char*)&header, sizeof(header));
strPackageData.append(destbuf);
//将整个包发到网络上去
conn->send(strPackageData);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
实际上可以基于 BinaryStreamWriter 这个流对象进行扩展,而不用单独再定义一个 chat_msg_header 结构体作为包头。
在 BinaryStreamWriter 对于浮点型的处理,是先将浮点数按一定的精度转换成字符串,然后将字符串写入流中:
// isNULL 参数表示可以写入一个 double 类型占位符
bool BinaryStreamWriter::WriteDouble(double value, bool isNULL)
{
char doublestr[128];
if (isNULL == false)
{
sprintf(doublestr, "%f", value);
WriteCString(doublestr, strlen(doublestr));
}
else
WriteCString(doublestr, 0);
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
上面是对这个自定义协议的封包过程,解包过程我实现了一个 BinaryStreamReader 类,该类的操作对象是去除了包头 chat_msg_header 结构后拿到的包体流。上述聊天协议的解包代码如下:
bool CRecvMsgThread::HandleMessage(const std::string& strMsg)
{
//strMsg是包体流,如何得到包体流在《解包与处理》一节已经介绍过了
net::BinaryStreamReader readStream(strMsg.c_str(), strMsg.length());
//读取消息类型
int32_t msgType;
if (!readStream.ReadInt32(msgType))
{
return false;
}
//读取消息序列号
if (!readStream.ReadInt32(m_seq))
{
return false;
}
//根据消息类型做处理
switch (msgType)
{
//聊天消息
case msg_type_chat:
{
//从流中读取发送者id
int32_t senderId;
if (!readStream.ReadInt32(senderId))
{
break;
}
//从流中读取聊天消息本身
std::string chatMsg;
size_t chatMsgLength;
if (!readStream.ReadString(&chatMsg, 0, chatMsgLength))
{
return false;
}
//从流中读取接收者id
int32_t receiverId;
if (!readStream.ReadInt32(receiverId))
{
break;
}
//对聊天消息进行处理
HandleChatMessage(senderId, receiverId, data);
}
break;
//对其他消息的处理
}// end switch
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
上述代码根据写入的流的字段类型顺序依次读出相应的字段值,在拿到各个字段值后就可以进行相应的业务处理了。
BinaryStreamReader 和 BinaryStreamWriter 类完整实现如下:
ProtocolStream.h(文件位置:flamingoserver/net/ProtocolStream.h)
/**
* 一个强大的协议类, protocolstream.h
* zhangyl 2017.05.27
*/
#ifndef __PROTOCOL_STREAM_H__
#define __PROTOCOL_STREAM_H__
#include <stdlib.h>
#include <sys/types.h>
#include <string>
#include <sstream>
#include <stdint.h>
//二进制协议的打包解包类,内部的服务器之间通讯,统一采用这些类
namespace net
{
enum
{
TEXT_PACKLEN_LEN = 4,
TEXT_PACKAGE_MAXLEN = 0xffff,
BINARY_PACKLEN_LEN = 2,
BINARY_PACKAGE_MAXLEN = 0xffff,
TEXT_PACKLEN_LEN_2 = 6,
TEXT_PACKAGE_MAXLEN_2 = 0xffffff,
BINARY_PACKLEN_LEN_2 = 4, //4字节头长度
BINARY_PACKAGE_MAXLEN_2 = 0x10000000, //包最大长度是256M,足够了
CHECKSUM_LEN = 2,
};
//计算校验和
unsigned short checksum(const unsigned short* buffer, int size);
//将一个4字节的整型数值压缩成1~5个字节
void write7BitEncoded(uint32_t value, std::string& buf);
//将一个8字节的整型值编码成1~10个字节
void write7BitEncoded(uint64_t value, std::string& buf);
//将一个1~5个字节的字符数组值还原成4字节的整型值
void read7BitEncoded(const char* buf, uint32_t len, uint32_t& value);
//将一个1~10个字节的值还原成4字节的整型值
void read7BitEncoded(const char* buf, uint32_t len, uint64_t& value);
class BinaryStreamReader final
{
public:
BinaryStreamReader(const char* ptr, size_t len);
~BinaryStreamReader() = default;
virtual const char* GetData() const;
virtual size_t GetSize() const;
bool IsEmpty() const;
bool ReadString(std::string* str, size_t maxlen, size_t& outlen);
bool ReadCString(char* str, size_t strlen, size_t& len);
bool ReadCCString(const char** str, size_t maxlen, size_t& outlen);
bool ReadInt32(int32_t& i);
bool ReadInt64(int64_t& i);
bool ReadShort(short& i);
bool ReadChar(char& c);
size_t ReadAll(char* szBuffer, size_t iLen) const;
bool IsEnd() const;
const char* GetCurrent() const { return cur; }
public:
bool ReadLength(size_t& len);
bool ReadLengthWithoutOffset(size_t& headlen, size_t& outlen);
private:
BinaryStreamReader(const BinaryStreamReader&) = delete;
BinaryStreamReader& operator=(const BinaryStreamReader&) = delete;
private:
const char* const ptr;
const size_t len;
const char* cur;
};
class BinaryStreamWriter final
{
public:
BinaryStreamWriter(std::string* data);
~BinaryStreamWriter() = default;
virtual const char* GetData() const;
virtual size_t GetSize() const;
bool WriteCString(const char* str, size_t len);
bool WriteString(const std::string& str);
bool WriteDouble(double value, bool isNULL = false);
bool WriteInt64(int64_t value, bool isNULL = false);
bool WriteInt32(int32_t i, bool isNULL = false);
bool WriteShort(short i, bool isNULL = false);
bool WriteChar(char c, bool isNULL = false);
size_t GetCurrentPos() const { return m_data->length(); }
void Flush();
void Clear();
private:
BinaryStreamWriter(const BinaryStreamWriter&) = delete;
BinaryStreamWriter& operator=(const BinaryStreamWriter&) = delete;
private:
std::string* m_data;
};
}// end namespace
#endif //!__PROTOCOL_STREAM_H__
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
ProtocolStream.cpp (文件位置:flamingoserver/net/ProtocolStream.cpp)
#ifndef _WIN32
#include <arpa/inet.h>
#else
#include <Winsock2.h>
#pragma comment(lib, "Ws2_32.lib")
#endif
#include "ProtocolStream.h"
#include <string.h>
#include <stdio.h>
#include <sys/types.h>
#include <cassert>
#include <algorithm>
#include <stdio.h>
using namespace std;
namespace net
{
//计算校验和
unsigned short checksum(const unsigned short* buffer, int size)
{
unsigned int cksum = 0;
while (size > 1)
{
cksum += *buffer++;
size -= sizeof(unsigned short);
}
if (size)
{
cksum += *(unsigned char*)buffer;
}
//将32位数转换成16位数
while (cksum >> 16)
cksum = (cksum >> 16) + (cksum & 0xffff);
return (unsigned short)(~cksum);
}
//将一个4字节的整型数值压缩成1~5个字节
void write7BitEncoded(uint32_t value, std::string& buf)
{
do
{
unsigned char c = (unsigned char)(value & 0x7F);
value >>= 7;
if (value)
c |= 0x80;
buf.append(1, c);
} while (value);
}
//将一个8字节的整型值编码成1~10个字节
void write7BitEncoded(uint64_t value, std::string& buf)
{
do
{
unsigned char c = (unsigned char)(value & 0x7F);
value >>= 7;
if (value)
c |= 0x80;
buf.append(1, c);
} while (value);
}
//将一个1~5个字节的字符数组值还原成4字节的整型值
void read7BitEncoded(const char* buf, uint32_t len, uint32_t& value)
{
char c;
value = 0;
int bitCount = 0;
int index = 0;
do
{
c = buf[index];
uint32_t x = (c & 0x7F);
x <<= bitCount;
value += x;
bitCount += 7;
++index;
} while (c & 0x80);
}
//将一个1~10个字节的值还原成4字节的整型值
void read7BitEncoded(const char* buf, uint32_t len, uint64_t& value)
{
char c;
value = 0;
int bitCount = 0;
int index = 0;
do
{
c = buf[index];
uint64_t x = (c & 0x7F);
x <<= bitCount;
value += x;
bitCount += 7;
++index;
} while (c & 0x80);
}
BinaryStreamReader::BinaryStreamReader(const char* ptr_, size_t len_)
: ptr(ptr_), len(len_), cur(ptr_)
{
cur += BINARY_PACKLEN_LEN_2 + CHECKSUM_LEN;
}
bool BinaryStreamReader::IsEmpty() const
{
return len <= BINARY_PACKLEN_LEN_2;
}
size_t BinaryStreamReader::GetSize() const
{
return len;
}
bool BinaryStreamReader::ReadCString(char* str, size_t strlen, /* out */ size_t& outlen)
{
size_t fieldlen;
size_t headlen;
if (!ReadLengthWithoutOffset(headlen, fieldlen)) {
return false;
}
// user buffer is not enough
if (fieldlen > strlen) {
return false;
}
// 偏移到数据的位置
//cur += BINARY_PACKLEN_LEN_2;
cur += headlen;
if (cur + fieldlen > ptr + len)
{
outlen = 0;
return false;
}
memcpy(str, cur, fieldlen);
outlen = fieldlen;
cur += outlen;
return true;
}
bool BinaryStreamReader::ReadString(string* str, size_t maxlen, size_t& outlen)
{
size_t headlen;
size_t fieldlen;
if (!ReadLengthWithoutOffset(headlen, fieldlen)) {
return false;
}
// user buffer is not enough
if (maxlen != 0 && fieldlen > maxlen) {
return false;
}
// 偏移到数据的位置
//cur += BINARY_PACKLEN_LEN_2;
cur += headlen;
if (cur + fieldlen > ptr + len)
{
outlen = 0;
return false;
}
str->assign(cur, fieldlen);
outlen = fieldlen;
cur += outlen;
return true;
}
bool BinaryStreamReader::ReadCCString(const char** str, size_t maxlen, size_t& outlen)
{
size_t headlen;
size_t fieldlen;
if (!ReadLengthWithoutOffset(headlen, fieldlen)) {
return false;
}
// user buffer is not enough
if (maxlen != 0 && fieldlen > maxlen) {
return false;
}
// 偏移到数据的位置
//cur += BINARY_PACKLEN_LEN_2;
cur += headlen;
//memcpy(str, cur, fieldlen);
if (cur + fieldlen > ptr + len)
{
outlen = 0;
return false;
}
*str = cur;
outlen = fieldlen;
cur += outlen;
return true;
}
bool BinaryStreamReader::ReadInt32(int32_t& i)
{
const int VALUE_SIZE = sizeof(int32_t);
if (cur + VALUE_SIZE > ptr + len)
return false;
memcpy(&i, cur, VALUE_SIZE);
i = ntohl(i);
cur += VALUE_SIZE;
return true;
}
bool BinaryStreamReader::ReadInt64(int64_t& i)
{
char int64str[128];
size_t length;
if (!ReadCString(int64str, 128, length))
return false;
i = atoll(int64str);
return true;
}
bool BinaryStreamReader::ReadShort(short& i)
{
const int VALUE_SIZE = sizeof(short);
if (cur + VALUE_SIZE > ptr + len) {
return false;
}
memcpy(&i, cur, VALUE_SIZE);
i = ntohs(i);
cur += VALUE_SIZE;
return true;
}
bool BinaryStreamReader::ReadChar(char& c)
{
const int VALUE_SIZE = sizeof(char);
if (cur + VALUE_SIZE > ptr + len) {
return false;
}
memcpy(&c, cur, VALUE_SIZE);
cur += VALUE_SIZE;
return true;
}
bool BinaryStreamReader::ReadLength(size_t& outlen)
{
size_t headlen;
if (!ReadLengthWithoutOffset(headlen, outlen)) {
return false;
}
//cur += BINARY_PACKLEN_LEN_2;
cur += headlen;
return true;
}
bool BinaryStreamReader::ReadLengthWithoutOffset(size_t& headlen, size_t& outlen)
{
headlen = 0;
const char* temp = cur;
char buf[5];
for (size_t i = 0; i < sizeof(buf); i++)
{
memcpy(buf + i, temp, sizeof(char));
temp++;
headlen++;
//if ((buf[i] >> 7 | 0x0) == 0x0)
if ((buf[i] & 0x80) == 0x00)
break;
}
if (cur + headlen > ptr + len)
return false;
unsigned int value;
read7BitEncoded(buf, headlen, value);
outlen = value;
/*if ( cur + BINARY_PACKLEN_LEN_2 > ptr + len ) {
return false;
}
unsigned int tmp;
memcpy(&tmp, cur, sizeof(tmp));
outlen = ntohl(tmp);*/
return true;
}
bool BinaryStreamReader::IsEnd() const
{
assert(cur <= ptr + len);
return cur == ptr + len;
}
const char* BinaryStreamReader::GetData() const
{
return ptr;
}
size_t BinaryStreamReader::ReadAll(char* szBuffer, size_t iLen) const
{
size_t iRealLen = min(iLen, len);
memcpy(szBuffer, ptr, iRealLen);
return iRealLen;
}
//=================class BinaryStreamWriter implementation============//
BinaryStreamWriter::BinaryStreamWriter(string* data) :
m_data(data)
{
m_data->clear();
char str[BINARY_PACKLEN_LEN_2 + CHECKSUM_LEN];
m_data->append(str, sizeof(str));
}
bool BinaryStreamWriter::WriteCString(const char* str, size_t len)
{
std::string buf;
write7BitEncoded(len, buf);
m_data->append(buf);
m_data->append(str, len);
//unsigned int ulen = htonl(len);
//m_data->append((char*)&ulen,sizeof(ulen));
//m_data->append(str,len);
return true;
}
bool BinaryStreamWriter::WriteString(const string& str)
{
return WriteCString(str.c_str(), str.length());
}
const char* BinaryStreamWriter::GetData() const
{
return m_data->data();
}
size_t BinaryStreamWriter::GetSize() const
{
return m_data->length();
}
bool BinaryStreamWriter::WriteInt32(int32_t i, bool isNULL)
{
int32_t i2 = 999999999;
if (isNULL == false)
i2 = htonl(i);
m_data->append((char*)& i2, sizeof(i2));
return true;
}
bool BinaryStreamWriter::WriteInt64(int64_t value, bool isNULL)
{
char int64str[128];
if (isNULL == false)
{
#ifndef _WIN32
sprintf(int64str, "%ld", value);
#else
sprintf(int64str, "%lld", value);
#endif
WriteCString(int64str, strlen(int64str));
}
else
WriteCString(int64str, 0);
return true;
}
bool BinaryStreamWriter::WriteShort(short i, bool isNULL)
{
short i2 = 0;
if (isNULL == false)
i2 = htons(i);
m_data->append((char*)& i2, sizeof(i2));
return true;
}
bool BinaryStreamWriter::WriteChar(char c, bool isNULL)
{
char c2 = 0;
if (isNULL == false)
c2 = c;
(*m_data) += c2;
return true;
}
bool BinaryStreamWriter::WriteDouble(double value, bool isNULL)
{
char doublestr[128];
if (isNULL == false)
{
sprintf(doublestr, "%f", value);
WriteCString(doublestr, strlen(doublestr));
}
else
WriteCString(doublestr, 0);
return true;
}
void BinaryStreamWriter::Flush()
{
char* ptr = &(*m_data)[0];
unsigned int ulen = htonl(m_data->length());
memcpy(ptr, &ulen, sizeof(ulen));
}
void BinaryStreamWriter::Clear()
{
m_data->clear();
char str[BINARY_PACKLEN_LEN_2 + CHECKSUM_LEN];
m_data->append(str, sizeof(str));
}
}// end namespace
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
该协议类提供了对 char、short、int32、int64、string 等常用的字段类型的读写,功能非常强大,免去了定义各种结构体的麻烦。但是从业务上来讲,在实际开发中,每个字段的含义以及读写字段的顺序需要通信的双方提前协商好。
从下一节开始我们将介绍一些常用的协议格式和数据交换格式。