CppGuide社区 CppGuide社区
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
🔥C++面试
  • 第1章 C++ 惯用法与Modern C++篇
  • 第2章 C++开发工具与调试进阶
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 网络通信协议设计
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 服务其他模块设计
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 🔥C++游戏编程入门(零基础学C++)
  • 🔥使用C++17从零开发一个调试器 (opens new window)
  • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
  • 🔥使用C++从零写一个C语言编译器 (opens new window)
  • 🔥从零用C语言写一个Redis
  • leveldb源码分析
  • libevent源码分析
  • Memcached源码分析
  • TeamTalk源码分析
  • 优质源码分享 (opens new window)
  • 🔥远程控制软件gh0st源码分析
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • 高效Go并发编程
  • Go性能调优
  • Go项目架构设计
  • 🔥使用Go从零开发一个数据库
  • 🔥使用Go从零开发一个编译器 (opens new window)
  • 🔥使用Go从零开发一个解释器 (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
🔥C++面试
  • 第1章 C++ 惯用法与Modern C++篇
  • 第2章 C++开发工具与调试进阶
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 网络通信协议设计
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 服务其他模块设计
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 🔥C++游戏编程入门(零基础学C++)
  • 🔥使用C++17从零开发一个调试器 (opens new window)
  • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
  • 🔥使用C++从零写一个C语言编译器 (opens new window)
  • 🔥从零用C语言写一个Redis
  • leveldb源码分析
  • libevent源码分析
  • Memcached源码分析
  • TeamTalk源码分析
  • 优质源码分享 (opens new window)
  • 🔥远程控制软件gh0st源码分析
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • 高效Go并发编程
  • Go性能调优
  • Go项目架构设计
  • 🔥使用Go从零开发一个数据库
  • 🔥使用Go从零开发一个编译器 (opens new window)
  • 🔥使用Go从零开发一个解释器 (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
  • 从零用C语言写一个Redis 引言
  • 02. 套接字入门
  • 03. 简易服务器/客户端
  • 04. 协议解析
  • 05. 事件循环与非阻塞I/O
  • 06. 事件循环的实现
  • 07. 基础服务器:实现get、set、del功能
  • 08. 数据结构:哈希表
  • 09. 数据序列化
  • 10. AVL树:实现与测试
  • 11. AVL树和有序集合
  • 12. 事件循环和定时器
  • 13. 堆数据结构和生存时间(TTL)
  • 14. 线程池与异步任务
    • 14. 线程池与异步任务
      • 练习题
目录

14. 线程池与异步任务

# 14. 线程池与异步任务

自从咱们的服务器引入了有序集合(sorted set)这种数据类型,就出现了一个小“bug”:删除键的时候有点问题。要是有序集合特别大,释放它的节点可能得花老长时间,在销毁键的过程中,服务器就像被施了定身咒一样“卡”住了。不过别担心,用多线程把析构操作从主线程里“拽”出来,就能轻松解决这个问题。

首先,咱们来认识一下“线程池(thread pool)”,从名字就能看出来,它就像一个线程的“大池子”。池子里的线程负责从任务队列里“捞”任务,然后执行。用pthread的API实现一个多生产者多消费者的队列,其实也没那么难。(虽然在咱们这儿只有一个生产者。)

pthread里有两个重要的“小帮手”:pthread_mutex_t和pthread_cond_t,它们分别叫互斥锁(mutex)和条件变量(condition variable)。要是你对它们还不太熟,建议读完这章之后,去恶补一下多线程的知识。(比如看看pthread API的手册、翻翻操作系统的教材、找些线上课程啥的。)

下面简单给大家介绍一下这俩“小帮手”:

  • 任务队列会被多个线程(生产者和消费者)访问,就像大家都想去抢同一个宝贝,所以很明显,得用互斥锁来“把门”,保证同一时间只有一个线程能进去“拿东西”。
  • 消费者线程没事干的时候,就应该“睡大觉”,只有当队列里有任务了,才能把它们“叫醒”,这就是条件变量的活儿啦。

线程池的数据类型定义如下:

struct  Work {
    void (*f)(void * )  =  NULL;
    void *arg =  NULL;
};

struct  TheadPool {
    std::vector<pthread_t>  threads;
    std::deque<Work>  queue;
    pthread_mutex_t mu;
    pthread_cond_t not_empty;
};
1
2
3
4
5
6
7
8
9
10
11

thread_pool_init函数负责初始化线程池,并且启动线程。pthread类型的变量得用pthread_xxx_init函数来初始化,pthread_create函数则用来启动一个线程,目标函数是worker。

void thread_pool_init(TheadPool *tp,  size_t num_threads)  {
    assert(num_threads >  0);

    int rv =  pthread_mutex_init(&tp->mu,  NULL);
    assert(rv ==  0);
    rv =  pthread_cond_init(&tp->not_empty,  NULL);
    assert(rv ==  0);

    tp->threads.resize(num_threads);
    for  (size_t i =  0;  i <  num_threads;  ++i)  {
        int rv =  pthread_create(&tp->threads[i],  NULL,  &worker,  tp);
        assert(rv ==  0);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

消费者的代码如下:

static void *worker(void *arg)  {
    TheadPool *tp =  (TheadPool * )arg;
    while  (true)  {
        pthread_mutex_lock(&tp->mu);
        // 等待条件:队列不为空
        while  (tp->queue.empty())  {
            pthread_cond_wait(&tp->not_empty,  &tp->mu);
        }

        // 拿到任务啦
        Work w =  tp->queue.front();
        tp->queue.pop_front();
        pthread_mutex_unlock(&tp->mu);

        // 开始干活
        w.f(w.arg);
    }
    return  NULL;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

生产者的代码如下:

void thread_pool_queue(TheadPool *tp,  void (*f)(void * ),  void *arg)  {
    Work w;
    w.f =  f;
    w.arg =  arg;

    pthread_mutex_lock(&tp->mu);
    tp->queue.push_back(w);
    pthread_cond_signal(&tp->not_empty);
    pthread_mutex_unlock(&tp->mu);
}
1
2
3
4
5
6
7
8
9
10

下面给大家解释一下:

  1. 不管是生产者还是消费者,访问队列的代码都被pthread_mutex_lock和pthread_mutex_unlock“包”起来了,这就好比给队列加了个“门禁”,同一时间只有一个线程能进入队列。
  2. 消费者拿到互斥锁之后,得检查一下队列:
    • 如果队列里有任务,就从队列里“捞”一个任务出来,然后释放互斥锁,开始干活。
    • 要是队列空了,那就先释放互斥锁,然后“睡大觉”,等条件变量把它“叫醒”。这一步通过调用pthread_cond_wait就能实现。
  3. 生产者把任务放进队列之后,得调用pthread_cond_signal,把可能正在“呼呼大睡”的消费者叫醒。
  4. 消费者被pthread_cond_wait叫醒之后,会自动再次持有互斥锁。醒了之后,消费者还得再检查一下条件,如果队列还是空的(条件不满足),那就接着回去“睡大觉”。

关于条件变量的使用,还得再多说几句:pthread_cond_wait函数一般都得放在一个循环里,用来检查条件是否满足。这是因为在被唤醒的消费者拿到互斥锁之前,其他消费者可能已经把条件给改变了;互斥锁可不是从发出信号的线程直接“移交”到被唤醒的线程手里的!要是你看到条件变量没用在循环里,那很可能是写错啦。

下面给大家梳理一下具体的操作顺序,帮助大家理解条件变量的使用:

  1. 生产者发出信号。
  2. 生产者释放互斥锁。
  3. 某个消费者拿到互斥锁,把队列里的任务都“清空”了。
  4. 有个消费者被生产者的信号叫醒,也拿到了互斥锁,结果发现队列是空的!

注意,pthread_cond_signal不需要用互斥锁保护,在释放互斥锁之后再发出信号也是没问题的。

线程池搞定啦,现在把它加到咱们的服务器里:

// 全局变量
static struct  {
    HMap db;
    // 所有客户端连接的映射,通过fd作为键
    std::vector<Conn *>  fd2conn;
    // 空闲连接的定时器
    DList idle_list;
    // 生存时间(TTL)的定时器
    std::vector<HeapItem>  heap;
    // 线程池
    TheadPool tp;
}  g_data;
1
2
3
4
5
6
7
8
9
10
11
12

在main函数里:

// 一些初始化操作
dlist_init(&g_data.idle_list);
thread_pool_init(&g_data.tp,  4);
1
2
3

entry_del函数也得改改:把销毁大的有序集合的操作放到线程池里。不过线程池只处理大的集合,毕竟多线程也是有“开销”的。

// 立即释放键
static void entry_destroy(Entry *ent)  {
    switch  (ent->type)  {
        case  T_ZSET:
            zset_dispose(ent->zset);
            delete  ent->zset;
            break;
    }
    delete  ent;
}

static void entry_del_async(void *arg)  {
    entry_destroy((Entry * )arg);
}

// 从键空间分离后释放entry
static void entry_del(Entry *ent)  {
    entry_set_ttl(ent,  -1);

    const size_t k_large_container_size =   10000;
    bool too_big =  false;
    switch  (ent->type)  {
        case  T_ZSET:
            too_big =  hm_size(&ent->zset->hmap)  >  k_large_container_size;
            break;
    }

    if  (too_big)  {
        thread_pool_queue(&g_data.tp,  &entry_del_async,  ent);
    }  else  {
        entry_destroy(ent);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 练习题

  1. 信号量(semaphore)常被当作多线程的原语,来替代条件变量和互斥锁。试着用信号量实现一个线程池。
  2. 下面这些有意思的练习题能帮你进一步理解这些原语:
    1. 用信号量实现互斥锁。(很简单)
    2. 用条件变量实现信号量。(比较容易)
    3. 只用互斥锁实现条件变量。(有点难度)
    4. 现在你知道这些原语在某种程度上是等价的,那为啥会更倾向于用其中一个呢?
  • 14_server.cpp
  • avl.cpp
  • avl.h
  • common.h
  • hashtable.cpp
  • hashtable.h
  • heap.cpp
  • heap.h
  • list.h
  • thread_pool.cpp
  • thread_pool.h
  • zset.cpp
  • zset.h
上次更新: 2025/03/25, 00:48:42
13. 堆数据结构和生存时间(TTL)

← 13. 堆数据结构和生存时间(TTL)

最近更新
01
C++语言面试问题集锦 目录与说明
03-27
02
第四章 Lambda函数
03-27
03
第二章 关键字static及其不同用法
03-27
更多文章>
Copyright © 2024-2025 沪ICP备2023015129号 张小方 版权所有
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式