CppGuide社区 CppGuide社区
首页
  • 最新谷歌C++风格指南(含C++17/20)
  • C++17详解
  • C++20完全指南
  • C++23快速入门
  • C++语言面试问题集锦
  • 🔥C/C++后端开发常见面试题解析 (opens new window)
  • 网络编程面试题 (opens new window)
  • 网络编程面试题 答案详解 (opens new window)
  • 聊聊WebServer作面试项目那些事儿 (opens new window)
  • 字节跳动面试官现身说 (opens new window)
  • 技术简历指南 (opens new window)
  • 🔥交易系统开发岗位求职与面试指南 (opens new window)
  • 第1章 高频C++11重难点知识解析
  • 第2章 Linux GDB高级调试指南
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 高性能网络通信协议设计精要
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 后端服务重要模块设计探索
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 源码分析系列

    • leveldb源码分析
    • libevent源码分析
    • Memcached源码分析
    • TeamTalk源码分析
    • 优质源码分享 (opens new window)
    • 🔥远程控制软件gh0st源码分析
  • 从零手写C++项目系列

    • C++游戏编程入门(零基础学C++)
    • 🔥使用C++17从零开发一个调试器 (opens new window)
    • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
    • 🔥使用C++从零写一个C语言编译器 (opens new window)
    • 从零用C语言写一个Redis
  • Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • Go语言特性

    • Go开发实用指南
    • Go系统接口编程
    • 高效Go并发编程
    • Go性能调优
    • Go项目架构设计
  • Go项目实战

    • 使用Go从零开发一个数据库
    • 🔥使用Go从零开发一个编译器 (opens new window)
    • 🔥使用Go从零开发一个解释器 (opens new window)
    • 🔥用Go从零写一个编排器(类Kubernetes) (opens new window)
  • Rust编程

    • Rust编程指南
  • 数据库

    • SQL零基础指南
    • MySQL开发与调试指南
GitHub (opens new window)
首页
  • 最新谷歌C++风格指南(含C++17/20)
  • C++17详解
  • C++20完全指南
  • C++23快速入门
  • C++语言面试问题集锦
  • 🔥C/C++后端开发常见面试题解析 (opens new window)
  • 网络编程面试题 (opens new window)
  • 网络编程面试题 答案详解 (opens new window)
  • 聊聊WebServer作面试项目那些事儿 (opens new window)
  • 字节跳动面试官现身说 (opens new window)
  • 技术简历指南 (opens new window)
  • 🔥交易系统开发岗位求职与面试指南 (opens new window)
  • 第1章 高频C++11重难点知识解析
  • 第2章 Linux GDB高级调试指南
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 高性能网络通信协议设计精要
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 后端服务重要模块设计探索
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 源码分析系列

    • leveldb源码分析
    • libevent源码分析
    • Memcached源码分析
    • TeamTalk源码分析
    • 优质源码分享 (opens new window)
    • 🔥远程控制软件gh0st源码分析
  • 从零手写C++项目系列

    • C++游戏编程入门(零基础学C++)
    • 🔥使用C++17从零开发一个调试器 (opens new window)
    • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
    • 🔥使用C++从零写一个C语言编译器 (opens new window)
    • 从零用C语言写一个Redis
  • Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • Go语言特性

    • Go开发实用指南
    • Go系统接口编程
    • 高效Go并发编程
    • Go性能调优
    • Go项目架构设计
  • Go项目实战

    • 使用Go从零开发一个数据库
    • 🔥使用Go从零开发一个编译器 (opens new window)
    • 🔥使用Go从零开发一个解释器 (opens new window)
    • 🔥用Go从零写一个编排器(类Kubernetes) (opens new window)
  • Rust编程

    • Rust编程指南
  • 数据库

    • SQL零基础指南
    • MySQL开发与调试指南
GitHub (opens new window)
  • Go开发实用指南 说明
  • 第1章 Go项目如何组织
  • 第2章 字符串处理
  • 第3章 处理日期和时间
  • 第4章 使用数组、切片和映射
  • 第5章 使用类型、结构体和接口
  • 第6章 使用泛型
  • 第7章 并发
    • 使用goroutine并发执行任务
      • 创建goroutine
      • 如何操作...
      • 并发运行多个独立函数并等待它们完成
      • 如何操作...
    • 使用通道在goroutine之间进行通信
      • 使用通道发送和接收数据
      • 如何操作...
      • 从多个goroutine向通道发送数据
      • 如何操作...
      • 使用通道收集并发计算的结果
      • 操作方法……
    • 使用select语句处理多个通道
      • 操作方法……
      • 取消goroutine
      • 操作方法……
      • 使用非阻塞的select检测取消操作
      • 操作方法……
    • 共享内存
      • 并发更新共享变量
      • 操作方法……
  • 第8章 错误与恐慌(panic)
  • 第9章 context包
  • 第10章 处理大量数据
  • 第11章 处理JSON数据
  • 第12章 进程
  • 第13章 网络编程
  • 第14章 流式输入/输出
  • 第15章 数据库
  • 第16章 日志记录
  • 第17章 测试、基准测试和性能分析
