Skip to content

Commit

Permalink
feat: leader election
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Oct 22, 2024
1 parent bc44648 commit af1f419
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 21 deletions.
40 changes: 28 additions & 12 deletions cmd/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"github.com/flanksource/canary-checker/pkg/controllers"
"github.com/flanksource/canary-checker/pkg/labels"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/job"
dutyKubernetes "github.com/flanksource/duty/kubernetes"
"github.com/flanksource/duty/leader"
"github.com/flanksource/duty/shutdown"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
Expand All @@ -37,10 +39,10 @@ var (
Use: "operator",
Short: "Start the kubernetes operator",
Run: func(cmd *cobra.Command, args []string) {
if err := run(cmd, args); err != nil {
if err := run(); err != nil {
shutdown.ShutdownAndExit(1, err.Error())
} else {
shutdown.ShutdownAndExit(0, err.Error())
shutdown.ShutdownAndExit(0, "")
}
},
}
Expand All @@ -56,15 +58,7 @@ func init() {
// +kubebuilder:scaffold:scheme
}

func run(cmd *cobra.Command, args []string) error {
logger := logger.GetLogger("operator")
logger.SetLogLevel(k8sLogLevel)

scheme := runtime.NewScheme()

_ = clientgoscheme.AddToScheme(scheme)
_ = canaryv1.AddToScheme(scheme)

func run() error {
ctx, err := InitContext()
if err != nil {
return err
Expand All @@ -78,12 +72,23 @@ func run(cmd *cobra.Command, args []string) error {
return errors.New("operator requires a kubernetes connection")
}

ctx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker"))
ctx.WithTracer(otel.GetTracerProvider().Tracer(app))

apicontext.DefaultContext = ctx.WithNamespace(runner.WatchNamespace)

cache.PostgresCache = cache.NewPostgresCache(apicontext.DefaultContext)

if enableLeaderElection {
job.DisableCronStartOnSchedule()

go func() {
err := leader.Register(ctx, app, runner.WatchNamespace, nil, nil, nil)
if err != nil {
shutdown.ShutdownAndExit(1, fmt.Sprintf("leader election failed: %v", err))
}
}()
}

if runner.OperatorExecutor {
logger.Infof("Starting executors")

Expand All @@ -94,6 +99,16 @@ func run(cmd *cobra.Command, args []string) error {
}

go serve()
return startControllers()
}

func startControllers() error {
scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
_ = canaryv1.AddToScheme(scheme)

logger := logger.GetLogger("operator")
logger.SetLogLevel(k8sLogLevel)

ctrl.SetLogger(logr.FromSlogHandler(logger.Handler()))
setupLog := ctrl.Log.WithName("setup")
Expand All @@ -102,6 +117,7 @@ func run(cmd *cobra.Command, args []string) error {
Scheme: scheme,
LeaderElection: enableLeaderElection,
LeaderElectionNamespace: runner.WatchNamespace,
LeaderElectionID: "fa62cd4d.flanksource.com",
Metrics: ctrlMetrics.Options{
BindAddress: ":0",
},
Expand Down
10 changes: 6 additions & 4 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,26 @@ import (
"go.opentelemetry.io/otel"
)

const app = "canary-checker"

func InitContext() (context.Context, error) {
ctx, closer, err := duty.Start("canary-checker", duty.SkipChangelogMigration, duty.SkipMigrationByDefaultMode)
ctx, closer, err := duty.Start(app, duty.SkipChangelogMigration, duty.SkipMigrationByDefaultMode)
if err != nil {
return ctx, fmt.Errorf("Failed to initialize db: %v", err.Error())
return ctx, fmt.Errorf("failed to initialize db: %v", err.Error())
}
shutdown.AddHook(closer)

if err := properties.LoadFile(propertiesFile); err != nil {
return ctx, fmt.Errorf("failed to load properties: %v", err)
}

ctx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker"))
ctx.WithTracer(otel.GetTracerProvider().Tracer(app))

return ctx, nil
}

var Root = &cobra.Command{
Use: "canary-checker",
Use: app,
PersistentPreRun: func(cmd *cobra.Command, args []string) {
logger.UseSlog()
shutdown.WaitForSignal()
Expand Down
2 changes: 1 addition & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var Run = &cobra.Command{
log.Fatalln("Must specify at least one canary")
}

ctx, closer, err := duty.Start("canary-checker", duty.ClientOnly, duty.SkipMigrationByDefaultMode)
ctx, closer, err := duty.Start(app, duty.ClientOnly, duty.SkipMigrationByDefaultMode)
if err != nil {
logger.Fatalf("Failed to initialize db: %v", err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ var RunTopology = &cobra.Command{

defer shutdown.Shutdown()
var err error
apicontext.DefaultContext, _, err = duty.Start("canary-checker", duty.ClientOnly, duty.SkipMigrationByDefaultMode)
apicontext.DefaultContext, _, err = duty.Start(app, duty.ClientOnly, duty.SkipMigrationByDefaultMode)
if err != nil {
logger.Errorf(err.Error())
return
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/elastic/go-elasticsearch/v8 v8.13.1
github.com/flanksource/artifacts v1.0.15
github.com/flanksource/commons v1.30.5
github.com/flanksource/duty v1.0.719
github.com/flanksource/duty v1.0.720
github.com/flanksource/gomplate/v3 v3.24.36
github.com/flanksource/is-healthy v1.0.33
github.com/flanksource/kommons v0.31.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -861,8 +861,8 @@ github.com/flanksource/artifacts v1.0.15 h1:3ImJr2y0ZCXw/QrMhfJJktAT7pYD3sMZR5ix
github.com/flanksource/artifacts v1.0.15/go.mod h1:qHVCnQu5k50aWNJ5UhpcAKEl7pAzqUrFFKGSm147G70=
github.com/flanksource/commons v1.30.5 h1:p8PXGiNt7SurBBh9K3ea8/ZrDvacXSYHJSs/cqJLDK8=
github.com/flanksource/commons v1.30.5/go.mod h1:26zdVkmMPsGpvfcsvst5WgsqcyRL8KqFNxkumagBN+A=
github.com/flanksource/duty v1.0.719 h1:Gqs9p0YI6YT+fMpvGrlgGZ2FX8qXbXeFJebx5LPO34s=
github.com/flanksource/duty v1.0.719/go.mod h1:1+h6/KDFTtkmqitfV8cuQJWhLakxTMvKqBjgqW0y2Ps=
github.com/flanksource/duty v1.0.720 h1:n3cHvYuEv0a2lhcPX/v7le0BYnq1MCQSpGILaqTFgH4=
github.com/flanksource/duty v1.0.720/go.mod h1:1+h6/KDFTtkmqitfV8cuQJWhLakxTMvKqBjgqW0y2Ps=
github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc=
github.com/flanksource/gomplate/v3 v3.24.36 h1:aJx4MEzPyzZyJ0VQI11SUFU7QaTUVcusCskwNsxexzM=
github.com/flanksource/gomplate/v3 v3.24.36/go.mod h1:7m2vIaBstYGIGoanua6Q3s0GbXs7KbBEVRHrEcdHxW4=
Expand Down

0 comments on commit af1f419

Please sign in to comment.