feat: use client-go leader election
adityathebe committed Oct 15, 2024
1 parent 921bc1b commit 6ffe025
Showing 1 changed file with 66 additions and 106 deletions.
leader/election.go: 66 additions, 106 deletions
@@ -1,139 +1,99 @@
 package leader
 
 import (
+    gocontext "context"
+    "errors"
     "fmt"
     "log"
-    "math/rand"
     "os"
-    "sync/atomic"
+    "strings"
     "time"
 
-    "github.com/flanksource/duty/context"
-    "github.com/samber/lo"
-
-    v1 "k8s.io/api/coordination/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/tools/leaderelection"
+    "k8s.io/client-go/tools/leaderelection/resourcelock"
+
+    "github.com/flanksource/duty/context"
 )
 
 var (
-    identity = getHostname()
+    hostname string
+    service  string
+
+    // namespace the pod is running on
+    namespace string
 )
 
-var isLeader *atomic.Bool
+const namespaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
 
-func getHostname() string {
-    hostname, err := os.Hostname()
-    if err != nil {
-        log.Fatalf("Failed to get hostname: %v", err)
-    }
-    return hostname
-}
-
-var watchers = []func(isLeader bool){}
-
-func OnElection(ctx context.Context, leaseName string, fn func(isLeader bool)) {
-    watchers = append(watchers, fn)
-
-    if isLeader == nil {
-        leaseDuration := ctx.Properties().Duration("leader.lease.duration", 10*time.Minute)
-        isLeader = new(atomic.Bool)
-        go func() {
-            for {
-
-                leader, _, err := createOrUpdateLease(ctx, leaseName, 0)
-
-                if err != nil {
-                    ctx.Warnf("failed to create/update lease: %v", err)
-                    time.Sleep(time.Duration(rand.Intn(60)) * time.Second)
-                    continue
-                }
-
-                if isLeader.Load() != leader {
-                    isLeader.Store(leader)
-                    notify(leader)
-                }
-
-                // sleep for just under half the lease duration before trying to renew
-                time.Sleep(leaseDuration/2 - time.Second*10)
-            }
-        }()
-    }
-}
-
-func notify(isLeader bool) {
-    for _, fn := range watchers {
-        fn(isLeader)
-    }
-}
+func podNamespace() (string, error) {
+    // passed using K8s downwards API
+    if ns, ok := os.LookupEnv("POD_NAMESPACE"); ok {
+        return ns, nil
+    }
+
+    data, err := os.ReadFile(namespaceFilePath)
+    if err != nil {
+        return "", fmt.Errorf("failed to read namespace file: %w", err)
+    }
+
+    ns := strings.TrimSpace(string(data))
+    if ns == "" {
+        return "", errors.New("namespace was neither found in the env nor in the service account path")
+    }
+
+    return ns, nil
+}
+
+func init() {
+    var err error
+    hostname, err = os.Hostname()
+    if err != nil {
+        log.Fatalf("failed to get hostname: %v", err)
+    }
+
+    if n, err := podNamespace(); err != nil {
+        log.Fatalf("failed to get pod namespace: %v", err)
+    } else {
+        namespace = n
+    }
+
+    service = strings.Split(hostname, "-")[0]
+}
 
-func IsLeader(ctx context.Context, leaseName string) (bool, error) {
-    leases := ctx.Kubernetes().CoordinationV1().Leases(ctx.GetNamespace())
-
-    lease, err := leases.Get(ctx, leaseName, metav1.GetOptions{})
-    if err != nil {
-        return false, err
-    }
-
-    if *lease.Spec.HolderIdentity == identity {
-        return true, nil
-    }
-
-    return false, nil
-}
-
-func createOrUpdateLease(ctx context.Context, leaseName string, attempt int) (bool, string, error) {
-    if attempt > 0 {
-        time.Sleep(time.Duration(rand.Intn(5000)) * time.Millisecond)
-    }
-    if attempt >= ctx.Properties().Int("leader.lease.attempts", 3) {
-        return false, "", fmt.Errorf("failed to acquire lease %s after %d attempts", leaseName, attempt)
-    }
-    now := metav1.MicroTime{Time: time.Now()}
-    leases := ctx.Kubernetes().CoordinationV1().Leases(ctx.GetNamespace())
-    leaseDuration := ctx.Properties().Duration("leader.lease.duration", 10*time.Minute)
-    lease, err := leases.Get(ctx, leaseName, metav1.GetOptions{})
-    if err != nil {
-        return false, "", err
-    }
-    if lease == nil {
-        ctx.Infof("Acquiring lease %s", leaseName)
-        lease = &v1.Lease{
-            ObjectMeta: metav1.ObjectMeta{
-                Name:      leaseName,
-                Namespace: ctx.GetNamespace(),
-            },
-            Spec: v1.LeaseSpec{
-                HolderIdentity:       &identity,
-                LeaseDurationSeconds: lo.ToPtr(int32(leaseDuration.Seconds())),
-                AcquireTime:          &now,
-                RenewTime:            &now,
-            },
-        }
-        _, err = leases.Create(ctx, lease, metav1.CreateOptions{})
-        if err != nil {
-            return false, "", err
-        }
-        return true, identity, nil
-    }
-
-    if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity == identity {
-        lease.Spec.RenewTime = &now
-        ctx.Debugf("Renewing lease %s : %s", leaseName, now.String())
-        _, err = leases.Update(ctx, lease, metav1.UpdateOptions{})
-        if err != nil {
-            return false, "", err
-        }
-    }
-    renewTime := lease.Spec.RenewTime.Time
-    if time.Since(renewTime) > leaseDuration {
-        ctx.Infof("Lease %s held by %s expired", leaseName, *lease.Spec.HolderIdentity)
-        if err := leases.Delete(ctx, leaseName, metav1.DeleteOptions{}); err != nil {
-            ctx.Infof("failed to delete leases %s: %v", leaseName, err)
-        }
-        return createOrUpdateLease(ctx, leaseName, attempt+1)
-    }
-    ctx.Debugf("Lease %s already held by %s, expires in %s", leaseName, *lease.Spec.HolderIdentity, time.Until(renewTime.Add(leaseDuration)).String())
-    return false, *lease.Spec.HolderIdentity, nil
-}
+func Register(
+    ctx context.Context,
+    onLead func(ctx gocontext.Context),
+    onStoppedLead func(),
+    onNewLeader func(identity string),
+) {
+    lock := &resourcelock.LeaseLock{
+        LeaseMeta: metav1.ObjectMeta{
+            Name:      service,
+            Namespace: namespace,
+        },
+        Client: ctx.Kubernetes().CoordinationV1(),
+        LockConfig: resourcelock.ResourceLockConfig{
+            Identity: hostname,
+        },
+    }
+
+    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
+        Lock:            lock,
+        ReleaseOnCancel: true,
+        LeaseDuration:   ctx.Properties().Duration("leader.lease.duration", 30*time.Second),
+        RenewDeadline:   15 * time.Second,
+        RetryPeriod:     5 * time.Second,
+        Callbacks: leaderelection.LeaderCallbacks{
+            OnStartedLeading: onLead,
+            OnStoppedLeading: onStoppedLead,
+            OnNewLeader: func(identity string) {
+                if identity == hostname {
+                    return
+                }
+
+                onNewLeader(identity)
+            },
+        },
+    })
+}
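For context, a minimal sketch of how a caller might wire this up. The new Register derives the Lease name from the pod hostname's prefix (everything before the first "-") and uses the full hostname as the lock identity, so replicas of the same Deployment compete for a single Lease. The context.New() call and the callback bodies below are assumptions for illustration, not part of this commit:

package main

import (
    gocontext "context"
    "log"

    "github.com/flanksource/duty/context"
    "github.com/flanksource/duty/leader"
)

func main() {
    // Assumption: the host application already builds a duty context with a
    // Kubernetes client configured; context.New() is a stand-in for that setup.
    ctx := context.New()

    // Register blocks while participating in the election (it wraps
    // leaderelection.RunOrDie), so it is typically run from main or a
    // dedicated goroutine.
    leader.Register(ctx,
        func(leadCtx gocontext.Context) {
            // OnStartedLeading: start leader-only work here and stop it
            // when leadCtx is cancelled.
            log.Println("became leader")
            <-leadCtx.Done()
        },
        func() {
            // OnStoppedLeading: leadership was lost, or released on
            // shutdown (ReleaseOnCancel is set).
            log.Println("lost leadership")
        },
        func(identity string) {
            // OnNewLeader: fires when another pod takes the lease;
            // Register filters out the event for this pod's own identity.
            log.Printf("new leader: %s", identity)
        },
    )
}

Compared with the hand-rolled createOrUpdateLease loop this replaces, client-go's leaderelection handles renewal deadlines, jittered retries, and release-on-cancel itself, which is what lets the commit delete more code than it adds.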
