广告位联系
返回顶部
分享到

Go语言k8s kubernetes使用leader election实现选举

Golang 来源:互联网 作者:佚名 发布时间:2022-10-26 22:18:39 人浏览
摘要

在kubernetes的世界中,很多组件仅仅需要一个实例在运行,比如controller-manager或第三方的controller,但是为了高可用性,需要组件有多个副本,在发生故障的时候需要自动切换。因此,需

在kubernetes的世界中,很多组件仅仅需要一个实例在运行,比如controller-manager或第三方的controller,但是为了高可用性,需要组件有多个副本,在发生故障的时候需要自动切换。因此,需要利用leader election的机制多副本部署,单实例运行的模式。应用程序可以使用外部的组件比如ZooKeeper或Etcd等中间件进行leader eleaction, ZooKeeper的实现是采用临时节点的方案,临时节点存活与客户端与ZooKeeper的会话期间,在会话结束后,临时节点会被立刻删除,临时节点被删除后,其他处于被动状态的服务实例会竞争生成临时节点,生成临时节点的客户端(服务实例)就变成Leader,从而保证整个集群中只有一个活跃的实例,在发生故障的时候,也能快速的实现主从之间的迁移。Etcd是一个分布式的kv存储组件,利用Raft协议维护副本的状态服务,Etcd的Revision机制可以实现分布式锁的功能,Etcd的concurrency利用的分布式锁的能力实现了选Leader的功能(本文更多关注的是k8s本身的能力,Etcd的concurrency机制不做详细介绍)。

kubernetes使用的Etcd作为底层的存储组件,因此我们是不是有可能利用kubernetes的API实现选leader的功能呢?其实kubernetes的SIG已经提供了这方面的能力,主要是通过configmap/lease/endpoint的资源实现选Leader的功能。

二、官网代码示例

kubernetes官方提供了一个使用的例子,源码在:github.com/kubernetes/…

选举的过程中,每个实例的状态有可能是:

  • 选择成功->运行业务代码
  • 等待状态,有其他实例成为了leader。当leader放弃锁后,此状态的实例有可能会成为新的leader
  • 释放leader的锁,在运行的业务代码退出

在稳定的环境中,实例一旦成为了leader,通常情况是不会释放锁的,会保持一直运行的状态,这样有利于业务的稳定和Controller快速的对资源的状态变化做成相应的操作。只有在网络不稳定或误操作删除实例的情况下,才会触发leader的重新选举。

kubernetes官方提供的选举例子详解如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

package main

import (

  "context"

  "flag"

  "os"

  "os/signal"

  "syscall"

  "time"

  "github.com/google/uuid"

  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

  clientset "k8s.io/client-go/kubernetes"

  "k8s.io/client-go/rest"

  "k8s.io/client-go/tools/clientcmd"

  "k8s.io/client-go/tools/leaderelection"

  "k8s.io/client-go/tools/leaderelection/resourcelock"

  "k8s.io/klog/v2"

)

func buildConfig(kubeconfig string) (*rest.Config, error) {

  if kubeconfig != "" {

    cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)

    if err != nil {

      return nil, err

    }

    return cfg, nil

  }

  cfg, err := rest.InClusterConfig()

  if err != nil {

    return nil, err

  }

  return cfg, nil

}

