CppGuide社区 CppGuide社区
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
🔥C++面试
  • 第1章 C++ 惯用法与Modern C++篇
  • 第2章 C++开发工具与调试进阶
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 网络通信协议设计
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 服务其他模块设计
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 🔥C++游戏编程入门(零基础学C++)
  • 🔥使用C++17从零开发一个调试器 (opens new window)
  • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
  • 🔥使用C++从零写一个C语言编译器 (opens new window)
  • 🔥从零用C语言写一个Redis
  • leveldb源码分析
  • libevent源码分析
  • Memcached源码分析
  • TeamTalk源码分析
  • 优质源码分享 (opens new window)
  • 🔥远程控制软件gh0st源码分析
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • 高效Go并发编程
  • Go性能调优
  • Go项目架构设计
  • 🔥使用Go从零开发一个数据库
  • 🔥使用Go从零开发一个编译器 (opens new window)
  • 🔥使用Go从零开发一个解释器 (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
🔥C++面试
  • 第1章 C++ 惯用法与Modern C++篇
  • 第2章 C++开发工具与调试进阶
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 网络通信协议设计
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 服务其他模块设计
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 🔥C++游戏编程入门(零基础学C++)
  • 🔥使用C++17从零开发一个调试器 (opens new window)
  • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
  • 🔥使用C++从零写一个C语言编译器 (opens new window)
  • 🔥从零用C语言写一个Redis
  • leveldb源码分析
  • libevent源码分析
  • Memcached源码分析
  • TeamTalk源码分析
  • 优质源码分享 (opens new window)
  • 🔥远程控制软件gh0st源码分析
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • 高效Go并发编程
  • Go性能调优
  • Go项目架构设计
  • 🔥使用Go从零开发一个数据库
  • 🔥使用Go从零开发一个编译器 (opens new window)
  • 🔥使用Go从零开发一个解释器 (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
  • 前言
  • 第1章:C++23中可变参数功能的潜力
  • 第2章:函数与lambda表达式变形
  • 第3章:掌控低级输入输出操作
  • 第4章:掌握缓冲与异步IO
  • 第5章:优化内存管理
  • 第6章:优化内存性能
  • 第7章:面向专家的高级多线程编程
    • 概述
    • 轻松创建和控制线程
      • 基本线程创建和管理
      • 高级线程管理和同步
      • 示例程序:创建多个线程
      • 线程安全和数据竞争
      • 为高性能应用程序创建线程
      • 线程池(Thread Pooling)
      • 基于任务的并行(Task-based Parallelism)
      • 负载均衡(Load Balancing)
      • 示例程序:使用std::async动态创建线程
    • 使用互斥锁(Mutex)和锁(Lock)实现并行
      • 什么是互斥锁和锁?
      • 示例程序:使用互斥锁保护共享数据
      • 示例程序:使用多个互斥锁避免死锁
      • 使用std::lock()避免死锁
      • 示例程序:使用std::unique_lock避免竞态条件
    • 使用条件变量(Condition Variables)驯服线程通信
      • 条件变量如何工作?
      • 示例程序:使用条件变量
      • 示例程序:使用条件变量的多个生产者和消费者
    • 使用线程池进行负载均衡
      • 背景
      • 线程池的工作原理
      • 示例程序:设计一个线程池
      • 示例程序:为线程池实现任务
    • 总结
  • 第8章:线程同步与原子操作精通
  • 第9章:优化浮点数和整数运算
  • 后记
目录

第7章:面向专家的高级多线程编程

# 第7章:面向专家的高级多线程编程

# 概述

在本章中,我们将探讨C++中多线程的高级内容。高效管理多个线程可以显著提升性能。我们将从学习如何轻松创建和管理线程开始,让你能够以结构化且高效的方式完全掌控线程的创建、管理和终止。接着,我们会研究如何通过使用互斥锁(mutexes)和锁(locks)来最大化并行性,防止竞态条件(race conditions),确保线程安全地访问共享资源,避免数据损坏或不可预测的行为。

下一部分将重点介绍如何使用条件变量(condition variables)来控制线程通信。最后,我们会了解线程池(thread pools)如何通过管理一组可复用的线程来帮助平衡工作负载分配,更好地利用系统资源,这些线程能够同时处理多个任务。掌握这些概念后,你将有能力权威地处理多线程问题。

# 轻松创建和控制线程

多线程允许程序通过将工作分配到多个线程来同时执行多个任务。每个线程独立运行,共享相同的进程资源,如内存,但与其他线程并发执行。对于能够并行化任务的应用程序,如数据处理、模拟和实时系统,这可以显著提高性能。在这里,我们将探索如何使用标准库中的std::thread来创建、管理和同步线程,以实现高效的多线程编程。

# 基本线程创建和管理

std::thread类是创建和管理线程的关键组件。下面是一个示例程序,展示了如何创建一个基本线程,该线程与主线程并行执行一个任务:

#include <iostream>
#include <thread>

void task() {
    std::cout << "Thread ID: " << std::this_thread::get_id() << " is running the task.\n";
}

int main() {
    // 创建一个新线程来运行任务
    std::thread t1(task);

    // 从主线程打印信息
    std::cout << "Main thread ID: " << std::this_thread::get_id() << " is executing.\n";

    // 等待线程完成
    t1.join();

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

在这个程序中,函数task()是一个简单的任务,用于打印线程ID。我们使用std::thread创建一个新线程t1,它与主线程并发运行这个任务。join()函数用于确保主线程在继续执行之前等待t1完成。如果没有join(),主线程可能在t1仍在运行时终止,这可能会导致任务未完成。std::this_thread::get_id()返回当前执行线程的ID,以便区分主线程和创建的线程。

# 高级线程管理和同步

在处理多个线程时,尤其是在对性能要求苛刻的应用程序中,你需要仔细管理线程间的交互。现在,为了演示高级线程管理,我们将改进前面的示例,创建多个线程,每个线程执行一个更大任务的一部分。

# 示例程序:创建多个线程

假设我们想要并行执行一个计算任务,将工作分配给几个线程。为此,我们将使用多个线程对数组元素求和,每个线程负责对数组的一部分进行求和。

#include <iostream>
#include <thread>
#include <vector>
#include <numeric>  // 用于std::accumulate

// 每个线程对数组的一部分求和的函数
void partial_sum(const std::vector<int>& data, size_t start, size_t end, int& result) {
    result = std::accumulate(data.begin() + start, data.begin() + end, 0);
}

int main() {
    const size_t num_elements = 1000;
    const size_t num_threads = 4;

    // 生成一个数字数组
    std::vector<int> data(num_elements);
    for (size_t i = 0; i < num_elements; ++i) {
        data[i] = i + 1;  // 填充1到1000的数字
    }

    // 创建线程和部分和结果
    std::vector<std::thread> threads;
    std::vector<int> results(num_threads, 0);  // 每个线程在这里存储其部分结果

    size_t block_size = num_elements / num_threads;
    // 创建线程来计算部分和
    for (size_t i = 0; i < num_threads; ++i) {
        size_t start = i * block_size;
        size_t end = (i == num_threads - 1)? num_elements : (i + 1) * block_size;
        threads.emplace_back(partial_sum, std::ref(data), start, end, std::ref(results[i]));
    }

    // 等待所有线程完成
    for (auto& t : threads) {
        t.join();
    }

    // 对部分结果求和
    int total_sum = std::accumulate(results.begin(), results.end(), 0);
    std::cout << "Total sum: " << total_sum << "\n";

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

在这里,我们将数组data分成num_threads个部分,这种分工有助于将工作负载均匀地分布到多个CPU核心上,对于大型数据集可以提高性能。

此外,我们使用std::ref传递对向量data和结果results[i]的引用。这确保每个线程可以修改其各自的结果,而无需创建不必要的数据副本。所有线程创建完成后,主线程使用join()等待每个线程完成。这确保只有在所有线程完成工作后,才会对部分结果进行最终求和。所有线程完成后,我们使用std::accumulate()对每个线程的结果求和,得到数组的总和。

这整个过程展示了创建和同步线程如何提高那些可以分解为更小、独立工作单元的任务的性能。

# 线程安全和数据竞争

当多个线程访问共享资源(例如共享数据或内存)时,我们必须确保它们的访问是协调的,以防止数据竞争。数据竞争是指两个或多个线程同时访问共享数据,导致不可预测的行为。例如,如果两个线程试图同时写入同一内存位置,数据的最终值可能取决于执行顺序,而在多线程程序中,执行顺序通常是不确定的。

在上面的示例中,每个线程处理数组的不同部分,并且有自己的结果存储(results[i]),因此不存在共享内存冲突。然而,在一些场景中,线程必须更新共享数据,这时就需要像互斥锁(mutexes)和锁(locks)这样的机制来确保线程安全。

# 为高性能应用程序创建线程

在高性能并行应用程序中,可以应用以下技术来进一步改进复杂程序中的线程管理:

# 线程池(Thread Pooling)

与不断创建和销毁线程(这可能开销很大)不同,线程池允许你维持固定数量的线程,这些线程可以被多个任务复用。这减少了线程创建和销毁的开销,特别是在任务小且频繁的应用程序中。

# 基于任务的并行(Task-based Parallelism)

现代C++提供了像std::async和std::future这样的更高级抽象,用于处理异步任务,而不是直接管理线程。这些抽象允许运行时高效地管理线程执行,而无需显式的线程管理。

# 负载均衡(Load Balancing)

如果一个线程比其他线程完成得早很多,一些CPU核心可能会闲置,从而降低程序的整体效率。像动态调度或自适应分区这样的技术可以帮助动态平衡工作负载 。

# 示例程序:使用std::async动态创建线程

我们还可以使用std::async来异步处理任务,而无需手动创建和管理线程。下面的示例展示了如何使用std::async完成相同的并行求和任务:

#include <iostream>
#include <vector>
#include <numeric>
#include <future>  // 用于std::async和std::future

// 每个线程对数组的一部分求和的函数
int partial_sum(const std::vector<int>& data, size_t start, size_t end) {
    return std::accumulate(data.begin() + start, data.begin() + end, 0);
}

int main() {
    const size_t num_elements = 1000;
    const size_t num_threads = 4;

    // 生成一个数字数组
    std::vector<int> data(num_elements);
    for (size_t i = 0; i < num_elements; ++i) {
        data[i] = i + 1;  // 填充1到1000的数字
    }

    // 创建futures来处理部分和
    std::vector<std::future<int>> futures;
    size_t block_size = num_elements / num_threads;

    // 异步创建任务来计算部分和
    for (size_t i = 0; i < num_threads; ++i) {
        size_t start = i * block_size;
        size_t end = (i == num_threads - 1)? num_elements : (i + 1) * block_size;
        futures.push_back(std::async(std::launch::async, partial_sum, std::cref(data), start, end));
    }

    // 对每个future的结果求和
    int total_sum = 0;
    for (auto& future : futures) {
        total_sum += future.get(); // 等待每个任务完成
    }

    std::cout << "Total sum: " << total_sum << "\n";

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

在本节中,我们展示了如何创建、管理和同步线程以实现并行执行,显著提高数据并行任务的性能。此外,我们还探索了高级线程管理技术,如使用std::async进行基于任务的并行和动态线程创建,这些技术实现了更灵活和高效的多线程编程。

# 使用互斥锁(Mutex)和锁(Lock)实现并行

在多线程编程中,一个关键挑战是确保多个线程能够安全地访问共享资源。当多个线程试图同时修改或读取共享数据时,可能会导致竞态条件(race condition)或数据损坏,进而引发不可预测和错误的行为。虽然互斥锁和锁是确保线程安全的强大工具,但使用不当会导致诸如死锁(deadlock)之类的问题,即两个或多个线程相互等待对方释放资源,导致程序无限期挂起。在本节中,我们将探讨如何有效地使用互斥锁和锁,并展示避免常见陷阱(如死锁和竞态条件)的实用技巧。

# 什么是互斥锁和锁?

互斥锁(mutex)是一种同步原语,用于防止多个线程同时访问共享资源。当一个线程锁定(lock)一个互斥锁时,它获得对共享资源的独占访问权,在互斥锁被释放之前,其他线程无法获取相同的锁。

锁(lock)是一个更通用的概念,表示对互斥锁的所有权。C++ 提供了几种类型的锁,例如:

  • std::lock_guard:在创建时自动获取(acquire)一个互斥锁,并在超出作用域时释放它。
  • std::unique_lock:比lock_guard更灵活,允许手动锁定和解锁互斥锁。

这些同步机制可以确保一次只有一个线程访问共享数据,从而防止竞态条件。

# 示例程序:使用互斥锁保护共享数据

我们从一个简单的示例开始,展示互斥锁的使用。假设有两个线程对一个共享计数器进行递增操作。如果没有适当的同步,两个线程可能会同时尝试修改计数器,导致结果错误。

#include <iostream> 
#include <thread>     
#include <mutex>

int counter = 0;  // 共享资源
std::mutex mtx;  // 用于保护共享资源的互斥锁

void increment() {
    for (int i = 0; i < 1000000; ++i) {
        std::lock_guard<std::mutex> lock(mtx);  // 锁定互斥锁以防止竞态条件
        ++counter;
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);

    t1.join();
    t2.join();

    std::cout << "Final counter value: " << counter << "\n";

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
  • counter变量在两个线程之间共享,mtx是一个互斥锁,用于控制对counter的访问。
  • 在increment()函数内部,我们在修改counter之前使用std::lock_guard获取互斥锁。这确保了一次只有一个线程可以递增计数器。当lock_guard超出作用域(在每次循环迭代结束时),锁会自动释放。
  • join()调用确保在程序退出之前,两个线程都完成执行。

在上述示例脚本中,使用互斥锁防止了竞态条件,确保了计数器的最终值是正确的。

# 示例程序:使用多个互斥锁避免死锁

在更复杂的程序中,线程可能需要获取多个锁才能访问不同的共享资源。如果两个线程以不同的顺序锁定资源,可能会导致死锁,即两个线程都在等待对方释放资源,从而都无法继续执行。

现在,我们考虑一个示例,其中两个共享资源由两个互斥锁保护。查看以下示例程序,该程序展示了一个潜在的死锁情况:

#include <iostream> 
#include <thread>
#include <mutex>

std::mutex mtx1;  
std::mutex mtx2;  

void task1() {
    std::lock_guard<std::mutex> lock1(mtx1); // 首先锁定mtx1
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
    std::lock_guard<std::mutex> lock2(mtx2); // 接下来锁定mtx2
    std::cout << "Task 1 acquired both locks.\n";
}

void task2() {
    std::lock_guard<std::mutex> lock2(mtx2); // 首先锁定mtx2
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
    std::lock_guard<std::mutex> lock1(mtx1); // 接下来锁定mtx1
    std::cout << "Task 2 acquired both locks.\n";
}

int main() {
    std::thread t1(task1);
    std::thread t2(task2);

    t1.join();
    t2.join();

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

在上述程序中: 我们在task1()中先锁定mtx1,然后锁定mtx2。而在task2()中,操作顺序相反:先锁定mtx2,然后锁定mtx1。如果task1()锁定了mtx1,同时task2()锁定了mtx2,两个线程将都在等待对方释放第二个锁,从而导致死锁。

对std::this_thread::sleep_for()的调用模拟了获取第一个锁和第二个锁之间的延迟,使死锁场景更容易出现。现在,为了防止死锁,我们必须确保所有线程以相同的顺序获取锁。

# 使用std::lock()避免死锁

C++ 提供了std::lock(),以无死锁的方式安全地锁定多个互斥锁。它尝试同时锁定所有给定的互斥锁,确保没有线程会因为等待其他线程持有的资源而陷入阻塞。

下面展示了如何修改前面的示例,使用std::lock()避免死锁:

#include <iostream> 
#include <thread>    
#include <mutex>    

std::mutex mtx1;  
std::mutex mtx2;  

void task1() {
    // 以无死锁的方式锁定两个互斥锁
    std::lock(mtx1, mtx2);

    // 使用std::lock_guard管理两个锁
    std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);  
    std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);  
    std::cout << "Task 1 acquired both locks.\n";
}

void task2() {
    // 以无死锁的方式锁定两个互斥锁
    std::lock(mtx1, mtx2);

    // 使用std::lock_guard管理两个锁
    std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);  
    std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);  
    std::cout << "Task 2 acquired both locks.\n";
}

int main() {
    std::thread t1(task1);  
    std::thread t2(task2);

    t1.join();
    t2.join();

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

这里,对std::lock(mtx1, mtx2)的调用会同时锁定mtx1和mtx2。它在内部处理锁的顺序,因此没有线程会无限期等待。使用std::lock()锁定两个互斥锁后,我们使用带有std::adopt_lock标记的std::lock_guard。这告诉lock_guard互斥锁已经被锁定,所以它不需要再次锁定。这确保了当lock_guard对象超出作用域时,互斥锁会自动释放。

# 示例程序:使用std::unique_lock避免竞态条件

多线程程序中的另一个常见问题是竞态条件,即两个或多个线程试图同时修改共享数据。我们可以使用std::unique_lock避免竞态条件,它比std::lock_guard更灵活,因为它允许手动锁定和解锁互斥锁。

为此,我们将修改前面的计数器示例,使用std::unique_lock代替std::lock_guard:

#include <iostream> 
#include <thread>
#include <mutex>

int counter = 0;
std::mutex mtx;    

void increment() {
    for (int i = 0; i < 1000000; ++i) {
        std::unique_lock<std::mutex> lock(mtx);  // 锁定互斥锁
        ++counter;
        // 锁可以显式解锁,或者在超出作用域时解锁
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);

    t1.join();
    t2.join();

    std::cout << " Final counter value: " << counter << "\n";

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

在上述示例脚本中,std::unique_lock提供了对锁定和解锁互斥锁的更多控制,在需要在同一作用域内多次锁定和解锁互斥锁的情况下非常有用。然而,互斥锁使用不当可能会导致死锁,即两个或多个线程无限期地等待对方释放资源。

# 使用条件变量(Condition Variables)驯服线程通信

在处理多线程时,经常会遇到这样的情况:线程之间需要相互通信,或者在继续执行之前等待特定条件得到满足。管理这种情况最有效的方法之一就是通过条件变量。条件变量允许一个线程进入睡眠状态,等待特定条件变为真。当条件满足时,另一个线程可以向等待的线程发出信号,使其恢复执行。这种机制在诸如生产者 - 消费者模型等场景中至关重要,在该模型中,一个线程生成数据,另一个线程消费数据。

在本节中,我们将探讨条件变量的工作原理,演示如何有效地使用它们,并构建一个实际的多线程示例,利用条件变量来设计高效的等待机制以及线程之间的信号传递。

# 条件变量如何工作?

条件变量与互斥锁(mutex)协同工作,以控制对共享资源的访问。以下是一般过程:

  • 一个线程锁定互斥锁,并检查特定条件是否为真。
  • 如果条件为假,该线程调用条件变量的wait()方法,这会释放互斥锁并使线程进入睡眠状态。
  • 另一个线程修改共享数据,并调用条件变量的notify_one()或notify_all()方法,向等待的线程发出信号。
  • 等待的线程醒来,重新获取互斥锁,并检查条件现在是否为真。如果是,线程继续执行;如果不是,则再次等待。

条件变量的关键优势在于,它们允许线程在等待条件变化时进入睡眠状态,而不是主动轮询并消耗CPU资源。

# 示例程序:使用条件变量

为了更实际地理解,我们将创建一个简单的生产者 - 消费者场景,其中生产者线程生成数据,消费者线程在消费数据之前等待数据被生成。

#include <iostream> 
#include <thread>     
#include <mutex>
#include <condition_variable> 
#include <queue>

std::queue<int> data_queue; 
std::mutex mtx;
std::condition_variable cv; 
bool done = false;

void producer() {
    for (int i = 0; i < 10; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        data_queue.push(i);  // Produce data
        std::cout << "Produced: " << i << "\n";

        lock.unlock();
        cv.notify_one(); // Notify the consumer
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Simulate work
    }
    // Signal that production is done
    std::unique_lock<std::mutex> lock(mtx);
    done = true;
    lock.unlock();
    cv.notify_one(); // Notify the consumer that production is finished
}

void consumer() { 
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] { return !data_queue.empty() || done; }); // Wait for data or done signal
        while (!data_queue.empty()) {  
            int data = data_queue.front(); 
            data_queue.pop();

            std::cout << "Consumed: " << data << "\n"; 
        }
        if (done) break;
    }
}

int main() {
    std::thread prod(producer);
    std::thread cons(consumer);

    prod.join();

    cons.join();

    return 0; 
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

在这个程序中,我们使用std::queue<int>作为存储生成数据的共享数据结构。std::condition_variable cv用于同步生产者和消费者。生产者通过调用cv.notify_one(),在有新数据可用时通知消费者,消费者使用cv.wait()在条件变量上等待。

producer()函数生成数字并将它们推送到队列中。添加数据后,它解锁互斥锁,并使用cv.notify_one()通知消费者。在生成10个项目后,它将done标志设置为true,并最后一次通知消费者,表示生产已完成。

consumer()函数使用cv.wait()等待数据。wait()函数释放互斥锁并使线程进入睡眠状态,直到条件变量收到通知。一旦有数据可用,消费者就会处理它。当done标志设置为true时,消费者停止等待并结束。

通过这个示例,展示了条件变量如何帮助线程在不进行忙等待或消耗CPU资源的情况下,等待特定条件。在多个线程需要协调其操作的复杂系统中,这些条件变量变得更加有用。我们将把生产者 - 消费者示例扩展到一个更高级的场景,其中多个生产者和消费者并行工作,但我们要确保消费者在数据完全生成之前不会消费数据。

在这种情况下,我们可以使用条件变量有效地管理线程之间的通信流,并防止竞态条件(race conditions)。

# 示例程序:使用条件变量的多个生产者和消费者

在上述程序的基础上,我们将有两个生产者线程生成数据,两个消费者线程消费数据。条件变量确保消费者等待数据可用,生产者在数据准备好时通知消费者。

#include <iostream> 
#include <thread>    
#include <mutex>
#include <condition_variable> 
#include <queue>
#include <vector>

std::queue<int> data_queue;
std::mutex mtx;
std::condition_variable cv; 
bool done = false;
int production_count = 0;  // Track the number of produced items

// Producer function: Generates data and pushes it to the queue
void producer(int producer_id, int num_items) { 
    for (int i = 0; i < num_items; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        data_queue.push(i + (producer_id * 100));  // Distinguish producer data
        std::cout << "Producer " << producer_id << " produced: " << i + (producer_id * 100) << "\n";
        ++production_count; 
        lock.unlock();
        cv.notify_one(); // Notify a consumer
        std::this_thread::sleep_for(std::chrono::milliseconds(100));  // Simulate work
    }
    // Signal that this producer is done producing
    std::unique_lock<std::mutex> lock(mtx); 
    done = true;

    cv.notify_all(); // Notify all consumers that production is finished
}

// Consumer function: Consumes data from the queue
void consumer(int consumer_id) { 
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] { return !data_queue.empty() || done; }); // Wait for data or done signal
        while (!data_queue.empty()) {  
            int data = data_queue.front(); 
            data_queue.pop();

            std::cout << "Consumer " << consumer_id << " consumed: " << data << "\n"; 
        }
        if (done && data_queue.empty()) break; // Exit if done and no more data
    }
}

