Skip to content

Commit

Permalink
fix: make leaderelection possible
Browse files Browse the repository at this point in the history
  • Loading branch information
Ajpantuso committed Nov 22, 2022
1 parent 2cb6d9f commit fd83a17
Show file tree
Hide file tree
Showing 14 changed files with 604 additions and 125 deletions.
117 changes: 82 additions & 35 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"errors"
"fmt"
"net/url"
"os"
"runtime"
"strconv"
Expand All @@ -19,7 +20,6 @@ import (
dvo_prom "github.com/app-sre/deployment-validation-operator/pkg/prometheus"
"github.com/app-sre/deployment-validation-operator/pkg/validations"
"github.com/app-sre/deployment-validation-operator/version"
"github.com/prometheus/client_golang/prometheus"

"github.com/go-logr/logr"
osappsv1 "github.com/openshift/api/apps/v1"
Expand All @@ -43,13 +43,18 @@ func main() {
os.Setenv(operatorNameEnvVar, dvconfig.OperatorName)

opts := options.Options{
MetricsPort: 8383,
MetricsPath: "metrics",
ProbeAddr: ":8081",
ConfigFile: "config/deployment-validation-operator-config.yaml",
MetricsBindAddr: ":8383",
MetricsPath: "metrics",
MetricsServiceName: "deployment-validation-operator-metrics",
ProbeAddr: ":8081",
ConfigFile: "config/deployment-validation-operator-config.yaml",
}

opts.Process()
if err := opts.Process(); err != nil {
fmt.Fprintf(os.Stdout, "processing options: %v\n", err)

os.Exit(1)
}

// Use a zap logr.Logger implementation. If none of the zap
// flags are configured (or if the zap flag set is not being
Expand Down Expand Up @@ -106,12 +111,8 @@ func setupManager(log logr.Logger, opts options.Options) (manager.Manager, error
return nil, fmt.Errorf("initializing manager: %w", err)
}

if err := mgr.AddHealthzCheck("health", healthz.Ping); err != nil {
return nil, fmt.Errorf("adding healthz check: %w", err)
}

if err := mgr.AddReadyzCheck("check", healthz.Ping); err != nil {
return nil, fmt.Errorf("adding readyz check: %w", err)
if err := setupProbes(mgr, opts); err != nil {
return nil, fmt.Errorf("setting up probes: %w", err)
}

log.Info("Registering Components")
Expand All @@ -126,29 +127,12 @@ func setupManager(log logr.Logger, opts options.Options) (manager.Manager, error
return nil, fmt.Errorf("initializing generic reconciler: %w", err)
}

if err = gr.AddToManager(mgr); err != nil {
if err := gr.AddToManager(mgr); err != nil {
return nil, fmt.Errorf("adding generic reconciler to manager: %w", err)
}

log.Info("Initializing Prometheus Registry")

reg := prometheus.NewRegistry()

log.Info(fmt.Sprintf("Initializing Prometheus metrics endpoint on %q", opts.MetricsEndpoint()))

srv, err := dvo_prom.NewServer(reg, opts.MetricsPath, fmt.Sprintf(":%d", opts.MetricsPort))
if err != nil {
return nil, fmt.Errorf("initializing metrics server: %w", err)
}

if err := mgr.Add(srv); err != nil {
return nil, fmt.Errorf("adding metrics server to manager: %w", err)
}

log.Info("Initializing Validation Engine")

if err := validations.InitializeValidationEngine(opts.ConfigFile, reg); err != nil {
return nil, fmt.Errorf("initializing validation engine: %w", err)
if err := setupComponents(log, mgr, opts); err != nil {
return nil, fmt.Errorf("setting up components: %w", err)
}

return mgr, nil
Expand Down Expand Up @@ -193,9 +177,13 @@ func getManagerOptions(scheme *k8sruntime.Scheme, opts options.Options) (manager
}

mgrOpts := manager.Options{
Namespace: ns,
HealthProbeBindAddress: opts.ProbeAddr,
MetricsBindAddress: "0", // disable controller-runtime managed prometheus endpoint
LeaderElection: opts.EnableLeaderElection,
LeaderElectionID: "23h85e23.deployment-validation-operator-lock",
LeaderElectionNamespace: opts.LeaderElectionNamespace,
LeaderElectionResourceLock: "leases",
Namespace: ns,
HealthProbeBindAddress: opts.ProbeAddr,
MetricsBindAddress: "0", // disable controller-runtime managed prometheus endpoint
// disable caching of everything
NewClient: newClient,
Scheme: scheme,
Expand Down Expand Up @@ -237,3 +225,62 @@ func kubeClientQPS() (float32, error) {
qps = float32(val)
return qps, err
}

func setupProbes(mgr manager.Manager, opts options.Options) error {
if err := mgr.AddHealthzCheck("health", healthz.Ping); err != nil {
return fmt.Errorf("adding healthz check: %w", err)
}

if err := mgr.AddReadyzCheck("check", healthz.Ping); err != nil {
return fmt.Errorf("adding readyz check: %w", err)
}

return nil
}

func setupComponents(log logr.Logger, mgr manager.Manager, opts options.Options) error {
log.Info("Initializing Prometheus Registry")

reg, err := dvo_prom.NewRegistry()
if err != nil {
return fmt.Errorf("initializing prometheus registry: %w", err)
}

log.Info(fmt.Sprintf("Initializing Prometheus metrics endpoint on %q", opts.MetricsEndpoint()))

svcURL := &url.URL{
Scheme: "http",
Host: opts.MetricsServiceName,
}
if parts := strings.Split(opts.MetricsBindAddr, ":"); len(parts) > 0 {
if len(parts) > 1 {
svcURL.Host += parts[len(parts)-1]
}
}

srv, err := dvo_prom.NewServer(reg,
dvo_prom.WithMetricsAddr(opts.MetricsBindAddr),
dvo_prom.WithMetricsPath(opts.MetricsPath),
dvo_prom.WithServiceURL(svcURL.String()),
)
if err != nil {
return fmt.Errorf("initializing metrics server: %w", err)
}

go func() {
<-mgr.Elected()
srv.Ready()
}()

if err := mgr.Add(srv); err != nil {
return fmt.Errorf("adding metrics server to manager: %w", err)
}

log.Info("Initializing Validation Engine")

if err := validations.InitializeValidationEngine(opts.ConfigFile, reg); err != nil {
return fmt.Errorf("initializing validation engine: %w", err)
}

return nil
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ require (
github.com/prometheus/common v0.32.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.9.0
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
go.uber.org/multierr v1.6.0
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f // indirect
golang.stackrox.io/kube-linter v0.0.0-20210928184316-5e1ead387f43
k8s.io/api v0.22.2
k8s.io/apimachinery v0.22.2
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1104,8 +1104,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
Expand Down Expand Up @@ -1262,8 +1263,9 @@ golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWP
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f h1:OeJjE6G4dgCY4PIXvIRQbE8+RX+uXZyGhUy/ksMGJoc=
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -1363,6 +1365,7 @@ golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20210520170846-37e1c6afe023/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0=
Expand Down
92 changes: 92 additions & 0 deletions internal/handler/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package handler

import (
"net/http"
"strings"
"sync"

"github.com/go-logr/logr"
)

func NewSwitchableHandler(handA http.Handler, handB http.Handler, opts ...SwitchableHandlerOption) *SwitchableHandler {
var cfg SwitchableHandlerConfig

cfg.Option(opts...)
cfg.Default()

return &SwitchableHandler{
cfg: cfg,
handlerA: handA,
handlerB: handB,
}
}

type SwitchableHandler struct {
cfg SwitchableHandlerConfig
handlerA http.Handler
handlerB http.Handler

lock sync.Mutex
switched bool
}

func (h *SwitchableHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log := h.cfg.Log.WithValues("remote", r.RemoteAddr)

if h.switched {
log.WithValues("handler", "B").Info("serving request")

h.handlerB.ServeHTTP(w, r)
} else {
log.WithValues("handler", "A").Info("serving request")

h.handlerA.ServeHTTP(w, r)
}
}

func (h *SwitchableHandler) Switch() {
h.lock.Lock()
defer h.lock.Unlock()

if h.switched {
h.switched = false
} else {
h.switched = true
}
}

type SwitchableHandlerConfig struct {
Log logr.Logger
}

func (c *SwitchableHandlerConfig) Option(opts ...SwitchableHandlerOption) {
for _, opt := range opts {
opt.ConfigureSwitchableHandler(c)
}
}

func (c *SwitchableHandlerConfig) Default() {
if c.Log == nil {
c.Log = logr.Discard()
}
}

type SwitchableHandlerOption interface {
ConfigureSwitchableHandler(*SwitchableHandlerConfig)
}

func StopAfterNForwards(n uint, h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rawFwds, ok := r.Header["X-Forwarded-For"]
if !ok {
h.ServeHTTP(w, r)
}

splitFwds := strings.Split(strings.Join(rawFwds, ", "), ", ")
if uint(len(splitFwds)) >= n {
http.Error(w, "", http.StatusLoopDetected)
}

h.ServeHTTP(w, r)
})
}
Loading

0 comments on commit fd83a17

Please sign in to comment.