CppGuide社区 CppGuide社区
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
🔥C++面试
  • 第1章 C++ 惯用法与Modern C++篇
  • 第2章 C++开发工具与调试进阶
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 网络通信协议设计
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 服务其他模块设计
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 🔥C++游戏编程入门(零基础学C++)
  • 🔥使用C++17从零开发一个调试器 (opens new window)
  • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
  • 🔥使用C++从零写一个C语言编译器 (opens new window)
  • 🔥从零用C语言写一个Redis
  • leveldb源码分析
  • libevent源码分析
  • Memcached源码分析
  • TeamTalk源码分析
  • 优质源码分享 (opens new window)
  • 🔥远程控制软件gh0st源码分析
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • 高效Go并发编程
  • Go性能调优
  • Go项目架构设计
  • 🔥使用Go从零开发一个数据库
  • 🔥使用Go从零开发一个编译器 (opens new window)
  • 🔥使用Go从零开发一个解释器 (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
🔥C++面试
  • 第1章 C++ 惯用法与Modern C++篇
  • 第2章 C++开发工具与调试进阶
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 网络通信协议设计
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 服务其他模块设计
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 🔥C++游戏编程入门(零基础学C++)
  • 🔥使用C++17从零开发一个调试器 (opens new window)
  • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
  • 🔥使用C++从零写一个C语言编译器 (opens new window)
  • 🔥从零用C语言写一个Redis
  • leveldb源码分析
  • libevent源码分析
  • Memcached源码分析
  • TeamTalk源码分析
  • 优质源码分享 (opens new window)
  • 🔥远程控制软件gh0st源码分析
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • 高效Go并发编程
  • Go性能调优
  • Go项目架构设计
  • 🔥使用Go从零开发一个数据库
  • 🔥使用Go从零开发一个编译器 (opens new window)
  • 🔥使用Go从零开发一个解释器 (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
  • 从零用C语言写一个Redis 引言
  • 02. 套接字入门
  • 03. 简易服务器/客户端
  • 04. 协议解析
  • 05. 事件循环与非阻塞I/O
  • 06. 事件循环的实现
  • 07. 基础服务器:实现get、set、del功能
  • 08. 数据结构:哈希表
  • 09. 数据序列化
  • 10. AVL树:实现与测试
  • 11. AVL树和有序集合
  • 12. 事件循环和定时器
  • 13. 堆数据结构和生存时间(TTL)
    • 13. 堆数据结构和生存时间(TTL)
      • 练习:
  • 14. 线程池与异步任务
目录

13. 堆数据结构和生存时间(TTL)

# 13. 堆数据结构和生存时间(TTL)

Redis主要是当缓存服务器用,管理缓存大小的一个办法就是显式设置生存时间(TTL,time to live)。TTL可以用定时器来实现。可惜的是,上一章的定时器是固定值(用链表实现的),所以要实现任意可变的超时,就需要一个排序的数据结构,而堆(heap)数据结构就是个热门选择。和咱们之前用的AVL树比起来,堆数据结构的优势就是占用空间更少。

快速回顾一下堆数据结构:

  1. 堆是一棵二叉树,被“打包”到数组里,树的布局是固定的。节点间的父子关系是隐含的,堆元素里可没有指针哦。
  2. 这棵树唯一的限制就是父节点的值不能比子节点大。
  3. 元素的值可以更新。要是值变了:
    • 如果比原来大,它可能就比自己的子节点大了。要是这样,就和最小的子节点交换一下位置,让父子节点的大小限制重新满足。现在有个“小家伙”子节点比原来大了,那就接着这么处理,一直到叶子节点为止。
    • 如果值变小了,同样的道理,和父节点交换位置,一直换到根节点为止。
  4. 新元素作为叶子节点加到数组末尾,然后按上面说的规则保持堆的特性。
  5. 从堆里删除一个元素的时候,用数组里最后一个元素替换它,然后就当作这个元素的值更新了一样,重新保持堆的特性。

代码开始啦:

struct  HeapItem {
    uint64_t val =  0;
    size_t * ref =  NULL;
};

// 键的结构
struct  Entry {
    struct  HNode node;
    std::string key;
    std::string val;
    uint32_t type =  0;
    ZSet *zset =  NULL;
    // 用于TTL
    size_t heap_idx =  -1;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

堆用来给时间戳排序,Entry和时间戳相互关联。heap_idx是对应的HeapItem的索引,ref指向Entry。咱们又用上了侵入式数据结构,ref指针指向heap_idx字段。

父子节点的关系是固定的:

static size_t heap_parent(size_t i)  {
    return  (i +  1)  /  2 -  1;
}

static size_t heap_left(size_t i)  {
    return  i *  2 +  1;
}

static size_t heap_right(size_t i)  {
    return  i *  2 +  2;
}
1
2
3
4
5
6
7
8
9
10
11

如果子节点比父节点小,就和父节点交换。注意,交换的时候通过ref指针更新heap_idx。

static void heap_up(HeapItem *a,  size_t pos)  {
    HeapItem t =  a[pos];
    while  (pos >  0 &&  a[heap_parent(pos)].val >  t.val)  {
        // 与父节点交换
        a[pos]  =  a[heap_parent(pos)];
        *a[pos].ref =  pos;
        pos =  heap_parent(pos);
    }
    a[pos]  =  t;
    *a[pos].ref =  pos;
}
1
2
3
4
5
6
7
8
9
10
11

和最小的子节点交换也差不多。

static void heap_down(HeapItem *a,  size_t pos,  size_t len)  {
    HeapItem t =  a[pos];
    while  (true)  {
        // 找到父节点和子节点中最小的那个
        size_t l =  heap_left(pos);
        size_t r =  heap_right(pos);
        size_t min_pos =  -1;
        size_t min_val =  t.val;
        if  (l <  len &&  a[l].val <  min_val)  {
            min_pos =  l;
            min_val =  a[l].val;
        }
        if  ( r <  len &&  a[r].val <  min_val)  {
            min_pos =  r;
        }
        if  (min_pos ==  (size_t)-1)  {
            break;
        }
        // 与子节点交换
        a[pos]  =  a[min_pos];
        *a[pos].ref =  pos;
        pos =  min_pos;
    }
    a[pos]  =  t;
    *a[pos].ref =  pos;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

heap_update是用来更新堆中某个位置的函数,插入、删除操作也会用到它。

void heap_update(HeapItem *a,  size_t pos,  size_t len)  {
    if  (pos >  0 &&  a[heap_parent(pos)].val >  a[pos].val)  {
        heap_up(a,  pos);
    }  else  {
        heap_down(a,  pos,  len);
    }
}
1
2
3
4
5
6
7

把堆加到咱们的服务器里:

// 全局变量
static struct  {
    HMap db;
    // 所有客户端连接的映射,以文件描述符(fd)为键
    std::vector<Conn *>  fd2conn;
    // 闲置连接的定时器
    DList idle_list;
    // TTL的定时器
    std::vector<HeapItem>  heap;
}  g_data;
1
2
3
4
5
6
7
8
9
10

在堆里更新、添加、删除定时器,更新数组里的元素后,直接调用heap_update就行。

// 设置或删除TTL
static void entry_set_ttl(Entry *ent,  int64_t ttl_ms)  {
    if  (ttl_ms <  0 &&  ent->heap_idx !=  (size_t)-1)  {
        // 从堆里删除一个元素
        // 用数组里最后一个元素替换它
        size_t pos =  ent->heap_idx;
        g_data.heap[pos]  =  g_data.heap.back();
        g_data.heap.pop_back();
        if  (pos <  g_data.heap.size())  {
            heap_update(g_data.heap.data(),  pos,  g_data.heap.size());
        }
        ent->heap_idx =  -1;
    }  else  if  (ttl_ms >=  0)  {
        size_t pos =  ent->heap_idx;
        if  (pos ==  (size_t)-1)  {
            // 向堆里添加一个新元素
            HeapItem item;
            item.ref =  &ent->heap_idx;
            g_data.heap.push_back(item);
            pos =  g_data.heap.size()  -  1;
        }
        g_data.heap[pos].val =  get_monotonic_usec()  +   (uint64_t)ttl_ms *  1000;
        heap_update(g_data.heap.data(),  pos,  g_data.heap.size());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

删除Entry的时候,把可能存在的TTL定时器也删掉:

static void entry_del(Entry *ent)  {
    switch  (ent->type)  {
    case  T_ZSET:
        zset_dispose(ent->zset);
        delete  ent->zset;
        break;
    }
    entry_set_ttl(ent,  -1);
    delete  ent;
}
1
2
3
4
5
6
7
8
9
10

next_timer_ms函数也改了,现在既能用闲置连接的定时器,也能用TTL定时器。

static uint32_t next_timer_ms()  {
    uint64_t now_us =  get_monotonic_usec();
    uint64_t next_us =   (uint64_t)-1;

    // 闲置连接的定时器
    if  (!dlist_empty(&g_data.idle_list))  {
        Conn *next =  container_of(g_data.idle_list.next,  Conn,  idle_list);
        next_us =  next->idle_start +   k_idle_timeout_ms *  1000;
    }

    // TTL定时器
    if  (!g_data.heap.empty()  &&  g_data.heap[0].val <  next_us)  {
        next_us =  g_data.heap[0].val;
    }

    if  (next_us ==  (uint64_t)-1)  {
        return  10000;     // 没有定时器,这个值无所谓
    }

    if  (next_us <=  now_us)  {
        // 超时了?
        return  0;
    }
    return  (uint32_t)((next_us -  now_us)  /  1000);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

在process_timers函数里添加TTL定时器的处理:

static void process_timers()  {
    // 额外加1000微秒是考虑到poll()的毫秒精度
    uint64_t now_us =  get_monotonic_usec()  +   1000;

    // 闲置连接的定时器
    while  (!dlist_empty(&g_data.idle_list))  {
        // 代码省略...
    }

    // TTL定时器
    const size_t k_max_works =  2000;
    size_t nworks =  0;
    while  (!g_data.heap.empty()  &&  g_data.heap[0].val <  now_us)  {
        Entry *ent =  container_of(g_data.heap[0].ref,  Entry,  heap_idx);
        HNode *node =  hm_pop(&g_data.db,  &ent->node,  &hnode_same);
        assert(node ==  &ent->node);
        entry_del(ent);
        if  (nworks++  >=  k_max_works)  {
            // 如果一下子有太多键过期,别让服务器“死机”
            break;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

这部分就是检查堆里的最小值,然后删除过期的键。注意,咱们给每次事件循环迭代中过期的键的数量设了个限制,这是为了防止一下子过期太多键,把服务器“搞瘫痪”。

添加更新和查询TTL的命令很简单:

static void do_expire(std::vector<std::string>  &cmd,  std::string &out)  {
    int64_t ttl_ms =  0;
    if  (! str2int(cmd[2],  ttl_ms))  {
        return  out_err(out,  ERR_ARG,   "expect int64");
    }

    Entry key;
    key.key.swap(cmd[1]);
    key.node.hcode =  str_hash((uint8_t * )key.key.data(),  key.key.size());

    HNode *node =  hm_lookup(&g_data.db,  &key.node,  &entry_eq);
    if  (node)  {
        Entry *ent =  container_of(node,  Entry,  node);
        entry_set_ttl(ent,  ttl_ms);
    }
    return  out_int(out,  node ?  1:  0);
}

static void do_ttl(std::vector<std::string>  &cmd,  std::string &out)  {
    Entry key;
    key.key.swap(cmd[1]);
    key.node.hcode =  str_hash((uint8_t * )key.key.data(),  key.key.size());

    HNode *node =  hm_lookup(&g_data.db,  &key.node,  &entry_eq);
    if  (! node)  {
        return  out_int(out,  -2);
    }

    Entry *ent =  container_of(node,  Entry,  node);
    if  (ent->heap_idx ==  (size_t)-1)  {
        return  out_int(out,  -1);
    }

    uint64_t expire_at =  g_data.heap[ent->heap_idx].val;
    uint64_t now_us =  get_monotonic_usec();
    return  out_int(out,  expire_at >  now_us ?   (expire_at -  now_us)  /  1000 :  0);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# 练习:

  1. 基于堆的定时器给服务器带来了O(log(n))的操作,要是键的数量特别大,这可能会成为瓶颈。对于大量定时器,你能想出优化办法吗?
  2. 真正的Redis在处理过期的时候可没用排序的方法,去研究一下它是怎么做的,然后列出这两种方法各自的优缺点。
  • 13_server.cpp
  • avl.cpp
  • avl.h
  • common.h
  • hashtable.cpp
  • hashtable.h
  • heap.cpp
  • heap.h
  • list.h
  • test_heap.cpp
  • zset.cpp
  • zset.h
上次更新: 2025/03/25, 00:48:42
12. 事件循环和定时器
14. 线程池与异步任务

← 12. 事件循环和定时器 14. 线程池与异步任务→

最近更新
01
C++语言面试问题集锦 目录与说明
03-27
02
第四章 Lambda函数
03-27
03
第二章 关键字static及其不同用法
03-27
更多文章>
Copyright © 2024-2025 沪ICP备2023015129号 张小方 版权所有
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式