int main() {
    const int num_producers = 2;
    const int num_consumers = 2;
    const int items_per_producer = 5; 

    // Create producer threads
    std::vector<std::thread> producers;    
    for (int i = 0; i < num_producers; ++i) {
        producers.emplace_back(producer,i, items_per_producer);
    }

    // Create consumer threads
    std::vector<std::thread> consumers;    
    for (int i = 0; i < num_consumers; ++i) {
        consumers.emplace_back(consumer, i); 
    }

    // Wait for producers to finish
    for (auto& prod : producers) { 
        prod.join();
    }

    // Wait for consumers to finish 
    for (auto& cons : consumers) {
        cons.join(); 
    }
    std::cout << "All producers and consumers have finished.\n";

    return 0; 
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

在这个示例中:

  • 有两个生产者,每个生产者生成五个项目。producer()函数将数据推送到队列中,并使用条件变量通知消费者。每个生产者生成的数据与其他生产者不同,以模拟不同的数据来源。
  • 有两个消费者,它们使用cv.wait()等待数据可用。每个消费者检查条件变量,并在收到通知时消费数据。
  • 消费者通过条件变量高效地等待,这确保了它们在等待数据生成时不会消耗CPU资源。它们仅在数据可用或所有生产者都完成时才会醒来。
  • 通过同时使用互斥锁和条件变量,我们确保对共享的data_queue的访问是线程安全的。cv.wait()函数在线程等待时自动释放互斥锁,并在线程收到通知时重新获取它。

producer()函数在将数据添加到队列后调用cv.notify_one(),这会唤醒一个等待的消费者线程。一旦所有数据都已生成,生产者使用cv.notify_all()发出完成信号,这会唤醒所有剩余的消费者以完成它们的工作。

通过使线程在等待条件变化时进入睡眠状态,实现了消除忙等待和减少CPU使用。在这里,我们展示了如何通过使用条件变量同步多个线程,在生产者 - 消费者场景中避免竞态问题并有效地管理共享资源。

# 使用线程池进行负载均衡

# 背景

在多线程应用程序中,管理线程的创建和销毁会带来显著的开销,尤其是在任务较小或频繁执行的情况下。每次创建一个新线程时,都要分配资源,而一旦线程完成任务,这些资源又会被释放。在对性能要求苛刻的应用程序中,这种开销可能会抵消多线程带来的优势。为了解决这个问题,线程池(thread pool)提供了一种高效的方式,可以复用固定数量的线程,从而降低不断创建和销毁线程所带来的成本。

线程池维护着一组随时准备执行任务的线程。它不是为每个任务都创建一个新线程,而是将任务提交到一个队列中,线程池里可用的线程会从队列中取出任务并执行。这种模式可以更好地控制资源的利用,确保CPU核心得到充分利用,并有助于在多个线程之间平衡工作负载,从而实现性能优化。

# 线程池的工作原理

线程池通常由以下组件组成:

  1. 线程池:在线程池初始化时会创建固定数量的线程。这些线程等待分配任务。
  2. 任务队列:任务(工作单元)被提交到线程池并放入队列中。当一个线程可用时,它会从队列中取出一个任务并执行。
  3. 任务同步:使用条件变量(condition variable)或其他同步机制,在队列中有新任务时通知线程。
  4. 负载均衡:线程池确保任务在线程之间均匀分配,最大限度地减少空闲时间并最大化CPU利用率。

# 示例程序:设计一个线程池

我们将为多线程应用程序设计一个线程池。该线程池将由工作线程组成,这些工作线程会不断从共享任务队列中获取并执行任务。所有任务处理完毕后,线程池将优雅地关闭。

下面是一个简单线程池的代码:

#include <iostream> 
#include <vector>     
#include <thread>     
#include <queue>     
#include <mutex>
#include <condition_variable>
#include <functional> 
#include <atomic>

class ThreadPool { 
public:
    ThreadPool(size_t num_threads); 
    ~ThreadPool();
    // Add a new task to the thread pool
    void enqueue_task(std::function<void()> task);
    // Shutdown the thread pool 
    void shutdown();

private:
    std::vector<std::thread> workers;             
    std::queue<std::function<void()>> task_queue;  
    std::mutex queue_mutex;
    std::condition_variable cv;
    std::atomic<bool> stop; 
    // Worker thread function 
    void worker_thread();
};

// ThreadPool constructor: initializes worker threads
ThreadPool::ThreadPool(size_t num_threads) : stop(false) {
    for (size_t i = 0; i < num_threads; ++i) {
        workers.emplace_back(&ThreadPool::worker_thread, this);
    } 
}
// ThreadPool destructor: shuts down the pool and joins all threads
ThreadPool::~ThreadPool() {
    shutdown();
}
// Add a task to the task queue
void ThreadPool::enqueue_task(std::function<void()> task) {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        task_queue.push(std::move(task)); 
    }
    cv.notify_one(); 
}
// Worker thread function: continuously fetch and execute tasks
void ThreadPool::worker_thread() {
    while (true) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            cv.wait(lock, [this] { return stop || !task_queue.empty(); });
            if (stop && task_queue.empty()) return; 
            task = std::move(task_queue.front()); 
            task_queue.pop();
        }
        task();
    } 
}
// Shutdown the thread pool
void ThreadPool::shutdown() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true; 
    }
    cv.notify_all(); 
    for (std::thread &worker : workers) {
        if (worker.joinable()) {
            worker.join(); 
        }
    } 
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

