CppGuide社区 CppGuide社区
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
🔥C++面试
  • 第1章 C++ 惯用法与Modern C++篇
  • 第2章 C++开发工具与调试进阶
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 网络通信协议设计
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 服务其他模块设计
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 🔥C++游戏编程入门(零基础学C++)
  • 🔥使用C++17从零开发一个调试器 (opens new window)
  • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
  • 🔥使用C++从零写一个C语言编译器 (opens new window)
  • 🔥从零用C语言写一个Redis
  • leveldb源码分析
  • libevent源码分析
  • Memcached源码分析
  • TeamTalk源码分析
  • 优质源码分享 (opens new window)
  • 🔥远程控制软件gh0st源码分析
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • 高效Go并发编程
  • Go性能调优
  • Go项目架构设计
  • 🔥使用Go从零开发一个数据库
  • 🔥使用Go从零开发一个编译器 (opens new window)
  • 🔥使用Go从零开发一个解释器 (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
🔥C++面试
  • 第1章 C++ 惯用法与Modern C++篇
  • 第2章 C++开发工具与调试进阶
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 网络通信协议设计
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 服务其他模块设计
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 🔥C++游戏编程入门(零基础学C++)
  • 🔥使用C++17从零开发一个调试器 (opens new window)
  • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
  • 🔥使用C++从零写一个C语言编译器 (opens new window)
  • 🔥从零用C语言写一个Redis
  • leveldb源码分析
  • libevent源码分析
  • Memcached源码分析
  • TeamTalk源码分析
  • 优质源码分享 (opens new window)
  • 🔥远程控制软件gh0st源码分析
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • 高效Go并发编程
  • Go性能调优
  • Go项目架构设计
  • 🔥使用Go从零开发一个数据库
  • 🔥使用Go从零开发一个编译器 (opens new window)
  • 🔥使用Go从零开发一个解释器 (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
  • C++20 完全指南 说明
  • 第1章 比较和<=>运算符
  • 第2章 函数参数的占位符类型
  • 第3章 概念、要求和约束
  • 第4章 概念、需求和约束详解
  • 第5章 标准概念详解
  • 第6章 范围与视图
  • 第7章 范围和视图的实用工具
  • 第8章 视图类型详解
  • 第9章 跨度(Spans)
  • 第10章 格式化输出
  • 第11章 <chrono>中的日期和时区
  • 第12章 std::jthread和停止令牌
  • 第13章 并发特性
    • 13.1 使用闩锁和栅栏进行线程同步
      • 13.1.1 闩锁
      • 闩锁详解
    • 13.1.2 屏障
      • 屏障详解
    • 13.2 信号量
      • 13.2.1 使用计数信号量的示例
      • 线程调度并不公平
      • 13.2.2 使用二元信号量的示例
      • 信号量详解
    • 13.3 原子类型的扩展
      • 13.3.1 使用std::atomic_ref<>的原子引用
      • 使用原子引用的示例
      • 原子引用的特性
      • 原子引用详解
      • 13.3.2 原子共享指针
      • 使用原子共享指针的示例
      • 使用原子弱指针的示例
      • 13.3.3 原子浮点型
      • 13.3.4 原子类型的线程同步
      • 原子通知的公平票务系统
      • 13.3.5 std::atomic_flag的扩展
    • 13.4 同步输出流
      • 13.4.1 同步输出流的设计动机
      • 13.4.2 使用同步输出流
      • 13.4.3 对文件使用同步输出流
      • 13.4.4 将同步输出流当作输出流使用
      • 13.4.5 同步输出流的实际应用
    • 13.5 补充说明
  • 第14章 协程
  • 第15章 协程详解
  • 第16章 模块
  • 第17章 Lambda扩展
  • 第18章 编译期计算
  • 第19章 非类型模板参数(NTTP)扩展
  • 第20章 新的类型特性
  • 第21章 核心语言的小改进
  • 第22章 泛型编程的小改进
  • 第23章 C++标准库的小改进
  • 第24章 已弃用和移除的特性
  • cpp20completeguides
zhangxf
2025-03-20
目录

第13章 并发特性

# 第13章 并发特性

上一章介绍了std::jthread和停止令牌(stop tokens),本章将介绍C++20引入的所有其他并发特性:

  • 闩锁(Latches)和栅栏(Barriers)
  • 计数信号量(Counting Semaphores)和二元信号量(Binary Semaphores)
  • 原子类型(Atomic Types)的各种扩展
  • 同步输出流(Synchronized Output Streams)

# 13.1 使用闩锁和栅栏进行线程同步

两种新类型为多线程的异步计算/处理同步提供了新机制:

  • 闩锁允许进行一次性同步,线程可以等待多个任务完成。
  • 栅栏允许对多个线程进行重复同步,当所有线程都完成当前/下一步处理时,你需要做出相应反应。

# 13.1.1 闩锁

闩锁是一种新的并发执行同步机制,支持一次性异步倒计时。从一个初始整数值开始,不同的线程可以以原子操作的方式将该值减到零。当计数器达到零时,所有等待这个倒计时的线程将继续执行。

考虑以下示例:

//lib/latch.cpp

#include <iostream>
#include <array>
#include <thread>
#include <latch>

using namespace std::literals;   // 用于持续时间字面量

void loopOver(char c) {
    // 循环打印字符c
    for (int j = 0; j < c / 2; ++j) {
        std::cout.put(c).flush();
        std::this_thread::sleep_for(100ms);
    }
}

int main() {
    std::array tags{'.', '?', '8', '+', '-'};    // 我们需要执行任务的标签
    // 初始化闩锁,以便在所有任务完成时做出反应
    std::latch allDone{tags.size()};     		 // 用任务数量初始化倒计时
    // 启动两个线程,分别处理每隔一个的标签
    std::jthread t1{[tags, &allDone] {
        for (unsigned i = 0; i < tags.size(); i += 2) {   // 偶数索引
            loopOver(tags[i]);
            // 表示任务已完成
            allDone.count_down();   // 以原子操作方式递减闩锁的计数器
        }
    }};
    std::jthread t2{[tags, &allDone] {
        for (unsigned i = 1; i < tags.size(); i += 2) {   // 奇数索引
            loopOver(tags[i]);
            // 表示任务已完成
            allDone.count_down();   // 以原子操作方式递减闩锁的计数器
        }
    }};
    // 等待所有任务完成
    std::cout << "\nwaiting until all tasks are done\n";
    allDone.wait();                        // 等待闩锁的计数器变为零
    std::cout << "\nall tasks done\n";     // 注意:线程可能仍在运行...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

在此示例中,我们启动两个线程(使用std::jthread)来执行一些任务,每个任务处理数组tags中的一个字符。因此,tags的大小定义了任务的数量。主线程会阻塞,直到所有任务完成。为此:

  • 我们用标签/任务的数量初始化一个闩锁:
std::latch allDone{tags.size()};
1
  • 我们让每个任务完成时递减计数器:
allDone.count_down();
1
  • 我们让主线程等待,直到所有任务完成(计数器为零):
allDone.wait();
1

该程序的输出可能如下:

.?
waiting until all tasks are done
.??..??..?..?..??..?..??..?..?..??..?..?..?..?..?..8??88??88??8?8?8+8+88++8+8+8+88++
8+8+8+8+88++8+8+88++8+8-+---------------------
all tasks done
1
2
3
4
5

请注意,主线程不应假设任务完成时所有线程都已结束。线程可能仍在做其他事情,系统可能仍在清理它们。要等待所有线程结束,你必须对两个线程都调用join()。

你也可以使用闩锁在特定点同步多个线程,然后继续执行。不过要注意,每个闩锁只能这样使用一次。这种用法的一个应用场景是,尽可能确保多个线程即使在启动和初始化可能需要一些时间的情况下,也能一起开始实际工作1。

考虑以下示例:

//lib/latchready.cpp
#include <iostream>
#include <array>
#include <vector>
#include <thread>
#include <latch>

using namespace std::literals;   // 用于持续时间字面量

int main() {
    std::size_t numThreads = 10;
    // 初始化闩锁,以便在所有线程都初始化完成后启动它们
    std::latch allReady = 10;     // 用线程数量初始化倒计时

    // 启动numThreads个线程
    std::vector<std::jthread> threads;
    for (int i = 0; i < numThreads; ++i) {
        std::jthread t{[i, &allReady] {
            // 初始化每个线程(模拟花费一些时间)
            std::this_thread::sleep_for(100ms * i);
            // 同步线程,使它们都在这里一起开始
            allReady.arrive_and_wait();
            // 执行线程要做的任何事情(循环打印其索引)
            for (int j = 0; j < i + 5; ++j) {
                std::cout.put(static_cast<char>('0' + i)).flush();
                std::this_thread::sleep_for(50ms);
            }
        }};
        threads.push_back(std::move(t));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

1感谢Anthony Williams描述了这种场景。

我们启动numThreads个线程(使用std::jthread),这些线程在初始化和启动时需要花费一些时间。为了让它们一起开始执行功能,我们使用一个闩锁来阻塞,直到所有已启动的线程都初始化并启动。为此:

  • 我们用线程数量初始化闩锁:
std::latch allReady{numThreads};
1
  • 我们让每个线程递减计数器,以等待所有线程的初始化完成:
allReady.arrive_and_wait();   // count_down()和wait()
1

该程序的输出可能如下:

86753421098675342019901425376886735241907863524910768352491942538679453876945
876957869786789899
1
2

你可以看到,所有10个线程(每个线程打印其索引)或多或少是一起开始的。如果没有闩锁,输出可能如下:

00101021021321324132435243524365463547635746854768547968579685796587968769876
987987987989898999
1
2

在这里,启动较早的线程已经在运行,而较晚启动的线程还未开始。

# 闩锁详解

std::latch类在头文件<latch>中声明。表“latch类对象的操作”列出了std::latch的API。

操作 效果
latch l{counter}
l.count_down()
l.count_down(val)
l.wait()
l.try_wait()
l.arrive_and_wait()
l.arrive_and_wait(val)
max()
创建一个闩锁,将counter作为倒计时的起始值
原子地递减计数器(如果计数器尚未为0)
原子地将计数器递减val
阻塞,直到闩锁的计数器为0
返回闩锁的计数器是否为0
调用count_down()并等待
调用count_down(val)并等待
静态函数,返回计数器的最大可能值

表13.1 latch类对象的操作

请注意,你不能复制或移动(赋值)闩锁。

还要注意,将容器(std::array除外)的大小作为计数器的初始值是错误的。构造函数接受一个std::ptrdiff_t类型的值,它是有符号的,因此会出现以下情况:

std::latch  l1{10};                // 正确
std::latch  l2{10u};               // 可能会出现警告
std::vector<int>  coll{ ... };
...
std::latch  l3{coll.size()};       // 错误
std::latch  l4  =  coll.size();    // 错误
std::latch  l5(coll.size());       // 正确(不检查窄化转换)
std::latch  l6{int(coll.size())};  // 正确
std::latch  l7{ssize(coll)};       // 正确(见std::ssize())
1
2
3
4
5
6
7
8
9

# 13.1.2 屏障

屏障(barrier)是一种新的用于并发执行的同步机制,它允许你多次同步多个异步任务。在设置初始计数后,多个线程可以对其进行递减操作,并等待计数器达到零。然而,与闩锁不同的是,当计数器达到零时,会调用一个(可选的)回调函数,并且计数器会重新初始化为初始计数。

当多个线程反复共同计算/执行某些操作时,屏障非常有用。每当所有线程完成其任务时,可选的回调函数可以处理结果或新状态,之后异步计算/处理可以继续下一轮。

例如,考虑我们反复使用多个线程来计算多个值的平方根:

// lib/barrier.cpp
#include  <iostream>
#include  <format>
#include  <vector>
#include  <thread>
#include  <cmath>
#include  <barrier>

int main() {
    // 初始化并打印一个浮点值集合:
    std::vector  values{1.0,  2.0,  3.0,  4.0,  5.0,  6.0,  7.0,  8.0};

    // 定义一个lambda函数来打印所有值
    // - 注意:必须声明为noexcept才能用作屏障回调
    auto  printValues  =  [&values]  ()  noexcept{
        for  (auto  val  :  values)  {
            std::cout  <<  std::format( "  {:<7.5} " ,  val);
        }
        std::cout  <<  "\n";
    };

    // 打印初始值:
    printValues();

    // 初始化一个屏障,当所有线程完成计算时打印值:
    // 计数器的初始值
    // 每当计数器为0时调用的回调
    std::barrier  allDone{int(values.size()), printValues};

    // 为每个值初始化一个线程,在循环中计算其平方根:
    std::vector<std::jthread>  threads;
    for  (std::size_t  idx  =  0;  idx  <  values.size();  ++idx)  {
        threads.push_back(std::jthread{[idx,  &values,  &allDone]  {
            // 反复执行:
            for  (int  i  =  0;  i  <  5;  ++i)  {
                // 计算平方根:
                values[idx]  =  std::sqrt(values[idx]);
                // 与其他线程同步以打印值:
                allDone.arrive_and_wait();
            }
        }});
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

在声明一个浮点值数组后,我们定义了一个函数来打印这些值(使用std::format()进行格式化输出):

// 初始化并打印一个浮点值集合:
std::vector  values{1.0,  2.0,  3.0,  4.0,  5.0,  6.0,  7.0,  8.0};

// 定义一个lambda函数来打印所有值
// - 注意:必须声明为noexcept才能用作屏障回调
auto  printValues  =  [&values]  ()  noexcept{
    for  (auto  val  :  values)  {
        std::cout  <<  std::format( "  {:<7.5} " ,  val);
    }
    std::cout  <<  "\n";
};
1
2
3
4
5
6
7
8
9
10
11

注意,回调函数必须声明为noexcept。

我们的目标是使用多个线程,使每个线程处理一个值。在这种情况下,我们反复计算这些值的平方根。为此:

  • 我们初始化屏障allDone,以便每当所有线程完成下一次计算时打印所有值:
std::barrier  allDone{int(values.size()), printValues};
// 计数器的初始值
// 每当计数器为0时调用的回调
1
2
3

请注意,构造函数应该接受一个有符号整数值。否则,代码可能无法编译。

  • 在循环中,每个线程在计算后递减计数器,并等待计数器达到零(即所有其他线程也发出完成信号):
...
allDone.arrive_and_wait();
1
2
  • 当计数器达到零时,调用回调函数来打印结果。

注意,回调函数由最后将计数器递减为零的线程调用。这意味着在循环中,回调函数由不同的线程调用。

该程序的输出可能如下:

1 	2 	   3 	  4 	   5 	  6 	 7 		8
1 	1.4142 1.7321 2 	   2.2361 2.4495 2.6458 2.8284
1 	1.1892 1.3161 1.4142   1.4953 1.5651 1.6266 1.6818
1 	1.0905 1.1472 1.1892   1.2228 1.251  1.2754 1.2968
1 	1.0443 1.0711 1.0905   1.1058 1.1185 1.1293 1.1388
1 	1.0219 1.0349 1.0443   1.0516 1.0576 1.0627 1.0671
1
2
3
4
5
6

屏障的API还提供了一个将线程从该机制中移除的函数。例如,当循环运行直到收到停止信号时,为了避免死锁,你就需要这个功能。

那么启动线程的代码可能如下:

// 为每个值初始化一个线程,在循环中计算其平方根:
std::vector<std::jthread>  threads;
for  (std::size_t  idx  =  0;  idx  <  values.size();  ++idx)  {
    threads.push_back(std::jthread{[idx,  &values,  &allDone]  (std::stop_token  st)  {
        // 反复执行:
        while  (! st.stop_requested())  {
            // 计算平方根:
            values[idx]  =  std::sqrt(values[idx]);
            // 与其他线程同步以打印值:
            allDone.arrive_and_wait();
        }
        // 将线程从屏障中移除,以便其他线程无需等待:
        allDone.arrive_and_drop();
    }});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

现在每个线程调用的lambda函数接受一个停止令牌,以便在请求停止时(显式请求或通过调用线程的析构函数)做出反应。如果主线程向这些线程发出停止计算的信号(例如,通过调用threads.clear()),每个线程将自身从屏障中移除非常重要:

allDone.arrive_and_drop();
1

这个调用会递减计数器,并确保下一个初始值也会递减。在下一轮中(其他线程可能仍在运行),屏障将不再等待已移除的线程。

完整示例见lib/barrierstop.cpp。

# 屏障详解

std::barrier类在头文件<barrier>中声明。表“barrier类对象的操作”列出了std::barrier的API。

操作 效果
barrier b{num}
barrier b{num , cb}
b.arrive()
b.arrive(val)
b.wait(arrivalToken)
b.arrive_and_wait()
b.arrive_and_drop()
max()
为num个异步任务创建一个屏障
为num个异步任务创建一个屏障,并将cb作为回调函数
标记一个任务已完成并返回一个到达令牌
标记val个任务已完成并返回一个到达令牌
阻塞,直到所有任务都已完成并且回调函数(如果有)已被调用
标记一个任务已完成,并阻塞直到所有任务都已完成并且回调函数(如果有)已被调用
标记一个任务已完成,并减少要重复执行的任务数量
静态函数,返回num的最大可能值

表13.2 barrier类对象的操作

std::barrier<>是一个类模板,回调函数的类型作为模板参数。通常,该类型由类模板参数推导得出:

void  callback()  noexcept ;   //前向声明
...
std::barrier  b{6,  callback};   // 推导为std::barrier<decltype(callback)>
1
2
3

注意,C++标准要求屏障的回调函数保证不抛出异常。因此,为了具有可移植性,你必须将函数或lambda声明为noexcept。如果没有传递回调函数,则使用特定于实现的类型,代表一个无效果的操作。

调用l.arrive_and_wait();等同于l.wait(l.arrive());。

这意味着arrive()函数返回一个std::barrier::arrival_token类型的到达令牌,它确保屏障知道要等待哪些线程。否则,它无法正确处理arrive_and_drop()。

请注意,你不能复制或移动(赋值)屏障。

还要注意,将容器(std::array除外)的大小作为计数器的初始值是错误的。构造函数接受一个std::ptrdiff_t类型的值,它是有符号的,因此会出现以下情况:

std::barrier  b1{10,  cb};               // 正确
std::barrier  b2{10u,  cb};              // 可能会出现警告
std::vector<int>  coll{ ... };
...
std::barrier  b3{coll.size(),  cb};      // 错误
std::barrier  b4(coll.size(),  cb);      // 正确(不检查窄化转换)
std::barrier  b5{int(coll.size()),  cb}; // 正确
std::barrier  b6{std::ssize(coll),  cb}; // 正确(见std::ssize())
1
2
3
4
5
6
7
8

std::ssize()函数是在C++20中引入的。

# 13.2 信号量

C++20引入了用于处理信号量的新类型。信号量是一种轻量级的同步原语,可用于同步或限制对一个或一组资源的访问。

你可以像使用互斥锁(mutex)一样使用信号量,其优势在于,获得资源访问权限的线程不一定是最初获取该资源访问权的线程。你还可以使用信号量来限制资源的可用性,例如启用和禁用线程池中线程的使用。

C++标准库提供了两种信号量类型:

  • std::counting_semaphore<>,用于将多个资源的使用限制在最大值以内。
  • std::binary_semaphore<>,用于限制单个资源的使用。

# 13.2.1 使用计数信号量的示例

以下程序展示了信号量的一般操作方式:

// lib/semaphore.cpp
#include <iostream>
#include <queue>
#include <chrono>
#include <thread>
#include <mutex>
#include <semaphore>

using namespace std::literals;   //for duration literals

int main()
{
    // 值的队列
    // 用于保护对队列访问的互斥锁
    std::queue<char> values;
    std::mutex valuesMx;

    // 用从'a'到'z'的多个序列初始化队列
    // - 此时无需互斥锁,因为尚无其他线程运行
    for (int i = 0; i < 1000; ++i) {
        values.push(static_cast<char>('a' + (i % ('z' - 'a'))));
    }

    // 创建一个包含numThreads个线程的线程池:
    // - 使用信号量限制其可用性(最初没有线程可用)
    constexpr int numThreads = 10;
    std::counting_semaphore<numThreads> enabled{0};

    // 创建并启动线程池中的所有线程:
    std::vector<std::jthread> pool;
    for (int idx = 0; idx < numThreads; ++idx) {
        std::jthread t{[idx, &enabled, &values, &valuesMx] (std::stop_token st) {
            while (!st.stop_requested()) {
                // 请求线程成为启用的线程之一:
                enabled.acquire();

                // 从队列中获取下一个值:
                char val;
                {
                    std::lock_guard lg{valuesMx};
                    val = values.front();
                    values.pop();
                }

                // 打印该值10次:
                for (int i = 0; i < 10; ++i) {
                    std::cout.put(val).flush();
                    auto dur = 130ms * ((idx % 3) + 1);
                    std::this_thread::sleep_for(dur);
                }

                // 将线程从启用线程集合中移除:
                enabled.release();
            }
        }};
        pool.push_back(std::move(t));
    }

    std::cout << "== 等待2秒(没有线程启用)\n" << std::flush;
    std::this_thread::sleep_for(2s);

    // 启用3个并发线程:
    std::cout << "== 启用3个并行线程\n" << std::flush;
    enabled.release(3);
    std::this_thread::sleep_for(2s);

    // 再启用2个并发线程:
    std::cout << "\n== 再启用2个并行线程\n" << std::flush;
    enabled.release(2);
    std::this_thread::sleep_for(2s);

    // 通常我们会让程序一直运行,但在此处结束程序:
    std::cout << "\n== 停止处理\n" << std::flush;
    for (auto& t : pool) {
        t.request_stop();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

在这个程序中,我们启动了10个线程,但限制了允许同时运行和处理数据的线程数量。为此,我们用可能允许的最大数量(10)和初始允许的资源数量(零)初始化一个信号量:

constexpr int numThreads = 10;
std::counting_semaphore<numThreads> enabled{0};
1
2

你可能会疑惑为什么要将最大值作为模板参数指定。原因是通过这个编译期的值,库可以决定切换到可能的最有效实现(可能仅对特定值以内的情况提供原生支持,或者如果最大值为1,我们可以使用简化实现)。

在每个线程中,我们使用信号量请求运行权限。我们尝试 “获取” 一个可用资源以启动任务,完成后 “释放” 该资源供其他线程使用:

std::jthread{[&, idx] (std::stop_token st) {
    while (!st.stop_requested()) {
        // 请求线程成为启用的线程之一:
        enabled.acquire();
        // ...
        // 将线程从启用线程集合中移除:
        enabled.release();
    }
}}
1
2
3
4
5
6
7
8
9

最初,我们的线程会被阻塞,因为信号量初始化为零,没有可用资源。然而,之后我们使用信号量允许三个线程并发执行:

// 启用3个并发线程:
enabled.release(3);
1
2

再后来,我们又允许两个线程并发运行:

// 再启用2个并发线程:
enabled.release(2);
1
2

如果你在无法获取资源时想要休眠或执行其他操作,可以使用try_acquire():

std::jthread{[&, idx] (std::stop_token st) {
    while (!st.stop_requested()) {
        // 请求线程成为启用的线程之一:
        if (enabled.try_acquire()) {
            // ...
            // 将线程从启用线程集合中移除:
            enabled.release();
        }
        else {
            // ...
        }
    }
}}
1
2
3
4
5
6
7
8
9
10
11
12
13

你还可以使用try_acquire_for()或try_acquire_until()尝试在有限时间内获取资源:

std::jthread{[&, idx] (std::stop_token st) {
    while (!st.stop_requested()) {
        // 请求线程成为启用的线程之一:
        if (enabled.try_acquire_for(100ms)) {
            // ...
            // 将线程从启用线程集合中移除:
            enabled.release();
        }
    }
}}
1
2
3
4
5
6
7
8
9
10

这使我们能够不时地再次检查停止令牌(stop token)的状态。

# 线程调度并不公平

请注意,线程调度不一定是公平的。因此,主线程在想要结束程序时,可能需要一些时间(甚至永远)才能被调度。在调用线程的析构函数之前请求所有线程停止也是一个好方法。否则,我们会在之前的线程完成后很久才请求停止。

线程调度不公平的另一个后果是,无法保证等待时间最长的线程会被优先处理。通常情况恰恰相反:如果线程调度器已有一个线程正在运行并调用release(),然后它立即调用acquire(),调度器会让该线程继续运行(“很好,无需进行上下文切换”)。因此,我们无法保证因调用acquire()而等待的多个线程中哪一个会被唤醒。有可能总是同一个(或几个)线程被使用。

因此,如果处理顺序很重要,你不应在请求运行权限之前从队列中取出下一个值。

// 如果处理顺序很重要,这样做是错误的:
{
    std::lock_guard lg{valuesMx};
    val = values.front();
    values.pop();
}
enabled.acquire();
// ...
1
2
3
4
5
6
7
8

可能会出现val的值在几个后来读取的值之后才被处理,甚至永远不会被处理的情况。应在读取下一个值之前请求权限:

enabled.acquire();
{
    std::lock_guard lg{valuesMx};
    val = values.front();
    values.pop();
}
// ...
1
2
3
4
5
6
7

出于同样的原因,你无法轻易减少启用线程的数量。确实,你可以尝试调用:

// 将启用的并发线程数量减少一个:
enabled.acquire();
1
2

然而,我们不知道这条语句何时会被处理。由于线程处理不公平,减少启用线程数量的操作可能需要很长时间,甚至永远不会执行。

为了公平地处理队列并对资源限制做出即时反应,你可能需要使用新的原子操作的wait()和notify机制。

# 13.2.2 使用二元信号量的示例

对于信号量,C++20定义了一种特殊类型std::binary_semaphore,它是std::counting_semaphore<1>的别名,因此它只能启用或禁用单个资源的使用。

你也可以将其用作互斥锁(mutex),好处是释放资源的线程不必是先前获取该资源的线程。不过,更典型的应用是一种线程间的信号通知机制。与条件变量不同,你可以多次发出信号。

考虑以下示例:

// lib/semaphorenotify.cpp
#include <iostream>
#include <chrono>
#include <thread>
#include <semaphore>

using namespace std::literals;   // 用于持续时间字面量

int main()
{
    int sharedData;
    std::binary_semaphore dataReady{0};   // 表示有数据需要处理的信号
    std::binary_semaphore dataDone{0};     // 表示处理完成的信号

    // 启动线程按值读取和处理数据:
    std::jthread process{[&](std::stop_token st) {
        while (!st.stop_requested()) {
            // 等待下一个值准备好:
            // - 1秒超时以检查stop_token
            if (dataReady.try_acquire_for(1s)) {
                int data = sharedData;
                // 处理数据:
                std::cout << " [process]  read  " << data << std::endl;
                std::this_thread::sleep_for(data * 0.5s);
                std::cout << " [process]          done " << std::endl;
                // 发出处理完成的信号:
                dataDone.release();
            }
            else {
                std::cout << " [process]  timeout " << std::endl;
            }
        }
    }};

    // 生成一些值:
    for (int i = 0; i < 10; ++i) {
        // 存储下一个值:
        std::cout << " [main]  store  " << i << std::endl;
        sharedData = i;
        // 发出开始处理的信号:
        dataReady.release();
        // 等待处理完成:
        dataDone.acquire();
        std::cout << " [main]  processing  done\n " << std::endl;
    }
    // 循环结束表示停止
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

我们使用两个二元信号量让一个线程通知另一个线程:

  • 通过dataReady,主线程通知process线程sharedData中有新数据需要处理。
  • 通过dataDone,处理线程通知主线程数据已处理完成。

两个信号量都初始化为0,因此默认情况下,获取信号量的线程会被阻塞。当通知线程调用release()时,获取信号量的线程会被解除阻塞,从而可以做出反应。

该程序的输出类似于以下内容:

[main]  store  0
[process]  read  0
[process]            done
[main]  processing  done
[main]  store  1
[process]  read  1
[process]            done
[main]  processing  done
[main]  store  2
[process]  read  2
[process]            done
[main]  processing  done
[main]  store  3
[process]  read  3
[process]            done
[main]  processing  done
...
[main]  store  9
[process]  read  9
[process]            done
[main]  processing  done
[process]  timeout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

注意,处理线程使用try_acquire_for()仅在有限时间内尝试获取信号量。返回值表示是否收到通知(获得资源访问权)。这使得线程可以不时检查是否有停止信号(就像主线程结束后那样)。

二元信号量的另一个应用是等待在不同线程中运行的协程结束。

# 信号量详解

类模板std::counting_semaphore<>与std::binary_semaphore(std::counting_semaphore<1>的别名)一起在头文件<semaphore>中声明:

namespace std {
    template<ptrdiff_t least_max_value = implementation-defined  >
    class counting_semaphore;
    using binary_semaphore = counting_semaphore<1>;
}
1
2
3
4
5

表13.3“counting_semaphore<>和binary_semaphore类对象的操作”列出了信号量的API。

注意,信号量不能复制或移动(赋值)。

还要注意,将容器(std::array除外)的大小作为计数器的初始值是错误的。构造函数接受一个std::ptrdiff_t类型的值,它是有符号的,因此会有以下行为:

std::counting_semaphore s1{10};   				// 正确
std::counting_semaphore s2{10u};   				// 可能会出现警告
std::vector<int> coll{ ... };
...
std::counting_semaphore s3{coll.size()};   		// 错误
std::counting_semaphore s4 = coll.size();   	// 错误
std::counting_semaphore s4(coll.size());   		// 正确(不检查窄化转换)
std::counting_semaphore s6{int(coll.size())};   // 正确
std::counting_semaphore s7{std::ssize(coll)};   // 正确(见std::ssize())
1
2
3
4
5
6
7
8
9

std::ssize()函数是在C++20中引入的。

操作 效果
semaphore s{num}
s.acquire()
s.try_acquire()
s.try_acquire_for(dur)
s.try_acquire_until(tp)
s.release()
s.release(num)
max()
创建一个计数器初始值为num的信号量
阻塞直到可以原子地递减计数器(请求多一个资源)
尝试立即原子地递减计数器(请求多一个资源),如果成功则返回true
在持续时间dur内尝试原子地递减计数器(请求多一个资源),如果成功则返回true
尝试直到时间点tp原子地递减计数器(请求多一个资源),如果成功则返回true
原子地递增计数器(启用多一个资源)
原子地将num加到计数器上(启用多num个资源)
静态函数,返回计数器的最大可能值

表13.3 counting_semaphore<>和binary_semaphore类对象的操作

# 13.3 原子类型的扩展

C++20引入了一些新的原子类型(涉及引用和共享指针)以及原子类型的新特性。此外,现在还有std::atomic<char8_t>类型。

# 13.3.1 使用std::atomic_ref<>的原子引用

自C++11起,C++标准库提供了类模板std::atomic<>,为平凡可复制类型提供原子操作的API。

C++20现在引入了类模板std::atomic_ref<>,为平凡可复制的引用类型提供原子操作的API。这使你能够为通常不具备原子性的现有对象提供临时的原子操作API。一种应用场景是,在初始化对象时无需考虑并发问题,而在后续多线程环境中使用该对象时确保原子性操作。

之所以给这个类型起名为atomic_ref,而不是直接为引用类型提供std::atomic<>,是为了让用户清楚该对象可能提供非原子访问,并且其保证比std::atomic<>更弱。

# 使用原子引用的示例

以下程序展示了如何使用原子引用:

// lib/atomicref.cpp
#include <iostream>
#include <array>
#include <algorithm>     // 用于std::fill_n()
#include <vector>
#include <format>
#include <random>
#include <thread>
#include <atomic>        // 用于std::atomic_ref<>

using namespace std::literals;   // 用于持续时间字面量

int main() {
    // 创建并初始化一个包含1000个整数值的数组,初始值为100:
    std::array<int, 1000> values;
    std::fill_n(values.begin(), values.size(), 100);

    // 为所有线程初始化一个通用的停止令牌:
    std::stop_source allStopSource;
    std::stop_token allStopToken{allStopSource.get_token()};

    // 启动多个线程并发地递减数组中的值:
    std::vector<std::jthread> threads;
    for (int i = 0; i < 9; ++i) {
        threads.push_back(std::jthread{
            [&values](std::stop_token st) {
                // 初始化随机数生成引擎以生成索引:
                std::mt19937 eng{std::random_device{}()};
                std::uniform_int_distribution distr{0, int(values.size() - 1)};
                while (!st.stop_requested()) {
                    // 计算下一个索引:
                    int idx = distr(eng);
                    // 对索引对应的值启用原子访问:
                    std::atomic_ref val{values[idx]};
                    // 使用该值:
                    --val;
                    if (val <= 0) {
                        std::cout << std::format("index {} is zero\n", idx);
                    }
                }
            },
            allStopToken   // 传递通用的停止令牌
        });
    }

    // 一段时间后 / 事件发生时请求停止所有线程:
    std::this_thread::sleep_for(0.5s);
    std::cout << "\nSTOP\n";
    allStopSource.request_stop();
   ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

我们首先创建并初始化一个包含1000个整数值的数组,初始化过程未使用原子操作:

std::array<int, 1000> values;
std::fill_n(values.begin(), values.size(), 100);
1
2

然而,之后我们启动了多个线程,并发地递减这些值。关键在于,仅在这种情况下,我们将这些值当作原子整数来使用。为此,我们初始化一个std::atomic_ref<>:

std::atomic_ref val{values[idx]};
1

注意,由于类模板参数推导,我们无需指定所引用对象的类型。

这种初始化的效果是,在使用原子引用时,对值的访问是原子性的:

  • --val会原子性地递减该值。
  • val <= 0会原子性地加载该值并与0进行比较(该表达式在读取值后使用隐式类型转换为基础类型)。

你可能会考虑将后者实现为val.load() <= 0,以表明使用了原子操作接口。

注意,该程序中的不同线程并没有使用相同的atomic_ref<>对象。这是没问题的。std::atomic_ref<>保证,通过为特定对象创建的任何atomic_ref<>对该对象的所有并发访问都是同步的。

# 原子引用的特性

原子引用的头文件同样是<atomic>。与针对原始指针、整型以及(自C++20起)浮点型的std::atomic<>特化类似,原子引用也有相应的特化:

namespace std {
    template<typename T> struct atomic_ref; 	// 主模板
    template<typename T> struct atomic_ref<T*>; // 指针的偏特化
    template<> struct atomic_ref<integralType>; // 整型的全特化
    template<> struct atomic_ref<floatType>; 	// 浮点型的全特化
}
1
2
3
4
5
6

2标准库的实现方式决定了如何确保这一点。如果需要互斥锁或锁机制,可能会使用全局哈希表,在其中为每个被包装对象的地址存储一个关联锁。这与标准库为用户自定义类型实现std::atomic<>的方式并无不同。

与std::atomic<>相比,原子引用有以下限制:

  • 不支持volatile。
  • 被引用的对象可能需要比其底层类型通常所需的对齐方式更高的对齐要求。std::atomic_ref<type>::required_alignment静态成员提供了这个最小对齐值。

与std::atomic<>相比,原子引用有以下扩展:

  • 提供了复制构造函数,用于创建对同一底层对象的另一个引用(不过仅提供了用于赋值底层值的赋值运算符)。
  • 常量性不会传播到被包装的对象。这意味着你可以给const std::atomic_ref<>赋一个新值:
MyType x, y;
const std::atomic_ref cr{x};
cr = y;     // 可行(对于`const std::atomic<>`则不可行)
1
2
3
  • 提供了wait()、notify_one()和notify_all()用于线程同步支持,这与现在所有原子类型所提供的功能一致。

在其他方面,原子引用提供了与std::atomic<>相同的特性:

  • 该类型既提供了带有内存屏障的高级API,也提供了用于禁用内存屏障的低级API。
  • 静态成员is_always_lock_free和非静态成员函数is_lock_free()用于判断原子支持是否是无锁的。这可能仍然取决于所使用的对齐方式。
# 原子引用详解

原子引用提供了与相应原子类型相同的API。对于可平凡复制的类型、指针类型、整型或浮点型T,C++标准库提供的std::atomic_ref<T>具有与std::atomic<T>相同的原子API。原子引用类型也在<atomic>头文件中提供。

这也意味着,通过静态成员is_always_lock_free()或非静态成员函数is_lock_free(),你可以检查一个原子类型在内部是否使用锁来实现原子操作。如果不是,则表明硬件对原子操作提供了原生支持(这是在信号处理程序中使用原子操作的前提条件)。对类型的检查如下所示:

if constexpr(std::atomic<int>::is_always_lock_free) {
   ...
} else {
   ...      // 如果使用锁,则进行特殊处理
}
1
2
3
4
5

对特定对象的检查(其是否无锁可能取决于对齐方式)如下所示:

std::atomic_ref val{values[idx]};
if (val.is_lock_free()) {
   ...
} else {
   ...      // 如果使用锁,则进行特殊处理
}
1
2
3
4
5
6

请注意,对于类型T,即使std::atomic<T>是无锁的,std::atomic_ref<T>也可能不是无锁的。

原子引用所引用的对象可能还需要满足特定于架构的约束条件。例如,该对象可能需要在内存中正确对齐,或者可能不允许驻留在GPU寄存器内存中。对于所需的对齐方式,有一个静态成员:

namespace std {
    template<typename T>
    struct atomic_ref {
        static constexpr size_t required_alignment;
       ...
    };
}
1
2
3
4
5
6
7

该值至少为alignof(T)。不过,例如对于std::complex<double>,为了支持无锁操作,该值可能是2 * alignof(double)。

# 13.3.2 原子共享指针

C++11引入了带有可选原子接口的共享指针。通过atomic_load()、atomic_store()和atomic_exchange()等函数,你可以并发访问共享指针所指向的值。然而,问题在于你仍然可以使用这些共享指针的非原子接口,这会破坏共享指针的所有原子操作的效果。

C++20现在为共享指针和弱指针提供了偏特化:

  • std::atomic<std::shared_ptr<T>>
  • std::atomic<std::weak_ptr<T>>

以前用于共享指针的原子API现在已被弃用。

请注意,原子共享/弱指针并不提供原子原始指针所具备的额外操作。它们提供的API与std::atomic<>通常为可平凡复制类型T提供的API相同。

它们还提供了wait()、notify_one()和notify_all()用于线程同步支持。也支持带有可选择内存序参数的低级原子接口。

# 使用原子共享指针的示例

以下示例展示了如何使用原子共享指针作为共享值链表的头节点。

// lib/atomicshared.cpp
#include <iostream>
#include <thread>
#include <memory> // 现在包含<atomic>
using namespace std::literals; // 用于时长字面量

template<typename T>
class AtomicList {
private:
    struct Node {
        T val;
        std::shared_ptr<Node> next;
    };
    std::atomic<std::shared_ptr<Node>> head;
public:
    AtomicList() = default;
    void insert(T v) {
        auto p = std::make_shared<Node>();
        p->val = v;
        p->next = head;
        while (!head.compare_exchange_weak(p->next, p)) {   // 原子更新
        }
    }
    void print() const {
        std::cout << "HEAD ";
        for (auto p = head.load(); p; p = p->next) {   		// 原子读取
            std::cout << "-> " << p->val;
        }
        std::cout << std::endl;
    }
};

int main() {
    AtomicList<std::string> alist;
    // 用10个线程向链表中填充元素:
    {
        std::vector<std::jthread> threads;
        for (int i = 0; i < 100; ++i) {
            threads.push_back(std::jthread{[&, i]{
                for (auto s : { "hi ", "hey ", "ho ", "last " }) {
                    alist.insert(std::to_string(i) + s);
                    std::this_thread::sleep_for(5ns);
                }
            }});
        }
    } // 等待所有线程完成
    alist.print();     // 打印最终的链表
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

该程序可能的输出如下:

HEAD->94last->94ho->76last->68last->57last->57ho->60last->72last-> ... ->1hey->1hi
1

与通常的原子操作一样,你也可以这样写:

for (auto p = head.load(); p; p = p->next)
1

而不是:

for (auto p = head.load(); p.load(); p = p->next)
1

# 使用原子弱指针的示例

以下示例展示了原子弱指针的用法:

// lib/atomicweak.cpp
#include <iostream>
#include <thread>
#include <memory>
using namespace std::literals;
// 现在包含<atomic>
// 用于时长字面量

int main() {
    std::atomic<std::weak_ptr<int>> pShared;   // 指向当前共享值的指针(如果存在)
    // 循环设置共享值一段时间:
    std::atomic<bool> done{false};
    std::jthread updates{[&] {
        for (int i = 0; i < 10; ++i) {
            {
                auto sp = std::make_shared<int>(i);
                pShared.store(sp);     // 原子更新
                std::this_thread::sleep_for(0.1s);
            }
            std::this_thread::sleep_for(0.1s);
        }
        done.store(true);
    }};
    // 循环打印共享值(如果有):
    while (!done.load()) {
        if (auto sp = pShared.load().lock()) {   // 原子读取
            std::cout << "shared: " << *sp << "\n";
        } else {
            std::cout << "shared: <no data>\n";
        }
        std::this_thread::sleep_for(0.07s);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

请注意,在这个程序中,我们不必将共享指针设为原子的,因为它仅由一个线程使用。唯一的问题是两个线程会并发更新或使用弱指针。

该程序可能的输出如下:

shared: <no data>
shared: 0
shared: <no data>
shared: 1
shared: <no data>
shared: <no data>
shared: 2
shared: <no data>
shared: <no data>
shared: 3
shared: <no data>
shared: 4
shared: 4
shared: <no data>
shared: 5
shared: 5
shared: <no data>
shared: 6
shared: <no data>
shared: <no data>
shared: 7
shared: <no data>
shared: 8
shared: 8
shared: <no data>
shared: 9
shared: 9
shared: <no data>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

与通常的原子操作一样,你也可以这样写:

pShared = sp;
1

而不是:

pShared.store(sp);
1

# 13.3.3 原子浮点型

std::atomic<>和std::atomic_ref<>现在都为float、double和long double类型提供了全特化。

与针对任意可平凡复制类型的主模板不同,它们提供了额外的原子操作来进行值的加和减3:

  • fetch_add()、fetch_sub()
  • operator+=、operator-=

因此,现在以下操作是可行的:

std::atomic<double> d{0};
...
d += 10.3;     // 自C++20起可行
1
2
3

3与整型的特化不同,整型特化还提供原子支持来进行值的递增/递减以及按位修改操作。

# 13.3.4 原子类型的线程同步

所有原子类型(std::atomic<>、std::atomic_ref<>和std::atomic_flag)现在都提供了一个简单的API,让线程能够阻塞并等待其他线程对其值的改变。

因此,对于一个原子值:

std::atomic<int> aVal{100};
1

或者一个原子引用:

int value = 100;
std::atomic_ref<int> aVal{value};
1
2

你可以定义等待,直到引用的值发生变化:

int lastValue = aVal.load();
aVal.wait(lastValue);   // 除非/直到值发生变化(并收到通知),否则阻塞
1
2

如果引用对象的值与传入的参数不匹配,它会立即返回。否则,它会阻塞,直到对该原子值或引用调用了notify_one()或notify_all():

--aVal;				// 原子性地修改(引用的)值
aVal.notify_all();	// 通知所有等待变化的线程
1
2

然而,与条件变量一样,wait()可能会因为虚假唤醒(即没有调用通知)而结束。因此,在wait()之后,你应该始终再次检查值。

等待特定原子值的代码可能如下所示:

while ((int val = aVal.load()) != expectedVal) {
    aVal.wait(val);
    // 在这里,aVal的值可能改变了,也可能没有改变
}
1
2
3
4

请注意,不能保证你能获取到所有的更新。考虑以下程序:

// lib/atomicwait.cpp
#include <iostream>
#include <thread>
#include <atomic>
using namespace std::literals;

int main() {
    std::atomic<int> aVal{0};

    // 读取线程:
    std::jthread tRead{[&] {
        int lastX = aVal.load();
        while (lastX >= 0) {
            aVal.wait(lastX);
            std::cout << "=> x changed to " << lastX << std::endl;
            lastX = aVal.load();
        }
        std::cout << "READER DONE" << std::endl;
    }};

    // 写入线程:
    std::jthread tWrite{[&] {
        for (int newVal : { 17, 34, 3, 42, -1}) {
            std::this_thread::sleep_for(5ns);
            aVal = newVal;
            aVal.notify_all();
        }
    }};
   ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

输出可能是:

=> x changed to 17
=> x changed to 34
=> x changed to 3
=> x changed to 42
=> x changed to -1
READER DONE
1
2
3
4
5
6

或者:

=> x changed to 17
=> x changed to 3
=> x changed to -1
READER DONE
1
2
3
4

或者仅仅是:

READER DONE
1

请注意,通知函数是const成员函数。

# 原子通知的公平票务系统

使用原子wait()和通知的一个应用场景是将它们像互斥锁一样使用。这通常很有意义,因为使用互斥锁的开销可能要大得多。

这里有一个示例,我们使用原子操作来实现对队列中值的公平处理(与使用信号量的不公平版本进行对比)。虽然可能有多个线程在等待,但只有有限数量的线程可以运行。通过使用票务系统,我们确保队列中的元素按顺序处理(这个示例的思路基于Bryce Adelstein Lelbach在2029年CppCon上的演讲《C++20同步库》中的一个例子(见http://youtu.be/Zcqwb3CWqs4?t=1810 (opens new window))):

// lib/atomicticket.cpp
#include <iostream>
#include <queue>
#include <chrono>
#include <thread>
#include <atomic>
#include <semaphore>
using namespace std::literals;

int main() {
    char actChar = 'a';
    std::mutex actCharMx;
    // 字符值从'a'到'z'无限循环
    // 用于访问actChar的互斥锁

    // 使用票务系统限制线程的可用性:
    std::atomic<int> maxTicket{0};     // 最大请求票号
    std::atomic<int> actTicket{0};     // 当前允许的票号

    // 创建并启动一个包含numThreads个线程的线程池:
    constexpr int numThreads = 10;
    std::vector<std::jthread> threads;
    for (int idx = 0; idx < numThreads; ++idx) {
        threads.push_back(std::jthread{[&, idx](std::stop_token st) {
            while (!st.stop_requested()) {
                // 获取下一个字符值:
                char val;
                {
                    std::lock_guard lg{actCharMx};
                    val = actChar++;
                    if (actChar > 'z') actChar = 'a';
                }

                // 请求一张票来处理该值,并等待直到被允许:
                int myTicket{++maxTicket};
                int act = actTicket.load();
                while (act < myTicket) {
                    actTicket.wait(act);
                    act = actTicket.load();
                }

                // 将字符值打印10次:
                for (int i = 0; i < 10; ++i) {
                    std::cout.put(val).flush();
                    auto dur = 20ms * ((idx % 3) + 1);
                    std::this_thread::sleep_for(dur);
                }

                // 完成,因此允许下一张票:
                ++actTicket;
                actTicket.notify_all();
            }
        }});
    }

    // 启用和禁用线程池中的线程:
    auto adjust = [&, oldNum = 0](int newNum) mutable {
        // 启用/禁用票
        // 唤醒等待的线程
        actTicket += newNum - oldNum;
        if (newNum > 0) actTicket.notify_all();
        oldNum = newNum;
    };

    for (int num : {0, 3, 5, 2, 0, 1}) {
        std::cout << "\n====== enable " << num << " threads" << std::endl;
        adjust(num);
        std::this_thread::sleep_for(2s);
    }

    for (auto& t : threads) {     // 请求所有线程停止(离开作用域时join操作完成)
        t.request_stop();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

每个线程请求下一个字符值,然后请求一张票来处理这个值:

int myTicket{++maxTicket};
1

然后线程等待,直到该票被启用:

int act = actTicket.load();
while (act < myTicket) {
    actTicket.wait(act);
    act = actTicket.load();
}
1
2
3
4
5

每次线程唤醒时,它会再次检查其票是否被启用(actTicket至少等于myTicket)。通过增加actTicket的值并通知所有等待的线程来启用新的票。这会在线程完成一次处理时发生:

++actTicket;
actTicket.notify_all();
1
2

或者当启用新数量的票时发生:

actTicket += newNum - oldNum;			// 启用/禁用票
if (newNum > 0) actTicket.notify_all();	// 唤醒等待的线程
1
2

该示例可能的输出如下:

====== enable 0 threads
====== enable 3 threads
acbabacabacbaacbabacbacdbdbdcdbdcdecdeddcedegfgegfegegfgegfgegfegfhhfhfhhfhfhijhjjhijhji jkjkijkjkijkkliklkilkkilmmlimlmilmmlnmlmnml
====== enable 5 threads
pmpnoqrpropnqpronpqorppnorqppornqsrosnqrsosnrqosrsntqstustvqstuvstqtvwuwtvxwtxuvwtxwtxvu wyxwvyxuwvxwyxuvwyxzxvuyzabyuzbabyzubyabzybczabyzbcabdzbdcazdeedczadedecfafdefcedaefdedf caefgfecghfigfichfgijhcjgijhgkijjkgihjkgjihjkgij
====== enable 2 threads
ijkihkhkhkkllmllmlmlllmllmnnmnnmnnmnnmnmnoopoopoopoopoopqpqqpqpqpqpqqrrqrqrrsrsrrsrsrsts tsts
====== enable 0 threads
ststttttt
====== enable 1 threads
uuuuuuuuuuvvvvvvvvvvwwwwwwwwwwxxxxxxxxxxyyyyyyyyyyzzzzzzzzzzaaaaaaaaaabbbbbbbbbbcccccccc ccddddddddddeeeeeeeeeeffffffffffgggggggggghhhhhhhhhh
1
2
3
4
5
6
7
8
9
10
11

使用同步输出流来确保输出中没有交错的字符。

# 13.3.5 std::atomic_flag的扩展

在C++20之前,没有办法在不设置std::atomic_flag的情况下检查其值,因此C++20添加了全局函数和成员函数来检查当前值:

  • atomic_flag_test(const atomic_flag*) noexcept;
  • atomic_flag_test_explicit(const atomic_flag*, memory_order) noexcept;
  • atomic_flag::test() const noexcept;
  • atomic_flag::test(memory_order) const noexcept;

# 13.4 同步输出流

C++20提供了一种新机制,用于同步多线程对输出流的并发写入操作。

# 13.4.1 同步输出流的设计动机

如果多个线程同时向一个流写入数据,通常需要对输出进行同步:

  • 一般来说,多线程并发向一个流写入数据会导致未定义行为(这是一种数据竞争,在C++中,数据竞争指的是会导致未定义行为的竞态条件)。
  • 虽然支持多个线程并发向标准流(如std::cout)写入数据,但结果往往没什么用,因为不同线程的字符可能会以任意顺序混合。

例如,考虑以下程序:

// lib/concstream.cpp
#include <iostream>
#include <cmath>
#include <thread>

void squareRoots(int num) {
    for (int i = 0; i < num; ++i) {
        std::cout << "squareroot  of  " << i << "  is  "
                  << std::sqrt(i) << "\n";
    }
}

int main() {
    std::jthread t1(squareRoots, 5);
    std::jthread t2(squareRoots, 5);
    std::jthread t3(squareRoots, 5);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

三个线程同时向std::cout写入数据。由于使用的是标准输出流,这种操作是合法的。然而,输出可能会像这样:

squareroot  of  squareroot  of  0  is  0  is  0
0squareroot  of  squareroot  of
01squareroot  of    is    is  1
01  is
1squareroot  of  squareroot  of
12squareroot  of    is    is  2
1 .41421
1 .41421squareroot  of  squareroot  of
23squareroot  of    is    is  3
1 .41421  is  1 .73205
1 .73205squareroot  of  squareroot  of
34squareroot  of    is    is  4
1 .73205  is  2
2squareroot  of
4  is  2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 13.4.2 使用同步输出流

通过使用同步输出流,我们现在可以同步多个线程对同一流的并发输出。我们只需要使用用相应输出流初始化的std::osyncstream即可。例如:

// lib/syncstream.cpp
#include <iostream>
#include <cmath>
#include <thread>
#include <syncstream>

void squareRoots(int num) {
    for (int i = 0; i < num; ++i) {
        std::osyncstream coutSync{std::cout};
        coutSync << "squareroot  of  " << i << "  is  "
                 << std::sqrt(i) << "\n";
    }
}

int main() {
    std::jthread t1(squareRoots, 5);
    std::jthread t2(squareRoots, 5);
    std::jthread t3(squareRoots, 5);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

在这里,同步输出缓冲区会将输出与其他同步输出缓冲区的输出进行同步,因此只有在同步输出缓冲区的析构函数被调用时,数据才会被刷新。

结果,输出会像这样:

squareroot  of  0  is  0
squareroot  of  0  is  0
squareroot  of  1  is  1
squareroot  of  0  is  0
squareroot  of  1  is  1
squareroot  of  2  is  1 .41421
squareroot  of  1  is  1
squareroot  of  2  is  1 .41421
squareroot  of  3  is  1 .73205
squareroot  of  2  is  1 .41421
squareroot  of  3  is  1 .73205
squareroot  of  4  is  2
squareroot  of  3  is  1 .73205
squareroot  of  4  is  2
squareroot  of  4  is  2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

现在,三个线程逐行向std::cout写入数据。不过,写入行的具体顺序仍然不确定。你也可以通过如下方式实现循环,得到相同的结果:

for (int i = 0; i < num; ++i) {
    std::osyncstream{std::cout} << "squareroot  of  " << i << "  is  "
                               << std::sqrt(i) << "\n";
}
1
2
3
4

注意,'\n'、std::endl和std::flush都不会立即输出数据。真正起作用的是析构函数。如果我们在循环外部创建同步输出流,那么任何线程的全部输出都会在析构函数被调用时一起打印出来。

不过,现在有一个新的操纵符std::flush_emit,可以在析构函数调用之前输出数据。因此,你也可以像这样创建并初始化同步输出流,然后逐行输出数据:

std::osyncstream coutSync{std::cout};
for (int i = 0; i < num; ++i) {
    coutSync << "squareroot  of  " << i << "  is  "
             << std::sqrt(i) << "\n" << std::flush_emit;
}
1
2
3
4
5

# 13.4.3 对文件使用同步输出流

同步输出流也可以用于文件。考虑以下示例:

// lib/syncfilestream.cpp
#include <fstream>
#include <cmath>
#include <thread>
#include <syncstream>

void squareRoots(std::ostream& strm, int num) {
    std::osyncstream syncStrm{strm};
    for (int i = 0; i < num; ++i) {
        syncStrm << "squareroot  of  " << i << "  is  "
                 << std::sqrt(i) << "\n" << std::flush_emit;
    }
}

int main() {
    std::ofstream fs{"tmp.out "};
    std::jthread t1(squareRoots, std::ref(fs), 5);
    std::jthread t2(squareRoots, std::ref(fs), 5);
    std::jthread t3(squareRoots, std::ref(fs), 5);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

这个程序使用三个并发线程逐行写入程序开头打开的同一个文件。

注意,每个线程都使用自己的同步输出流。不过,它们都必须使用同一个文件流。因此,如果每个线程自己打开文件,程序将无法正常工作。

# 13.4.4 将同步输出流当作输出流使用

同步输出流本身就是一个流。std::osyncstream类派生自std::ostream(确切地说:和流类通常的情况一样,std::basic_osyncstream<>类派生自std::basic_ostream<>)。因此,你也可以像下面这样实现上述程序:

// lib/syncfilestream2.cpp
#include <fstream>
#include <cmath>
#include <thread>
#include <syncstream>

void squareRoots(std::ostream& strm, int num) {
    for (int i = 0; i < num; ++i) {
        strm << "squareroot  of  " << i << "  is  "
             << std::sqrt(i) << "\n" << std::flush_emit;
    }
}

int main() {
    std::ofstream fs{"tmp.out "};
    std::osyncstream syncStrm1{fs};
    std::jthread t1(squareRoots, std::ref(syncStrm1), 5);
    std::osyncstream syncStrm2{fs};
    std::jthread t2(squareRoots, std::ref(syncStrm2), 5);
    std::osyncstream syncStrm3{fs};
    std::jthread t3(squareRoots, std::ref(syncStrm3), 5);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

操纵符std::flush_emit是为通用输出流定义的,这里也可以使用。对于非同步输出流,它没有任何效果。

注意,创建一个同步输出流并将其传递给所有三个线程是行不通的,因为这样多个线程会写入同一个流:

// 未定义行为(多个线程并发写入同一个流):
std::osyncstream syncStrm{fs};
std::jthread t1(squareRoots, std::ref(syncStrm), 5);
std::jthread t2(squareRoots, std::ref(syncStrm), 5);
std::jthread t3(squareRoots, std::ref(syncStrm), 5);
1
2
3
4
5

# 13.4.5 同步输出流的实际应用

自C++20起,我经常使用同步输出流,通过打印语句来 “调试” 多线程程序。我只需要定义以下内容:

#include <iostream>   // 用于std::cout
#include <syncstream> // 用于std::osyncstream

inline auto syncOut(std::ostream& strm = std::cout) {
    return std::osyncstream{strm};
}
1
2
3
4
5
6

有了这个定义,我只需使用syncOut()代替std::cout,就能确保并发输出是逐行写入的。例如:

void foo(std::string name) {
    syncOut() << "calling  foo( " << name
              << ")  in  thread  " << std::this_thread::get_id() << "\n";
   ...
}
1
2
3
4
5

在本书中,我们用这种方法来可视化并发协程的输出。

为了能够关闭这样的调试输出,我有时会这样做:

#include <iostream>
#include <syncstream> // 用于std::osyncstream

constexpr bool debug = true;     // 设为false可禁用输出

inline auto coutDebug() {
    if constexpr (debug) {
        return std::osyncstream{std::cout};
    } else {
        struct devnullbuf : public std::streambuf {
            int_type overflow(int_type c) { // 基本输出原语
                return c;                   // - 不包含任何打印语句
            }
        };

        static devnullbuf devnull;
        return std::ostream{&devnull};
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 13.5 补充说明

闩锁(Latches)和屏障(barriers)最初由阿拉斯代尔·麦金托什(Alasdair Mackintosh)在http://wg21.link/n3600 (opens new window)中提出。最终被接受的措辞由布莱斯·阿德尔斯坦·勒尔巴赫(Bryce Adelstein Lelbach)、奥利维尔·吉鲁(Olivier Giroux)、JF·巴斯蒂安(JF Bastien)、德特勒夫·福尔曼(Detlef Vollmann)和大卫·奥尔森(David Olsen)在http://wg21.link/p1135r6 (opens new window)中确定。

信号量(Semaphores)最初由奥利维尔·吉鲁在http://wg21.link/p0514r1中提出。最终被接受的措辞由布莱斯·阿德尔斯坦·勒尔巴赫、奥利维尔·吉鲁、JF·巴斯蒂安、德特勒夫·福尔曼和大卫·奥尔森在http://wg21.link/p1135r6 (opens new window)中确定。

原子引用(Atomic references)最初由H.卡特·爱德华兹(H. Carter Edwards)、汉斯·博姆(Hans Boehm)、奥利维尔·吉鲁和詹姆斯·鲁斯(James Reus)作为std::atomic_view<>在http://wg21.link/p0019r0 (opens new window)中提出。最终被接受的措辞由丹尼尔·桑德兰(Daniel Sunderland)、H.卡特·爱德华兹、汉斯·博姆、奥利维尔·吉鲁、马克·赫门(Mark Hoemmen)、D. 霍尔曼(D. Hollman)、布莱斯·阿德尔斯坦·勒尔巴赫和延斯·毛雷尔(Jens Maurer)在http://wg21.link/p0019r8 (opens new window)中确定。wait()和通知的API最终由大卫·奥尔森在http://wg21.link/p1643r1 (opens new window)中提出并添加。

原子共享指针(Atomic shared pointers)最初由赫布·萨特(Herb Sutter)作为atomic_shared_ptr<>在http://wg21.link/n4058 (opens new window)中提出,随后被纳入并发技术规范(http://wg21.link/n4577 (opens new window))。最终将其作为std::atomic<>的部分特化集成到C++标准中的措辞由阿利斯代尔·梅雷迪思(Alisdair Meredith)在http://wg21.link/p0718r2 (opens new window)中确定。

浮点类型的原子特化(Atomic specializations for floating - point types)最初由H.卡特·爱德华兹、汉斯·博姆、奥利维尔·吉鲁、JF·巴斯蒂安和詹姆斯·鲁斯在http://wg21.link/p0020r0 (opens new window)中提出。最终被接受的措辞由H.卡特·爱德华兹、汉斯·博姆、奥利维尔·吉鲁、JF·巴斯蒂安和詹姆斯·鲁斯在http://wg21.link/p0020r6 (opens new window)中确定。

使用原子类型进行线程同步(Thread synchronization with atomic types)最初由奥利维尔·吉鲁在http://wg21.link/p0514r0 (opens new window)中提出。最终被接受的措辞由布莱斯·阿德尔斯坦·勒尔巴赫、奥利维尔·吉鲁、JF·巴斯蒂安、德特勒夫·福尔曼和大卫·奥尔森在http://wg21.link/p1135r6 (opens new window)、http://wg21.link/p1643r1 (opens new window)和http://wg21.link/p1644r0 (opens new window)中确定。

同步输出流(Synchronized output streams)最初由劳伦斯·克劳尔(Lawrence Crowl)在http://wg21.link/n3750 (opens new window)中提出。最终被接受的措辞由劳伦斯·克劳尔、彼得·索默拉德(Peter Sommerlad)、尼古拉·约苏蒂斯(Nicolai Josuttis)和巴勃罗·哈尔彭(Pablo Halpern)在http://wg21.link/p0053r7 (opens new window)中确定,以及由彼得·索默拉德和巴勃罗·哈尔彭在http://wg21.link/p0753r2 (opens new window)中确定。

上次更新: 2025/03/20, 19:44:38
第12章 std::jthread和停止令牌
第14章 协程

← 第12章 std::jthread和停止令牌 第14章 协程→

最近更新
01
C++语言面试问题集锦 目录与说明
03-27
02
第四章 Lambda函数
03-27
03
第二章 关键字static及其不同用法
03-27
更多文章>
Copyright © 2024-2025 沪ICP备2023015129号 张小方 版权所有
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式