第7章 并发
# 第7章 并发
并发是Go语言的核心部分。与许多其他通过丰富的多线程库来支持并发的语言不同,Go语言仅提供了少量语言原语用于编写并发程序。
首先要强调的是,并发(Concurrency)不等于并行(Parallelism)。并发关注的是如何编写程序;而并行关注的是程序如何运行。一个并发程序会指明程序的哪些部分可以并行运行。根据实际的执行情况,程序的并发部分可能顺序执行,也可能并行执行。一个正确的并发程序,无论以何种方式运行,都会产生相同的结果。
本章将通过方法示例介绍一些Go语言的并发原语。在本章中,你将学习以下内容:
- 创建goroutine
- 并发运行多个独立函数并等待它们结束
- 使用通道(channels)发送和接收数据
- 从多个goroutine向通道发送数据
- 使用通道收集并发计算的结果
- 使用
select
语句处理多个通道 - 取消一个goroutine
- 使用非阻塞的
select
检测取消操作 - 并发更新共享变量
# 使用goroutine并发执行任务
goroutine是一种能与其他goroutine并发运行的函数。当一个程序启动时,Go运行时会创建几个goroutine。其中一个goroutine运行垃圾回收器,另一个goroutine运行main
函数。随着程序的执行,它会根据需要创建更多的goroutine。
一个典型的Go程序可能会有数千个goroutine同时运行。Go运行时会将这些goroutine调度到操作系统线程上。每个操作系统线程会被分配一定数量的goroutine,并通过时间分片的方式来运行它们。在任何给定时刻,活动的goroutine数量最多可以和逻辑处理器的数量相同,计算方式为:每个核心的线程数×每个CPU的核心数×CPU的数量。
# 创建goroutine
goroutine是Go语言不可或缺的一部分。你可以使用go
关键字来创建goroutine。
# 如何操作...
在函数调用前使用go
关键字来创建goroutine:
func f() {
// 执行一些工作
}
func main() {
go f()
...
}
2
3
4
5
6
7
8
9
当go f()
被求值时,运行时会创建一个新的goroutine并调用f
函数。运行main
函数的goroutine也会继续运行。换句话说,当go
关键字被求值时,程序执行会分裂为两个并发的执行流:一个是原始的执行流(在前面的示例中,就是运行main
函数的执行流),另一个则运行go
关键字后面的函数。
如果有需要,函数可以接受参数:
func f(i int) {
// 执行一些工作
}
func main() {
var x int
go f(x)
...
}
2
3
4
5
6
7
8
9
10
在goroutine启动前,函数的参数会先被求值。也就是说,主goroutine会先对f
函数的参数(在这个例子中是x
的值)进行求值,然后创建一个新的goroutine并运行f
函数。
使用闭包来运行goroutine是一种常见的做法。闭包提供了理解代码所需的上下文,还能避免将大量变量作为参数传递给goroutine:
func main() {
var x int
var y int
...
go func(i int) {
if y > 0 {
// 执行一些工作
}
}(x)
...
}
2
3
4
5
6
7
8
9
10
11
12
13
14
在前面的代码中,x
作为参数传递给了goroutine,而y
则被闭包捕获。当go
关键字运行的函数结束时,goroutine也会终止。
# 并发运行多个独立函数并等待它们完成
当你有多个不共享数据的独立函数时,可以使用本方法来并发运行它们。我们还将使用sync.WaitGroup
来等待goroutine完成。
# 如何操作...
- 创建一个
sync.WaitGroup
实例来等待goroutine:
wg := sync.WaitGroup{}
sync.WaitGroup
实际上就是一个线程安全的计数器。我们为创建的每个goroutine调用wg.Add(1)
,每当一个goroutine结束时,调用wg.Done()
来减1。然后我们可以等待sync.WaitGroup
的值变为零,这表明所有的goroutine都已终止。
2. 对于每个要并发运行的函数,执行以下操作:
- 对等待组加1
- 启动一个新的goroutine
- 调用defer wg.Done()
以确保能标记goroutine已结束
wg.Add(1)
go func() {
defer wg.Done()
// 执行工作
}()
2
3
4
5
提示 你不必为每个goroutine单独对等待组加1,而是可以直接加上goroutine的总数。例如,如果你知道要创建5个goroutine,可以在创建第一个goroutine之前直接执行 wg.Add(5) 。 |
---|
- 等待goroutine结束:
wg.Wait()
这个调用会阻塞,直到wg
的值变为零,即直到所有的goroutine都调用了wg.Done()
。
4. 现在,你可以使用所有goroutine的结果了。
这个方法的关键细节在于,所有goroutine都是独立的,这意味着:在调用wg.Done()
之前,每个goroutine写入的所有变量都仅由该goroutine使用。goroutine可以读取共享变量,但不能写入共享变量。在调用wg.Done()
之后,所有goroutine都已终止,它们写入的变量就可以使用了。
5. 任何一个goroutine都不依赖于其他goroutine的结果。
在调用wg.Wait()
之前,你不应尝试读取某个goroutine的结果,否则会出现内存竞争(memory race),导致未定义行为。
当你与其他写入或读取操作并发地写入一个共享变量时,就会发生内存竞争。包含内存竞争的程序,其运行结果是未定义的。
# 使用通道在goroutine之间进行通信
通常情况下,多个goroutine需要进行通信和协调,以分配工作、管理状态并整理计算结果。通道是实现这一目的的首选机制。通道是一种同步机制,它有一个可选的固定大小的缓冲区。
# 使用通道发送和接收数据
如果有另一个goroutine正在等待从通道接收数据,或者对于有缓冲区的通道而言,通道缓冲区还有可用空间,那么一个goroutine就可以向通道发送数据,否则,发送操作会被阻塞,直到可以发送为止。
如果有另一个goroutine正在等待向通道发送数据,或者对于有缓冲区的通道而言,通道缓冲区中有数据,那么一个goroutine就可以从通道接收数据,否则,接收操作会被阻塞,直到可以接收为止。
# 如何操作...
- 创建一个通道,并指定它要传递的数据类型。以下示例创建了一个可以传递字符串的通道:
ch := make(chan string)
- 在一个goroutine中,向通道发送数据元素。当所有数据元素都发送完毕后,关闭通道:
go func() {
for _, str := range stringData {
// 将字符串发送到通道。这会阻塞,直到另一个goroutine可以从通道接收数据。
ch <- str
}
// 发送完毕后关闭通道。这是向接收方goroutine发出信号,表明没有更多数据了。
close(ch)
}()
2
3
4
5
6
7
8
9
- 在另一个goroutine中从通道接收数据。在下面的示例中,主goroutine从通道接收字符串并打印出来。当通道关闭时,
for
循环结束:
for str := range ch {
fmt.Println(str)
}
2
3
# 从多个goroutine向通道发送数据
在某些情况下,你可能有许多goroutine在处理一个问题的不同部分,当它们完成时,会通过一个通道发送结果。这种情况下的一个问题是如何确定何时关闭通道。本方法展示了具体做法。
# 如何操作...
- 创建结果通道,并指定它要传递的数据类型:
ch := make(chan string)
- 创建监听器goroutine和一个等待组,以便稍后等待它完成。这个goroutine会被阻塞,直到其他goroutine开始发送数据:
// 用于存储结果
results := make([]string,0)
// 等待组,用于稍后等待监听器goroutine结束
listenerWg := sync.WaitGroup{}
listenerWg.Add(1)
go func() {
defer listenerWg.Done()
// 收集结果并存储在切片中
for str:=range ch {
results = append(results, str)
}
}()
2
3
4
5
6
7
8
9
10
11
12
13
- 创建一个等待组来跟踪要向结果通道写入数据的goroutine。然后,创建向通道发送数据的goroutine:
wg := sync.WaitGroup{}
for _,input := range inputs {
wg.Add(1)
go func(data string) {
defer wg.Done()
ch <- processInput(data)
}(input)
}
2
3
4
5
6
7
8
9
- 等待处理数据的goroutine结束,并关闭结果通道:
// 等待所有goroutine结束
wg.Wait()
// 关闭通道,表明数据发送结束
// 这会向监听器goroutine发出信号,表明不会再有数据通过该通道到达
close(ch)
2
3
4
5
6
- 等待监听器goroutine结束:
listenerWg.Wait()
现在你可以使用results
切片了。
# 使用通道收集并发计算的结果
通常,你会有多个goroutine分别处理一个问题的不同部分,你需要收集每个goroutine的结果,以组合成一个最终的结果对象。通道是实现这一目的的理想机制。
# 操作方法……
- 创建一个通道来收集计算结果:
resultCh := make(chan int)
在这个例子中,resultCh
通道是一个int
类型值的通道。也就是说,计算结果将是整数。
2. 创建一个sync.WaitGroup
实例来等待goroutine完成:
wg := sync.WaitGroup{}
- 在goroutine之间分配工作。每个goroutine都应该能够访问
resultCh
。将每个goroutine添加到等待组中,并确保在goroutine中调用defer wg.Done()
。 - 在goroutine中执行计算,并将结果发送到
resultCh
:
var inputs [][]int = []int{...}
...
for i := range inputs {
wg.Add(1)
go func(data []int) {
defer wg.Done()
// 执行计算
// computeResult接收一个[]int,并返回int
// 将结果发送到resultCh
resultCh <- computeResult(data)
}(inputs[i])
}
2
3
4
5
6
7
8
9
10
11
12
13
14
在这里,你需要做两件事:等待所有goroutine完成,并从
resultCh
收集结果。有两种方法可以做到这一点:- 在等待goroutine并发结束的同时收集结果。也就是说,创建一个goroutine并等待其他goroutine结束。当所有goroutine完成后,关闭通道:
go func() { // 等待goroutine结束 wg.Wait() // 当所有goroutine完成后,关闭通道 close(resultCh) }() // 创建一个切片来存储计算结果 results := make([]int, 0) // 从resultCh收集结果 // 当resultCh关闭时,for循环将终止 for result := range resultCh { results = append(results, result) }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15- 在等待goroutine结束的同时异步收集结果。当所有goroutine完成后,关闭通道。然而,当你关闭通道时,收集结果的goroutine可能仍在运行。我们也必须等待那个goroutine结束。为此,我们可以使用另一个等待组:
results := make([]int, 0) // 为结果收集goroutine创建一个新的等待组 collectWg := sync.WaitGroup{} // 将收集结果的goroutine添加到等待组 collectWg.Add(1) go func() { // 宣布这个goroutine完成 defer collectWg.Done() // 收集结果。当resultCh关闭时,for循环将终止。 for result := range resultCh { results = append(results, result) } }() // 等待goroutine结束。 wg.Wait() // 关闭通道,以便结果收集goroutine可以完成 close(resultCh) // 现在等待结果收集goroutine完成 collectWg.Wait() // results切片已准备好
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 使用select语句处理多个通道
在任何给定时间,你只能向通道发送数据或从通道接收数据。如果你要与多个goroutine交互(因此会涉及多个并发事件),就需要一种语言结构,让你能够同时与多个通道进行交互。这种结构就是select
语句。本节将展示如何使用select
语句。
# 操作方法……
阻塞式的select
语句会从零个或多个情况中选择一个活动的情况。每个情况都是一个通道发送或通道接收事件。如果没有活动的情况(即没有任何通道可以进行发送或接收操作),select
语句就会阻塞。
在下面的例子中,select
语句等待从两个通道之一接收数据。程序只会从其中一个通道接收数据。如果两个通道都已准备好,会随机选择其中一个通道。另一个通道将不会被读取:
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
ch1 <- 1
}()
go func() {
ch2 <- 2
}()
select {
case data1 := <-ch1:
fmt.Println("Read from channel 1: %v", data1)
case data2 := <-ch2:
fmt.Println("Read from channel 2: %v", data2)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 取消goroutine
在Go语言中创建goroutine既简单又高效,但你还必须确保goroutine最终能够结束。如果一个goroutine无意中一直运行,就称为“泄漏”的goroutine。如果一个程序不断泄漏goroutine,最终会因内存不足错误而崩溃。
有些goroutine执行有限数量的操作后会自然终止,但有些会无限期运行,直到收到外部刺激。长期运行的goroutine接收这种刺激的常见模式是使用一个done
通道。
# 操作方法……
- 创建一个数据类型为空的
done
通道:
done := make(chan struct{})
- 创建一个通道为goroutine提供输入:
input := make(chan int)
- 创建如下的goroutine:
go func() {
for {
select {
case data := <-input:
// 处理数据
case <-done:
// 完成信号。终止
return
}
}
}()
2
3
4
5
6
7
8
9
10
11
12
要取消goroutine,只需关闭done
通道:
close(done)
这将使所有监听done
通道的goroutine中的case <-done
分支生效,它们将终止运行。
# 使用非阻塞的select检测取消操作
非阻塞的select
有一个default
情况。当select
语句运行时,它会检查所有可用的情况,如果没有一个情况可用,就会选择default
情况。这使得select
可以继续执行而不会阻塞。
# 操作方法……
- 创建一个数据类型为空的
done
通道:
done := make(chan struct{})
- 创建如下的goroutine:
go func() {
for {
select {
case <-done:
// 完成信号。终止
return
default:
// 未收到完成信号。继续
}
// 执行工作
}
}()
2
3
4
5
6
7
8
9
10
11
12
13
要取消goroutine,只需关闭done
通道:
close(done)
# 共享内存
Go语言中最著名的习惯用法之一是:“不要通过共享内存来通信,而要通过通信来共享内存”。通道用于通过通信来共享内存。而通过共享内存进行通信是在多个goroutine中使用共享变量来实现的。尽管不鼓励这种做法,但在很多情况下,共享内存比通道更合理。如果至少有一个goroutine更新了其他goroutine会读取的共享变量,就必须确保不会出现内存竞争(memory race)。
当一个goroutine在另一个goroutine读取某个变量的同时并发更新该变量时,就会发生内存竞争。发生这种情况时,无法保证其他goroutine能看到对该变量的更新。这种情况的一个经典例子是忙等待循环:
func main() {
done := false
go func() {
// 当done为false时等待
for!done {}
fmt.Println("Done is true now")
}()
done = true
// 无限期等待
select {}
}
2
3
4
5
6
7
8
9
10
11
12
13
这个程序存在内存竞争。done=true
的赋值操作与for!done
循环是并发的。这意味着,即使主goroutine执行了done=true
,读取done
的goroutine可能永远看不到这个更新,从而无限期地停留在for
循环中。
# 并发更新共享变量
Go语言的内存模型保证,变量写入的效果仅对该goroutine中在写入之后的指令可见。也就是说,如果你更新了一个共享变量,必须使用特殊工具,使这个更新对其他goroutine可见。一种简单的确保方法是使用互斥锁(mutex)。Mutex代表“互斥(mutual exclusion)”。互斥锁是一种工具,可用于确保以下两点:
- 在任何给定时间,只有一个goroutine可以更新变量。
- 一旦更新完成并释放互斥锁,所有goroutine都能看到这个更新。在本方法中,我们将展示如何实现这一点。
# 操作方法……
程序中更新共享变量的部分称为“临界区(critical section)”。你可以使用互斥锁确保只有一个goroutine能够进入其临界区。
声明一个互斥锁来保护临界区:
// cacheMutex将用于保护对cache的访问
var cacheMutex sync.Mutex
var cache map[string]any = map[string]any{}
2
3
互斥锁保护一组共享变量。例如,如果你有一些goroutine会更新一个整数,就需要为更新该整数的临界区声明一个互斥锁。每次读取或写入该整数值时,都必须使用相同的互斥锁。
在更新共享变量时,首先锁定互斥锁,然后执行更新操作,最后解锁互斥锁:
cacheMutex.Lock()
cache[key] = value
cacheMutex.Unlock()
2
3
通过上述代码,如果多个goroutine尝试更新cache
,它们将在cacheMutex.Lock()
处排队,每次只允许一个goroutine进行操作。当这个goroutine完成更新后,会调用cacheMutex.Unlock()
,这将使一个等待的goroutine能够获取锁并再次更新cache
。
在读取共享变量时,首先锁定互斥锁,然后执行读取操作,最后解锁互斥锁:
cacheMutex.Lock()
cachedValue, cached := cache[key]
cacheMutex.Unlock()
if cached {
// 在缓存中找到值
}
2
3
4
5
6