12. 事件循环和定时器
# 12. 事件循环和定时器
咱们这服务器啊,还缺个超重要的东西——超时处理!在网络世界里混,哪个应用不得处理超时啊?毕竟网络那头的小伙伴随时可能“玩失踪”。不仅像读/写这样正在进行的输入输出(IO)操作需要设置超时,把那些闲置的TCP连接“扫地出门”也是个不错的主意。要实现超时处理,就得给事件循环(event loop)“动个小手术”,毕竟poll
是唯一会阻塞的部分嘛。
瞅瞅咱们现有的事件循环代码:
int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), 1000);
poll
这个系统调用(syscall)有个超时参数,它能给poll
的执行时间设个上限。现在这个超时值是个随便设的1000毫秒。要是咱们根据定时器来设置这个超时值,poll
应该在定时器到期时或者之前“醒来”,这样咱们就能及时“触发”定时器啦。
问题来了,咱们可能有好几个定时器呢,poll
的超时值应该是离当前时间最近的那个定时器的超时值。这就需要找个数据结构来帮忙找到最近的定时器。堆(heap)数据结构在找最值方面可是“大明星”,经常被用来干这种活儿。当然啦,任何能用来排序的数据结构都能派上用场。比如说,咱们可以用AVL树给定时器排个序,甚至还能给这棵树“加点料”,让它能随时跟踪最小值。
那就先从给闲置的TCP连接加定时器开始吧。每个连接都配一个定时器,设定一个固定的未来超时时间。每次连接上有IO活动,就把这个定时器“续个命”,重新设置成固定的超时时间。注意哦,当咱们更新定时器的时候,它就变成离当前时间最远的那个啦。利用这个特点,咱们能把数据结构弄得简单点,一个简单的链表就足够给定时器排序啦:新的或者更新过的定时器直接扔到链表末尾,链表就能一直保持有序。而且链表操作的时间复杂度是O(1),比那些排序数据结构可强多啦。
定义链表是个小菜一碟的事儿:
struct DList {
DList *prev = NULL;
DList *next = NULL;
};
inline void dlist_init(DList *node) {
node->prev = node->next = node;
}
inline bool dlist_empty(DList *node) {
return node->next == node;
}
inline void dlist_detach(DList *node) {
DList *prev = node->prev;
DList *next = node->next;
prev->next = next;
next->prev = prev;
}
inline void dlist_insert_before(DList *target, DList * rookie) {
DList *prev = target->prev;
prev->next = rookie;
rookie->prev = prev;
rookie->next = target;
target->prev = rookie;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
get_monotonic_usec
这个函数是用来获取时间的。要注意,时间戳必须是单调递增的哦。要是时间戳往后跳,那在计算机系统里可就“捅大篓子”啦,各种麻烦事儿都得找上门。
static uint64_t get_monotonic_usec() {
timespec tv = {0, 0};
clock_gettime(CLOCK_MONOTONIC, &tv);
return uint64_t(tv.tv_sec) * 1000000 + tv.tv_nsec / 1000;
}
2
3
4
5
接下来,把链表加到服务器和连接结构体里。
// 全局变量
static struct {
HMap db;
// 所有客户端连接的映射,以文件描述符(fd)为键
std::vector<Conn *> fd2conn;
// 闲置连接的定时器
DList idle_list;
} g_data;
struct Conn {
int fd = -1;
uint32_t state = 0; // 要么是STATE_REQ,要么是STATE_RES
// 读缓冲区
size_t rbuf_size = 0;
uint8_t rbuf[4 + k_max_msg];
// 写缓冲区
size_t wbuf_size = 0;
size_t wbuf_sent = 0;
uint8_t wbuf[4 + k_max_msg];
uint64_t idle_start = 0;
// 定时器
DList idle_list;
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
修改后的事件循环大概是这样:
int main() {
// 一些初始化操作
dlist_init(&g_data.idle_list);
int fd = socket(AF_INET, SOCK_STREAM, 0);
// 绑定、监听以及其他杂七杂八的操作
// 代码省略...
// 事件循环
std::vector<struct pollfd> poll_args;
while (true) {
// 准备poll()的参数
// 代码省略...
// 轮询活跃的文件描述符
int timeout_ms = (int)next_timer_ms();
int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), timeout_ms);
if ( rv < 0) {
die("poll");
}
// 处理活跃连接
for (size_t i = 1; i < poll_args.size(); ++i) {
if (poll_args[i].revents) {
Conn *conn = g_data.fd2conn[poll_args[i].fd];
connection_io(conn);
if (conn->state == STATE_END) {
// 客户端正常关闭,或者出了点啥岔子
// 销毁这个连接
conn_done(conn);
}
}
}
// 处理定时器
process_timers();
// 如果监听的文件描述符活跃,尝试接受新连接
if (poll_args[0].revents) {
(void)accept_new_conn(fd);
}
}
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
这里有几个改动的地方:
poll
的超时参数由next_timer_ms
函数来计算。- 销毁连接的代码挪到了
conn_done
函数里。 - 新增了
process_timers
函数来触发定时器。 - 定时器在
connection_io
函数里更新,在accept_new_conn
函数里初始化。
next_timer_ms
函数从链表头取出第一个(也就是离当前时间最近的)定时器,用它来计算poll
的超时值。
const uint64_t k_idle_timeout_ms = 5 * 1000;
static uint32_t next_timer_ms() {
if (dlist_empty(&g_data.idle_list)) {
return 10000; // 没有定时器,这个值随便设
}
uint64_t now_us = get_monotonic_usec();
Conn *next = container_of(g_data.idle_list.next, Conn, idle_list);
uint64_t next_us = next->idle_start + k_idle_timeout_ms * 1000;
if (next_us <= now_us) {
// 超时了?
return 0;
}
return (uint32_t)((next_us - now_us) / 1000);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
在事件循环的每次迭代里,都会检查链表,好让定时器能按时“开工”。
static void process_timers() {
uint64_t now_us = get_monotonic_usec();
while (!dlist_empty(&g_data.idle_list)) {
Conn *next = container_of(g_data.idle_list.next, Conn, idle_list);
uint64_t next_us = next->idle_start + k_idle_timeout_ms * 1000;
if (next_us >= now_us + 1000) {
// 还没到时间,额外加1000微秒是考虑到poll()的毫秒精度
break;
}
printf("removing idle connection: %d\n", next->fd);
conn_done(next);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
定时器在connection_io
函数里更新:
static void connection_io(Conn *conn) {
// 被poll唤醒,更新闲置定时器
// 把conn移到链表末尾
conn->idle_start = get_monotonic_usec();
dlist_detach(&conn->idle_list);
dlist_insert_before(&g_data.idle_list, &conn->idle_list);
// 开始干活
if (conn->state == STATE_REQ) {
state_req(conn);
} else if (conn->state == STATE_RES) {
state_res(conn);
} else {
assert(0); // 不应该出现这种情况
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
定时器在accept_new_conn
函数里初始化:
static int32_t accept_new_conn(int fd) {
// 代码省略...
// 创建Conn结构体
struct Conn *conn = (struct Conn * )malloc(sizeof(struct Conn));
if (! conn) {
close(connfd);
return -1;
}
conn->fd = connfd;
conn->state = STATE_REQ;
conn->rbuf_size = 0;
conn->wbuf_size = 0;
conn->wbuf_sent = 0;
conn->idle_start = get_monotonic_usec();
dlist_insert_before(&g_data.idle_list, &conn->idle_list);
conn_put(g_data.fd2conn, conn);
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
别忘了,连接用完后要把它从链表上删掉:
static void conn_done(Conn *conn) {
g_data.fd2conn[conn->fd] = NULL;
(void)close(conn->fd);
dlist_detach(&conn->idle_list);
free(conn);
}
2
3
4
5
6
咱们可以用nc
或者socat
命令来测试闲置连接的超时功能:
$ ./server
removing idle connection: 4
$ socat tcp:127.0.0.1:1234 -
2
3
4
服务器应该在5秒内关闭连接。
# 练习:
- 给IO操作(读和写)加上超时处理。
- 试试用排序数据结构实现更通用的定时器。
12_server.cpp
avl.cpp
avl.h
common.h
hashtable.cpp
hashtable.h
list.h
zset.cpp
zset.h