第19章 并发
# 第19章 并发
从长远来看,用允许不受限制地使用存储位置及其地址的面向机器的语言编写大型并发程序并不可取。即便借助复杂的硬件机制,我们也根本无法让这类程序变得可靠。 ——佩尔·布林奇·汉森(1977年)
通信模式即并行模式。 ——惠特·莫里斯
如果在你的职业生涯中,对并发编程的态度有所转变,那你并不孤单,这是很常见的情况。
起初,编写并发代码轻松又有趣。线程、锁、队列等工具很容易上手使用。确实,其中存在诸多陷阱,但好在你清楚它们是什么,并且会小心避免犯错。
在某个阶段,你得调试别人写的多线程代码,然后不得不承认,有些人真的不适合用这些工具。
接着,又到了调试自己多线程代码的时候。
经验会让你对所有多线程代码产生一种合理的怀疑,甚至是彻底的怀疑主义。时不时就会有文章用令人头疼的细节解释,为什么某些看似正确的多线程编程习惯用法根本行不通(这与 “内存模型” 有关),这进一步加深了你的这种怀疑。但最终,你会找到一种自认为在实际使用中不会频繁出错的并发编程方法。你几乎可以把所有情况都套用到这种方法中,而且(要是你真的很厉害)还能学会对增加的复杂性说 “不”。
当然,并发编程的方法有很多。系统程序员常用的方法包括以下几种:
- 一个后台线程执行单一任务,并定期唤醒执行该任务。
- 通用的工作线程池,通过任务队列与客户端进行通信。
- 流水线式处理,数据从一个线程流向另一个线程,每个线程完成一部分工作。
- 数据并行处理,即假定(无论正确与否)整个计算机主要执行一个大型计算任务,因此将其拆分成
n
个部分,在n
个线程上运行,期望能让机器的所有n
个核心同时投入工作。 - 大量同步对象,多个线程可以访问相同的数据,通过基于互斥锁等底层原语的临时锁定方案来避免竞态条件。(Java内置了对这种模型的支持,在20世纪90年代和21世纪初,这种模型非常流行。)
- 原子整数操作,允许多个核心通过一个机器字大小的字段传递信息进行通信。(这种方式比其他所有方式都更难正确实现,除非正在交换的数据确实只是整数值。在实际应用中,通常是指针。)
随着时间的推移,你可能会掌握其中几种方法,并能安全地将它们结合起来。你成了这门艺术的大师。要是其他人永远都不被允许以任何方式修改系统,一切就都完美了。那些线程运用得当的程序里充满了不成文的规则。
Rust提供了一种更好的并发编程方式,它不是强迫所有程序采用单一风格(这对系统程序员来说根本不是解决方案),而是安全地支持多种风格。那些不成文的规则被写进了代码里,并由编译器强制执行。
你可能听说过,Rust能让你编写安全、快速的并发程序。本章就来告诉你如何实现。我们将介绍使用Rust线程的三种方式:
- 分治并行(Fork - join parallelism)
- 通道(Channels)
- 共享可变状态(Shared mutable state)
在这个过程中,你会用到目前所学的关于Rust语言的所有知识。Rust在引用、可变性和生命周期方面的严格处理,在单线程程序中就很有价值,而在并发编程中,这些规则的真正意义才得以凸显。它们让你能够扩充自己的编程工具库,快速且正确地编写多种风格的多线程代码,无需怀疑,无需担忧。
# 分治并行
当有几个完全独立的任务想要同时执行时,就会出现线程最简单的用例。
例如,假设我们要对一个大型文档语料库进行自然语言处理。我们可以编写一个循环:
fn process_files(filenames: Vec<String>) -> io::Result<()> {
for document in filenames {
let text = load(&document)?; // 读取源文件
let results = process(text); // 计算统计信息
save(&document, results)?; // 写入输出文件
}
Ok(())
}
2
3
4
5
6
7
8
该程序的运行过程如图19 - 1所示。
图19 - 1. process_files()的单线程执行
由于每个文档都是单独处理的,所以通过将语料库分成多个块,并在不同线程上处理每个块,相对容易加快这个任务的速度,如图19 - 2所示。
这种模式称为分治并行(Fork - join parallelism)。“分治(fork)” 指启动一个新线程,“合并(join)” 指等待线程完成。我们之前见过这种技术:在第2章中,我们用它来加速曼德布洛特集合绘制程序。
分治并行有几个吸引人的地方:
极其简单:分治并行很容易实现,而且Rust让其正确实现也变得轻松。
避免瓶颈:分治并行中不存在对共享资源的锁定。任何线程唯一需要等待其他线程的时候是在最后。在此期间,每个线程都能自由运行,这有助于降低任务切换的开销。
性能计算直观:在理想情况下,启动四个线程,我们就能在四分之一的时间内完成工作。图19 - 2展示了我们不能期望达到这种理想加速比的一个原因:我们可能无法将工作平均分配到所有线程上。另一个需要谨慎对待的原因是,有时分治并行程序在线程合并后,必须花费一些时间来合并线程计算出的结果。也就是说,完全隔离任务可能会产生一些额外工作。即便如此,除了这两点之外,任何具有独立工作单元且受CPU限制的程序都有望获得显著的性能提升。
易于推断程序正确性:只要线程真正相互隔离,比如曼德布洛特程序中的计算线程,分治并行程序就是确定性的。无论线程速度如何变化,程序总是产生相同的结果。这是一种没有竞态条件的并发模型。
图19 - 2. 使用分治并行方法的多线程文件处理
分治并行的主要缺点是它要求工作单元是独立的。在本章后面,我们会探讨一些不太容易分割的问题。
目前,让我们继续以自然语言处理为例。我们将展示几种把分治并行模式应用到process_files
函数的方法。
# spawn和join
std::thread::spawn
函数用于启动一个新线程:
use std::thread;
thread::spawn(|| {
println!("hello from a child thread");
});
2
3
4
5
它接受一个参数,即一个实现了FnOnce
的闭包或函数。Rust会启动一个新线程来运行该闭包或函数中的代码。新线程是一个真正的操作系统线程,拥有自己的栈,这和C++、C#以及Java中的线程一样。
下面是一个更具体的示例,使用spawn
实现之前process_files
函数的并行版本:
use std::{thread, io};
fn process_files_in_parallel(filenames: Vec<String>) -> io::Result<()> {
// 将工作划分为多个块。
const NTHREADS: usize = 8;
let worklists = split_vec_into_chunks(filenames, NTHREADS);
// 分治(Fork):启动一个线程来处理每个块。
let mut thread_handles = vec![];
for worklist in worklists {
thread_handles.push(
thread::spawn(move || process_files(worklist))
);
}
// 合并(Join):等待所有线程完成。
for handle in thread_handles {
handle.join().unwrap()?;
}
Ok(())
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
我们逐行分析这个函数。
fn process_files_in_parallel(filenames: Vec<String>) -> io::Result<()> {
我们的新函数与原始的process_files
函数具有相同的类型签名,这使得它可以很方便地直接替换原函数。
// Divide the work into several chunks.
const NTHREADS: usize = 8;
let worklists = split_vec_into_chunks(filenames, NTHREADS);
2
3
我们使用了一个未在此处展示的工具函数split_vec_into_chunks
来划分工作。结果worklists
是一个向量的向量,它包含了原始向量filenames
均匀划分的八个部分。
// Fork: Spawn a thread to handle each chunk.
let mut thread_handles = vec![];
for worklist in worklists {
thread_handles.push(
thread::spawn(move || process_files(worklist))
);
}
2
3
4
5
6
7
我们为每个工作列表启动一个线程。spawn()
函数返回一个名为JoinHandle
的值,我们稍后会用到它。目前,我们把所有的JoinHandle
存入一个向量中。
注意我们是如何将文件名列表传递到工作线程中的:
worklist
由父线程中的for
循环定义并填充数据。- 一旦创建了
move
闭包,worklist
就会被移动到闭包中。 - 然后
spawn
将闭包(包括worklist
向量)移动到新的子线程中。
这些移动操作的开销很小。就像我们在第4章讨论的Vec<String>
的移动一样,String
不会被克隆。实际上,没有进行任何内存分配或释放操作。唯一移动的数据是Vec
本身:占用三个机器字。
大多数情况下,你创建的每个线程都需要代码和数据才能开始运行。Rust的闭包很方便,它可以包含你想要的任何代码和数据。
继续看下面的代码:
// Join: Wait for all threads to finish.
for handle in thread_handles {
handle.join().unwrap()?;
}
2
3
4
我们使用之前收集的JoinHandle
的.join()
方法,等待所有八个线程完成任务。
等待线程完成通常是确保程序正确性的必要操作,因为Rust程序一旦main
函数返回就会退出,即使其他线程仍在运行。析构函数不会被调用,多余的线程会直接被终止。如果你不希望出现这种情况,务必在从main
函数返回之前,等待你关心的所有线程完成。
如果我们成功执行完这个循环,就意味着所有八个子线程都成功完成了任务。因此,我们的函数最后返回Ok(())
:
Ok(())
}
2
# 跨线程错误处理
在我们的示例中,用于等待子线程完成的代码比看起来要复杂,这是因为涉及到错误处理。我们再来看一下这行代码:
handle.join().unwrap()?;
.join()
方法为我们做了两件很巧妙的事情。
第一,handle.join()
返回一个std::thread::Result
,如果子线程发生了恐慌(panic),这个结果就会是一个错误。这使得Rust中的线程比C++中的线程健壮得多。在C++中,数组越界访问属于未定义行为,并且无法保护系统的其他部分不受其影响。在Rust中,恐慌是线程安全的,线程之间的边界就像恐慌的防火墙一样,恐慌不会自动从一个线程传播到依赖它的其他线程。相反,一个线程中的恐慌会在其他线程中作为错误结果报告。这样整个程序可以很容易地恢复。
不过,在我们的程序中,我们并没有尝试进行复杂的恐慌处理。相反,我们直接对这个Result
使用.unwrap()
,断言它是Ok
结果而不是Err
结果。如果某个子线程发生了恐慌,那么这个断言就会失败,进而导致父线程也发生恐慌。我们显式地将子线程的恐慌传递给了父线程。
第二,handle.join()
会将子线程的返回值传递回父线程。我们传递给spawn
的闭包返回类型是io::Result<()>
,因为process_files
的返回类型就是这样。这个返回值不会被丢弃。当子线程完成任务时,它的返回值会被保存下来,JoinHandle::join()
会将这个值传递回父线程。
在这个程序中,handle.join()
返回的完整类型是std::thread::Result<std::io::Result<()>>
。其中thread::Result
是spawn/join
API的一部分,而io::Result
是我们应用程序的一部分。
在我们的例子中,在解包thread::Result
之后,我们对io::Result
使用?
操作符,显式地将子线程中的I/O错误传递给父线程。
这一切看起来可能相当复杂。但要知道这只是一行代码,再和其他语言对比一下。在Java和C#中,子线程中的异常默认会被输出到终端然后被忽略。在C++中,默认做法是终止进程。在Rust中,错误是Result
值(数据)而不是异常(控制流)。它们和其他任何值一样在不同线程之间传递。任何时候你使用底层线程API,最终都得编写仔细的错误处理代码,不过既然必须要写,有Result
类型就方便多了。
# 跨线程共享不可变数据
假设我们正在进行的分析需要一个包含大量英语单词和短语的数据库:
// 之前
fn process_files(filenames: Vec<String>)
// 之后
fn process_files(filenames: Vec<String>, glossary: &GigabyteMap)
2
3
4
这个术语表会非常大,所以我们通过引用传递它。那么如何修改process_files_in_parallel
函数,以便将术语表传递给工作线程呢?
直接修改的方法行不通:
fn process_files_in_parallel(filenames: Vec<String>,
glossary: &GigabyteMap)
-> io::Result<()>
{
...
for worklist in worklists {
thread_handles.push(
spawn(move || process_files(worklist, glossary)) // 错误
);
}
...
}
2
3
4
5
6
7
8
9
10
11
12
我们只是在函数中添加了一个glossary
参数,并将其传递给process_files
。Rust会报错:
error[E0621]: explicit lifetime required in the type of `glossary`
--> src/lib.rs:75:17
|
------------
61 | glossary: &GigabyteMap)
`'static` to the BTreeMap<String,
help: add explicit lifetime
type of `glossary`: `&'static String>`
...
75| glossary))
2
3
4
5
6
7
8
9
10
Rust抱怨我们传递给spawn
的闭包的生命周期有问题,而编译器给出的这个 “帮助” 信息实际上没什么用。
spawn
会启动独立的线程。Rust无法知道子线程会运行多长时间,所以它会做最坏的假设:它假设子线程可能在父线程结束且父线程中的所有值都消失之后仍然继续运行。显然,如果子线程要运行那么久,它所运行的闭包也得持续那么久。但是这个闭包的生命周期是有限的:它依赖于glossary
这个引用,而引用不会一直存在。
注意,Rust拒绝这段代码是正确的!按照我们编写这个函数的方式,有可能一个线程遇到I/O错误,导致process_files_in_parallel
在其他线程完成之前就提前退出。这样子线程可能会在主线程释放glossary
之后还尝试使用它。这会产生竞态条件,如果主线程先完成,就会导致未定义行为。Rust不允许出现这种情况。
看起来spawn
的设计过于宽泛,不支持跨线程共享引用。实际上,我们在 “窃取所有权的闭包” 中已经遇到过类似的情况。在那里,我们的解决方案是使用move
闭包将数据的所有权转移到新线程。但在这里这种方法行不通,因为我们有多个线程都需要使用相同的数据。一个安全的替代方法是为每个线程克隆整个术语表,但由于术语表很大,我们希望避免这种做法。幸运的是,标准库提供了另一种方式:原子引用计数。
我们在 “Rc和Arc:共享所有权” 中介绍过Arc
。现在是时候使用它了:
use std::sync::Arc;
fn process_files_in_parallel(filenames: Vec<String>,
glossary: Arc<GigabyteMap>)
-> io::Result<()>
{
...
for worklist in worklists {
// 这个.clone()调用只会克隆Arc并增加引用计数,不会克隆GigabyteMap。
let glossary_for_child = glossary.clone();
thread_handles.push(
spawn(move || process_files(worklist,
&glossary_for_child))
);
}
...
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
我们修改了glossary
的类型:为了并行运行分析,调用者必须传入一个Arc<GigabyteMap>
,这是一个指向已移动到堆上的GigabyteMap
的智能指针,可以通过Arc::new(giga_map)
来创建。
当我们调用glossary.clone()
时,我们只是复制了Arc
智能指针,而不是整个GigabyteMap
。这相当于增加了引用计数。
通过这个修改,程序可以编译并运行,因为它不再依赖于引用的生命周期。只要有任何线程持有Arc<GigabyteMap>
,它就会使GigabyteMap
保持存活,即使父线程提前退出也没问题。而且不会有任何数据竞态,因为Arc
中的数据是不可变的。
# Rayon
标准库中的spawn
函数是一个重要的基础函数,但它并非专门为分治并行设计。在它之上,已经构建了更适合分治并行的API。例如,在第2章中,我们使用了Crossbeam
库将一些工作分配到八个线程中。
Crossbeam
的作用域线程(scoped threads)对分治并行的支持非常自然。
由Niko Matsakis和Josh Stone开发的Rayon
库是另一个例子。它提供了两种并发运行任务的方式:
use rayon::prelude::*;
// "并行执行两个任务"
let (v1, v2) = rayon::join(fn1, fn2);
// "并行执行N个任务"
giant_vector.par_iter().for_each(|value| {
do_thing_with_value(value);
});
2
3
4
5
6
7
8
9
rayon::join(fn1, fn2)
会同时调用这两个函数并返回它们的结果。.par_iter()
方法会创建一个ParallelIterator
,它拥有map
、filter
等方法,与Rust的Iterator
很相似。在这两种情况下,Rayon
会尽可能地使用它自己的工作线程池来分配工作。你只需告诉Rayon
哪些任务可以并行完成,Rayon
会管理线程并尽可能优化地分配工作。
图19 - 3中的示意图展示了理解giant_vector.par_iter().for_each(...)
调用的两种方式。(a) Rayon
的行为就好像为向量中的每个元素都创建了一个线程。(b) 在幕后,Rayon
为每个CPU核心分配一个工作线程,这样效率更高。这个工作线程池由程序中的所有线程共享。当同时有数千个任务到来时,Rayon
会对工作进行划分。
图19 - 3. Rayon在理论和实践中的情况
下面是使用Rayon
实现的process_files_in_parallel
版本,这里的process_file
函数接受的参数是&str
,而不是Vec<String>
:
use rayon::prelude::*;
fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap)
-> io::Result<()>
{
filenames.par_iter()
.map(|filename| process_file(filename, glossary))
.reduce_with(|r1, r2| {
if r1.is_err() { r1 } else { r2 }
})
.unwrap_or(Ok(()))
}
2
3
4
5
6
7
8
9
10
11
12
这段代码比使用std::thread::spawn
的版本更简短,也更简单。我们逐行分析一下:
- 首先,我们使用
filenames.par_iter()
创建一个并行迭代器。 - 接着使用
.map()
对每个文件名调用process_file
。这会生成一个包含一系列io::Result<()>
值的并行迭代器。 - 然后使用
.reduce_with()
来合并结果。在这里,如果有错误,我们保留第一个错误,丢弃其他错误。如果我们想要累积或打印所有错误,也可以在这里实现。当你传递给.map()
的闭包在成功时返回一个有用的值时,.reduce_with()
方法也很有用。这时,你可以给.reduce_with()
传递一个闭包,用于合并两个成功的结果。 reduce_with
返回一个Option
,只有当filenames
为空时才为None
。我们使用Option
的.unwrap_or()
方法,在这种情况下将结果设为Ok(())
。
在幕后,Rayon
使用一种称为工作窃取(work - stealing)的技术在各个线程之间动态平衡工作负载。通常,它在让所有CPU保持忙碌方面,比我们在 “spawn和join” 中手动提前划分工作的效果更好。
另外,Rayon
支持跨线程共享引用。在幕后进行的任何并行处理,在reduce_with
返回时都能保证完成。这就解释了为什么我们可以将glossary
传递给process_file
,即使这个闭包会在多个线程上被调用。
(顺便说一句,我们使用了map
方法和reduce
方法并非偶然。由谷歌和Apache Hadoop推广的MapReduce编程模型与分治并行有很多共同之处。它可以被看作是一种查询分布式数据的分治并行方法。)
# 回顾曼德布洛特集合绘制
在第2章中,我们使用分治并发来绘制曼德布洛特集合。这使得渲染速度提高了四倍,虽然令人印象深刻,但考虑到我们让程序启动了八个工作线程并在一个八核心的机器上运行,其实还可以更快!
问题在于我们没有均匀地分配工作负载。计算图像的一个像素相当于运行一个循环(参见 “曼德布洛特集合的实际原理”)。结果发现,图像中浅灰色部分的循环很快就会退出,渲染起来比黑色部分要快得多,黑色部分的循环需要完整运行255次迭代。所以,尽管我们将图像区域划分为大小相等的水平条带,但实际上创建了不均衡的工作负载,如图19 - 4所示。
图19 - 4. 曼德布洛特程序中不均衡的工作分配
使用Rayon
很容易解决这个问题。我们可以为输出图像的每一行像素启动一个并行任务。这样会创建几百个任务,Rayon
可以将这些任务分配到各个线程中。由于采用了工作窃取技术,任务大小不同也没关系。Rayon
会在运行过程中平衡工作负载。
下面是代码。第一行和最后一行是我们在 “一个并发的曼德布洛特程序” 中展示的main
函数的一部分,但我们修改了中间的渲染代码:
let mut pixels = vec![0; bounds.0 * bounds.1];
// 对`pixels`按水平条带进行切片的作用域
{
let bands: Vec<(usize, &mut [u8])> = pixels
.chunks_mut(bounds.0)
.enumerate()
.collect();
bands.into_par_iter()
.for_each(|(i, band)| {
let top = i;
let band_bounds = (bounds.0, 1);
let band_upper_left = pixel_to_point(bounds, (0, top), upper_left, lower_right);
let band_lower_right = pixel_to_point(bounds, (bounds.0, top + 1), upper_left, lower_right);
render(band, band_bounds, band_upper_left, band_lower_right);
});
}
write_image(&args[1], &pixels, bounds).expect("error writing PNG file");
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
首先,我们创建bands
,这是一个任务集合,我们将把它传递给Rayon
。每个任务只是一个(usize, &mut [u8])
类型的元组:行号(因为计算需要这个信息)和要填充的像素切片。我们使用chunks_mut
方法将图像缓冲区按行分割,使用enumerate
为每一行附上一个行号,再使用collect
将所有的行号 - 切片对收集到一个向量中(我们需要一个向量,因为Rayon
仅从数组和向量创建并行迭代器)。
接下来,我们将bands
转换为并行迭代器,并使用.for_each()
方法告诉Rayon
我们想要完成的工作。
因为我们使用了Rayon
,所以必须在main.rs
中添加这一行:
use rayon::prelude::*;
并在Cargo.toml
中添加:
[dependencies]
rayon = "1"
2
通过这些修改,这个程序现在在一个8核心的机器上可以利用大约7.75个核心。比我们之前手动划分工作时快了75%。而且代码也更简短了一些,这体现了让库来完成工作(工作分配)而不是我们自己去做的好处。
# 通道
通道是一种用于在不同线程间单向传递值的机制。换句话说,它是一个线程安全的队列。图19 - 5展示了通道的使用方式。它有点像Unix管道:一端用于发送数据,另一端用于接收数据。这两端通常由两个不同的线程持有。不过,Unix管道用于发送字节,而Rust中的通道用于发送Rust值。
sender.send(item)
将单个值放入通道;receiver.recv()
则从通道中取出一个值。值的所有权会从发送线程转移到接收线程。如果通道为空,receiver.recv()
会阻塞,直到有值被发送进来。
)
图19 - 5. 用于传递String的通道:字符串msg的所有权从线程1转移到线程2
通过通道,线程可以通过相互传递值进行通信。这是一种线程间协作的简单方式,无需使用锁或共享内存。
这并不是一种新技术。Erlang已经有独立的进程和消息传递机制长达30年之久。Unix管道也存在了近50年。我们通常认为管道提供了灵活性和可组合性,而非并发功能,但实际上,它兼具这些特性。图19 - 6展示了一个Unix管道的示例。实际上,这三个程序完全可以同时工作。
Rust通道比Unix管道更快。发送一个值时是移动该值而不是复制它,即使移动包含数兆字节数据的数据结构,这种移动操作也很快。
图19 - 6. Unix管道的执行过程
# 发送值
在接下来的几个部分中,我们将使用通道构建一个并发程序,用于创建倒排索引,这是搜索引擎的关键组成部分之一。每个搜索引擎都针对特定的文档集合进行工作。倒排索引是一个数据库,用于记录哪些单词出现在哪些文档中。
我们将展示代码中与线程和通道相关的部分。完整的程序很短,总共大约一千行代码。
我们的程序结构是一个流水线,如图19 - 7所示。流水线只是使用通道的众多方式之一,后面我们还会讨论其他几种用法,但它是将并发引入现有单线程程序的一种直接方式。
我们总共会使用五个线程,每个线程执行不同的任务。在程序的整个生命周期中,每个线程都会持续产生输出。例如,第一个线程的任务是将磁盘上的源文档逐个读取到内存中(我们让一个线程来做这件事,是因为这里我们要编写尽可能简单的代码,使用fs::read_to_string
,这是一个阻塞式API。我们不希望在磁盘工作时CPU处于空闲状态)。这个阶段的输出是每个文档对应的一个长字符串,所以这个线程通过一个用于传递字符串的通道与下一个线程相连。
图19 - 7. 索引构建流水线,箭头表示通过通道从一个线程发送到另一个线程的值(未展示磁盘I/O)
我们的程序将从启动读取文件的线程开始。假设documents
是一个Vec<PathBuf>
类型的向量,包含文件名。启动文件读取线程的代码如下:
use std::{fs, thread};
use std::sync::mpsc;
let (sender, receiver) = mpsc::channel();
let handle = thread::spawn(move || {
for filename in documents {
let text = fs::read_to_string(filename)?;
if sender.send(text).is_err() {
break;
}
}
Ok(())
});
2
3
4
5
6
7
8
9
10
11
12
13
通道是std::sync::mpsc
模块的一部分。我们稍后会解释这个名称的含义,首先来看这段代码的工作原理。我们首先创建一个通道:
let (sender, receiver) = mpsc::channel();
channel
函数返回一对值:一个发送端和一个接收端。底层的队列数据结构是标准库未公开的实现细节。
通道是有类型的。我们打算用这个通道发送每个文件的文本内容,所以我们有一个类型为Sender<String>
的发送端和一个类型为Receiver<String>
的接收端。我们本可以通过mpsc::channel::<String>()
显式指定这是一个用于传递字符串的通道,但这里我们让Rust的类型推断来确定。
let handle = thread::spawn(move || {
和之前一样,我们使用std::thread::spawn
启动一个线程。通过这个move
闭包,sender
(而非receiver
)的所有权被转移到新线程中。
接下来的几行代码只是从磁盘读取文件:
for filename in documents {
let text = fs::read_to_string(filename)?;
2
成功读取文件后,我们将文件的文本内容发送到通道中:
if sender.send(text).is_err() {
break;
}
2
3
sender.send(text)
将text
这个值移动到通道中。最终,它会再次被移动到接收该值的线程中。无论text
包含10行文本还是10兆字节的数据,这个操作只复制三个机器字(String
结构体的大小),相应的receiver.recv()
调用也只会复制三个机器字。send
和recv
方法都返回Result
,但只有在通道的另一端被丢弃时,这些方法才会失败。如果Receiver
被丢弃,send
调用会失败,因为否则这个值会永远留在通道中:没有Receiver
,任何线程都无法接收它。同样,如果通道中没有等待的值且Sender
被丢弃,recv
调用也会失败,因为否则recv
会永远等待:没有Sender
,任何线程都无法发送下一个值。丢弃通道的一端是正常的 “挂断” 方式,即当你使用完通道后关闭连接。
在我们的代码中,sender.send(text)
只有在接收端所在线程提前退出时才会失败。这在使用通道的代码中很常见。无论这种情况是有意为之还是由错误导致的,对于我们的读取线程来说,安静地关闭自身都是可以接受的。
当发生这种情况,或者线程读完所有文档后,它会返回Ok(())
:
Ok(())
});
2
注意,这个闭包返回一个Result
。如果线程遇到I/O错误,它会立即退出,并且错误会存储在线程的JoinHandle
中。
当然,和其他编程语言一样,在错误处理方面Rust也有很多其他选择。发生错误时,我们可以使用println!
打印错误信息,然后继续处理下一个文件。我们也可以通过用于传输数据的同一个通道传递错误,使其成为一个传递Result
的通道,或者专门创建第二个通道来传递错误。我们这里选择的方法既简洁又可靠:我们可以使用?
操作符,这样就没有一堆样板代码,甚至不像在Java中那样需要显式的try/catch
语句,但错误也不会被悄悄忽略。
为了方便,我们的程序将所有这些代码封装在一个函数中,该函数返回receiver
(我们还未使用它)和新线程的JoinHandle
:
fn start_file_reader_thread(documents: Vec<PathBuf>)
-> (mpsc::Receiver<String>,
thread::JoinHandle<io::Result<()>>)
{
let (sender, receiver) = mpsc::channel();
let handle = thread::spawn(move || {
...
});
(receiver, handle)
}
2
3
4
5
6
7
8
9
10
注意,这个函数启动新线程后会立即返回。我们会为流水线的每个阶段编写一个这样的函数。
# 接收值
现在我们有一个线程在运行一个发送值的循环。我们可以启动第二个线程,该线程运行一个调用receiver.recv()
的循环:
while let Ok(text) = receiver.recv() {
do_something_with(text);
}
2
3
不过,Receiver
是可迭代的,所以有一种更简洁的写法:
for text in receiver {
do_something_with(text);
}
2
3
这两个循环是等效的。不管我们采用哪种写法,如果当控制流到达循环顶部时通道恰好为空,接收线程将阻塞,直到其他线程发送一个值。当通道为空且Sender
已被丢弃时,循环将正常退出。在我们的程序中,当读取线程退出时,这种情况会自然发生。该线程运行的闭包拥有变量sender
,当闭包退出时,sender
就会被丢弃。
现在我们可以编写流水线第二阶段的代码:
fn start_file_indexing_thread(texts: mpsc::Receiver<String>)
-> (mpsc::Receiver<InMemoryIndex>, thread::JoinHandle<()>)
{
let (sender, receiver) = mpsc::channel();
let handle = thread::spawn(move || {
for (doc_id, text) in texts.into_iter().enumerate() {
let index =
InMemoryIndex::from_single_document(doc_id, text);
if sender.send(index).is_err() {
break;
}
}
});
(receiver, handle)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
这个函数启动一个线程,该线程从一个通道(texts
)接收String
值,并将InMemoryIndex
值发送到另一个通道(sender/receiver
)。这个线程的任务是将第一阶段加载的每个文件转换为一个小型的、内存中的单文件倒排索引。
该线程的主循环很简单。对文档进行索引的所有工作都由InMemoryIndex::from_single_document
函数完成。我们这里不展示它的源代码,但它会在单词边界处分割输入字符串,然后生成一个从单词到位置列表的映射。
这个阶段不执行I/O操作,所以它无需处理io::Error
。它返回的不是io::Result<()>
,而是()
。
# 运行流水线
剩下的三个阶段在设计上类似。每个阶段都使用前一个阶段创建的Receiver
。我们流水线其余部分的目标是将所有小索引合并到磁盘上的一个大型索引文件中。我们发现实现这一目标最快的方法需要三个阶段。我们这里不展示代码,只给出这三个函数的类型签名。完整的源代码可以在网上找到 。
首先,我们在内存中合并索引,直到它们变得难以处理(阶段3):
fn start_in_memory_merge_thread(file_indexes:
mpsc::Receiver<InMemoryIndex>)
-> (mpsc::Receiver<InMemoryIndex>, thread::JoinHandle<()>)
2
3
然后,我们将这些大型索引写入磁盘(阶段4):
fn start_index_writer_thread(big_indexes: mpsc::Receiver<InMemoryIndex>,
output_dir: &Path)
-> (mpsc::Receiver<PathBuf>,
thread::JoinHandle<io::Result<()>>)
2
3
4
最后,如果我们有多个大型文件,我们使用基于文件的合并算法将它们合并(阶段5):
fn merge_index_files(files: mpsc::Receiver<PathBuf>, output_dir: &Path)
-> io::Result<()>
2
最后这个阶段不返回Receiver
,因为它是流水线的终点。它在磁盘上生成一个单一的输出文件。它也不返回JoinHandle
,因为我们没有为这个阶段启动线程,工作在调用者线程上完成。
现在来看启动线程并检查错误的代码:
fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)
-> io::Result<()>
{
// 启动流水线的所有五个阶段。
let (texts, h1) = start_file_reader_thread(documents);
let (pints, h2) = start_file_indexing_thread(texts);
let (gallons, h3) = start_in_memory_merge_thread(pints);
let (files, h4) = start_index_writer_thread(gallons, &output_dir);
let result = merge_index_files(files, &output_dir);
// 等待线程完成,捕获它们遇到的任何错误。
let r1 = h1.join().unwrap();
h2.join().unwrap();
h3.join().unwrap();
let r4 = h4.join().unwrap();
// 如果有错误,返回遇到的第一个错误。
// (碰巧的是,h2和h3不会失败:这些线程只是进行纯内存数据处理。)
r1?;
r4?;
result
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
和之前一样,我们使用.join().unwrap()
将子线程中的恐慌(panic)显式地传播到主线程。
这里唯一不同寻常的地方是,我们没有立即使用?
,而是先保留io::Result
值,直到我们等待所有四个线程都完成。
这个流水线比等效的单线程程序快40%。对于一下午的工作成果来说,这还不错,但与曼德布洛特程序675%的提速相比,就显得微不足道了。显然,我们既没有充分利用系统的I/O能力,也没有让所有CPU核心都饱和运行。这是怎么回事呢?
流水线就像制造工厂中的装配线:性能受限于最慢阶段的吞吐量。一条全新的、未经优化的装配线可能和单件生产一样慢,但装配线在经过有针对性的优化后会有更好的表现。在我们的例子中,测量结果表明第二阶段是瓶颈。我们的索引线程使用了.to_lowercase()
和.is_alphanumeric()
,所以它花费了大量时间在Unicode表中查找。索引阶段下游的其他阶段大部分时间都在Receiver::recv
中等待输入。
这意味着我们应该可以让程序运行得更快。随着我们解决这些瓶颈问题,并行度将会提高。既然你已经知道如何使用通道,并且我们的程序由相互独立的代码片段组成,那么就很容易想到解决第一个瓶颈的方法。我们可以像优化其他代码一样,手动优化第二阶段的代码;将工作分解为两个或更多阶段;或者同时运行多个文件索引线程。
# 通道特性和性能
std::sync::mpsc
中的mpsc
代表 “多生产者,单消费者”(multiproducer, single-consumer),这简洁地描述了Rust通道所提供的通信类型。
我们示例程序中的通道将值从单个发送者传递到单个接收者。这是一种相当常见的情况。但Rust通道也支持多个发送者,例如,当你需要一个线程处理来自多个客户端线程的请求时,就可以使用这种特性,如图19 - 8所示。
图19 - 8. 一个通道接收来自多个发送者的请求
Sender<T>
实现了Clone
特性。要创建一个有多个发送者的通道,只需创建一个普通通道,然后根据需要克隆发送者。你可以将每个Sender
值移动到不同的线程中。
Receiver<T>
不能被克隆,所以如果你需要多个线程从同一个通道接收值,就需要使用Mutex
。我们将在本章后面展示如何实现。
Rust通道经过了精心优化。当首次创建通道时,Rust使用一种特殊的 “一次性” 队列实现。如果你只通过通道发送一个对象,开销极小。如果你发送第二个值,Rust会切换到另一种队列实现。实际上,这是为了长期使用做准备,让通道在传输大量值的同时尽量减少分配开销。如果你克隆了Sender
,Rust必须使用另一种实现,这种实现能确保在多个线程同时发送值时的安全性。
但即使是这三种实现中最慢的一种,也是无锁队列,所以发送或接收一个值最多只涉及几个原子操作和一次堆分配,再加上值的移动操作本身。只有当队列空了,接收线程需要进入睡眠状态时,才需要进行系统调用。当然,在这种情况下,你的通道流量无论如何都没有达到最大值。
尽管进行了这些优化,但在通道性能方面,应用程序很容易犯一个错误:发送值的速度比接收和处理值的速度快。这会导致通道中积压的值越来越多。例如,在我们的程序中,我们发现文件读取线程(阶段1)加载文件的速度比文件索引线程(阶段2)对文件进行索引的速度快得多。结果是,数百兆字节的原始数据会一次性从磁盘读取并塞进队列中。
这种不当行为会消耗内存并影响局部性。更糟糕的是,发送线程会持续运行,在接收端最需要资源的时候,却消耗CPU和其他系统资源来发送更多的值。
在这方面,Rust借鉴了Unix管道的做法。Unix使用一种巧妙的技巧来提供某种背压(backpressure),迫使快速的发送者慢下来:Unix系统上的每个管道都有一个固定的大小,如果一个进程试图写入一个已满的管道,系统会阻塞该进程,直到管道有空间为止。Rust中与之对应的是同步通道:
use std::sync::mpsc;
let (sender, receiver) = mpsc::sync_channel(1000);
2
3
同步通道与普通通道完全一样,只是在创建时,你需要指定它能容纳多少个值。对于同步通道,sender.send(value)
可能是一个阻塞操作。毕竟,阻塞并不总是坏事。在我们的示例程序中,将start_file_reader_thread
中的通道改为能容纳32个值的同步通道后,在我们的基准数据集上,内存使用量减少了三分之二,而吞吐量并没有下降。
# 线程安全性:Send 和 Sync
到目前为止,我们的操作就好像所有的值都可以在不同线程之间自由移动和共享。在很大程度上确实如此,但Rust完整的线程安全机制依赖于两个内置特性:std::marker::Send
和std::marker::Sync
。
- 实现了
Send
的类型可以安全地按值传递给另一个线程,它们可以在不同线程之间移动。 - 实现了
Sync
的类型可以安全地通过不可变引用传递给另一个线程,它们可以在不同线程之间共享。
这里所说的 “安全”,和我们一直以来的意思一样:没有数据竞争和其他未定义行为。
例如,在process_files_in_parallel
示例中,我们使用闭包将Vec<String>
从父线程传递到每个子线程。当时我们没有指出这一点,但这意味着向量及其包含的字符串是在父线程中分配的,却在子线程中释放。Vec<String>
实现了Send
,这是一个API保证,表明这种操作是没问题的:Vec
和String
内部使用的分配器是线程安全的。
(如果你要编写自己的Vec
和String
类型,使用快速但非线程安全的分配器,那么你必须使用诸如不安全指针这样的非Send
类型来实现它们。Rust会推断出你的NonThreadSafeVec
和NonThreadSafeString
类型不是Send
,并将它们限制在单线程环境中使用。但这种情况很少见。)
如图19 - 9所示,大多数类型既是Send
又是Sync
。你甚至无需使用#[derive]
为程序中的结构体和枚举获得这些特性,Rust会自动为你处理。如果结构体或枚举的字段是Send
,那么该结构体或枚举就是Send
;如果字段是Sync
,那么它就是Sync
。
有些类型是Send
但不是Sync
。这通常是有意为之,比如mpsc::Receiver
,它保证了mpsc
通道的接收端同一时间只被一个线程使用。
少数既不是Send
也不是Sync
的类型,大多是以一种线程不安全的方式使用了可变性的类型。例如,考虑std::rc::Rc<T>
,即引用计数智能指针类型。
图19 - 9. Send和Sync类型
如果Rc<String>
是Sync
类型,允许线程通过共享引用共享单个Rc
会发生什么呢?如图19 - 10所示,如果两个线程碰巧同时尝试克隆Rc
,就会出现数据竞争,因为两个线程都会增加共享的引用计数。引用计数可能会变得不准确,导致之后出现释放后使用(use - after - free)或双重释放(double free)的情况,这属于未定义行为。
图19 - 10. 为什么Rc<String>既不是Sync也不是Send
当然,Rust会阻止这种情况发生。下面是设置这个数据竞争的代码:
use std::thread;
use std::rc::Rc;
fn main() {
let rc1 = Rc::new("ouch".to_string());
let rc2 = rc1.clone();
thread::spawn(move || { // 错误
rc2.clone();
});
rc1.clone();
}
2
3
4
5
6
7
8
9
10
11
Rust会拒绝编译这段代码,并给出详细的错误信息:
error[E0277]: `Rc<String>` cannot be sent between threads safely
--> concurrency_send_rc.rs:10:5
|
^^^^^ `Rc<String>` cannot be sent between
10 | thread::spawn(move || { // error
threads safely
|
= help: the trait `std::marker::Send` is not implemented for
`Rc<String>`
= note: required because it appears within the type
`[closure@... ]`
= note: required by `std::thread::spawn`
2
3
4
5
6
7
8
9
10
11
12
现在你可以看到Send
和Sync
是如何帮助Rust强制实现线程安全的。它们作为函数类型签名中的限定条件,出现在跨线程边界传输数据的函数中。当你启动一个线程时,传递的闭包必须是Send
,这意味着它包含的所有值都必须是Send
。同样,如果你想通过通道将值发送到另一个线程,这些值也必须是Send
。
# 几乎可以将任何迭代器连接到通道
我们的倒排索引构建程序是按照流水线的方式构建的。代码足够清晰,但我们需要手动设置通道并启动线程。相比之下,我们在第15章构建的迭代器流水线只用了几行代码就完成了大量工作。我们能否为线程流水线构建类似的东西呢?
实际上,如果我们能将迭代器流水线和线程流水线统一起来就好了。这样我们的索引构建程序就可以写成迭代器流水线的形式。它可能会像这样开始:
documents.into_iter()
.map(read_whole_file)
.errors_to(error_sender) // 过滤掉错误结果
.off_thread() // 为上述操作启动一个线程
.map(make_single_file_index)
.off_thread() // 为第二阶段再启动一个线程
...
2
3
4
5
6
7
特性允许我们为标准库类型添加方法,所以实际上我们可以做到这一点。我们首先编写一个声明所需方法的特性:
use std::sync::mpsc;
pub trait OffThreadExt: Iterator {
/// 将这个迭代器转换为一个跨线程迭代器:`next()`调用在一个单独的工作线程上进行,这样迭代器和循环体就可以并发运行。
fn off_thread(self) -> mpsc::IntoIter<Self::Item>;
}
2
3
4
5
6
然后我们为迭代器类型实现这个特性。mpsc::Receiver
已经是可迭代的,这对我们很有帮助:
use std::thread;
impl<T> OffThreadExt for T
where
T: Iterator + Send + 'static,
T::Item: Send + 'static
{
fn off_thread(self) -> mpsc::IntoIter<Self::Item> {
// 创建一个通道,用于从工作线程传输项。
let (sender, receiver) = mpsc::sync_channel(1024);
// 将这个迭代器移动到一个新的工作线程并在那里运行。
thread::spawn(move || {
for item in self {
if sender.send(item).is_err() {
break;
}
}
});
// 返回一个从通道中提取值的迭代器。
receiver.into_iter()
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
这段代码中的where
子句是通过类似于 “逆向推导限定条件” 中描述的过程确定的。一开始,我们只写了这样的代码:impl<T> OffThreadExt for T
,也就是说,我们希望这个实现适用于所有迭代器。但Rust并不认可。因为我们使用spawn
将类型为T
的迭代器移动到一个新线程,所以必须指定T: Iterator + Send + 'static
。又因为我们要通过通道将项发送回来,所以必须指定T::Item: Send + 'static
。有了这些修改,Rust就满意了。
这就是Rust的特点:我们可以自由地为语言中的几乎每个迭代器添加一个并发功能工具,但前提是必须先理解并记录那些确保其安全使用的限制条件。
# 流水线之外的应用
在本节中,我们以流水线为例,因为流水线是使用通道的一种直观且不错的方式。大家都能理解流水线,它们具体、实用且具有确定性。不过,通道的用途不止于流水线,它还是一种快速、便捷的方式,能为同一进程中的其他线程提供异步服务。
例如,假设你想在单独的线程上进行日志记录,如图19 - 8所示。其他线程可以通过通道将日志消息发送给日志记录线程;由于通道的Sender
可以克隆,许多客户端线程都可以拥有发送者,将日志消息发送到同一个日志记录线程。
在单独的线程上运行像日志记录这样的服务有诸多优点。日志记录线程可以在需要时轮换日志文件,无需与其他线程进行复杂的协调,其他线程也不会被阻塞。在日志记录线程恢复工作之前,消息会在通道中无害地积累一段时间。
通道还可用于一个线程向另一个线程发送请求并需要获取某种响应的场景。第一个线程的请求可以是一个结构体或元组,其中包含一个Sender
,就像是一个写有自己地址的信封,第二个线程用它来发送回复。这并不意味着这种交互必须是同步的。第一个线程可以决定是阻塞等待响应,还是使用.try_recv()
方法轮询响应。
到目前为止,我们介绍的工具 —— 用于高度并行计算的分治并行,以及用于松散连接组件的通道,足以满足广泛的应用场景。但我们的介绍还未结束。
# 共享可变状态
在你发布了第8章中的fern_sim
库后的几个月里,你的蕨类植物模拟软件大受欢迎。现在,你正在开发一款多人实时战略游戏,八名玩家在模拟的侏罗纪景观中竞相培育高度逼真的史前蕨类植物。这个游戏的服务器是一个大规模并行应用程序,许多线程会不断收到请求。那么,这些线程如何协调,以便在有八名玩家准备好时立即开始游戏呢?
这里要解决的问题是,许多线程需要访问一个共享的等待加入游戏的玩家列表。这些数据必然是可变的,并且需要在所有线程之间共享。如果Rust不支持共享可变状态,那我们该怎么办呢?
你可以通过创建一个新线程来解决这个问题,这个线程的唯一任务就是管理这个列表。其他线程通过通道与它进行通信。当然,这会占用一个线程,会带来一些操作系统开销。
另一种选择是使用Rust提供的用于安全共享可变数据的工具。这些工具确实存在,它们是底层原语,任何使用过线程的系统程序员都很熟悉。在本节中,我们将介绍互斥锁(mutex)、读写锁、条件变量和原子整数。最后,我们将展示如何在Rust中实现全局可变变量。
# 什么是互斥锁?
互斥锁(或锁)用于强制多个线程在访问特定数据时依次进行。我们将在下一节介绍Rust中的互斥锁。首先,回顾一下其他语言中的互斥锁是很有意义的。在C++中,互斥锁的一个简单用法可能如下:
// C++代码,并非Rust代码
void FernEmpireApp::JoinWaitingList(PlayerId player) {
mutex.Acquire();
waitingList.push_back(player);
// 如果等待的玩家数量足够,就开始一场游戏
if (waitingList.size() >= GAME_SIZE) {
vector<PlayerId> players;
waitingList.swap(players);
StartGame(players);
}
mutex.Release();
}
2
3
4
5
6
7
8
9
10
11
12
在这段代码中,mutex.Acquire()
和mutex.Release()
调用标记了临界区的开始和结束。对于程序中的每个互斥锁,同一时间只有一个线程可以在临界区内运行。如果一个线程处于临界区,所有其他调用mutex.Acquire()
的线程将被阻塞,直到第一个线程执行到mutex.Release()
。
我们说互斥锁保护数据:在这个例子中,互斥锁保护waitingList
。不过,确保每个线程在访问数据之前都获取互斥锁,并在之后释放它,是程序员的责任。
互斥锁很有用,原因如下:
- 它们可以防止数据竞争,即多个线程同时读写相同内存的情况。在C++和Go中,数据竞争属于未定义行为。像Java和C#这样的托管语言承诺不会崩溃,但数据竞争的结果仍然(概括来说)毫无意义。
- 即使不存在数据竞争,即使所有的读写操作都按程序顺序依次进行,如果没有互斥锁,不同线程的操作也可能以任意方式交错。想象一下,编写一段代码,即使在其他线程修改其数据时仍能正常工作,再想象一下调试这样的代码。这就好像你的程序被 “幽灵” 困扰了一样。
- 互斥锁支持基于不变量的编程,不变量是关于受保护数据的规则,在你设置数据时这些规则就成立,并且每个临界区都要维护这些规则。
当然,这些原因本质上是一样的:不受控制的竞态条件会让编程变得难以处理。互斥锁为混乱的情况带来了一定的秩序(尽管不如通道或分治并行带来的秩序多)。
然而,在大多数语言中,互斥锁很容易用错。在C++以及大多数语言中,数据和锁是分开的对象。理想情况下,注释会说明每个线程在访问数据之前必须获取互斥锁:
class FernEmpireApp {
...
private:
// 等待加入游戏的玩家列表。由`mutex`保护。
vector<PlayerId> waitingList;
// 在读取或写入`waitingList`之前必须获取的锁。
Mutex mutex;
...
};
2
3
4
5
6
7
8
9
但即使有这样详细的注释,编译器也无法强制进行安全访问。当一段代码忽略获取互斥锁时,就会出现未定义行为。在实际情况中,这意味着会出现极难重现和修复的错误。
即使在Java中,对象和互斥锁之间存在某种概念上的关联,但这种关联并不紧密。编译器不会尝试强制执行,实际上,由锁保护的数据很少恰好是相关对象的字段,它通常包含多个对象中的数据。锁定方案仍然很复杂,注释仍然是强制执行的主要手段。
# Mutex<T>
现在我们将展示在Rust中等待列表的实现。在我们的《蕨类帝国》游戏服务器中,每个玩家都有一个唯一的ID:
type PlayerId = u32;
等待列表只是一个玩家集合:
const GAME_SIZE: usize = 8;
/// 等待列表中的玩家数量永远不会超过GAME_SIZE。
type WaitingList = Vec<PlayerId>;
2
3
等待列表作为FernEmpireApp
的一个字段存储,FernEmpireApp
是一个单例,在服务器启动时被设置在一个Arc
中。每个线程都有一个指向它的Arc
。它包含了我们程序所需的所有共享配置和其他杂项。其中大部分是只读的。由于等待列表既是共享的又是可变的,所以必须用Mutex
来保护:
use std::sync::Mutex;
/// 所有线程都可以共享访问这个大的上下文结构体。
struct FernEmpireApp {
...
waiting_list: Mutex<WaitingList>,
...
}
2
3
4
5
6
7
8
与C++不同,在Rust中,受保护的数据存储在Mutex
内部。设置Mutex
的代码如下:
use std::sync::Arc;
let app = Arc::new(FernEmpireApp {
...
waiting_list: Mutex::new(vec![]),
});
...
2
3
4
5
6
7
创建一个新的Mutex
看起来就像创建一个新的Box
或Arc
,但Box
和Arc
表示堆分配,而Mutex
仅用于锁定。如果你希望Mutex
在堆上分配,就必须明确指定,就像我们在这里所做的,对整个app
使用Arc::new
,对受保护的数据使用Mutex::new
。这些类型通常一起使用:Arc
便于在不同线程之间共享数据,Mutex
则便于处理在不同线程之间共享的可变数据 。
现在我们可以实现使用互斥锁的join_waiting_list
方法:
impl FernEmpireApp {
/// 将一个玩家添加到下一场游戏的等待列表中。
/// 如果等待的玩家数量足够,则立即开始一场新游戏。
fn join_waiting_list(&self, player: PlayerId) {
// 锁定互斥锁并获取对内部数据的访问权。
// `guard`的作用域就是一个临界区。
let mut guard = self.waiting_list.lock().unwrap();
// 现在进行游戏逻辑处理。
guard.push(player);
if guard.len() == GAME_SIZE {
let players = guard.split_off(0);
self.start_game(players);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
访问数据的唯一方法是调用.lock()
方法:
let mut guard = self.waiting_list.lock().unwrap();
self.waiting_list.lock()
会阻塞,直到获取到互斥锁。这个方法调用返回的MutexGuard<WaitingList>
值是对&mut WaitingList
的一个简单包装。多亏了之前讨论过的解引用强制转换,我们可以直接在guard
上调用WaitingList
的方法:
guard.push(player);
guard
甚至允许我们借用对底层数据的直接引用。Rust的生命周期系统确保这些引用不会比guard
本身的生命周期更长。不持有锁就无法访问Mutex
中的数据。
当guard
被丢弃时,锁就会被释放。通常情况下,这发生在代码块的末尾,但你也可以手动丢弃它:
if guard.len() == GAME_SIZE {
let players = guard.split_off(0);
drop(guard); // 在开始游戏时不要一直锁定列表
self.start_game(players);
}
2
3
4
5
# mut和Mutex
我们的join_waiting_list
方法没有以可变引用的方式获取self
,这可能看起来很奇怪,至少我们一开始是这么觉得的。它的类型签名是:
fn join_waiting_list(&self, player: PlayerId)
底层的集合Vec<PlayerId>
,在调用其push
方法时确实需要可变引用。它的类型签名是:
pub fn push(&mut self, item: T)
然而,这段代码编译和运行都没有问题。这是怎么回事呢?
在Rust中,&mut
意味着独占访问,而普通的&
意味着共享访问。
我们习惯了类型从父对象到子对象、从容器到内容传递&mut
访问权限。通常只有在一开始就拥有对starships
的&mut
引用时,才期望能够调用starships[id].engine
的&mut self
方法(或者你拥有starships
,在这种情况下,恭喜你成为埃隆·马斯克)。这是默认情况,因为如果你没有对父对象的独占访问权,Rust通常无法确保你对其子对象有独占访问权。
但是Mutex
有办法做到这一点:通过锁。实际上,互斥锁的主要作用就是实现这一点,即提供对内部数据的独占(可变)访问,即使许多线程可能对Mutex
本身有共享(不可变)访问权。
Rust的类型系统告诉了我们Mutex
的作用。它动态地强制实现独占访问,而这通常是由Rust编译器在编译时静态完成的。
(你可能还记得std::cell::RefCell
也有类似功能,只是它不支持多线程。Mutex
和RefCell
都是内部可变性的不同形式,我们之前介绍过。)
# 为什么互斥锁并不总是好的选择
在开始介绍互斥锁之前,我们介绍了一些并发编程方法,如果你从C++转过来,可能会觉得这些方法出奇地容易正确使用。这并非巧合:这些方法旨在对并发编程中最令人困惑的方面提供强有力的保障。仅使用分治并行的程序是确定性的,不会发生死锁。使用通道的程序也表现得很好。像我们的索引构建程序那样仅将通道用于流水线操作的程序是确定性的:消息传递的时间可能会有所不同,但不会影响输出。诸如此类。多线程程序有这些保障是很好的!
Rust的Mutex
设计几乎可以肯定会让你比以往更系统、更合理地使用互斥锁。但值得停下来思考一下,Rust的安全保障能解决什么问题,不能解决什么问题。
安全的Rust代码不会触发数据竞争,数据竞争是一种特定类型的错误,即多个线程同时读写相同内存,产生无意义的结果。这很棒:数据竞争总是错误,而且在实际的多线程程序中并不罕见。
然而,使用互斥锁的线程会受到一些Rust无法为你解决的其他问题的影响:
- 有效的Rust程序不会出现数据竞争,但仍然可能存在其他竞态条件,即程序的行为依赖于线程之间的时间顺序,因此每次运行的结果可能会有所不同。有些竞态条件是良性的,但有些会表现为程序的不稳定和极难修复的错误。以无组织的方式使用互斥锁会引发竞态条件。你需要确保它们是良性的。
- 共享可变状态也会影响程序设计。通道在代码中充当抽象边界,便于将独立的组件分离出来进行测试,而互斥锁会鼓励一种 “只是添加一个方法” 的工作方式,这可能会导致代码变成一个相互关联的整体。
- 最后,互斥锁并不像乍看起来那么简单,接下来的两节将说明这一点。
所有这些问题都是这些工具固有的。尽可能使用更结构化的方法;必要时再使用Mutex
。
# 死锁
线程如果尝试获取它已经持有的锁,就可能会导致自身死锁:
let mut guard1 = self.waiting_list.lock().unwrap();
let mut guard2 = self.waiting_list.lock().unwrap(); // 死锁
2
假设对self.waiting_list.lock()
的第一次调用成功获取了锁。第二次调用发现锁已被持有,因此它会阻塞,等待锁被释放。但它会永远等待下去,因为等待的线程正是持有锁的那个线程。
换句话说,Mutex
中的锁不是递归锁。
这里的错误很明显。在实际程序中,这两个lock()
调用可能在两个不同的方法中,其中一个方法调用了另一个。单独看每个方法的代码,可能看起来都没问题。还有其他导致死锁的情况,涉及多个线程同时获取多个互斥锁。Rust的借用系统无法保护你避免死锁。最好的预防方法是将临界区保持得尽可能小:进入临界区,完成工作,然后离开。
使用通道也可能会出现死锁。例如,两个线程可能会阻塞,每个线程都在等待从另一个线程接收消息。然而,同样地,良好的程序设计可以让你有足够的信心,在实际中这种情况不会发生。在像我们的倒排索引构建程序这样的流水线中,数据流是无环的。在这样的程序中,死锁就像在Unix shell流水线中一样不太可能发生。
# 中毒的互斥锁
Mutex::lock()
和JoinHandle::join()
返回Result
的原因是一样的:如果另一个线程发生了恐慌(panic),能够优雅地处理失败情况。当我们写handle.join().unwrap()
时,我们是在告诉Rust将恐慌从一个线程传播到另一个线程。mutex.lock().unwrap()
这个习惯用法也是类似的。
如果一个线程在持有Mutex
时发生恐慌,Rust会将该Mutex
标记为中毒状态。后续任何尝试锁定这个中毒的Mutex
的操作都会得到一个错误结果。我们的.unwrap()
调用告诉Rust,如果发生这种情况就触发恐慌,将另一个线程的恐慌传播到这个线程。
一个中毒的互斥锁有多严重呢?“中毒” 听起来很严重,但这种情况不一定是致命的。正如我们在第7章所说,恐慌是安全的。一个发生恐慌的线程会让程序的其他部分处于安全状态。
那么,互斥锁在发生恐慌时被标记为中毒,并不是因为担心未定义行为。更确切地说,是因为我们可能一直在基于不变量进行编程。由于程序发生恐慌并在未完成临界区操作的情况下退出,可能更新了受保护数据的某些字段但没有更新其他字段,所以不变量现在可能已经被破坏了。Rust将互斥锁标记为中毒,是为了防止其他线程在不知情的情况下陷入这种被破坏的状态,从而使情况变得更糟。你仍然可以锁定一个中毒的互斥锁并访问其中的数据,并且完全强制实施互斥访问;可以查看PoisonError::into_inner()
的文档了解相关内容。但你不会意外地这么做。
# 使用互斥锁实现多消费者通道
我们之前提到过,Rust的通道是多生产者、单消费者的。或者更具体地说,一个通道只有一个Receiver
。我们不能让一个线程池中的许多线程将单个mpsc
通道用作共享的工作列表。
然而,事实证明,仅使用标准库中的组件就有一个非常简单的解决方法。我们可以在Receiver
周围添加一个Mutex
,然后仍然共享它。下面是一个实现此功能的模块:
pub mod shared_channel {
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Sender, Receiver};
/// 对`Receiver`的线程安全包装。
#[derive(Clone)]
pub struct SharedReceiver<T>(Arc<Mutex<Receiver<T>>>);
impl<T> Iterator for SharedReceiver<T> {
type Item = T;
/// 从被包装的接收器获取下一个项。
fn next(&mut self) -> Option<T> {
let guard = self.0.lock().unwrap();
guard.recv().ok()
}
}
/// 创建一个新的通道,其接收器可以在多个线程间共享。
/// 它返回一个发送者和一个接收器,就像标准库中的`channel()`函数一样,并且有时可以直接替代它。
pub fn shared_channel<T>() -> (Sender<T>, SharedReceiver<T>) {
let (sender, receiver) = channel();
(sender, SharedReceiver(Arc::new(Mutex::new(receiver))))
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
我们使用了Arc<Mutex<Receiver<T>>>
。泛型在这里堆积得很多。这种情况在Rust中比在C++中更常见。这可能看起来会让人困惑,但通常,就像在这个例子中一样,仅从名称上就能帮助解释发生了什么,如图19 - 11所示。
图19 - 11. 如何解读复杂类型
# 读写锁(RwLock<T>)
现在,让我们从互斥锁转到std::sync
(Rust标准库的线程同步工具包)中提供的其他工具。我们将简要介绍,因为对这些工具的完整讨论超出了本书的范围。
服务器程序通常有配置信息,这些信息只加载一次,并且很少更改。大多数线程只是查询配置,但由于配置是可以更改的(例如,可能要求服务器从磁盘重新加载配置),所以无论如何都必须用锁来保护它。在这种情况下,互斥锁可以工作,但它会成为不必要的瓶颈。如果配置没有变化,线程不应该轮流查询配置。这种情况就适合使用读写锁(RwLock)。
互斥锁只有一个lock
方法,而读写锁有两个锁定方法:read
和write
。RwLock::write
方法类似于Mutex::lock
,它等待对受保护数据的独占、可变访问。RwLock::read
方法提供不可变访问,其优点是不太可能需要等待,因为许多线程可以同时安全地读取。使用互斥锁时,在任何给定时刻,受保护的数据只有一个读取者或一个写入者(或者没有)。而使用读写锁时,它可以有一个写入者或多个读取者,这与Rust的引用机制大致相似。
FernEmpireApp
可能有一个用于配置的结构体,由RwLock
保护:
use std::sync::RwLock;
struct FernEmpireApp {
...
config: RwLock<AppConfig>,
...
}
2
3
4
5
6
7
读取配置的方法会使用RwLock::read()
:
/// 如果应该使用实验性的真菌代码,则返回true。
fn mushrooms_enabled(&self) -> bool {
let config_guard = self.config.read().unwrap();
config_guard.mushrooms_enabled
}
2
3
4
5
重新加载配置的方法会使用RwLock::write()
:
fn reload_config(&self) -> io::Result<()> {
let new_config = AppConfig::load()?;
let mut config_guard = self.config.write().unwrap();
*config_guard = new_config;
Ok(())
}
2
3
4
5
6
当然,Rust非常适合强制执行关于RwLock
数据的安全规则。单写入者或多读取者的概念是Rust借用系统的核心。
self.config.read()
返回一个guard
,提供对AppConfig
的不可变(共享)访问;self.config.write()
返回另一种类型的guard
,提供可变(独占)访问。
# 条件变量(Condvar)
通常,一个线程需要等待某个条件变为真:
- 在服务器关闭期间,主线程可能需要等待所有其他线程完成退出。
- 当工作线程无事可做时,它需要等待直到有数据需要处理。
- 实现分布式共识协议的线程可能需要等待直到达到法定数量的对等节点做出响应。
有时,对于我们想要等待的特定条件,有方便的阻塞API,比如在服务器关闭的例子中使用JoinHandle::join
。在其他情况下,没有内置的阻塞API。程序可以使用条件变量来构建自己的等待机制。在Rust中,std::sync::Condvar
类型实现了条件变量。Condvar
有.wait()
和.notify_all()
方法;.wait()
会阻塞,直到其他某个线程调用.notify_all()
。
实际情况比这稍微复杂一些,因为条件变量总是与由某个特定Mutex
保护的某些数据的某个真假条件相关。因此,这个Mutex
和Condvar
是相关的。这里没有足够的篇幅进行完整的解释,但为了帮助之前使用过条件变量的程序员,我们将展示两个关键代码片段。
当期望的条件变为真时,我们调用Condvar::notify_all
(或notify_one
)来唤醒任何等待的线程:
self.has_data_condvar.notify_all();
为了进入睡眠状态并等待某个条件变为真,我们使用Condvar::wait()
:
while!guard.has_data() {
guard = self.has_data_condvar.wait(guard).unwrap();
}
2
3
这个while
循环是使用条件变量的标准习惯用法。然而,Condvar::wait
的签名不太寻常。它按值接受一个MutexGuard
对象,消耗它,并在成功时返回一个新的MutexGuard
。这体现了这样一种逻辑:wait
方法先释放互斥锁,然后在返回之前重新获取它。按值传递MutexGuard
就像是在说:“我授予你,.wait()
方法,释放这个互斥锁的独占权限。”
# 原子类型
std::sync::atomic
模块包含用于无锁并发编程的原子类型。这些类型基本上与标准C++原子类型相同,还增加了一些额外的类型:
AtomicIsize
和AtomicUsize
是与单线程的isize
和usize
类型相对应的共享整数类型。AtomicI8
、AtomicI16
、AtomicI32
、AtomicI64
以及它们的无符号变体(如AtomicU8
)是与单线程的i8
、i16
等类型相对应的共享整数类型。AtomicBool
是一个共享的布尔值。AtomicPtr<T>
是不安全指针类型*mut T
的共享值。
原子数据的正确使用超出了本书的范围。可以说,多个线程可以同时读取和写入一个原子值而不会导致数据竞争。
原子类型没有使用通常的算术和逻辑运算符,而是提供了执行原子操作的方法,包括单个加载、存储、交换以及即使在其他线程也对相同内存位置执行原子操作时,仍能安全作为一个单元执行的算术操作。对一个名为atom
的AtomicIsize
进行递增操作如下:
use std::sync::atomic::{AtomicIsize, Ordering};
let atom = AtomicIsize::new(0);
atom.fetch_add(1, Ordering::SeqCst);
2
3
4
这些方法可能会编译成特定的机器语言指令。在x86 - 64架构上,这个.fetch_add()
调用会编译成lock incq
指令,而普通的n += 1
可能会编译成简单的incq
指令或其各种变体。Rust编译器还必须放弃围绕原子操作的一些优化,因为与普通的加载或存储不同,原子操作会立即受到其他线程的影响,或者立即影响其他线程。
参数Ordering::SeqCst
是一种内存排序。内存排序有点像数据库中的事务隔离级别。它们告诉系统你对因果关系和时间无环等概念的关注程度,与性能相对。内存排序对程序正确性至关重要,但理解和推理起来很棘手。幸运的是,选择顺序一致性(最严格的内存排序)带来的性能损失通常相当低,这与将SQL数据库设置为SERIALIZABLE
模式时的性能损失不同。所以如果不确定,就使用Ordering::SeqCst
。Rust从标准C++原子类型继承了其他几种内存排序方式,对存在的本质和因果关系有各种较弱的保证。我们这里不讨论它们。
原子类型的一个简单用途是用于取消操作。假设我们有一个线程正在进行一些长时间运行的计算,比如渲染视频,并且我们希望能够异步取消它。问题在于如何通知这个线程我们希望它关闭。我们可以通过一个共享的AtomicBool
来实现:
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
let cancel_flag = Arc::new(AtomicBool::new(false));
let worker_cancel_flag = cancel_flag.clone();
2
3
4
5
这段代码创建了两个指向同一个在堆上分配的AtomicBool
的Arc<AtomicBool>
智能指针,其初始值为false
。第一个名为cancel_flag
,将留在主线程中。第二个worker_cancel_flag
将被移动到工作线程中。
下面是工作线程的代码:
use std::thread;
use std::sync::atomic::Ordering;
let worker_handle = thread::spawn(move || {
for pixel in animation.pixels_mut() {
render(pixel); // 光线追踪 - 这需要几微秒
if worker_cancel_flag.load(Ordering::SeqCst) {
return None;
}
}
Some(animation)
});
2
3
4
5
6
7
8
9
10
11
12
在渲染每个像素之后,线程通过调用.load()
方法检查标志的值:
worker_cancel_flag.load(Ordering::SeqCst)
如果在主线程中我们决定取消工作线程,我们在AtomicBool
中存储true
,然后等待线程退出:
// 取消渲染。
cancel_flag.store(true, Ordering::SeqCst);
// 丢弃结果,结果可能是`None`。
worker_handle.join().unwrap();
2
3
4
当然,还有其他实现方式。这里的AtomicBool
可以用Mutex<bool>
或通道来替代。主要的区别在于原子类型的开销最小。原子操作从不使用系统调用。加载或存储操作通常编译成单个CPU指令。
原子类型是一种内部可变性的形式,类似于Mutex
或RwLock
,所以它们的方法通过共享(不可变)引用获取self
。这使得它们可以用作简单的全局变量。
# 全局变量
假设我们正在编写网络代码。我们希望有一个全局变量,一个每次处理数据包时都会递增的计数器:
/// 服务器成功处理的数据包数量。
static PACKETS_SERVED: usize = 0;
2
这段代码编译没问题,但有一个问题。PACKETS_SERVED
是不可变的,所以我们永远无法更改它。
Rust会尽可能地不鼓励使用全局可变状态。用 const
声明的常量当然是不可变的。静态变量默认也是不可变的,所以无法获取对其的可变引用。静态变量可以声明为 mut
,但这样访问它是不安全的。
Rust对线程安全性的坚持是这些规则的主要原因。
全局可变状态在软件工程方面也有不良影响:它往往会使程序的各个部分耦合更紧密,更难测试,以后也更难修改。不过,在某些情况下确实没有合理的替代方案,所以我们最好找到一种安全的方式来声明可变静态变量。
在保证线程安全的同时支持递增 PACKETS_SERVED
的最简单方法是将其设为原子整数:
use std::sync::atomic::AtomicUsize;
static PACKETS_SERVED: AtomicUsize = AtomicUsize::new(0);
2
声明这个静态变量后,递增数据包计数就很简单了:
use std::sync::atomic::Ordering;
PACKETS_SERVED.fetch_add(1, Ordering::SeqCst);
2
原子全局变量仅限于简单的整数和布尔值。不过,创建任何其他类型的全局变量相当于要解决两个问题。
首先,变量必须以某种方式保证线程安全,否则它不能是全局的:为了安全起见,静态变量必须既是 Sync
的又是不可变的。幸运的是,我们已经看到了解决这个问题的方法。Rust有用于安全共享可变值的类型:Mutex
、RwLock
和原子类型。即使这些类型被声明为不可变,它们的值也可以被修改。这就是它们的作用(见 “mut和Mutex”)。
其次,静态初始化器只能调用特别标记为 const
的函数,编译器可以在编译时计算这些函数。换句话说,它们的输出是确定的;仅取决于它们的参数,而不依赖于任何其他状态或输入输出。这样,编译器可以将计算结果作为编译时常量嵌入。这与C++ 中的 constexpr
类似。
原子类型(AtomicUsize
、AtomicBool
等)的构造函数都是 const
函数,这使得我们之前可以创建一个静态的 AtomicUsize
。其他一些类型,如 String
、Ipv4Addr
和 Ipv6Addr
,也有简单的 const
构造函数。
你也可以通过在函数签名前加上 const
来定义自己的 const
函数。Rust将 const
函数能做的事情限制在一小部分操作内,这些操作既实用又不会产生不确定的结果。const
函数不能将类型作为泛型参数,只能使用生命周期,并且即使在 unsafe
块中,也不能分配内存或操作原始指针。不过,我们可以使用算术运算(包括环绕和饱和算术运算)、不会短路的逻辑运算以及其他 const
函数。例如,我们可以创建一些辅助函数,使定义静态变量和常量更容易,并减少代码重复:
const fn mono_to_rgba(level: u8) -> Color {
Color {
red: level,
green: level,
blue: level,
alpha: 0xFF
}
}
const WHITE: Color = mono_to_rgba(255);
const BLACK: Color = mono_to_rgba(000);
2
3
4
5
6
7
8
9
10
11
结合这些技术,我们可能会想这样写:
static HOSTNAME: Mutex<String> =
Mutex::new(String::new()); // 错误:静态变量中的调用仅限于常量函数、元组结构体和元组变体
2
不幸的是,虽然 AtomicUsize::new()
和 String::new()
是 const fn
,但 Mutex::new()
不是。为了绕过这些限制,我们需要使用 lazy_static
库。
我们在 “延迟构建正则表达式值” 中介绍过 lazy_static
库。使用 lazy_static!
宏定义变量时,你可以使用任何你喜欢的表达式来初始化它;在变量第一次被解引用时运行该表达式,并且这个值会被保存下来供后续使用。
我们可以使用 lazy_static
这样声明一个由 Mutex
控制的全局 HashMap
:
use lazy_static::lazy_static;
use std::sync::Mutex;
lazy_static! {
static ref HOSTNAME: Mutex<String> =
Mutex::new(String::new());
}
2
3
4
5
6
7
同样的技术适用于其他复杂的数据结构,如 HashMap
和 Deque
。对于那些根本不可变但只是需要复杂初始化的静态变量,它也非常方便。
使用 lazy_static!
会在每次访问静态数据时带来微小的性能开销。其实现使用了 std::sync::Once
,这是一个用于一次性初始化的低级同步原语。在幕后,每次访问延迟静态变量时,程序都会执行一条原子加载指令,以检查初始化是否已经完成。(Once
用途比较特殊,所以我们这里不详细介绍。通常使用 lazy_static!
更方便。不过,它在初始化非Rust库时很有用;例如,见 “libgit2的安全接口”。)
# 在Rust中编写并发代码的体验
我们展示了在Rust中使用线程的三种技术:分叉 - 合并并行、通道以及使用锁的共享可变状态。我们的目标是对Rust提供的这些部分进行很好的介绍,重点是它们如何组合到实际程序中。
Rust坚持安全性,所以从你决定编写多线程程序的那一刻起,重点就在于构建安全、结构化的通信。让线程尽可能保持隔离是让Rust相信你所做的事情是安全的好方法。碰巧的是,隔离也是确保你所做的事情正确且可维护的好方法。
再次强调,Rust引导你编写出优秀的程序。
更重要的是,Rust允许你组合各种技术并进行尝试。你可以快速迭代:与编译器 “争论” 比调试数据竞争能让你更快地正确运行程序。