广告位联系
返回顶部
分享到

从源码解析golang Timer定时器体系

Golang 来源:互联网 作者:佚名 发布时间:2025-01-28 23:34:39 人浏览
摘要

Timer、Ticker使用及其注意事项 在刚开始学习golang语言的时候就听说Timer、Ticker的使用要尤其注意,很容易出现问题,这次就来一探究竟。 本文主要脉络: 介绍定时器体系,并介绍常用使用方式

Timer、Ticker使用及其注意事项

在刚开始学习golang语言的时候就听说Timer、Ticker的使用要尤其注意,很容易出现问题,这次就来一探究竟。

本文主要脉络:

  • 介绍定时器体系,并介绍常用使用方式和错误使用方式
  • 源码解读

timer、ticker是什么?

timer和ticker都是定时器,不同的是:

  • timer是一次性的定时器
  • ticker是循环定时器,在底层实现上是timer触发后又重新设置下一次触发时间来实现的

正确的使用姿势

Timer

对于Timer,可以通过三种函数创建:time.NewTimer?、time.AfterFunc?、time.After?。

其使用范例如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

// FnTimer1 Timer的使用用法

func FnTimer1() {

    timer := time.NewTimer(time.Second * 5) //返回生成的timer

    fmt.Printf("timer 的启动时间为:%v\n", time.Now())

 

    expire := <-timer.C //注意这个channel是一个容量为1的channel!

    fmt.Printf("timer 的触发时间为:%v\n", expire)

}

 

func FnTimer2() {

    ch1 := make(chan int, 1)

    select {

    case e1 := <-ch1:

        //如果ch1通道成功读取数据,则执行该case处理语句

        fmt.Printf("1th case is selected. e1=%v",e1)

    case <- time.After(2 * time.Second): //time.After直接返回生成的timer的channel,所以一般用于超时控制

        fmt.Println("Timed out")

    }

}

 

func FnTimer3() {

    _ = time.AfterFunc(time.Second*5, func() { //返回的也是timer,不过可以自己传入函数进行执行

        fmt.Printf("定时器触发了,触发时间为%v\n", time.Now())

    })

 

    fmt.Printf("timer 的启动时间为:%v\n", time.Now())

    time.Sleep(10 * time.Second) // 确保定时器触发

}

在底层原理上,三种不同的创建方式都是调用的time.NewTimer?,不同的是:

  • ?time.After?直接返回的是生成的timer的channel,而time.NewTimer?是返回生成的timer。
  • ?time.NewTimer?定时触发channel的原理就是定时调用一个sendTime?函数,这个函数负责向channel中发送触发时间;time.AfterFunc?就是将定时调用的sendTime?函数换成了一个自定义的函数。

补充:

  • 返回的channel的容量为1,因此是一个asynchronous的channel,即在定时器fired(触发)、stop(停止)、reset(重启)之后,仍然有可能能收到数据~

  • 垃圾回收:在go1.23之前,对于处于active(没有触发且没有显式调用Stop)的Timer,gc是无法回收的,Ticket也是同样的道理。因此在高并发的场景下需要显式搭配??defer time.Stop()??来解决暂时的“内存泄露”的问题。

  • 上述两点在Golang 1.23得到了解决,且可能在未来的版本中channel的容量会改为0(由asynchronous改成sync),在Go 1.23相关源码的注释部分有对应的说明。

Ticket

ticket相比于timer,其会循环触发,因此通常用于循环的干一些事情,like:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

// FnTicket2 Ticker的正确的使用方法!!

func FnTicket2() {

    ticker := time.NewTicker(1 * time.Second)

    stopTicker := make(chan struct{})

    defer ticker.Stop()

    go func() {

        for {

            select {

            case now := <-ticker.C:

                // do something

                fmt.Printf("ticker 的触发时间为:%v\n", now)

            case <-stopTicker:

                fmt.Println("ticker 结束")

                return

            }

        }

    }()

 

    time.Sleep(4 * time.Second)

    stopTicker <- struct{}{}

}

注意:代码块中使用stopTicker? + select?的方式是必须的,由于Stop函数只是修改定时器的状态,并不会主动关闭channel,因此如果直接使用循环(见?我的github仓库? )会直接导致Ticker永远不退出而导致内存泄露!

补充:TIcket = 触发时自动设置下一次时间的Timer,因此上面提到的Go1.23之前的Timer存在的问题依然存在。

  • 返回的channel容量为1
  • 垃圾回收:由于Ticket会一直循环触发,因此如果不显式调用??time.Stop??方法的话,会永久内存泄露。

此外,对于Ticket,还有一些额外的注意事项:

  • 需要使用一个额外的channel+select的方式来正确停止Ticket。
  • Reset函数的问题,由于比较长,单独整理在golang1.23版本之前 Timer Reset无法正确使用的问题,这里说个结论 :?Reset??函数由于内部非原子,因此无法完美的使用,建议使用goroutine+Timer的方式替代!

使用的注意事项

由于Golang 1.23版本对定时器timer、ticker做了很大的改进,因此要分成1.23之前和1.23及其之后的版本分开考虑:

