DEV Community

cylon
cylon

Posted on

1

深入解析kubernetes中的选举机制

Overview

在 Kubernetes的 kube-controller-manager , kube-scheduler, 以及使用 Operator 的底层实现 controller-rumtime 都支持高可用系统中的leader选举,本文将以理解 controller-rumtime (底层的实现是 client-go) 中的leader选举以在kubernetes controller中是如何实现的。

Background

在运行 kube-controller-manager 时,是有一些参数提供给cm进行leader选举使用的,可以参考官方文档提供的 参数 来了解相关参数。

--leader-elect                               Default: true
--leader-elect-renew-deadline duration       Default: 10s
--leader-elect-resource-lock string          Default: "leases"
--leader-elect-resource-name string          Default: "kube-controller-manager"
--leader-elect-resource-namespace string     Default: "kube-system"
--leader-elect-retry-period duration         Default: 2s
...
Enter fullscreen mode Exit fullscreen mode

本身以为这些组件的选举动作时通过etcd进行的,但是后面对 controller-runtime 学习时,发现并没有配置其相关的etcd相关参数,这就引起了对选举机制的好奇。怀着这种好奇心搜索了下有关于 kubernetes的选举,发现官网是这么介绍的,下面是对官方的说明进行一个通俗总结。simple leader election with kubernetes

通过阅读文章得知,kubernetes API 提供了一中选举机制,只要运行在集群内的容器,都是可以实现选举功能的。

Kubernetes API通过提供了两个属性来完成选举动作的

  • ResourceVersions:每个API对象唯一一个ResourceVersion
  • Annotations:每个API对象都可以对这些key进行注释

注:这种选举会增加APIServer的压力。也就对etcd会产生影响

那么有了这些信息之后,我们来看一下,在Kubernetes集群中,谁是cm的leader(我们提供的集群只有一个节点,所以本节点就是leader)

在Kubernetes中所有启用了leader选举的服务都会生成一个 EndPoint ,在这个 EndPoint 中会有上面提到的label(Annotations)来标识谁是leader。

$ kubectl get ep -n kube-system
NAME                      ENDPOINTS   AGE
kube-controller-manager   <none>      3d4h
kube-dns                              3d4h
kube-scheduler            <none>      3d4h
Enter fullscreen mode Exit fullscreen mode

这里以 kube-controller-manager 为例,来看下这个 EndPoint 有什么信息

[root@master-machine ~]# kubectl describe ep kube-controller-manager -n kube-system
Name:         kube-controller-manager
Namespace:    kube-system
Labels:       <none>
Annotations:  control-plane.alpha.kubernetes.io/leader:
                {"holderIdentity":"master-machine_06730140-a503-487d-850b-1fe1619f1fe1","leaseDurationSeconds":15,"acquireTime":"2022-06-27T15:30:46Z","re...
Subsets:
Events:
  Type    Reason          Age    From                     Message
  ----    ------          ----   ----                     -------
  Normal  LeaderElection  2d22h  kube-controller-manager  master-machine_76aabcb5-49ff-45ff-bd18-4afa61fbc5af became leader
  Normal  LeaderElection  9m     kube-controller-manager  master-machine_06730140-a503-487d-850b-1fe1619f1fe1 became leader
Enter fullscreen mode Exit fullscreen mode

可以看出 Annotations: control-plane.alpha.kubernetes.io/leader: 标出了哪个node是leader。

election in controller-runtime

controller-runtime 有关leader选举的部分在 pkg/leaderelection 下面,总共100行代码,我们来看下做了些什么?

可以看到,这里只提供了创建资源锁的一些选项

type Options struct {
    // 在manager启动时,决定是否进行选举
    LeaderElection bool
    // 使用那种资源锁 默认为租用 lease
    LeaderElectionResourceLock string
    // 选举发生的名称空间
    LeaderElectionNamespace string
    // 该属性将决定持有leader锁资源的名称
    LeaderElectionID string
}
Enter fullscreen mode Exit fullscreen mode

通过 NewResourceLock 可以看到,这里是走的 client-go/tools/leaderelection下面,而这个leaderelection也有一个 example 来学习如何使用它。

通过 example 可以看到,进入选举的入口是一个 RunOrDie() 的函数

// 这里使用了一个lease锁,注释中说愿意为集群中存在lease的监听较少
lock := &resourcelock.LeaseLock{
    LeaseMeta: metav1.ObjectMeta{
        Name:      leaseLockName,
        Namespace: leaseLockNamespace,
    },
    Client: client.CoordinationV1(),
    LockConfig: resourcelock.ResourceLockConfig{
        Identity: id,
    },
}

// 开启选举循环
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
    Lock: lock,
    // 这里必须保证拥有的租约在调用cancel()前终止,否则会仍有一个loop在运行
    ReleaseOnCancel: true,
    LeaseDuration:   60 * time.Second,
    RenewDeadline:   15 * time.Second,
    RetryPeriod:     5 * time.Second,
    Callbacks: leaderelection.LeaderCallbacks{
        OnStartedLeading: func(ctx context.Context) {
            // 这里填写你的代码,
            // usually put your code
            run(ctx)
        },
        OnStoppedLeading: func() {
            // 这里清理你的lease
            klog.Infof("leader lost: %s", id)
            os.Exit(0)
        },
        OnNewLeader: func(identity string) {
            // we're notified when new leader elected
            if identity == id {
                // I just got the lock
                return
            }
            klog.Infof("new leader elected: %s", identity)
        },
    },
})
Enter fullscreen mode Exit fullscreen mode

