From eb1895579f1e88f9302938c5e4e7974f3e98ec4f Mon Sep 17 00:00:00 2001
From: Julien Levesy
Date: Tue, 30 May 2023 13:14:30 +0200
Subject: [PATCH] Add healthcheck for the local container (#37)

* feat(health): add a healthchecker

* feat(readiness): add a timeout

* feat(notifier): add a timeout

* feat(cmd): wire notify and readiness timeouts

* refactor(cmd): make config test suite less painful

* feat(cmd): run healthcheck

* feat(helm): run healthcheck

* chore(cmd): improve flags description

* chore(README): update reference
---
 README.md                       |  43 +++-
 cmd/config.go                   |  69 ++++++-
 cmd/config_test.go              | 341 +++++++++++++++-----------------
 cmd/main.go                     |  72 ++++++-
 health/checker.go               |  41 ++++
 health/http.go                  | 115 +++++++++++
 health/http_test.go             | 128 ++++++++++++
 helm/templates/statefulset.yaml |   1 +
 notifier/http.go                |   9 +-
 notifier/notifier_test.go       |   2 +
 readiness/http.go               |   7 +-
 readiness/http_test.go          |   2 +-
 12 files changed, 620 insertions(+), 210 deletions(-)
 create mode 100644 health/checker.go
 create mode 100644 health/http.go
 create mode 100644 health/http_test.go

diff --git a/README.md b/README.md
index 0d9cd12..1170943 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,11 @@ Prometheus (in agent mode) can be used to push metrics to a remote storage backe
 One approach of this problem is to have multiple agents pushing the same set of metrics to the storage backend. This requires to run some sort of metrics deduplication on the storage backend side to ensure correctness.
 
-Using `prometheus-elector`, we can instead make sure that only one Prometheus instance has `remote_write` enabled at any point of time and guarantee a reasonable delay (seconds) for another instance to take over when leading instance becomes unavailable. It minimizes (avoids?) data loss and avoids running some expensive deduplication logic on the storage backend side.
+Using `prometheus-elector`, we can instead make sure that only one Prometheus instance has `remote_write` enabled at any point in time, and guarantee a reasonable delay (seconds) for another instance to take over when the leading instance becomes unavailable. This brings the following advantages:
+
+- It minimizes (if not outright avoids) data loss
+- It avoids running expensive deduplication logic on the storage backend side
+- It is also much more efficient in terms of resource usage (RAM and network), because only one replica scrapes and pushes samples
 
 ![illustration](./docs/assets/agent-diagram.svg)
 
@@ -99,6 +103,13 @@ As it is implemented, it relies on a few assumptions:
 - The `member_id` of the replica is the `pod` name.
 - The `.` domain name is resolvable via DNS. This is a property of statfulsets in Kubernetes, but it requires the cluster to have DNS support enabled.
 
+#### Monitoring the Local Prometheus
+
+prometheus-elector also continuously monitors its local Prometheus instance to optimize its participation in the leader election and minimize downtime:
+
+- When starting, it waits for the local Prometheus instance to be ready before taking part in the election
+- It automatically leaves the election if the local Prometheus instance is no longer considered healthy. It then joins back as soon as the local instance returns to a healthy state.
+
 ### Installing Prometheus Elector
 
 You can find [an helm chart](./helm) in this repository, as well as [values for the HA agent example](./example/k8s/agent-values.yaml).
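+
+For illustration, a minimal agent invocation that wires up the new healthcheck could look like this (the lease name, namespace and file paths are placeholders, while the endpoint URLs mirror the Helm chart defaults in this patch):
+
+```
+prometheus-elector \
+  -lease-name=prometheus-elector \
+  -lease-namespace=monitoring \
+  -config=/etc/config/prometheus-elector.yaml \
+  -output=/etc/runtime/prometheus.yaml \
+  -notify-http-url=http://127.0.0.1:9090/-/reload \
+  -readiness-http-url=http://127.0.0.1:9090/-/ready \
+  -healthcheck-http-url=http://127.0.0.1:9090/-/healthy \
+  -api-listen-address=:9095
+```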
@@ -117,7 +128,7 @@ If the leader proxy is enabled, all HTTP calls received on the port 9095 are for
 
 ```
   -api-listen-address string
-        HTTP listen address to use for the API. (default ":9095")
+        HTTP listen address for the API (default ":9095")
   -api-proxy-enabled
         Turn on leader proxy on the API
   -api-proxy-prometheus-local-port uint
@@ -130,6 +141,16 @@ If the leader proxy is enabled, all HTTP calls received on the port 9095 are for
         Grace delay to apply when shutting down the API server (default 15s)
   -config string
         Path of the prometheus-elector configuration
+  -healthcheck-failure-threshold int
+        Number of consecutive failures required to consider Prometheus unhealthy (default 3)
+  -healthcheck-http-url string
+        URL to the Prometheus health endpoint
+  -healthcheck-period duration
+        Healthcheck period (default 5s)
+  -healthcheck-success-threshold int
+        Number of consecutive successes required to consider Prometheus healthy (default 3)
+  -healthcheck-timeout duration
+        HTTP timeout for healthchecks (default 2s)
   -init
         Only init the prometheus config file
   -kubeconfig string
@@ -137,27 +158,31 @@ If the leader proxy is enabled, all HTTP calls received on the port 9095 are for
   -lease-duration duration
         Duration of a lease, client wait the full duration of a lease before trying to take it over (default 15s)
   -lease-name string
-        Name of lease lock
+        Name of lease resource
   -lease-namespace string
-        Name of lease lock namespace
+        Name of lease resource namespace
   -lease-renew-deadline duration
         Maximum duration spent trying to renew the lease (default 10s)
   -lease-retry-period duration
         Delay between two attempts of taking/renewing the lease (default 2s)
   -notify-http-method string
-        HTTP method to use when sending the reload config request. (default "POST")
+        HTTP method to use when sending the reload config request (default "POST")
   -notify-http-url string
         URL to the reload configuration endpoint
   -notify-retry-delay duration
-        How much time to wait between two notify retries. (default 10s)
+        Delay between two notify retries (default 10s)
   -notify-retry-max-attempts int
-        How many times to retry notifying prometheus on failure. (default 5)
+        How many times to retry the configuration update notification (default 5)
+  -notify-timeout duration
+        HTTP timeout for each notify request (default 2s)
   -output string
-        Path to write the active prometheus configuration
+        Path to write the Prometheus configuration
   -readiness-http-url string
-        URL to Prometheus ready endpoint
+        URL to the Prometheus ready endpoint
   -readiness-poll-period duration
         Poll period prometheus readiness check (default 5s)
+  -readiness-timeout duration
+        HTTP timeout for readiness calls (default 2s)
   -runtime-metrics
         Export go runtime metrics
 ```

diff --git a/cmd/config.go b/cmd/config.go
index 83e4e40..17dd2f2 100644
--- a/cmd/config.go
+++ b/cmd/config.go
@@ -36,10 +36,19 @@ type cliConfig struct {
 	notifyHTTPMethod       string
 	notifyRetryMaxAttempts int
 	notifyRetryDelay       time.Duration
+	notifyTimeout          time.Duration
 
 	// How to wait for prometheus to be ready.
 	readinessHTTPURL    string
 	readinessPollPeriod time.Duration
+	readinessTimeout    time.Duration
+
+	// How to monitor prometheus health.
+	healthcheckHTTPURL          string
+	healthcheckPeriod           time.Duration
+	healthcheckTimeout          time.Duration
+	healthcheckSuccessThreshold int
+	healthcheckFailureThreshold int
 
 	// API setup
 	apiListenAddr string
@@ -107,10 +116,34 @@ func (c *cliConfig) validateRuntimeConfig() error {
 		return errors.New("invalid notify-retry-delay, should be >= 1")
 	}
 
+	if c.notifyTimeout < 1 {
+		return errors.New("invalid notify-timeout, should be >= 1")
+	}
+
 	if c.readinessPollPeriod < 1 {
 		return errors.New("invalid readiness-poll-period, should be >= 1")
 	}
 
+	if c.readinessTimeout < 1 {
+		return errors.New("invalid readiness-timeout, should be >= 1")
+	}
+
+	if c.healthcheckPeriod < 1 {
+		return errors.New("invalid healthcheck-period, should be >= 1")
+	}
+
+	if c.healthcheckTimeout < 1 {
+		return errors.New("invalid healthcheck-timeout, should be >= 1")
+	}
+
+	if c.healthcheckSuccessThreshold < 1 {
+		return errors.New("invalid healthcheck-success-threshold, should be >= 1")
+	}
+
+	if c.healthcheckFailureThreshold < 1 {
+		return errors.New("invalid healthcheck-failure-threshold, should be >= 1")
+	}
+
 	if c.apiListenAddr == "" {
 		return errors.New("missing api-listen-address")
 	}
@@ -137,22 +170,36 @@ func (c *cliConfig) validateRuntimeConfig() error {
 }
 
 func (c *cliConfig) setupFlags() {
-	flag.StringVar(&c.leaseName, "lease-name", "", "Name of lease lock")
-	flag.StringVar(&c.leaseNamespace, "lease-namespace", "", "Name of lease lock namespace")
+	flag.BoolVar(&c.init, "init", false, "Only init the prometheus config file")
+
+	flag.StringVar(&c.leaseName, "lease-name", "", "Name of lease resource")
+	flag.StringVar(&c.leaseNamespace, "lease-namespace", "", "Name of lease resource namespace")
 	flag.DurationVar(&c.leaseDuration, "lease-duration", 15*time.Second, "Duration of a lease, client wait the full duration of a lease before trying to take it over")
 	flag.DurationVar(&c.leaseRenewDeadline, "lease-renew-deadline", 10*time.Second, "Maximum duration spent trying to renew the lease")
 	flag.DurationVar(&c.leaseRetryPeriod, "lease-retry-period", 2*time.Second, "Delay between two attempts of taking/renewing the lease")
+	flag.StringVar(&c.kubeConfigPath, "kubeconfig", "", "Path to a kubeconfig. 
Only required if out-of-cluster.") + flag.StringVar(&c.configPath, "config", "", "Path of the prometheus-elector configuration") - flag.StringVar(&c.outputPath, "output", "", "Path to write the active prometheus configuration") - flag.StringVar(&c.readinessHTTPURL, "readiness-http-url", "", "URL to Prometheus ready endpoint") + flag.StringVar(&c.outputPath, "output", "", "Path to write the Prometheus configuration") + + flag.StringVar(&c.readinessHTTPURL, "readiness-http-url", "", "URL to the Prometheus ready endpoint") flag.DurationVar(&c.readinessPollPeriod, "readiness-poll-period", 5*time.Second, "Poll period prometheus readiness check") + flag.DurationVar(&c.readinessTimeout, "readiness-timeout", 2*time.Second, "HTTP timeout for readiness calls") + + flag.StringVar(&c.healthcheckHTTPURL, "healthcheck-http-url", "", "URL to the Prometheus health endpoint") + flag.DurationVar(&c.healthcheckPeriod, "healthcheck-period", 5*time.Second, "Healthcheck period") + flag.DurationVar(&c.healthcheckTimeout, "healthcheck-timeout", 2*time.Second, "HTTP timeout for healthchecks") + flag.IntVar(&c.healthcheckSuccessThreshold, "healthcheck-success-threshold", 3, "Amount of consecutives success to consider Prometheus healthy") + flag.IntVar(&c.healthcheckFailureThreshold, "healthcheck-failure-threshold", 3, "Amount of consecutives failures to consider Prometheus unhealthy") + flag.StringVar(&c.notifyHTTPURL, "notify-http-url", "", "URL to the reload configuration endpoint") - flag.StringVar(&c.notifyHTTPMethod, "notify-http-method", http.MethodPost, "HTTP method to use when sending the reload config request.") - flag.IntVar(&c.notifyRetryMaxAttempts, "notify-retry-max-attempts", 5, "How many times to retry notifying prometheus on failure.") - flag.DurationVar(&c.notifyRetryDelay, "notify-retry-delay", 10*time.Second, "How much time to wait between two notify retries.") - flag.BoolVar(&c.init, "init", false, "Only init the prometheus config file") - flag.StringVar(&c.apiListenAddr, "api-listen-address", ":9095", "HTTP listen address to use for the API.") + flag.StringVar(&c.notifyHTTPMethod, "notify-http-method", http.MethodPost, "HTTP method to use when sending the reload config request") + flag.IntVar(&c.notifyRetryMaxAttempts, "notify-retry-max-attempts", 5, "How many retries for configuration update") + flag.DurationVar(&c.notifyRetryDelay, "notify-retry-delay", 10*time.Second, "Delay between two notify retries.") + flag.DurationVar(&c.notifyTimeout, "notify-timeout", 2*time.Second, "HTTP timeout for notify retries.") + + flag.StringVar(&c.apiListenAddr, "api-listen-address", ":9095", "HTTP listen address for the API.") flag.DurationVar(&c.apiShutdownGraceDelay, "api-shutdown-grace-delay", 15*time.Second, "Grace delay to apply when shutting down the API server") flag.BoolVar(&c.apiProxyEnabled, "api-proxy-enabled", false, "Turn on leader proxy on the API") flag.UintVar(&c.apiProxyPrometheusLocalPort, "api-proxy-prometheus-local-port", 9090, "Listening port of the local prometheus instance") diff --git a/cmd/config_test.go b/cmd/config_test.go index bd5df03..85e1bfb 100644 --- a/cmd/config_test.go +++ b/cmd/config_test.go @@ -50,271 +50,244 @@ func TestCliConfig_ValidateInitConfig(t *testing.T) { } } +var goodConfig = cliConfig{ + leaseName: "lease", + leaseNamespace: "namespace", + memberID: "bloupi", + notifyHTTPURL: "http://reload.com", + notifyHTTPMethod: http.MethodPost, + notifyRetryMaxAttempts: 1, + notifyRetryDelay: 10 * time.Second, + notifyTimeout: time.Second, + readinessPollPeriod: 10 * 
time.Second, + readinessTimeout: time.Second, + healthcheckPeriod: 10 * time.Second, + healthcheckTimeout: 10 * time.Second, + healthcheckSuccessThreshold: 3, + healthcheckFailureThreshold: 3, + apiListenAddr: ":5678", + apiShutdownGraceDelay: 15 * time.Second, +} + +var goodConfigWithProxy = cliConfig{ + leaseName: "lease", + leaseNamespace: "namespace", + memberID: "bloupi", + notifyHTTPURL: "http://reload.com", + notifyHTTPMethod: http.MethodPost, + notifyRetryMaxAttempts: 1, + notifyRetryDelay: 10 * time.Second, + notifyTimeout: time.Second, + readinessPollPeriod: 10 * time.Second, + readinessTimeout: time.Second, + healthcheckPeriod: 10 * time.Second, + healthcheckTimeout: 10 * time.Second, + healthcheckSuccessThreshold: 3, + healthcheckFailureThreshold: 3, + apiListenAddr: ":5678", + apiShutdownGraceDelay: 15 * time.Second, + apiProxyEnabled: true, + apiProxyPrometheusLocalPort: 9095, + apiProxyPrometheusRemotePort: 9090, + apiProxyPrometheusServiceName: "prometheus", +} + func TestCliConfig_ValidateRuntimeConfig(t *testing.T) { hostname, err := os.Hostname() require.NoError(t, err) for _, testCase := range []struct { desc string - cfg cliConfig + baseConfig cliConfig + patchConfig func(c *cliConfig) wantMemberID string wantErr error }{ { - desc: "ok", - cfg: cliConfig{ - leaseName: "lease", - leaseNamespace: "namespace", - memberID: "bloupi", - notifyHTTPURL: "http://reload.com", - notifyHTTPMethod: http.MethodPost, - notifyRetryMaxAttempts: 1, - notifyRetryDelay: 10 * time.Second, - readinessPollPeriod: 10 * time.Second, - apiListenAddr: ":5678", - apiShutdownGraceDelay: 15 * time.Second, - }, + desc: "ok", + baseConfig: goodConfig, + patchConfig: func(c *cliConfig) {}, wantMemberID: "bloupi", wantErr: nil, }, { - desc: "falls back to hostname if memberID is empty", - cfg: cliConfig{ - leaseName: "lease", - leaseNamespace: "namespace", - memberID: "", - notifyHTTPURL: "http://reload.com", - notifyHTTPMethod: http.MethodPost, - notifyRetryMaxAttempts: 1, - notifyRetryDelay: 10 * time.Second, - readinessPollPeriod: 10 * time.Second, - apiListenAddr: ":5678", - apiShutdownGraceDelay: 15 * time.Second, + desc: "falls back to hostname if memberID is empty", + baseConfig: goodConfig, + patchConfig: func(c *cliConfig) { + c.memberID = "" }, wantMemberID: hostname, wantErr: nil, }, { - desc: "missing leaseName", - cfg: cliConfig{ - leaseName: "", - leaseNamespace: "namespace", - notifyHTTPURL: "http://reload.com", - notifyHTTPMethod: http.MethodPost, - notifyRetryMaxAttempts: 1, - notifyRetryDelay: 10 * time.Second, - readinessPollPeriod: 10 * time.Second, - apiListenAddr: ":5678", - apiShutdownGraceDelay: 15 * time.Second, + desc: "missing leaseName", + baseConfig: goodConfig, + patchConfig: func(c *cliConfig) { + c.leaseName = "" }, wantErr: errors.New("missing lease-name flag"), }, { - desc: "missing lease namespace", - cfg: cliConfig{ - leaseName: "lease", - leaseNamespace: "", - notifyHTTPURL: "http://reload.com", - notifyHTTPMethod: http.MethodPost, - notifyRetryMaxAttempts: 1, - notifyRetryDelay: 10 * time.Second, - readinessPollPeriod: 10 * time.Second, - apiListenAddr: ":5678", - apiShutdownGraceDelay: 15 * time.Second, + desc: "missing lease namespace", + baseConfig: goodConfig, + patchConfig: func(c *cliConfig) { + c.leaseNamespace = "" }, wantErr: errors.New("missing lease-namespace flag"), }, { - desc: "missing lease notify-http-url", - cfg: cliConfig{ - leaseName: "lease", - leaseNamespace: "namespace", - notifyHTTPURL: "", - notifyHTTPMethod: http.MethodPost, - 
notifyRetryMaxAttempts: 1,
-				notifyRetryDelay:       10 * time.Second,
-				readinessPollPeriod:    10 * time.Second,
-				apiListenAddr:          ":5678",
-				apiShutdownGraceDelay:  15 * time.Second,
+			desc:       "missing notify-http-url",
+			baseConfig: goodConfig,
+			patchConfig: func(c *cliConfig) {
+				c.notifyHTTPURL = ""
 			},
 			wantErr: errors.New("missing notify-http-url flag"),
 		},
 		{
-			desc: "missing lease notify-http-method",
-			cfg: cliConfig{
-				leaseName:              "lease",
-				leaseNamespace:         "namespace",
-				notifyHTTPURL:          "http://reload.com",
-				notifyHTTPMethod:       "",
-				notifyRetryMaxAttempts: 1,
-				notifyRetryDelay:       10 * time.Second,
-				readinessPollPeriod:    10 * time.Second,
-				apiListenAddr:          ":5678",
-				apiShutdownGraceDelay:  15 * time.Second,
+			desc:       "missing notify-http-method",
+			baseConfig: goodConfig,
+			patchConfig: func(c *cliConfig) {
+				c.notifyHTTPMethod = ""
 			},
 			wantErr: errors.New("invalid notify-http-method"),
 		},
 		{
-			desc: "invalid lease notify-http-method",
-			cfg: cliConfig{
-				leaseName:              "lease",
-				leaseNamespace:         "namespace",
-				notifyHTTPURL:          "http://reload.com",
-				notifyHTTPMethod:       "///3eee",
-				notifyRetryMaxAttempts: 1,
-				notifyRetryDelay:       10 * time.Second,
-				readinessPollPeriod:    10 * time.Second,
-				apiListenAddr:          ":5678",
-				apiShutdownGraceDelay:  15 * time.Second,
+			desc:       "invalid notify http-method",
+			baseConfig: goodConfig,
+			patchConfig: func(c *cliConfig) {
+				c.notifyHTTPMethod = "///3eee"
 			},
 			wantErr: errors.New("invalid notify-http-method"),
 		},
 		{
-			desc: "invalid retry-max-attempts",
-			cfg: cliConfig{
-				leaseName:              "lease",
-				leaseNamespace:         "namespace",
-				notifyHTTPURL:          "http://reload.com",
-				notifyHTTPMethod:       http.MethodPost,
-				notifyRetryMaxAttempts: -1,
-				notifyRetryDelay:       10 * time.Second,
-				readinessPollPeriod:    10 * time.Second,
-				apiListenAddr:          ":5678",
-				apiShutdownGraceDelay:  15 * time.Second,
+			desc:       "invalid notify retry-max-attempts",
+			baseConfig: goodConfig,
+			patchConfig: func(c *cliConfig) {
+				c.notifyRetryMaxAttempts = -1
 			},
 			wantErr: errors.New("invalid notify-retry-max-attempts, should be >= 1"),
 		},
 		{
-			desc: "invalid retry-delay",
-			cfg: cliConfig{
-				leaseName:              "lease",
-				leaseNamespace:         "namespace",
-				notifyHTTPURL:          "http://reload.com",
-				notifyHTTPMethod:       http.MethodPost,
-				notifyRetryMaxAttempts: 1,
-				notifyRetryDelay:       -10 * time.Second,
-				readinessPollPeriod:    10 * time.Second,
-				apiListenAddr:          ":5678",
-				apiShutdownGraceDelay:  15 * time.Second,
+			desc:       "invalid notify retry-delay",
+			baseConfig: goodConfig,
+			patchConfig: func(c *cliConfig) {
+				c.notifyRetryDelay = -10 * time.Second
 			},
 			wantErr: errors.New("invalid notify-retry-delay, should be >= 1"),
 		},
 		{
-			desc: "missing api-listen-address",
-			cfg: cliConfig{
-				leaseName:              "lease",
-				leaseNamespace:         "namespace",
-				notifyHTTPURL:          "http://reload.com",
-				notifyHTTPMethod:       http.MethodPost,
-				notifyRetryMaxAttempts: 1,
-				notifyRetryDelay:       10 * time.Second,
-				readinessPollPeriod:    10 * time.Second,
-				apiListenAddr:          "",
-				apiShutdownGraceDelay:  15 * time.Second,
+			desc:       "invalid notify timeout",
+			baseConfig: goodConfig,
+			patchConfig: func(c *cliConfig) {
+				c.notifyTimeout = -10 * time.Second
+			},
+			wantErr: errors.New("invalid notify-timeout, should be >= 1"),
+		},
+		{
+			desc:       "invalid healthcheck period",
+			baseConfig: goodConfig,
+			patchConfig: func(c *cliConfig) {
+				c.healthcheckPeriod = -10 * time.Second
+			},
+			wantErr: errors.New("invalid healthcheck-period, should be >= 1"),
+		},
+		{
+			desc:       "invalid healthcheck timeout",
+			baseConfig: goodConfig,
+			patchConfig: func(c *cliConfig) {
+				c.healthcheckTimeout = -10 * time.Second
+			
},
+			wantErr: errors.New("invalid healthcheck-timeout, should be >= 1"),
+		},
+		{
+			desc:       "invalid healthcheck success threshold",
+			baseConfig: goodConfig,
+			patchConfig: func(c *cliConfig) {
+				c.healthcheckSuccessThreshold = 0
+			},
+			wantErr: errors.New("invalid healthcheck-success-threshold, should be >= 1"),
+		},
+		{
+			desc:       "invalid healthcheck failure threshold",
+			baseConfig: goodConfig,
+			patchConfig: func(c *cliConfig) {
+				c.healthcheckFailureThreshold = 0
+			},
+			wantErr: errors.New("invalid healthcheck-failure-threshold, should be >= 1"),
+		},
+		{
+			desc:       "missing api-listen-address",
+			baseConfig: goodConfigWithProxy,
+			patchConfig: func(c *cliConfig) {
+				c.apiListenAddr = ""
 			},
 			wantErr: errors.New("missing api-listen-address"),
 		},
 		{
-			desc: "invalid api-shutdown-grace-delay",
-			cfg: cliConfig{
-				leaseName:              "lease",
-				leaseNamespace:         "namespace",
-				notifyHTTPURL:          "http://reload.com",
-				notifyHTTPMethod:       http.MethodPost,
-				notifyRetryMaxAttempts: 1,
-				notifyRetryDelay:       10 * time.Second,
-				readinessPollPeriod:    10 * time.Second,
-				apiListenAddr:          ":5678",
-				apiShutdownGraceDelay:  -15 * time.Second,
+			desc:       "invalid api-shutdown-grace-delay",
+			baseConfig: goodConfig,
+			patchConfig: func(c *cliConfig) {
+				c.apiShutdownGraceDelay = -15 * time.Second
 			},
 			wantErr: errors.New("invalid api-shudown-grace-delay, should be >= 0"),
 		},
 		{
-			desc: "invalid readiness poll period",
-			cfg: cliConfig{
-				leaseName:              "lease",
-				leaseNamespace:         "namespace",
-				notifyHTTPURL:          "http://reload.com",
-				notifyHTTPMethod:       http.MethodPost,
-				notifyRetryMaxAttempts: 1,
-				notifyRetryDelay:       10 * time.Second,
-				readinessPollPeriod:    -10 * time.Second,
-				apiListenAddr:          ":5678",
-				apiShutdownGraceDelay:  15 * time.Second,
+			desc:       "invalid readiness poll period",
+			baseConfig: goodConfig,
+			patchConfig: func(c *cliConfig) {
+				c.readinessPollPeriod = -10 * time.Second
 			},
 			wantErr: errors.New("invalid readiness-poll-period, should be >= 1"),
 		},
-		{
-			desc: "proxy enabled invalid prometheus local port",
-			cfg: cliConfig{
-				leaseName:                     "lease",
-				leaseNamespace:                "namespace",
-				notifyHTTPURL:                 "http://reload.com",
-				notifyHTTPMethod:              http.MethodPost,
-				notifyRetryMaxAttempts:        1,
-				notifyRetryDelay:              10 * time.Second,
-				readinessPollPeriod:           10 * time.Second,
-				apiListenAddr:                 ":5678",
-				apiShutdownGraceDelay:         15 * time.Second,
-				apiProxyEnabled:               true,
-				apiProxyPrometheusLocalPort:   0,
-				apiProxyPrometheusRemotePort:  9090,
-				apiProxyPrometheusServiceName: "prometheus",
+		{
+			desc:       "invalid readiness timeout",
+			baseConfig: goodConfig,
+			patchConfig: func(c *cliConfig) {
+				c.readinessTimeout = -10 * time.Second
+			},
+			wantErr: errors.New("invalid readiness-timeout, should be >= 1"),
+		},
+		{
+			desc:       "proxy enabled invalid prometheus local port",
+			baseConfig: goodConfigWithProxy,
+			patchConfig: func(c *cliConfig) {
+				c.apiProxyPrometheusLocalPort = 0
 			},
 			wantErr: errors.New("invalid api-proxy-prometheus-local-port, should be > 0"),
 		},
 		{
-			desc: "proxy enabled invalid prometheus remote port",
-			cfg: cliConfig{
-				leaseName:                     "lease",
-				leaseNamespace:                "namespace",
-				notifyHTTPURL:                 "http://reload.com",
-				notifyHTTPMethod:              http.MethodPost,
-				notifyRetryMaxAttempts:        1,
-				notifyRetryDelay:              10 * time.Second,
-				readinessPollPeriod:           10 * time.Second,
-				apiListenAddr:                 ":5678",
-				apiShutdownGraceDelay:         15 * time.Second,
-				apiProxyEnabled:               true,
-				apiProxyPrometheusLocalPort:   9090,
-				apiProxyPrometheusRemotePort:  0,
-				apiProxyPrometheusServiceName: "prometheus",
+			desc:       "proxy enabled invalid prometheus remote port",
+			baseConfig: goodConfigWithProxy,
+ patchConfig: func(c *cliConfig) { + c.apiProxyPrometheusRemotePort = 0 }, wantErr: errors.New("invalid api-proxy-prometheus-remote-port, should be > 0"), }, { - desc: "proxy enabled missing prometheus service name", - cfg: cliConfig{ - leaseName: "lease", - leaseNamespace: "namespace", - notifyHTTPURL: "http://reload.com", - notifyHTTPMethod: http.MethodPost, - notifyRetryMaxAttempts: 1, - notifyRetryDelay: 10 * time.Second, - readinessPollPeriod: 10 * time.Second, - apiListenAddr: ":5678", - apiShutdownGraceDelay: 15 * time.Second, - apiProxyEnabled: true, - apiProxyPrometheusLocalPort: 9090, - apiProxyPrometheusRemotePort: 9090, - apiProxyPrometheusServiceName: "", + desc: "proxy enabled missing prometheus service name", + baseConfig: goodConfigWithProxy, + patchConfig: func(c *cliConfig) { + c.apiProxyPrometheusServiceName = "" }, wantErr: errors.New("missing api-proxy-prometheus-service-name"), }, } { t.Run(testCase.desc, func(t *testing.T) { + cfg := testCase.baseConfig + + testCase.patchConfig(&cfg) + assert.Equal( t, testCase.wantErr, - testCase.cfg.validateRuntimeConfig(), + cfg.validateRuntimeConfig(), ) if testCase.wantMemberID != "" { assert.Equal( t, testCase.wantMemberID, - testCase.cfg.memberID, + cfg.memberID, ) } diff --git a/cmd/main.go b/cmd/main.go index 1348855..c87ba68 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -18,6 +18,7 @@ import ( "github.com/jlevesy/prometheus-elector/api" "github.com/jlevesy/prometheus-elector/config" "github.com/jlevesy/prometheus-elector/election" + "github.com/jlevesy/prometheus-elector/health" "github.com/jlevesy/prometheus-elector/notifier" "github.com/jlevesy/prometheus-elector/readiness" "github.com/jlevesy/prometheus-elector/watcher" @@ -29,6 +30,9 @@ func main() { var ( cfg = newCLIConfig() ctx, cancel = signal.NotifyContext(context.Background(), os.Interrupt) + + promReady = make(chan struct{}) + grp, grpCtx = errgroup.WithContext(ctx) ) defer cancel() @@ -69,6 +73,7 @@ func main() { notifier.NewHTTP( cfg.notifyHTTPURL, cfg.notifyHTTPMethod, + cfg.notifyTimeout, ), ), cfg.notifyRetryMaxAttempts, @@ -112,8 +117,6 @@ func main() { OnStoppedLeading: func() { klog.Info("Stopped leading, applying follower configuration.") - ctx := context.Background() - if err := reconciller.Reconcile(ctx); err != nil { klog.ErrorS(err, "Failed to reconcile configurations") return @@ -169,17 +172,76 @@ func main() { var readinessWaiter readiness.Waiter = readiness.NoopWaiter{} if cfg.readinessHTTPURL != "" { - readinessWaiter = readiness.NewHTTP(cfg.readinessHTTPURL, cfg.readinessPollPeriod) + readinessWaiter = readiness.NewHTTP( + cfg.readinessHTTPURL, + cfg.readinessPollPeriod, + cfg.readinessTimeout, + ) } - grp, grpCtx := errgroup.WithContext(ctx) + var healthChecker health.Checker = health.NoopChecker{} + + if cfg.healthcheckHTTPURL != "" { + healthChecker = health.NewHTTPChecker( + health.HTTPCheckConfig{ + URL: cfg.healthcheckHTTPURL, + Period: cfg.healthcheckPeriod, + Timeout: cfg.healthcheckTimeout, + SuccessThreshold: cfg.healthcheckSuccessThreshold, + FailureThreshold: cfg.healthcheckFailureThreshold, + }, + health.CallbacksFuncs{ + OnHealthyFunc: func() error { + klog.Info("Prometheus is healthy, joining the election") + err := elector.Start(grpCtx) + if errors.Is(err, election.ErrAlreadyRunning) { + klog.Info("Already joined the election, ignoring.") + return nil + } + + return err + }, + OnUnHealthyFunc: func() error { + klog.Info("Prometheus is unhealthy, leaving the election") + err := elector.Stop(grpCtx) + if errors.Is(err, 
election.ErrNotRunning) {
+						klog.Info("Already left the election, ignoring.")
+						return nil
+					}
+
+					return err
+				},
+			},
+		)
+	}
 
 	grp.Go(func() error {
 		if err := readinessWaiter.Wait(grpCtx); err != nil {
 			return err
 		}
 
-		return nil
+		// Signal whoever is waiting that prometheus is ready.
+		close(promReady)
+
+		// Join the election as soon as the local instance is ready.
+		err := elector.Start(grpCtx)
+		if errors.Is(err, election.ErrAlreadyRunning) {
+			return nil
+		}
+
+		return err
+	})
+
+	grp.Go(func() error {
+		// Wait for the app to be ready before starting the healthcheck.
+		select {
+		case <-grpCtx.Done():
+			return nil
+		case <-promReady:
+		}
+
+		return healthChecker.Check(grpCtx)
 	})
 
 	grp.Go(func() error { return watcher.Watch(grpCtx) })

diff --git a/health/checker.go b/health/checker.go
new file mode 100644
index 0000000..70fc95f
--- /dev/null
+++ b/health/checker.go
@@ -0,0 +1,41 @@
+package health
+
+import (
+	"context"
+)
+
+type CallbacksFuncs struct {
+	OnHealthyFunc   func() error
+	OnUnHealthyFunc func() error
+}
+
+func (c CallbacksFuncs) OnHealthy() error {
+	if c.OnHealthyFunc == nil {
+		return nil
+	}
+
+	return c.OnHealthyFunc()
+}
+
+func (c CallbacksFuncs) OnUnHealthy() error {
+	if c.OnUnHealthyFunc == nil {
+		return nil
+	}
+
+	return c.OnUnHealthyFunc()
+}
+
+type Callbacks interface {
+	OnHealthy() error
+	OnUnHealthy() error
+}
+
+type Checker interface {
+	Check(ctx context.Context) error
+}
+
+type NoopChecker struct{}
+
+func (n NoopChecker) Check(context.Context) error {
+	return nil
+}

diff --git a/health/http.go b/health/http.go
new file mode 100644
index 0000000..89c8f6e
--- /dev/null
+++ b/health/http.go
@@ -0,0 +1,115 @@
+package health
+
+import (
+	"context"
+	"errors"
+	"net/http"
+	"time"
+
+	"k8s.io/klog/v2"
+)
+
+type HTTPCheckConfig struct {
+	URL              string
+	Period           time.Duration
+	Timeout          time.Duration
+	SuccessThreshold int
+	FailureThreshold int
+}
+
+type HTTPChecker struct {
+	config    HTTPCheckConfig
+	callbacks Callbacks
+
+	httpClient *http.Client
+}
+
+func NewHTTPChecker(cfg HTTPCheckConfig, cbs Callbacks) *HTTPChecker {
+	return &HTTPChecker{
+		callbacks:  cbs,
+		config:     cfg,
+		httpClient: http.DefaultClient,
+	}
+}
+
+func (c *HTTPChecker) Check(ctx context.Context) error {
+	klog.InfoS("Starting healthcheck", "url", c.config.URL, "period", c.config.Period)
+	ticker := time.NewTicker(c.config.Period)
+	defer ticker.Stop()
+
+	var status checkState
+
+	for {
+		select {
+		case <-ticker.C:
+			ok, err := c.doCheck(ctx)
+			if errors.Is(err, context.Canceled) {
+				return nil
+			}
+
+			if err != nil {
+				klog.ErrorS(err, "unable to perform health check, exiting")
+				return err
+			}
+
+			if ok {
+				status.successCount++
+				status.failureCount = 0
+			} else {
+				status.failureCount++
+				status.successCount = 0
+			}
+
+			if status.successCount == c.config.SuccessThreshold {
+				if err := c.callbacks.OnHealthy(); err != nil {
+					klog.ErrorS(err, "Unable to notify healthiness")
+				}
+			}
+
+			if status.failureCount == c.config.FailureThreshold {
+				if err := c.callbacks.OnUnHealthy(); err != nil {
+					klog.ErrorS(err, "Unable to notify unhealthiness")
+				}
+			}
+
+		case <-ctx.Done():
+			err := ctx.Err()
+			if errors.Is(err, context.Canceled) {
+				return nil
+			}
+
+			return err
+		}
+	}
+}
+
+func (c *HTTPChecker) doCheck(ctx context.Context) (bool, error) {
+	ctx, cancel := context.WithTimeout(ctx, c.config.Timeout)
+	defer cancel()
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.config.URL, http.NoBody)
+	if err != nil {
+		return false, err
+	}
+
+	resp, err := c.httpClient.Do(req)
+	if err 
!= nil {
+		klog.ErrorS(err, "unable to query the health endpoint")
+		return false, nil
+	}
+	defer resp.Body.Close()
+
+	// Status codes in [200, 300) indicate a healthy instance.
+	if resp.StatusCode >= http.StatusOK && resp.StatusCode < http.StatusMultipleChoices {
+		return true, nil
+	}
+
+	klog.InfoS("Prometheus failed a healthcheck", "code", resp.StatusCode)
+
+	return false, nil
+}
+
+type checkState struct {
+	successCount int
+	failureCount int
+}

diff --git a/health/http_test.go b/health/http_test.go
new file mode 100644
index 0000000..8bf1d81
--- /dev/null
+++ b/health/http_test.go
@@ -0,0 +1,133 @@
+package health_test
+
+import (
+	"context"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/jlevesy/prometheus-elector/health"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestHTTPChecker(t *testing.T) {
+	for _, tt := range []struct {
+		desc     string
+		sequence []int
+
+		successThreshold      int
+		failureThreshold      int
+		wantOnHealthyCalled   int
+		wantOnUnhealthyCalled int
+	}{
+		{
+			desc: "calls healthy",
+			sequence: []int{
+				http.StatusOK,
+				http.StatusOK,
+				http.StatusOK,
+				http.StatusOK,
+			},
+			wantOnHealthyCalled: 1,
+		},
+		{
+			desc: "calls unhealthy",
+			sequence: []int{
+				http.StatusInternalServerError,
+				http.StatusInternalServerError,
+				http.StatusInternalServerError,
+				http.StatusInternalServerError,
+			},
+			wantOnUnhealthyCalled: 1,
+		},
+		{
+			desc: "resets state",
+			sequence: []int{
+				http.StatusOK,
+				http.StatusInternalServerError,
+				http.StatusOK,
+				http.StatusOK,
+				http.StatusInternalServerError,
+				http.StatusInternalServerError,
+				http.StatusOK,
+				http.StatusOK,
+				http.StatusOK,
+				http.StatusOK,
+				http.StatusInternalServerError,
+				http.StatusInternalServerError,
+				http.StatusInternalServerError,
+				http.StatusInternalServerError,
+			},
+			wantOnUnhealthyCalled: 1,
+			wantOnHealthyCalled:   1,
+		},
+	} {
+		t.Run(tt.desc, func(t *testing.T) {
+			var (
+				callCount    int
+				sequenceDone = make(chan struct{})
+				srv          = httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
+					// Once the sequence is exhausted, signal completion exactly once
+					// and keep answering 200 until the checker is cancelled.
+					if callCount >= len(tt.sequence)-1 {
+						if callCount == len(tt.sequence)-1 {
+							close(sequenceDone)
+						}
+						callCount++
+						rw.WriteHeader(http.StatusOK)
+						return
+					}
+
+					rw.WriteHeader(tt.sequence[callCount])
+					callCount++
+				}))
+			)
+			defer srv.Close()
+
+			var (
+				onHealthyCalled   int
+				onUnhealthyCalled int
+
+				callbacks = health.CallbacksFuncs{
+					OnHealthyFunc: func() error {
+						onHealthyCalled++
+						return nil
+					},
+					OnUnHealthyFunc: func() error {
+						onUnhealthyCalled++
+						return nil
+					},
+				}
+
+				config = health.HTTPCheckConfig{
+					URL:              srv.URL + "/-/health",
+					Period:           time.Millisecond,
+					Timeout:          time.Second,
+					SuccessThreshold: 3,
+					FailureThreshold: 3,
+				}
+
+				checkDone = make(chan struct{})
+				checker   = health.NewHTTPChecker(config, callbacks)
+			)
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			go func() {
+				err := checker.Check(ctx)
+				require.NoError(t, err)
+				close(checkDone)
+			}()
+
+			<-sequenceDone
+			cancel()
+			<-checkDone
+
+			assert.Equal(t, tt.wantOnHealthyCalled, onHealthyCalled)
+			assert.Equal(t, tt.wantOnUnhealthyCalled, onUnhealthyCalled)
+		})
+	}
+}

diff --git a/helm/templates/statefulset.yaml b/helm/templates/statefulset.yaml
index 9d9881d..dd20bd1 100644
--- a/helm/templates/statefulset.yaml
+++ b/helm/templates/statefulset.yaml
@@ -53,6 +53,7 @@ spec:
         - -output=/etc/runtime/prometheus.yaml
         - -notify-http-url=http://127.0.0.1:9090/-/reload
         - -readiness-http-url=http://127.0.0.1:9090/-/ready
+        - -healthcheck-http-url=http://127.0.0.1:9090/-/healthy
         - -api-listen-address=:9095
 {{- if 
.Values.enableLeaderProxy }} - -api-proxy-enabled diff --git a/notifier/http.go b/notifier/http.go index 92c31f3..18bdc09 100644 --- a/notifier/http.go +++ b/notifier/http.go @@ -4,24 +4,31 @@ import ( "context" "fmt" "net/http" + "time" ) type httpNotifier struct { method string url string + timeout time.Duration + httpClient *http.Client } -func NewHTTP(url, method string) Notifier { +func NewHTTP(url, method string, timeout time.Duration) Notifier { return &httpNotifier{ url: url, method: method, + timeout: timeout, httpClient: http.DefaultClient, } } func (n *httpNotifier) Notify(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, n.timeout) + defer cancel() + req, err := http.NewRequestWithContext(ctx, n.method, n.url, http.NoBody) if err != nil { return err diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go index 8ee19b8..0f185cb 100644 --- a/notifier/notifier_test.go +++ b/notifier/notifier_test.go @@ -35,6 +35,7 @@ func TestHTTPNotifierWithRetries(t *testing.T) { notifier.NewHTTP( srv.URL, http.MethodPost, + time.Second, ), ), 10, @@ -78,6 +79,7 @@ func TestHTTPNotifierExhaustRetries(t *testing.T) { notifier.NewHTTP( srv.URL, http.MethodPost, + time.Second, ), 10, 0*time.Second, diff --git a/readiness/http.go b/readiness/http.go index b575653..7594be5 100644 --- a/readiness/http.go +++ b/readiness/http.go @@ -11,13 +11,15 @@ import ( type httpWaiter struct { url string pollPeriod time.Duration + timeout time.Duration httpClient *http.Client } -func NewHTTP(url string, pollPeriod time.Duration) Waiter { +func NewHTTP(url string, pollPeriod, timeout time.Duration) Waiter { return &httpWaiter{ url: url, + timeout: timeout, pollPeriod: pollPeriod, httpClient: http.DefaultClient, } @@ -48,6 +50,9 @@ func (w *httpWaiter) Wait(ctx context.Context) error { } func (w *httpWaiter) checkReadiness(ctx context.Context) (bool, error) { + ctx, cancel := context.WithTimeout(ctx, w.timeout) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, w.url, http.NoBody) if err != nil { return false, err diff --git a/readiness/http_test.go b/readiness/http_test.go index 711fc6a..508a809 100644 --- a/readiness/http_test.go +++ b/readiness/http_test.go @@ -28,7 +28,7 @@ func TestHTTPWaiter(t *testing.T) { })) defer srv.Close() - waiter := readiness.NewHTTP(srv.URL+"/foo", 200*time.Millisecond) + waiter := readiness.NewHTTP(srv.URL+"/foo", 200*time.Millisecond, time.Second) err := waiter.Wait(context.Background()) require.NoError(t, err)
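Taken together, the new `health` package exposes a small callback-driven API. The sketch below condenses how `cmd/main.go` above drives it; the election start/stop calls are replaced with log statements, and the one-minute cutoff is purely illustrative:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/jlevesy/prometheus-elector/health"
)

func main() {
	checker := health.NewHTTPChecker(
		health.HTTPCheckConfig{
			URL:              "http://127.0.0.1:9090/-/healthy",
			Period:           5 * time.Second,
			Timeout:          2 * time.Second,
			SuccessThreshold: 3,
			FailureThreshold: 3,
		},
		health.CallbacksFuncs{
			OnHealthyFunc: func() error {
				// In cmd/main.go this is where the instance (re)joins the election.
				log.Println("prometheus is healthy")
				return nil
			},
			OnUnHealthyFunc: func() error {
				// ...and where it leaves the election.
				log.Println("prometheus is unhealthy")
				return nil
			},
		},
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Stop checking after a minute; Check polls every Period and returns
	// nil when its context is cancelled.
	time.AfterFunc(time.Minute, cancel)

	if err := checker.Check(ctx); err != nil {
		log.Fatal(err)
	}
}
```

Note that `Check` treats a cancelled context as a normal shutdown and returns `nil`, which is why this sketch cancels the context instead of letting a deadline expire.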