Skip to content

Commit

Permalink
feat: manage all crons from elector
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Oct 16, 2024
1 parent 5908bd4 commit 9c45330
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions leader/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,23 @@ func Register(
RetryPeriod: 5 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(leadCtx gocontext.Context) {
ctx.Infof("started leading election")

updateLeaderLabel(ctx, app)
onLead(leadCtx)

for entry := range echo.Crons.IterBuffered() {
entry.Val.Start()
}

if onLead != nil {
onLead(leadCtx)
}
},
OnStoppedLeading: func() {
for _, k := range echo.Crons.Items() {
k.Stop()
ctx.Infof("stopped leading election")

for entry := range echo.Crons.IterBuffered() {
entry.Val.Stop()
}

if onStoppedLead != nil {
Expand Down Expand Up @@ -172,13 +183,13 @@ func updateLeaderLabel(ctx context.Context, app string) {
pods := lo.Map(podList.Items, func(p corev1.Pod, _ int) string { return p.Name })
pods = append(pods, hostname)

for _, podName := range pods {
for _, podName := range lo.Uniq(pods) {
var payload string
if podName == hostname {
ctx.Infof("adding leader metadata from pod: %s", podName)
ctx.Infof("adding leader metadata to pod: %s", podName)
payload = fmt.Sprintf(`{"metadata":{"labels":{"%s/leader":"true"}}}`, app)
} else {
ctx.Infof("removing leader metadata from pod: %s", podName)
ctx.Infof("removing leader metadata to pod: %s", podName)
payload = fmt.Sprintf(`{"metadata":{"labels":{"%s/leader": null}}}`, app)
}

Expand Down

0 comments on commit 9c45330

Please sign in to comment.