到这里,我们了解了锁的概念和如何启动一个锁,下面看下,client-go都提供了那些锁。

在代码 tools/leaderelection/resourcelock/interface.go 定义了一个锁抽象,interface提供了一个通用接口,用于锁定leader选举中使用的资源。

type Interface interface {
    // Get 返回选举记录
    Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)

    // Create 创建一个LeaderElectionRecord
    Create(ctx context.Context, ler LeaderElectionRecord) error

    // Update will update and existing LeaderElectionRecord
    Update(ctx context.Context, ler LeaderElectionRecord) error

    // RecordEvent is used to record events
    RecordEvent(string)

    // Identity 返回锁的标识
    Identity() string

    // Describe is used to convert details on current resource lock into a string
    Describe() string
}
Enter fullscreen mode Exit fullscreen mode

那么实现这个抽象接口的就是,实现的资源锁,我们可以看到,client-go提供了四种资源锁

  • leaselock
  • configmaplock
  • multilock
  • endpointlock

leaselock

Lease是kubernetes控制平面中的通过ETCD来实现的一个Leases的资源,主要为了提供分布式租约的一种控制机制。相关对这个API的描述可以参考于:Lease

在Kubernetes集群中,我们可以使用如下命令来查看对应的lease

$ kubectl get leases -A
NAMESPACE         NAME                      HOLDER                                                AGE
kube-node-lease   master-machine            master-machine                                        3d19h
kube-system       kube-controller-manager   master-machine_06730140-a503-487d-850b-1fe1619f1fe1   3d19h
kube-system       kube-scheduler            master-machine_1724e2d9-c19c-48d7-ae47-ee4217b27073   3d19h

$ kubectl describe leases kube-controller-manager -n kube-system
Name:         kube-controller-manager
Namespace:    kube-system
Labels:       <none>
Annotations:  <none>
API Version:  coordination.k8s.io/v1
Kind:         Lease
Metadata:
  Creation Timestamp:  2022-06-24T11:01:51Z
  Managed Fields:
    API Version:  coordination.k8s.io/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:spec:
        f:acquireTime:
        f:holderIdentity:
        f:leaseDurationSeconds:
        f:leaseTransitions:
        f:renewTime:
    Manager:         kube-controller-manager
    Operation:       Update
    Time:            2022-06-24T11:01:51Z
  Resource Version:  56012
  Self Link:         /apis/coordination.k8s.io/v1/namespaces/kube-system/leases/kube-controller-manager
  UID:               851a32d2-25dc-49b6-a3f7-7a76f152f071
