7 定时器和时钟
# 7 定时器和时钟
许多长期运行的应用程序会对操作的持续时间进行限制。它们还会定期执行诸如健康检查之类的任务,以确保所有组件都按预期工作。许多平台都提供高精度的定时器操作,Go标准库在time
包中提供了这些服务的可移植抽象。在本章中,我们将学习定时器(Timers)和时钟(Tickers)。定时器用于在一段时间后执行任务,而时钟用于定期执行任务。
本章我们将学习的主要内容如下:
- 定时器 —— 在一段时间后运行任务
- 时钟 —— 定期运行任务
在本章结束时,你将了解如何使用定时器和时钟,以及如何使用心跳机制来监控其他goroutine。
# 技术要求
无。
# 定时器 —— 在一段时间后运行任务
如果你想在一段时间后执行某项任务,可以使用time.Timer
。Timer
实现的功能类似于以下代码(仅用于说明,请勿实际使用):
type TimerMockup struct {
C chan<- time.Time
}
func NewTimerMockup(dur time.Duration) *TimerMockup {
t := &TimerMockup{
C: make(chan time.Time, 1),
}
go func() {
// 休眠,然后向通道发送消息
time.Sleep(dur)
t.C <- time.Now()
}()
return t
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
所以,定时器就像是一个goroutine,它会在休眠指定的时间后,向一个通道发送消息。Timer
的实际实现使用了特定平台的定时器,因此它更加精确,并不像启动一个goroutine然后等待那么简单。需要记住的一点是,当你从定时器通道接收到事件时,这意味着在消息发送时定时器的持续时间已到,这与消息被接收的时间并不相同。
你可能已经注意到,定时器使用的通道容量为1。这可以防止在没有其他goroutine监听定时器通道时出现goroutine泄漏。有缓冲的通道意味着在持续时间结束时会生成事件,但如果没有goroutine监听该通道,事件会在通道中等待,直到被读取或定时器被垃圾回收。
定时器的一个常见用途是限制任务的运行时间:
func main() {
// 定时器将用于在100毫秒后取消工作
timer := time.NewTimer(100 * time.Millisecond)
// 100毫秒后关闭超时通道
timeout := make(chan struct{})
go func() {
<-timer.C
close(timeout)
fmt.Println("Timeout")
}()
// 执行一些工作,直到超时
x := 0
done := false
for!done {
// 检查是否超时
select {
case <-timeout:
done = true
default:
}
time.Sleep(time.Millisecond)
x++
}
fmt.Println(x)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
使用time.AfterFunc
函数可以大大简化定时器的设置。下面的函数调用可以替代前面代码片段中的定时器设置和goroutine。time.AfterFunc
函数会在给定的持续时间后简单地调用给定的函数:
time.AfterFunc(100*time.Millisecond, func() {
close(timeout)
fmt.Println("Timeout")
})
2
3
4
类似的方法是使用time.After
:
ch := time.After(100*time.Millisecond)
然后,ch
通道将在100毫秒后接收一个时间值。
停止定时器很简单。在前面的程序中,如果长时间运行的任务在超时之前完成,我们希望停止定时器,否则它会打印错误的“Timeout”消息。调用Stop()
方法可能会成功停止尚未过期的定时器,也有可能在你调用Stop()
之后定时器才过期。这两种情况如图7.1所示。
图7.1 在定时器触发之前和之后停止定时器
如果Stop()
返回true
,则表示你成功停止了定时器。然而,如果Stop()
返回false
,则表示定时器已经过期,因此它已经停止。但这并不意味着定时器通道中的消息已经被消费,该消息可能在Stop()
返回后才被消费。不要忘记定时器通道的容量为1,所以即使没有任何goroutine接收消息,定时器也会向该通道发送消息。
Timer
类型允许你重置定时器。通过NewTimer
创建的定时器和通过AfterFunc
创建的定时器,它们重置时的行为有所不同:
- 如果定时器是通过
AfterFunc
创建的,重置定时器要么重置函数首次运行的时间(在这种情况下,Reset
将返回true
),要么设置函数再次运行的时间(在这种情况下,Reset
将返回false
) 。 - 如果定时器是通过
NewTimer
创建的,只能在停止且通道已清空的定时器上进行重置。并且,清空通道和重置定时器不能与从定时器接收消息的goroutine并发进行。下面的代码块展示了正确的做法。需要注意的重要一点是,在进行定时器通道清空和重置操作时,不能使用select
语句的timeout
分支从定时器通道接收消息。换句话说,在重置定时器时,不应有其他goroutine监听该定时器的通道:
select {
case <-timer.C:
// 超时
case d := <-resetTimer:
if!timer.Stop() {
<-timer.C
}
timer.Reset(d)
}
2
3
4
5
6
7
8
9
定时器,尤其是AfterFunc
,有很多不同且有趣的用例。对于超时处理,context.Context
是一种更符合Go语言习惯的工具。我们将在下一章学习它。
# 时钟 —— 定期运行任务
通过重复调用AfterFunc
来定期运行一个函数,这可能是个合理的想法:
var periodicTask func()
periodicTask = func() {
DoSomething()
time.AfterFunc(time.Second, periodicTask)
}
time.AfterFunc(time.Second, periodicTask)
2
3
4
5
6
采用这种方法,每次函数运行时都会调度下一次运行,但函数运行持续时间的变化会随着时间累积。这在某些用例中可能完全可以接受,但有一种更好、更简单的方法:使用time.Ticker
。
time.Ticker
的API与time.Timer
非常相似:你可以使用time.NewTicker
创建一个时钟,然后监听一个通道,该通道会定期发送节拍信号,直到时钟被显式停止。节拍的周期不会根据监听器的运行时间而改变。下面的程序会在10秒内打印程序开始后经过的毫秒数:
func main() {
start := time.Now()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
done := time.After(10 * time.Second)
for {
select {
case <-ticker.C:
fmt.Printf("Tick: %d\n",
time.Since(start).Milliseconds())
case <-done:
return
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
如果你在下次节拍到来之前无法完成任务会怎样呢?如果错过了几个节拍,是否需要担心接收到一堆节拍信号?幸运的是,time.Ticker
能够合理地处理这些情况。假设我们有一个任务由时钟触发,并且该任务可能在下次节拍到来之前完成,也可能无法完成。这可能是对第三方服务的网络调用,耗时比预期长,或者是在高负载下的数据库调用。不管是什么原因,当下次节拍到来时,由于任务尚未完成,它还没有准备好接收节拍信号。
图7.2 正常的时钟行为与错过的信号
图7.2展示了在这种情况下Ticker
的行为。最左边的图中,任务始终在下次节拍到来之前完成,所以执行是定期的,间隔均匀。中间的图显示,任务的第一次执行在下次节拍到来之前完成,但第二次执行耗时更长,应用程序错过了节拍。在这种情况下,应用程序一旦监听通道,下一个节拍就会立即到达。第三次执行比平常晚开始,但第四次执行恢复了正常节奏。最右边的图显示,任务的第一次执行耗时太长,导致错过了多个节拍。发生这种情况时,任务一旦监听通道,下一个节拍就会立即到达,随后的节拍按正常节奏到达。简而言之,时钟通道中最多只会有一条消息等待。如果你错过了多个节拍,对于这些错过的节拍,你只会收到一个节拍信号。
需要记住的重要一点是,使用完时钟后,必须使用Stop()
方法停止它。与定时器不同,定时器触发一次后就会被垃圾回收,而时钟有一个goroutine会通过通道持续发送节拍信号。如果你忘记停止时钟,这个goroutine就会泄漏,永远不会被垃圾回收。所以,应使用defer ticker.Stop()
。
# 心跳机制
超时机制对于限制函数的执行时间很有用。但当函数预计需要很长时间才能返回,或者根本不会返回时,超时机制就不起作用了。你需要一种方法来监控该函数,确保它在正常进行并且仍在运行。有几种方法可以实现这一点。
一种方法是编写一个长期运行的函数,向监控器报告其进度。这些报告不必均匀发送。如果监控器发现长期运行的函数有一段时间没有报告进度,它可以尝试停止该进程、提醒管理员或打印错误消息。下面的代码块给出了这样一个监控函数。该函数期望从长期运行的函数的心跳通道接收信息。如果在两个连续的定时器节拍之间没有收到心跳信号,就认为该进程已停止,并且关闭done
通道以尝试取消该进程:
func monitor(heartbeat, done chan struct{}, tick <-chan time.Time) {
// 记录上次收到心跳的时间
var lastHeartbeat time.Time
var numTicks int
for {
select {
case <-tick:
numTicks++
if numTicks >= 2 {
fmt.Printf("No progress since %s, terminating\n", lastHeartbeat)
close(done)
return
}
case <-heartbeat:
lastHeartbeat = time.Now()
numTicks = 0
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
长期运行的函数通常具有以下结构:
func longRunningProcess(heartbeat, done chan struct{}) {
for {
// 执行可能耗时较长的操作
DoSomething()
select {
case <-done:
return
case heartbeat <- struct{}{}:
// 这个select语句可以有一个default分支,用于非阻塞操作
}
}
}
2
3
4
5
6
7
8
9
10
11
12
时钟决定了长期运行函数保持无进度状态的最长允许时间:
func main() {
heartbeat := make(chan struct{})
done := make(chan struct{})
// 期望至少每秒收到一次心跳
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
go longRunningProcess(heartbeat, done)
go monitor(heartbeat, done, ticker.C)
<-done
}
2
3
4
5
6
7
8
9
10
这个心跳机制的实现只是发送一个struct{}{}
值。它也可以发送一个递增的序列值来显示进度,或者发送其他类型的元数据,以便记录进度指示或显示给最终用户。
无法保证一个挂起的goroutine有机会从done
通道读取数据并正常返回。它可能只是一直等待一个永远不会发生的事件,且没有任何进度指示。这在使用你无法控制的第三方库或API时尤其明显。在这种情况下,你能做的事情不多。你可以关闭done
通道,希望该goroutine最终会终止。不过,你应该记录这些情况,以便在程序外部进行处理。我见过一些情况,对于无法终止的进程,会将其放在一个单独的二进制文件中处理。第二个二进制文件执行长期运行的任务,一段时间后,由于无法修复的资源泄漏,它会终止。然后由编排软件或程序本身再次启动它。
# 总结
定时器和时钟使你能够在未来执行任务以及定期执行任务。我们这里只介绍了几个用例。它们是功能多样的工具,经常在一些意想不到的地方发挥作用。Go运行时为这些工具提供了极其高效的实现。不过,你需要小心,因为它们总会使程序流程变得复杂。确保关闭你的时钟。
在接下来的章节中,我们将开始整合所学内容,并探讨一些并发模式在实际中的应用案例。