CppGuide社区 CppGuide社区
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
  • C++语言面试问题集锦
  • 🔥交易系统开发岗位求职与面试指南 (opens new window)
  • 第1章 高频C++11重难点知识解析
  • 第2章 Linux GDB高级调试指南
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 高性能网络通信协议设计精要
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 后端服务重要模块设计探索
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 源码分析系列

    • leveldb源码分析
    • libevent源码分析
    • Memcached源码分析
    • TeamTalk源码分析
    • 优质源码分享 (opens new window)
    • 🔥远程控制软件gh0st源码分析
  • 从零手写C++项目系列

    • 🔥C++游戏编程入门(零基础学C++)
    • 🔥使用C++17从零开发一个调试器 (opens new window)
    • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
    • 🔥使用C++从零写一个C语言编译器 (opens new window)
    • 🔥从零用C语言写一个Redis
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • Go语言特性

    • Go系统接口编程
    • 高效Go并发编程
    • Go性能调优
    • Go项目架构设计
  • Go项目实战

    • 🔥使用Go从零开发一个数据库
    • 🔥使用Go从零开发一个编译器 (opens new window)
    • 🔥使用Go从零开发一个解释器 (opens new window)
    • 🔥使用Go从零开发一个解释器 (opens new window)
    • 🔥用Go从零写一个编排器(类Kubernetes) (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
  • C++语言面试问题集锦
  • 🔥交易系统开发岗位求职与面试指南 (opens new window)
  • 第1章 高频C++11重难点知识解析
  • 第2章 Linux GDB高级调试指南
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 高性能网络通信协议设计精要
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 后端服务重要模块设计探索
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 源码分析系列

    • leveldb源码分析
    • libevent源码分析
    • Memcached源码分析
    • TeamTalk源码分析
    • 优质源码分享 (opens new window)
    • 🔥远程控制软件gh0st源码分析
  • 从零手写C++项目系列

    • 🔥C++游戏编程入门(零基础学C++)
    • 🔥使用C++17从零开发一个调试器 (opens new window)
    • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
    • 🔥使用C++从零写一个C语言编译器 (opens new window)
    • 🔥从零用C语言写一个Redis
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • Go语言特性

    • Go系统接口编程
    • 高效Go并发编程
    • Go性能调优
    • Go项目架构设计
  • Go项目实战

    • 🔥使用Go从零开发一个数据库
    • 🔥使用Go从零开发一个编译器 (opens new window)
    • 🔥使用Go从零开发一个解释器 (opens new window)
    • 🔥使用Go从零开发一个解释器 (opens new window)
    • 🔥用Go从零写一个编排器(类Kubernetes) (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
  • 第1章高频C++11重难点知识解析

  • 第2章Linux GDB高级调试指南

  • 第3章C++多线程编程从入门到进阶

    • 3.1 线程的基本概念
    • 3.2 线程基本操作
    • 3.3 线程函数传C++类实例指针惯用法
    • 3.4 整型变量的原子操作
    • 3.5 Linux线程同步对象
    • 3.6 Windows 线程资源同步对象
    • 3.7 C++ 11/14/17 线程同步对象
    • 3.8 如何确保创建的线程一定运行起来?
    • 3.9 多线程使用锁实践经验总结
    • 3.10 线程局部存储
    • 3.11 C 库的非线程安全函数
    • 3.12 线程池与队列系统的设计
      • 3.13 纤程(Fiber)与协程(Coroutine)
      • 3.14 本章总结
    • 第4章C++网络编程重难点解析

    • 第5章网络通信故障排查常用命令

    • 第6章高性能网络通信协议设计精要

    • 第7章高性能服务结构设计

    • 第8章Redis 网络通信模块源码分析

    • 第9章后端服务重要模块设计探索

    • C++后端开发进阶
    • 第3章C++多线程编程从入门到进阶
    zhangxf
    2023-04-05
    目录

    3.12 线程池与队列系统的设计

    # 3.12.1 线程池的设计原理

    在很多新手的认知中,线程池和队列系统 是一项非常高深的技术,其实也不然,当你熟练掌握了多线程编程技术后,这一切将会变的很容易,你需要克服的是内心的恐惧而已。

    所谓线程池不过是一组线程而已,一般情况下,我们需要异步执行一些任务,这些任务的产生和执行是存在于我们程序的整个生命周期的,与其让操作系统频繁地为我们创建和销毁线程,我们通常需要创建一组在我们程序生命周期内不会退出的线程,为了不浪费系统资源,我们的基本要求是当有任务需要执行时,这些线程可以自动拿到任务去执行,没有任务时这些线程处于阻塞或者睡眠状态。这里就涉及到这些处理任务的工作线程的唤醒与睡眠,如果你理解了上文中介绍的各种线程同步技术,相信你现在对如何唤醒和睡眠线程已经很熟悉了。

    既然在程序生命周期内会产生很多任务,那么这些任务必须有一个存放的地方,而这个地方就是队列,所以不要一提到队列就认为是一个具体的 list,它可以是一个全局变量、链表等等。而线程池中的线程从队列中如何取任务,则也可以设计得非常灵活,如从尾部放入任务,从头部取出,或者从头部放入,从尾部取出等等。而队列也可以根据实际应用设计得”丰富多彩“,例如,可以根据任务的优先级设计多个队列(例如分为高、中、低三个级别或者分为关键、普通两个级别)。

    这本质上就是生产者消费者模式,产生任务的线程是生产者,线程池中的线程是消费者。当然,这不是绝对的,线程池中的线程处理一个任务以后可能会产生一个新的关联任务,那么此时这个工作线程又是生产者的角色。

    既然会有多个线程同时操作这个队列,根据多线程程序的原则,这个队列我们一般需要对其加锁,以避免多线程竞争产生非预期的结果。

    当然,技术上除了要解决线程池的创建、往队列中投递任务、从队列中取任务处理,我们还需要做一些善后工作,如线程池的清理,即如何退出线程池中的工作线程和清理任务队列。

    这就是线程池和任务队列的核心原理,希望读者能认真体会。

    说了这么多,结合前文介绍的,具体的实现也变得很容易,我们来看一个具体的例子:

    /** 
     * 任务池模型,TaskPool.h
     * zhangyl 2019.02.14
     */
    #include <thread>
    #include <mutex>
    #include <condition_variable>
    #include <list>
    #include <vector>
    #include <memory>
    #include <iostream>
    
    class Task
    {
    public:
        virtual void doIt()
        {
            std::cout << "handle a task..." << std::endl;
        }
    
        virtual ~Task()
        {
            //为了看到一个task的销毁,这里刻意补上其析构函数
            std::cout << "a task destructed..." << std::endl;
        }
    };
    
    class TaskPool final
    {
    public:
        TaskPool();
        ~TaskPool();
        TaskPool(const TaskPool& rhs) = delete;
        TaskPool& operator=(const TaskPool& rhs) = delete;
    
    public:
        void init(int threadNum = 5);
        void stop();
    
        void addTask(Task* task);
        void removeAllTasks();
    
    private:
        void threadFunc();
    
    private:
        std::list<std::shared_ptr<Task>>            m_taskList;
        std::mutex                                  m_mutexList;
        std::condition_variable                     m_cv;
        bool                                        m_bRunning;
        std::vector<std::shared_ptr<std::thread>>   m_threads;
    };
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    /**
     * 任务池模型,TaskPool.cpp
     * zhangyl 2019.02.14
     */
    
    #include "TaskPool.h"
    
    TaskPool::TaskPool() : m_bRunning(false)
    {
    
    }
    
    TaskPool::~TaskPool()
    {
        removeAllTasks();
    }
    
    void TaskPool::init(int threadNum/* = 5*/)
    {
        if (threadNum <= 0)
            threadNum = 5;
    
        m_bRunning = true;
    
        for (int i = 0; i < threadNum; ++i)
        {
            std::shared_ptr<std::thread> spThread;
            spThread.reset(new std::thread(std::bind(&TaskPool::threadFunc, this)));
            m_threads.push_back(spThread);
        }
    }
    
    void TaskPool::threadFunc()
    {
        std::shared_ptr<Task> spTask;
        while (true)
        {
            std::unique_lock<std::mutex> guard(m_mutexList);
            while (m_taskList.empty())
            {                 
                if (!m_bRunning)
                    break;
                
                //如果获得了互斥锁,但是条件不满足的话,m_cv.wait()调用会释放锁,且挂起当前
                //线程,因此不往下执行。
                //当发生变化后,条件满足,m_cv.wait() 将唤醒挂起的线程,且获得锁。
                m_cv.wait(guard);
            }
    
            if (!m_bRunning)
                break;
    
            spTask = m_taskList.front();
            m_taskList.pop_front();
    
            if (spTask == NULL)
                continue;
    
            spTask->doIt();
            spTask.reset();
        }
    
        std::cout << "exit thread, threadID: " << std::this_thread::get_id() << std::endl;
    }
    
    void TaskPool::stop()
    {
        m_bRunning = false;
        m_cv.notify_all();
    
        //等待所有线程退出
        for (auto& iter : m_threads)
        {
            if (iter->joinable())
                iter->join();
        }
    }
    
    void TaskPool::addTask(Task* task)
    {
        std::shared_ptr<Task> spTask;
        spTask.reset(task);
    
        {
            std::lock_guard<std::mutex> guard(m_mutexList);             
            m_taskList.push_back(spTask);
            std::cout << "add a Task." << std::endl;
        }
        
        m_cv.notify_one();
    }
    
    void TaskPool::removeAllTasks()
    {
        {
            std::lock_guard<std::mutex> guard(m_mutexList);
            for (auto& iter : m_taskList)
            {
                iter.reset();
            }
            m_taskList.clear();
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103

    上述代码封装了一个简单的任务队列模型,我们可以这么使用这个 TaskPool 对象:

    #include "TaskPool.h"
    #include <chrono>
    
    int main()
    {
        TaskPool threadPool;
        threadPool.init();
    
        Task* task = NULL;
        for (int i = 0; i < 10; ++i)
        {
            task = new Task();
            threadPool.addTask(task);
        }
        
        std::this_thread::sleep_for(std::chrono::seconds(5));
    
        threadPool.stop();
    
        return 0;
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21

    程序执行结果如下:

    由于退出线程的输出提示不是原子的,多个线程并行执行,因此上图中这部分的输出出现了”错乱“。

    上述代码演示了一个基本的多线程队列模型,虽然简单,但是比较典型,可以应付实际生产中的一部分需求,你可以基于这个基础模型进行扩展,不管怎么扩展其基本原理都是一样的。

    # 3.12.2 环形队列

    例如,如果生产者和消费者(即产生任务者和处理任务者)的速度差不多,可以将队列改成环形队列,以节省内存空间。另外,很多应用为了追求效率,利用一些技巧将队列无锁化。这些都是仁者见仁智者见智的扩展了,本文不再介绍。不管如何,希望读者一定要理解线程池和任务队列的基本设计原理,只有这样你才能做更多高级的扩展和设计。

    # 3.12.3 消息中间件

    基于生产者消费者理论模型的队列系统在实际开发中实在是太常用了,以至于在一组服务中可能每个进程都需要一个这样的队列系统。既然如此,出于复用和解耦的目的,业界产生了许多独立的队列系统,这些队列系统或以一个独立的进程运行或以支持分布式的一组服务运行。我们把这种独立的队列系统称之为消息中间件。这些消息中间件在功能上做了丰富的扩展,如消费的方式、主备切换、容灾容错,数据自动备份和过期数据自动清理等等,比较典型的有 Kafka、ActiveMQ、RabbitMQ、RocketMQ 等。下图是 Kafka 官网提供的一张介绍 Kafka 作用的图片:

    下图是笔者开发过的一个金融交易系统后台服务拓扑图,其大量使用消息中间件 kafka:

    整个交易的流程如下:

    1. 前端通过 HTTP 请求向下单服务请求下单,下单服务在校验完数据后,会向**消息中间件 A1 **投递一条下单请求;
    2. 成交服务订阅了消息中间件 A1的消息,取出下单请求,结合自己的成交规则,如果可以成交,向**消息中间件 A2 **投递一条成交后的消息;
    3. 结算服务订阅了消息中间件A2,从其中拿到成交消息后,对用户资金账户进行结算,结算完成后,用户的下单就算正式完成了,然后产生一条行情消息投递给消息中间件 A3;
    4. 行情推送服务器从消息中间件 A3 中拿到行情消息后推送给所有已经连接的客户端。

    上述过程中,每个消息中间件(kafka)都有一个生产者和消费者,虚线箭头表示短连接,实线箭头表示长连接。当然,实际的金融交易系统要比这里的模型复杂许多,这里为了演示方便做了大量简化。

    有了这种专门的队列系统,生产者和消费者将最大化解耦,利用消息中间件提供的对外消息接口,生产者只需要负责生产消息,它不必关心谁是消费者,消费者也不用关心生产者是谁、何时有数据,而队列系统本身也不关心自己有多少生产者和消费者。当然,这种消息中间件还有其他一些非常优秀的功能,如对数据的备份、负载和容灾容错措施。笔者建议学有余力的读者适当地去了解一两种开源的队列系统的使用方法,如果掌握其设计思路那就善莫大焉了。

    上次更新: 2025/04/01, 20:53:14
    3.11 C 库的非线程安全函数
    3.13 纤程(Fiber)与协程(Coroutine)

    ← 3.11 C 库的非线程安全函数 3.13 纤程(Fiber)与协程(Coroutine)→

    最近更新
    01
    第二章 关键字static及其不同用法
    03-27
    02
    第一章 auto与类型推导
    03-27
    03
    C++语言面试问题集锦 目录与说明
    03-27
    更多文章>
    Copyright © 2024-2025 沪ICP备2023015129号 张小方 版权所有
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式