Spec:
  Acquire Time:            2022-06-27T15:30:46.000000Z
  Holder Identity:         master-machine_06730140-a503-487d-850b-1fe1619f1fe1
  Lease Duration Seconds:  15
  Lease Transitions:       2
  Renew Time:              2022-06-28T06:09:26.837773Z
Events:                    <none>
Enter fullscreen mode Exit fullscreen mode

下面来看下leaselock的实现,leaselock会实现了作为资源锁的抽象

type LeaseLock struct {
    // LeaseMeta 就是类似于其他资源类型的属性,包含name ns 以及其他关于lease的属性
    LeaseMeta  metav1.ObjectMeta
    Client     coordinationv1client.LeasesGetter // Client 就是提供了informer中的功能
    // lockconfig包含上面通过 describe 看到的 Identity与recoder用于记录资源锁的更改
    LockConfig ResourceLockConfig
    // lease 就是 API中的Lease资源,可以参考下上面给出的这个API的使用
    lease      *coordinationv1.Lease
}
Enter fullscreen mode Exit fullscreen mode

下面来看下leaselock实现了那些方法?

Get

Get 是从spec中返回选举的记录

func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
    var err error
    ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
    if err != nil {
        return nil, nil, err
    }
    record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
    recordByte, err := json.Marshal(*record)
    if err != nil {
        return nil, nil, err
    }
    return record, recordByte, nil
}

// 可以看出是返回这个资源spec里面填充的值
func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElectionRecord {
    var r LeaderElectionRecord
    if spec.HolderIdentity != nil {
        r.HolderIdentity = *spec.HolderIdentity
    }
    if spec.LeaseDurationSeconds != nil {
        r.LeaseDurationSeconds = int(*spec.LeaseDurationSeconds)
    }
    if spec.LeaseTransitions != nil {
        r.LeaderTransitions = int(*spec.LeaseTransitions)
    }
    if spec.AcquireTime != nil {
        r.AcquireTime = metav1.Time{spec.AcquireTime.Time}
    }
    if spec.RenewTime != nil {
        r.RenewTime = metav1.Time{spec.RenewTime.Time}
    }
    return &r
}
Enter fullscreen mode Exit fullscreen mode

Create

Create 是在kubernetes集群中尝试去创建一个租约,可以看到,Client就是API提供的对应资源的REST客户端,结果会在Kubernetes集群中创建这个Lease

func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
    var err error
    ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{
        ObjectMeta: metav1.ObjectMeta{
            Name:      ll.LeaseMeta.Name,
            Namespace: ll.LeaseMeta.Namespace,
        },
        Spec: LeaderElectionRecordToLeaseSpec(&ler),
    }, metav1.CreateOptions{})
    return err
}
Enter fullscreen mode Exit fullscreen mode

Update

Update 是更新Lease的spec

func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
    if ll.lease == nil {
        return errors.New("lease not initialized, call get or create first")
    }
    ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)

    lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
    if err != nil {
        return err
    }

    ll.lease = lease
    return nil
}
Enter fullscreen mode Exit fullscreen mode

RecordEvent

RecordEvent 是记录选举时出现的事件,这时候我们回到上部分 在kubernetes集群中查看 ep 的信息时可以看到的event中存在 became leader 的事件,这里就是将产生的这个event添加到 meta-data 中。

func (ll *LeaseLock) RecordEvent(s string) {
   if ll.LockConfig.EventRecorder == nil {
      return
   }
   events := fmt.Sprintf("%v %v", ll.LockConfig.Identity, s)
   subject := &coordinationv1.Lease{ObjectMeta: ll.lease.ObjectMeta}
   // Populate the type meta, so we don't have to get it from the schema
   subject.Kind = "Lease"
   subject.APIVersion = coordinationv1.SchemeGroupVersion.String()
   ll.LockConfig.EventRecorder.Eventf(subject, corev1.EventTypeNormal, "LeaderElection", events)
}
Enter fullscreen mode Exit fullscreen mode

到这里大致上了解了资源锁究竟是什么了,其他种类的资源锁也是相同的实现的方式,这里就不过多阐述了;下面的我们来看看选举的过程。

election workflow

