From 6ffe02550b9ad7869ea8e0c39c35bd1306484d25 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Tue, 15 Oct 2024 09:40:28 +0545 Subject: [PATCH] feat: use client-go leader election --- leader/election.go | 172 +++++++++++++++++---------------------------- 1 file changed, 66 insertions(+), 106 deletions(-) diff --git a/leader/election.go b/leader/election.go index 962328ef..b786acdf 100644 --- a/leader/election.go +++ b/leader/election.go @@ -1,139 +1,99 @@ package leader import ( + gocontext "context" + "errors" "fmt" "log" - "math/rand" "os" - "sync/atomic" + "strings" "time" - "github.com/flanksource/duty/context" - "github.com/samber/lo" - - v1 "k8s.io/api/coordination/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + + "github.com/flanksource/duty/context" ) var ( - identity = getHostname() + hostname string + service string + + // namespace the pod is running on + namespace string ) -var isLeader *atomic.Bool +const namespaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" -func getHostname() string { - hostname, err := os.Hostname() - if err != nil { - log.Fatalf("Failed to get hostname: %v", err) +func podNamespace() (string, error) { + // passed using K8s downwards API + if ns, ok := os.LookupEnv("POD_NAMESPACE"); ok { + return ns, nil } - return hostname -} - -var watchers = []func(isLeader bool){} - -func OnElection(ctx context.Context, leaseName string, fn func(isLeader bool)) { - watchers = append(watchers, fn) - - if isLeader == nil { - leaseDuration := ctx.Properties().Duration("leader.lease.duration", 10*time.Minute) - isLeader = new(atomic.Bool) - go func() { - for { - - leader, _, err := createOrUpdateLease(ctx, leaseName, 0) - - if err != nil { - ctx.Warnf("failed to create/update lease: %v", err) - time.Sleep(time.Duration(rand.Intn(60)) * time.Second) - continue - } - - if isLeader.Load() != leader { - isLeader.Store(leader) - notify(leader) - } - // sleep for just under half the lease duration before trying to renew - time.Sleep(leaseDuration/2 - time.Second*10) - } - }() + data, err := os.ReadFile(namespaceFilePath) + if err != nil { + return "", fmt.Errorf("failed to read namespace file: %w", err) } -} - -func notify(isLeader bool) { - for _, fn := range watchers { - fn(isLeader) + ns := strings.TrimSpace(string(data)) + if ns == "" { + return "", errors.New("namespace was neither found in the env nor in the service account path") } + return ns, nil } -func IsLeader(ctx context.Context, leaseName string) (bool, error) { - leases := ctx.Kubernetes().CoordinationV1().Leases(ctx.GetNamespace()) - - lease, err := leases.Get(ctx, leaseName, metav1.GetOptions{}) +func init() { + var err error + hostname, err = os.Hostname() if err != nil { - return false, err + log.Fatalf("failed to get hostname: %v", err) } - if *lease.Spec.HolderIdentity == identity { - return true, nil + if n, err := podNamespace(); err != nil { + log.Fatalf("failed to get pod namespace: %v", err) + } else { + namespace = n } - return false, nil + service = strings.Split(hostname, "-")[0] } -func createOrUpdateLease(ctx context.Context, leaseName string, attempt int) (bool, string, error) { - if attempt > 0 { - time.Sleep(time.Duration(rand.Intn(5000)) * time.Millisecond) - } - if attempt >= ctx.Properties().Int("leader.lease.attempts", 3) { - return false, "", fmt.Errorf("failed to acquire lease %s after %d attempts", leaseName, attempt) - } - now := metav1.MicroTime{Time: time.Now()} - leases := ctx.Kubernetes().CoordinationV1().Leases(ctx.GetNamespace()) - leaseDuration := ctx.Properties().Duration("leader.lease.duration", 10*time.Minute) - lease, err := leases.Get(ctx, leaseName, metav1.GetOptions{}) - if err != nil { - return false, "", err - } - if lease == nil { - ctx.Infof("Acquiring lease %s", leaseName) - lease = &v1.Lease{ - ObjectMeta: metav1.ObjectMeta{ - Name: leaseName, - Namespace: ctx.GetNamespace(), - }, - Spec: v1.LeaseSpec{ - HolderIdentity: &identity, - LeaseDurationSeconds: lo.ToPtr(int32(leaseDuration.Seconds())), - AcquireTime: &now, - RenewTime: &now, - }, - } - _, err = leases.Create(ctx, lease, metav1.CreateOptions{}) - if err != nil { - return false, "", err - } - return true, identity, nil +func Register( + ctx context.Context, + onLead func(ctx gocontext.Context), + onStoppedLead func(), + onNewLeader func(identity string), +) { + lock := &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Name: service, + Namespace: namespace, + }, + Client: ctx.Kubernetes().CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: hostname, + }, } - if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity == identity { - lease.Spec.RenewTime = &now - ctx.Debugf("Renewing lease %s : %s", leaseName, now.String()) - _, err = leases.Update(ctx, lease, metav1.UpdateOptions{}) - if err != nil { - return false, "", err - } - } - renewTime := lease.Spec.RenewTime.Time - if time.Since(renewTime) > leaseDuration { - ctx.Infof("Lease %s held by %s expired", leaseName, *lease.Spec.HolderIdentity) - if err := leases.Delete(ctx, leaseName, metav1.DeleteOptions{}); err != nil { - ctx.Infof("failed to delete leases %s: %v", leaseName, err) - } - return createOrUpdateLease(ctx, leaseName, attempt+1) - } - ctx.Debugf("Lease %s already held by %s, expires in %s", leaseName, *lease.Spec.HolderIdentity, time.Until(renewTime.Add(leaseDuration)).String()) - return false, *lease.Spec.HolderIdentity, nil + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: lock, + ReleaseOnCancel: true, + LeaseDuration: ctx.Properties().Duration("leader.lease.duration", 30*time.Second), + RenewDeadline: 15 * time.Second, + RetryPeriod: 5 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: onLead, + OnStoppedLeading: onStoppedLead, + OnNewLeader: func(identity string) { + if identity == hostname { + return + } + + onNewLeader(identity) + }, + }, + }) }