以下是1.23版本关于timer、ticker部分的修改:

未停止的定时器和不再被引用的计时器可以进行垃圾回收。在 Go 1.23 之前,未停止的定时器无法被垃圾回收,直到定时器超时,而未停止的计时器永远无法被垃圾回收。Go 1.23 的实现避免了在不使用 t.Stop? 的程序中出现资源泄漏。

定时器通道现在是同步的(无缓冲的),这使 t.Reset? 和 t.Stop? 方法具有更强的保证:在其中一个方法返回后,将来从定时器通道接收到的任何值都不会观察到与旧定时器配置相对应的陈旧时间值。在 Go 1.23 之前,无法使用 t.Reset? 避免陈旧值,而使用 t.Stop? 避免陈旧值需要仔细使用 t.Stop? 的返回值。Go 1.23 的实现完全消除了这种担忧。

总结一下:1.23版本改进了timer、ticker的垃圾回收和 停止、重置的相关方法(Reset、Stop)。

这也就意味着在1.23版本之前,我们在使用的时候要注意:垃圾回收和停止、重置相关方法的使用。
由于Reset、Stop方法的外在表现本质上上是跟缓冲区由 有缓冲 改为 无缓冲 相关,因此如果有涉及读取缓冲区我们也需要注意相关特性。

具体来说,对于1.23之前:

  • 垃圾回收:TImer的回收只会在定时器触发(expired)或者Stop?之后;Ticker只显式触发Stop?之后才会回收;
  • ?Reset?、Stop?使用:对于Timer,没有完美的做法,无论怎么样Reset?和Stop?都可能存在一些问题;对于Ticker,记得使用完之后显式的Stop?;

源码解读

源码解读版本:release-branch.go1.8?

运作原理

timer(ticket)的运作依赖于struct p?,源码位置:src/runtime/runtime2.go:603。

所有的计时器timer都以最小四叉堆的形式存储在struct p?的timers?字段中。并且也是交给了计时器都交由处理器的网络轮询器和调度器触发,这种方式能够充分利用本地性、减少上下文的切换开销,也是目前性能最好的实现方式。

一般来说管理定时器的结构有3种:双向链表、最小堆、时间轮(很少)。

双向链表:插入|修改时间:需要遍历去寻找插入|修改的位置,时间复杂度O(N);触发:链表头触发即可,时间复杂度O(1)。

最小堆:插入|修改时间:O(logN)的时间复杂度;触发:队头触发,但是触发之后需要调整堆以保证堆的特性,O(logN)的时间复杂度。

时间轮:插入|修改时间|触发的时间复杂度都是O(1)。但是需要额外维护时间轮的结构,其占据空间随着需要维护的未来时间长度、时间精度增加。

涉及结构体

定时器体系有两种timer:一次性触发的Timer?和循环触发的Ticker?,这两种的底层结构是相同的,触发逻辑是类似的。

毕竟Ticker?的功能包含Timer?的功能。

定时器在源码中使用的结构体是timer?,其定义为:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

// Package time knows the layout of this structure.

// If this struct changes, adjust ../time/sleep.go:/runtimeTimer.

type timer struct {

    // If this timer is on a heap, which P's heap it is on.

    // puintptr rather than *p to match uintptr in the versions

    // of this struct defined in other packages.

    pp puintptr //指针,指向持有当前timer的p

 

    // Timer wakes up at when, and then at when+period, ... (period > 0 only)

    // each time calling f(arg, now) in the timer goroutine, so f must be

    // a well-behaved function and not block.

    //

    // when must be positive on an active timer.

    when   int64              //当前计时器被唤醒的时间;

    period int64              //两次被唤醒的间隔;

    f      func(any, uintptr) //唤醒时要执行的函数

    arg    any                // 计时器被唤醒时调用 f 传入的参数;

    seq    uintptr

 

    // What to set the when field to in timerModifiedXX status.

    nextwhen int64 //当定时器状态变为timerModifiedXX的时,when 字段下一次要设置的值

 

    // The status field holds one of the values below.

    status uint32 //计时器的状态,最重要的字段之一

}

?timer?在p?中的相关字段为:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

type p struct {

    // Lock for timers. We normally access the timers while running

    // on this P, but the scheduler can also do it from a different P.

    timersLock mutex //操作timers的时候加锁

 

    // Actions to take at some time. This is used to implement the

    // standard library's time package.

    // Must hold timersLock to access.

    timers []*timer //timers数组

 

    // Number of timers in P's heap.

    // Modified using atomic instructions.

    numTimers uint32 //总timers的数量

 

    // Number of timerDeleted timers in P's heap.

    // Modified using atomic instructions.

    deletedTimers uint32 //处于deleted状态的timers的数量

 

    // Race context used while executing timer functions.

    timerRaceCtx uintptr //竞态检测相关

}

状态机

go内部的定时器的操作是并发安全的(新建定时器、停止定时器)等,为了支持并发安全和定时器的高效调度,在源码中设计了一套关于定时器的状态机,全部的状态为:

