在刚开始学习golang语言的时候就听说Timer、Ticker的使用要尤其注意,很容易出现问题,这次就来一探究竟。
本文主要脉络:
timer和ticker都是定时器,不同的是:
对于Timer,可以通过三种函数创建:time.NewTimer?、time.AfterFunc?、time.After?。
其使用范例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
// FnTimer1 Timer的使用用法 func FnTimer1() { timer := time.NewTimer(time.Second * 5) //返回生成的timer fmt.Printf("timer 的启动时间为:%v\n", time.Now())
expire := <-timer.C //注意这个channel是一个容量为1的channel! fmt.Printf("timer 的触发时间为:%v\n", expire) }
func FnTimer2() { ch1 := make(chan int, 1) select { case e1 := <-ch1: //如果ch1通道成功读取数据,则执行该case处理语句 fmt.Printf("1th case is selected. e1=%v",e1) case <- time.After(2 * time.Second): //time.After直接返回生成的timer的channel,所以一般用于超时控制 fmt.Println("Timed out") } }
func FnTimer3() { _ = time.AfterFunc(time.Second*5, func() { //返回的也是timer,不过可以自己传入函数进行执行 fmt.Printf("定时器触发了,触发时间为%v\n", time.Now()) })
fmt.Printf("timer 的启动时间为:%v\n", time.Now()) time.Sleep(10 * time.Second) // 确保定时器触发 } |
在底层原理上,三种不同的创建方式都是调用的time.NewTimer?,不同的是:
补充:
返回的channel的容量为1,因此是一个asynchronous的channel,即在定时器fired(触发)、stop(停止)、reset(重启)之后,仍然有可能能收到数据~
垃圾回收:在go1.23之前,对于处于active(没有触发且没有显式调用Stop)的Timer,gc是无法回收的,Ticket也是同样的道理。因此在高并发的场景下需要显式搭配??defer time.Stop()??来解决暂时的“内存泄露”的问题。
上述两点在Golang 1.23得到了解决,且可能在未来的版本中channel的容量会改为0(由asynchronous改成sync),在Go 1.23相关源码的注释部分有对应的说明。
ticket相比于timer,其会循环触发,因此通常用于循环的干一些事情,like:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
// FnTicket2 Ticker的正确的使用方法!! func FnTicket2() { ticker := time.NewTicker(1 * time.Second) stopTicker := make(chan struct{}) defer ticker.Stop() go func() { for { select { case now := <-ticker.C: // do something fmt.Printf("ticker 的触发时间为:%v\n", now) case <-stopTicker: fmt.Println("ticker 结束") return } } }()
time.Sleep(4 * time.Second) stopTicker <- struct{}{} } |
注意:代码块中使用stopTicker? + select?的方式是必须的,由于Stop函数只是修改定时器的状态,并不会主动关闭channel,因此如果直接使用循环(见?我的github仓库? )会直接导致Ticker永远不退出而导致内存泄露!
补充:TIcket = 触发时自动设置下一次时间的Timer,因此上面提到的Go1.23之前的Timer存在的问题依然存在。
- 返回的channel容量为1
- 垃圾回收:由于Ticket会一直循环触发,因此如果不显式调用??time.Stop??方法的话,会永久内存泄露。
此外,对于Ticket,还有一些额外的注意事项:
- 需要使用一个额外的channel+select的方式来正确停止Ticket。
- Reset函数的问题,由于比较长,单独整理在golang1.23版本之前 Timer Reset无法正确使用的问题,这里说个结论 :?Reset??函数由于内部非原子,因此无法完美的使用,建议使用goroutine+Timer的方式替代!
由于Golang 1.23版本对定时器timer、ticker做了很大的改进,因此要分成1.23之前和1.23及其之后的版本分开考虑:
以下是1.23版本关于timer、ticker部分的修改:
未停止的定时器和不再被引用的计时器可以进行垃圾回收。在 Go 1.23 之前,未停止的定时器无法被垃圾回收,直到定时器超时,而未停止的计时器永远无法被垃圾回收。Go 1.23 的实现避免了在不使用 t.Stop? 的程序中出现资源泄漏。
定时器通道现在是同步的(无缓冲的),这使 t.Reset? 和 t.Stop? 方法具有更强的保证:在其中一个方法返回后,将来从定时器通道接收到的任何值都不会观察到与旧定时器配置相对应的陈旧时间值。在 Go 1.23 之前,无法使用 t.Reset? 避免陈旧值,而使用 t.Stop? 避免陈旧值需要仔细使用 t.Stop? 的返回值。Go 1.23 的实现完全消除了这种担忧。
总结一下:1.23版本改进了timer、ticker的垃圾回收和 停止、重置的相关方法(Reset、Stop)。
这也就意味着在1.23版本之前,我们在使用的时候要注意:垃圾回收和停止、重置相关方法的使用。
由于Reset、Stop方法的外在表现本质上上是跟缓冲区由 有缓冲 改为 无缓冲 相关,因此如果有涉及读取缓冲区我们也需要注意相关特性。
具体来说,对于1.23之前:
源码解读版本:release-branch.go1.8?
timer(ticket)的运作依赖于struct p?,源码位置:src/runtime/runtime2.go:603。
所有的计时器timer都以最小四叉堆的形式存储在struct p?的timers?字段中。并且也是交给了计时器都交由处理器的网络轮询器和调度器触发,这种方式能够充分利用本地性、减少上下文的切换开销,也是目前性能最好的实现方式。
一般来说管理定时器的结构有3种:双向链表、最小堆、时间轮(很少)。
双向链表:插入|修改时间:需要遍历去寻找插入|修改的位置,时间复杂度O(N);触发:链表头触发即可,时间复杂度O(1)。
最小堆:插入|修改时间:O(logN)的时间复杂度;触发:队头触发,但是触发之后需要调整堆以保证堆的特性,O(logN)的时间复杂度。
时间轮:插入|修改时间|触发的时间复杂度都是O(1)。但是需要额外维护时间轮的结构,其占据空间随着需要维护的未来时间长度、时间精度增加。
定时器体系有两种timer:一次性触发的Timer?和循环触发的Ticker?,这两种的底层结构是相同的,触发逻辑是类似的。
毕竟Ticker?的功能包含Timer?的功能。
定时器在源码中使用的结构体是timer?,其定义为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
// Package time knows the layout of this structure. // If this struct changes, adjust ../time/sleep.go:/runtimeTimer. type timer struct { // If this timer is on a heap, which P's heap it is on. // puintptr rather than *p to match uintptr in the versions // of this struct defined in other packages. pp puintptr //指针,指向持有当前timer的p
// Timer wakes up at when, and then at when+period, ... (period > 0 only) // each time calling f(arg, now) in the timer goroutine, so f must be // a well-behaved function and not block. // // when must be positive on an active timer. when int64 //当前计时器被唤醒的时间; period int64 //两次被唤醒的间隔; f func(any, uintptr) //唤醒时要执行的函数 arg any // 计时器被唤醒时调用 f 传入的参数; seq uintptr
// What to set the when field to in timerModifiedXX status. nextwhen int64 //当定时器状态变为timerModifiedXX的时,when 字段下一次要设置的值
// The status field holds one of the values below. status uint32 //计时器的状态,最重要的字段之一 } |
?timer?在p?中的相关字段为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
type p struct { // Lock for timers. We normally access the timers while running // on this P, but the scheduler can also do it from a different P. timersLock mutex //操作timers的时候加锁
// Actions to take at some time. This is used to implement the // standard library's time package. // Must hold timersLock to access. timers []*timer //timers数组
// Number of timers in P's heap. // Modified using atomic instructions. numTimers uint32 //总timers的数量
// Number of timerDeleted timers in P's heap. // Modified using atomic instructions. deletedTimers uint32 //处于deleted状态的timers的数量
// Race context used while executing timer functions. timerRaceCtx uintptr //竞态检测相关 } |
go内部的定时器的操作是并发安全的(新建定时器、停止定时器)等,为了支持并发安全和定时器的高效调度,在源码中设计了一套关于定时器的状态机,全部的状态为:
状态 | 解释 |
---|---|
timerNoStatus | 还没有设置状态 |
timerWaiting | 定时器等待触发 |
timerRunning | timer正在运行 |
timerDeleted | 定时器被标记删除 |
timerRemoving | 定时器从标记删除到真正删除的中间态 |
timerRemoved | 定时器真正被删除 |
timerModifying | 正在被修改的中间状态 |
timerModifiedEarlier | 定时器被修改到了更早的触发时间 |
timerModifiedLater | 定时器被修改到了更晚的触发时间 |
timerMoving | 已经被修改正在被移动 |
修改定时器的状态机涉及如下所示的 7 种不同操作,它们分别承担了不同的职责:
状态机的变化流程图 可以大概帮我们看出timer?的不同状态的流转情况:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
@startuml
[*] --> timerNoStatus : 运行时创建Timer
timerNoStatus -->timerWaiting : addtimer
timerWaiting -->timerModifying : deltimer、modtimer
timerModifying -->timerDeleted : deltimer
timerModifiedLater -->timerDeleted : deltimer
timerModifiedEarlier -->timerModifying : deltimer
timerDeleted -->timerRemoving : cleantimers、adjusttimers、runtimer
timerRemoving -->timerRemoved : cleantimers、adjusttimers、runtimer
timerModifiedEarlier --> timerMoving : cleantimers、adjusttimers
timerModifiedLater --> timerMoving : cleantimers、adjusttimers
timerMoving --> timerWaiting : cleantimers
timerWaiting --> timerRunning : runtimer
timerRunning --> timerWaiting : runtimer
timerRunning --> timerNoStatus : runtimer
state timerModifiedXX { state timerModifiedEarlier { } state timerModifiedLater { } }
timerModifying --> timerModifiedXX : modtimer timerModifiedXX --> timerModifying : modtimer
timerNoStatus --> timerModifying : modtimer timerModifying --> timerWaiting : modtimer timerRemoved --> timerModifying : modtimer timerDeleted --> timerModifying : modtimer
timerWaiting : 定时器等待触发 timerModifying : 定时器状态修改的中间态 timerDeleted : 定时器被标记删除的状态
timerRemoving: 定时器从标记删除到真正被删除的中间态 timerRemoved: 定时器真正被删除
timerModifiedEarlier: 定时器被修改到了更早的触发时间 timerModifiedLater : 定时器被修改到了更晚的触发时间
timerMoving: 定时器在堆上的位置正在重新排序 timerRunning: timer正在运行 timerModifiedXX: 定时器在堆上的位置等待重新排序
@enduml |
实际上这些状态的流转都被完整的写在了golang的源码中,在后面逐个函数的讲解中也会涉及到:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
// addtimer: // timerNoStatus -> timerWaiting // anything else -> panic: invalid value // deltimer: // timerWaiting -> timerModifying -> timerDeleted // timerModifiedEarlier -> timerModifying -> timerDeleted // timerModifiedLater -> timerModifying -> timerDeleted // timerNoStatus -> do nothing // timerDeleted -> do nothing // timerRemoving -> do nothing // timerRemoved -> do nothing // timerRunning -> wait until status changes // timerMoving -> wait until status changes // timerModifying -> wait until status changes // modtimer: // timerWaiting -> timerModifying -> timerModifiedXX // timerModifiedXX -> timerModifying -> timerModifiedYY // timerNoStatus -> timerModifying -> timerWaiting // timerRemoved -> timerModifying -> timerWaiting // timerDeleted -> timerModifying -> timerModifiedXX // timerRunning -> wait until status changes // timerMoving -> wait until status changes // timerRemoving -> wait until status changes // timerModifying -> wait until status changes // cleantimers (looks in P's timer heap): // timerDeleted -> timerRemoving -> timerRemoved // timerModifiedXX -> timerMoving -> timerWaiting // adjusttimers (looks in P's timer heap): // timerDeleted -> timerRemoving -> timerRemoved // timerModifiedXX -> timerMoving -> timerWaiting // runtimer (looks in P's timer heap): // timerNoStatus -> panic: uninitialized timer // timerWaiting -> timerWaiting or // timerWaiting -> timerRunning -> timerNoStatus or // timerWaiting -> timerRunning -> timerWaiting // timerModifying -> wait until status changes // timerModifiedXX -> timerMoving -> timerWaiting // timerDeleted -> timerRemoving -> timerRemoved // timerRunning -> panic: concurrent runtimer calls // timerRemoved -> panic: inconsistent timer heap // timerRemoving -> panic: inconsistent timer heap // timerMoving -> panic: inconsistent timer heap |
addtimer对于状态的操作:
addtimer的主要功能:向当前p的定时器堆中添加当前定时器,并尝试唤醒网络轮训器(定时器的执行依赖于网络轮训器处理)。
这里提到的网络轮训器可能让人有点疑惑,定时器和网络有什么关系?实际上确实也没什么关系,这里提到的网络轮训器重点在于轮训器,指的更多的是select、poll、epoll那套东西。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
func addtimer(t *timer) { // xxx if t.status != timerNoStatus { throw("addtimer called with initialized timer") } t.status = timerWaiting
when := t.when
// Disable preemption while using pp to avoid changing another P's heap. mp := acquirem()
pp := getg().m.p.ptr() lock(&pp.timersLock) cleantimers(pp) //尝试清除timers堆中堆头的元素,以加速定时器添加 doaddtimer(pp, t) unlock(&pp.timersLock)
wakeNetPoller(when)
releasem(mp) } |
?time.stopTimer?的底层实际调用就是deltimer?。
?deltimer?对于状态的操作:
// timerWaiting -> timerModifying -> timerDeleted
// timerModifiedEarlier -> timerModifying -> timerDeleted
// timerModifiedLater -> timerModifying -> timerDeleted
// timerNoStatus -> do nothing
// timerDeleted -> do nothing
// timerRemoving -> do nothing
// timerRemoved -> do nothing
// timerRunning -> wait until status changes
// timerMoving -> wait until status changes
// timerModifying -> wait until status changes
?deltimer?的主要功能:对于传入的定时器进行标记删除(状态status设置为timerDeleted?)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
// deltimer deletes the timer t. It may be on some other P, so we can't // actually remove it from the timers heap. We can only mark it as deleted. // It will be removed in due course by the P whose heap it is on. // Reports whether the timer was removed before it was run. func deltimer(t *timer) bool { for { switch s := atomic.Load(&t.status); s { case timerWaiting, timerModifiedLater: // Prevent preemption while the timer is in timerModifying. // This could lead to a self-deadlock. See #38070. mp := acquirem() if atomic.Cas(&t.status, s, timerModifying) { // 必须要先拿到tpp,因为当状态设置为timerDeleted之后,timer就有可能被清除(cleantimers函数),就拿不到tpp了 tpp := t.pp.ptr() if !atomic.Cas(&t.status, timerModifying, timerDeleted) { badTimer() } releasem(mp) atomic.Xadd(&tpp.deletedTimers, 1) // Timer was not yet run. return true } else { releasem(mp) } case timerModifiedEarlier: // 这里和上面case的代码在源码中除了注释少一点其他一模一样,暂不清楚为什么 mp := acquirem() if atomic.Cas(&t.status, s, timerModifying) { tpp := t.pp.ptr() //先拿到tpp,原理同上 if !atomic.Cas(&t.status, timerModifying, timerDeleted) { badTimer() } releasem(mp) atomic.Xadd(&tpp.deletedTimers, 1) // Timer was not yet run. return true } else { releasem(mp) } case timerDeleted, timerRemoving, timerRemoved: // Timer was already run. return false case timerRunning, timerMoving: // The timer is being run or moved, by a different P. // Wait for it to complete. osyield() case timerNoStatus: // Removing timer that was never added or // has already been run. Also see issue 21874. return false case timerModifying: // Simultaneous calls to deltimer and modtimer. // Wait for the other call to complete. osyield() default: badTimer() } } } |
?time.reset?方法底层实际上调用就是modtimer?方法。
?modtimer?对于状态的修改,可以简单的归纳为:
1 2 3 4 5 6 7 8 9 |
// timerWaiting -> timerModifying -> timerModifiedXX // timerModifiedXX -> timerModifying -> timerModifiedYY // timerNoStatus -> timerModifying -> timerWaiting // timerRemoved -> timerModifying -> timerWaiting // timerDeleted -> timerModifying -> timerModifiedXX // timerRunning -> wait until status changes // timerMoving -> wait until status changes // timerRemoving -> wait until status changes // timerModifying -> wait until status changes |
modtimer的主要功能:重置定时器。具体来说,首先判断timer还在不在heap中
底层调用的就是modtimer,这里不多赘述了。
1 2 3 4 5 6 7 8 |
// resettimer resets the time when a timer should fire. // If used for an inactive timer, the timer will become active. // This should be called instead of addtimer if the timer value has been, // or may have been, used previously. // Reports whether the timer was modified before it was run. func resettimer(t *timer, when int64) bool { return modtimer(t, when, t.period, t.f, t.arg, t.seq) } |
在addtimer中有调用,具体会在添加新定时器之前调用(addtimer源码)。
函数作用为:尝试清理heap头(第一个元素)的定时器:移除或者调整到正确的位置,可以加速addtimer?添加定时器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
// cleantimers cleans up the head of the timer queue. This speeds up // programs that create and delete timers; leaving them in the heap // slows down addtimer. Reports whether no timer problems were found. // The caller must have locked the timers for pp. func cleantimers(pp *p) { gp := getg() for { if len(pp.timers) == 0 { return }
t := pp.timers[0]
switch s := atomic.Load(&t.status); s { case timerDeleted: //被标记删除,现在正式移除 if !atomic.Cas(&t.status, s, timerRemoving) { continue } dodeltimer0(pp) if !atomic.Cas(&t.status, timerRemoving, timerRemoved) { badTimer() } atomic.Xadd(&pp.deletedTimers, -1) case timerModifiedEarlier, timerModifiedLater: //定时器被调整,移动其到正确的位置 if !atomic.Cas(&t.status, s, timerMoving) { continue } // Now we can change the when field. t.when = t.nextwhen // Move t to the right position. dodeltimer0(pp) doaddtimer(pp, t) if !atomic.Cas(&t.status, timerMoving, timerWaiting) { badTimer() } default: // Head of timers does not need adjustment. return } } } |
?adjusttimers?对状态的修改:
1 2 3 |
// adjusttimers (looks in P's timer heap): // timerDeleted -> timerRemoving -> timerRemoved // timerModifiedXX -> timerMoving -> timerWaiting |
?adjusttimers?的主要作用与cleantimers?相同:尝试清理heap的定时器:移除或者调整到正确的位置。
不同的是:??cleantimers??只会对堆头的元素进行处理,而??adjusttimers??是遍历堆中所有的元素进行处理。
很有意思的一点是:对于timerModifiedXX?状态的定时器,由于是触发时间修改了,因此需要调整其在堆中的位置,golang这边选择的做法是先删除(dodeltimer?)再添加(doaddtimer?)的方法调整位置。
对状态的修改:
1 2 3 4 5 6 7 8 9 10 11 12 |
// runtimer (looks in P's timer heap): // timerNoStatus -> panic: uninitialized timer // timerWaiting -> timerWaiting or // timerWaiting -> timerRunning -> timerNoStatus or // timerWaiting -> timerRunning -> timerWaiting // timerModifying -> wait until status changes // timerModifiedXX -> timerMoving -> timerWaiting // timerDeleted -> timerRemoving -> timerRemoved // timerRunning -> panic: concurrent runtimer calls // timerRemoved -> panic: inconsistent timer heap // timerRemoving -> panic: inconsistent timer heap // timerMoving -> panic: inconsistent timer heap |
主要作用:循环遍历堆中第一个定时器并操作:
为了保证正确性,runtimer?肯定会在adjusttimers?之后运行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
if len(pp.timers) > 0 { adjusttimers(pp, now) for len(pp.timers) > 0 { // Note that runtimer may temporarily unlock // pp.timersLock. if tw := runtimer(pp, now); tw != 0 { if tw > 0 { pollUntil = tw } break } ran = true } } |
active?和inactive?:如果阅读了源码,会发现对于定时器的状态,还有active、inactive的分类,实际上active状态的定时器是等待未来触发的定时器(包括但不限与timeWaiting?状态),而正常不会再触发的定时器则为inactive(timeRemoved?、timerDeleted?等)。
在heap中和不在heap中:下方会解释状态机的管理和堆有什么关系?
XXing状态相当于是一个锁定状态,不允许其他goroutine并发操作,可以理解成锁。
在上面的部分中,讲解了timers体系中不同函数对于不同状态的流转。
这里将分析器的触发过程,Go 语言会在两个模块触发计时器,运行计时器中保存的函数:
我们将依次分析上述这两个触发过程。
?runtime.checkTimers? 是调度器用来运行处理器中计时器的函数,它会在发生以下情况时被调用:
这里不展开介绍 runtime.schedule? 和 runtime.findrunnable? 的实现了,重点分析用于执行计时器的runtime.checkTimers?,我们将该函数的实现分成调整计时器、运行计时器和删除计时器三个部分:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
// checkTimers runs any timers for the P that are ready. // If now is not 0 it is the current time. // It returns the passed time or the current time if now was passed as 0. // and the time when the next timer should run or 0 if there is no next timer, // and reports whether it ran any timers. // If the time when the next timer should run is not 0, // it is always larger than the returned time. // We pass now in and out to avoid extra calls of nanotime. //go:yeswritebarrierrec func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) { // If it's not yet time for the first timer, or the first adjusted // timer, then there is nothing to do. next := int64(atomic.Load64(&pp.timer0When)) nextAdj := int64(atomic.Load64(&pp.timerModifiedEarliest)) if next == 0 || (nextAdj != 0 && nextAdj < next) { next = nextAdj }
if next == 0 { // No timers to run or adjust. return now, 0, false }
if now == 0 { now = nanotime() } if now < next { // Next timer is not ready to run, but keep going // if we would clear deleted timers. // This corresponds to the condition below where // we decide whether to call clearDeletedTimers. // 当前并没有到触发时间,这个检查的目的就是为了查看位于deletedTimers状态的 定时器 的比例,如果比例过大,就要清理 // 清理就是调用clearDeletedTimers函数。 if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) { return now, next, false } }
lock(&pp.timersLock)
if len(pp.timers) > 0 { adjusttimers(pp, now) //上面生成的now会传下来,因此不用担心函数执行导致的时间流逝 for len(pp.timers) > 0 { // Note that runtimer may temporarily unlock // pp.timersLock. if tw := runtimer(pp, now); tw != 0 { //上面生成的now会传下来,因此不用担心函数执行导致的时间流逝 if tw > 0 { pollUntil = tw } break } ran = true } }
// If this is the local P, and there are a lot of deleted timers, // clear them out. We only do this for the local P to reduce // lock contention on timersLock. //如果运行当前goroutine的p是持有timers数组的p 且 处于deletedTimers状态的定时器 比例超过1/4,就清理掉这部分的定时器。 if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 { clearDeletedTimers(pp) }
unlock(&pp.timersLock)
return now, pollUntil, ran } |
?runtime.clearDeletedTimers? 能够避免堆中出现大量长时间运行的计时器,该函数和 runtime.moveTimers? 也是唯二会遍历计时器堆的函数(moveTimers? which only runs when the world is stopped)。
具体可见clearDeletedTimers?的注释:
1 2 3 |
// This is the only function that walks through the entire timer heap, // other than moveTimers which only runs when the world is stopped. func clearDeletedTimers(pp *p) { |
系统监控中也会触发调度器的执行,大概率是因为有时候m中可能存在不能被强占的情况,就有可能会导致timer的触发时间滞后。需要注意的是,虽然有系统监控,可以帮助timers及时触发,但是timers的触发并不能达到严格的实时性(系统监控检查timers调度延迟的阈值是10ms)。
这里我也对这个过程理解的不是很深刻,这里推荐去draveness大佬的在线图书中搜索【系统监控】关键词进行全面学习
首先需要明确这里的堆指的是struct p?管理定时器所用的四叉堆,而不是 内存管理涉及的栈和堆。
一个定时器创建之后是timerNoStatus?状态,其并不在堆上,需要放在p?的堆上之后才能进行调度和触发!
比如如下语句中的堆字眼:
1 2 |
// Active timers live in heaps attached to P, in the timers field. // Inactive timers live there too temporarily, until they are removed. |
衍生问题:哪些状态的timer在堆中?哪些状态在?
回答:可从源码中adjusttimers?函数中一窥究竟,timerNoStatus, timerRunning, timerRemoving, timerRemoved, timerMoving?状态的定时器是不在堆上的。
此外,有些同学可能有疑惑,定时器从堆中移除的过程(可以参考cleantimers?),是先标记成timerMoving?然后再从堆中移除,这两步不是原子的,如果状态已经是timerMoving?但是还没从堆中移除,遇上adjusttimers,岂不是会出现panic。
实际上这个问题并不会出现,因为只要涉及对p?的timers?数组操作(更改持续、加减元素)的地方都会加上锁(lock(&pp.timersLock)?),而且每个p?也只会修改自己的timers?数组,不会修改其它p?持有的timers?数组,但是同样如果不涉及数组更改,只设计状态变更的话就不需要加上锁(比如标记删除元素)。
相同的结构体示例:
time.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
// Package time knows the layout of this structure. // If this struct changes, adjust ../time/sleep.go:/runtimeTimer. // For GOOS=nacl, package syscall knows the layout of this structure. // If this struct changes, adjust ../syscall/net_nacl.go:/runtimeTimer. type timer struct { i int // heap index
// Timer wakes up at when, and then at when+period, ... (period > 0 only) // each time calling f(arg, now) in the timer goroutine, so f must be // a well-behaved function and not block. when int64 //当前计时器被唤醒的时间 period int64 //两次被唤醒的间隔; f func(interface{}, uintptr) //每当计时器被唤醒时都会调用的函数; arg interface{} //计时器被唤醒时调用 f 传入的参数; seq uintptr } |
sleep.go中:
1 2 3 4 5 6 7 8 9 10 |
// Interface to timers implemented in package runtime. // Must be in sync with ../runtime/time.go:/^type timer type runtimeTimer struct { i int when int64 period int64 f func(interface{}, uintptr) // NOTE: must not be closure arg interface{} seq uintptr } |
回答:实际上这两个结构体一个是运行时一个是编译时,不过作者目前对这块也不是特别清楚,也欢迎大家指点指点。
回答:因为定时器体系中,对于timers数组的更改需要加锁,如果没有更改的话就不需要加锁,为了能快速的StopTimer,因此标记删除并不需要拿锁,效率很高。什么时候加锁可参考:struct p中的定时器加锁。
这个问题和Go的调度模型GMP息息相关,这里就做一个不严谨的解释:acquirem? 的作用之一是 为了保证当前 ?P?? 不会被切换,粗暴理解就是对P而言,其相当于不会被“打断”,从而可以保证此时修改的p是当前goroutine所属的p。
1 2 3 4 5 6 7 8 9 10 11 12 |
// Disable preemption while using pp to avoid changing another P's heap. mp := acquirem()
pp := getg().m.p.ptr() lock(&pp.timersLock) cleantimers(pp) doaddtimer(pp, t) unlock(&pp.timersLock)
wakeNetPoller(when)
releasem(mp) |
相信这篇文章读完之后,这个已经不是问题了。
首先我们要明白nextwhen?字段的作用:用于记录下一次要设置when?字段为什么值
那么既然其用于标识下一次when?字段的值,那为什么不直接修改when?字段呢?
这是因为在当前的设计中,p只会修改自己的timers数组,如果当前p?修改了其他p?的when?字段,timers数组就无法正常排序了。所以需要使用nextwhen?来记录when?需要修改的值,等timers?数组对应的p?来修改when?的值。
这里涉及跨p操作定时器的问题。
每个p都会存放在其上创建的timer?,但是不同的goroutine?可能会在不同的p?上面,因此可能操作timer?的goroutine?所在的p?与存放timer?所在的p?并不是同一个p?。
1 2 3 4 5 |
// The timer is in some other P's heap, so we can't change // the when field. If we did, the other P's heap would // be out of order. So we put the new when value in the // nextwhen field, and let the other P set the when field // when it is prepared to resort the heap. |
猜测主要原因还是性能,锁可能还是太重了。而且实际上对于有重试的代码,atomic相关的设置可能更加优雅,比如下面代码:
1 2 3 |
if !atomic.Cas(&t.status, s, timerMoving) { continue //continue用于等待重试 } |
如果使用锁的话,那么伪代码如下,就算使用双重校验,可能还是很重
1 2 3 4 5 6 7 8 9 10 11 12 |
for { if t.status == s{ //双重校验,延迟加锁 t.mu.Lock() // 锁定 if t.status == s { t.status = timerMoving // 修改状态 t.mu.Unlock() // 释放锁 break // 修改成功,退出 } t.mu.Unlock() // 如果没有修改成功,解锁 } // 继续等待重试 } |
什么时候校验值:在每一次调用的入口。虽然函数值之前已经校验过。取之:src/runtime/time.go:255
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
func addtimer(t *timer) { // when must be positive. A negative value will cause runtimer to // overflow during its delta calculation and never expire other runtime // timers. Zero will cause checkTimers to fail to notice the timer. if t.when <= 0 { throw("timer when must be positive") } if t.period < 0 { throw("timer period must be non-negative") } if t.status != timerNoStatus { throw("addtimer called with initialized timer") } xxx } |
函数的分层设计:
1 2 3 4 5 6 7 8 9 |
// startTimer adds t to the timer heap. // //go:linkname startTimer time.startTimer func startTimer(t *timer) { if raceenabled { racerelease(unsafe.Pointer(t)) } addtimer(t) } |
函数与方法:
可以思考下下面这里为什么是方法而不是p的函数。
个人认为因为这里并不设计直接修改结构体p?的值,所以设计成方法可读性更强。换言之,如果要修改对应的结构体的值,才创建函数,否则优先使用方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
// updateTimerModifiedEarliest updates the recorded nextwhen field of the // earlier timerModifiedEarier value. // The timers for pp will not be locked. func updateTimerModifiedEarliest(pp *p, nextwhen int64) { for { old := atomic.Load64(&pp.timerModifiedEarliest) if old != 0 && int64(old) < nextwhen { return } if atomic.Cas64(&pp.timerModifiedEarliest, old, uint64(nextwhen)) { return } } } |
可以用注释表明当前函数必须被什么锁给锁定住:
1 2 3 4 5 6 |
// doaddtimer adds t to the current P's heap. // The caller must have locked the timers for pp. //注释说明,这个函数必须锁定之后才能进入 func doaddtimer(pp *p, t *timer) {
xxx } |
本文解开了Timer体系相关的状态流转,但是对于现在Timer中存在的问题(reset、垃圾回收)在golang1.23怎么得到解决的机制还没有探究,这个可以等待后续研究研究。