CppGuide社区 CppGuide社区
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
🔥C++面试
  • 第1章 C++ 惯用法与Modern C++篇
  • 第2章 C++开发工具与调试进阶
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 网络通信协议设计
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 服务其他模块设计
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 🔥C++游戏编程入门(零基础学C++)
  • 🔥使用C++17从零开发一个调试器 (opens new window)
  • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
  • 🔥使用C++从零写一个C语言编译器 (opens new window)
  • 🔥从零用C语言写一个Redis
  • leveldb源码分析
  • libevent源码分析
  • Memcached源码分析
  • TeamTalk源码分析
  • 优质源码分享 (opens new window)
  • 🔥远程控制软件gh0st源码分析
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • 高效Go并发编程
  • Go性能调优
  • Go项目架构设计
  • 🔥使用Go从零开发一个数据库
  • 🔥使用Go从零开发一个编译器 (opens new window)
  • 🔥使用Go从零开发一个解释器 (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
🔥C++面试
  • 第1章 C++ 惯用法与Modern C++篇
  • 第2章 C++开发工具与调试进阶
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 网络通信协议设计
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 服务其他模块设计
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 🔥C++游戏编程入门(零基础学C++)
  • 🔥使用C++17从零开发一个调试器 (opens new window)
  • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
  • 🔥使用C++从零写一个C语言编译器 (opens new window)
  • 🔥从零用C语言写一个Redis
  • leveldb源码分析
  • libevent源码分析
  • Memcached源码分析
  • TeamTalk源码分析
  • 优质源码分享 (opens new window)
  • 🔥远程控制软件gh0st源码分析
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • 高效Go并发编程
  • Go性能调优
  • Go项目架构设计
  • 🔥使用Go从零开发一个数据库
  • 🔥使用Go从零开发一个编译器 (opens new window)
  • 🔥使用Go从零开发一个解释器 (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
  • 高效Go并发编程 说明
  • 1. 并发——高级概述
  • 2 Go并发原语
  • 3 Go内存模型
  • 4. 一些著名的并发问题
    • 技术要求
    • 生产者 - 消费者问题
    • 哲学家就餐问题
    • 速率限制
    • 总结
  • 5 工作池与管道
  • 6. 错误处理
  • 7 定时器和时钟
  • 8 并发处理请求
  • 9. 原子内存操作
  • 10 并发问题排查
目录

4. 一些著名的并发问题

# 4. 一些著名的并发问题

本章将介绍一些在实际应用中非常常见的并发问题,主要包括以下几个:

  • 生产者 - 消费者问题
  • 哲学家就餐问题
  • 速率限制

在本章结束时,你将看到这些问题的多种实现方式,并了解处理并发问题的一些实际思路。

# 技术要求

无。

# 生产者 - 消费者问题

在上一章中,我们使用条件变量实现了一个版本的生产者 - 消费者问题,并提到在大多数情况下,条件变量可以用通道来替代。本章中我们要实现的生产者 - 消费者问题将进一步说明这一点。像生产者 - 消费者问题这类并发问题,本质上属于消息传递问题,若尝试使用共享内存工具来解决,会导致代码不必要地复杂和冗长。

生产者 - 消费者问题的核心在于有限的中间存储。从高层次来看,该问题包含以不同速率生成对象的生产者进程,以及以不同速率消费这些对象的消费者进程,两者之间有有限的存储空间,用于存储已生成但尚未被消费的对象。在任何需要平衡对象生产和消费的系统中,都会涉及生产者 - 消费者问题。例如,工厂生产的商品必须先存储起来,直到销售出去。如果生产过多,就必须减缓生产速度;如果需求过大,则必须提高产量。

我们先在这里再次阐述一下生产者 - 消费者问题。有一个或多个生产者协程生成值,同时有一个或多个消费者协程以某种方式使用这些值。我们编写的生产者协程要能通过主协程发送的信号停止运行。当所有生产者停止后,消费者也应该随之停止。

我们将逐步完善这个程序。这里的目标是展示如何从最简单的功能片段开始实现这样一个程序,然后通过迭代增强,最终开发出更优的版本。下面是一个不错的生产者函数起始版本:

func producer(index int, done <-chan struct{}, output chan<- int) {
    for {
        // 生成一个值
        value := rand.Int()
        // 稍作等待
        time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
        // 发送值
        select {
        case output <- value:
        case <-done:
            return
        }
        fmt.Printf("Producer %d sent %d\n", index, value)
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

这个函数首先生成一个随机值,等待一小段时间,然后将该值发送到通道中。index参数仅用于打印是哪个生产者实例生成了特定的值。在将值发送到通道时,函数还会检查done通道是否被触发(通过关闭它),如果是,则返回。这个函数会在一个协程中运行,所以从函数返回也会终止该协程。

现在,让我们编写一个消费者函数:

func consumer(index int, input <-chan int) {
    for value := range input {
        fmt.Printf("Consumer %d received %d\n", index, value)
    }
}
1
2
3
4
5

消费者函数接收消费者协程的索引和数据通道。它只是监听数据通道,并打印接收到的值。当输入通道关闭时,消费者函数将终止。

现在,将它们组合起来:

func main() {
    doneCh := make(chan struct{})
    dataCh := make(chan int, 0)
    for i := 0; i < 10; i++ {
        go producer(i, doneCh, dataCh)
    }
    for i := 0; i < 10; i++ {
        go consumer(i, dataCh)
    }
    select {}
}
1
2
3
4
5
6
7
8
9
10
11

这个程序创建了一个数据通道和一个done通道,启动了10个生产者和10个消费者,然后无限期运行。与使用条件变量的版本相比,你应该能注意到这个程序的简洁性。这里不存在锁定共享对象的问题,因为没有共享对象;也不用担心缓冲生产者生成的数据,通道会处理所有这些问题。多个生产者会将数据放入通道,消费者会被随机分配来接收和处理这些数据,这一切都由运行时管理。

但这个程序还不完整,因为它无法优雅地终止。首先,我们可以用一个延迟来替换select{}语句,让程序运行一段时间(10秒),然后关闭done通道:

// select {}
time.Sleep(time.Second * 10)
close(doneCh)
1
2
3

然而,这还不够。我们关闭了通道并广播了终止所有生产者的请求,现在还得等待它们实际终止。这可以通过WaitGroup来实现:

producers := sync.WaitGroup{}
for i := 0; i < 10; i++ {
    producers.Add(1)
    go producer(i, &producers, doneCh, dataCh)
}
…
time.Sleep(time.Second * 10)
close(doneCh)
producers.Wait()
1
2
3
4
5
6
7
8
9

我们需要修改生产者函数以适应这种变化:

func producer(index int, wg *sync.WaitGroup, done chan struct{}, output chan<- int) {
    defer wg.Done()
    …
1
2
3

通过这些修改,我们现在可以在程序运行10秒后向生产者发送信号(close(done)),然后等待它们完成。一旦生产者完成,我们就可以向消费者发送终止信号。我们不使用done通道来实现这一点,因为我们希望消费者在处理完所有数据元素后再终止。为此,我们会在所有生产者完成后关闭dataCh。关闭dataCh将终止消费者中的for循环,使它们能够返回。这次,我们需要使用另一个等待组来等待所有消费者完成。完整的main函数如下:

func main() {
    doneCh := make(chan struct{})
    dataCh := make(chan int)
    producers := sync.WaitGroup{}
    consumers := sync.WaitGroup{}
    for i := 0; i < 10; i++ {
        producers.Add(1)
        go producer(i, &producers, doneCh, dataCh)
    }
    for i := 0; i < 10; i++ {
        consumers.Add(1)
        go consumer(i, &consumers, dataCh)
    }
    time.Sleep(time.Second * 10)
    close(doneCh)
    producers.Wait()
    close(dataCh)
    consumers.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

消费者函数明显的修改如下:

func consumer(index int, wg *sync.WaitGroup, input <-chan int) {
    defer wg.Done()
    …
1
2
3

你可能会注意到,与使用条件变量的版本相比,使用简单的通道大大降低了实现的复杂度。

回到本节开头的工厂类比,通道非常准确地模拟了 “各方之间转移货物” 的过程。通过使用不同容量的通道,并调整生产者和消费者的数量,你可以针对特定的负载模式微调系统的行为。请记住,这种调优和优化活动应该在程序能够正常运行之后,并且在测量了基线行为之后进行。在观察程序的运行情况之前,绝不要尝试优化程序。先让它运行起来,然后再让它变得更好。

# 哲学家就餐问题

在第1章 “并发——高级概述” 中,我们从更高层面讨论并发时,提到过哲学家就餐问题。这是临界区研究中的一个重要问题。这个问题可能看起来有些人为设计的感觉,但它展示了一个在现实世界中经常出现的问题:进入临界区可能需要获取多个资源(互斥锁)。每当临界区依赖多个互斥锁时,就有可能出现死锁和饥饿的情况。现在,我们将研究在Go语言中解决这个问题的一些方法。我们先重新阐述一下这个问题:有五位哲学家围坐在同一张圆桌旁用餐。每位哲学家面前有一个盘子,每个盘子之间有一把叉子,总共五把叉子。他们吃的食物需要用到两把叉子,分别在盘子的左侧和右侧。每位哲学家会随机思考一段时间,然后进食一会儿。为了进食,哲学家必须拿起两把叉子,即盘子左侧和右侧各一把。

我们的第一个解决方案是使用五个协程代表哲学家,五个互斥锁代表叉子。当一个哲学家协程决定进食时,它必须锁定两把互斥锁。这个模型如图4.1所示。

img 图4.1——使用协程和互斥锁解决哲学家就餐问题

哲学家协程的代码如下:

func philosopher(index int, firstFork, secondFork *sync.Mutex) {
    for {
        // 思考一段时间
        fmt.Printf("Philosopher %d is thinking\n", index)
        time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
        // 拿起叉子
        firstFork.Lock()
        secondFork.Lock()
        // 进食
        fmt.Printf("Philosopher %d is eating\n", index)
        time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
        secondFork.Unlock()
        firstFork.Unlock()
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

在第4行,哲学家会随机思考一段时间。此时所有叉子都还未被拿起,所以互斥锁(mutexes)也都未被锁定。接着,在第7行,哲学家拿起第一把叉子。如果这把叉子已经被旁边的哲学家使用,那么这个哲学家就会阻塞,直到叉子被释放。然后,哲学家拿起第二把叉子。同样,如果这把叉子正被另一个哲学家使用,这个哲学家就必须等待。拿到两把叉子后,哲学家会随机进食一段时间,然后放下两把叉子。

使用下面的main函数,这个实现很容易产生死锁:

func main() {
    forks := [5]sync.Mutex{}
    go philosopher(0, &forks[4], &forks[0])
    go philosopher(1, &forks[0], &forks[1])
    go philosopher(2, &forks[1], &forks[2])
    go philosopher(3, &forks[2], &forks[3])
    go philosopher(4, &forks[3], &forks[4])
    select {}
}
1
2
3
4
5
6
7
8
9

分析一个算法是否会产生死锁,关键在于找出goroutine可能阻塞的地方。哲学家goroutine可能在第7行和第8行阻塞。要知道,产生死锁的条件之一是至少有一个goroutine必须独占一个互斥锁(这是科夫曼条件(Coffman conditions)之一)。这意味着至少有一个goroutine必须成功执行了第7行并锁定了一个互斥锁。另一个条件是,至少有一个goroutine在持有一个互斥锁的同时,还在等待另一个goroutine持有的互斥锁。也就是说,至少有一个goroutine必须在第8行阻塞。这也意味着,如果发生死锁,至少有一个goroutine必定在第8行。该实现保证了科夫曼的第三个条件:只有锁定互斥锁的goroutine才能解锁它。那么,寻找死锁的问题就简化为判断是否会出现循环等待的情况(科夫曼的第四个条件)。

假设系统发生了死锁。我们可以用表格列出每个goroutine持有哪些互斥锁(叉子),以及每个goroutine正在阻塞等待哪些互斥锁。图4.2展示了这样一个表格。这里,G0到G4代表goroutine,f0到f4代表被该goroutine锁定的叉子(互斥锁)以及该goroutine正在阻塞等待的叉子。我们从假设一个goroutine在尝试锁定互斥锁时被阻塞开始,逆向推导,看是否能达到死锁状态。例如,我们在第一行填入G0在f4处阻塞。这意味着G4已经锁定了f4。要实现这一点,G4肯定也锁定了f3。这表明G4进入了临界区,所以这不可能是死锁状态。

第二行展示了一种死锁情况。G0锁定了f4,但在f0处阻塞;G1锁定了f0,但在f1处阻塞,以此类推;G4锁定了f3并在f4处阻塞。由于这种循环依赖,所有goroutine都无法进入它们的临界区。这满足了科夫曼的第四个条件,所以实际上是可能发生死锁的。如果所有goroutine都在任何一个goroutine执行第8行之前,依次执行第7行,那么程序就会发生死锁。

图4.2:通过列出锁定的互斥锁和阻塞的goroutine来查找死锁

我们能打破这个循环吗?当然可以:如果我们改变其中一个哲学家拿起叉子的顺序,循环就会被打破。例如,如果第一个哲学家先拿起右边的叉子,而其他哲学家都先拿起左边的叉子,就不会发生死锁:

func main() {
    forks := [5]sync.Mutex{}
    go philosopher(0, &forks[0], &forks[4])
    go philosopher(1, &forks[0], &forks[1])
    go philosopher(2, &forks[1], &forks[2])
    go philosopher(3, &forks[2], &forks[3])
    go philosopher(4, &forks[3], &forks[4])
    select {}
}
1
2
3
4
5
6
7
8
9

不过,这样一来goroutine就不再完全相同了,所以我们需要做更多工作来证明不会发生死锁。

图4.3:通过列出锁定的互斥锁和阻塞的goroutine来查找死锁

要详尽地列出所有情况,我们需要为每个goroutine填写所有可能的选项,即一种情况是在第7行阻塞,另一种情况是在第7行锁定并在第8行阻塞。由于实现具有对称性,我们在图4.3中展示了G0和G1的情况。在第一行,我们假设G0在第7行阻塞,这意味着G1锁定了f0并在f1处阻塞,这又意味着G2锁定了f1并在f2处阻塞,以此类推。最后,我们发现G4可以进入它的临界区。第二行展示了G0锁定了f0但在f4处阻塞的情况。第三行展示了G1在锁定f0(第7行)时被阻塞的情况。

第四行展示了一种不可能的情况。我们从假设G2锁定了f0但在f1处阻塞开始填写这一行。这意味着G1在f0处阻塞。但这也意味着G1无法锁定f1,所以G2不可能在f1处阻塞。

如果你继续这个过程,你会发现每一行要么像第四行那样存在矛盾,要么其中一个goroutine能够进入它的临界区,这就意味着不会发生死锁。

许多并发库,包括Go的较新版本,都为互斥锁提供了TryLock方法。这看起来似乎是个很有用的特性:如果无法锁定互斥锁,就做其他事情。实际上,TryLock真正能派上用场的情况少之又少。你必须记住,当你调用TryLock并得到无法锁定互斥锁的结果时,这个互斥锁有可能其实是可以被锁定的。TryLock的一种可能用途是预防死锁:

func philosopher(index int, leftFork, rightFork *sync.Mutex) {
    for {
        fmt.Printf("Philospher %d is thinking\n", index)
        time.Sleep(time.Millisecond*
            time.Duration(rand.Intn(1000)))
        // 获取左边的叉子
        leftFork.Lock()
        // 获取右边的叉子
        if rightFork.TryLock() {
            // 进食
            fmt.Printf("Philosopher %d is eating\n",
                index)
            time.Sleep(time.Millisecond*
                time.Duration(rand.Intn(1000)))
            rightFork.Unlock()
        }
        leftFork.Unlock()
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

在这个实现中,哲学家goroutine先拿起左边的叉子,然后尝试拿起右边的叉子。如果失败,就把左边的叉子放回去,继续思考。这种方式不会产生死锁,但容易导致忙等待(busy-spinning),进而造成饥饿(starvation)。哲学家goroutine可能会长时间处于思考状态,不断拿起又放下左边的叉子。每次尝试锁定右边的叉子失败时,左边的叉子都会被释放然后再次锁定,这就消除了它在goroutine队列中等待更长时间可能带来的优势。

有没有一种既不使用TryLock,也不依赖锁定顺序的无死锁实现呢?答案就在下面基于通道的实现中:

func philosopher(index int, leftFork, rightFork chan bool) {
    for {
        // 思考一段时间
        fmt.Printf("Philospher %d is thinking\n", index)
        time.Sleep(time.Duration(rand.Intn(1000)))
        select {
        case <-leftFork:
            select {
            case <-rightFork:
                fmt.Printf("Philosopher %d is eating\n", index)
                time.Sleep(time.Millisecond*
                    time.Duration(rand.Intn(1000)))
                rightFork <- true
            default:
            }
            leftFork <- true
        }
    }
}

func main() {
    var forks [5]chan bool
    for i := range forks {
        forks[i] = make(chan bool, 1)
        forks[i] <- true
    }
    go philosopher(0, forks[4], forks[0])
    go philosopher(1, forks[0], forks[1])
    go philosopher(2, forks[1], forks[2])
    go philosopher(3, forks[2], forks[3])
    go philosopher(4, forks[3], forks[4])
    select {}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

在这个实现中,每把叉子都被建模为一个容量为1的通道。当叉子在桌子上时,通道里有一个值。这就是为什么程序一开始会初始化通道并向通道中放入值(也就是把叉子放在桌子上)。为了从桌子上拿起一把叉子,哲学家goroutine会从通道中读取数据。为了把叉子放回去,哲学家goroutine会向通道中写入数据。所有哲学家都会等待拿起左边的叉子。一旦拿到左边的叉子,他们也会尝试拿起右边的叉子。如果也成功了,哲学家就开始进食。否则,左边的叉子会被放回桌子上,哲学家继续思考。如你所见,这几乎和使用TryLock的互斥锁解决方案一样,只是这里用的是通道。

# 速率限制

限制对资源的请求速率对于维持可预测的服务质量至关重要。有多种实现速率控制的方法。我们将研究同一算法的两种实现。第一种是使用通道、定时器(ticker)和goroutine对令牌桶算法(token bucket algorithm)的相对简单的实现。然后,我们将研究一种更高级的实现,它需要的资源更少。

首先,让我们来看看令牌桶算法,并了解它是如何用于速率限制的。想象有一个固定大小的桶,里面装着令牌。有一个生产者进程以固定速率向这个桶中存入令牌,比如每秒存入两个令牌。每500毫秒,如果桶中有空槽,这个进程就会向桶中添加一个令牌。如果桶已满,它会再等待500毫秒,然后再次检查桶的状态。还有一个消费者进程会以随机间隔消耗令牌。然而,为了继续执行,消费者进程必须获取一个令牌。如果桶为空,消费者进程就必须等待新令牌存入。图4.4展示了这一过程。

img 图4.4:容量为4、速率为2的令牌桶算法

为了分析这个结构如何用于速率限制,我们首先来看一下当令牌桶容量为1、速率为每秒两个令牌时它的表现。假设请求随机到来,并且我们从一个满桶开始。第一个请求在时间t = 100ms时到来并消耗一个令牌。下一个令牌将在时间t = 500ms时存入,所以在此之前到来的任何请求都必须等待,直到新令牌生成。假设另一个请求在t = 300ms时到来,还有一个在t = 400ms时到来。第一个请求可以在t = 500ms新令牌生成时继续执行,第二个请求可以在t = 1000ms下一个令牌生成时继续执行。因此,任意两个请求之间的间隔都不会小于500ms,这实际上就将请求速率限制为每秒两个请求。

但是,当桶的容量大于1时会发生什么呢?比如说,桶可以容纳四个令牌,如图4.4所示。同样从满桶开始,假设在时间t = 100ms、t = 110ms、t = 120ms、t = 130ms和t = 140ms时来了一波突发请求。桶中已经有四个令牌,所以前四个请求会消耗这些令牌并继续执行。但是当在时间t = 140ms到来第五个请求时,桶为空,这个请求必须等待,直到t = 500ms有新令牌到来。假设接下来的请求在t = 1600ms、t = 1700ms和t = 1800ms到来。桶在t = 1000ms和t = 1500ms时存入了新令牌,所以前两个请求可以在t = 1600ms和t = 1700ms继续执行,但下一个请求必须等待新令牌,直到t = 2000ms。所以,容量大于1的令牌桶可以处理相同规模的突发请求,同时又不违反平均速率要求。在给定的时间段内,允许的请求数量仍然是根据速率存入的令牌数量。但是请求可以突发到来,只要不超过速率限制,系统就能接受这种突发情况。

基于我们的讨论,速率限制器可以实现一个如下的接口:

type RateLimit interface {
    Wait()
}
1
2
3

要对HTTP服务进行速率限制,可以在处理器(handlers)之间共享一个限制器。Wait调用会使处理器延迟,直到可以处理请求的时间:

func handle(w http.ResponseWriter,req *http.Request) {
    limiter.Wait()
    // 处理请求
}
1
2
3
4

此时,你可能已经意识到令牌桶和通道非常相似。确实,通道提供了一个简单的模型来准确实现该算法所描述的功能。通道成为令牌桶,一个生产者goroutine以均匀的速率向通道中放入令牌,并且可以通过从通道中读取数据来消耗令牌。如果通道为空,读取操作将阻塞,直到有新令牌存入桶中。因此,我们需要一个通道和一个定时器(ticker)。我们还会添加另一个通道,以便在速率限制器完成任务后关闭它:

type ChannelRate struct {
    bucket  chan struct{}
    ticker *time.Ticker
    done    chan struct{}
}
1
2
3
4
5

这里我们将通道类型设为struct{}。struct{}不占用任何字节,正如预期的那样,Go在处理这种情况时不会为通道缓冲区分配任何内存。bucket通道将用于存放令牌,而done通道仅用于在我们用完速率限制器后关闭它。我们需要定时器(ticker)来生成周期性的滴答信号,以便生成令牌。我们从一个构造函数开始,该函数初始化结构体成员并填充令牌桶:

func NewChannelRate(rate float64, limit int) *ChannelRate {
    ret := &ChannelRate{
        bucket: make(chan struct{}, limit),
        ticker: time.NewTicker(time.Duration(1 / rate * 1000000000)),
        done:   make(chan struct{}),
    }
    for i := 0; i < limit; i++ {
        ret.bucket <- struct{}{}
    }
    go func() {
        for {
            select {
            case <-ret.done:
                return
            case <-ret.ticker.C:
                select {
                case ret.bucket <- struct{}{}:
                default:
                }
            }
        }
    }()
    return ret
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

NewChannelRate函数有两个参数。rate指定每秒生成令牌的速率,limit指定令牌桶的大小。所以,我们会初始化一个容量为limit的通道,并启动一个周期为1/rate的定时器(ticker)(第2 - 6行)。然后,我们将令牌桶填满(第7 - 9行)。

这个函数的其余部分是生成令牌的goroutine(第10 - 22行)。这里有几点需要注意。首先,如果生成令牌时桶已满,令牌应该被丢弃(第16 - 19行)。其次,当done通道关闭时,我们必须关闭定时器(ticker)并终止goroutine(第13 - 14行)。

注意在生成新令牌时采用的非阻塞发送方式。这确保了在令牌桶已满的情况下,协程不会被阻塞。此外,要注意这个协程是一个闭包,所以它与实例化它的速率限制器实例相关联。

Wait方法现在很容易实现。下面的方法会等待,直到令牌桶中有可用的令牌:

func (s *ChannelRate) Wait() {
    <-s.bucket
}
1
2
3

我们可以通过以下方法优雅地关闭定时器:

func (s *ChannelRate) Close() {
    close(s.done)
    s.ticker.Stop()
}
1
2
3
4

这种限制器存在一个潜在问题,即每个限制器实例都需要额外启动两个协程:一个用于生成令牌,一个用于定时器。这通常不是什么大问题,特别是当速率限制器用于控制对公共服务的访问时。但如果需要大量的速率限制器实例,可能会导致资源紧张。例如,许多API提供商根据客户账户进行速率限制。使用这种速率限制器,每个请求需要三个协程:一个为每个客户执行实际工作,另外两个用于进行速率限制。有没有办法在不创建额外协程的情况下实现呢?

答案是肯定的。解决问题的关键在于认识到实际的速率限制只在消耗令牌时才会发生。所以,我们不用创建一个协程定期填充令牌桶,而是在需要的时候,也就是当我们想要消耗一个令牌且令牌桶为空时才去填充它。我们可以从用一个整数值nTokens替换令牌桶通道开始,用它来记录令牌桶中的令牌数量。每次消耗一个令牌,我们就将这个数字减1。当我们试图消耗一个令牌但令牌桶为空(即nTokens = 0)时,才进行实际的填充工作。

首先,考虑图4.5中的情况。最后一个令牌在tlast时刻生成。一个请求消耗了这个令牌,所以现在令牌桶为空。一个新请求在treq时刻到来,由于treq距离tlast足够久,在tlast和treq之间本应生成多个令牌。所以,我们只需通过(treq - tlast) / period计算出令牌桶应有的令牌数量。然后我们消耗其中一个令牌并继续。最后生成令牌的新时间值为tlast + nTokens * period。

img 图4.5 - 多个令牌生成后有新请求到来

另一种可能的情况如图4.6所示。这是新请求在生成下一个令牌之前到来的情况。在这种情况下,请求必须等待,直到到了生成新令牌的时间,即tlast + period - treq。

img 图4.6 - 新请求在生成下一个令牌之前到来

基于此,我们必须将限制器的定义更改为如下内容:

type Limiter struct {
    mu sync.Mutex
    // 每秒填充到令牌桶的令牌数
    rate int
    // 令牌桶大小
    bucketSize int
    // 令牌桶中的令牌数量
    nTokens int
    // 最后一个令牌的生成时间
    lastToken time.Time
}
1
2
3
4
5
6
7
8
9
10
11

现在我们需要一个互斥锁(mutex)来保护速率限制器的变量,因为我们不再有能确保互斥的通道了。Wait方法只能由一个协程调用,其他所有协程都必须等待,直到活动协程完成操作。初始化很简单:

func NewLimiter(rate, limit int) *Limiter {
    return &Limiter{
        rate:       rate,
        bucketSize: limit,
        nTokens:    limit,
        lastToken:  time.Now(),
    }
}
1
2
3
4
5
6
7
8

这次,我们必须在结构体中保留rate和limit变量,因为我们将用它们来计算令牌的生成时间。

所有的操作都在Wait方法中完成:

1: func (s *Limiter) Wait() {
2:    s.mu.Lock()
3:    defer s.mu.Unlock()
4:    if s.nTokens > 0 {
5:        s.nTokens--
6:        return
7:    }
8:    // 这里表示令牌桶中没有足够的令牌
9:    tElapsed := time.Since(s.lastToken)
10:     period := time.Second / time.Duration(s.rate)
11:    nTokens := tElapsed.Nanoseconds() / period.Nanoseconds()
12:    s.nTokens = int(nTokens)
13:    if s.nTokens > s.bucketSize {
14:        s.nTokens = s.bucketSize
15:    }
16:    s.lastToken = s.lastToken.Add(time.Duration(nTokens) *
period)
17:    // 我们填充了令牌桶。但可能仍然没有足够的令牌
18:    if s.nTokens > 0 {
19:        s.nTokens--
20:        return
21:    }
22:    // 我们必须等待,直到有更多令牌可用
23:    // 下一个令牌应该在这个时间可用:
24:    next := s.lastToken.Add(period)
25:    wait := next.Sub(time.Now())
26:    if wait >= 0 {
27:        time.Sleep(wait)
28:    }
29:    s.lastToken = next
30:}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

第4 - 7行通常被称为“顺利路径”。当令牌桶中有令牌时,我们直接获取一个并返回。如果算法执行到第9行,说明令牌桶中没有令牌。第9 - 11行根据自上次生成令牌以来的 elapsed 时间计算应该生成的令牌数量。如果这个数量大于令牌桶的大小,我们将令牌桶填满,并丢弃多余的令牌(第13 - 15行)。第16行更新最后生成令牌的时间。注意,这里使用的是实际生成的令牌数量,而不仅仅是存储在令牌桶中的数量。此时,如果令牌桶中有令牌,我们获取它并返回(第18 - 21行)。如果没有,我们就必须等待。等待时间根据当前时间和下一个令牌的生成时间计算(第24 - 25行)。然后,速率限制器等待,等待结束后,我们知道令牌桶中只有一个令牌,所以消耗这个令牌并返回。

这种速率限制器在不创建任何额外协程的情况下就能工作。它比前面那种消耗的资源更少,因此更适合在需要大量速率限制器的场景中使用,比如在根据API用户配置速率限制的API提供商中。有一个公开可用的速率限制器包golang.org/x/time/rate,它的实现方式与我们这里类似。在生产环境中,建议使用这个包,因为它提供了更丰富的API和上下文支持。上下文支持很有必要,因为正如你所见,我们的速率限制器即使在请求者取消请求的情况下仍会继续等待。

# 总结

在本章中,我们研究了三个在处理复杂问题时经常出现的著名并发问题。生产者 - 消费者实现可用于数据处理管道、爬虫、设备交互、网络通信等领域。哲学家就餐问题很好地展示了需要多个互斥锁的临界区。最后,速率限制在确保服务质量、限制资源利用和API计费等方面都有应用。

在下一章中,我们将开始探讨更实际的并发编程示例,特别是工作池、并发数据管道以及扇入/扇出模式。

上次更新: 2025/04/08, 19:40:35
3 Go内存模型
5 工作池与管道

← 3 Go内存模型 5 工作池与管道→

最近更新
01
C++语言面试问题集锦 目录与说明
03-27
02
第四章 Lambda函数
03-27
03
第二章 关键字static及其不同用法
03-27
更多文章>
Copyright © 2024-2025 沪ICP备2023015129号 张小方 版权所有
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式