CppGuide社区 CppGuide社区
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
🔥C++面试
  • 第1章 C++ 惯用法与Modern C++篇
  • 第2章 C++开发工具与调试进阶
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 网络通信协议设计
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 服务其他模块设计
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 🔥C++游戏编程入门(零基础学C++)
  • 🔥使用C++17从零开发一个调试器 (opens new window)
  • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
  • 🔥使用C++从零写一个C语言编译器 (opens new window)
  • 🔥从零用C语言写一个Redis
  • leveldb源码分析
  • libevent源码分析
  • Memcached源码分析
  • TeamTalk源码分析
  • 优质源码分享 (opens new window)
  • 🔥远程控制软件gh0st源码分析
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • 高效Go并发编程
  • Go性能调优
  • Go项目架构设计
  • 🔥使用Go从零开发一个数据库
  • 🔥使用Go从零开发一个编译器 (opens new window)
  • 🔥使用Go从零开发一个解释器 (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
🔥C++面试
  • 第1章 C++ 惯用法与Modern C++篇
  • 第2章 C++开发工具与调试进阶
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 网络通信协议设计
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 服务其他模块设计
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 🔥C++游戏编程入门(零基础学C++)
  • 🔥使用C++17从零开发一个调试器 (opens new window)
  • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
  • 🔥使用C++从零写一个C语言编译器 (opens new window)
  • 🔥从零用C语言写一个Redis
  • leveldb源码分析
  • libevent源码分析
  • Memcached源码分析
  • TeamTalk源码分析
  • 优质源码分享 (opens new window)
  • 🔥远程控制软件gh0st源码分析
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • 高效Go并发编程
  • Go性能调优
  • Go项目架构设计
  • 🔥使用Go从零开发一个数据库
  • 🔥使用Go从零开发一个编译器 (opens new window)
  • 🔥使用Go从零开发一个解释器 (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
  • C++20 完全指南 说明
  • 第1章 比较和<=>运算符
  • 第2章 函数参数的占位符类型
  • 第3章 概念、要求和约束
  • 第4章 概念、需求和约束详解
  • 第5章 标准概念详解
  • 第6章 范围与视图
  • 第7章 范围和视图的实用工具
  • 第8章 视图类型详解
  • 第9章 跨度(Spans)
  • 第10章 格式化输出
  • 第11章 <chrono>中的日期和时区
  • 第12章 std::jthread和停止令牌
    • 12.1 std::jthread的设计动机
      • 12.1.1 std::thread的问题
      • 12.1.2 使用std::jthread
      • 12.1.3 停止令牌和停止回调
      • 12.1.4 停止令牌(Stop Tokens)和条件变量
    • 12.2 停止源(Stop Sources)和停止令牌
      • 12.2.1 停止源和停止令牌详解
      • 停止源详解
      • 停止令牌详解
      • 12.2.2 使用停止回调
      • 停止回调详解
      • 12.2.3 停止令牌的约束和保证
    • 12.3 std::jthread详解
      • 12.3.1 将停止令牌(Stop Tokens)与std::jthread一起使用
      • 将停止令牌与std::jthreads集合一起使用
      • 对多个std::jthreads使用相同的停止令牌
    • 12.4 补充说明
  • 第13章 并发特性
  • 第14章 协程
  • 第15章 协程详解
  • 第16章 模块
  • 第17章 Lambda扩展
  • 第18章 编译期计算
  • 第19章 非类型模板参数(NTTP)扩展
  • 第20章 新的类型特性
  • 第21章 核心语言的小改进
  • 第22章 泛型编程的小改进
  • 第23章 C++标准库的小改进
  • 第24章 已弃用和移除的特性
  • cpp20completeguides
zhangxf
2025-03-20
目录

第12章 std::jthread和停止令牌

# 第12章 std::jthread和停止令牌

C++20引入了一种表示线程的新类型:std::jthread。它解决了std::thread存在的一个严重设计问题,并受益于一种用于发出取消信号的新特性。

本章将介绍异步场景中用于发出取消信号的特性以及新类std::jthread。

# 12.1 std::jthread的设计动机

C++11引入了std::thread类型,它与操作系统提供的线程一一对应。然而,该类型存在一个严重的设计缺陷:它不是一个RAII类型。

让我们看看为什么这是个问题,以及新的线程类型是如何解决这个问题的。

# 12.1.1 std::thread的问题

std::thread要求在其生命周期结束时,如果它表示一个正在运行的线程,要么调用join()(等待线程结束),要么调用detach()(让线程在后台运行)。如果两者都未调用,析构函数会立即导致程序异常终止(在某些系统上会导致核心转储)。因此,以下代码通常是错误的(除非你不关心程序异常终止):

void foo() {
   ...
    // 启动线程,调用task(),并传入name和val作为参数:
    std::thread t{task, name, val};
   ...    // 既未调用t.join(),也未调用t.detach()
}  // 调用std::terminate()
1
2
3
4
5
6

当表示正在运行线程的t的析构函数在未调用join()或detach()的情况下被调用时,程序会调用std::terminate(),进而调用std::abort()。

即使调用join()等待正在运行的线程结束,仍然存在一个严重的问题:

void foo() {
   ...
    // 启动线程,调用task(),并传入name和val作为参数:
    std::thread t{task, name, val};
   ...        // 发生异常时调用std::terminate()
    // 等待任务完成:
    t.join();
   ...
}
1
2
3
4
5
6
7
8
9

这段代码也可能导致程序异常终止,因为如果在foo()函数中,在线程启动和调用join()之间发生异常(或者由于任何其他原因,控制流从未到达对join()的调用),就不会调用t.join()。

正确的编程方式如下:

void foo() {
   ...
    // 启动线程,调用task(),并传入name和val作为参数:
    std::thread t{task, name, val};
    try {
       ...        // 可能会抛出异常
    }
    catch (...) { // 如果发生异常
        // 清理已启动的线程:
        t.join(); // - 等待线程结束(阻塞直到完成)
        throw;    // - 重新抛出捕获到的异常
    }
    // 等待线程结束:
    t.join();
}  ...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

在这里,我们通过确保在离开作用域时调用join()来处理异常,但不解决异常本身。不幸的是,这可能会(永远)阻塞。然而,调用detach()也有问题,因为线程会在程序后台继续运行,占用可能已被销毁的CPU时间和资源。

如果在更复杂的上下文中使用多个线程,问题会变得更糟,导致代码非常糟糕。例如,当只启动两个线程时,你必须编写类似这样的代码:

void foo() {
   ...
    // 启动线程,调用task1(),并传入name和val作为参数:
    std::thread t1{task1, name, val};
    std::thread t2;
    try {
        // 启动线程,调用task2(),并传入name和val作为参数:
        t2 = std::thread{task2, name, val};
       ...
    }
    catch (...) { // 如果发生异常
        // 清理已启动的线程:
        t1.join();             // 等待第一个线程结束
        if (t2.joinable()) {   // 如果第二个线程已启动
            t2.join();         // - 等待第二个线程结束
        }
        throw;                 // 重新抛出捕获到的异常
    }
    // 等待线程结束:
    t1.join();
    t2.join();
   ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

一方面,启动第一个线程后,启动第二个线程时可能会抛出异常,因此启动第二个线程必须放在try子句中。另一方面,我们希望在同一作用域中使用并调用两个线程的join()。为了满足这两个要求,我们必须预先声明第二个线程,并在第一个线程的try子句中进行移动赋值。此外,在发生异常时,我们必须检查第二个线程是否已启动,因为对没有关联线程的线程对象调用join()会引发另一个异常。

另一个问题是,对两个线程都调用join()可能会花费大量时间(甚至可能永远阻塞)。请注意,你不能“杀死”已启动的线程。线程不是进程。线程只能通过自身结束或整个程序结束来终止。

因此,在调用join()之前,你应该确保等待的线程会取消其执行。然而,对于std::thread,没有这样的机制。你必须自己实现取消请求和对其的响应。

# 12.1.2 使用std::jthread

std::jthread解决了这些问题。首先,它是一个RAII类型。如果线程是可连接的(“j”代表“joining”),其析构函数会调用join()。因此,上述复杂的代码简单地变成:

void foo() {
   ...
    // 启动线程,调用task1(),并传入name和val作为参数:
    std::jthread t1{task1, name, val};
    // 启动线程,调用task2(),并传入name和val作为参数:
    std::jthread t2{task2, name, val};
   ...
    // 等待线程结束:
    t1.join();
    t2.join();
   ...
}
1
2
3
4
5
6
7
8
9
10
11
12

通过简单地使用std::jthread代替std::thread,程序异常终止的风险不再存在,并且也不需要异常处理。为了尽可能轻松地切换到std::jthread类,该类提供了与std::thread相同的API,包括:

  • 使用相同的头文件<thread>。
  • 调用get_id()时返回std::thread::id(std::jthread::id类型只是一个别名类型)。
  • 提供静态成员hardware_concurrency()。

这意味着:只需将std::thread替换为std::jthread并重新编译,你的代码就会更安全(前提是你之前没有自己实现异常处理)1。 1你可能想知道为什么我们不直接修复std::thread,而是引入新类型std::jthread。原因是向后兼容性。可能有一些应用程序希望在离开正在运行的线程作用域时终止程序。而且对于接下来要讨论的一些新功能,我们也会破坏二进制兼容性。

# 12.1.3 停止令牌和停止回调

std::jthread的功能不止于此:它提供了一种使用停止令牌(stop tokens)发出取消信号的机制,jthread的析构函数在调用join()之前会使用这些停止令牌。然而,线程启动的可调用对象(函数、函数对象或lambda表达式)必须支持这个请求:

  • 如果可调用对象只为所有传入参数提供参数,那么停止请求将被忽略:
void task (std::string s, double value) {
   ...    // join()会等待这段代码结束
}
1
2
3
  • 为了响应停止请求,可调用对象可以添加一个新的可选的第一个参数,类型为std::stop_token,并定期检查是否有停止请求:
void task (std::stop_token st,
    std::string s, double value) {
    while (!st.stop_requested()) {    // 是否有停止请求(例如,由析构函数发出)?
       ...    // 确保我们定期检查
    }
}
1
2
3
4
5
6

这意味着std::jthread提供了一种协作机制来表明线程应该停止运行。它是“协作式”的,因为该机制不会杀死正在运行的线程(杀死线程可能很容易使程序处于损坏状态,因此C++线程根本不支持这种做法)。为了响应停止请求,启动的线程必须将停止令牌声明为额外的第一个参数,并定期使用它来检查是否应该继续运行。

你也可以手动请求已启动的jthread停止。例如:

void foo() {
   ...
    // 启动线程,调用task(),并传入name和val作为参数:
    std::jthread t{task, name, val};
   ...
    if (...) {
        t.request_stop();   // 显式请求task()停止执行
    }
   ...
    // 等待线程结束:
    t.join();
   ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13

此外,还有另一种响应停止请求的方式:你可以为停止令牌注册回调函数,当发出停止请求时,这些回调函数会自动被调用。例如:

void task (std::stop_token st,
    std::string s, double value) {
    std::stop_callback cb{st, [] {
       ...    // 停止请求时被调用
    }};
   ...
}
1
2
3
4
5
6
7

在这种情况下,请求停止执行task()的线程(无论是通过显式调用request_stop()还是由析构函数导致)会调用你注册为停止回调的lambda表达式。请注意,回调函数通常由发出停止请求的线程调用。

在停止回调cb的生命周期结束时,析构函数会自动注销该回调,这样如果之后再发出停止信号,它将不再被调用。你可以通过这种方式注册任意数量的可调用对象(函数、函数对象或lambda表达式)。

请注意,停止机制比乍看起来更灵活:

  • 你可以传递用于请求停止的句柄和用于检查是否有停止请求的令牌。
  • 它支持条件变量,因此发出的停止信号可以中断在那里的等待。
  • 你可以独立于std::jthread使用该机制来请求和检查停止。

线程、其停止源、停止令牌和停止回调之间也没有生命周期限制。存储停止状态的位置在堆上分配。当线程以及使用该状态的最后一个停止源、停止令牌或停止回调被销毁时,停止状态的内存将被释放。

在以下部分,我们将讨论请求停止的机制及其在线程中的应用。之后,我们将讨论底层的停止令牌机制。

# 12.1.4 停止令牌(Stop Tokens)和条件变量

当请求停止时,线程可能会因等待条件变量的通知而被阻塞(这是避免主动轮询的一个重要场景)。停止令牌的回调接口也支持这种情况。你可以使用传递的停止令牌调用条件变量的wait()方法,这样在请求停止时,等待操作会被挂起。注意,由于技术原因,条件变量必须使用std::condition_variable_any类型。

下面的示例展示了如何在条件变量中使用停止令牌:

// lib/stopcv.cpp
#include <iostream>
#include <queue>
#include <thread>
#include <stop_token>
#include <mutex>
#include <condition_variable>

using namespace std::literals;   // 用于持续时间字面量

int main() {
    std::queue<std::string> messages;
    std::mutex messagesMx;
    std::condition_variable_any messagesCV;

    // 启动一个线程,该线程打印队列中出现的消息:
    std::jthread t1{[&](std::stop_token st) {
        while (!st.stop_requested()) {
            std::string msg;
            {
                // 等待下一条消息:
                std::unique_lock lock(messagesMx);
                if (!messagesCV.wait(lock, st, [&] {
                    return!messages.empty();
                })) {
                    return;   // 请求停止
                }
                // 从队列中取出下一条消息:
                msg = messages.front();
                messages.pop();
            }
            // 打印下一条消息:
            std::cout << "msg : " << msg << std::endl;
        }
    }};

    // 存储3条消息,并每次通知一个等待的线程:
    for (std::string s : { "Tic ", "Tac ", "Toe "}) {
        std::scoped_lock lg{messagesMx};
        messages.push(s);
        messagesCV.notify_one();
    }

    // 一段时间后
    // - 存储1条消息并通知所有等待的线程:
    std::this_thread::sleep_for(1s);
    {
        std::scoped_lock lg{messagesMx};
        messages.push("done ");
        messagesCV.notify_all();
    }

    // 一段时间后
    // - 结束程序(请求停止,这会中断wait())
    std::this_thread::sleep_for(1s);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

我们启动了一个线程,该线程循环等待消息队列不为空,并在有消息时进行打印:

while (!st.stop_requested()) {
    std::string msg;
    {
        // 等待下一条消息:
        std::unique_lock lock(messagesMx);
        if (!messagesCV.wait(lock, st, [&] {
            return!messages.empty();
        })) {
            return;   // 请求停止
        }
        // 从队列中取出下一条消息:
        msg = messages.front();
        messages.pop();
    }
    // 打印下一条消息:
    std::cout << "msg : " << msg << std::endl;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

条件变量messagesCV的类型是std::condition_variable_any:

std::condition_variable_any messagesCV;
1

这使我们能够使用停止令牌调用wait(),通常会传递用于指示停止线程的停止令牌。结果是,等待可能会因为以下两个原因之一而结束:

  • 有通知(队列不再为空)。
  • 请求停止。

wait()的返回值表示条件是否满足。如果返回false,则表明结束等待的原因是请求停止,这意味着我们可以做出相应反应(在这里,我们停止循环)。

表12.1“用于停止令牌的condition_variable_any成员函数”列出了std::condition_variable_any类型针对停止令牌使用锁保护对象lg的新成员函数。

操作 效果
cv.wait(lg, st, pred)
cv.wait_for(lg, dur, st, pred)
cv.wait_until(lg, tp, st, pred)
等待通知,直到pred为true或者st请求停止
最多等待持续时间dur,直到pred为true或者st请求停止
等待直到时间点tp,直到pred为true或者st请求停止

表12.1 用于停止令牌的condition_variable_any成员函数

目前,其他阻塞函数还不支持停止令牌。

# 12.2 停止源(Stop Sources)和停止令牌

C++20不仅为线程提供了停止令牌。这是一种通用机制,用于异步请求停止,并提供多种方式对该请求做出反应。

基本机制如下:

  • C++20标准库允许我们建立一个共享的停止状态。默认情况下,不会发出停止信号。
  • std::stop_source类型的停止源可以在其关联的共享停止状态中请求停止。
  • std::stop_token类型的停止令牌可用于在其关联的共享停止状态中对停止请求做出反应。你可以主动轮询是否有停止请求,或者注册一个std::stop_callback类型的回调函数,该函数会在请求停止时被调用。
  • 一旦发出停止请求,就无法撤回(后续的停止请求没有效果)。
  • 停止源和停止令牌可以复制和移动,以便代码在多个位置发出停止信号或对停止做出反应。复制一个源或令牌的开销相对较小,所以通常按值传递它们以避免任何生命周期问题。

不过,复制的开销不像传递整数值或原始指针那么小,更像是传递共享指针。如果你频繁将它们传递给子函数,最好按引用传递。

  • 该机制是线程安全的,可以在并发情况下使用。停止请求、检查是否有请求停止以及注册或注销回调的操作都经过了适当的同步,并且当最后一个使用者(停止源、停止令牌或停止回调)被销毁时,相关联的共享停止状态会自动被销毁。

下面的示例展示了如何创建停止源和停止令牌:

#include <stop_token>
...
// 创建stop_source和stop_token:
std::stop_source ssrc;   				// 创建一个共享停止状态
std::stop_token stok{ssrc.get_token()}; // 为停止状态创建一个令牌
1
2
3
4
5

第一步是简单地创建stop_source对象,它提供了请求停止的API。构造函数还会创建相关联的共享停止状态。然后,你可以从停止源获取stop_token对象,该对象提供了对停止请求做出反应的API(通过轮询或注册回调)。

然后,你可以将令牌(和 / 或源)传递到不同的位置 / 线程,在可能请求停止的地方和可能对停止做出反应的地方之间建立异步通信。

没有其他方法可以创建具有关联共享停止状态的停止令牌。停止令牌的默认构造函数没有关联的停止状态。

# 12.2.1 停止源和停止令牌详解

让我们详细了解一下停止源、停止令牌和停止回调的API。所有类型都在头文件<stop_token>中声明。

# 停止源详解

表12.2“stop_source类对象的操作”列出了std::stop_source的API。

操作 效果
stop_source s
stop_source s{nostopstate}
stop_source s{s2}
stop_source s{move(s2)}
s.~stop_source()
s = s2
s = move(s2)
s.get_token()
s.request_stop()
s.stop_possible()
s.stop_requested()
s1 == s2
s1 != s2
s1.swap(s2)
swap(s1, s2)
默认构造函数;创建一个具有关联停止状态的停止源
创建一个没有关联停止状态的停止源
复制构造函数;创建一个与s2共享关联停止状态的停止源
移动构造函数;创建一个获取s2关联停止状态的停止源(s2不再有相关联的停止状态)
析构函数;如果这是最后一个使用它的对象,则销毁相关联的共享停止状态
复制赋值;复制赋值s2的状态,使s现在也共享s2的停止状态(s之前的任何停止状态都会被释放)
移动赋值;移动赋值s2的状态,使s现在共享s2的停止状态(s2不再有停止状态,s之前的任何停止状态都会被释放)
返回与关联停止状态对应的停止令牌(如果没有共享的停止状态,则返回没有关联停止状态的停止令牌)
如果关联的停止状态尚未完成,则请求停止(返回是否请求了停止)
返回s是否有相关联的停止状态
返回s是否有请求了停止的相关联停止状态
返回s1和s2是否共享相同的停止状态(或者两者都不共享)
返回s1和s2是否不共享相同的停止状态
交换s1和s2的状态
交换s1和s2的状态

表12.2 stop_source类对象的操作

构造函数通常会在堆上为停止状态分配内存,所有停止令牌和停止回调都会使用该内存。无法使用分配器指定不同的内存位置。

注意,停止源、停止令牌和停止回调之间没有生命周期约束。当最后一个使用此状态的停止源、停止令牌或停止回调被销毁时,停止状态的内存会自动释放。

为了让你能够创建没有关联停止状态的停止源(这可能很有用,因为停止状态需要资源),你可以使用特殊构造函数创建一个停止源,然后再分配一个停止源:

std::stop_source ssrc{std::nostopstate};   // 没有关联的共享停止状态
...
ssrc = std::stop_source{};                 // 分配新的共享停止状态
1
2
3

# 停止令牌详解

表12.3“stop_token类对象的操作”列出了std::stop_token的API。

操作 效果
stop_token t
stop_token t{t2}
stop_token t{move(t2)}
t.~stop_token()
t = t2
t = move(t2)


t.stop_possible()
t.stop_requested()
t1 == t2
t1 != t2
t1.swap(t2)
swap( t1, t2)
stop_token cb{t, f}
默认构造函数;创建一个没有关联停止状态的停止令牌
复制构造函数;创建一个与t2共享关联停止状态的停止令牌
移动构造函数;创建一个获取t2关联停止状态的停止令牌(t2不再有相关联的停止状态)
析构函数;如果这是最后一个使用该关联共享停止状态的对象,则销毁该状态
复制赋值;复制赋值t2的状态,使t现在也共享t2的停止状态(t之前的任何停止状态都会被释放)
移动赋值;移动赋值t2的状态,使t现在共享t2的停止状态(t2不再有停止状态,t之前的任何停止状态都会被释放)
返回t是否有相关联的停止状态,以及是否已经或仍然可以请求停止
返回t是否有相关联的停止状态,并且已经请求了停止
返回t1和t2是否共享相同的停止状态(或者两者都不共享)
返回t1和t2是否不共享相同的停止状态
交换t1和t2的状态
交换t1和t2的状态
将cb注册为t的停止回调,调用f

表12.3 stop_token类对象的操作

注意,stop_possible()返回false表示是否不再可能发生停止。在以下两种情况下它会返回false:

  • 没有关联的停止状态。
  • 存在停止状态,但不再有停止源,并且从未请求过停止。

这可以用于避免为永远不会发生的停止定义反应。

# 12.2.2 使用停止回调

停止回调(stop callback)是RAII类型std::stop_callback的对象。其构造函数会注册一个可调用对象(函数、函数对象或lambda表达式),当为指定的停止令牌(stop token)请求停止时,该可调用对象就会被调用:

void task(std::stop_token st) {
    // 注册临时回调:
    std::stop_callback cb{st, []{
        std::cout << "stop requested\n";
        // ...
    }};
    // ...
}  // 注销回调
1
2
3
4
5
6
7
8

假设我们创建了共享的停止状态,并创建了一个异步场景,其中一个线程可能请求停止,而另一个线程可能运行task()。我们可以使用以下代码创建这种场景:

// 创建带有关联停止状态的stop_source:
std::stop_source ssrc;
// 注册/启动task() 并将相应的停止令牌传递给它:
registerOrStartInBackgound(task, ssrc.get_token());
// ...
1
2
3
4
5

函数registerOrStartInBackgound()可以立即启动task(),或者通过调用std::async()、初始化一个std::thread、调用一个协程,或者注册一个事件处理程序来稍后启动task()。

现在,每当我们请求停止时:

ssrc.request_stop();
1

可能会发生以下情况之一:

  • 如果task()已经启动,其回调已初始化,并且仍在运行,且回调的析构函数尚未被调用,那么注册的可调用对象会在调用request_stop()的线程中立即被调用。request_stop()会阻塞,直到所有注册的可调用对象都被调用。调用顺序未定义。
  • 如果task()尚未启动(或者其回调尚未初始化),request_stop()会更改停止状态,以表明已请求停止,然后返回。如果task()稍后启动并且回调被初始化,那么可调用对象会在初始化回调的线程中立即被调用。回调的构造函数会阻塞,直到可调用对象返回。
  • 如果task()已经完成(或者至少回调的析构函数已经被调用),那么可调用对象将永远不会被调用。回调生命周期的结束表明不再需要调用该可调用对象。

这些场景经过了仔细的同步处理。如果我们正处于初始化stop_callback(以便注册可调用对象)的过程中,上述场景之一将会发生。如果在由于销毁stop_callback而注销可调用对象时请求停止,情况也是如此。如果可调用对象已经被另一个线程启动,析构函数会阻塞,直到可调用对象完成。

对于你的编程逻辑来说,这意味着从你初始化回调的那一刻起,直到其销毁结束,注册的可调用对象都有可能被调用。在构造函数结束之前,回调在初始化线程中运行;之后,它在请求停止的线程中运行。请求停止的代码可能会立即调用注册的可调用对象,也可能稍后调用(如果回调稍后才初始化),或者可能永远不会调用(如果调用回调为时已晚)。

例如,考虑以下程序:

// lib/stop.cpp
#include <iostream>
#include <stop_token>
#include <future>         //for std::async()
#include <thread>         //for sleep_for()
#include <syncstream> //for std::osyncstream
#include <chrono>

using namespace std::literals;   //for duration literals

auto syncOut(std::ostream& strm = std::cout) {
    return std::osyncstream{strm};
}

void task(std::stop_token st, int num) {
    auto id = std::this_thread::get_id();
    syncOut() << "call task( " << num << ")\n";

    // 注册第一个回调:
    std::stop_callback cb1{st, [num, id]{
        syncOut() << "- STOP1 requested in task( " << num
                  << (id == std::this_thread::get_id() ? ")\n" : ") in main thread\n");
    }};
    std::this_thread::sleep_for(9ms);

    // 注册第二个回调:
    std::stop_callback cb2{st, [num, id]{
        syncOut() << "- STOP2 requested in task( " << num
                  << (id == std::this_thread::get_id() ? ")\n" : ") in main thread\n");
    }};
    std::this_thread::sleep_for(2ms);
}

int main() {
    // 创建stop_source和stop_token:
    std::stop_source ssrc;
    std::stop_token stok{ssrc.get_token()};

    // 注册回调:
    std::stop_callback cb{stok, []{
        syncOut() << "- STOP requested in main()\n" << std::flush;
    }};

    // 在后台多次调用task():
    auto fut = std::async([stok] {
        for (int num = 1; num < 10; ++num) {
            task(stok, num);
        }
    });

    // 过一会儿,请求停止:
    std::this_thread::sleep_for(120ms);
    ssrc.request_stop();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

注意,我们使用同步输出流来确保不同线程的打印语句逐行同步。

例如,输出可能如下:

call task(1)
call task(2)
...
call task(7)
call task(8)
- STOP2 requested in task(8) in main thread
- STOP1 requested in task(8) in main thread
- STOP requested in main()
call task(9)
- STOP1 requested in task(9)
- STOP2 requested in task(9)
1
2
3
4
5
6
7
8
9
10
11

或者也可能是这样:

call task(1)
call task(2)
call task(3)
call task(4)
- STOP2 requested in task(4) in main thread
call task(5)
- STOP requested in main()
- STOP1 requested in task(5)
- STOP2 requested in task(5)
call task(6)
- STOP1 requested in task(6)
- STOP2 requested in task(6)
call task(7)
- STOP1 requested in task(7)
- STOP2 requested in task(7)
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

或者可能是这样:

call task(1)
call task(2)
call task(3)
call task(4)
- STOP requested in main()
call task(5)
- STOP1 requested in task(5)
- STOP2 requested in task(5)
call task(6)
- STOP1 requested in task(6)
- STOP2 requested in task(6)
...
1
2
3
4
5
6
7
8
9
10
11
12

甚至可能只是:

call task(1)
call task(2)
...
call task(8)
call task(9)
- STOP requested in main()
1
2
3
4
5
6

如果不使用syncOut(),输出的字符甚至可能会交错,因为主线程和运行task()的线程的输出可能会完全混合在一起。

# 停止回调详解

停止回调(stop callback)的类型stop_callback是一个类模板,其API非常有限。实际上,它仅提供一个构造函数用于为停止令牌(stop token)注册一个可调用对象,以及一个析构函数用于注销该可调用对象。复制和移动操作被删除,且没有提供其他成员函数。

模板参数是可调用对象的类型,通常在构造函数初始化时进行推导:

auto  func  =  []  {  ...  };
std::stop_callback  cb{myToken,  func};   // 推导为stop_callback<decltype(func)>
1
2

除了构造函数和析构函数之外,唯一的公共成员是callback_type,它是存储的可调用对象的类型。

构造函数既接受左值(有名称的对象),也接受右值(临时对象或用std::move()标记的对象):

auto  func  =  []  {  ...  };

std::stop_callback cb1{myToken,  func};				// 复制func
std::stop_callback cb2{myToken,  std::move(func)};	// 移动func
std::stop_callback cb3{myToken,  []  {  ...  }};	// 移动lambda表达式
1
2
3
4
5

# 12.2.3 停止令牌的约束和保证

在处理异步上下文中可能出现的多种场景时,处理停止请求的功能相当健壮。然而,它无法避免所有的陷阱。

C++20标准库保证如下几点:

  • 所有对request_stop()、stop_requested()和stop_possible()的调用都是同步的。
  • 回调注册保证是原子操作。如果在另一个线程中有并发调用request_stop(),那么当前线程要么会看到停止请求并立即在当前线程中调用回调,要么另一个线程会看到回调注册,并在从request_stop()返回之前调用回调。
  • 保证在stop_callback的析构函数返回后,其可调用对象不会被调用。
  • 如果回调的可调用对象刚被另一个线程调用,那么回调的析构函数会等待其完成(它不会等待其他可调用对象完成)。

不过,请注意以下约束条件:

  • 回调不应抛出异常。如果其可调用对象的调用因异常而退出,那么会调用std::terminate()。
  • 不要在回调自身的可调用对象中销毁回调。析构函数不会等待回调完成。

# 12.3 std::jthread详解

表“jthread类对象的操作”列出了std::jthread的API。“Diff”列指出了与std::thread相比,行为有修改或新增的成员函数。

操作 效果 Diff
jthread t 默认构造函数;创建一个不可连接(nonjoinable)的线程对象
jthread t{f, ... } 创建一个代表新线程的对象,该线程调用f(可带有其他参数),否则抛出std::system_error
jthread t{rv} 移动构造函数;创建一个新的线程对象,获取rv的状态,并使rv变为不可连接
t.~jthread() 析构函数;如果对象是可连接的,则调用request_stop()和join() 修改
t = rv 移动赋值;将rv的状态移动赋值给t(如果t是可连接的,则调用request_stop()和join()) 修改
t.joinable() 如果t有一个关联线程(即可连接),则返回true
t.join() 等待关联线程完成,并使对象变为不可连接(如果线程不可连接,则抛出std::system_error)
t.detach() 在关联线程继续运行的同时,解除t与线程的关联,并使对象变为不可连接(如果线程不可连接,则抛出std::system_error)
t.request_stop() 请求关联的停止令牌停止 新增
t.get_stop_source() 返回一个用于请求停止的对象 新增
t.get_stop_token() 返回一个用于检查是否有停止请求的对象 新增
t.get_id() 如果可连接,则返回成员类型id的唯一线程ID;否则返回默认构造的ID
t.native_handle() 返回一个特定于平台的成员类型native_handle_type,用于非可移植的线程处理
t1.swap(t2) 交换t1和t2的状态
swap( t1 , t2) 交换t1和t2的状态
hardware_concurrency() 静态函数,给出关于可能的硬件线程数量的提示

表12.4 jthread类对象的操作

请注意,std::thread和std::jthread的成员类型id和native_handle_type是相同的,所以使用decltype(mythread)::id还是std::thread::id都没有关系。这样,你可以在现有代码中直接用std::jthread替换std::thread,而无需更改其他内容。

# 12.3.1 将停止令牌(Stop Tokens)与std::jthread一起使用

除了析构函数会执行线程的join操作之外,std::jthread的主要优势在于它会自动建立用于发出停止信号的机制。为此,启动线程的构造函数会创建一个停止源(stop source),将其存储为线程对象的成员,并在被调用函数将额外的停止令牌(stop token)作为首个参数时,把相应的停止令牌传递给该函数。

你还可以通过线程的成员函数获取停止源和停止令牌:

std::jthread  t1{[]  (std::stop_token  st)  {
   ...
}};
...
foo(t1.get_token());             // 将停止令牌传递给foo()
...
std::stop_source  ssrc{t1.get_stop_source()};
ssrc.request_stop();            // 请求停止t1的停止令牌
1
2
3
4
5
6
7
8

由于get_token()和get_stop_source()按值返回,即使线程分离并在后台运行后,停止源和停止令牌仍可继续使用。

# 将停止令牌与std::jthreads集合一起使用

如果你启动多个jthreads,每个线程都有自己的停止令牌。请注意,这可能会导致停止所有线程所需的时间比预期更长。考虑以下代码:

{
    std::vector<std::jthread>  threads;
    for  (int  i  =  0;  i  <  numThreads;  ++i)  {
        pool.push_back(std::jthread{[&]  (std::stop_token  st)  {
            while  (! st.stop_requested())  {
               ...
            }   
        }});
    }
   ...
}  // 析构函数停止所有线程
1
2
3
4
5
6
7
8
9
10
11

在循环结束时,析构函数停止所有正在运行的线程,其操作类似于以下代码:

for  (auto&  t  :  threads)  {
    t.request_stop();
    t.join();
}
1
2
3
4

这意味着我们总是在向一个线程发出停止信号之前,等待前一个线程结束。

通过在对所有线程调用join()(通过析构函数)之前请求停止所有线程,可以改进这样的代码:

{
    std::vector<std::jthread>  threads;
    for  (int  i  =  0;  i  <  numThreads;  ++i)  {
        pool.push_back(std::jthread{[&]  (std::stop_token  st)  {
            while  (! st.stop_requested())  {
               ...
            }   
        }});
    }
   ...
    // 更好的做法:在开始连接所有线程之前,请求停止所有线程:
    for  (auto&  t  :  threads)  { 
        t.request_stop();
    }
}  // 析构函数停止所有线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

现在,我们首先请求停止所有线程,并且在线程的析构函数调用join()以完成线程之前,线程可能就已经结束了。协程线程池的析构函数就展示了这种技术。

# 对多个std::jthreads使用相同的停止令牌

有时可能需要使用相同的停止令牌请求多个线程停止。这很容易实现。你可以自己创建停止令牌,或者从已启动的第一个线程获取停止令牌,然后将这个停止令牌作为首个参数启动(其他)线程。例如:

// 为所有线程初始化一个公共的停止令牌:
std::stop_source  allStopSource;
std::stop_token  allStopToken{allStopSource.get_token()};
for  (int  i  =  0;  i  <  9;  ++i)  {
    threads.push_back(std::jthread{[]  (std::stop_token  st)  {
       ...
        while  (! st.stop_requested())  {
           ...
        } 
    },
    allStopToken   // 将令牌传递给这个线程
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13

请记住,可调用对象通常会接收所有传递给它的参数。仅当存在一个额外的停止令牌参数且未传递参数时,才会使用启动线程的内部停止令牌。完整示例请见lib/atomicref.cpp。

# 12.4 补充说明

线程应执行join操作的提议最初由Herb Sutter在http://wg21.link/n3630 (opens new window)中提出,作为对std::thread的一种修正。最终被接受的表述由Nicolai Josuttis、Lewis Baker、Billy O’Neal、Herb Sutter和Anthony Williams在http://wg21.link/p0660r10 (opens new window)中制定。

上次更新: 2025/03/20, 19:44:38
第11章 <chrono>中的日期和时区
第13章 并发特性

← 第11章 <chrono>中的日期和时区 第13章 并发特性→

最近更新
01
C++语言面试问题集锦 目录与说明
03-27
02
第一章 auto与类型推导
03-27
03
第二章 关键字static及其不同用法
03-27
更多文章>
Copyright © 2024-2025 沪ICP备2023015129号 张小方 版权所有
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式