13. 堆数据结构和生存时间(TTL)
# 13. 堆数据结构和生存时间(TTL)
Redis主要是当缓存服务器用,管理缓存大小的一个办法就是显式设置生存时间(TTL,time to live)。TTL可以用定时器来实现。可惜的是,上一章的定时器是固定值(用链表实现的),所以要实现任意可变的超时,就需要一个排序的数据结构,而堆(heap)数据结构就是个热门选择。和咱们之前用的AVL树比起来,堆数据结构的优势就是占用空间更少。
快速回顾一下堆数据结构:
- 堆是一棵二叉树,被“打包”到数组里,树的布局是固定的。节点间的父子关系是隐含的,堆元素里可没有指针哦。
- 这棵树唯一的限制就是父节点的值不能比子节点大。
- 元素的值可以更新。要是值变了:
- 如果比原来大,它可能就比自己的子节点大了。要是这样,就和最小的子节点交换一下位置,让父子节点的大小限制重新满足。现在有个“小家伙”子节点比原来大了,那就接着这么处理,一直到叶子节点为止。
- 如果值变小了,同样的道理,和父节点交换位置,一直换到根节点为止。
- 新元素作为叶子节点加到数组末尾,然后按上面说的规则保持堆的特性。
- 从堆里删除一个元素的时候,用数组里最后一个元素替换它,然后就当作这个元素的值更新了一样,重新保持堆的特性。
代码开始啦:
struct HeapItem {
uint64_t val = 0;
size_t * ref = NULL;
};
// 键的结构
struct Entry {
struct HNode node;
std::string key;
std::string val;
uint32_t type = 0;
ZSet *zset = NULL;
// 用于TTL
size_t heap_idx = -1;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
堆用来给时间戳排序,Entry
和时间戳相互关联。heap_idx
是对应的HeapItem
的索引,ref
指向Entry
。咱们又用上了侵入式数据结构,ref
指针指向heap_idx
字段。
父子节点的关系是固定的:
static size_t heap_parent(size_t i) {
return (i + 1) / 2 - 1;
}
static size_t heap_left(size_t i) {
return i * 2 + 1;
}
static size_t heap_right(size_t i) {
return i * 2 + 2;
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
如果子节点比父节点小,就和父节点交换。注意,交换的时候通过ref
指针更新heap_idx
。
static void heap_up(HeapItem *a, size_t pos) {
HeapItem t = a[pos];
while (pos > 0 && a[heap_parent(pos)].val > t.val) {
// 与父节点交换
a[pos] = a[heap_parent(pos)];
*a[pos].ref = pos;
pos = heap_parent(pos);
}
a[pos] = t;
*a[pos].ref = pos;
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
和最小的子节点交换也差不多。
static void heap_down(HeapItem *a, size_t pos, size_t len) {
HeapItem t = a[pos];
while (true) {
// 找到父节点和子节点中最小的那个
size_t l = heap_left(pos);
size_t r = heap_right(pos);
size_t min_pos = -1;
size_t min_val = t.val;
if (l < len && a[l].val < min_val) {
min_pos = l;
min_val = a[l].val;
}
if ( r < len && a[r].val < min_val) {
min_pos = r;
}
if (min_pos == (size_t)-1) {
break;
}
// 与子节点交换
a[pos] = a[min_pos];
*a[pos].ref = pos;
pos = min_pos;
}
a[pos] = t;
*a[pos].ref = pos;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
heap_update
是用来更新堆中某个位置的函数,插入、删除操作也会用到它。
void heap_update(HeapItem *a, size_t pos, size_t len) {
if (pos > 0 && a[heap_parent(pos)].val > a[pos].val) {
heap_up(a, pos);
} else {
heap_down(a, pos, len);
}
}
1
2
3
4
5
6
7
2
3
4
5
6
7
把堆加到咱们的服务器里:
// 全局变量
static struct {
HMap db;
// 所有客户端连接的映射,以文件描述符(fd)为键
std::vector<Conn *> fd2conn;
// 闲置连接的定时器
DList idle_list;
// TTL的定时器
std::vector<HeapItem> heap;
} g_data;
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
在堆里更新、添加、删除定时器,更新数组里的元素后,直接调用heap_update
就行。
// 设置或删除TTL
static void entry_set_ttl(Entry *ent, int64_t ttl_ms) {
if (ttl_ms < 0 && ent->heap_idx != (size_t)-1) {
// 从堆里删除一个元素
// 用数组里最后一个元素替换它
size_t pos = ent->heap_idx;
g_data.heap[pos] = g_data.heap.back();
g_data.heap.pop_back();
if (pos < g_data.heap.size()) {
heap_update(g_data.heap.data(), pos, g_data.heap.size());
}
ent->heap_idx = -1;
} else if (ttl_ms >= 0) {
size_t pos = ent->heap_idx;
if (pos == (size_t)-1) {
// 向堆里添加一个新元素
HeapItem item;
item.ref = &ent->heap_idx;
g_data.heap.push_back(item);
pos = g_data.heap.size() - 1;
}
g_data.heap[pos].val = get_monotonic_usec() + (uint64_t)ttl_ms * 1000;
heap_update(g_data.heap.data(), pos, g_data.heap.size());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
删除Entry
的时候,把可能存在的TTL定时器也删掉:
static void entry_del(Entry *ent) {
switch (ent->type) {
case T_ZSET:
zset_dispose(ent->zset);
delete ent->zset;
break;
}
entry_set_ttl(ent, -1);
delete ent;
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
next_timer_ms
函数也改了,现在既能用闲置连接的定时器,也能用TTL定时器。
static uint32_t next_timer_ms() {
uint64_t now_us = get_monotonic_usec();
uint64_t next_us = (uint64_t)-1;
// 闲置连接的定时器
if (!dlist_empty(&g_data.idle_list)) {
Conn *next = container_of(g_data.idle_list.next, Conn, idle_list);
next_us = next->idle_start + k_idle_timeout_ms * 1000;
}
// TTL定时器
if (!g_data.heap.empty() && g_data.heap[0].val < next_us) {
next_us = g_data.heap[0].val;
}
if (next_us == (uint64_t)-1) {
return 10000; // 没有定时器,这个值无所谓
}
if (next_us <= now_us) {
// 超时了?
return 0;
}
return (uint32_t)((next_us - now_us) / 1000);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
在process_timers
函数里添加TTL定时器的处理:
static void process_timers() {
// 额外加1000微秒是考虑到poll()的毫秒精度
uint64_t now_us = get_monotonic_usec() + 1000;
// 闲置连接的定时器
while (!dlist_empty(&g_data.idle_list)) {
// 代码省略...
}
// TTL定时器
const size_t k_max_works = 2000;
size_t nworks = 0;
while (!g_data.heap.empty() && g_data.heap[0].val < now_us) {
Entry *ent = container_of(g_data.heap[0].ref, Entry, heap_idx);
HNode *node = hm_pop(&g_data.db, &ent->node, &hnode_same);
assert(node == &ent->node);
entry_del(ent);
if (nworks++ >= k_max_works) {
// 如果一下子有太多键过期,别让服务器“死机”
break;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
这部分就是检查堆里的最小值,然后删除过期的键。注意,咱们给每次事件循环迭代中过期的键的数量设了个限制,这是为了防止一下子过期太多键,把服务器“搞瘫痪”。
添加更新和查询TTL的命令很简单:
static void do_expire(std::vector<std::string> &cmd, std::string &out) {
int64_t ttl_ms = 0;
if (! str2int(cmd[2], ttl_ms)) {
return out_err(out, ERR_ARG, "expect int64");
}
Entry key;
key.key.swap(cmd[1]);
key.node.hcode = str_hash((uint8_t * )key.key.data(), key.key.size());
HNode *node = hm_lookup(&g_data.db, &key.node, &entry_eq);
if (node) {
Entry *ent = container_of(node, Entry, node);
entry_set_ttl(ent, ttl_ms);
}
return out_int(out, node ? 1: 0);
}
static void do_ttl(std::vector<std::string> &cmd, std::string &out) {
Entry key;
key.key.swap(cmd[1]);
key.node.hcode = str_hash((uint8_t * )key.key.data(), key.key.size());
HNode *node = hm_lookup(&g_data.db, &key.node, &entry_eq);
if (! node) {
return out_int(out, -2);
}
Entry *ent = container_of(node, Entry, node);
if (ent->heap_idx == (size_t)-1) {
return out_int(out, -1);
}
uint64_t expire_at = g_data.heap[ent->heap_idx].val;
uint64_t now_us = get_monotonic_usec();
return out_int(out, expire_at > now_us ? (expire_at - now_us) / 1000 : 0);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 练习:
- 基于堆的定时器给服务器带来了O(log(n))的操作,要是键的数量特别大,这可能会成为瓶颈。对于大量定时器,你能想出优化办法吗?
- 真正的Redis在处理过期的时候可没用排序的方法,去研究一下它是怎么做的,然后列出这两种方法各自的优缺点。
13_server.cpp
avl.cpp
avl.h
common.h
hashtable.cpp
hashtable.h
heap.cpp
heap.h
list.h
test_heap.cpp
zset.cpp
zset.h
上次更新: 2025/03/25, 00:48:42