状态 解释
timerNoStatus 还没有设置状态
timerWaiting 定时器等待触发
timerRunning timer正在运行
timerDeleted 定时器被标记删除
timerRemoving 定时器从标记删除到真正删除的中间态
timerRemoved 定时器真正被删除
timerModifying 正在被修改的中间状态
timerModifiedEarlier 定时器被修改到了更早的触发时间
timerModifiedLater 定时器被修改到了更晚的触发时间
timerMoving 已经被修改正在被移动

修改定时器的状态机涉及如下所示的 7 种不同操作,它们分别承担了不同的职责:

  • ?runtime.addtimer? — 向当前处理器增加新的计时器
  • ?runtime.deltimer? — 将计时器标记成 timerDeleted? 删除处理器中的计时器
  • ?runtime.modtimer? — 网络轮询器会调用该函数修改计时器
  • ?runtime.cleantimers? — 清除队列头中的计时器,能够提升程序创建和删除计时器的性能
  • ?runtime.adjusttimers? — 调整处理器持有的计时器堆,包括移动会稍后触发的计时器、删除标记为 timerDeleted? 的计时器
  • ?runtime.runtimer? — 检查队列头中的计时器,在其准备就绪时运行该计时器

状态机的变化流程图 可以大概帮我们看出timer?的不同状态的流转情况:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

@startuml

 

[*] --> timerNoStatus : 运行时创建Timer

 

timerNoStatus -->timerWaiting : addtimer

 

timerWaiting -->timerModifying : deltimer、modtimer

 

timerModifying -->timerDeleted : deltimer

 

timerModifiedLater -->timerDeleted : deltimer

 

timerModifiedEarlier -->timerModifying : deltimer

 

 

timerDeleted -->timerRemoving : cleantimers、adjusttimers、runtimer

 

timerRemoving -->timerRemoved : cleantimers、adjusttimers、runtimer

 

timerModifiedEarlier --> timerMoving : cleantimers、adjusttimers

 

timerModifiedLater --> timerMoving : cleantimers、adjusttimers

 

timerMoving --> timerWaiting : cleantimers

 

timerWaiting --> timerRunning : runtimer

 

timerRunning --> timerWaiting : runtimer

 

timerRunning --> timerNoStatus : runtimer

 

state timerModifiedXX {

  state timerModifiedEarlier {

  }

  state timerModifiedLater {

  }

}

 

timerModifying --> timerModifiedXX : modtimer

timerModifiedXX --> timerModifying : modtimer

 

timerNoStatus   --> timerModifying : modtimer

timerModifying --> timerWaiting : modtimer

timerRemoved --> timerModifying : modtimer

timerDeleted    --> timerModifying : modtimer

 

timerWaiting : 定时器等待触发

timerModifying : 定时器状态修改的中间态

timerDeleted : 定时器被标记删除的状态

 

timerRemoving: 定时器从标记删除到真正被删除的中间态

timerRemoved: 定时器真正被删除

 

timerModifiedEarlier: 定时器被修改到了更早的触发时间

timerModifiedLater : 定时器被修改到了更晚的触发时间

 

timerMoving: 定时器在堆上的位置正在重新排序

timerRunning: timer正在运行

timerModifiedXX: 定时器在堆上的位置等待重新排序

 

@enduml

实际上这些状态的流转都被完整的写在了golang的源码中,在后面逐个函数的讲解中也会涉及到:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

// addtimer:

//   timerNoStatus   -> timerWaiting

//   anything else   -> panic: invalid value

// deltimer:

//   timerWaiting         -> timerModifying -> timerDeleted

//   timerModifiedEarlier -> timerModifying -> timerDeleted

//   timerModifiedLater   -> timerModifying -> timerDeleted

//   timerNoStatus        -> do nothing

//   timerDeleted         -> do nothing

//   timerRemoving        -> do nothing

//   timerRemoved         -> do nothing

//   timerRunning         -> wait until status changes

//   timerMoving          -> wait until status changes

//   timerModifying       -> wait until status changes

// modtimer:

//   timerWaiting    -> timerModifying -> timerModifiedXX

//   timerModifiedXX -> timerModifying -> timerModifiedYY

//   timerNoStatus   -> timerModifying -> timerWaiting

//   timerRemoved    -> timerModifying -> timerWaiting

//   timerDeleted    -> timerModifying -> timerModifiedXX

//   timerRunning    -> wait until status changes

//   timerMoving     -> wait until status changes

//   timerRemoving   -> wait until status changes

//   timerModifying  -> wait until status changes

// cleantimers (looks in P's timer heap):

//   timerDeleted    -> timerRemoving -> timerRemoved

//   timerModifiedXX -> timerMoving -> timerWaiting

// adjusttimers (looks in P's timer heap):

//   timerDeleted    -> timerRemoving -> timerRemoved

//   timerModifiedXX -> timerMoving -> timerWaiting

// runtimer (looks in P's timer heap):

//   timerNoStatus   -> panic: uninitialized timer

//   timerWaiting    -> timerWaiting or