选举的代码入口是在 leaderelection.go ,这里会继续上面的 example 向下分析整个选举的过程。

前面我们看到了进入选举的入口是一个 RunOrDie() 的函数,那么就继续从这里开始来了解。进入 RunOrDie,看到其实只有几行而已,大致上了解到了RunOrDie会使用提供的配置来启动选举的客户端,之后会阻塞,直到 ctx 退出,或停止持有leader的租约。

func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
    le, err := NewLeaderElector(lec)
    if err != nil {
        panic(err)
    }
    if lec.WatchDog != nil {
        lec.WatchDog.SetLeaderElection(le)
    }
    le.Run(ctx)
}
Enter fullscreen mode Exit fullscreen mode

下面看下 NewLeaderElector 做了些什么?可以看到,LeaderElector是一个结构体,这里只是创建他,这个结构体提供了我们选举中所需要的一切(LeaderElector就是RunOrDie创建的选举客户端)。

func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
    if lec.LeaseDuration <= lec.RenewDeadline {
        return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
    }
    if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
        return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
    }
    if lec.LeaseDuration < 1 {
        return nil, fmt.Errorf("leaseDuration must be greater than zero")
    }
    if lec.RenewDeadline < 1 {
        return nil, fmt.Errorf("renewDeadline must be greater than zero")
    }
    if lec.RetryPeriod < 1 {
        return nil, fmt.Errorf("retryPeriod must be greater than zero")
    }
    if lec.Callbacks.OnStartedLeading == nil {
        return nil, fmt.Errorf("OnStartedLeading callback must not be nil")
    }
    if lec.Callbacks.OnStoppedLeading == nil {
        return nil, fmt.Errorf("OnStoppedLeading callback must not be nil")
    }

    if lec.Lock == nil {
        return nil, fmt.Errorf("Lock must not be nil.")
    }
    le := LeaderElector{
        config:  lec,
        clock:   clock.RealClock{},
        metrics: globalMetricsFactory.newLeaderMetrics(),
    }
    le.metrics.leaderOff(le.config.Name)
    return &le, nil
}
Enter fullscreen mode Exit fullscreen mode

LeaderElector 是建立的选举客户端,

type LeaderElector struct {
    config LeaderElectionConfig // 这个的配置,包含一些时间参数,健康检查
    // recoder相关属性
    observedRecord    rl.LeaderElectionRecord
    observedRawRecord []byte
    observedTime      time.Time
    // used to implement OnNewLeader(), may lag slightly from the
    // value observedRecord.HolderIdentity if the transition has
    // not yet been reported.
    reportedLeader string
    // clock is wrapper around time to allow for less flaky testing
    clock clock.Clock
    // 锁定 observedRecord
    observedRecordLock sync.Mutex
    metrics leaderMetricsAdapter
}
Enter fullscreen mode Exit fullscreen mode

可以看到 Run 实现的选举逻辑就是在初始化客户端时传入的 三个 callback