func main() {

  klog.InitFlags(nil)

  var kubeconfig string

  var leaseLockName string

  var leaseLockNamespace string

  var id string

  // kubeconfig 指定了kubernetes集群的配置文文件路径

  flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")

  // 锁的拥有者的ID,如果没有传参数进来,就随机生成一个

  flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")

  // 锁的ID,对应kubernetes中资源的name

  flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")

  // 锁的命名空间

  flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")

  // 解析命令行参数

  flag.Parse()

  if leaseLockName == "" {

    klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")

  }

  if leaseLockNamespace == "" {

    klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")

  }

  // leader election uses the Kubernetes API by writing to a

  // lock object, which can be a LeaseLock object (preferred),

  // a ConfigMap, or an Endpoints (deprecated) object.

  // Conflicting writes are detected and each client handles those actions

  // independently.

  config, err := buildConfig(kubeconfig)

  if err != nil {

    klog.Fatal(err)

  }

  // 获取kubernetes集群的客户端,如果获取不到,就抛异常退出

  client := clientset.NewForConfigOrDie(config)

  // 模拟Controller的逻辑代码

  run := func(ctx context.Context) {

    // complete your controller loop here

    klog.Info("Controller loop...")

    // 不退出

    select {}

  }

  // use a Go context so we can tell the leaderelection code when we

  // want to step down

  ctx, cancel := context.WithCancel(context.Background())

  defer cancel()

  // listen for interrupts or the Linux SIGTERM signal and cancel

  // our context, which the leader election code will observe and

  // step down

  // 处理系统的系统,收到SIGTERM信号后,会退出进程

  ch := make(chan os.Signal, 1)

  signal.Notify(ch, os.Interrupt, syscall.SIGTERM)

  go func() {

    <-ch

    klog.Info("Received termination, signaling shutdown")

    cancel()

  }()

  // we use the Lease lock type since edits to Leases are less common

  // and fewer objects in the cluster watch "all Leases".

   

  // 根据参数,生成锁。这里使用的Lease这种类型资源作为锁

  lock := &resourcelock.LeaseLock{

    LeaseMeta: metav1.ObjectMeta{

      Name:      leaseLockName,

      Namespace: leaseLockNamespace,

    },

    // 跟kubernetes集群关联起来

    Client: client.CoordinationV1(),

    LockConfig: resourcelock.ResourceLockConfig{

      Identity: id,

    },

  }

  // start the leader election code loop

   

  // 注意,选举逻辑启动时候,会传入ctx参数,如果ctx对应的cancel函数被调用,那么选举也会结束

  leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{

    // 选举使用的锁

    Lock: lock,

    // IMPORTANT: you MUST ensure that any code you have that

    // is protected by the lease must terminate **before**

    // you call cancel. Otherwise, you could have a background

    // loop still running and another process could

    // get elected before your background loop finished, violating

    // the stated goal of the lease.

    //主动放弃leader,当ctx canceled的时候

    ReleaseOnCancel: true,

    LeaseDuration:   60 * time.Second,  // 选举的任期,60s一个任期,如果在60s后没有renew,那么leader就会释放锁,重新选举

    RenewDeadline:   15 * time.Second,  // renew的请求的超时时间

    RetryPeriod:     5 * time.Second, // leader获取到锁后,renew leadership的间隔。非leader,抢锁成为leader的间隔(有1.2的jitter因子,详细看代码)

     

    // 回调函数的注册

    Callbacks: leaderelection.LeaderCallbacks{

       

      // 成为leader的回调

      OnStartedLeading: func(ctx context.Context) {

        // we're notified when we start - this is where you would

        // usually put your code

        // 运行controller的逻辑

        run(ctx)

      },

      OnStoppedLeading: func() {

        // we can do cleanup here

        // 退出leader的

        klog.Infof("leader lost: %s", id)

        os.Exit(0)

      },

      OnNewLeader: func(identity string) {

        // 有新的leader当选

        // we're notified when new leader elected

        if identity == id {

          // I just got the lock

          return

        }

        klog.Infof("new leader elected: %s", identity)

      },

    },

  })

}

启动一个实例,观察日志输出和kubernetes集群上的lease资源,启动命令

go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1

可以看到,日志有输出,id=1的实例获取到资源了。

go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 I1023 17:00:21.670298 94227 leaderelection.go:248] attempting to acquire leader lease default/example... I1023 17:00:21.784234 94227 leaderelection.go:258] successfully acquired lease default/example I1023 17:00:21.784316 94227 main.go:78] Controller loop...

在kubernetes的集群上,看到

我们接着启动一个实例,id=2,日志中输出

go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2 I1023 17:05:00.555145 95658 leaderelection.go:248] attempting to acquire leader lease default/example... I1023 17:05:00.658202 95658 main.go:151] new leader elected: 1

可以看出,id=2的实例,没有获取到锁,并且观察到id=1的锁获取到了实例。接着我们尝试退出id=1的实例,观察id=2的实例是否会成为新的leader