//   timerWaiting    -> timerRunning -> timerNoStatus or

//   timerWaiting    -> timerRunning -> timerWaiting

//   timerModifying  -> wait until status changes

//   timerModifiedXX -> timerMoving -> timerWaiting

//   timerDeleted    -> timerRemoving -> timerRemoved

//   timerRunning    -> panic: concurrent runtimer calls

//   timerRemoved    -> panic: inconsistent timer heap

//   timerRemoving   -> panic: inconsistent timer heap

//   timerMoving     -> panic: inconsistent timer heap

addtimer源码

addtimer对于状态的操作:

  • timerNoStatus -> timerWaiting
  • anything else -> panic: invalid value

addtimer的主要功能:向当前p的定时器堆中添加当前定时器,并尝试唤醒网络轮训器(定时器的执行依赖于网络轮训器处理)。

这里提到的网络轮训器可能让人有点疑惑,定时器和网络有什么关系?实际上确实也没什么关系,这里提到的网络轮训器重点在于轮训器,指的更多的是select、poll、epoll那套东西。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

func addtimer(t *timer) {

     // xxx

    if t.status != timerNoStatus {

        throw("addtimer called with initialized timer")

    }

    t.status = timerWaiting

 

    when := t.when

 

    // Disable preemption while using pp to avoid changing another P's heap.

    mp := acquirem()

 

    pp := getg().m.p.ptr()

    lock(&pp.timersLock)

    cleantimers(pp)       //尝试清除timers堆中堆头的元素,以加速定时器添加

    doaddtimer(pp, t)

    unlock(&pp.timersLock)

 

    wakeNetPoller(when)

 

    releasem(mp)

}

  • cleantimers真的能加速吗?为什么?

deltimer源码

?time.stopTimer?的底层实际调用就是deltimer?。

?deltimer?对于状态的操作:

// timerWaiting -> timerModifying -> timerDeleted
// timerModifiedEarlier -> timerModifying -> timerDeleted
// timerModifiedLater -> timerModifying -> timerDeleted
// timerNoStatus -> do nothing
// timerDeleted -> do nothing
// timerRemoving -> do nothing
// timerRemoved -> do nothing
// timerRunning -> wait until status changes
// timerMoving -> wait until status changes
// timerModifying -> wait until status changes

?deltimer?的主要功能:对于传入的定时器进行标记删除(状态status设置为timerDeleted?)。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

// deltimer deletes the timer t. It may be on some other P, so we can't

// actually remove it from the timers heap. We can only mark it as deleted.

// It will be removed in due course by the P whose heap it is on.

// Reports whether the timer was removed before it was run.

func deltimer(t *timer) bool {

    for {

        switch s := atomic.Load(&t.status); s {

        case timerWaiting, timerModifiedLater:

            // Prevent preemption while the timer is in timerModifying.

            // This could lead to a self-deadlock. See #38070.

            mp := acquirem()

            if atomic.Cas(&t.status, s, timerModifying) {

                // 必须要先拿到tpp,因为当状态设置为timerDeleted之后,timer就有可能被清除(cleantimers函数),就拿不到tpp了

                tpp := t.pp.ptr()

                if !atomic.Cas(&t.status, timerModifying, timerDeleted) {

                    badTimer()

                }

                releasem(mp)

                atomic.Xadd(&tpp.deletedTimers, 1)

                // Timer was not yet run.

                return true

            } else {

                releasem(mp)

            }

        case timerModifiedEarlier:

            // 这里和上面case的代码在源码中除了注释少一点其他一模一样,暂不清楚为什么

            mp := acquirem()

            if atomic.Cas(&t.status, s, timerModifying) {

                tpp := t.pp.ptr() //先拿到tpp,原理同上

                if !atomic.Cas(&t.status, timerModifying, timerDeleted) {

                    badTimer()

                }

                releasem(mp)

                atomic.Xadd(&tpp.deletedTimers, 1)

                // Timer was not yet run.

                return true

            } else {

                releasem(mp)

            }

        case timerDeleted, timerRemoving, timerRemoved:

            // Timer was already run.

            return false

        case timerRunning, timerMoving:

            // The timer is being run or moved, by a different P.

            // Wait for it to complete.

            osyield()

        case timerNoStatus:

            // Removing timer that was never added or

            // has already been run. Also see issue 21874.

            return false

        case timerModifying:

            // Simultaneous calls to deltimer and modtimer.

            // Wait for the other call to complete.

            osyield()

        default:

            badTimer()

        }

    }

}

modtimer源码

?time.reset?方法底层实际上调用就是modtimer?方法。

?modtimer?对于状态的修改,可以简单的归纳为:

  • 当前timer还在heap中:修改为timerModifiedXX?状态(等待重新排序触发)
  • 当前timer不在heap中:修改为等待调度timerWaiting?状态
  • 当前timer在修改的中间态(XXing状态):XXing相当于是被锁定的状态,因此等待状态发生变动

1

2

3

4

5

6

7

8

9

//   timerWaiting    -> timerModifying -> timerModifiedXX