func (le *LeaderElector) Run(ctx context.Context) {
    defer runtime.HandleCrash()
    defer func() { // 退出时执行callbacke的OnStoppedLeading
        le.config.Callbacks.OnStoppedLeading()
    }()

    if !le.acquire(ctx) {
        return
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    go le.config.Callbacks.OnStartedLeading(ctx) // 选举时,执行 OnStartedLeading
    le.renew(ctx)
}
Enter fullscreen mode Exit fullscreen mode

在 Run 中调用了 acquire,这个是 通过一个loop去调用 tryAcquireOrRenew,直到ctx传递过来结束信号

func (le *LeaderElector) acquire(ctx context.Context) bool {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    succeeded := false
    desc := le.config.Lock.Describe()
    klog.Infof("attempting to acquire leader lease %v...", desc)
    // jitterUntil是执行定时的函数 func() 是定时任务的逻辑
    // RetryPeriod是周期间隔
    // JitterFactor 是重试系数,类似于延迟队列中的系数 (duration + maxFactor * duration)
    // sliding 逻辑是否计算在时间内
    // 上下文传递
    wait.JitterUntil(func() {
        succeeded = le.tryAcquireOrRenew(ctx)
        le.maybeReportTransition()
        if !succeeded {
            klog.V(4).Infof("failed to acquire lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("became leader")
        le.metrics.leaderOn(le.config.Name)
        klog.Infof("successfully acquired lease %v", desc)
        cancel()
    }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
    return succeeded
}
Enter fullscreen mode Exit fullscreen mode

这里实际上选举动作在 tryAcquireOrRenew 中,下面来看下tryAcquireOrRenew;tryAcquireOrRenew 是尝试获得一个leader租约,如果已经获得到了,则更新租约;否则可以得到租约则为true,反之false

func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
    now := metav1.Now() // 时间
    leaderElectionRecord := rl.LeaderElectionRecord{ // 构建一个选举record
        HolderIdentity:       le.config.Lock.Identity(), // 选举人的身份特征,ep与主机名有关
        LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), // 默认15s
        RenewTime:            now, // 重新获取时间
        AcquireTime:          now, // 获得时间
    }

    // 1. 从API获取或创建一个recode,如果可以拿到则已经有租约,反之创建新租约
    oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
    if err != nil {
        if !errors.IsNotFound(err) {
            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
            return false
        }
        // 创建租约的动作就是新建一个对应的resource,这个lock就是leaderelection提供的四种锁,
        // 看你在runOrDie中初始化传入了什么锁
        if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
            klog.Errorf("error initially creating leader election record: %v", err)
            return false
        }
        // 到了这里就已经拿到或者创建了租约,然后记录其一些属性,LeaderElectionRecord
        le.setObservedRecord(&leaderElectionRecord)

        return true
    }

    // 2. 获取记录检查身份和时间
    if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
        le.setObservedRecord(oldLeaderElectionRecord)

        le.observedRawRecord = oldLeaderElectionRawRecord
    }
    if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
        le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
        !le.IsLeader() { // 不是leader,进行HolderIdentity比较,再加上时间,这个时候没有到竞选其,跳出
        klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
        return false
    }

    // 3.我们将尝试更新。 在这里leaderElectionRecord设置为默认值。让我们在更新之前更正它。
    if le.IsLeader() { // 到这就说明是leader,修正他的时间
        leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
    } else { // LeaderTransitions 就是指leader调整(转变为其他)了几次,如果是,
        // 则为发生转变,保持原有值
        // 反之,则+1
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
    }
    // 完事之后更新APIServer中的锁资源,也就是更新对应的资源的属性信息
    if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
        klog.Errorf("Failed to update lock: %v", err)
        return false
    }
    // setObservedRecord 是通过一个新的record来更新这个锁中的record
    // 操作是安全的,会上锁保证临界区仅可以被一个线程/进程操作
    le.setObservedRecord(&leaderElectionRecord)
    return true
}
Enter fullscreen mode Exit fullscreen mode

summary

到这里,已经完整知道利用kubernetes进行选举的流程都是什么了;下面简单回顾下,上述leader选举所有的步骤:

  • 首选创建的服务就是该服务的leader,锁可以为 lease , endpoint 等资源进行上锁
  • 已经是leader的实例会不断续租,租约的默认值是15秒 (leaseDuration);leader在租约满时更新租约时间(renewTime)。
  • 其他的follower,会不断检查对应资源锁的存在,如果已经有leader,那么则检查 renewTime,如果超过了租用时间(),则表明leader存在问题需要重新启动选举,直到有follower提升为leader。
  • 而为了避免资源被抢占,Kubernetes API使用了 ResourceVersion 来避免被重复修改(如果版本号与请求版本号不一致,则表示已经被修改了,那么APIServer将返回错误)

Reference

Kubernetes 并发控制与数据一致性的实现原理

Controller manager 的高可用实现方式

deep dive into kubernetes simple leader election

AWS Security LIVE!

Join us for AWS Security LIVE!

Discover the future of cloud security. Tune in live for trends, tips, and solutions from AWS and AWS Partners.

Learn More

Top comments (0)

Sentry image

See why 4M developers consider Sentry, “not bad.”

Fixing code doesn’t have to be the worst part of your day. Learn how Sentry can help.

Learn more

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay