第8章:线程同步与原子操作精通
# 第8章:线程同步与原子操作精通
# 概述
本章将探讨高级线程同步和原子操作的概念,重点关注那些能实现更高效、更可靠多线程编程的方法。我们将从简化线程同步技术入手,着重讲解如何预防死锁以及实现线程间的安全协作。接下来,我们会学习原子操作,它能让线程以“要么全做,要么全不做”的方式执行某些操作,而无需使用锁。
我们还将研究如何通过Future、Promise和Task来协调线程,这些工具为管理异步Task和线程间通信提供了更高级的抽象。最后,我们会讨论无锁数据结构,它允许多个线程在不使用锁的情况下访问和修改共享数据,从而提高性能和可扩展性,在高并发系统中尤其如此。总的来说,在本章中你将学习到高级同步技术,这些技术将帮助你创建既高效又功能丰富的多线程应用程序。
# 针对死锁的线程同步
在当今的多线程应用程序中,从操作系统到大规模分布式系统、数据库事务,再到游戏引擎等实时应用程序,死锁可能在各种场景下出现。这些程序通常依赖多个线程或进程来并发处理Task,因此,正确的同步至关重要。当线程无限期地等待资源被释放时,整个程序可能会变得无响应,从而导致崩溃或严重的性能下降。
# 死锁的根本原因
有四个被称为“ Coffman条件”的条件,只有当这四个条件同时满足时,才会发生死锁:
- 互斥(Mutual Exclusion):至少有一个资源以非共享模式被持有,这意味着一次只有一个线程可以访问该资源。如果另一个线程试图访问它,就必须等待。
- 占有并等待(Hold and Wait):一个线程在等待获取其他线程当前持有的额外资源时,自身至少持有一个资源。
- 不可抢占(No Preemption):资源不能被强制从一个线程中夺走;只有当线程完成Task后,才会自愿释放资源。
- 循环等待(Circular Wait):存在一个线程的循环链,其中每个线程都持有一个资源,并等待链中下一个线程持有的另一个资源。
只有这四个条件全部成立,才会发生死锁。在大多数情况下,循环等待条件是关键因素,因为它形成了一个循环,使得没有线程能够继续执行。识别程序中这些条件的存在,是解决死锁可能性的第一步。
# 死锁的应用场景
# 数据库死锁
在数据库管理系统中,当多个事务试图同时锁定不同的行或表时,常常会发生死锁。例如,如果一个事务锁定了行A并等待行B,而另一个事务锁定了行B并等待行A,就会出现循环等待条件,两个事务都将陷入死锁。
# 操作系统
在操作系统中,当多个进程需要独占访问共享资源(如内存、文件或设备)时,可能会发生死锁。例如,两个进程可能各自持有不同的锁,并等待对方释放锁,从而导致死锁。
# 多线程应用程序
死锁在多线程程序中经常出现,线程在执行某些操作时需要获取多个锁。如果线程以不同的顺序获取锁,或者在等待其他锁时持有锁,就可能发生死锁。
# 死锁避免技术
可以通过打破Coffman条件中的一个或多个来避免死锁。在本节中,我们将探讨现代系统用于预防死锁和设计无死锁系统的几种先进技术。
# 资源排序
避免死锁最有效的技术之一是资源排序(Resource Ordering)。在这种方法中,所有线程必须按照预先定义的全局顺序获取资源。通过确保所有线程以相同的顺序锁定资源,消除了循环等待条件,因为没有线程会等待链中其他线程持有的资源。
例如,如果线程需要锁定资源A、B和C,可以建立一个全局排序规则,使所有线程必须按照A -> B -> C的顺序获取锁。这确保不会出现循环依赖,因为线程在持有更高顺序的资源后,不能再请求更低顺序的资源。
std::mutex mtxA, mtxB, mtxC;
// 线程1必须按照A -> B -> C的顺序锁定
void thread1() {
std::lock_guard<std::mutex> lockA(mtxA);
std::lock_guard<std::mutex> lockB(mtxB);
std::lock_guard<std::mutex> lockC(mtxC);
// 执行操作
}
// 线程2也必须按照A -> B -> C的顺序锁定
void thread2() {
std::lock_guard<std::mutex> lockA(mtxA);
std::lock_guard<std::mutex> lockB(mtxB);
std::lock_guard<std::mutex> lockC(mtxC);
// 执行操作
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
通过强制严格的锁获取顺序,我们可以防止循环等待条件,确保不会发生死锁。
# 锁层次结构和分层锁定
另一种方法是使用锁层次结构(Lock Hierarchies)。这种技术涉及为每个锁分配一个数字或层次值,并要求线程只能按升序获取锁。与资源排序类似,锁层次结构通过确保线程不会无序锁定资源来防止循环等待。
锁层次结构在具有复杂资源依赖关系的系统中特别有用。例如,在一个系统中,某些锁被认为比其他锁更关键,更关键的锁会被分配更低的层次值。只要线程在锁定了较低层次值的资源后,只锁定更高层次值的资源,就可以获取多个锁。
# 使用定时锁进行死锁检测
死锁检测是指允许死锁发生,但在死锁发生时进行检测,并通过强制终止死锁中的一个线程来解决问题。虽然在所有场景中都不是理想的解决方案,但死锁检测在数据库管理系统等系统中很有用,因为在这些系统中事务可以安全回滚。
一种更巧妙的死锁检测形式是定时锁。在C++中,std::timed_mutex
允许线程在指定的时间段内尝试获取锁。如果线程在规定时间内无法获取锁,它会放弃并继续执行,之后可能会再次尝试。这可以防止线程无限期阻塞,降低死锁发生的可能性。
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
std::timed_mutex timed_mtx;
void task(int id) {
if (timed_mtx.try_lock_for(std::chrono::milliseconds(100))) {
std::cout << "Thread " << id << " acquired the lock.\n";
std::this_thread::sleep_for(std::chrono::milliseconds(200));
timed_mtx.unlock();
} else {
std::cout << "Thread " << id << " could not acquire the lock in time.\n";
}
}
int main() {
std::thread t1(task, 1);
std::thread t2(task, 2);
t1.join();
t2.join();
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
在上面的示例脚本中,每个线程都尝试获取一个定时锁。如果线程在100毫秒内无法获取锁,它会继续执行,而不会无限期阻塞。这种技术可以防止线程永远等待,从而降低死锁发生的几率。
# 线程优先级反转和优先级继承
死锁也可能由线程优先级反转(Thread Priority Inversion)引起,即高优先级线程被持有所需资源的低优先级线程阻塞。在严重依赖线程优先级的系统(如实时系统)中,优先级反转可能会导致性能瓶颈并引发死锁。
为了解决这个问题,现代系统采用优先级继承(Priority Inheritance),即持有资源的低优先级线程的优先级会暂时提高,以匹配被阻塞的高优先级线程的优先级。这确保低优先级线程能更快地完成Task并释放资源,使高优先级线程得以继续执行。
// 模拟优先级反转场景
// 优先级继承会提高持有锁的低优先级线程的优先级。
2
虽然C++没有直接提供对优先级继承的支持,但许多操作系统和实时调度库都实现了这种技术,以避免基于优先级调度系统中的死锁。此外,优先级继承和无锁数据结构等技术提供了更多有效管理线程同步的方法,确保多线程系统保持响应性、可扩展性和高性能。
# 精准执行原子操作
原子操作(Atomic operations)是作为单个、不可分割的步骤执行的操作。在C++中,原子操作由std::atomic
类模板提供,它确保对变量的读取-修改-写入操作以原子方式执行。这意味着没有其他线程可以中断或干扰该操作,从而在无需传统锁定机制的情况下确保线程安全。
在这里,我们将在示例程序中使用原子操作设计高效的并发算法,展示原子操作如何在保持线程安全和提高性能的同时,避免传统锁的开销。
# 原子操作概述
原子操作的主要优点是它们提供无锁同步。这使得线程可以在无需获取和释放锁的情况下修改共享数据,减少线程之间的竞争,提高整体系统性能。
C++ 提供了多种原子类型和操作,包括:
std::atomic<int>
:一种原子整数类型,支持原子递增、递减和其他操作。fetch_add()
和fetch_sub()
:以原子方式对变量进行加或减操作。compare_exchange_weak()
和compare_exchange_strong()
:根据特定条件以原子方式比较和交换值。- 内存顺序(Memory ordering):使用宽松(relaxed)、获取(acquire)和释放(release)语义控制内存顺序。
现在,我们将把这些原子操作应用到一个实际场景中,在多线程程序中高效管理共享资源。
# 示例程序:在计数器中使用原子操作
我们以一个共享计数器为例,多个线程会同时对其进行递增操作。传统上,我们可能会使用互斥锁(mutex)来保护对计数器的访问,确保一次只有一个线程可以修改计数器。然而,使用互斥锁会引入锁定开销,尤其是当有许多线程竞争访问时。但是,通过改用原子操作,我们可以在确保多个线程安全递增计数器的同时,消除对锁的需求。
以下是如何使用std::atomic
实现一个原子计数器:
#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
std::atomic<int> atomic_counter(0); // 原子计数器变量
void increment_counter(int num_iterations) {
for (int i = 0; i < num_iterations; ++i) {
atomic_counter.fetch_add(1); // 以原子方式递增计数器
}
}
int main() {
const int num_threads = 4;
const int iterations_per_thread = 1000000;
// 创建线程以递增原子计数器
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment_counter, iterations_per_thread);
}
// 等待所有线程完成
for (auto& thread : threads) {
thread.join();
}
std::cout << "Final counter value: " << atomic_counter.load() << "\n";
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
在上述程序中:
std::atomic<int>
类型确保计数器可以被多个线程以原子方式递增。这消除了对锁的需求,因为原子操作保证即使多个线程同时访问,计数器也能被安全修改 。fetch_add()
函数用于以原子方式将计数器递增1。这个函数确保没有其他线程可以干扰该操作,这意味着无论有多少线程在递增它,计数器都将始终被正确更新。- 我们创建了四个线程,每个线程将计数器递增一百万次。通过使用原子操作,我们避免了锁定和解锁互斥锁的开销,否则当线程相互等待释放锁时,程序会变慢。
在所有线程完成工作后,会打印计数器的最终值。原子操作的使用确保了即使多个线程同时修改计数器,最终的计数器值也是正确的。
# 使用比较并交换的高级原子操作
除了像fetch_add()
这样的基本原子操作外,原子操作还可以用于实现更复杂的同步机制。其中最强大的原子操作之一是比较并交换(Compare-and-Swap,CAS),它允许线程根据共享变量的当前值有条件地更新它。这个操作在设计无锁数据结构和算法时特别有用。
compare_exchange_weak()
和compare_exchange_strong()
函数实现了CAS。它们将原子变量的当前值与预期值进行比较,如果匹配,则更新变量。否则,操作失败,线程可以重试。
以下是一个使用CAS实现简单无锁算法的示例程序:
#include <iostream>
#include <thread>
#include <atomic>
std::atomic<int> shared_value(0);
void compare_and_swap_task() {
int expected_value = 0;
int new_value = 100;
if (shared_value.compare_exchange_strong(expected_value, new_value)) {
std::cout << "Thread successfully updated shared value to " << shared_value.load() << "\n";
} else {
std::cout << "Thread failed to update shared value. Current value: " << shared_value.load() << "\n";
}
}
int main() {
std::thread t1(compare_and_swap_task);
std::thread t2(compare_and_swap_task);
t1.join();
t2.join();
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
在这里,compare_exchange_strong()
函数以原子方式将shared_value
的当前值与expected_value
进行比较。如果值匹配,shared_value
将被更新为new_value
。如果另一个线程已经更新了shared_value
,操作将失败,线程可以重试或采取其他措施。
通过使用原子操作,我们展示了原子计数器如何允许多个线程在无需互斥锁的情况下同时递增共享变量。我们还探索了像比较并交换这样的高级原子操作,这些操作有助于设计无锁算法和数据结构。
# 使用Future)、Promise和Task协调线程
# Future、Promise和Task
Future、Promise和Task提供了高级抽象,使线程能够有效地通信,实现异步操作的清晰同步。这些结构提供了一种灵活的方式来处理异步计算,使协调不同线程并检索其结果变得更加容易。Future和Promise以互补的方式协同工作。Promise用于设置一个将由线程或其他Task计算的值。然后,相应的Future会访问这个值,Future会等待结果可用。一旦计算完成,Future就会检索结果,从而实现不同线程或Task之间的同步。
在异步编程中,Future代表一个在未来某个时刻会可用的值。它提供了一种访问异步操作结果的方式。另一方面,Promise是在异步操作完成后设置Future值的对应部分。当一个线程或函数完成其Task时,它会实现Promise,使结果对Future可用。
C++标准库提供了以下关键结构:
std::future
:在异步操作完成后检索其结果。std::promise
:允许线程设置一个将由Future检索的结果。std::async
:异步启动一个Task,并返回一个Future,以便稍后访问结果。std::packaged_task
:包装一个函数或可调用对象,使其能够异步运行,并且可以通过Future访问其结果。
我们将在示例程序中应用这些概念,协调线程执行异步Task。我们将展示Future和Promise如何帮助同步不同线程,同时确保在需要时结果可用,而不会造成阻塞或引入不必要的延迟。
# 示例程序:使用Future和Promise进行异步计算
考虑这样一个场景:多个线程执行计算的不同部分,并且这些计算的结果需要在最后收集并同步。我们可以使用Future和Promise,如下所示来干净利落地实现这一点:
#include <iostream>
#include <thread>
#include <future>
#include <vector>
// 一个执行计算并通过Promise返回结果的函数
void compute_value(std::promise<int>&& promise, int value) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
promise.set_value(value * value);
}
int main() {
const int num_threads = 4;
std::vector<std::future<int>> futures;
std::vector<std::promise<int>> promises(num_threads);
// 生成线程以异步执行计算
for (int i = 0; i < num_threads; ++i) {
// 将Promise移动到线程中并获取相应的Future
futures.push_back(promises[i].get_future());
std::thread(compute_value, std::move(promises[i]), i + 1).detach();
}
// 等待所有Future准备好并打印结果
for (auto& future : futures) {
std::cout << "Computed value: " << future.get() << "\n";
}
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
在这里:
- 每个线程都与一个
std::promise
和一个相应的std::future
相关联。Promise用于设置计算结果,而Future用于在计算完成后检索结果。在这里,我们创建了四个Promise,每个Promise都被移动到一个单独的线程中。 compute_value()
函数模拟一些工作(通过睡眠100毫秒),然后将计算结果设置到Promise中。结果是输入值的平方。- 主线程使用
future.get()
等待所有Future准备好。这个调用会阻塞,直到相应的Promise设置了一个值,确保在继续之前结果是可用的。一旦结果准备好,它就会被打印到控制台。
这个简单的示例说明了如何使用Future和Promise来同步多个线程,并在不产生不必要阻塞的情况下检索异步计算的结果。
# 使用std::async
进行高级线程编排
除了promises
(Promise)和futures
(未来对象),C++ 还提供了std::async
函数,它简化了启动异步Task的过程。与promises
不同,std::async
直接返回一个与异步Task结果相关联的future
。
我们将修改前面的示例,使用std::async
,而不是手动管理promises
和线程,如下所示:
#include <iostream>
#include <future>
#include <vector>
// A function that performs a computation asynchronously
int compute_value(int value) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Simulate some work
return value * value; // Compute the square and return the result
}
int main() {
const int num_threads = 4;
std::vector<std::future<int>> futures;
// Launch tasks asynchronously using std::async
for(int i = 0; i < num_threads; ++i) {
futures.push_back(std::async(std::launch::async, compute_value, i + 1));
}
// Wait for all futures to be ready and print the results
for (auto& future : futures) {
std::cout << "Computed value: " << future.get() << "\n"; // Wait for the result
}
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
在上面的程序中,std::async
函数异步启动compute_value()
函数,并返回一个与结果相关联的future
。通过传递std::launch::async
,我们明确请求异步执行(与延迟执行相反,如果使用默认启动策略,可能会发生延迟执行)。
# 组合Futures
、Promises
和Task
在更复杂的场景中,futures
、promises
和Task可以组合起来,构建复杂的工作流程,其中多个异步Task相互依赖。例如,一个异步Task的结果可能被用作另一个Task的输入,依此类推。这可以通过链式调用futures
或使用promises
协调多个Task来实现。
我们将扩展示例程序,演示一个Task的结果用于触发另一个Task执行的场景:
#include <iostream>
#include <thread>
#include <future>
#include <functional>
// A function that performs the first computation asynchronously
int compute_initial(int value) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Simulate some work
return value * value;
}
// A function that depends on the result of the first computation
int compute_final(int initial_result) {
std::this_thread::sleep_for(std::chrono::milliseconds(50)); // Simulate additional work
return initial_result + 10;
}
int main() {
// Launch the first task using std::async
std::future<int> initial_future = std::async(std::launch::async, compute_initial, 5);
// Wait for the first task to complete and use its result for the second task
int final_result = compute_final(initial_future.get());
// Output the final result
std::cout << "Final computed value: " << final_result << "\n";
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
在这里,第一个Taskcompute_initial()
使用std::async
异步执行初始计算。使用future
(initial_future.get()
)检索此Task的结果,它会等待结果准备就绪。
然后,第一个Task的结果被传递给compute_final()
函数,该函数根据结果执行额外的工作。这说明了异步Task如何链接在一起,一个Task的结果作为下一个Task的输入。
Futures
、promises
和Task为编排多线程应用程序中的异步操作提供了强大的工具。通过使用这些结构,我们展示了如何使用futures
和promises
管理异步计算,以及std::async
如何简化异步启动Task的过程。
# 构建无锁数据结构以实现极致性能
# 是什么让数据结构无锁?
无锁数据结构为传统的线程同步机制提供了强大的替代方案。无锁数据结构通过确保多个线程可以同时修改共享数据结构而不会导致数据损坏,从而完全消除了对锁的需求,减少了竞争并提高了性能。这些结构依赖于原子操作,原子操作允许线程以全有或全无的方式执行更新,而不会相互阻塞。
如果多个线程可以在不使用锁的情况下同时对数据结构进行操作,并且任何时候至少有一个线程可以取得进展,则该数据结构被认为是无锁的。换句话说,无锁结构保证系统的进展不会受到任何单个线程的阻碍。这使得它们特别适合高并发场景的系统,如实时系统、数据库和网络应用程序。
# 示例程序:构建无锁栈
在本节中,我们将构建一个简单的无锁数据结构,并展示它如何消除与传统锁定机制相关的开销。具体来说,我们将使用原子操作实现一个无锁栈,允许多个线程同时进行入栈和出栈操作。
我们将从使用std::atomic
和比较并交换(compare-and-swap,CAS)操作构建一个简单的无锁栈开始。栈是一种常见的数据结构,基于后进先出(last in, first out,LIFO)原则操作,元素按相反顺序入栈和出栈。在多线程环境中,如果没有正确同步,多个线程可能会同时进行入栈和出栈操作,从而导致潜在的数据竞争。使用CAS,我们可以确保入栈和出栈操作以原子方式执行,允许多个线程在不锁定的情况下同时修改栈。
以下是用C++实现的无锁栈示例:
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>
template <typename T>
class LockFreeStack {
private:
struct Node {
T data;
Node* next;
Node(T value) : data(value), next(nullptr) {}
};
std::atomic<Node*> head; // Atomic pointer to the head of the stack
public:
LockFreeStack() : head(nullptr) {}
// Push a new value onto the stack
void push(T value) {
Node* new_node = new Node(value);
new_node->next = head.load(); // Set the new node's next pointer to the current head
// Atomically update the head to point to the new node
while (!head.compare_exchange_weak(new_node->next, new_node)) {
// If the head changed, retry with the updated head value
}
}
// Pop a value from the stack
bool pop(T& result) {
Node* old_head = head.load();
// Try to atomically update the head to the next node
while (old_head &&!head.compare_exchange_weak(old_head, old_head->next)) {
// Retry if the head was modified by another thread
}
if (old_head) {
result = old_head->data; // Retrieve the data from the old head
delete old_head; // Free the memory for the old head
return true;
}
return false; // Stack was empty
}
// Check if the stack is empty
bool empty() const {
return head.load() == nullptr;
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
在上述实现中:
- 要将一个值入栈,我们首先创建一个新节点。新节点的下一个指针设置为指向当前栈顶。使用
compare_exchange_weak()
,我们尝试以原子方式将栈顶指针更新为指向新节点。如果在此期间另一个线程修改了栈顶(即进行了另一次入栈或出栈操作),则使用更新后的栈顶值重试该操作。 - 要从栈中弹出一个值,我们首先加载当前栈顶的值。然后,我们尝试使用
compare_exchange_weak()
将栈顶更新为指向栈中的下一个节点。如果在此期间另一个线程修改了栈顶,则重试该操作。如果栈不为空,则返回栈顶的值,并删除该节点。如果栈为空,则操作返回false
。 - 然后,通过使用原子操作和CAS,我们确保入栈和出栈操作都是无锁的,这意味着线程可以同时对栈进行操作而不会相互阻塞。比较并交换操作确保即使在存在并发更新的情况下,也没有线程会使栈处于不一致的状态。
无锁数据结构提供了显著的性能优势,包括减少竞争、提高可扩展性和降低延迟,使其成为对性能要求苛刻的高并发系统的理想选择。通过掌握无锁编程技术,开发人员可以构建更高效、可扩展和高性能的应用程序。
# 总结
综上所述,本章介绍了用于设计高效、高性能多线程系统的高级线程同步技术和原子操作。首先探讨了死锁,解释了其潜在原因,并提供了诸如资源排序、锁层次结构和定时锁等解决方案,以防止线程陷入相互等待的状态。展示了std::lock()
在安全处理多个锁时的作用,确保在处理共享资源时无死锁执行。
接下来,引入原子操作作为实现无锁同步的一种方法。通过利用原子变量和诸如compare_exchange_strong()
和fetch_add()
等函数,展示了并发算法如何避免传统锁的开销。本章还深入探讨了使用futures
、promises
和Task编排异步操作。介绍了std::async
结构,它是一种以最小的工作量管理异步Task的高级方法,实现了简单而强大的线程协调。最后,通过实现无锁栈探索了无锁数据结构的构建。这展示了原子操作如何允许多个线程在不使用锁的情况下同时进行数据的入栈和出栈操作,消除了锁定开销并提高了高并发系统的性能。