//   timerModifiedXX -> timerModifying -> timerModifiedYY

//   timerNoStatus   -> timerModifying -> timerWaiting

//   timerRemoved    -> timerModifying -> timerWaiting

//   timerDeleted    -> timerModifying -> timerModifiedXX

//   timerRunning    -> wait until status changes

//   timerMoving     -> wait until status changes

//   timerRemoving   -> wait until status changes

//   timerModifying  -> wait until status changes

modtimer的主要功能:重置定时器。具体来说,首先判断timer还在不在heap中

  • 还在:修改状态timerModifiedXX?,等待重新触发
  • 不在:重新添加到heap中,等待重新触发

resettimer源码

底层调用的就是modtimer,这里不多赘述了。

1

2

3

4

5

6

7

8

// resettimer resets the time when a timer should fire.

// If used for an inactive timer, the timer will become active.

// This should be called instead of addtimer if the timer value has been,

// or may have been, used previously.

// Reports whether the timer was modified before it was run.

func resettimer(t *timer, when int64) bool {

    return modtimer(t, when, t.period, t.f, t.arg, t.seq)

}

cleantimers源码

在addtimer中有调用,具体会在添加新定时器之前调用(addtimer源码)。

函数作用为:尝试清理heap头(第一个元素)的定时器:移除或者调整到正确的位置,可以加速addtimer?添加定时器。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

// cleantimers cleans up the head of the timer queue. This speeds up

// programs that create and delete timers; leaving them in the heap

// slows down addtimer. Reports whether no timer problems were found.

// The caller must have locked the timers for pp.

func cleantimers(pp *p) {

    gp := getg()

    for {

        if len(pp.timers) == 0 {

            return

        }

 

        t := pp.timers[0]

 

        switch s := atomic.Load(&t.status); s {

        case timerDeleted: //被标记删除,现在正式移除

            if !atomic.Cas(&t.status, s, timerRemoving) {

                continue

            }

            dodeltimer0(pp)

            if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {

                badTimer()

            }

            atomic.Xadd(&pp.deletedTimers, -1)

        case timerModifiedEarlier, timerModifiedLater: //定时器被调整,移动其到正确的位置

            if !atomic.Cas(&t.status, s, timerMoving) {

                continue

            }

            // Now we can change the when field.

            t.when = t.nextwhen

            // Move t to the right position.

            dodeltimer0(pp)

            doaddtimer(pp, t)

            if !atomic.Cas(&t.status, timerMoving, timerWaiting) {

                badTimer()

            }

        default:

            // Head of timers does not need adjustment.

            return

        }

    }

}

adjusttimers源码

?adjusttimers?对状态的修改:

1

2

3

// adjusttimers (looks in P's timer heap):

//   timerDeleted    -> timerRemoving -> timerRemoved

//   timerModifiedXX -> timerMoving -> timerWaiting

?adjusttimers?的主要作用与cleantimers?相同:尝试清理heap的定时器:移除或者调整到正确的位置。

不同的是:??cleantimers??只会对堆头的元素进行处理,而??adjusttimers??是遍历堆中所有的元素进行处理。

很有意思的一点是:对于timerModifiedXX?状态的定时器,由于是触发时间修改了,因此需要调整其在堆中的位置,golang这边选择的做法是先删除(dodeltimer?)再添加(doaddtimer?)的方法调整位置。

runtimer

对状态的修改:

1

2

3

4

5

6

7

8

9

10

11

12

// runtimer (looks in P's timer heap):

//   timerNoStatus   -> panic: uninitialized timer

//   timerWaiting    -> timerWaiting or

//   timerWaiting    -> timerRunning -> timerNoStatus or

//   timerWaiting    -> timerRunning -> timerWaiting

//   timerModifying  -> wait until status changes

//   timerModifiedXX -> timerMoving -> timerWaiting

//   timerDeleted    -> timerRemoving -> timerRemoved

//   timerRunning    -> panic: concurrent runtimer calls

//   timerRemoved    -> panic: inconsistent timer heap

//   timerRemoving   -> panic: inconsistent timer heap

//   timerMoving     -> panic: inconsistent timer heap

主要作用:循环遍历堆中第一个定时器并操作:

  • 如果第一个定时器为timerWaiting?状态:已经到达触发时间久运行并调整时间,然后返回;未到触发时间久直接返回。
  • 其它状态:进行对应的操作并再次循环。对应的操作举例:timerModifiedXX?-》调整时间;timerDeleted?-》从堆中移除定时器。

为了保证正确性,runtimer?肯定会在adjusttimers?之后运行:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

if len(pp.timers) > 0 {

        adjusttimers(pp, now)

        for len(pp.timers) > 0 {

            // Note that runtimer may temporarily unlock

            // pp.timersLock.

            if tw := runtimer(pp, now); tw != 0 {

                if tw > 0 {

                    pollUntil = tw

                }

                break

            }

            ran = true

        }

}