三、锁的实现

kubernets的资源都可以实现Get/Create/Update的操作,因此,理论上所有的资源都可以作为锁的底层。kubernetes 提供了Lease/Configmap/Endpoint作为锁的底层。

锁的状态转移如下:

锁需要实现以下的接口

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

type Interface interface {

  // Get returns the LeaderElectionRecord

  Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)

  // Create attempts to create a LeaderElectionRecord

  Create(ctx context.Context, ler LeaderElectionRecord) error

  // Update will update and existing LeaderElectionRecord

  Update(ctx context.Context, ler LeaderElectionRecord) error

  // RecordEvent is used to record events

  RecordEvent(string)

  // Identity will return the locks Identity

  Identity() string

  // Describe is used to convert details on current resource lock

  // into a string

  Describe() string

}

理论上,有Get/Create/Update三个方法,就可以实现锁的机制了。但是,需要保证update和create操作的原子性,这个就是kuberenetes的机制保证了。第二章的官网代码例子中,leaderelection.RunOrDie使用的RunOrDie接口,其实就是调用Run接口,而Run接口实现非常简单:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

func (le *LeaderElector) Run(ctx context.Context) {

  defer runtime.HandleCrash()

  defer func() {

    le.config.Callbacks.OnStoppedLeading()

  }()

  // 获取锁,如果没有获取到,就一直等待

  if !le.acquire(ctx) {

    return // ctx signalled done

  }

  ctx, cancel := context.WithCancel(ctx)

  defer cancel()

  // 获取到锁后,需要调用回调函数中的OnStartedLeading,运行controller的代码

  go le.config.Callbacks.OnStartedLeading(ctx)

   

  // 获取到锁后,需要不断地进行renew操作

  le.renew(ctx)

}

LeaderElector关键是需要acquire和renew的操作,acquire和renew操作代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

func (le *LeaderElector) acquire(ctx context.Context) bool {

  ctx, cancel := context.WithCancel(ctx)

  defer cancel()

  succeeded := false

  desc := le.config.Lock.Describe()

  klog.Infof("attempting to acquire leader lease %v...", desc)

  // 此接口会阻塞,利用定时的机制,获取锁,如果获取不到一直循环,除非ctx被取消。

  wait.JitterUntil(func() {

    // 获取锁

    succeeded = le.tryAcquireOrRenew(ctx)

    le.maybeReportTransition()

    if !succeeded {

      klog.V(4).Infof("failed to acquire lease %v", desc)

      return

    }

    le.config.Lock.RecordEvent("became leader")

    le.metrics.leaderOn(le.config.Name)

    klog.Infof("successfully acquired lease %v", desc)

    cancel()

  }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())

  return succeeded

}

// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.

func (le *LeaderElector) renew(ctx context.Context) {

  ctx, cancel := context.WithCancel(ctx)

  defer cancel()

  // 循环renew机制,renew成功,不会返回true,导致Until会不断循环

  wait.Until(func() {

    //RenewDeadline的实现在这里,如果renew超过了RenewDeadline,会导致renew失败,主退出

    timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)

    defer timeoutCancel()

    err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {

      // renew锁

      return le.tryAcquireOrRenew(timeoutCtx), nil

    }, timeoutCtx.Done())

    le.maybeReportTransition()

    desc := le.config.Lock.Describe()

    if err == nil {

      klog.V(5).Infof("successfully renewed lease %v", desc)

      // renew成功

      return

    }

    le.config.Lock.RecordEvent("stopped leading")

    le.metrics.leaderOff(le.config.Name)

    klog.Infof("failed to renew lease %v: %v", desc, err)

    cancel()

  }, le.config.RetryPeriod, ctx.Done())

  // if we hold the lease, give it up

  if le.config.ReleaseOnCancel {

    le.release()

  }

}