目录

第7章 并发

# 第7章 并发

并发是Go语言的核心部分。与许多其他通过丰富的多线程库来支持并发的语言不同,Go语言仅提供了少量语言原语用于编写并发程序。

首先要强调的是,并发(Concurrency)不等于并行(Parallelism)。并发关注的是如何编写程序;而并行关注的是程序如何运行。一个并发程序会指明程序的哪些部分可以并行运行。根据实际的执行情况,程序的并发部分可能顺序执行,也可能并行执行。一个正确的并发程序,无论以何种方式运行,都会产生相同的结果。

本章将通过方法示例介绍一些Go语言的并发原语。在本章中,你将学习以下内容:

  • 创建goroutine
  • 并发运行多个独立函数并等待它们结束
  • 使用通道(channels)发送和接收数据
  • 从多个goroutine向通道发送数据
  • 使用通道收集并发计算的结果
  • 使用select语句处理多个通道
  • 取消一个goroutine
  • 使用非阻塞的select检测取消操作
  • 并发更新共享变量

# 使用goroutine并发执行任务

goroutine是一种能与其他goroutine并发运行的函数。当一个程序启动时,Go运行时会创建几个goroutine。其中一个goroutine运行垃圾回收器,另一个goroutine运行main函数。随着程序的执行,它会根据需要创建更多的goroutine。

一个典型的Go程序可能会有数千个goroutine同时运行。Go运行时会将这些goroutine调度到操作系统线程上。每个操作系统线程会被分配一定数量的goroutine,并通过时间分片的方式来运行它们。在任何给定时刻,活动的goroutine数量最多可以和逻辑处理器的数量相同,计算方式为:每个核心的线程数×每个CPU的核心数×CPU的数量。

# 创建goroutine

goroutine是Go语言不可或缺的一部分。你可以使用go关键字来创建goroutine。

# 如何操作...

在函数调用前使用go关键字来创建goroutine:

func f() {
	// 执行一些工作
}

func main() {
	go f()
	
    ...
}
1
2
3
4
5
6
7
8
9

当go f()被求值时,运行时会创建一个新的goroutine并调用f函数。运行main函数的goroutine也会继续运行。换句话说,当go关键字被求值时,程序执行会分裂为两个并发的执行流:一个是原始的执行流(在前面的示例中,就是运行main函数的执行流),另一个则运行go关键字后面的函数。

如果有需要,函数可以接受参数:

func f(i int) {
	// 执行一些工作
}

func main() { 
	var x int
	go f(x)
	
    ...
}
1
2
3
4
5
6
7
8
9
10

在goroutine启动前,函数的参数会先被求值。也就是说,主goroutine会先对f函数的参数(在这个例子中是x的值)进行求值,然后创建一个新的goroutine并运行f函数。

使用闭包来运行goroutine是一种常见的做法。闭包提供了理解代码所需的上下文,还能避免将大量变量作为参数传递给goroutine:

