第7章:面向专家的高级多线程编程
# 第7章:面向专家的高级多线程编程
# 概述
在本章中,我们将探讨C++中多线程的高级内容。高效管理多个线程可以显著提升性能。我们将从学习如何轻松创建和管理线程开始,让你能够以结构化且高效的方式完全掌控线程的创建、管理和终止。接着,我们会研究如何通过使用互斥锁(mutexes)和锁(locks)来最大化并行性,防止竞态条件(race conditions),确保线程安全地访问共享资源,避免数据损坏或不可预测的行为。
下一部分将重点介绍如何使用条件变量(condition variables)来控制线程通信。最后,我们会了解线程池(thread pools)如何通过管理一组可复用的线程来帮助平衡工作负载分配,更好地利用系统资源,这些线程能够同时处理多个任务。掌握这些概念后,你将有能力权威地处理多线程问题。
# 轻松创建和控制线程
多线程允许程序通过将工作分配到多个线程来同时执行多个任务。每个线程独立运行,共享相同的进程资源,如内存,但与其他线程并发执行。对于能够并行化任务的应用程序,如数据处理、模拟和实时系统,这可以显著提高性能。在这里,我们将探索如何使用标准库中的std::thread
来创建、管理和同步线程,以实现高效的多线程编程。
# 基本线程创建和管理
std::thread
类是创建和管理线程的关键组件。下面是一个示例程序,展示了如何创建一个基本线程,该线程与主线程并行执行一个任务:
#include <iostream>
#include <thread>
void task() {
std::cout << "Thread ID: " << std::this_thread::get_id() << " is running the task.\n";
}
int main() {
// 创建一个新线程来运行任务
std::thread t1(task);
// 从主线程打印信息
std::cout << "Main thread ID: " << std::this_thread::get_id() << " is executing.\n";
// 等待线程完成
t1.join();
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
在这个程序中,函数task()
是一个简单的任务,用于打印线程ID。我们使用std::thread
创建一个新线程t1
,它与主线程并发运行这个任务。join()
函数用于确保主线程在继续执行之前等待t1
完成。如果没有join()
,主线程可能在t1
仍在运行时终止,这可能会导致任务未完成。std::this_thread::get_id()
返回当前执行线程的ID,以便区分主线程和创建的线程。
# 高级线程管理和同步
在处理多个线程时,尤其是在对性能要求苛刻的应用程序中,你需要仔细管理线程间的交互。现在,为了演示高级线程管理,我们将改进前面的示例,创建多个线程,每个线程执行一个更大任务的一部分。
# 示例程序:创建多个线程
假设我们想要并行执行一个计算任务,将工作分配给几个线程。为此,我们将使用多个线程对数组元素求和,每个线程负责对数组的一部分进行求和。
#include <iostream>
#include <thread>
#include <vector>
#include <numeric> // 用于std::accumulate
// 每个线程对数组的一部分求和的函数
void partial_sum(const std::vector<int>& data, size_t start, size_t end, int& result) {
result = std::accumulate(data.begin() + start, data.begin() + end, 0);
}
int main() {
const size_t num_elements = 1000;
const size_t num_threads = 4;
// 生成一个数字数组
std::vector<int> data(num_elements);
for (size_t i = 0; i < num_elements; ++i) {
data[i] = i + 1; // 填充1到1000的数字
}
// 创建线程和部分和结果
std::vector<std::thread> threads;
std::vector<int> results(num_threads, 0); // 每个线程在这里存储其部分结果
size_t block_size = num_elements / num_threads;
// 创建线程来计算部分和
for (size_t i = 0; i < num_threads; ++i) {
size_t start = i * block_size;
size_t end = (i == num_threads - 1)? num_elements : (i + 1) * block_size;
threads.emplace_back(partial_sum, std::ref(data), start, end, std::ref(results[i]));
}
// 等待所有线程完成
for (auto& t : threads) {
t.join();
}
// 对部分结果求和
int total_sum = std::accumulate(results.begin(), results.end(), 0);
std::cout << "Total sum: " << total_sum << "\n";
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
在这里,我们将数组data
分成num_threads
个部分,这种分工有助于将工作负载均匀地分布到多个CPU核心上,对于大型数据集可以提高性能。
此外,我们使用std::ref
传递对向量data
和结果results[i]
的引用。这确保每个线程可以修改其各自的结果,而无需创建不必要的数据副本。所有线程创建完成后,主线程使用join()
等待每个线程完成。这确保只有在所有线程完成工作后,才会对部分结果进行最终求和。所有线程完成后,我们使用std::accumulate()
对每个线程的结果求和,得到数组的总和。
这整个过程展示了创建和同步线程如何提高那些可以分解为更小、独立工作单元的任务的性能。
# 线程安全和数据竞争
当多个线程访问共享资源(例如共享数据或内存)时,我们必须确保它们的访问是协调的,以防止数据竞争。数据竞争是指两个或多个线程同时访问共享数据,导致不可预测的行为。例如,如果两个线程试图同时写入同一内存位置,数据的最终值可能取决于执行顺序,而在多线程程序中,执行顺序通常是不确定的。
在上面的示例中,每个线程处理数组的不同部分,并且有自己的结果存储(results[i]
),因此不存在共享内存冲突。然而,在一些场景中,线程必须更新共享数据,这时就需要像互斥锁(mutexes)和锁(locks)这样的机制来确保线程安全。
# 为高性能应用程序创建线程
在高性能并行应用程序中,可以应用以下技术来进一步改进复杂程序中的线程管理:
# 线程池(Thread Pooling)
与不断创建和销毁线程(这可能开销很大)不同,线程池允许你维持固定数量的线程,这些线程可以被多个任务复用。这减少了线程创建和销毁的开销,特别是在任务小且频繁的应用程序中。
# 基于任务的并行(Task-based Parallelism)
现代C++提供了像std::async
和std::future
这样的更高级抽象,用于处理异步任务,而不是直接管理线程。这些抽象允许运行时高效地管理线程执行,而无需显式的线程管理。
# 负载均衡(Load Balancing)
如果一个线程比其他线程完成得早很多,一些CPU核心可能会闲置,从而降低程序的整体效率。像动态调度或自适应分区这样的技术可以帮助动态平衡工作负载 。
# 示例程序:使用std::async
动态创建线程
我们还可以使用std::async
来异步处理任务,而无需手动创建和管理线程。下面的示例展示了如何使用std::async
完成相同的并行求和任务:
#include <iostream>
#include <vector>
#include <numeric>
#include <future> // 用于std::async和std::future
// 每个线程对数组的一部分求和的函数
int partial_sum(const std::vector<int>& data, size_t start, size_t end) {
return std::accumulate(data.begin() + start, data.begin() + end, 0);
}
int main() {
const size_t num_elements = 1000;
const size_t num_threads = 4;
// 生成一个数字数组
std::vector<int> data(num_elements);
for (size_t i = 0; i < num_elements; ++i) {
data[i] = i + 1; // 填充1到1000的数字
}
// 创建futures来处理部分和
std::vector<std::future<int>> futures;
size_t block_size = num_elements / num_threads;
// 异步创建任务来计算部分和
for (size_t i = 0; i < num_threads; ++i) {
size_t start = i * block_size;
size_t end = (i == num_threads - 1)? num_elements : (i + 1) * block_size;
futures.push_back(std::async(std::launch::async, partial_sum, std::cref(data), start, end));
}
// 对每个future的结果求和
int total_sum = 0;
for (auto& future : futures) {
total_sum += future.get(); // 等待每个任务完成
}
std::cout << "Total sum: " << total_sum << "\n";
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
在本节中,我们展示了如何创建、管理和同步线程以实现并行执行,显著提高数据并行任务的性能。此外,我们还探索了高级线程管理技术,如使用std::async
进行基于任务的并行和动态线程创建,这些技术实现了更灵活和高效的多线程编程。
# 使用互斥锁(Mutex)和锁(Lock)实现并行
在多线程编程中,一个关键挑战是确保多个线程能够安全地访问共享资源。当多个线程试图同时修改或读取共享数据时,可能会导致竞态条件(race condition)或数据损坏,进而引发不可预测和错误的行为。虽然互斥锁和锁是确保线程安全的强大工具,但使用不当会导致诸如死锁(deadlock)之类的问题,即两个或多个线程相互等待对方释放资源,导致程序无限期挂起。在本节中,我们将探讨如何有效地使用互斥锁和锁,并展示避免常见陷阱(如死锁和竞态条件)的实用技巧。
# 什么是互斥锁和锁?
互斥锁(mutex)是一种同步原语,用于防止多个线程同时访问共享资源。当一个线程锁定(lock)一个互斥锁时,它获得对共享资源的独占访问权,在互斥锁被释放之前,其他线程无法获取相同的锁。
锁(lock)是一个更通用的概念,表示对互斥锁的所有权。C++ 提供了几种类型的锁,例如:
std::lock_guard
:在创建时自动获取(acquire)一个互斥锁,并在超出作用域时释放它。std::unique_lock
:比lock_guard
更灵活,允许手动锁定和解锁互斥锁。
这些同步机制可以确保一次只有一个线程访问共享数据,从而防止竞态条件。
# 示例程序:使用互斥锁保护共享数据
我们从一个简单的示例开始,展示互斥锁的使用。假设有两个线程对一个共享计数器进行递增操作。如果没有适当的同步,两个线程可能会同时尝试修改计数器,导致结果错误。
#include <iostream>
#include <thread>
#include <mutex>
int counter = 0; // 共享资源
std::mutex mtx; // 用于保护共享资源的互斥锁
void increment() {
for (int i = 0; i < 1000000; ++i) {
std::lock_guard<std::mutex> lock(mtx); // 锁定互斥锁以防止竞态条件
++counter;
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Final counter value: " << counter << "\n";
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
counter
变量在两个线程之间共享,mtx
是一个互斥锁,用于控制对counter
的访问。- 在
increment()
函数内部,我们在修改counter
之前使用std::lock_guard
获取互斥锁。这确保了一次只有一个线程可以递增计数器。当lock_guard
超出作用域(在每次循环迭代结束时),锁会自动释放。 join()
调用确保在程序退出之前,两个线程都完成执行。
在上述示例脚本中,使用互斥锁防止了竞态条件,确保了计数器的最终值是正确的。
# 示例程序:使用多个互斥锁避免死锁
在更复杂的程序中,线程可能需要获取多个锁才能访问不同的共享资源。如果两个线程以不同的顺序锁定资源,可能会导致死锁,即两个线程都在等待对方释放资源,从而都无法继续执行。
现在,我们考虑一个示例,其中两个共享资源由两个互斥锁保护。查看以下示例程序,该程序展示了一个潜在的死锁情况:
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx1;
std::mutex mtx2;
void task1() {
std::lock_guard<std::mutex> lock1(mtx1); // 首先锁定mtx1
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
std::lock_guard<std::mutex> lock2(mtx2); // 接下来锁定mtx2
std::cout << "Task 1 acquired both locks.\n";
}
void task2() {
std::lock_guard<std::mutex> lock2(mtx2); // 首先锁定mtx2
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
std::lock_guard<std::mutex> lock1(mtx1); // 接下来锁定mtx1
std::cout << "Task 2 acquired both locks.\n";
}
int main() {
std::thread t1(task1);
std::thread t2(task2);
t1.join();
t2.join();
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
在上述程序中:
我们在task1()
中先锁定mtx1
,然后锁定mtx2
。而在task2()
中,操作顺序相反:先锁定mtx2
,然后锁定mtx1
。如果task1()
锁定了mtx1
,同时task2()
锁定了mtx2
,两个线程将都在等待对方释放第二个锁,从而导致死锁。
对std::this_thread::sleep_for()
的调用模拟了获取第一个锁和第二个锁之间的延迟,使死锁场景更容易出现。现在,为了防止死锁,我们必须确保所有线程以相同的顺序获取锁。
# 使用std::lock()
避免死锁
C++ 提供了std::lock()
,以无死锁的方式安全地锁定多个互斥锁。它尝试同时锁定所有给定的互斥锁,确保没有线程会因为等待其他线程持有的资源而陷入阻塞。
下面展示了如何修改前面的示例,使用std::lock()
避免死锁:
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx1;
std::mutex mtx2;
void task1() {
// 以无死锁的方式锁定两个互斥锁
std::lock(mtx1, mtx2);
// 使用std::lock_guard管理两个锁
std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);
std::cout << "Task 1 acquired both locks.\n";
}
void task2() {
// 以无死锁的方式锁定两个互斥锁
std::lock(mtx1, mtx2);
// 使用std::lock_guard管理两个锁
std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);
std::cout << "Task 2 acquired both locks.\n";
}
int main() {
std::thread t1(task1);
std::thread t2(task2);
t1.join();
t2.join();
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
这里,对std::lock(mtx1, mtx2)
的调用会同时锁定mtx1
和mtx2
。它在内部处理锁的顺序,因此没有线程会无限期等待。使用std::lock()
锁定两个互斥锁后,我们使用带有std::adopt_lock
标记的std::lock_guard
。这告诉lock_guard
互斥锁已经被锁定,所以它不需要再次锁定。这确保了当lock_guard
对象超出作用域时,互斥锁会自动释放。
# 示例程序:使用std::unique_lock
避免竞态条件
多线程程序中的另一个常见问题是竞态条件,即两个或多个线程试图同时修改共享数据。我们可以使用std::unique_lock
避免竞态条件,它比std::lock_guard
更灵活,因为它允许手动锁定和解锁互斥锁。
为此,我们将修改前面的计数器示例,使用std::unique_lock
代替std::lock_guard
:
#include <iostream>
#include <thread>
#include <mutex>
int counter = 0;
std::mutex mtx;
void increment() {
for (int i = 0; i < 1000000; ++i) {
std::unique_lock<std::mutex> lock(mtx); // 锁定互斥锁
++counter;
// 锁可以显式解锁,或者在超出作用域时解锁
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << " Final counter value: " << counter << "\n";
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
在上述示例脚本中,std::unique_lock
提供了对锁定和解锁互斥锁的更多控制,在需要在同一作用域内多次锁定和解锁互斥锁的情况下非常有用。然而,互斥锁使用不当可能会导致死锁,即两个或多个线程无限期地等待对方释放资源。
# 使用条件变量(Condition Variables)驯服线程通信
在处理多线程时,经常会遇到这样的情况:线程之间需要相互通信,或者在继续执行之前等待特定条件得到满足。管理这种情况最有效的方法之一就是通过条件变量。条件变量允许一个线程进入睡眠状态,等待特定条件变为真。当条件满足时,另一个线程可以向等待的线程发出信号,使其恢复执行。这种机制在诸如生产者 - 消费者模型等场景中至关重要,在该模型中,一个线程生成数据,另一个线程消费数据。
在本节中,我们将探讨条件变量的工作原理,演示如何有效地使用它们,并构建一个实际的多线程示例,利用条件变量来设计高效的等待机制以及线程之间的信号传递。
# 条件变量如何工作?
条件变量与互斥锁(mutex)协同工作,以控制对共享资源的访问。以下是一般过程:
- 一个线程锁定互斥锁,并检查特定条件是否为真。
- 如果条件为假,该线程调用条件变量的
wait()
方法,这会释放互斥锁并使线程进入睡眠状态。 - 另一个线程修改共享数据,并调用条件变量的
notify_one()
或notify_all()
方法,向等待的线程发出信号。 - 等待的线程醒来,重新获取互斥锁,并检查条件现在是否为真。如果是,线程继续执行;如果不是,则再次等待。
条件变量的关键优势在于,它们允许线程在等待条件变化时进入睡眠状态,而不是主动轮询并消耗CPU资源。
# 示例程序:使用条件变量
为了更实际地理解,我们将创建一个简单的生产者 - 消费者场景,其中生产者线程生成数据,消费者线程在消费数据之前等待数据被生成。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
std::queue<int> data_queue;
std::mutex mtx;
std::condition_variable cv;
bool done = false;
void producer() {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
data_queue.push(i); // Produce data
std::cout << "Produced: " << i << "\n";
lock.unlock();
cv.notify_one(); // Notify the consumer
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Simulate work
}
// Signal that production is done
std::unique_lock<std::mutex> lock(mtx);
done = true;
lock.unlock();
cv.notify_one(); // Notify the consumer that production is finished
}
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return !data_queue.empty() || done; }); // Wait for data or done signal
while (!data_queue.empty()) {
int data = data_queue.front();
data_queue.pop();
std::cout << "Consumed: " << data << "\n";
}
if (done) break;
}
}
int main() {
std::thread prod(producer);
std::thread cons(consumer);
prod.join();
cons.join();
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
在这个程序中,我们使用std::queue<int>
作为存储生成数据的共享数据结构。std::condition_variable cv
用于同步生产者和消费者。生产者通过调用cv.notify_one()
,在有新数据可用时通知消费者,消费者使用cv.wait()
在条件变量上等待。
producer()
函数生成数字并将它们推送到队列中。添加数据后,它解锁互斥锁,并使用cv.notify_one()
通知消费者。在生成10个项目后,它将done
标志设置为true
,并最后一次通知消费者,表示生产已完成。
consumer()
函数使用cv.wait()
等待数据。wait()
函数释放互斥锁并使线程进入睡眠状态,直到条件变量收到通知。一旦有数据可用,消费者就会处理它。当done
标志设置为true
时,消费者停止等待并结束。
通过这个示例,展示了条件变量如何帮助线程在不进行忙等待或消耗CPU资源的情况下,等待特定条件。在多个线程需要协调其操作的复杂系统中,这些条件变量变得更加有用。我们将把生产者 - 消费者示例扩展到一个更高级的场景,其中多个生产者和消费者并行工作,但我们要确保消费者在数据完全生成之前不会消费数据。
在这种情况下,我们可以使用条件变量有效地管理线程之间的通信流,并防止竞态条件(race conditions)。
# 示例程序:使用条件变量的多个生产者和消费者
在上述程序的基础上,我们将有两个生产者线程生成数据,两个消费者线程消费数据。条件变量确保消费者等待数据可用,生产者在数据准备好时通知消费者。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
std::queue<int> data_queue;
std::mutex mtx;
std::condition_variable cv;
bool done = false;
int production_count = 0; // Track the number of produced items
// Producer function: Generates data and pushes it to the queue
void producer(int producer_id, int num_items) {
for (int i = 0; i < num_items; ++i) {
std::unique_lock<std::mutex> lock(mtx);
data_queue.push(i + (producer_id * 100)); // Distinguish producer data
std::cout << "Producer " << producer_id << " produced: " << i + (producer_id * 100) << "\n";
++production_count;
lock.unlock();
cv.notify_one(); // Notify a consumer
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Simulate work
}
// Signal that this producer is done producing
std::unique_lock<std::mutex> lock(mtx);
done = true;
cv.notify_all(); // Notify all consumers that production is finished
}
// Consumer function: Consumes data from the queue
void consumer(int consumer_id) {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return !data_queue.empty() || done; }); // Wait for data or done signal
while (!data_queue.empty()) {
int data = data_queue.front();
data_queue.pop();
std::cout << "Consumer " << consumer_id << " consumed: " << data << "\n";
}
if (done && data_queue.empty()) break; // Exit if done and no more data
}
}
int main() {
const int num_producers = 2;
const int num_consumers = 2;
const int items_per_producer = 5;
// Create producer threads
std::vector<std::thread> producers;
for (int i = 0; i < num_producers; ++i) {
producers.emplace_back(producer,i, items_per_producer);
}
// Create consumer threads
std::vector<std::thread> consumers;
for (int i = 0; i < num_consumers; ++i) {
consumers.emplace_back(consumer, i);
}
// Wait for producers to finish
for (auto& prod : producers) {
prod.join();
}
// Wait for consumers to finish
for (auto& cons : consumers) {
cons.join();
}
std::cout << "All producers and consumers have finished.\n";
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
在这个示例中:
- 有两个生产者,每个生产者生成五个项目。
producer()
函数将数据推送到队列中,并使用条件变量通知消费者。每个生产者生成的数据与其他生产者不同,以模拟不同的数据来源。 - 有两个消费者,它们使用
cv.wait()
等待数据可用。每个消费者检查条件变量,并在收到通知时消费数据。 - 消费者通过条件变量高效地等待,这确保了它们在等待数据生成时不会消耗CPU资源。它们仅在数据可用或所有生产者都完成时才会醒来。
- 通过同时使用互斥锁和条件变量,我们确保对共享的
data_queue
的访问是线程安全的。cv.wait()
函数在线程等待时自动释放互斥锁,并在线程收到通知时重新获取它。
producer()
函数在将数据添加到队列后调用cv.notify_one()
,这会唤醒一个等待的消费者线程。一旦所有数据都已生成,生产者使用cv.notify_all()
发出完成信号,这会唤醒所有剩余的消费者以完成它们的工作。
通过使线程在等待条件变化时进入睡眠状态,实现了消除忙等待和减少CPU使用。在这里,我们展示了如何通过使用条件变量同步多个线程,在生产者 - 消费者场景中避免竞态问题并有效地管理共享资源。
# 使用线程池进行负载均衡
# 背景
在多线程应用程序中,管理线程的创建和销毁会带来显著的开销,尤其是在任务较小或频繁执行的情况下。每次创建一个新线程时,都要分配资源,而一旦线程完成任务,这些资源又会被释放。在对性能要求苛刻的应用程序中,这种开销可能会抵消多线程带来的优势。为了解决这个问题,线程池(thread pool)提供了一种高效的方式,可以复用固定数量的线程,从而降低不断创建和销毁线程所带来的成本。
线程池维护着一组随时准备执行任务的线程。它不是为每个任务都创建一个新线程,而是将任务提交到一个队列中,线程池里可用的线程会从队列中取出任务并执行。这种模式可以更好地控制资源的利用,确保CPU核心得到充分利用,并有助于在多个线程之间平衡工作负载,从而实现性能优化。
# 线程池的工作原理
线程池通常由以下组件组成:
- 线程池:在线程池初始化时会创建固定数量的线程。这些线程等待分配任务。
- 任务队列:任务(工作单元)被提交到线程池并放入队列中。当一个线程可用时,它会从队列中取出一个任务并执行。
- 任务同步:使用条件变量(condition variable)或其他同步机制,在队列中有新任务时通知线程。
- 负载均衡:线程池确保任务在线程之间均匀分配,最大限度地减少空闲时间并最大化CPU利用率。
# 示例程序:设计一个线程池
我们将为多线程应用程序设计一个线程池。该线程池将由工作线程组成,这些工作线程会不断从共享任务队列中获取并执行任务。所有任务处理完毕后,线程池将优雅地关闭。
下面是一个简单线程池的代码:
#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>
class ThreadPool {
public:
ThreadPool(size_t num_threads);
~ThreadPool();
// Add a new task to the thread pool
void enqueue_task(std::function<void()> task);
// Shutdown the thread pool
void shutdown();
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> task_queue;
std::mutex queue_mutex;
std::condition_variable cv;
std::atomic<bool> stop;
// Worker thread function
void worker_thread();
};
// ThreadPool constructor: initializes worker threads
ThreadPool::ThreadPool(size_t num_threads) : stop(false) {
for (size_t i = 0; i < num_threads; ++i) {
workers.emplace_back(&ThreadPool::worker_thread, this);
}
}
// ThreadPool destructor: shuts down the pool and joins all threads
ThreadPool::~ThreadPool() {
shutdown();
}
// Add a task to the task queue
void ThreadPool::enqueue_task(std::function<void()> task) {
{
std::unique_lock<std::mutex> lock(queue_mutex);
task_queue.push(std::move(task));
}
cv.notify_one();
}
// Worker thread function: continuously fetch and execute tasks
void ThreadPool::worker_thread() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
cv.wait(lock, [this] { return stop || !task_queue.empty(); });
if (stop && task_queue.empty()) return;
task = std::move(task_queue.front());
task_queue.pop();
}
task();
}
}
// Shutdown the thread pool
void ThreadPool::shutdown() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
cv.notify_all();
for (std::thread &worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
在上述程序中:
ThreadPool
构造函数使用指定数量的线程初始化线程池。每个工作线程运行worker_thread()
函数,该函数不断从任务队列中获取任务。enqueue_task()
函数允许外部代码向线程池提交任务。然后任务被推送到队列中,并且会通知其中一个工作线程唤醒并处理该任务。- 每个工作线程不断从队列中获取任务并执行。如果没有可用任务且线程池尚未停止,线程会等待直到有任务可用。一旦停止标志(stop flag)设置为
true
,工作线程就会停止获取任务并优雅地退出。 shutdown()
函数将停止标志设置为true
,并通知所有工作线程完成它们的任务。一旦所有线程完成当前任务,线程池就会关闭。
# 示例程序:为线程池实现任务
现在,让我们扩展生产者 - 消费者场景以使用线程池。在这里,我们将有多个模拟生产者和消费者的任务,线程池将在多个线程之间平衡负载。
#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
#include <chrono>
#include <condition_variable>
#include <functional>
// 全局数据结构
std::queue<int> data_queue;
std::mutex mtx;
std::condition_variable cv;
bool done = false;
// 生产者任务:生成数据
void producer_task(int producer_id) {
for (int i = 0; i < 5; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟工作
{
std::unique_lock<std::mutex> lock(mtx);
data_queue.push(i + producer_id * 100); // 生成数据
std::cout << "Producer " << producer_id << " produced: " << i + producer_id * 100 << "\n";
}
cv.notify_one(); // 通知一个消费者
}
{
std::unique_lock<std::mutex> lock(mtx);
done = true; // 表示生产完成
}
cv.notify_all();
}
// 消费者任务:消费数据
void consumer_task(int consumer_id) {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return !data_queue.empty() || done; });
while (!data_queue.empty()) {
int data = data_queue.front();
data_queue.pop();
std::cout << "Consumer " << consumer_id << " consumed: " << data << "\n";
}
if (done) break;
}
}
int main() {
// 创建一个包含4个线程的线程池
ThreadPool pool(4);
// 向线程池添加生产者任务
pool.enqueue_task(std::bind(producer_task, 1));
pool.enqueue_task(std::bind(producer_task, 2));
// 向线程池添加消费者任务
pool.enqueue_task(std::bind(consumer_task, 1));
pool.enqueue_task(std::bind(consumer_task, 2));
// 等待任务完成
std::this_thread::sleep_for(std::chrono::seconds(3));
// 关闭线程池
pool.shutdown();
std::cout << "All tasks completed.\n";
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
在上面的脚本中,我们创建了一个包含四个工作线程的线程池。然后,我们使用pool.enqueue_task()
将任务添加到线程池。生产者任务生成数据并将其推送到队列中,而消费者任务等待数据并在数据可用时进行处理。线程池中的工作线程从队列中获取任务并并发执行它们。线程池确保任务在可用线程之间分配,平衡负载,并确保没有线程长时间处于空闲状态。
与前面的示例一样,生产者在有新数据可用时使用条件变量通知消费者。消费者等待任务生成,并在任务可用时立即进行处理。在所有任务都被添加到队列后,main()
函数的线程等待几秒钟,让任务完成,然后调用pool.shutdown()
停止线程池并加入所有工作线程。shutdown()
函数将停止标志设置为true
,通知所有工作线程,并在退出前等待它们完成当前任务。
这确保了线程池的优雅关闭,即所有任务在程序终止前都被处理完毕。此外,线程池将任务分配到多个工作线程中,确保了工作量的平衡。同样,消费者任务并行执行,减少了空闲时间并最大化了CPU利用率。这种平衡使得程序能够更高效地运行,特别是在有多个线程竞争系统资源的情况下。
在实际应用中,可以根据系统的能力调整线程池中的线程数量,从而进一步优化线程池。例如,一种常见的方法是将线程数量设置为CPU核心数,确保每个核心都有一个线程来执行任务,这可以防止过度订阅并最小化上下文切换开销。
size_t num_threads = std::thread::hardware_concurrency();
ThreadPool pool(num_threads);
2
std::thread::hardware_concurrency()
函数返回系统支持的硬件线程数量,这有助于确定线程池的最佳线程数量。通过这种方式,你可以动态调整线程池的大小,以匹配系统的能力并最大化性能。
整个实现展示了如何使用线程池在多线程应用程序中平衡负载,从而实现更好的可扩展性、降低开销并改进资源管理。如果你能掌握线程池和负载平衡,就可以设计出强大的多线程应用程序,在充分利用系统资源的同时保持最佳性能。
# 总结
简而言之,本章首先介绍了如何使用std::thread
类轻松创建和管理线程,展示了如何生成和同步多个线程以执行并行任务。强调了使用互斥锁(mutex)和锁(lock)的重要性,这有助于避免多个线程访问共享数据时出现竞态条件(race condition)。通过使用std::lock_guard
和std::unique_lock
等工具,在确保线程安全的同时最小化性能瓶颈。
本章进一步解释了如何通过控制线程获取多个锁的方式来避免死锁(deadlock),使用std::lock()
安全地管理多个资源的锁定。此外,还引入了条件变量(condition variable)作为一种管理线程间通信的方式。这些条件变量允许线程在恢复任务之前高效地等待信号。它还包含实际示例,展示了如何实现高级等待机制,并在满足特定条件(如在生产者 - 消费者模型中)时向线程发送信号。
最后,探讨了线程池的概念,它是平衡工作量、减少频繁创建和销毁线程的开销的有效方法。线程池允许将多个任务排队,并由固定数量的线程执行,确保在保持高性能的同时优化系统资源的使用。通过这种方法,多线程应用程序可以实现更好的可扩展性和负载平衡,使其在处理并发任务时更加高效。