4.14 Linux epoll 模型
# 4.14.1 基本用法
综合 select 和 poll 的一些优缺点,Linux 从内核 2.6 版本开始引入了更高效的 epoll 模型,本节我们来详细介绍 epoll 模型。
要想使用 epoll 模型,必须先需要创建一个 epollfd,这需要使用 epoll_create 函数去创建:
#include <sys/epoll.h>
int epoll_create(int size);
2
3
参数 size 从 Linux 2.6.8 以后就不再使用,但是必须设置一个大于 0 的值。epoll_create 函数调用成功返回一个非负值的 epollfd,调用失败返回 -1。
有了 epollfd 之后,我们需要将我们需要检测事件的其他 fd 绑定到这个 epollfd 上,或者修改一个已经绑定上去的 fd 的事件类型,或者在不需要时将 fd 从 epollfd 上解绑,这都可以使用 epoll_ctl 函数:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
参数说明:
参数 epfd 即上文提到的 epollfd;
参数 op,操作类型,取值有 EPOLL_CTL_ADD、EPOLL_CTL_MOD 和 EPOLL_CTL_DEL,分别表示向 epollfd 上添加、修改和移除一个其他 fd,当取值是 EPOLL_CTL_DEL,第四个参数 event 忽略不计,可以设置为 NULL;
参数 fd,即需要被操作的 fd;
参数 event,这是一个 epoll_event 结构体的地址,epoll_event 结构体定义如下:
struct epoll_event { uint32_t events; /* 需要检测的 fd 事件,取值与 poll 函数一样 */ epoll_data_t data; /* 用户自定义数据 */ };
1
2
3
4
5epoll_event 结构体的 data 字段的类型是 epoll_data_t,我们可以利用这个字段设置一个自己的自定义数据,它本质上是一个 Union 对象,在 64 位操作系统中其大小是 8 字节,其定义如下:
typedef union epoll_data { void* ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t;
1
2
3
4
5
6
7函数返回值:epoll_ctl 调用成功返回 0,调用失败返回 -1,你可以通过 errno 错误码获取具体的错误原因。
创建了 epollfd,设置好某个 fd 上需要检测事件并将该 fd 绑定到 epollfd 上去后,我们就可以调用 epoll_wait 检测事件了,epoll_wait 函数签名如下:
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
参数的形式和 poll 函数很类似,参数 events 是一个 epoll_event 结构数组的首地址,这是一个输出参数,函数调用成功后,events 中存放的是与就绪事件相关 epoll_event 结构体数组;参数 maxevents 是数组元素的个数;timeout 是超时时间,单位是毫秒,如果设置为 0,epoll_wait 会立即返回。
当 epoll_wait 调用成功会返回有事件的 fd 数目;如果返回 0 表示超时;调用失败返回 -1。
epoll_wait 使用示例如下:
while (true)
{
epoll_event epoll_events[1024];
int n = epoll_wait(epollfd, epoll_events, 1024, 1000);
if (n < 0)
{
//被信号中断
if (errno == EINTR)
continue;
//出错,退出
break;
}
else if (n == 0)
{
//超时,继续
continue;
}
for (size_t i = 0; i < n; ++i)
{
if (epoll_events[i].events & EPOLLIN)
{
// 处理可读事件
}
else if (epoll_events[i].events & EPOLLOUT)
{
// 处理可写事件
}
else if (epoll_events[i].events & EPOLLERR)
{
//处理出错事件
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 4.14.2 epoll_wait 与 poll 的区别
通过前面介绍 poll 与 epoll_wait 函数的介绍,我们可以发现:
epoll_wait 函数调用完之后,我们可以直接在 event 参数中拿到所有有事件就绪的 fd,直接处理即可(event 参数仅仅是个出参);而 poll 函数的事件集合调用前后数量都未改变,只不过调用前我们通过 pollfd 结构体的 events 字段设置待检测事件,调用后我们需要通过 pollfd 结构体的 revents 字段去检测就绪的事件( 参数 fds 既是入参也是出参)。
举个生活中的例子,某人不断给你一些苹果,这些苹果有生有熟,调用 epoll_wait 相当于:
1. 你把苹果挨个投入到 epoll 机器中(调用 epoll_ctl);
2. 调用 epoll_wait 加工,你直接通过另外一个袋子就能拿到所有熟苹果。
2
调用 poll 相当于:
1. 把收到的苹果装入一个袋子里面然后调用 poll 加工;
2. 调用结束后,拿到原来的袋子,袋子中还是原来那么多苹果,只不过熟苹果被贴上了标签纸,你还是需要挨个去查看标签纸挑选熟苹果。
2
当然,这并不意味着,poll 函数的效率不如 epoll_wait,一般在 fd 数量比较多,但某段时间内,就绪事件 fd 数量较少的情况下,epoll_wait 才会体现出它的优势,也就是说 socket 连接数量较大时而活跃连接较少时 epoll 模型更高效。
# 4.14.3 LT 模式和 ET 模式
与 poll 的事件宏相比,epoll 新增了一个事件宏 EPOLLET,这就是所谓的边缘触发模式(Edge Trigger,ET),而默认的模式我们称为水平触发模式(Level Trigger,LT)。这两种模式的区别在于:
- 对于水平触发模式,一个事件只要有,就会一直触发;
- 对于边缘触发模式,只有一个事件从无到有才会触发。
这两个词汇来自电学术语,你可以将 fd 上有数据认为是高电平,没有数据认为是低电平,将 fd 可写认为是高电平,fd 不可写认为是低电平。那么水平模式的触发条件是状态处于高电平,而边缘模式的触发条件是新来一次电信号将当前状态变为高电平,即:
水平模式的触发条件
1. 低电平 => 高电平
2. 处于高电平状态
2
边缘模式的触发条件
1. 低电平 => 高电平
说的有点抽象,以 socket 的读事件为例,对于水平模式,只要 socket 上有未读完的数据,就会一直产生 EPOLLIN 事件;而对于边缘模式,socket 上每新来一次数据就会触发一次,如果上一次触发后,未将 socket 上的数据读完,也不会再触发,除非再新来一次数据。对于 socket 写事件,如果 socket 的 TCP 窗口一直不饱和,会一直触发 EPOLLOUT 事件;而对于边缘模式,只会触发一次,除非 TCP 窗口由不饱和变成饱和再一次变成不饱和,才会再次触发 EPOLLOUT 事件。
socket 可读事件水平模式触发条件:
1. socket上无数据 => socket上有数据
2. socket处于有数据状态
2
socket 可读事件边缘模式触发条件:
1. socket上无数据 => socket上有数据
2. socket又新来一次数据
2
socket 可写事件水平模式触发条件:
1. socket可写 => socket可写
2. socket不可写 => socket可写
2
socket 可写事件边缘模式触发条件:
1. socket不可写 => socket可写
也就是说,如果对于一个非阻塞 socket,如果使用 epoll 边缘模式去检测数据是否可读,触发可读事件以后,一定要一次性把 socket 上的数据收取干净才行,也就是说一定要循环调用 recv 函数直到 recv 出错,错误码是EWOULDBLOCK(EAGAIN 一样)(此时表示 socket 上本次数据已经读完);如果使用水平模式,则不用,你可以根据业务一次性收取固定的字节数,或者收完为止。边缘模式下收取数据的代码写法示例如下:
bool TcpSession::RecvEtMode()
{
//每次只收取256个字节
char buff[256];
while (true)
{
int nRecv = ::recv(clientfd_, buff, 256, 0);
if (nRecv == -1)
{
if (errno == EWOULDBLOCK)
return true;
else if (errno == EINTR)
continue;
return false;
}
//对端关闭了socket
else if (nRecv == 0)
return false;
inputBuffer_.add(buff, (size_t)nRecv);
}
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
下面我们来看几个具体的例子来比较一下 LT 模式与 ET 模式的区别。
先来测试一下 LT 模式 与 ET 模式在处理读事件上的区别。
代码如下:
/**
* 验证epoll的LT与ET模式的区别, epoll_server.cpp
* zhangyl 2019.04.01
*/
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<fcntl.h>
#include<sys/epoll.h>
#include<poll.h>
#include<iostream>
#include<string.h>
#include<vector>
#include<errno.h>
#include<iostream>
int main()
{
//创建一个监听socket
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd == -1)
{
std::cout << "create listen socket error" << std::endl;
return -1;
}
//设置重用ip地址和端口号
int on = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on));
setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, (char*)&on, sizeof(on));
//将监听socker设置为非阻塞的
int oldSocketFlag = fcntl(listenfd, F_GETFL, 0);
int newSocketFlag = oldSocketFlag | O_NONBLOCK;
if (fcntl(listenfd, F_SETFL, newSocketFlag) == -1)
{
close(listenfd);
std::cout << "set listenfd to nonblock error" << std::endl;
return -1;
}
//初始化服务器地址
struct sockaddr_in bindaddr;
bindaddr.sin_family = AF_INET;
bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);
bindaddr.sin_port = htons(3000);
if (bind(listenfd, (struct sockaddr*)&bindaddr, sizeof(bindaddr)) == -1)
{
std::cout << "bind listen socker error." << std::endl;
close(listenfd);
return -1;
}
//启动监听
if (listen(listenfd, SOMAXCONN) == -1)
{
std::cout << "listen error." << std::endl;
close(listenfd);
return -1;
}
//创建epollfd
int epollfd = epoll_create(1);
if (epollfd == -1)
{
std::cout << "create epollfd error." << std::endl;
close(listenfd);
return -1;
}
epoll_event listen_fd_event;
listen_fd_event.data.fd = listenfd;
listen_fd_event.events = EPOLLIN;
//取消注释掉这一行,则使用ET模式
//listen_fd_event.events |= EPOLLET;
//将监听sokcet绑定到epollfd上去
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &listen_fd_event) == -1)
{
std::cout << "epoll_ctl error" << std::endl;
close(listenfd);
return -1;
}
int n;
while (true)
{
epoll_event epoll_events[1024];
n = epoll_wait(epollfd, epoll_events, 1024, 1000);
if (n < 0)
{
//被信号中断
if (errno == EINTR)
continue;
//出错,退出
break;
}
else if (n == 0)
{
//超时,继续
continue;
}
for (size_t i = 0; i < n; ++i)
{
//事件可读
if (epoll_events[i].events & EPOLLIN)
{
if (epoll_events[i].data.fd == listenfd)
{
//侦听socket,接受新连接
struct sockaddr_in clientaddr;
socklen_t clientaddrlen = sizeof(clientaddr);
int clientfd = accept(listenfd, (struct sockaddr*)&clientaddr, &clientaddrlen);
if (clientfd != -1)
{
int oldSocketFlag = fcntl(clientfd, F_GETFL, 0);
int newSocketFlag = oldSocketFlag | O_NONBLOCK;
if (fcntl(clientfd, F_SETFL, newSocketFlag) == -1)
{
close(clientfd);
std::cout << "set clientfd to nonblocking error." << std::endl;
}
else
{
epoll_event client_fd_event;
client_fd_event.data.fd = clientfd;
client_fd_event.events = EPOLLIN;
//取消注释这一行,则使用ET模式
//client_fd_event.events |= EPOLLET;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, clientfd, &client_fd_event) != -1)
{
std::cout << "new client accepted,clientfd: " << clientfd << std::endl;
}
else
{
std::cout << "add client fd to epollfd error" << std::endl;
close(clientfd);
}
}
}
}
else
{
std::cout << "client fd: " << epoll_events[i].data.fd << " recv data." << std::endl;
//普通clientfd
char ch;
//每次只收一个字节
int m = recv(epoll_events[i].data.fd, &ch, 1, 0);
if (m == 0)
{
//对端关闭了连接,从epollfd上移除clientfd
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1)
{
std::cout << "client disconnected,clientfd:" << epoll_events[i].data.fd << std::endl;
}
close(epoll_events[i].data.fd);
}
else if (m < 0)
{
//出错
if (errno != EWOULDBLOCK && errno != EINTR)
{
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1)
{
std::cout << "client disconnected,clientfd:" << epoll_events[i].data.fd << std::endl;
}
close(epoll_events[i].data.fd);
}
}
else
{
//正常收到数据
std::cout << "recv from client:" << epoll_events[i].data.fd << ", " << ch << std::endl;
}
}
}
else if (epoll_events[i].events & EPOLLERR)
{
// TODO 暂不处理
}
}
}
close(listenfd);
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
我们先来看水平模式的行为,将代码 79 行和 135 行注释掉则使用 LT 模式,我们编译下程序并运行:
[root@localhost testepoll]# g++ -g -o epoll_server epoll_server.cpp
[root@localhost testepoll]# ./epoll_server
2
然后再另外开启一个 shell 窗口,使用 nc 命令模拟一个客户端,连接服务器成功后,我们给服务器发送一个消息"abcef":
[root@localhost ~]# nc -v 127.0.0.1 3000
Ncat: Version 7.50 ( https://nmap.org/ncat )
Ncat: Connected to 127.0.0.1:3000.
abcdef
2
3
4
5
此时服务器端输出:
[root@localhost testepoll]# ./epoll_server
new client accepted,clientfd: 5
client fd: 5 recv data.
recv from client:5, a
client fd: 5 recv data.
recv from client:5, b
client fd: 5 recv data.
recv from client:5, c
client fd: 5 recv data.
recv from client:5, d
client fd: 5 recv data.
recv from client:5, e
client fd: 5 recv data.
recv from client:5, f
client fd: 5 recv data.
recv from client:5,
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
nc 命令实际发送了 a、b、c、d、e、f 和 \n 七个字符,由于服务器端使用的是 LT 模式,每次接收一个字符,只要 socket 接收缓冲区中仍有数据可读,POLLIN 事件就会一直触发,所以服务器一共有 7 次输出,直到 socket 接收缓冲区没有数据为止。
我们将代码 79 行和 135 行注释取消掉,使用 ET 模式再试一下,修改代码并重新编译,然后重新运行一下。再次使用 nc 命令模拟一个客户端连接后发送"abcef",服务器只会有一次输出,效果如下:
由于使用了 ET 模式,只会触发一次 POLLIN 事件,如果此时没有新数据到来,就再也不会触发。所以,如果我们继续给服务器发送一条新数据,如 123,服务器将再次触发一次 POLLIN 事件,然后打印出字母 b,效果如下:
所以如果使用 ET 模式 处理读事件,切记要将该次 socket 上的数据收完。
再来测试一下 LT 模式 与 ET 模式在处理写事件上的区别。
修改上述代码如下:
/**
* 验证epoll的LT与ET模式的区别, epoll_server.cpp
* zhangyl 2019.04.01
*/
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<fcntl.h>
#include<sys/epoll.h>
#include<poll.h>
#include<iostream>
#include<string.h>
#include<vector>
#include<errno.h>
#include<iostream>
int main()
{
//创建一个监听socket
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd == -1)
{
std::cout << "create listen socket error" << std::endl;
return -1;
}
//设置重用ip地址和端口号
int on = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char*)& on, sizeof(on));
setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, (char*)& on, sizeof(on));
//将监听socker设置为非阻塞的
int oldSocketFlag = fcntl(listenfd, F_GETFL, 0);
int newSocketFlag = oldSocketFlag | O_NONBLOCK;
if (fcntl(listenfd, F_SETFL, newSocketFlag) == -1)
{
close(listenfd);
std::cout << "set listenfd to nonblock error" << std::endl;
return -1;
}
//初始化服务器地址
struct sockaddr_in bindaddr;
bindaddr.sin_family = AF_INET;
bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);
bindaddr.sin_port = htons(3000);
if (bind(listenfd, (struct sockaddr*) & bindaddr, sizeof(bindaddr)) == -1)
{
std::cout << "bind listen socker error." << std::endl;
close(listenfd);
return -1;
}
//启动监听
if (listen(listenfd, SOMAXCONN) == -1)
{
std::cout << "listen error." << std::endl;
close(listenfd);
return -1;
}
//创建epollfd
int epollfd = epoll_create(1);
if (epollfd == -1)
{
std::cout << "create epollfd error." << std::endl;
close(listenfd);
return -1;
}
epoll_event listen_fd_event;
listen_fd_event.data.fd = listenfd;
listen_fd_event.events = EPOLLIN;
//取消注释掉这一行,则使用ET模式
//listen_fd_event.events |= EPOLLET;
//将监听sokcet绑定到epollfd上去
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &listen_fd_event) == -1)
{
std::cout << "epoll_ctl error" << std::endl;
close(listenfd);
return -1;
}
int n;
while (true)
{
epoll_event epoll_events[1024];
n = epoll_wait(epollfd, epoll_events, 1024, 1000);
if (n < 0)
{
//被信号中断
if (errno == EINTR)
continue;
//出错,退出
break;
}
else if (n == 0)
{
//超时,继续
continue;
}
for (size_t i = 0; i < n; ++i)
{
//事件可读
if (epoll_events[i].events & EPOLLIN)
{
if (epoll_events[i].data.fd == listenfd)
{
//侦听socket,接受新连接
struct sockaddr_in clientaddr;
socklen_t clientaddrlen = sizeof(clientaddr);
int clientfd = accept(listenfd, (struct sockaddr*)&clientaddr, &clientaddrlen);
if (clientfd != -1)
{
int oldSocketFlag = fcntl(clientfd, F_GETFL, 0);
int newSocketFlag = oldSocketFlag | O_NONBLOCK;
if (fcntl(clientfd, F_SETFL, newSocketFlag) == -1)
{
close(clientfd);
std::cout << "set clientfd to nonblocking error." << std::endl;
}
else
{
epoll_event client_fd_event;
client_fd_event.data.fd = clientfd;
//同时侦听新来连接socket的读和写时间
client_fd_event.events = EPOLLIN | EPOLLOUT;
//取消注释这一行,则使用ET模式
//client_fd_event.events |= EPOLLET;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, clientfd, &client_fd_event) != -1)
{
std::cout << "new client accepted,clientfd: " << clientfd << std::endl;
}
else
{
std::cout << "add client fd to epollfd error" << std::endl;
close(clientfd);
}
}
}
}
else
{
std::cout << "client fd: " << epoll_events[i].data.fd << " recv data." << std::endl;
//普通clientfd
char recvbuf[1024] = { 0 };
//每次只收一个字节
int m = recv(epoll_events[i].data.fd, recvbuf, 1024, 0);
if (m == 0)
{
//对端关闭了连接,从epollfd上移除clientfd
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1)
{
std::cout << "client disconnected,clientfd:" << epoll_events[i].data.fd << std::endl;
}
close(epoll_events[i].data.fd);
}
else if (m < 0)
{
//出错
if (errno != EWOULDBLOCK && errno != EINTR)
{
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1)
{
std::cout << "client disconnected,clientfd:" << epoll_events[i].data.fd << std::endl;
}
close(epoll_events[i].data.fd);
}
}
else
{
//正常收到数据
std::cout << "recv from client:" << epoll_events[i].data.fd << ", " << recvbuf << std::endl;
}
}
}
else if (epoll_events[i].events & EPOLLOUT)
{
//只处理客户端fd的可写事件
if (epoll_events[i].data.fd != listenfd)
{
//打印结果
std::cout << "EPOLLOUT triggered,clientfd:" << epoll_events[i].data.fd << std::endl;
}
}
else if (epoll_events[i].events & EPOLLERR)
{
// TODO 暂不处理
}
}
}
close(listenfd);
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
上述代码中,我们对新来的连接 fd 同时注册读和写事件(代码 133 行),再次编译程序执行:
[root@iZ238vnojlyZ testepollet]# vi epoll.cpp
[root@iZ238vnojlyZ testepollet]# g++ -g -o epoll epoll_server.cpp
[root@iZ238vnojlyZ testepollet]# ./epoll_server
2
3
然后使用 nc 命令模拟一个客户端去连接 epoll_server:
[root@iZ238vnojlyZ ~]# nc -v 127.0.0.1 3000
Ncat: Version 6.40 ( http://nmap.org/ncat )
Ncat: Connected to 127.0.0.1:3000.
2
3
此时服务器端(epoll_server)会疯狂的输出可写事件触发消息:
之所以是这样,是因为我们注册了可写事件且使用的是 LT 模式,LT 模式下,由于这里的服务器端对应的客户端 fd 一直是可写的,有写事件一直触发,所以看到屏幕不断输出。
我们再将服务器端与客户端建立连接时新建的 fd 设置为 ET 模式再实验一下:
/**
* 验证epoll的LT与ET模式的区别, epoll_server.cpp
* zhangyl 2019.04.01
*/
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<fcntl.h>
#include<sys/epoll.h>
#include<poll.h>
#include<iostream>
#include<string.h>
#include<vector>
#include<errno.h>
#include<iostream>
int main()
{
//创建一个监听socket
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd == -1)
{
std::cout << "create listen socket error" << std::endl;
return -1;
}
//设置重用ip地址和端口号
int on = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char*)& on, sizeof(on));
setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, (char*)& on, sizeof(on));
//将监听socker设置为非阻塞的
int oldSocketFlag = fcntl(listenfd, F_GETFL, 0);
int newSocketFlag = oldSocketFlag | O_NONBLOCK;
if (fcntl(listenfd, F_SETFL, newSocketFlag) == -1)
{
close(listenfd);
std::cout << "set listenfd to nonblock error" << std::endl;
return -1;
}
//初始化服务器地址
struct sockaddr_in bindaddr;
bindaddr.sin_family = AF_INET;
bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);
bindaddr.sin_port = htons(3000);
if (bind(listenfd, (struct sockaddr*) & bindaddr, sizeof(bindaddr)) == -1)
{
std::cout << "bind listen socker error." << std::endl;
close(listenfd);
return -1;
}
//启动监听
if (listen(listenfd, SOMAXCONN) == -1)
{
std::cout << "listen error." << std::endl;
close(listenfd);
return -1;
}
//创建epollfd
int epollfd = epoll_create(1);
if (epollfd == -1)
{
std::cout << "create epollfd error." << std::endl;
close(listenfd);
return -1;
}
epoll_event listen_fd_event;
listen_fd_event.data.fd = listenfd;
listen_fd_event.events = EPOLLIN;
//取消注释掉这一行,则使用ET模式
//listen_fd_event.events |= EPOLLET;
//将监听sokcet绑定到epollfd上去
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &listen_fd_event) == -1)
{
std::cout << "epoll_ctl error" << std::endl;
close(listenfd);
return -1;
}
int n;
while (true)
{
epoll_event epoll_events[1024];
n = epoll_wait(epollfd, epoll_events, 1024, 1000);
if (n < 0)
{
//被信号中断
if (errno == EINTR)
continue;
//出错,退出
break;
}
else if (n == 0)
{
//超时,继续
continue;
}
for (size_t i = 0; i < n; ++i)
{
//事件可读
if (epoll_events[i].events & EPOLLIN)
{
if (epoll_events[i].data.fd == listenfd)
{
//侦听socket,接受新连接
struct sockaddr_in clientaddr;
socklen_t clientaddrlen = sizeof(clientaddr);
int clientfd = accept(listenfd, (struct sockaddr*) & clientaddr, &clientaddrlen);
if (clientfd != -1)
{
int oldSocketFlag = fcntl(clientfd, F_GETFL, 0);
int newSocketFlag = oldSocketFlag | O_NONBLOCK;
if (fcntl(clientfd, F_SETFL, newSocketFlag) == -1)
{
close(clientfd);
std::cout << "set clientfd to nonblocking error." << std::endl;
}
else
{
epoll_event client_fd_event;
client_fd_event.data.fd = clientfd;
//同时侦听新来连接socket的读和写时间
client_fd_event.events = EPOLLIN | EPOLLOUT;
//取消注释这一行,则使用ET模式
client_fd_event.events |= EPOLLET;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, clientfd, &client_fd_event) != -1)
{
std::cout << "new client accepted,clientfd: " << clientfd << std::endl;
}
else
{
std::cout << "add client fd to epollfd error" << std::endl;
close(clientfd);
}
}
}
}
else
{
std::cout << "client fd: " << epoll_events[i].data.fd << " recv data." << std::endl;
//普通clientfd
char recvbuf[1024] = { 0 };
//每次只收一个字节
int m = recv(epoll_events[i].data.fd, recvbuf, 1024, 0);
if (m == 0)
{
//对端关闭了连接,从epollfd上移除clientfd
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1)
{
std::cout << "client disconnected,clientfd:" << epoll_events[i].data.fd << std::endl;
}
close(epoll_events[i].data.fd);
}
else if (m < 0)
{
//出错
if (errno != EWOULDBLOCK && errno != EINTR)
{
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1)
{
std::cout << "client disconnected,clientfd:" << epoll_events[i].data.fd << std::endl;
}
close(epoll_events[i].data.fd);
}
}
else
{
//正常收到数据
std::cout << "recv from client:" << epoll_events[i].data.fd << ", " << recvbuf << std::endl;
epoll_event client_fd_event;
client_fd_event.data.fd = epoll_events[i].data.fd;
//再次给clientfd注册检测可写事件
client_fd_event.events = EPOLLIN | EPOLLOUT | EPOLLET;
if (epoll_ctl(epollfd, EPOLL_CTL_MOD, epoll_events[i].data.fd, &client_fd_event) != -1)
{
std::cout << "epoll_ctl successfully, mode: EPOLL_CTL_MOD, clientfd:" << epoll_events[i].data.fd << std::endl;
}
}
}
}
else if (epoll_events[i].events & EPOLLOUT)
{
//只处理客户端fd的可写事件
if (epoll_events[i].data.fd != listenfd)
{
//打印结果
std::cout << "EPOLLOUT triggered,clientfd:" << epoll_events[i].data.fd << std::endl;
}
}
else if (epoll_events[i].events & EPOLLERR)
{
// TODO 暂不处理
}
}
}
close(listenfd);
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
上述逻辑中,服务器端在每次收到客户端消息时会重新给客户端 fd 注册检测可写事件(EPOLLOUT),重新编译代码,启动 epoll_server,再次使用 nc 命令模拟客户端给 epoll_server 发送几条消息,结果如下:
通过上述输出,我们可以发现,epoll_server 使用 ET 模式下即使给客户端 fd 注册了检测可写事件不会一直触发,只会触发一次,触发完后只有再次注册检测可写事件才会继续触发,这里是靠客户端来新消息驱动再次注册检测可写事件。也就是说,如果我们使用 ET 模式去处理可写事件时不必像 LT 模式那样为了避免不必要的可写触发在触发后需要立即移除检测可写事件。
这就意味着,使用 LT 模式,如果你的实现依赖于可写事件触发去发送数据,那么你一定要在数据发送完之后移除检测可写事件,避免没有数据发送时无意义的触发;使用 ET 模式时,如果你的实现也依赖于可写事件触发去发送数据,可写事件触发后,你调用 send 函数(Linux 平台也可以使用 write)去发送数据,如果数据本次不能全部发送完(对于非阻塞的 socket,此时 send 函数返回 -1,错误码为 EAGAIN 或 EWOULDBLOCK),你一定要继续注册检测可写事件,否则你剩余的数据就再也没有机会发送了,因为 ET 模式的可写事件再也不会触发。
在目前主流的网络库中,发数据的逻辑都不是上面所说的依赖于写事件触发,在写事件触发时去发数据。这种做法不好,那好的做法是什么呢?我会在后面《收数据与发数据的正确做法》那一节详细介绍。
最后容我再啰嗦几句,总结起来:
- LT 模式下,读事件触发后,可以按需收取想要的字节数,不用把本次接收到的数据收取干净(即不用循环到 recv 或者 read 函数返回 -1,错误码为 EWOULDBLOCK 或 EAGAIN);ET 模式下,读事件必须把数据收取干净,因为你不一定有下一次机会再收取数据了,即使有机会,也可能存在上次没读完的数据没有及时处理,造成客户端响应延迟。
- LT 模式下,不需要写事件一定要及时移除,避免不必要的触发,浪费 CPU 资源;ET 模式下,写事件触发后,如果还需要下一次的写事件触发来驱动任务(例如发上次剩余的数据),你需要继续注册一次检测可写事件。
- LT 模式和 ET 模式各有优缺点,无所谓孰优孰劣。使用 LT 模式,我们可以自由决定每次收取多少字节(对于普通 socket)或何时接收连接(对于侦听 socket),但是可能会导致多次触发;使用 ET 模式,我们必须每次都要将数据收完(对于普通 socket)或必须理解调用 accept 接收连接(对于侦听socket),其优点是触发次数少。
你一定要透彻地理解 epoll 的 LT 模式和 ET 模式在数据读写时的区别。因为,现代互联网大环境下作为后台服务载体的主流操作系统是 Linux,而 epoll 系统调用是 Linux 下实现高性能服务网络模块的必备组件!只有理解了它们,你才能编写出高性能的网络通信库乃至整个服务。
# 4.14.4 EPOLLONESHOT 选项
epoll 模型还有一个选项叫 EPOLLONESHOT,Linux Manual 手册上对这个选项是这样说明的:
Sets the one-shot behavior for the associated file descriptor. This means that after an event is pulled out with epoll_wait the associated file descriptor is internally disabled and no other events will be reported by the epoll interface. The user must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file descriptor with a new event mask.
这段描述的含义是说如果某个 socket 注册了该标志, 其注册监听的事件(例如 EPOLLIN )触发一次后再也不会继续触发,除非重新注册监听该事件类型。
我们通过一个实例来看一下 EPOLLONESHOT 选项的效果,代码如下:
/**
* 验证 epoll EPOLLONESHOT 选项,epoll_server_with_oneshot.cpp
* zhangyl 2019.04.01
*/
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<fcntl.h>
#include<sys/epoll.h>
#include<poll.h>
#include<iostream>
#include<string.h>
#include<vector>
#include<errno.h>
#include<iostream>
int main()
{
//创建一个监听socket
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd == -1)
{
std::cout << "create listen socket error" << std::endl;
return -1;
}
//设置重用ip地址和端口号
int on = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on));
setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, (char*)&on, sizeof(on));
//将监听socker设置为非阻塞的
int oldSocketFlag = fcntl(listenfd, F_GETFL, 0);
int newSocketFlag = oldSocketFlag | O_NONBLOCK;
if (fcntl(listenfd, F_SETFL, newSocketFlag) == -1)
{
close(listenfd);
std::cout << "set listenfd to nonblock error" << std::endl;
return -1;
}
//初始化服务器地址
struct sockaddr_in bindaddr;
bindaddr.sin_family = AF_INET;
bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);
bindaddr.sin_port = htons(3000);
if (bind(listenfd, (struct sockaddr*)&bindaddr, sizeof(bindaddr)) == -1)
{
std::cout << "bind listen socker error." << std::endl;
close(listenfd);
return -1;
}
//启动监听
if (listen(listenfd, SOMAXCONN) == -1)
{
std::cout << "listen error." << std::endl;
close(listenfd);
return -1;
}
//创建epollfd
int epollfd = epoll_create(1);
if (epollfd == -1)
{
std::cout << "create epollfd error." << std::endl;
close(listenfd);
return -1;
}
epoll_event listen_fd_event;
listen_fd_event.data.fd = listenfd;
listen_fd_event.events = EPOLLIN;
//将监听sokcet绑定到epollfd上去
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &listen_fd_event) == -1)
{
std::cout << "epoll_ctl error" << std::endl;
close(listenfd);
return -1;
}
int n;
while (true)
{
epoll_event epoll_events[1024];
n = epoll_wait(epollfd, epoll_events, 1024, 1000);
if (n < 0)
{
//被信号中断
if (errno == EINTR)
continue;
//出错,退出
break;
}
else if (n == 0)
{
//超时,继续
continue;
}
for (size_t i = 0; i < n; ++i)
{
//事件可读
if (epoll_events[i].events & EPOLLIN)
{
if (epoll_events[i].data.fd == listenfd)
{
//侦听socket,接受新连接
struct sockaddr_in clientaddr;
socklen_t clientaddrlen = sizeof(clientaddr);
int clientfd = accept(listenfd, (struct sockaddr*)&clientaddr, &clientaddrlen);
if (clientfd != -1)
{
int oldSocketFlag = fcntl(clientfd, F_GETFL, 0);
int newSocketFlag = oldSocketFlag | O_NONBLOCK;
if (fcntl(clientfd, F_SETFL, newSocketFlag) == -1)
{
close(clientfd);
std::cout << "set clientfd to nonblocking error." << std::endl;
}
else
{
epoll_event client_fd_event;
client_fd_event.data.fd = clientfd;
client_fd_event.events = EPOLLIN;
client_fd_event.events |= EPOLLONESHOT;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, clientfd, &client_fd_event) != -1)
{
std::cout << "new client accepted,clientfd: " << clientfd << std::endl;
}
else
{
std::cout << "add client fd to epollfd error" << std::endl;
close(clientfd);
}
}
}
}
else
{
std::cout << "client fd: " << epoll_events[i].data.fd << " recv data." << std::endl;
//普通clientfd
char ch;
//每次只收一个字节
int m = recv(epoll_events[i].data.fd, &ch, 1, 0);
if (m == 0)
{
//对端关闭了连接,从epollfd上移除clientfd
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1)
{
std::cout << "client disconnected,clientfd:" << epoll_events[i].data.fd << std::endl;
}
close(epoll_events[i].data.fd);
}
else if (m < 0)
{
//出错
if (errno != EWOULDBLOCK && errno != EINTR)
{
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1)
{
std::cout << "client disconnected,clientfd:" << epoll_events[i].data.fd << std::endl;
}
close(epoll_events[i].data.fd);
}
}
else
{
//正常收到数据
std::cout << "recv from client:" << epoll_events[i].data.fd << ", " << ch << std::endl;
}
}
}
else if (epoll_events[i].events & POLLERR)
{
// TODO 暂不处理
}
}
}
close(listenfd);
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
上述代码 132 行给 clientfd 注册 EPOLLONESHOT 事件,由于使用的是水平模式,每次 EPOLLIN 触发后,只读取一个字符。我们使用 nc 命令模拟一个客户端,看下执行效果:
客户端执行效果:
服务器端执行效果:
服务器端每次只读取一个字符,由于注册了 EPOLLONESHOT 选项,第一次 EPOLLIN 事件触发后,服务器端再也不会触发 EPOLLIN,即使客户端再次发送“hello"字符串。
将上面的代码稍微修改一下,在读取一个字符成功后,我们再次给这个 clientfd 注册 EPOLLIN 事件,在原代码 176 行之后添加如下代码:
//添加在原代码 176 行之后
epoll_event client_fd_event;
client_fd_event.data.fd = epoll_events[i].data.fd;
client_fd_event.events = EPOLLIN;
//在这里再次为clientfd再次注册EPOLLIN事件
if (epoll_ctl(epollfd, EPOLL_CTL_MOD, epoll_events[i].data.fd, &client_fd_event) != -1)
{
std::cout << "rearm EPOLLIN event to clientfd: " << epoll_events[i].data.fd << std::endl;
}
else
{
//epoll_ctl调用失败从epollfd移除clientfd,并关闭clientfd
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1)
{
std::cout << "remove clientfd from epoll fd successfully, clientfd:" << epoll_events[i].data.fd << std::endl;
}
close(epoll_events[i].data.fd);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
重新编译程序并执行,客户端再次发送两个“hello”字符串,执行效果如下:
这次可以正常收完整每次的数据包了。
在一些特殊应用场景中,如果涉及到多个线程同时处理某个 socket 上的事件,为了避免数据乱序,我们不得不使用复杂的多线程同步机制;但是有了 EPOLLONESHOT 选项,我们就可以减少线程同步逻辑了。以 EPOLLIN 事件处理为例,多个线程同时从一个 socket 上读数据,可以使某个线程先处理,在该线程处理完了之后再重新给该 socket 添加读事件,这样读事件再次触发时,可以被其他线程继续处理。这种做法的本质上还是保证同一个时刻只有一个线程在处理某个 socket 上的事件。当然,多个线程同时操作一个 socket 本来就是一种不好的实践,读者在实际开发时应该尽量避免这种做法。