func main() {
    var x int
    var y int
    
    ...
    
    go func(i int) {
        if y > 0 {
        // 执行一些工作
        }
    }(x)
    
    ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

在前面的代码中,x作为参数传递给了goroutine,而y则被闭包捕获。当go关键字运行的函数结束时,goroutine也会终止。

# 并发运行多个独立函数并等待它们完成

当你有多个不共享数据的独立函数时,可以使用本方法来并发运行它们。我们还将使用sync.WaitGroup来等待goroutine完成。

# 如何操作...

  1. 创建一个sync.WaitGroup实例来等待goroutine:
wg := sync.WaitGroup{}
1

sync.WaitGroup实际上就是一个线程安全的计数器。我们为创建的每个goroutine调用wg.Add(1),每当一个goroutine结束时,调用wg.Done()来减1。然后我们可以等待sync.WaitGroup的值变为零,这表明所有的goroutine都已终止。 2. 对于每个要并发运行的函数,执行以下操作: - 对等待组加1 - 启动一个新的goroutine - 调用defer wg.Done()以确保能标记goroutine已结束

wg.Add(1)
go func() {
	defer wg.Done()
	// 执行工作
}()
1
2
3
4
5
提示
你不必为每个goroutine单独对等待组加1,而是可以直接加上goroutine的总数。例如,如果你知道要创建5个goroutine,可以在创建第一个goroutine之前直接执行wg.Add(5)。
  1. 等待goroutine结束:
wg.Wait()
1

这个调用会阻塞,直到wg的值变为零,即直到所有的goroutine都调用了wg.Done()。 4. 现在,你可以使用所有goroutine的结果了。

这个方法的关键细节在于,所有goroutine都是独立的,这意味着:在调用wg.Done()之前,每个goroutine写入的所有变量都仅由该goroutine使用。goroutine可以读取共享变量,但不能写入共享变量。在调用wg.Done()之后,所有goroutine都已终止,它们写入的变量就可以使用了。 5. 任何一个goroutine都不依赖于其他goroutine的结果。

在调用wg.Wait()之前,你不应尝试读取某个goroutine的结果,否则会出现内存竞争(memory race),导致未定义行为。

当你与其他写入或读取操作并发地写入一个共享变量时,就会发生内存竞争。包含内存竞争的程序,其运行结果是未定义的。

# 使用通道在goroutine之间进行通信

通常情况下,多个goroutine需要进行通信和协调,以分配工作、管理状态并整理计算结果。通道是实现这一目的的首选机制。通道是一种同步机制,它有一个可选的固定大小的缓冲区。

# 使用通道发送和接收数据

如果有另一个goroutine正在等待从通道接收数据,或者对于有缓冲区的通道而言,通道缓冲区还有可用空间,那么一个goroutine就可以向通道发送数据,否则,发送操作会被阻塞,直到可以发送为止。

如果有另一个goroutine正在等待向通道发送数据,或者对于有缓冲区的通道而言,通道缓冲区中有数据,那么一个goroutine就可以从通道接收数据,否则,接收操作会被阻塞,直到可以接收为止。

# 如何操作...

  1. 创建一个通道,并指定它要传递的数据类型。以下示例创建了一个可以传递字符串的通道:
ch := make(chan string)
1
  1. 在一个goroutine中,向通道发送数据元素。当所有数据元素都发送完毕后,关闭通道:
go func() {
    for _, str := range stringData {
        // 将字符串发送到通道。这会阻塞,直到另一个goroutine可以从通道接收数据。
        ch <- str
	}
    
    // 发送完毕后关闭通道。这是向接收方goroutine发出信号,表明没有更多数据了。
    close(ch)
}()
1
2
3
4
5
6
7
8
9
  1. 在另一个goroutine中从通道接收数据。在下面的示例中,主goroutine从通道接收字符串并打印出来。当通道关闭时,for循环结束:
for str := range ch {
	fmt.Println(str)
}
1
2
3

# 从多个goroutine向通道发送数据

在某些情况下,你可能有许多goroutine在处理一个问题的不同部分,当它们完成时,会通过一个通道发送结果。这种情况下的一个问题是如何确定何时关闭通道。本方法展示了具体做法。

# 如何操作...

  1. 创建结果通道,并指定它要传递的数据类型:
ch := make(chan string)
1
  1. 创建监听器goroutine和一个等待组,以便稍后等待它完成。这个goroutine会被阻塞,直到其他goroutine开始发送数据:
// 用于存储结果
results := make([]string,0)
// 等待组,用于稍后等待监听器goroutine结束
listenerWg := sync.WaitGroup{}
listenerWg.Add(1)
go func() {
    defer listenerWg.Done()
    
    // 收集结果并存储在切片中
    for str:=range ch {
    	results = append(results, str)
    }
}()
1
2
3
4
5
6
7
8
9
10
11
12
13
  1. 创建一个等待组来跟踪要向结果通道写入数据的goroutine。然后,创建向通道发送数据的goroutine:
wg := sync.WaitGroup{}
for _,input := range inputs {
    wg.Add(1)
    
    go func(data string) {
        defer wg.Done()
        ch <- processInput(data)
    }(input)
}
1
2
3
4
5
6
7
8
9
  1. 等待处理数据的goroutine结束,并关闭结果通道:
// 等待所有goroutine结束
wg.Wait()

// 关闭通道,表明数据发送结束
// 这会向监听器goroutine发出信号,表明不会再有数据通过该通道到达
close(ch)
1
2
3
4
5
6
  1. 等待监听器goroutine结束:
listenerWg.Wait()
1

现在你可以使用results切片了。

# 使用通道收集并发计算的结果

通常,你会有多个goroutine分别处理一个问题的不同部分,你需要收集每个goroutine的结果,以组合成一个最终的结果对象。通道是实现这一目的的理想机制。

# 操作方法……

  1. 创建一个通道来收集计算结果:
resultCh := make(chan int)
1

在这个例子中,resultCh通道是一个int类型值的通道。也就是说,计算结果将是整数。 2. 创建一个sync.WaitGroup实例来等待goroutine完成:

wg := sync.WaitGroup{}
1
  1. 在goroutine之间分配工作。每个goroutine都应该能够访问resultCh。将每个goroutine添加到等待组中,并确保在goroutine中调用defer wg.Done()。
  2. 在goroutine中执行计算,并将结果发送到resultCh:
var inputs [][]int = []int{...}

...

for i := range inputs {
    wg.Add(1)
    go func(data []int) {
        defer wg.Done()
        // 执行计算
        // computeResult接收一个[]int,并返回int
        // 将结果发送到resultCh
        resultCh <- computeResult(data)
    }(inputs[i])
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
  1. 在这里,你需要做两件事:等待所有goroutine完成,并从resultCh收集结果。有两种方法可以做到这一点:

    • 在等待goroutine并发结束的同时收集结果。也就是说,创建一个goroutine并等待其他goroutine结束。当所有goroutine完成后,关闭通道:
    go func() {
        // 等待goroutine结束
        wg.Wait()
        // 当所有goroutine完成后,关闭通道
        close(resultCh)
    }()
    
    // 创建一个切片来存储计算结果
    results := make([]int, 0)
    
    // 从resultCh收集结果
    // 当resultCh关闭时,for循环将终止
    for result := range resultCh {
        results = append(results, result)
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    • 在等待goroutine结束的同时异步收集结果。当所有goroutine完成后,关闭通道。然而,当你关闭通道时,收集结果的goroutine可能仍在运行。我们也必须等待那个goroutine结束。为此,我们可以使用另一个等待组:
    results := make([]int, 0)
    // 为结果收集goroutine创建一个新的等待组
    collectWg := sync.WaitGroup{}
    // 将收集结果的goroutine添加到等待组
    collectWg.Add(1)
    go func() {
        // 宣布这个goroutine完成
        defer collectWg.Done()
        // 收集结果。当resultCh关闭时,for循环将终止。
        for result := range resultCh {
            results = append(results, result)
        }
    }()
    
    // 等待goroutine结束。
    wg.Wait()
    // 关闭通道,以便结果收集goroutine可以完成
    close(resultCh)
    // 现在等待结果收集goroutine完成
    collectWg.Wait()
    // results切片已准备好
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21

# 使用select语句处理多个通道

在任何给定时间,你只能向通道发送数据或从通道接收数据。如果你要与多个goroutine交互(因此会涉及多个并发事件),就需要一种语言结构,让你能够同时与多个通道进行交互。这种结构就是select语句。本节将展示如何使用select语句。

# 操作方法……

阻塞式的select语句会从零个或多个情况中选择一个活动的情况。每个情况都是一个通道发送或通道接收事件。如果没有活动的情况(即没有任何通道可以进行发送或接收操作),select语句就会阻塞。

在下面的例子中,select语句等待从两个通道之一接收数据。程序只会从其中一个通道接收数据。如果两个通道都已准备好,会随机选择其中一个通道。另一个通道将不会被读取:

ch1 := make(chan int)
ch2 := make(chan int)

go func() {
    ch1 <- 1
}()

go func() {
    ch2 <- 2
}()

select {
case data1 := <-ch1:
    fmt.Println("Read from channel 1: %v", data1)
    
case data2 := <-ch2:
    fmt.Println("Read from channel 2: %v", data2)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 取消goroutine

在Go语言中创建goroutine既简单又高效,但你还必须确保goroutine最终能够结束。如果一个goroutine无意中一直运行,就称为“泄漏”的goroutine。如果一个程序不断泄漏goroutine,最终会因内存不足错误而崩溃。

有些goroutine执行有限数量的操作后会自然终止,但有些会无限期运行,直到收到外部刺激。长期运行的goroutine接收这种刺激的常见模式是使用一个done通道。

# 操作方法……

  1. 创建一个数据类型为空的done通道:
done := make(chan struct{})
1
  1. 创建一个通道为goroutine提供输入:
input := make(chan int)
1
  1. 创建如下的goroutine:
go func() {
    for {
        select {
        case data := <-input:
            // 处理数据
            
        case <-done:
            // 完成信号。终止
            return
        }
    }
}()
1
2
3
4
5
6
7
8
9
10
11
12

要取消goroutine,只需关闭done通道:

close(done)
1

这将使所有监听done通道的goroutine中的case <-done分支生效,它们将终止运行。

# 使用非阻塞的select检测取消操作

非阻塞的select有一个default情况。当select语句运行时,它会检查所有可用的情况,如果没有一个情况可用,就会选择default情况。这使得select可以继续执行而不会阻塞。

# 操作方法……

  1. 创建一个数据类型为空的done通道:
done := make(chan struct{})
1
  1. 创建如下的goroutine:
go func() {
    for {
        select {
        case <-done:
            // 完成信号。终止
            return
            
        default:
            // 未收到完成信号。继续
        }
        // 执行工作
    }
}()
1
2
3
4
5
6
7
8
9
10
11
12
13

要取消goroutine,只需关闭done通道:

close(done)
1

# 共享内存

Go语言中最著名的习惯用法之一是:“不要通过共享内存来通信,而要通过通信来共享内存”。通道用于通过通信来共享内存。而通过共享内存进行通信是在多个goroutine中使用共享变量来实现的。尽管不鼓励这种做法,但在很多情况下,共享内存比通道更合理。如果至少有一个goroutine更新了其他goroutine会读取的共享变量,就必须确保不会出现内存竞争(memory race)。

当一个goroutine在另一个goroutine读取某个变量的同时并发更新该变量时,就会发生内存竞争。发生这种情况时,无法保证其他goroutine能看到对该变量的更新。这种情况的一个经典例子是忙等待循环:

func main() {
    done := false
    go func() {
        // 当done为false时等待
        for!done {}
        fmt.Println("Done is true now")
    }()
    
    done = true
    
    // 无限期等待
    select {}
}
1
2
3
4
5
6
7
8
9
10
11
12
13

这个程序存在内存竞争。done=true的赋值操作与for!done循环是并发的。这意味着,即使主goroutine执行了done=true,读取done的goroutine可能永远看不到这个更新,从而无限期地停留在for循环中。

# 并发更新共享变量

Go语言的内存模型保证,变量写入的效果仅对该goroutine中在写入之后的指令可见。也就是说,如果你更新了一个共享变量,必须使用特殊工具,使这个更新对其他goroutine可见。一种简单的确保方法是使用互斥锁(mutex)。Mutex代表“互斥(mutual exclusion)”。互斥锁是一种工具,可用于确保以下两点:

  • 在任何给定时间,只有一个goroutine可以更新变量。
  • 一旦更新完成并释放互斥锁,所有goroutine都能看到这个更新。在本方法中,我们将展示如何实现这一点。

# 操作方法……

程序中更新共享变量的部分称为“临界区(critical section)”。你可以使用互斥锁确保只有一个goroutine能够进入其临界区。

声明一个互斥锁来保护临界区:

// cacheMutex将用于保护对cache的访问
var cacheMutex sync.Mutex
var cache map[string]any = map[string]any{}
1
2
3

互斥锁保护一组共享变量。例如,如果你有一些goroutine会更新一个整数,就需要为更新该整数的临界区声明一个互斥锁。每次读取或写入该整数值时,都必须使用相同的互斥锁。

在更新共享变量时,首先锁定互斥锁,然后执行更新操作,最后解锁互斥锁:

cacheMutex.Lock()
cache[key] = value
cacheMutex.Unlock()
1
2
3

通过上述代码,如果多个goroutine尝试更新cache,它们将在cacheMutex.Lock()处排队,每次只允许一个goroutine进行操作。当这个goroutine完成更新后,会调用cacheMutex.Unlock(),这将使一个等待的goroutine能够获取锁并再次更新cache。

在读取共享变量时,首先锁定互斥锁,然后执行读取操作,最后解锁互斥锁:

cacheMutex.Lock()
cachedValue, cached := cache[key] 
cacheMutex.Unlock()
if cached {
    // 在缓存中找到值
}
1
2
3
4
5
6
第6章 使用泛型
第8章 错误与恐慌(panic)

← 第6章 使用泛型 第8章 错误与恐慌(panic)→

最近更新
01
第二章 关键字static及其不同用法
03-27
02
第一章 auto与类型推导
03-27
03
第四章 Lambda函数
03-27
更多文章>
Copyright © 2024-2025 沪ICP备2023015129号 张小方 版权所有
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式