状态机规律总结

  • active?和inactive?:如果阅读了源码,会发现对于定时器的状态,还有active、inactive的分类,实际上active状态的定时器是等待未来触发的定时器(包括但不限与timeWaiting?状态),而正常不会再触发的定时器则为inactive(timeRemoved?、timerDeleted?等)。

  • 在heap中和不在heap中:下方会解释状态机的管理和堆有什么关系?

  • XXing状态相当于是一个锁定状态,不允许其他goroutine并发操作,可以理解成锁。

定时器的触发

在上面的部分中,讲解了timers体系中不同函数对于不同状态的流转。

这里将分析器的触发过程,Go 语言会在两个模块触发计时器,运行计时器中保存的函数:

  • 调度器调度时会检查处理器中的计时器是否准备就绪;
  • 系统监控会检查是否有未执行的到期计时器;

我们将依次分析上述这两个触发过程。

调度器调度

?runtime.checkTimers? 是调度器用来运行处理器中计时器的函数,它会在发生以下情况时被调用:

  • 调度器调用 runtime.schedule? 执行调度时;
  • 调度器调用 runtime.findrunnable? 获取可执行的 Goroutine 时;
  • 调度器调用 runtime.findrunnable? 从其他处理器窃取计时器时;

这里不展开介绍 runtime.schedule? 和 runtime.findrunnable? 的实现了,重点分析用于执行计时器的runtime.checkTimers?,我们将该函数的实现分成调整计时器、运行计时器和删除计时器三个部分:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

// checkTimers runs any timers for the P that are ready.

// If now is not 0 it is the current time.

// It returns the passed time or the current time if now was passed as 0.

// and the time when the next timer should run or 0 if there is no next timer,

// and reports whether it ran any timers.

// If the time when the next timer should run is not 0,

// it is always larger than the returned time.

// We pass now in and out to avoid extra calls of nanotime.

//go:yeswritebarrierrec

func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {

    // If it's not yet time for the first timer, or the first adjusted

    // timer, then there is nothing to do.

    next := int64(atomic.Load64(&pp.timer0When))

    nextAdj := int64(atomic.Load64(&pp.timerModifiedEarliest))

    if next == 0 || (nextAdj != 0 && nextAdj < next) {

        next = nextAdj

    }

 

    if next == 0 {

        // No timers to run or adjust.

        return now, 0, false

    }

 

    if now == 0 {

        now = nanotime()

    }

    if now < next {

        // Next timer is not ready to run, but keep going

        // if we would clear deleted timers.

        // This corresponds to the condition below where

        // we decide whether to call clearDeletedTimers.

        //  当前并没有到触发时间,这个检查的目的就是为了查看位于deletedTimers状态的 定时器 的比例,如果比例过大,就要清理

        //  清理就是调用clearDeletedTimers函数。

        if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {

            return now, next, false

        }

    }

 

    lock(&pp.timersLock)

 

    if len(pp.timers) > 0 {

        adjusttimers(pp, now)  //上面生成的now会传下来,因此不用担心函数执行导致的时间流逝

        for len(pp.timers) > 0 {

            // Note that runtimer may temporarily unlock

            // pp.timersLock.

            if tw := runtimer(pp, now); tw != 0 { //上面生成的now会传下来,因此不用担心函数执行导致的时间流逝

                if tw > 0 {

                    pollUntil = tw

                }

                break

            }

            ran = true

        }

    }

 

    // If this is the local P, and there are a lot of deleted timers,

    // clear them out. We only do this for the local P to reduce

    // lock contention on timersLock.

    //如果运行当前goroutine的p是持有timers数组的p 且 处于deletedTimers状态的定时器 比例超过1/4,就清理掉这部分的定时器。

    if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {

        clearDeletedTimers(pp)

    }

 

    unlock(&pp.timersLock)

 

    return now, pollUntil, ran

}

?runtime.clearDeletedTimers? 能够避免堆中出现大量长时间运行的计时器,该函数和 runtime.moveTimers? 也是唯二会遍历计时器堆的函数(moveTimers? which only runs when the world is stopped)。

具体可见clearDeletedTimers?的注释:

1

2

3

// This is the only function that walks through the entire timer heap,

// other than moveTimers which only runs when the world is stopped.

