第15章 协程详解
# 第15章 协程详解
在上一章我们学习了协程相关知识,本章将详细讨论协程的几个方面。
# 15.1 协程约束
协程具有以下属性和约束:
- 协程不允许有返回语句。
- 协程不能是
constexpr
或consteval
。 - 协程不能有
auto
或其他占位符类型的返回类型。 main()
函数不能是协程。- 构造函数或析构函数不能是协程。
协程可以是静态的。如果不是构造函数或析构函数,协程可以是成员函数。协程甚至可以是lambda表达式。不过,在这种情况下需要格外小心。
# 15.1.1 协程lambda表达式
协程可以是lambda表达式。我们也可以像下面这样实现第一个协程:
auto coro = [](int max) -> CoroTask {
std::cout << " CORO " << max << " start\n";
for (int val = 1; val <= max; ++val) {
// 打印下一个值:
std::cout << " CORO " << val << "/" << max << "\n";
co_await std::suspend_always{}; // 暂停
}
std::cout << " CORO " << max << " end\n";
};
2
3
4
5
6
7
8
9
不过,请注意,协程lambda表达式不应捕获任何东西1。这是因为lambda表达式是定义函数对象的一种快捷方式,该函数对象在lambda表达式定义的作用域中创建。当离开该作用域时,即使lambda对象已被销毁,协程lambda表达式仍可能被恢复。
下面是一个会导致致命运行时错误的错误代码示例:
auto getCoro() {
string str = " CORO ";
auto coro = [str](int max) -> CoroTask {
std::cout << str << max << " start\n";
for (int val = 1; val <= max; ++val) {
std::cout << str << val << "/" << max << "\n";
co_await std::suspend_always{}; // 暂停
}
std::cout << str << max << " end\n";
};
return coro;
}
auto coroTask = getCoro()(3); // 初始化协程
// 糟糕:lambda对象在此处被销毁
coroTask.resume(); // 致命运行时错误
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
通常情况下,返回按值捕获的lambda表达式不存在生命周期问题。我们可以像这样使用协程lambda表达式:
auto coro = getCoro(); // 初始化协程lambda表达式
auto coroTask = coro(3); // 初始化协程
coroTask.resume(); // 没问题
2
3
我们也可以通过引用将str
传递给getCoro()
,并确保在使用协程期间str
一直有效。
然而,不能将两个初始化合并为一条语句这一事实可能会让人感到意外,并且很容易出现更微妙的生命周期问题。因此,强烈建议在协程lambda表达式中完全不使用捕获。
# 15.2 协程帧和承诺对象
当启动一个协程时,会发生以下三件事:
- 创建一个协程帧(coroutine frame)来存储协程的所有必要数据。这通常发生在堆上。不过,编译器也可以将协程帧放在栈上。如果协程的生命周期在调用者的生命周期内,并且编译器有足够的信息来计算帧的大小,通常就会这样做。
- 协程的所有参数都会被复制到帧中。注意,引用是按引用复制的,而不是复制其值。这意味着引用参数所引用的参数在协程运行期间必须保持有效。建议永远不要将协程参数声明为引用,否则可能会出现具有未定义行为的致命运行时错误。
- 在帧内创建承诺对象(promise object)。其目的是存储协程的状态,并在协程运行时提供用于定制的钩子函数。你可以将这些对象视为“协程状态控制器”(一个控制协程行为并可用于跟踪其状态的对象)。
图15.1展示了这种初始化过程,并说明了在协程运行时会使用承诺对象的哪些定制点。
图15.1 协程帧和承诺对象
# 15.2.1 协程接口、承诺和可等待对象如何交互
我们再来探讨一下可等待对象(awaitables),看看整体的工作流程是如何组织的:
- 对于每一个启动的协程,编译器都会创建一个承诺(promise)对象。
- 这个承诺对象被嵌入到一个协程句柄(coroutine handle)中,然后被放置在协程接口中。也就是说,协程接口通常控制着协程句柄及其承诺对象的生命周期。
- 在挂起时,协程使用可等待对象来控制挂起和恢复时的操作。
下面的程序通过使用一个跟踪协程接口、承诺和等待器,展示了具体的控制流程。跟踪协程接口和承诺的实现如下:
// coro/tracingcoro.hpp
#include <iostream>
#include <coroutine>
#include <exception> // 用于terminate()
// 用于处理简单任务的协程接口
// - 提供resume()来恢复协程
class [[nodiscard]] TracingCoro {
public:
// 原生协程句柄及其承诺类型:
struct promise_type;
using CoroHdl = std::coroutine_handle<promise_type>;
CoroHdl hdl; // 协程句柄
// 用于状态和定制的辅助类型:
struct promise_type {
promise_type() {
std::cout << " PROMISE : constructor\n";
}
~promise_type() {
std::cout << " PROMISE : destructor\n";
}
auto get_return_object() { // 初始化并返回协程接口
std::cout << " PROMISE : get_return_object()\n";
return TracingCoro{CoroHdl::from_promise(*this)};
}
auto initial_suspend() { // 初始挂起点
std::cout << " PROMISE : initial_suspend()\n";
return std::suspend_always{}; // - 延迟启动
}
void unhandled_exception() { // 处理异常
std::cout << " PROMISE : unhandled_exception()\n";
std::terminate(); // - 终止程序
}
void return_void() { // 处理结束或co_return;
std::cout << " PROMISE : return_void()\n";
}
auto final_suspend() noexcept { // 最终挂起点
std::cout << " PROMISE : final_suspend()\n";
return std::suspend_always{}; // - 立即挂起
}
};
// 构造函数和析构函数:
TracingCoro(auto h)
: hdl{h} { // 将协程句柄存储在接口中
std::cout << " INTERFACE : construct\n";
}
~TracingCoro() {
std::cout << " INTERFACE : destruct\n";
if (hdl) {
hdl.destroy(); // 销毁协程句柄
}
}
// 禁止复制和移动:
TracingCoro(const TracingCoro&) = delete;
TracingCoro& operator=(const TracingCoro&) = delete;
// 恢复协程的API
// - 返回是否还有需要处理的内容
bool resume() const {
std::cout << " INTERFACE : resume()\n";
if (!hdl || hdl.done()) {
return false; // 没有(更多)内容需要处理
}
hdl.resume(); // 恢复
return !hdl.done();
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
我们跟踪:
- 协程接口何时初始化和销毁。
- 协程接口何时恢复协程。
- 每个承诺操作。
跟踪等待器的实现如下:
// coro/tracingawaiter.hpp
#include <iostream>
class TracingAwaiter {
inline static int maxId = 0;
int id;
public:
TracingAwaiter() : id{++maxId} {
std::cout << " AWAITER " << id << " : ==> constructor\n";
}
~TracingAwaiter() {
std::cout << " AWAITER " << id << " : <== destructor\n";
}
// 禁止复制和移动:
TracingAwaiter(const TracingAwaiter&) = delete;
TracingAwaiter& operator=(const TracingAwaiter&) = delete;
//constexpr
bool await_ready() const noexcept {
std::cout << " AWAITER " << id << " : await_ready()\n";
return false; // true: 不(尝试)挂起
}
// 返回类型/值的含义:
// - void: 进行挂起
// - bool: true: 进行挂起
// - handle: 恢复句柄的协程
//constexpr
bool await_suspend(auto) const noexcept {
std::cout << " AWAITER " << id << " : await_suspend()\n";
return false;
}
//constexpr
void await_resume() const noexcept {
std::cout << " AWAITER " << id << " : await_resume()\n";
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
在这里,我们也对每个操作进行跟踪。
注意,成员函数不能是constexpr
的,因为它们包含输入/输出操作。
协程的实现和使用如下:
// coro/corotrace.cpp
#include "tracingcoro.hpp"
#include "tracingawaiter.hpp"
#include <iostream>
TracingCoro coro(int max) {
std::cout << " START coro( " << max << ")\n";
for (int i = 1; i <= max; ++i) {
std::cout << " CORO : " << i << "/" << max << "\n";
co_await TracingAwaiter{}; // 挂起
std::cout << " CONTINUE coro( " << max << ")\n";
}
std::cout << " END coro( " << max << ")\n";
}
int main() {
// 启动协程:
std::cout << "**** start coro()\n";
auto coroTask = coro(3); // 初始化协程
std::cout << "**** coro() started\n";
// 循环恢复协程,直到它结束:
std::cout << "\n**** resume coro() in loop\n";
while (coroTask.resume()) { // 恢复
std::cout << "**** coro() suspended\n";
...
std::cout << "\n**** resume coro() in loop\n";
}
std::cout << "\n**** coro() loop done\n";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
该程序的输出如下:
**** start coro()
PROMISE: constructor
PROMISE: get_return_object()
INTERFACE: construct
PROMISE: initial_suspend()
**** coro() started
**** resume coro() in loop
INTERFACE: resume()
START coro(3)
CORO: 1/3
AWAITER1: ==> constructor
AWAITER1: await_ready()
AWAITER1: await_suspend()
**** coro() suspended
**** resume coro() in loop
INTERFACE: resume()
AWAITER1: await_resume()
AWAITER1: <== destructor
CONTINUE coro(3)
CORO: 2/3
AWAITER2: ==> constructor
AWAITER2: await_ready()
AWAITER2: await_suspend()
**** coro() suspended
**** resume coro() in loop
INTERFACE: resume()
AWAITER2: await_resume()
AWAITER2: <== destructor
CONTINUE coro(3)
CORO: 3/3
AWAITER3: ==> constructor
AWAITER3: await_ready()
AWAITER3: await_suspend()
**** coro() suspended
**** resume coro() in loop
INTERFACE: resume()
AWAITER3: await_resume()
AWAITER3: <== destructor
CONTINUE coro(3)
END coro(3)
PROMISE: return_void()
PROMISE: final_suspend()
**** coro() loop done
INTERFACE: destruct
PROMISE: destructor
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
当我们调用一个协程时,首先发生的是创建协程承诺对象。
然后,对创建的承诺对象调用get_return_object()
。这个函数通常会初始化协程句柄,并返回用该句柄初始化的协程接口。为了创建句柄,通常会调用静态成员函数from_promise()
。然后,句柄被传递用于初始化TracingCoro
类型的协程接口。协程接口随后被返回给get_return_object()
的调用者,以便作为协程调用的返回值(在极少数情况下,可能会有其他返回类型)。
接着,调用initial_suspend()
来判断协程是否应该立即挂起(延迟启动),在这个例子中是这种情况。因此,控制流返回给协程的调用者。
之后,main()
函数调用resume()
,它会对协程句柄调用resume()
。这个调用会恢复协程,这意味着协程会处理后续的语句,直到下一个挂起点或结束。
每次挂起时,co_await
会使用一个可等待对象,在这个例子中是TracingAwaiter
类型的等待器。它通过默认构造函数创建。对于等待器,会调用成员函数await_ready()
和await_suspend()
来控制挂起(它们甚至可能拒绝挂起)。由于await_ready()
返回false
且await_suspend()
不返回任何内容,挂起请求被接受。因此,协程的恢复结束,main()
函数继续执行。在下一次恢复时,会调用await_resume()
,协程继续执行。
当协程到达结束或co_return
语句时,会调用处理结束的相应成员函数。首先,根据是否有返回值,调用return_void()
或return_value()
。然后,调用最终挂起的函数final_suspend()
。注意,即使在final_suspend()
内部,协程仍然 “在运行”,这意味着在final_suspend()
内部再次调用resume()
或destroy()
会导致运行时错误。
在协程接口的生命周期结束时,会调用其析构函数,该析构函数会销毁协程句柄。最后,承诺对象被销毁。
作为另一种情况,假设initial_suspend()
返回一个承诺类型,指示立即启动协程而不是初始挂起:
class [[nodiscard]] TracingCoro {
public:
...
struct promise_type {
...
auto initial_suspend() { // 初始挂起点
std::cout << " PROMISE : initial_suspend()\n";
return std::suspend_never{}; // - 立即启动
}
...
};
...
};
2
3
4
5
6
7
8
9
10
11
12
13
在这种情况下,我们会得到以下输出:
**** start coro()
PROMISE: constructor
PROMISE: get_return_object()
INTERFACE: construct
PROMISE: initial_suspend()
START coro(3)
CORO: 1/3
AWAITER1: ==> constructor
AWAITER1: await_ready()
AWAITER1: await_suspend()
**** coro() started
**** resume coro() in loop
INTERFACE: resume()
AWAITER1: await_resume()
AWAITER1: <== destructor
CONTINUE coro(3)
CORO: 2/3
AWAITER2: ==> constructor
AWAITER2: await_ready()
AWAITER2: await_suspend()
**** coro() suspended
**** resume coro() in loop
INTERFACE: resume()
AWAITER2: await_resume()
AWAITER2: <== destructor
CONTINUE coro(3)
CORO: 3/3
AWAITER3: ==> constructor
AWAITER3: await_ready()
AWAITER3: await_suspend()
**** coro() suspended
**** resume coro() in loop
INTERFACE: resume()
AWAITER3: await_resume()
AWAITER3: <== destructor
CONTINUE coro(3)
END coro(3)
PROMISE: return_void()
PROMISE: final_suspend()
**** coro() loop done
INTERFACE: destruct
PROMISE: destructor
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
同样,协程框架会创建承诺对象,并为其调用get_return_object()
,这会初始化协程句柄和协程接口。然而,initial_suspend()
不会挂起。因此,协程会立即开始执行第一条语句,直到第一个co_await
第一次挂起它。这个挂起点就是最初在get_return_object()
中创建的TracingCoro
对象被返回给协程调用者的时刻。在那之后,和之前一样,我们循环恢复协程。
# 15.3 协程承诺(Coroutine Promises)详解
下表列出了在协程句柄的承诺类型中提供的所有操作。
操作 | 效果 |
---|---|
构造函数get_return_object() initial_suspend() yield_value(val) unhandled_exception() return_void() return_value(val) final_suspend() await_transform(...) operator new(sz) operator delete(ptr, sz) get_return_object_on_... ...allocation_failure() | 初始化承诺 定义协程返回的对象(通常是协程接口类型) 初始挂起点(用于让协程延迟启动) 处理 co_yield 产生的值协程内部未处理异常时的反应 处理协程结束或无返回值的 co_return 处理 co_return 的返回值最终挂起点(用于让协程延迟结束) 将 co_await 的值映射到等待者定义协程分配内存的方式 定义协程释放内存的方式 定义内存分配失败时的反应 |
表15.1 协程承诺的特殊成员函数
这些函数有些是必需的,有些取决于协程是否产生中间或最终结果,还有些是可选的。
# 15.3.1 必需的承诺操作
对于协程承诺,以下成员函数是必需的。没有它们,代码将无法编译或会导致未定义行为。
# 构造函数
协程初始化时,由协程框架调用协程承诺的构造函数。
编译器可以使用该构造函数,通过一些参数初始化协程状态。为此,构造函数的签名必须与调用协程时传递的参数匹配。这种技术尤其用于协程特性(coroutine traits)。
# get_return_object()
get_return_object()
由协程框架用于创建协程的返回值。
该函数通常必须返回协程接口,该接口使用协程句柄进行初始化,而协程句柄通常使用静态成员函数from_promise()
进行初始化,from_promise()
本身又使用承诺进行初始化。
例如:
class CoroTask {
public:
// 原生协程句柄及其承诺类型:
struct promise_type;
using CoroHdl = std::coroutine_handle<promise_type>;
CoroHdl hdl;
struct promise_type {
auto get_return_object() { // 初始化并返回协程接口
return CoroTask{CoroHdl::from_promise(*this)};
}
// ...
}
// ...
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
原则上,get_return_object()
可以有不同的返回类型:
- 典型的做法是
get_return_object()
通过用协程句柄显式初始化来返回协程接口(如上所示)。 - 或者,
get_return_object()
也可以返回协程句柄。在这种情况下,会隐式创建协程接口。这要求接口构造函数不是explicit
的。 目前尚不清楚在这种情况下协程接口具体何时被创建(见http://wg21.link/cwg2563 (opens new window))。因此,在调用initial_suspend()
时,协程接口对象可能存在也可能不存在,如果出现问题,可能会带来麻烦。 因此,你最好避免使用这种方法。 - 在极少数情况下,不返回任何内容并将返回类型指定为
void
甚至可能是有用的。使用协程特性就是一个例子。
# initial_suspend()
initial_suspend()
定义了协程的初始挂起点,主要用于指定协程在初始化后是应该自动挂起(延迟启动)还是立即启动(急切启动)。
该函数针对协程的承诺prm
按如下方式调用:
co_await prm.initial_suspend();
因此,initial_suspend()
应该返回一个等待者。通常,initial_suspend()
返回:
std::suspend_always{}
,如果协程体稍后/延迟启动;std::suspend_never{}
,如果协程体立即/急切启动。
例如:
class CoroTask {
public:
// ...
struct promise_type {
// ...
auto initial_suspend() { // 立即挂起
return std::suspend_always{}; // - 是的,总是挂起
}
// ...
}
// ...
};
2
3
4
5
6
7
8
9
10
11
12
不过,你也可以让急切启动还是延迟启动的决定取决于某些业务逻辑,或者使用此函数为协程体的作用域执行一些初始化操作。
# final_suspend() noexcept
final_suspend()
定义了协程的最终挂起点,与initial_suspend()
类似,它针对协程的承诺prm
按如下方式调用:
co_await prm.final_suspend();
该函数在包含协程体的try
块外部,在调用return_void()
、return_value()
或unhandled_exception()
之后,由协程框架调用。由于final_suspend()
在try
块外部,它必须是noexcept
的。
这个成员函数的名称有点误导人,因为它给人的印象是你也可以在这里返回std::suspend_never{}
,以便在到达协程末尾后强制再进行一次恢复。然而,恢复真正在最终挂起点挂起的协程是未定义行为。对于在此挂起的协程,唯一能做的就是销毁它。
因此,这个成员函数的真正目的是执行一些逻辑,比如发布结果、发出完成信号,或者在其他地方恢复延续的协程。
相反,建议你构建协程,尽可能让它们在这里挂起。一个原因是,这使得编译器更容易确定协程框架的生命周期何时嵌套在协程调用者内部,这使得编译器更有可能省略协程框架的堆内存分配。
因此,除非你有充分的理由不这么做,否则final_suspend()
应该始终返回std::suspend_always{}
。例如:
class CoroTask {
public:
// ...
struct promise_type {
// ...
auto final_suspend() noexcept { // 在末尾挂起
// ...
return std::suspend_always{}; // - 是的,总是挂起
}
// ...
}
// ...
};
2
3
4
5
6
7
8
9
10
11
12
13
# unhandled_exception()
unhandled_exception()
定义了协程体抛出异常时的反应。该函数在协程框架的catch
子句内部被调用。
这里对异常可能的反应有:
- 忽略异常
- 在本地处理异常
- 结束或终止程序(例如,通过调用
std::terminate()
) - 使用
std::current_exception()
存储异常以供以后使用
后面会在单独的小节中讨论实现这些反应的方式。
无论如何,在调用此函数且不结束程序之后,会直接调用final_suspend()
,并且协程会挂起。
如果你在unhandled_exception()
中抛出或重新抛出异常,协程也会挂起。从unhandled_exception()
抛出的任何异常都会被忽略。
# 15.3.2 用于返回或产生值的承诺操作
根据是否使用以及如何使用co_yield
或co_return
,你还需要以下一些承诺操作。
# return_void()或return_value()
你必须实现这两个成员函数中的一个,用于处理返回语句和协程的结束:
return_void()
当协程到达末尾(通过到达其函数体末尾或到达不带参数的co_return
语句)时被调用。 详见第一个协程示例。return_value(Type)
当协程到达带参数的co_return
语句时被调用。传递的参数必须是或必须能转换为指定的类型。 详见带co_return
的协程示例。
如果协程的实现方式使得它有时可能返回值,有时可能不返回值,这是未定义行为。考虑以下示例:
ResultTask<int> coroUB( ... ) {
if ( ... ) {
co_return 42;
}
}
2
3
4
5
这个协程是无效的。不允许同时存在return_void()
和return_value()
。
遗憾的是,如果你只提供return_value(int)
成员函数,这段代码甚至可能编译并运行,而且编译器可能不会发出任何警告。希望这种情况很快会有所改变。
注意,你可以为不同类型(除void
外)重载return_value()
,或者使其成为泛型函数:
struct promise_type {
// ...
void return_value(int val) { // 对int类型的co_yield的反应
// ... // - 处理返回的int
}
void return_value(std::string val) { // 对string类型的co_yield的反应
// ... // - 处理返回的string
}
};
2
3
4
5
6
7
8
9
在这种情况下,协程可以co_return
不同类型的值。
CoroGen coro()
{
int value = 0;
// ...
if ( ... ) {
co_return "ERROR: can't compute value";
}
// ...
co_return value;
}
2
3
4
5
6
7
8
9
10
# yield_value(Type)
当协程到达co_yield
语句时,会调用yield_value(Type)
。有关基本细节,详见带co_yield
的协程示例。
你可以为不同类型重载yield_value()
,或者使其成为泛型函数:
struct promise_type {
// ...
auto yield_value(int val) { // 对int类型的co_yield的反应
// ... // - 处理产生的int
return std::suspend_always{}; // - 挂起协程
}
auto yield_value(std::string val) {// 对string类型的co_yield的反应
// ... // - 处理产生的string
return std::suspend_always{}; // - 挂起协程
}
};
2
3
4
5
6
7
8
9
10
11
在这种情况下,协程可以co_yield
不同类型的值:
CoroGen coro()
{
while ( ... ) {
if ( ... ) {
co_yield "ERROR: can't compute value";
}
int value = 0;
// ...
co_yield value;
}
}
2
3
4
5
6
7
8
9
10
11
# 15.3.3 可选的承诺操作
承诺(promises)还可用于定义一些可选操作,这些操作定义了协程的特殊行为,在通常情况下会采用某些默认行为。
# await_transform()
await_transform()
可用于将 co_await
的值映射到等待器(awaiters)。
# operator new() 和 operator delete()
operator new()
和 operator delete()
允许程序员为协程状态定义不同的内存分配方式。这些函数也可用于确保协程不会意外使用堆内存。
# get_return_object_on_allocation_failure()
get_return_object_on_allocation_failure()
允许程序员定义在协程内存分配无异常失败时的应对方式。
# 15.4 协程句柄详解
std::coroutine_handle<>
是协程句柄的通用类型。它可用于引用正在执行或已挂起的协程。其模板参数是协程的承诺类型,可用于放置额外的数据成员和行为。
表“std::coroutine_handle<>
的API”列出了为协程句柄提供的所有操作。
操作 | 效果 |
---|---|
coroutine_handle<PrmT>::from_promise(prm) CoroHandleType{} CoroHandleType{nullptr} CoroHandleType{hdl} hdl = hdl2 if (hdl) ==, != <, <=, >, >=, <=> hdl.resume() hdl() hdl.done() hdl.destroy() hdl.promise() hdl.address() coroutine_handle<PrmT>::from_address(addr) | 使用承诺 prm 创建一个句柄 创建一个不指向任何协程的句柄 创建一个不指向任何协程的句柄 复制句柄 hdl (两者都指向同一个协程) 将句柄 hdl2 赋值给 hdl (两者都指向同一个协程) 返回句柄是否指向一个协程 检查两个句柄是否指向同一个协程 在协程句柄之间创建顺序关系 恢复协程的执行 恢复协程的执行 返回一个挂起的协程是否已结束,且不允许再调用 resume() 销毁协程 返回协程的承诺 返回协程数据的内部地址 返回地址 addr 对应的句柄 |
表15.2 std::coroutine_handle<>
的API
静态成员函数 from_promise()
提供了用协程初始化协程句柄的方法。该函数只是将承诺的地址存储在句柄中。如果我们有一个承诺 prm
,代码如下:
auto hdl = std::coroutine_handle<decltype(prm)>::from_promise(prm);
from_promise()
通常在由协程框架创建的承诺的 get_return_object()
中被调用:
class CoroIf {
public:
struct promise_type {
auto get_return_object() { // 初始化并返回协程接口
return CoroIf{std::coroutine_handle<promise_type>::from_promise(*this)};
}
...
};
...
};
2
3
4
5
6
7
8
9
10
当句柄默认初始化或使用 nullptr
作为初始值或赋值时,协程句柄不指向任何协程。在这种情况下,任何转换为布尔值的操作都将返回 false
:
std::coroutine_handle<PrmType> hdl = nullptr;
if (hdl) ... // false
2
复制和赋值协程句柄的开销很小。它们只是复制和赋值内部指针。因此,协程句柄通常按值传递。这也意味着多个协程句柄可以指向同一个协程。作为程序员,你必须确保在另一个协程句柄恢复或销毁协程后,任何句柄都不会调用 resume()
或 destroy()
。
address()
接口返回协程的内部指针,类型为 void*
。这允许程序员将协程句柄导出到其他地方,之后再使用静态函数 from_address()
重新创建一个句柄:
auto hdl = std::coroutine_handle<decltype(prm)>::from_promise(prm);
...
void* hdlPtr = hdl.address() ;
...
auto hdl2 = std::coroutine_handle<decltype(prm)>::from_address(hdlPtr);
hdl == hdl2 // true
2
3
4
5
6
然而,请注意,只有在协程存在时才能使用该地址。协程被销毁后,该地址可能会被另一个协程句柄重新使用。
# 15.4.1 std::coroutine_handle<void>
所有协程句柄类型都可以隐式转换为 std::coroutine<void>
类(在这个声明中可以省略 void
):
namespace std {
template<typename Promise>
struct coroutine_handle {
...
// 隐式转换为coroutine_handle<void>:
constexpr operator coroutine_handle<>() const noexcept ;
...
};
}
2
3
4
5
6
7
8
9
最初的C++标准规定所有协程句柄类型都派生自
std::coroutine<void>
。然而,这一点在http://wg21.link/lwg3460中进行了更改。感谢黄雍翔指出这一点。
因此,如果你不需要承诺,可以始终使用 std::coroutine_handle<void>
类型或不带任何模板参数的 std::coroutine_handle<>
类型。请注意,std::coroutine_handle<>
不提供 promise()
成员函数:
void callResume(std::coroutine_handle<> h) {
h.resume(); // 没问题
h.promise(); // 错误:h没有提供promise()成员
}
auto hdl = std::coroutine_handle<decltype(prm)>::from_promise(prm);
...
callResume(hdl); // 没问题:hdl转换为std::coroutine_handle<void>
2
3
4
5
6
7
8
# 15.5 协程中的异常
当在协程内部抛出一个异常,且该异常未在本地处理时,unhandled_exception()
会在协程体后面的 catch
子句中被调用(见图15.1)。在 initial_suspend()
、yield_value()
、return_void()
、return_value()
或协程中使用的任何可等待对象(awaitable)抛出异常时,也会调用它。
处理这些异常有以下几种选择:
# 忽略异常
在这种情况下,unhandled_exception()
只需一个空函数体:
void unhandled_exception() { }
# 在本地处理异常
在这种情况下,unhandled_exception()
仅处理异常。因为你处于 catch
子句内部,所以必须重新抛出异常并在本地处理它:
void unhandled_exception() {
try {
throw; // 重新抛出捕获到的异常
}
catch (const std::exception& e) {
std::cerr << "EXCEPTION : " << e.what() << std::endl;
}
catch (...) {
std::cerr << "UNKNOWN EXCEPTION " << std::endl;
}
}
2
3
4
5
6
7
8
9
10
11
# 结束或终止程序(例如,通过调用 std::terminate()
)
void unhandled_exception() {
...
std::terminate();
}
2
3
4
# 使用 std::current_exception()
存储异常以供后续使用
在这种情况下,你需要在承诺类型中使用一个异常指针,然后对其进行设置:
struct promise_type {
std::exception_ptr ePtr;
...
void unhandled_exception() {
ePtr = std::current_exception();
}
};
2
3
4
5
6
7
你还必须在协程接口中提供处理异常的方法。例如:
class [[nodiscard]] CoroTask {
...
bool resume() const {
if (!hdl || hdl.done()) {
return false;
}
hdl.promise().ePtr = nullptr; // 尚未发生异常
hdl.resume(); // 恢复执行
if (hdl.promise().ePtr) { // 重新抛出协程中的任何异常
std::rethrow_exception(hdl.promise().ePtr);
}
return !hdl.done();
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
当然,这些方法可以组合使用。
请注意,在调用 unhandled_exception()
且程序未结束后,协程就处于结束状态。会直接调用 final_suspend()
,协程被挂起。
如果你在 unhandled_exception()
中抛出或重新抛出异常,协程也会被挂起。从 unhandled_exception()
抛出的任何异常都会被忽略。
# 15.6 为协程帧分配内存
协程需要内存来存储其状态。由于协程可能会切换上下文,通常会使用堆内存。本节将讨论这是如何实现的以及如何更改这种方式。
# 15.6.1 协程如何分配内存
协程需要内存来存储其从挂起到恢复期间的状态。然而,由于恢复可能发生在非常不同的上下文环境中,一般来说,内存只能在堆上分配。事实上,C++标准规定: 实现可能需要为协程分配额外的存储。这种存储称为协程状态,通过调用非数组分配函数来获取。
这里的关键在于“可能”这个词。编译器可以优化掉对堆内存的需求。当然,编译器需要足够的信息,并且代码必须相当简单。实际上,在以下情况下最有可能进行优化:
- 协程的生命周期在调用者的生命周期内。
- 使用内联函数,这样编译器至少可以看到所有内容,以便计算帧的大小。
final_suspend()
返回std::suspend_always{}
,否则,生命周期管理会变得过于复杂。
然而,在撰写本章时(2022年3月),只有Clang编译器为协程提供了相应的分配省略优化(Visual C++曾通过/await:heapelide
选项提供优化,但目前似乎已关闭该功能。)。
例如,考虑以下协程:
CoroTask coro(int max) {
for (int val = 1; val <= max; ++val) {
std::cout << "coro( " << max << "): " << val << "\n";
co_await std::suspend_always{};
}
}
CoroTask coroStr(int max, std::string s) {
for (int val = 1; val <= max; ++val) {
std::cout << "coroStr( " << max << " , " << s << "): " << "\n";
co_await std::suspend_always{};
}
}
2
3
4
5
6
7
8
9
10
11
12
13
第二个协程多了一个字符串参数。假设我们只是给每个参数传递一些临时对象:
coro(3); // 创建并销毁临时协程
coroStr(3, "hello "); // 创建并销毁临时协程
2
如果没有优化并跟踪分配情况,我们可能会得到类似以下的输出:
::new #1 (36 Bytes) => 0x8002ccb8
::delete (no size) at 0x8002ccb8
::new #2 (60 Bytes) => 0x8004cd28
::delete (no size) at 0x8004cd28
2
3
4
5
这里,第二个协程由于额外的字符串参数多需要24字节。
完整示例请见coro/coromem.cpp
,该示例使用coro/tracknew.hpp
来查看何时分配或释放堆内存。
# 15.6.2 避免堆内存分配
协程的承诺类型允许程序员更改协程的内存分配方式。你只需提供以下成员:
void* operator new(std::size_t sz)
void operator delete(void* ptr, std::size_t sz)
# 确保不使用堆内存
首先,new()
和delete()
运算符可用于确定协程是否在堆上分配内存。只需声明operator new
而不实现它即可6:
class CoroTask {
...
public:
struct promise_type {
...
// 确定是否分配堆内存:
void* operator new(std::size_t sz); // 已声明,但未实现
};
...
};
2
3
4
5
6
7
8
9
10
6感谢Lewis Baker指出这一点。
# 让协程不使用堆内存
这些运算符的另一个用途是更改协程分配内存的方式。例如,你可以将对堆内存的调用映射到程序栈上或数据段中已分配的内存。
以下是一个具体示例:
[`coro/corotaskpmr.hpp`]
#include <coroutine>
#include <exception> // 用于terminate()
#include <cstddef> // 用于std::byte
#include <array>
#include <memory_resource>
// 处理简单任务的协程接口
// - 提供resume()来恢复协程
class [[nodiscard]] CoroTaskPmr {
// 为所有协程提供200k字节的内存
inline static std::array<std::byte, 200'000> buf;
inline static std::pmr::monotonic_buffer_resource
monobuf{buf.data(), buf.size(), std::pmr::null_memory_resource()};
inline static std::pmr::synchronized_pool_resource mempool{&monobuf};
public:
struct promise_type;
using CoroHdl = std::coroutine_handle<promise_type>;
private:
CoroHdl hdl; // 原生协程句柄
public:
struct promise_type {
auto get_return_object() { // 初始化并返回协程接口
return CoroTaskPmr{CoroHdl::from_promise(*this)};
}
auto initial_suspend() {
return std::suspend_always{};
}
void unhandled_exception() {
std::terminate();
}
void return_void() { }
auto final_suspend() noexcept {
return std::suspend_always{};
}
// 初始挂起点
// - 立即挂起
// 处理异常
// - 终止程序
// 处理结束或co_return
// 最终挂起点
// - 立即挂起
// 定义内存分配方式:
void* operator new(std::size_t sz) {
return mempool.allocate(sz);
}
void operator delete(void* ptr, std::size_t sz) {
mempool.deallocate(ptr, sz);
}
};
// 构造函数和析构函数:
CoroTaskPmr(auto h) : hdl{h} { }
~CoroTaskPmr() { if (hdl) hdl.destroy(); }
// 不支持复制或移动:
CoroTaskPmr(const CoroTaskPmr&) = delete;
CoroTaskPmr& operator=(const CoroTaskPmr&) = delete;
// 恢复协程的API
// - 返回是否还有内容需要处理
bool resume() const {
if (!hdl || hdl.done()) {
return false; // 没有(更多)内容需要处理
}
hdl.resume(); // 恢复(阻塞直到再次挂起或结束)
return !hdl.done();
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
这里,我们使用了多态内存资源(Polymorphic Memory Resources),这是C++17引入的一项功能,通过提供标准化的内存池来简化内存管理。在这种情况下,我们将200千字节的数据段传递给内存池monobuf
。使用null_memory_resource()
作为后备选项,确保在内存不足时抛出std::bad_alloc
异常。在此之上,我们创建了同步内存池mempool
,它可以减少内存碎片7:
// 为所有协程提供200k字节的内存:
std::array<std::byte, 200’000> buf;
std::pmr::monotonic_buffer_resource
monobuf{buf.data(), buf.size(), std::pmr::null_memory_resource()};
std::pmr::synchronized_pool_resource mempool{&monobuf};
2
3
4
5
为简单起见,我们将这些对象直接创建为协程接口CoroTaskPmr
的静态内联成员,并提供operator new
和operator delete
,将协程的“堆”内存请求映射到这个内存池:
class [[nodiscard]] CoroTaskPmr {
// 为所有协程提供200k字节的内存:
inline static std::array<std::byte, 200’000> buf;
inline static std::pmr::monotonic_buffer_resource
monobuf{buf.data(), buf.size(), std::pmr::null_memory_resource()};
inline static std::pmr::synchronized_pool_resource mempool{&monobuf};
...
public:
struct promise_type {
...
// 定义内存分配方式:
void* operator new(std::size_t sz) {
return mempool.allocate(sz);
}
void operator delete(void* ptr, std::size_t sz) {
mempool.deallocate(ptr, sz);
}
};
...
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
我们可以像往常一样使用这个协程接口:
//coro/coromempmr.cpp
#include <iostream>
#include <string>
#include "corotaskpmr.hpp"
#include "tracknew.hpp"
CoroTaskPmr coro(int max) {
for (int val = 1; val <= max; ++val) {
std::cout << " coro( " << max << "): " << val << "\n";
co_await std::suspend_always{};
}
}
CoroTaskPmr coroStr(int max, std::string s) {
for (int val = 1; val <= max; ++val) {
std::cout << " coroStr( " << max << " , " << s << "): " << "\n";
co_await std::suspend_always{};
}
}
int main() {
TrackNew::trace();
TrackNew::reset();
coro(3);
coroStr(3, "hello ");
auto coroTask = coro(3);
std::cout << "coro() started\n ";
while (coroTask.resume()) {
std::cout << "coro() suspended\n ";
}
std::cout << "coro() done\n ";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
这些协程不再分配堆内存。
注意,你还可以提供一个特定的operator new
,在大小参数之后接受协程参数。例如,对于接受一个int
和一个字符串的协程,我们可以这样提供:
class CoroTaskPmr {
public:
struct promise_type {
...
void* operator new(std::size_t sz, int , const std::string&) {
return mempool.allocate(sz);
}
};
...
};
2
3
4
5
6
7
8
9
10
# 15.6.3 get_return_object_on_allocation_failure()
如果承诺类型有一个静态成员get_return_object_on_allocation_failure()
,则假定内存分配永远不会抛出异常。默认情况下,这会导致调用:
::operator new(std::size_t sz, std::nothrow_t)
在这种情况下,用户定义的operator new
必须是noexcept
的,并且在失败时返回nullptr
。然后,该函数可用于实现一些变通方法,比如创建一个不引用协程的协程句柄:
class CoroTask {
...
public:
struct promise_type {
...
static auto get_return_object_on_allocation_failure() {
return CoroTask{nullptr};
}
};
...
};
2
3
4
5
6
7
8
9
10
11
# 15.7 co_await
与等待器(Awaiters)详解
既然我们已经介绍了等待器,现在就来详细讨论它们,并查看更多示例。
# 15.7.1 等待器接口的细节
表15.3再次列出了等待器必须提供的关键操作。
操作 | 效果 |
---|---|
构造函数await_ready() await_suspend(awaitHdl) await_resume() | 初始化等待器 返回是否(当前)禁用挂起 处理挂起 处理恢复 |
表15.3 等待器的特殊成员函数 |
让我们详细了解这个等待器接口:
- 构造函数:构造函数允许协程(或等待器创建的任何地方)向等待器传递参数。这些参数可用于影响或改变等待器处理挂起和恢复的方式。例如,用于协程优先级请求的等待器。
bool await_ready()
:在调用该函数的协程即将挂起之前调用此函数。它可用于(暂时)完全关闭挂起。如果返回true
,表示“准备好”立即从挂起请求返回,继续执行协程而不挂起它。通常,这个函数只是返回false
(“不,不要避免/阻止任何挂起”)。然而,它也可能根据条件返回true
(例如,如果挂起取决于某些数据是否可用)。通过await_suspend()
的返回类型,也可以表示不接受协程的挂起(注意,这里true
和false
的含义相反:在await_suspend()
中返回true
表示接受挂起)。使用await_ready()
不接受挂起的好处是,程序可以完全节省启动协程挂起的开销。注意,在这个函数内部,被调用的协程尚未挂起。不要(间接)在这里调用resume()
或destroy()
。只要你能确保逻辑不会对这里挂起的协程调用resume()
或destroy()
,甚至可以在这里调用更复杂的业务逻辑。auto await_suspend(awaitHdl )
:在调用该函数的协程挂起之后立即调用此函数。awaitHdl
是请求挂起(使用co_await
)的协程,其类型为等待协程句柄std::coroutine_handle<PromiseType>
。在这里,你可以指定接下来要做什么,包括立即恢复挂起的协程或等待的协程,并安排另一个协程稍后恢复。为了支持这一点,可以使用特殊的返回类型(下面会讨论)。你甚至可以在这里销毁协程。然而,在这种情况下,你必须确保该协程不会在其他地方被使用(例如在协程接口中调用done()
)。auto await_resume()
:当调用该函数的协程在成功挂起后(即对协程句柄调用resume()
时)恢复时调用此函数。await_resume()
的返回类型是导致挂起的co_await
或co_yield
表达式产生的值的类型。如果不是void
,协程的上下文可以将一个值返回给恢复的协程。
await_suspend()
是这里的关键函数。它的参数和返回值可以有以下变化:
await_suspend()
的参数可以是:- 使用协程句柄的类型:
std::coroutine_handle<PrmType>
。 - 使用适用于所有协程句柄的基本类型:
std::coroutine_handle<void>
(或仅std::coroutine_handle<>
)。在这种情况下,你无法访问承诺(promise)。 - 使用
auto
让编译器确定类型。
- 使用协程句柄的类型:
await_suspend()
的返回类型可以是:void
,表示在执行await_suspend()
中的语句后继续挂起,并返回给协程的调用者。bool
,用于表示是否真的应该进行挂起。这里,false
表示“不再挂起”(这与await_ready()
的布尔返回值含义相反)。std::coroutine_handle<>
,用于恢复另一个协程。这种await_suspend()
的用法称为对称转移(symmetric transfer),稍后会详细介绍。在这种情况下,可以使用空操作协程(noop coroutine)表示根本不恢复任何协程(与函数返回false
的效果相同)。
此外,请注意以下几点:
- 成员函数通常是
const
的,除非等待器有一个会被修改的成员(例如在await_suspend()
中存储协程句柄,以便在恢复时使用)。 - 成员函数通常是
noexcept
的(这是在final_suspend()
中使用所必需的)。 - 成员函数可以是
constexpr
的。
# 15.7.2 让co_await
更新正在运行的协程
由于等待器可以在挂起时执行代码,你可以利用它们来改变处理协程的系统的行为。让我们看一个示例,在这个示例中,我们让协程改变其优先级。
假设我们在一个调度器中管理所有协程,并允许它们使用co_await
来改变优先级。为此,我们首先定义一个默认优先级和一些用于改变优先级的co_await
参数:
int CoroPrioDefVal = 10;
enum class CoroPrioRequest {same, less, more, def};
2
有了这些,一个协程可能如下所示:
//coro/coroprio.hpp
#include "coropriosched.hpp"
#include <iostream>
CoroPrioTask coro(int max) {
std::cout << " coro( " << max << ")\n ";
for (int val = 1; val <= max; ++val) {
std::cout << " coro( " << max << "): " << val << "\n";
co_await CoroPrio{CoroPrioRequest::less}; // 以较低优先级挂起
}
std::cout << " end coro( " << max << ")\n ";
}
2
3
4
5
6
7
8
9
10
11
12
通过使用特殊的协程接口CoroPrioTask
,该协程可以使用CoroPrio
类型的特殊等待器,这使得协程能够传递优先级更改请求。在挂起时,可以使用它来请求更改当前优先级:
co_await CoroPrio{CoroPrioRequest::less}; // 以较低优先级挂起
为此,主程序创建一个调度器,并调用start()
将每个协程传递给调度器:
[`coro/coroprio.cpp`]
#include "coroprio.hpp"
#include <iostream>
int main() {
std::cout << "start main()\n ";
CoroPrioScheduler sched;
std::cout << "schedule coroutines\n ";
sched.start(coro(5));
sched.start(coro(1));
sched.start(coro(4));
std::cout << "loop until all are processed\n ";
while (sched.resumeNext()) {
}
std::cout << "end main()\n ";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CoroPrioScheduler
和CoroPrioTask
类的交互如下:
- 调度器将所有协程按优先级顺序存储,并提供用于存储新协程、恢复下一个协程以及更改协程优先级的成员函数:
class CoroPrioScheduler {
std::multimap<int, CoroPrioTask> tasks; // 按优先级排序的所有任务
...
public:
void start(CoroPrioTask&& task);
bool resumeNext();
bool changePrio(CoroPrioTask::CoroHdl hdl, CoroPrioRequest pr);
};
2
3
4
5
6
7
8
- 调度器的成员函数
start()
将传递的协程存储在列表中,并将调度器存储在每个任务的承诺中:
class CoroPrioScheduler {
...
public:
void start(CoroPrioTask&& task) {
// 将调度器存储在协程状态中:
task.hdl.promise().schedPtr = this;
// 以默认优先级调度协程:
tasks.emplace(CoroPrioDefVal, std::move(task));
}
...
};
2
3
4
5
6
7
8
9
10
11
CoroPrioTask
类允许CoroPrioScheduler
类访问句柄,在承诺类型中提供指向调度器的成员,并使协程能够使用CoroPrioRequest
进行co_await
操作:
class CoroPrioTask {
...
friend class CoroPrioScheduler; // 允许访问句柄
struct promise_type {
...
CoroPrioScheduler* schedPtr = nullptr; // 每个任务都知道它的调度器
auto await_transform(CoroPrioRequest); // 处理对CoroPrioRequest的co_await操作
};
...
}
2
3
4
5
6
7
8
9
10
对于start()
,它也以通常的方式提供移动语义。
协程和调度器之间的接口是CoroPrio
类型的等待器。它的构造函数接受优先级请求,并在挂起时(当我们获得协程句柄时)将其存储在承诺中。为此,构造函数存储请求,以便在await_suspend()
中使用:
class CoroPrio {
private:
CoroPrioRequest prioRequest;
public:
CoroPrio(CoroPrioRequest pr)
: prioRequest{pr} { // 处理对CoroPrioRequest的co_await操作
}
...
void await_suspend(CoroPrioTask::CoroHdl h) noexcept {
h.promise().schedPtr->changePrio(h, prioRequest);
}
...
};
2
3
4
5
6
7
8
9
10
11
12
13
其余部分只是协程接口的常规样板代码以及优先级处理。完整代码见coro/coropriosched.hpp
。
如所示,主程序调度这三个协程,并循环直到所有协程都被处理:
sched.start(coro(5));
sched.start(coro(1));
sched.start(coro(4));
while (sched.resumeNext()) {
}
2
3
4
5
输出如下:
start main()
schedule coroutines
loop until all are processed
coro(5)
coro(5): 1
coro(1)
coro(1): 1
coro(4)
coro(4): 1
coro(5): 2
end coro(1)
coro(4): 2
coro(5): 3
coro(4): 3
coro(5): 4
coro(4): 4
coro(5): 5
end coro(4)
end coro(5)
end main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
最初,启动的三个协程具有相同的优先级。由于每次协程挂起时其优先级都会降低,其他协程会依次运行,直到第一个协程再次具有最高优先级。
# 15.7.3 使用等待器进行对称转移以实现延续
await_suspend()
的返回类型可以是一个协程句柄。在这种情况下,一个协程通过立即恢复返回的协程来实现挂起。这种技术称为对称转移。
对称转移的引入是为了提高性能,并避免协程调用其他协程时发生栈溢出。通常,当使用resume()
恢复一个协程时,程序需要为新协程创建一个新的栈帧。如果我们在await_suspend()
内部或调用之后恢复一个协程,总是需要付出这个代价。通过返回要调用的协程,当前协程的栈帧只需替换其协程,这样就不需要新的栈帧了。
# 通过延续实现对称转移
这种技术的典型应用是通过使用处理延续(continuations)的最终等待器来实现的8。
假设你有一个协程结束后应该继续执行另一个后续协程,而不将控制权返回给调用者,你会遇到以下问题:在final_suspend()
内部,你的协程尚未处于挂起状态。你必须等到为返回的可等待对象调用await_suspend()
来处理任何恢复操作。这可能如下所示:
class CoroTask
{
public:
struct promise_type;
using CoroHdl = std::coroutine_handle<promise_type>;
private:
CoroHdl hdl; // 原生协程句柄
public:
struct promise_type {
std::coroutine_handle<> contHdl = nullptr; // 延续(如果有)
...
auto final_suspend() noexcept {
// 协程尚未挂起,使用等待器处理延续
return FinalAwaiter{};
}
};
...
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
因此,我们返回一个等待器,它可以在协程挂起后用于处理该协程。这里,返回的等待器类型为FinalAwaiter
,它可能如下所示:
8这项技术在Lewis Baker和Michael Eiler的文章及邮件的大力帮助下得以记录。特别是http://lewissbaker.github.io/2020/05/11/understanding_symmetric_transfer提供了详细的动机和解释。
struct FinalAwaiter {
bool await_ready() noexcept {
return false;
}
std::coroutine_handle<> await_suspend(CoroTask::CoroHdl h) noexcept {
// 协程现在在最终挂起点挂起
// - 如果有延续,则恢复其延续
if (h.promise().contHdl) {
return h.promise().contHdl; // 返回要恢复的下一个协程
}
else {
return std::noop_coroutine(); // 没有下一个协程 => 返回给调用者
}
}
void await_resume() noexcept {
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
由于await_suspend()
返回一个协程句柄,返回的协程在挂起时会自动恢复。在这种情况下,实用函数std::noop_coroutine()
表示不恢复任何其他协程,这意味着挂起的协程返回给调用者。
std::noop_coroutine()
返回一个std::noop_coroutine_handle
,它是std::coroutine_handle<std::noop_coroutine_promise>
的别名类型。这种类型的协程在调用resume()
或destroy()
时没有效果,调用address()
时返回nullptr
,调用done()
时总是返回false
。
std::noop_coroutine()
及其返回类型用于await_suspend()
可能可选地返回一个协程以继续执行的情况。由于await_suspend()
的返回类型为std::coroutine_handle<>
,它可以返回一个空操作协程,表示不自动恢复另一个协程。
注意,std::noop_coroutine()
的两个不同返回值不一定比较相等。因此,以下代码在可移植性方面存在问题:
std::coroutine_handle<> coro = std::noop_coroutine();
...
if (coro == std::noop_coroutine()) { // 错误:无法检查coro是否具有初始值
...
return coro;
}
2
3
4
5
6
你应该使用nullptr
代替:
std::coroutine_handle<> coro = nullptr;
...
if (coro) { // 正确(检查coro是否具有初始值)
...
return std::noop_coroutine();
}
2
3
4
5
6
在线程池中处理协程展示了这种技术的一个应用。
# 15.8 处理co_await的其他方式
到目前为止,我们只将等待器传递给co_await
。例如,使用标准等待器:
co_await std::suspend_always{};
或者,另一个例子是co_await
使用用户定义的等待器:
co_await CoroPrio{CoroPrioRequest::less}; // 以较低优先级挂起
然而,co_await expr
是一个操作符,它可以是更大表达式的一部分,并且可以接受不具有等待器类型的值。例如:
co_await 42;
co_await (x + y);
2
co_await
操作符的优先级与sizeof
或new
相同。因此,在上述示例中,如果不改变含义,就不能省略括号。语句
co_await x + y;
将按如下方式求值:
(co_await x) + y;
请注意,这里的x + y
或仅仅x
不一定是等待器。co_await
需要一个可等待对象(awaitable),而等待器只是可等待对象的一种(典型)实现方式。实际上,只要存在到等待器API的映射,co_await
就可以接受任何类型的任何值。对于这种映射,C++标准提供了两种方法:
- 承诺成员函数
await_transform()
operator co_await()
这两种方法都允许协程将任何类型的任何值传递给co_await
,从而隐式或间接地指定一个等待器。
# 15.8.1 await_transform()
如果在协程中出现co_await
表达式,编译器首先会查找协程的承诺是否提供了成员函数await_transform()
。如果存在,就会调用await_transform()
,它必须返回一个等待器,然后该等待器用于挂起协程。
例如,这意味着:
class CoroTask {
struct promise_type {
...
auto await_transform(int val) {
return MyAwaiter{val};
}
};
...
};
CoroTask coro() {
co_await 42;
}
2
3
4
5
6
7
8
9
10
11
12
13
与下面的代码效果相同:
class CoroTask {
...
};
CoroTask coro() {
co_await MyAwaiter{42};
}
2
3
4
5
6
7
你还可以使用它在使用(标准)等待器之前,让协程将一个值传递给承诺:
class CoroTask {
struct promise_type {
...
auto await_transform(int val) {
... // 处理val
return std::suspend_always{};
}
};
...
};
CoroTask coro() {
co_await 42; // 让承诺处理42并挂起
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 使用值让co_await更新正在运行的协程
还记得那个使用等待器更改协程优先级的示例吗?在那里,我们使用CoroPrio
类型的等待器,使协程能够按如下方式请求新的优先级:
co_await CoroPrio{CoroPrioRequest::less};// 以较低优先级挂起
我们也可以只传递新的优先级:
co_await CoroPrioRequest::less;// 以较低优先级挂起
我们所需要做的就是让协程接口承诺为这种类型的值提供一个await_transform()
成员函数:
class CoroPrioTask {
public:
struct promise_type;
using CoroHdl = std::coroutine_handle<promise_type>;
private:
CoroHdl hdl; // 原生协程句柄
friend class CoroPrioScheduler; // 允许访问句柄
public:
struct promise_type {
CoroPrioScheduler* schedPtr = nullptr; // 每个任务都知道它的调度器
...
auto await_transform(CoroPrioRequest); // 处理co_await CoroPrioRequest
};
...
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
await_transform()
的实现可能如下:
inline auto CoroPrioTask::promise_type::await_transform(CoroPrioRequest pr) {
auto hdl = CoroPrioTask::CoroHdl::from_promise(*this);
schedPtr->changePrio(hdl, pr);
return std::suspend_always{};
}
2
3
4
5
这里,我们再次使用协程句柄的静态成员函数from_promise()
来获取句柄,因为changePrio()
需要句柄作为其第一个参数。
这样,我们就可以完全跳过CoroPrio
等待器。完整代码见coro/coropriosched2.hpp
(使用示例见coro/coroprio2.cpp
和coro/coroprio2.hpp
)。
# 让co_await表现得像co_yield
还记得co_yield
的示例吗?为了处理要产生的值,我们这样做:
struct promise_type {
int coroValue = 0; // co_yield的最后一个值
auto yield_value(int val) { // 对co_yield的反应
coroValue = val; // - 本地存储值
return std::suspend_always{}; // - 挂起协程
}
...
};
co_yield val; // 调用promise上的yield_value(val)
2
3
4
5
6
7
8
9
通过以下方式,我们可以得到相同的效果:
struct promise_type {
int coroValue = 0; // co_yield的最后一个值
auto await_transform(int val) {
coroValue = val; // - 本地存储值
return std::suspend_always{};
}
...
};
co_await val; // 调用promise上的await_transform(val)
2
3
4
5
6
7
8
9
实际上,
co_yield val;
等同于
co_await prm.yield_value(val);
其中prm
是包含该协程的承诺。
# 15.8.2 operator co_await()
让co_await
处理(几乎)任何类型的任何值的另一种选择是为该类型实现operator co_await()
。这意味着你必须传递一个类的值。
假设我们为某个类型MyType
实现了operator co_await()
:
class MyType {
auto operator co_await() {
return std::suspend_always{};
}
};
2
3
4
5
然后,对该类型的对象调用co_await
:
CoroTask coro()
{
...
co_await MyType{};
}
2
3
4
5
会调用该操作符,并使用返回的等待器来处理挂起。这里,operator co_await()
返回std::suspend_always{}
,这意味着我们实际上得到:
CoroTask coro() {
...
co_await std::suspend_always{};
}
2
3
4
然而,你可以向MyType{}
传递参数,并将它们传递给等待器,这样就可以将值传递给挂起的协程,以便立即或稍后恢复执行。
另一个例子是将一个协程传递给co_await
,然后可以调用该协程,包括进行一些准备工作,如切换线程或将协程调度到线程池中。
# 15.9 协程的并发使用
原则上,向co_await
传递操作数可以执行任何代码。我们可以跳转到完全不同的上下文,或者执行一些操作,并在获得操作结果后继续执行。即使其他操作在不同的线程中运行,也不需要同步机制。
本节提供一个基本示例来演示这种技术。其中一部分是一个简单的线程池,用于处理协程,它使用了std::jthread
和C++20的一些新的并发特性。
# 15.9.1 co_await协程
假设我们有以下相互调用的协程:
// coro/coroasync.hpp
#include "coropool.hpp"
#include <iostream>
#include <syncstream> // 用于std::osyncstream
inline auto syncOut(std::ostream& strm = std::cout) {
return std::osyncstream{strm};
}
CoroPoolTask print(std::string id, std::string msg) {
syncOut() << " > " << id << " print : " << msg
<< " on thread : " << std::this_thread::get_id() << std::endl;
co_return; // 使其成为一个协程
}
CoroPoolTask runAsync(std::string id) {
syncOut() << "===== " << id << " start "
<< " on thread : " << std::this_thread::get_id() << std::endl;
co_await print(id + "a ", "start ");
syncOut() << "===== " << id << " resume "
<< " on thread " << std::this_thread::get_id() << std::endl;
co_await print(id + "b ", "end ");
syncOut() << "===== " << id << " resume "
<< " on thread " << std::this_thread::get_id() << std::endl;
syncOut() << "===== " << id << " done " << std::endl;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
这两个协程都使用CoroPoolTask
,这是一个用于在线程池中运行任务的协程接口。该接口和线程池的实现将在后面讨论。
重要的是,协程runAsync()
使用co_await
来调用另一个协程print()
:
CoroPoolTask runAsync(std::string id) {
...
co_await print( ... );
...
}
2
3
4
5
正如我们将看到的,这样做的效果是print()
协程将被调度到线程池中在不同的线程中运行。此外,co_await
会阻塞,直到print()
完成。
请注意,print()
需要一个co_return
来确保编译器将其视为一个协程。如果没有这个,我们根本就没有任何co_
关键字,编译器(假设这是一个普通函数)会抱怨我们有返回类型但没有返回语句。
还要注意,我们使用辅助函数syncOut()
,它返回一个std::osyncstream
,以确保不同线程的并发输出逐行同步。
假设我们如下调用协程runAsync()
:
// coro/coroasync1.cpp
#include "coroasync.hpp"
#include <iostream>
int main() {
// 初始化协程线程池:
syncOut() << "**** main() on thread " << std::this_thread::get_id()
<< std::endl;
CoroPool pool{4};
// 启动主协程并在线程池中运行它:
syncOut() << "runTask(runAsync(1)) " << std::endl;
CoroPoolTask t1 = runAsync( "1 ");
pool.runTask(std::move(t1));
// 等待所有协程完成:
syncOut() << "\n**** waitUntilNoCoros() " << std::endl;
pool.waitUntilNoCoros();
syncOut() << "\n**** main() done " << std::endl;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
在这里,我们首先使用CoroPoolTask
接口为所有协程创建一个CoroPool
类型的线程池:
CoroPool pool{4};
然后,我们调用协程,该协程会延迟启动并返回接口,我们将这个接口交给线程池来控制协程:
CoroPoolTask t1 = runAsync("1");
pool.runTask(std::move(t1));
2
我们(并且必须)对协程接口使用移动语义,因为runTask()
要求传递一个右值,以便能够接管协程的所有权(调用之后t1
不再拥有该协程的所有权)。
我们也可以将其写在一条语句中:
pool.runTask(runAsync("1"));
在结束程序之前,线程池会阻塞,直到所有已调度的协程都处理完毕:
pool.waitUntilNoCoros();
当我们运行这个程序时,会得到类似以下的输出(线程ID会有所不同):
**** main() on thread 0x80000008 runTask(runAsync(1))
**** waitUntilNoCoros()
===== 1 start on thread: 0x8002cd90
> 1a print: start on thread: 0x8002ce68
===== 1 resume on thread 0x8002ce68
> 1b print: end on thread: 0x8004d090
===== 1 resume on thread 0x8004d090
===== 1 done
**** main() done
2
3
4
5
6
7
8
9
如你所见,程序使用了不同的线程:
- 协程
runAsync()
在与main()
不同的线程上启动。 - 从那里调用的协程
print()
在第三个线程上启动。 - 从那里调用的第二次协程
print()
在第四个线程上启动。
然而,runAsync()
每次调用print()
时都会切换线程。通过使用co_await
,这些调用会挂起runAsync()
,并在不同的线程上调用print()
(正如我们将看到的,挂起操作会在线程池中调度被调用的协程)。在print()
结束时,调用print()
的协程runAsync()
会在print()
运行的同一线程上恢复执行。
作为一种变化,我们也可以启动并调度协程runAsync()
四次:
// coro/coroasync2.cpp
#include "coroasync.hpp"
#include <iostream>
int main() {
// 初始化协程线程池:
syncOut() << "**** main() on thread " << std::this_thread::get_id()
<< std::endl;
CoroPool pool{4};
// 启动多个协程并在线程池中运行它们:
for (int i = 1; i <= 4; ++i) {
syncOut() << "runTask(runAsync( " << i << ")) " << std::endl;
pool.runTask(runAsync(std::to_string(i)));
}
// 等待所有协程完成:
syncOut() << "\n**** waitUntilNoCoros() " << std::endl;
pool.waitUntilNoCoros();
syncOut() << "\n**** main() done " << std::endl;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
这个程序可能会有以下输出(由于平台不同,线程ID会有所不同):
**** main() on thread 17308
runTask(runAsync(1))
runTask(runAsync(2))
runTask(runAsync(3))
runTask(runAsync(4))
**** waitUntilNoCoros()
===== 1 start on thread: 18016
===== 2 start on thread: 9004
===== 3 start on thread: 17008
===== 4 start on thread: 2816
> 2a print: start on thread: 2816
> 1a print: start on thread: 17008
===== 1 resume on thread 17008
> 4a print: start on thread: 18016
===== 4 resume on thread 18016
> 3a print: start on thread: 9004
===== 3 resume on thread 9004
===== 2 resume on thread 2816
> 4b print: end on thread: 9004
> 1b print: end on thread: 2816
===== 1 resume on thread 2816
===== 1 done
===== 4 resume on thread 9004
===== 4 done
> 2b print: end on thread: 17008
===== 2 resume on thread 17008
===== 2 done
> 3b print: end on thread: 18016
===== 3 resume on thread 18016
===== 3 done
**** main() done
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
由于我们有并发协程使用下一个可用线程,现在的输出变化更大。然而,总体而言,我们有相同的行为:
- 每当协程调用
print()
时,它会被调度并可能在不同的线程上启动(如果没有其他线程可用,也可能是同一线程)。 - 每当
print()
完成时,调用print()
的协程runAsync()
会直接在print()
运行的同一线程上恢复执行,这意味着runAsync()
实际上每次调用print()
时都会切换线程。
# 15.9.2 用于协程任务的线程池
以下是协程接口CoroPoolTask
和相应线程池类CoroPool
的实现:
// coro/coropool.hpp
#include <iostream>
#include <list>
#include <utility> // 用于std::exchange()
#include <functional> // 用于std::function
#include <coroutine>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
class CoroPool;
class [[nodiscard]] CoroPoolTask {
friend class CoroPool;
public:
struct promise_type;
using CoroHdl = std::coroutine_handle<promise_type>;
private:
CoroHdl hdl;
public:
struct promise_type {
CoroPool* poolPtr = nullptr; // 如果不为空,生命周期由线程池控制
CoroHdl contHdl = nullptr; // 等待此协程的协程
CoroPoolTask get_return_object() noexcept {
return CoroPoolTask{CoroHdl::from_promise(*this)};
}
auto initial_suspend() const noexcept { return std::suspend_always{}; }
void unhandled_exception() noexcept { std::exit(1); }
void return_void() noexcept {}
auto final_suspend() const noexcept {
struct FinalAwaiter {
bool await_ready() const noexcept { return false; }
std::coroutine_handle<> await_suspend(CoroHdl h) noexcept {
if (h.promise().contHdl) {
return h.promise().contHdl; // 恢复延续协程
} else {
return std::noop_coroutine(); // 没有延续协程
}
}
void await_resume() noexcept {}
};
return FinalAwaiter{}; // 挂起后,如果有延续协程则恢复它
}
};
explicit CoroPoolTask(CoroHdl handle)
: hdl{handle} {}
~CoroPoolTask() {
if (hdl &&!hdl.promise().poolPtr) {
// 任务未被传递到线程池:
hdl.destroy();
}
}
CoroPoolTask(const CoroPoolTask&) = delete;
CoroPoolTask& operator=(const CoroPoolTask&) = delete;
CoroPoolTask(CoroPoolTask&& t)
: hdl{t.hdl} {
t.hdl = nullptr;
}
CoroPoolTask& operator=(CoroPoolTask&&) = delete;
// 用于co_await task()的等待器
// - 将新协程排入线程池
// - 将调用协程设置为延续协程
struct CoAwaitAwaiter {
CoroHdl newHdl;
bool await_ready() const noexcept { return false; }
void await_suspend(CoroHdl awaitingHdl) noexcept; // 见下文
void await_resume() noexcept {}
};
auto operator co_await() noexcept {
return CoAwaitAwaiter{std::exchange(hdl, nullptr)}; // 线程池接管hdl的所有权
}
};
class CoroPool {
private:
std::list<std::jthread> threads; // 线程列表
std::list<CoroPoolTask::CoroHdl> coros; // 已调度协程队列
std::mutex corosMx;
std::condition_variable_any corosCV;
std::atomic<int> numCoros = 0; // 线程池拥有的所有协程的计数器
public:
explicit CoroPool(int num) {
// 启动包含num个线程的线程池:
for (int i = 0; i < num; ++i) {
std::jthread worker_thread{[this](std::stop_token st) {
threadLoop(st);
}};
threads.push_back(std::move(worker_thread));
}
}
~CoroPool() {
for (auto& t : threads) { // 请求停止所有线程
t.request_stop();
}
for (auto& t : threads) { // 等待所有线程结束
t.join();
}
for (auto& c : coros) { // 销毁剩余的协程
c.destroy();
}
}
CoroPool(CoroPool&) = delete;
CoroPool& operator=(CoroPool&) = delete;
void runTask(CoroPoolTask&& coroTask) noexcept {
auto hdl = std::exchange(coroTask.hdl, nullptr); // 线程池接管hdl的所有权
if (coroTask.hdl.done()) {
coroTask.hdl.destroy(); // 糟糕,传递了一个已完成的协程
} else {
// 在池中调度协程
}
}
// runCoro(): 让线程池运行(并控制生命周期)协程
// 从以下调用:
// - pool.runTask(CoroPoolTask)
// - co_await task()
void runCoro(CoroPoolTask::CoroHdl coro) noexcept {
++numCoros;
coro.promise().poolPtr = this; // 防止在CoroPoolTask中销毁
{
std::scoped_lock lock(corosMx);
coros.push_front(coro); // 排入协程
corosCV.notify_one(); // 让一个线程恢复它
}
}
void threadLoop(std::stop_token st) {
while (!st.stop_requested()) {
// 从队列中获取下一个协程任务:
CoroPoolTask::CoroHdl coro;
{
std::unique_lock lock(corosMx);
if (!corosCV.wait(lock, st, [&] {
return!coros.empty();
})) {
return; // 请求停止
}
coro = coros.back();
coros.pop_back();
}
// 恢复它:
coro.resume(); // 恢复
// 注意:最初在这个线程上恢复的协程可能不是最终调用的协程。
// 如果一个主协程等待一个子协程,那么最终恢复子协程的线程会将主协程作为其延续协程来恢复。
// => 这次恢复后,这个协程和一些延续协程可能已经完成
std::function<void(CoroPoolTask::CoroHdl)> destroyDone;
destroyDone = [&destroyDone, this](auto hdl) {
if (hdl && hdl.done()) {
auto nextHdl = hdl.promise().contHdl;
hdl.destroy(); // 销毁已完成的协程
--numCoros; // 调整协程总数
destroyDone(nextHdl); // 对所有已完成的延续协程执行此操作
}
};
destroyDone(coro); // 递归销毁已完成的协程
numCoros.notify_all(); // 唤醒任何等待的waitUntilNoCoros()
// 稍作休眠,强制下一次使用另一个线程:
std::this_thread::sleep_for(std::chrono::milliseconds{100});
}
}
void waitUntilNoCoros() {
int num = numCoros.load();
while (num > 0) {
numCoros.wait(num); // 等待numCoros值改变的通知
num = numCoros.load();
}
}
};
// 用于co_await task()的CoroPoolTask等待器
// - 将新协程排入线程池
// - 将调用协程设置为延续协程
void CoroPoolTask::CoAwaitAwaiter::await_suspend(CoroHdl awaitingHdl) noexcept {
newHdl.promise().contHdl = awaitingHdl;
awaitingHdl.promise().poolPtr->runCoro(newHdl);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
CoroPoolTask
和CoroPool
这两个类紧密协作:
CoroPoolTask
用于将协程初始化为要调用的任务。该任务应在线程池中进行调度,然后线程池会一直控制它,直到它被销毁。CoroPool
实现了一个最小化的线程池,它可以运行已调度的协程。此外,它还提供了一个非常基本的API来:- 阻塞直到所有已调度的协程完成。
- 关闭线程池(其析构函数会自动执行此操作)。
让我们详细看看代码。
# CoroPoolTask类
CoroPoolTask
类提供了一个协程接口,用于运行具有以下特点的任务:
- 每个协程都有一个指向线程池的指针,当线程池接管协程的控制权时,该指针会被初始化:
class [[nodiscard]] CoroPoolTask {
...
struct promise_type {
CoroPool* poolPtr = nullptr; // 如果不为空,生命周期由线程池控制
...
};
...
};
2
3
4
5
6
7
8
- 每个协程都有一个用于可选延续协程的成员,如前所述:
class [[nodiscard]] CoroPoolTask
{
...
struct promise_type {
...
CoroHdl contHdl = nullptr; // 等待此协程的协程
...
auto final_suspend() const noexcept {
struct FinalAwaiter {
bool await_ready() const noexcept { return false; }
std::coroutine_handle<> await_suspend(CoroHdl h) noexcept {
if (h.promise().contHdl) {
return h.promise().contHdl; // 恢复延续协程
} else {
return std::noop_coroutine(); // 没有延续协程
}
}
void await_resume() noexcept {}
};
return FinalAwaiter{}; // 挂起后,如果有延续协程则恢复它
}
};
...
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
- 每个协程都有一个
operator co_await()
,这意味着co_await task()
会使用一个特殊的等待器,以便在将传递的协程调度到线程池并将当前协程设置为延续协程后,挂起当前协程。
下面是实现的operator co_await()
的工作原理。相关代码如下:
class [[nodiscard]] CoroPoolTask {
struct CoAwaitAwaiter {
CoroHdl newHdl;
bool await_ready() const noexcept { return false; }
void await_suspend(CoroHdl awaitingHdl) noexcept; // 见下文
void await_resume() noexcept {}
};
auto operator co_await() noexcept {
return CoAwaitAwaiter{std::exchange(hdl, nullptr)}; // 线程池接管hdl的所有权
}
};
void CoroPoolTask::CoAwaitAwaiter::await_suspend(CoroHdl awaitingHdl) noexcept {
newHdl.promise().contHdl = awaitingHdl;
awaitingHdl.promise().poolPtr->runCoro(newHdl);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
当我们使用co_await
调用print()
时:
CoroPoolTask runAsync(std::string id) {
...
co_await print(...);
...
}
2
3
4
5
会发生以下情况:
print()
作为协程被调用并初始化。- 对于返回的
CoroPoolTask
类型的协程接口,调用operator co_await()
。 operator co_await()
获取句柄以初始化一个CoAwaitAwaiter
类型的等待器。这意味着新的协程print()
成为等待器的成员newHdl
。std::exchange()
确保在初始化的协程接口中,句柄hdl
变为nullptr
,这样析构函数就不会调用destroy()
。runAsync()
被operator co_await()
初始化的等待器挂起,该等待器调用await_suspend()
,并将runAsync()
协程的句柄作为参数传递。- 在
await_suspend()
内部,我们将传递的已挂起的runAsync()
存储为延续协程,并将newHdl
(即print()
)排入线程池,以便由其中一个线程恢复执行。
# 类CoroPool
CoroPool
通过结合本书此处及其他部分介绍的一些特性,实现了其最简线程池。
首先来看看它的成员:
- 与任何典型线程池一样,这个类有一个使用新线程类
std::jthread
的线程成员,这样我们就无需处理异常,还能发出停止信号:
std::list<std::jthread> threads; // 线程列表
我们不需要任何同步操作,因为初始化后,我们仅在shutdown()
期间会再次使用线程,而shutdown()
会先发出停止信号,然后再连接所有线程。这里不支持复制和移动线程池(为简单起见,若需要,我们也能轻松提供移动支持)。
注意,为了提高性能,析构函数会在开始调用join()
之前,先请求所有线程停止。
- 然后,我们有一个用于处理协程的队列,以及相应的同步成员:
std::list<CoroPoolTask::CoroHdl> coros; // 已调度协程的队列
std::mutex corosMx;
std::condition_variable_any corosCV;
2
3
- 为避免过早停止线程池,线程池还会跟踪其拥有的协程总数:
std::atomic<int> numCoros = 0; // 线程池拥有的所有协程的计数器
并提供成员函数waitUntilNoCoros()
。
成员函数runCoro()
是调度协程恢复的关键函数,它以CoroPoolTask
接口的协程句柄作为参数。调度协程接口本身有两种方法:
- 调用
runTask()
。 - 使用
co_await task()
。
这两种方法都会将协程句柄移动到线程池中,这样,当句柄不再使用时,线程池就有责任调用destroy()
销毁它。
然而,正确且安全地确定调用destroy()
(并调整协程总数)的恰当时机并非易事。为此,协程应该处于最终暂停状态,这就排除了在任务的final_suspend()
或最终等待器的await_suspend()
中执行此操作的可能性。因此,跟踪和销毁的工作方式如下:
- 每次将协程句柄传递给线程池时,我们都会增加协程数量:
void runCoro(CoroPoolTask::CoroHdl coro) noexcept {
++numCoros;
...
}
2
3
4
- 每次线程恢复协程后,我们检查该协程及其可能的延续是否已完成。注意,协程帧会根据任务最终等待器的
await_suspend()
自动调用延续。
因此,每次恢复完成后,我们会递归地遍历所有延续,以销毁所有已完成的协程:
std::function<void(CoroPoolTask::CoroHdl)> destroyDone;
destroyDone = [&destroyDone, this](auto hdl) {
if (hdl && hdl.done()) {
auto nextHdl = hdl.promise().contHdl;
hdl.destroy(); // 销毁已完成的句柄
--numCoros; // 调整协程总数
destroyDone(nextHdl); // 对所有已完成的延续执行相同操作
}
};
destroyDone(coro); // 递归销毁已完成的协程
2
3
4
5
6
7
8
9
10
由于这个lambda表达式是递归使用的,我们必须将其前置声明为std::function<>
类型。
- 最后,我们使用原子类型的新线程同步特性,唤醒任何等待
waitUntilNoCoros()
的操作:
numCoros.notify_all(); // 唤醒任何等待waitUntilNoCoros()的操作
void waitUntilNoCoros() {
int num = numCoros.load();
while (num > 0) {
numCoros.wait(num); // 等待numCoros值变化的通知
num = numCoros.load();
}
}
2
3
4
5
6
7
- 如果线程池被销毁,在所有线程完成后,我们也会销毁所有剩余的协程句柄。
# 异步协程的同步等待
在实际的协程池中,很多方面可能看起来有所不同,更加复杂,并且会使用额外技巧使代码更健壮、更安全。
例如,线程池可能会提供一种调度任务并等待其结束的方式,对于CoroPool
来说,可能如下所示:
class CoroPool {
...
void syncWait(CoroPoolTask&& task) {
std::binary_semaphore taskDone{0};
auto makeWaitingTask = [&]() -> CoroPoolTask {
co_await task;
struct SignalDone {
std::binary_semaphore& taskDoneRef;
bool await_ready() { return false; }
bool await_suspend(std::coroutine_handle<>) {
taskDoneRef.release(); // 发出任务已完成的信号
return false; // 根本不暂停
}
void await_resume() { }
};
co_await SignalDone{taskDone};
};
runTask(makeWaitingTask());
taskDone.acquire();
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
这里我们使用了一个二元信号量(binary semaphore
),这样我们就可以发出已传递任务结束的信号,并在不同线程中等待它。
注意,你必须清楚具体发出的是什么信号。这里,信号表示我们在执行co_await
调用的任务之后。在这种情况下,你甚至可以在await_ready()
中发出这个信号:
bool await_ready() {
taskDoneRef.release(); // 发出任务已完成的信号
return true; // 根本不暂停
}
2
3
4
一般来说,你必须考虑到在await_ready()
中,协程尚未暂停。因此,任何导致它检查done()
甚至destroy()
的信号都会导致致命的运行时错误。因为我们这里使用了并发代码,你必须确保该信号甚至不会间接导致相应的调用。
# 15.9.3 C++20之后的C++库将提供什么
在本章关于协程的这部分内容中,你所看到的只是一个非常简单的代码示例,它不够健壮、线程不安全、不完整,也不够灵活,无法为协程的各种典型用例提供通用解决方案。
C++标准委员会正在研究更好的解决方案,以便在未来的库中提供。
对于一个能够以安全且灵活的方式并发运行协程的程序,你至少需要以下组件:
- 某种任务类型,允许你将协程链接在一起。
- 某种机制,允许你启动多个独立运行的协程,并在稍后连接它们。
- 类似
syncWait()
的功能,允许同步函数阻塞,等待异步函数完成。 - 某种机制,允许在较少数量的线程上复用多个协程。
CoroPool
类或多或少地将最后三个方面结合在了一起。然而,更灵活的方法是提供可以以各种方式组合的单个构建块。此外,良好的线程安全性需要更好的设计和实现技术。
我们仍在学习和讨论如何做到最好。因此,即使在C++23中,可能也只会有极少的支持(如果有的话)。
# 15.10 协程特性
到目前为止,在所有示例中,协程都是返回协程接口。然而,你也可以实现这样的协程:它们使用在其他地方创建的接口。考虑以下示例(第一个协程示例的修改版本):
void coro(int max, CoroTask&) {
std::cout << "CORO " << max << " start\n";
for (int val = 1; val <= max; ++val) {
// 打印下一个值:
std::cout << "CORO " << val << "/" << max << "\n";
co_await std::suspend_always{}; // 暂停
}
std::cout << "CORO " << max << " end\n";
}
2
3
4
5
6
7
8
9
这里,协程将CoroTask
类型的协程接口作为参数。然而,我们必须指定该参数用作协程接口。这是通过std::coroutine_traits<>
模板的特化来完成的。它的模板参数必须指定签名(返回类型和参数类型),并且在内部,我们必须将类型成员promise_type
映射到协程接口参数的类型成员:
template<>
struct std::coroutine_traits<void, int, CoroTask&> {
using promise_type = CoroTask::promise_type;
};
2
3
4
现在我们唯一需要确保的是,协程接口可以在没有协程的情况下创建,并且稍后可以引用协程。为此,我们需要一个带有构造函数的承诺对象,该构造函数接受与协程相同的参数:
class CoroTask {
public:
// 定制所需的类型:
struct promise_type {
promise_type(int, CoroTask& ct) { // 初始化传递的协程接口
ct.hdl = CoroHdl::from_promise(*this);
}
void get_return_object() { // 无需再做任何事
}
...
};
private:
// 用于分配状态的句柄(可以是私有的):
using CoroHdl = std::coroutine_handle<promise_type>;
CoroHdl hdl;
public:
// 构造函数和析构函数:
CoroTask() : hdl{} { // 允许在没有句柄的情况下创建协程接口
}
...
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
现在使用该协程的代码可能如下所示:
CoroTask coroTask; // 创建没有协程的协程接口
// 启动协程:
coro(3, coroTask); // 初始化并将协程存储在已创建的接口中
// 循环恢复协程,直到它完成:
while (coroTask.resume()) { // 恢复
std::this_thread::sleep_for(500ms);
}
2
3
4
5
6
7
你可以在coro/corotraits.cpp
中找到完整示例。
承诺对象的构造函数会获取传递给协程的所有参数。然而,由于承诺对象通常只需要协程接口,你可以将接口作为第一个参数传递,然后将auto&&...
作为最后一个参数。