关键的实现在于tryAcquireOrRenew,而tryAcquireOrRenew就是依赖锁的状态转移机制完成核心逻辑。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {

  now := metav1.Now()

  leaderElectionRecord := rl.LeaderElectionRecord{

    HolderIdentity:       le.config.Lock.Identity(),

    LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),

    RenewTime:            now,

    AcquireTime:          now,

  }

  // 1. obtain or create the ElectionRecord

  // 检查锁有没有

  oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)

  if err != nil {

    // 没有锁的资源,就创建一个

    if !errors.IsNotFound(err) {

      klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)

      return false

    }

    if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {

      klog.Errorf("error initially creating leader election record: %v", err)

      return false

    }

    //对外宣称自己成为了leader

    le.setObservedRecord(&leaderElectionRecord)

    return true

  }

  // 2. Record obtained, check the Identity & Time

  if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {

    // 这个机制很重要,会如果leader会不断正常renew这个锁,oldLeaderElectionRawRecord会一直发生变化,发生变化会更新le.observedTime

    le.setObservedRecord(oldLeaderElectionRecord)

    le.observedRawRecord = oldLeaderElectionRawRecord

  }

  // 如果还没超时并且此实例不是leader(leader是其他实例),那么就直接退出

  if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&

    le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&

    !le.IsLeader() {

    klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)

    return false

  }

  // 3. We're going to try to update. The leaderElectionRecord is set to it's default

  // here. Let's correct it before updating.

  // 如果是leader,就更新时间RenewTime,保证其他实例(非主)可以观察到:主还活着

  if le.IsLeader() {

    leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime

    leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions

  } else {

  // 不是leader,那么锁就发生了转移

    leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1

  }

  // 更新锁

  // update the lock itself

  if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {

    klog.Errorf("Failed to update lock: %v", err)

    return false

  }

  le.setObservedRecord(&leaderElectionRecord)

  return true

}


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 : https://juejin.cn/post/7157648925078323207
相关文章
  • 教你如何优雅处理Golang中的异常
    我们在使用Golang时,不可避免会遇到异常情况的处理,与Java、Python等语言不同的是,Go中并没有try...catch...这样的语句块,我们知道在Java中
  • Go语言k8s kubernetes使用leader election实现选举

    Go语言k8s kubernetes使用leader election实现选举
    在kubernetes的世界中,很多组件仅仅需要一个实例在运行,比如controller-manager或第三方的controller,但是为了高可用性,需要组件有多个副本,
  • golang中的defer函数理解
    golang的defer 什么是defer defer的的官方文档:https://golang.org/ref/spec#Defer_statements go语言中defer可以完成延迟功能,当前函数执行完成后再执行defer的
  • Windows系统中搭建Go语言开发环境图文介绍

    Windows系统中搭建Go语言开发环境图文介绍
    本文详细讲述如何在 Windows 系统上搭建 Go语言的开发环境,以供借鉴或参考。文章将介绍Go语言的VSCode、GoLand、Vim三种开发环境,大家可以灵
  • 深入理解Golang channel的应用
    channel是用于 goroutine 之间的同步、通信的数据结构 channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了更高一层次的抽象,封装了
  • 基于GORM实现CreateOrUpdate的方法
    CreateOrUpdate 是业务开发中很常见的场景,我们支持用户对某个业务实体进行创建/配置。希望实现的 repository 接口要达到以下两个要求: 如果
  • Golang中的内存逃逸的介绍
    什么是内存逃逸分析 内存逃逸分析是go的编译器在编译期间,根据变量的类型和作用域,确定变量是堆上还是栈上 简单说就是编译器在编译
  • Golang自旋锁的介绍
    自旋锁 获取锁的线程一直处于活跃状态,但是并没有执行任何有效的任务,使用这种锁会造成busy-waiting。 它是为实现保护共享资源而提出的
  • Go语言读写锁RWMutex的源码

    Go语言读写锁RWMutex的源码
    在前面两篇文章中初见 Go Mutex、Go Mutex 源码详解,我们学习了Go语言中的Mutex,它是一把互斥锁,每次只允许一个goroutine进入临界区,可以保
  • Go项目实现优雅关机与平滑重启功能
    什么是优雅关机? 优雅关机就是服务端关机命令发出后不是立即关机,而是等待当前还在处理的请求全部处理完毕后再退出程序,是一种对
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计