func clearDeletedTimers(pp *p) {

系统监控

系统监控中也会触发调度器的执行,大概率是因为有时候m中可能存在不能被强占的情况,就有可能会导致timer的触发时间滞后。需要注意的是,虽然有系统监控,可以帮助timers及时触发,但是timers的触发并不能达到严格的实时性(系统监控检查timers调度延迟的阈值是10ms)。

这里我也对这个过程理解的不是很深刻,这里推荐去draveness大佬的在线图书中搜索【系统监控】关键词进行全面学习

补充问题

状态机的管理和堆有什么关系?

首先需要明确这里的堆指的是struct p?管理定时器所用的四叉堆,而不是 内存管理涉及的栈和堆。

一个定时器创建之后是timerNoStatus?状态,其并不在堆上,需要放在p?的堆上之后才能进行调度和触发!

比如如下语句中的堆字眼:

  • ?timerNoStatus? 和 timerRemoved? — 计时器不在堆上;
  • ?timerModifiedEarlier? 和 timerModifiedLater? — 计时器虽然在堆上,但是可能位于错误的位置上,需要重新排序;

1

2

// Active timers live in heaps attached to P, in the timers field.

// Inactive timers live there too temporarily, until they are removed.

衍生问题:哪些状态的timer在堆中?哪些状态在?

回答:可从源码中adjusttimers?函数中一窥究竟,timerNoStatus, timerRunning, timerRemoving, timerRemoved, timerMoving?状态的定时器是不在堆上的。

此外,有些同学可能有疑惑,定时器从堆中移除的过程(可以参考cleantimers?),是先标记成timerMoving?然后再从堆中移除,这两步不是原子的,如果状态已经是timerMoving?但是还没从堆中移除,遇上adjusttimers,岂不是会出现panic。

实际上这个问题并不会出现,因为只要涉及对p?的timers?数组操作(更改持续、加减元素)的地方都会加上锁(lock(&pp.timersLock)?),而且每个p?也只会修改自己的timers?数组,不会修改其它p?持有的timers?数组,但是同样如果不涉及数组更改,只设计状态变更的话就不需要加上锁(比如标记删除元素)。

为什么time.go和sleep.go中有接近相同的结构体?

相同的结构体示例:

time.go中:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

// Package time knows the layout of this structure.

// If this struct changes, adjust ../time/sleep.go:/runtimeTimer.

// For GOOS=nacl, package syscall knows the layout of this structure.

// If this struct changes, adjust ../syscall/net_nacl.go:/runtimeTimer.

type timer struct {

    i int // heap index

 

    // Timer wakes up at when, and then at when+period, ... (period > 0 only)

    // each time calling f(arg, now) in the timer goroutine, so f must be

    // a well-behaved function and not block.

    when   int64                      //当前计时器被唤醒的时间

    period int64                      //两次被唤醒的间隔;

    f      func(interface{}, uintptr) //每当计时器被唤醒时都会调用的函数;

    arg    interface{}                //计时器被唤醒时调用 f 传入的参数;

    seq    uintptr

}

sleep.go中:

1

2

3

4

5

6

7

8

9

10

// Interface to timers implemented in package runtime.

// Must be in sync with ../runtime/time.go:/^type timer

type runtimeTimer struct {

    i      int

    when   int64

    period int64

    f      func(interface{}, uintptr) // NOTE: must not be closure

    arg    interface{}

    seq    uintptr

}

回答:实际上这两个结构体一个是运行时一个是编译时,不过作者目前对这块也不是特别清楚,也欢迎大家指点指点。

为什么deltimer?函数只是标记删除,并不直接删除timer?

回答:因为定时器体系中,对于timers数组的更改需要加锁,如果没有更改的话就不需要加锁,为了能快速的StopTimer,因此标记删除并不需要拿锁,效率很高。什么时候加锁可参考:struct p中的定时器加锁。

acquirem()?和releasem()?的作用

这个问题和Go的调度模型GMP息息相关,这里就做一个不严谨的解释:acquirem? 的作用之一是 为了保证当前 ?P?? 不会被切换,粗暴理解就是对P而言,其相当于不会被“打断”,从而可以保证此时修改的p是当前goroutine所属的p。

1

2

3

4

5

6

7

8

9

10

11

12

// Disable preemption while using pp to avoid changing another P's heap.

mp := acquirem()

 

pp := getg().m.p.ptr()

lock(&pp.timersLock)

cleantimers(pp)

doaddtimer(pp, t)

unlock(&pp.timersLock)

 

wakeNetPoller(when)

 

releasem(mp)

定时器的状态机中为什么有这么多中间状态?

相信这篇文章读完之后,这个已经不是问题了。

nextwhen?字段的作用,其存在的必要性

首先我们要明白nextwhen?字段的作用:用于记录下一次要设置when?字段为什么值

那么既然其用于标识下一次when?字段的值,那为什么不直接修改when?字段呢?

这是因为在当前的设计中,p只会修改自己的timers数组,如果当前p?修改了其他p?的when?字段,timers数组就无法正常排序了。所以需要使用nextwhen?来记录when?需要修改的值,等timers?数组对应的p?来修改when?的值。

这里涉及跨p操作定时器的问题。

每个p都会存放在其上创建的timer?,但是不同的goroutine?可能会在不同的p?上面,因此可能操作timer?的goroutine?所在的p?与存放timer?所在的p?并不是同一个p?。

1

2

3

4

5

// The timer is in some other P's heap, so we can't change

        // the when field. If we did, the other P's heap would

        // be out of order. So we put the new when value in the

        // nextwhen field, and let the other P set the when field

        // when it is prepared to resort the heap.

为什么要用atomic相关变量,而不直接使用锁

猜测主要原因还是性能,锁可能还是太重了。而且实际上对于有重试的代码,atomic相关的设置可能更加优雅,比如下面代码:

1

2

3

if !atomic.Cas(&t.status, s, timerMoving) {

    continue //continue用于等待重试

}

如果使用锁的话,那么伪代码如下,就算使用双重校验,可能还是很重

1

2

3

4

5

6

7

8

9

10

11

12

for {

        if t.status == s{ //双重校验,延迟加锁

            t.mu.Lock() // 锁定

            if t.status == s {

                t.status = timerMoving // 修改状态

                t.mu.Unlock()           // 释放锁

                break                    // 修改成功,退出

            }

            t.mu.Unlock() // 如果没有修改成功,解锁

        }

        // 继续等待重试

    }

编程风格的学习

什么时候校验值:在每一次调用的入口。虽然函数值之前已经校验过。取之:src/runtime/time.go:255

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

func addtimer(t *timer) {

    // when must be positive. A negative value will cause runtimer to

    // overflow during its delta calculation and never expire other runtime

    // timers. Zero will cause checkTimers to fail to notice the timer.

    if t.when <= 0 {

        throw("timer when must be positive")

    }

    if t.period < 0 {

        throw("timer period must be non-negative")

    }

    if t.status != timerNoStatus {

        throw("addtimer called with initialized timer")

    }

xxx

}

函数的分层设计:

1

2

3

4

5

6

7

8

9

// startTimer adds t to the timer heap.

//

//go:linkname startTimer time.startTimer

func startTimer(t *timer) {

    if raceenabled {

        racerelease(unsafe.Pointer(t))

    }

    addtimer(t)

}

函数与方法:

可以思考下下面这里为什么是方法而不是p的函数。

个人认为因为这里并不设计直接修改结构体p?的值,所以设计成方法可读性更强。换言之,如果要修改对应的结构体的值,才创建函数,否则优先使用方法。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

// updateTimerModifiedEarliest updates the recorded nextwhen field of the

// earlier timerModifiedEarier value.

// The timers for pp will not be locked.

func updateTimerModifiedEarliest(pp *p, nextwhen int64) {

    for {

        old := atomic.Load64(&pp.timerModifiedEarliest)

        if old != 0 && int64(old) < nextwhen {

            return

        }

        if atomic.Cas64(&pp.timerModifiedEarliest, old, uint64(nextwhen)) {

            return

        }

    }

}

可以用注释表明当前函数必须被什么锁给锁定住:

1

2

3

4

5

6

// doaddtimer adds t to the current P's heap.

// The caller must have locked the timers for pp.   //注释说明,这个函数必须锁定之后才能进入

func doaddtimer(pp *p, t *timer) {

 

xxx

}

总结

本文解开了Timer体系相关的状态流转,但是对于现在Timer中存在的问题(reset、垃圾回收)在golang1.23怎么得到解决的机制还没有探究,这个可以等待后续研究研究。


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 :
相关文章
  • golang1.23版本之前Timer Reset方法无法正确使用

    golang1.23版本之前Timer Reset方法无法正确使用
    golang1.23 之前 Reset ?到底有什么问题 在golang 的 time.Reset 文档中有这么一句话,为了防止文档更新而导致内容变动,这里粘贴出来: Before Go 1
  • 从源码解析golang Timer定时器体系
    Timer、Ticker使用及其注意事项 在刚开始学习golang语言的时候就听说Timer、Ticker的使用要尤其注意,很容易出现问题,这次就来一探究竟。 本
  • 如何修改Go结构体的私有字段
    文章正文 在 Go 语言中,结构体字段的访问权限是由字段名的首字母决定的:首字母大写表示公共字段(public),首字母小写表示私有字段(
  • Golang实现超时机制读取文件的方法
    协程与通道 协程(Goroutine)是轻量级线程,可实现函数或方法与主程序流并行执行。使用go关键字:go func(){}。通道是协程直接的通讯管道,
  • golang如何解决go get命令无响应问题

    golang如何解决go get命令无响应问题
    golang go get命令无响应问题 操作 1 go get -v github.com/golang/net 现象 go get github.com/golang/net: module github.com/golang/net: Get https://proxy.golang.org/github.co
  • Go语言中字符串赋值中的问题与解决方法
    字符串的拼接方式 使用+号 使用+号拼接字符串的方式,每次拼接都会创建一个新的字符串,然后将原来的字符串复制到新的字符串中,这样
  • 使用Go语言中的Context取消协程执行的操作介绍
    使用 Go 语言中的 Context 取消协程执行 在 Go 语言中,协程(goroutine)是一种轻量级的线程,非常适合处理并发任务。然而,如何优雅地取消
  • Go语言加解密利器之go-crypto库用法解析
    在软件开发中,数据安全和隐私保护越来越受到重视。Go 语言以其简洁高效的特性,成为了许多开发者的首选。然而,在实际项目中使用加
  • golang怎么判断文件是否存在

    golang怎么判断文件是否存在
    判断一个文件是否存在是一个相当常见的需求,在golang中也有多种方案实现这一功能。 现在我们介绍其中两种最常用也是最简单的实现,第
  • Go语言中的Defer机制的介绍
    在Go语言中,defer是一个关键字,用于确保资源的清理和释放,特别是在函数中创建的资源。defer语句会将其后的函数调用推迟到包含它的函
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计