在上述程序中:

  • ThreadPool构造函数使用指定数量的线程初始化线程池。每个工作线程运行worker_thread()函数,该函数不断从任务队列中获取任务。
  • enqueue_task()函数允许外部代码向线程池提交任务。然后任务被推送到队列中,并且会通知其中一个工作线程唤醒并处理该任务。
  • 每个工作线程不断从队列中获取任务并执行。如果没有可用任务且线程池尚未停止,线程会等待直到有任务可用。一旦停止标志(stop flag)设置为true,工作线程就会停止获取任务并优雅地退出。
  • shutdown()函数将停止标志设置为true,并通知所有工作线程完成它们的任务。一旦所有线程完成当前任务,线程池就会关闭。

# 示例程序:为线程池实现任务

现在,让我们扩展生产者 - 消费者场景以使用线程池。在这里,我们将有多个模拟生产者和消费者的任务,线程池将在多个线程之间平衡负载。

#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
#include <chrono>
#include <condition_variable>
#include <functional>
// 全局数据结构
std::queue<int> data_queue;
std::mutex mtx;
std::condition_variable cv;
bool done = false;
// 生产者任务:生成数据
void producer_task(int producer_id) {
    for (int i = 0; i < 5; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));  // 模拟工作
        {
            std::unique_lock<std::mutex> lock(mtx);
            data_queue.push(i + producer_id * 100);  // 生成数据
            std::cout << "Producer " << producer_id << " produced: " << i + producer_id * 100 << "\n";
        }
        cv.notify_one(); // 通知一个消费者
    }
    {
        std::unique_lock<std::mutex> lock(mtx);
        done = true;  // 表示生产完成
    }
    cv.notify_all();
}
// 消费者任务:消费数据
void consumer_task(int consumer_id) {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] { return !data_queue.empty() || done; });
        while (!data_queue.empty()) {
            int data = data_queue.front();
            data_queue.pop();
            std::cout << "Consumer " << consumer_id << " consumed: " << data << "\n";
        }
        if (done) break;
    }
}
int main() {
    // 创建一个包含4个线程的线程池
    ThreadPool pool(4);
    // 向线程池添加生产者任务
    pool.enqueue_task(std::bind(producer_task, 1));
    pool.enqueue_task(std::bind(producer_task, 2));
    // 向线程池添加消费者任务
    pool.enqueue_task(std::bind(consumer_task, 1));
    pool.enqueue_task(std::bind(consumer_task, 2));
    // 等待任务完成
    std::this_thread::sleep_for(std::chrono::seconds(3));
    // 关闭线程池
    pool.shutdown();
    std::cout << "All tasks completed.\n";
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

在上面的脚本中,我们创建了一个包含四个工作线程的线程池。然后,我们使用pool.enqueue_task()将任务添加到线程池。生产者任务生成数据并将其推送到队列中,而消费者任务等待数据并在数据可用时进行处理。线程池中的工作线程从队列中获取任务并并发执行它们。线程池确保任务在可用线程之间分配,平衡负载,并确保没有线程长时间处于空闲状态。

与前面的示例一样,生产者在有新数据可用时使用条件变量通知消费者。消费者等待任务生成,并在任务可用时立即进行处理。在所有任务都被添加到队列后,main()函数的线程等待几秒钟,让任务完成,然后调用pool.shutdown()停止线程池并加入所有工作线程。shutdown()函数将停止标志设置为true,通知所有工作线程,并在退出前等待它们完成当前任务。

这确保了线程池的优雅关闭,即所有任务在程序终止前都被处理完毕。此外,线程池将任务分配到多个工作线程中,确保了工作量的平衡。同样,消费者任务并行执行,减少了空闲时间并最大化了CPU利用率。这种平衡使得程序能够更高效地运行,特别是在有多个线程竞争系统资源的情况下。

在实际应用中,可以根据系统的能力调整线程池中的线程数量,从而进一步优化线程池。例如,一种常见的方法是将线程数量设置为CPU核心数,确保每个核心都有一个线程来执行任务,这可以防止过度订阅并最小化上下文切换开销。

size_t num_threads = std::thread::hardware_concurrency();
ThreadPool pool(num_threads);
1
2

std::thread::hardware_concurrency()函数返回系统支持的硬件线程数量,这有助于确定线程池的最佳线程数量。通过这种方式,你可以动态调整线程池的大小,以匹配系统的能力并最大化性能。

整个实现展示了如何使用线程池在多线程应用程序中平衡负载,从而实现更好的可扩展性、降低开销并改进资源管理。如果你能掌握线程池和负载平衡,就可以设计出强大的多线程应用程序,在充分利用系统资源的同时保持最佳性能。

# 总结

简而言之,本章首先介绍了如何使用std::thread类轻松创建和管理线程,展示了如何生成和同步多个线程以执行并行任务。强调了使用互斥锁(mutex)和锁(lock)的重要性,这有助于避免多个线程访问共享数据时出现竞态条件(race condition)。通过使用std::lock_guard和std::unique_lock等工具,在确保线程安全的同时最小化性能瓶颈。

本章进一步解释了如何通过控制线程获取多个锁的方式来避免死锁(deadlock),使用std::lock()安全地管理多个资源的锁定。此外,还引入了条件变量(condition variable)作为一种管理线程间通信的方式。这些条件变量允许线程在恢复任务之前高效地等待信号。它还包含实际示例,展示了如何实现高级等待机制,并在满足特定条件(如在生产者 - 消费者模型中)时向线程发送信号。

最后,探讨了线程池的概念,它是平衡工作量、减少频繁创建和销毁线程的开销的有效方法。线程池允许将多个任务排队,并由固定数量的线程执行,确保在保持高性能的同时优化系统资源的使用。通过这种方法,多线程应用程序可以实现更好的可扩展性和负载平衡,使其在处理并发任务时更加高效。

第6章:优化内存性能
第8章:线程同步与原子操作精通

← 第6章:优化内存性能 第8章:线程同步与原子操作精通→

最近更新
01
C++语言面试问题集锦 目录与说明
03-27
02
第四章 Lambda函数
03-27
03
第二章 关键字static及其不同用法
03-27
更多文章>
Copyright © 2024-2025 沪ICP备2023015129号 张小方 版权所有
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式