From ed22a93ba4c2e9a915ca8326f569266a680ef288 Mon Sep 17 00:00:00 2001
From: lework
Date: Fri, 30 Aug 2024 18:57:44 +0800
Subject: [PATCH 1/5] feat: support for scraping multiple elasticsearch
 instances

Signed-off-by: lework
---
 main.go                        | 262 +++++++++++++++++++--------------
 pkg/clusterinfo/clusterinfo.go |  26 ++--
 2 files changed, 167 insertions(+), 121 deletions(-)

diff --git a/main.go b/main.go
index 5e4c6da0..9f7f47a1 100644
--- a/main.go
+++ b/main.go
@@ -143,117 +143,13 @@ func main() {
 		esURL.User = url.UserPassword(esUsername, esPassword)
 	}
 
-	// returns nil if not provided and falls back to simple TCP.
-	tlsConfig := createTLSConfig(*esCA, *esClientCert, *esClientPrivateKey, *esInsecureSkipVerify)
-
-	var httpTransport http.RoundTripper
-
-	httpTransport = &http.Transport{
-		TLSClientConfig: tlsConfig,
-		Proxy:           http.ProxyFromEnvironment,
-	}
-
-	esAPIKey := os.Getenv("ES_API_KEY")
-
-	if esAPIKey != "" {
-		httpTransport = &transportWithAPIKey{
-			underlyingTransport: httpTransport,
-			apiKey:              esAPIKey,
-		}
-	}
-
-	httpClient := &http.Client{
-		Timeout:   *esTimeout,
-		Transport: httpTransport,
-	}
-
-	if *awsRegion != "" {
-		httpClient.Transport, err = roundtripper.NewAWSSigningTransport(httpTransport, *awsRegion, *awsRoleArn, logger)
-		if err != nil {
-			level.Error(logger).Log("msg", "failed to create AWS transport", "err", err)
-			os.Exit(1)
-		}
-	}
-
-	// version metric
-	prometheus.MustRegister(version.NewCollector(name))
-
-	// create the exporter
-	exporter, err := collector.NewElasticsearchCollector(
-		logger,
-		[]string{},
-		collector.WithElasticsearchURL(esURL),
-		collector.WithHTTPClient(httpClient),
-	)
-	if err != nil {
-		level.Error(logger).Log("msg", "failed to create Elasticsearch collector", "err", err)
-		os.Exit(1)
-	}
-	prometheus.MustRegister(exporter)
-
-	// TODO(@sysadmind): Remove this when we have a better way to get the cluster name to down stream collectors.
-	// cluster info retriever
-	clusterInfoRetriever := clusterinfo.New(logger, httpClient, esURL, *esClusterInfoInterval)
-
-	prometheus.MustRegister(collector.NewClusterHealth(logger, httpClient, esURL))
-	prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode))
-
-	if *esExportIndices || *esExportShards {
-		sC := collector.NewShards(logger, httpClient, esURL)
-		prometheus.MustRegister(sC)
-		iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndexAliases)
-		prometheus.MustRegister(iC)
-		if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {
-			level.Error(logger).Log("msg", "failed to register indices collector in cluster info")
-			os.Exit(1)
-		}
-		if registerErr := clusterInfoRetriever.RegisterConsumer(sC); registerErr != nil {
-			level.Error(logger).Log("msg", "failed to register shards collector in cluster info")
-			os.Exit(1)
-		}
-	}
-
-	if *esExportSLM {
-		prometheus.MustRegister(collector.NewSLM(logger, httpClient, esURL))
-	}
-
-	if *esExportDataStream {
-		prometheus.MustRegister(collector.NewDataStream(logger, httpClient, esURL))
-	}
-
-	if *esExportIndicesSettings {
-		prometheus.MustRegister(collector.NewIndicesSettings(logger, httpClient, esURL))
-	}
-
-	if *esExportIndicesMappings {
-		prometheus.MustRegister(collector.NewIndicesMappings(logger, httpClient, esURL))
-	}
-
-	if *esExportILM {
-		prometheus.MustRegister(collector.NewIlmStatus(logger, httpClient, esURL))
-		prometheus.MustRegister(collector.NewIlmIndicies(logger, httpClient, esURL))
-	}
+	clusterRetrieverMap := make(map[string]*clusterinfo.Retriever)
 
 	// Create a context that is cancelled on SIGKILL or SIGINT.
 	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
 	defer cancel()
 
-	// start the cluster info retriever
-	switch runErr := clusterInfoRetriever.Run(ctx); runErr {
-	case nil:
-		level.Info(logger).Log(
-			"msg", "started cluster info retriever",
-			"interval", (*esClusterInfoInterval).String(),
-		)
-	case clusterinfo.ErrInitialCallTimeout:
-		level.Info(logger).Log("msg", "initial cluster info call timed out")
-	default:
-		level.Error(logger).Log("msg", "failed to run cluster info retriever", "err", err)
-		os.Exit(1)
-	}
-
-	// register cluster info retriever as prometheus collector
-	prometheus.MustRegister(clusterInfoRetriever)
+	probePath := "/probe"
 
 	http.Handle(*metricsPath, promhttp.Handler())
 	if *metricsPath != "/" && *metricsPath != "" {
@@ -263,8 +159,14 @@ func main() {
 			Version: version.Info(),
 			Links: []web.LandingLinks{
 				{
-					Address: *metricsPath,
-					Text:    "Metrics",
+					Address:     *metricsPath,
+					Text:        "Metrics",
+					Description: "Metrics endpoint exposing elasticsearch-exporter metrics in the Prometheus exposition format.",
+				},
+				{
+					Address:     probePath,
+					Text:        "Probe",
+					Description: "Probe endpoint for testing the exporter against a specific Elasticsearch instance.",
 				},
 			},
 		}
@@ -276,6 +178,150 @@ func main() {
 		http.Handle("/", landingPage)
 	}
 
+	http.HandleFunc("/probe", func(w http.ResponseWriter, r *http.Request) {
+		params := r.URL.Query()
+		target := params.Get("target")
+
+		if target != "" {
+			targetURL, err := url.Parse(target)
+			if err != nil {
+				http.Error(w, "invalid target", http.StatusBadRequest)
+				return
+			}
+
+			targetUsername := os.Getenv("ES_USERNAME")
+			targetPassword := os.Getenv("ES_PASSWORD")
+
+			authModule := params.Get("auth_module")
+			if authModule != "" {
+				targetUsername = os.Getenv(fmt.Sprintf("ES_%s_USERNAME", authModule))
+				targetPassword = os.Getenv(fmt.Sprintf("ES_%s_PASSWORD", authModule))
os.Getenv(fmt.Sprintf("ES_%s_PASSWORD", authModule)) + } + + if targetUsername != "" && targetPassword != "" { + targetURL.User = url.UserPassword(targetUsername, targetPassword) + } + + esURL = targetURL + } + + registry := prometheus.NewRegistry() + // returns nil if not provided and falls back to simple TCP. + tlsConfig := createTLSConfig(*esCA, *esClientCert, *esClientPrivateKey, *esInsecureSkipVerify) + + var httpTransport http.RoundTripper + + httpTransport = &http.Transport{ + TLSClientConfig: tlsConfig, + Proxy: http.ProxyFromEnvironment, + } + + esAPIKey := os.Getenv("ES_API_KEY") + + if esAPIKey != "" { + httpTransport = &transportWithAPIKey{ + underlyingTransport: httpTransport, + apiKey: esAPIKey, + } + } + + httpClient := &http.Client{ + Timeout: *esTimeout, + Transport: httpTransport, + } + + if *awsRegion != "" { + httpClient.Transport, err = roundtripper.NewAWSSigningTransport(httpTransport, *awsRegion, *awsRoleArn, logger) + if err != nil { + level.Error(logger).Log("msg", "failed to create AWS transport", "err", err) + os.Exit(1) + } + } + + // version metric + registry.MustRegister(version.NewCollector(name)) + + // create the exporter + exporter, err := collector.NewElasticsearchCollector( + logger, + []string{}, + collector.WithElasticsearchURL(esURL), + collector.WithHTTPClient(httpClient), + ) + if err != nil { + level.Error(logger).Log("msg", "failed to create Elasticsearch collector", "err", err) + os.Exit(1) + } + registry.MustRegister(exporter) + + // TODO(@sysadmind): Remove this when we have a better way to get the cluster name to down stream collectors. + // cluster info retriever + + clusterInfoRetriever, ok := clusterRetrieverMap[target] + if !ok { + clusterInfoRetriever = clusterinfo.New(logger, httpClient, esURL, *esClusterInfoInterval) + clusterRetrieverMap[target] = clusterInfoRetriever + + // start the cluster info retriever + switch runErr := clusterInfoRetriever.Run(ctx); runErr { + case nil: + level.Info(logger).Log( + "msg", fmt.Sprintf("[%s]started cluster info retriever", esURL.Host), + "interval", (*esClusterInfoInterval).String(), + ) + case clusterinfo.ErrInitialCallTimeout: + level.Info(logger).Log("msg", fmt.Sprintf("[%s]initial cluster info call timed out", esURL.Host)) + default: + level.Error(logger).Log("msg", fmt.Sprintf("[%s]failed to run cluster info retriever", esURL.Host), "err", err) + } + } + + registry.MustRegister(collector.NewClusterHealth(logger, httpClient, esURL)) + registry.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode)) + + if *esExportIndices || *esExportShards { + sC := collector.NewShards(logger, httpClient, esURL) + prometheus.MustRegister(sC) + iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndexAliases) + prometheus.MustRegister(iC) + if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil { + level.Error(logger).Log("msg", "failed to register indices collector in cluster info") + os.Exit(1) + } + if registerErr := clusterInfoRetriever.RegisterConsumer(sC); registerErr != nil { + level.Error(logger).Log("msg", "failed to register shards collector in cluster info") + os.Exit(1) + } + } + + if *esExportSLM { + registry.MustRegister(collector.NewSLM(logger, httpClient, esURL)) + } + + if *esExportDataStream { + registry.MustRegister(collector.NewDataStream(logger, httpClient, esURL)) + } + + if *esExportIndicesSettings { + registry.MustRegister(collector.NewIndicesSettings(logger, httpClient, esURL)) + } + + if 
+		if *esExportIndicesMappings {
+			registry.MustRegister(collector.NewIndicesMappings(logger, httpClient, esURL))
+		}
+
+		if *esExportILM {
+			registry.MustRegister(collector.NewIlmStatus(logger, httpClient, esURL))
+			registry.MustRegister(collector.NewIlmIndicies(logger, httpClient, esURL))
+		}
+
+		// register cluster info retriever as prometheus collector
+		registry.MustRegister(clusterInfoRetriever)
+
+		h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
+		h.ServeHTTP(w, r)
+	})
+
 	// health endpoint
 	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
 		http.Error(w, http.StatusText(http.StatusOK), http.StatusOK)
diff --git a/pkg/clusterinfo/clusterinfo.go b/pkg/clusterinfo/clusterinfo.go
index 00486f5f..24adb840 100644
--- a/pkg/clusterinfo/clusterinfo.go
+++ b/pkg/clusterinfo/clusterinfo.go
@@ -131,7 +131,7 @@ func (r *Retriever) updateMetrics(res *Response) {
 	u := *r.url
 	u.User = nil
 	url := u.String()
-	level.Debug(r.logger).Log("msg", "updating cluster info metrics")
+	level.Debug(r.logger).Log("msg", fmt.Sprintf("[%s] updating cluster info metrics", u.Host))
 	// scrape failed, response is nil
 	if res == nil {
 		r.up.WithLabelValues(url).Set(0.0)
@@ -175,18 +175,18 @@ func (r *Retriever) Run(ctx context.Context) error {
 			select {
 			case <-ctx.Done():
 				level.Info(r.logger).Log(
-					"msg", "context cancelled, exiting cluster info update loop",
+					"msg", fmt.Sprintf("[%s] context cancelled, exiting cluster info update loop", r.url.Host),
 					"err", ctx.Err(),
 				)
 				return
 			case <-r.sync:
 				level.Info(r.logger).Log(
-					"msg", "providing consumers with updated cluster info label",
+					"msg", fmt.Sprintf("[%s] providing consumers with updated cluster info label", r.url.Host),
 				)
 				res, err := r.fetchAndDecodeClusterInfo()
 				if err != nil {
 					level.Error(r.logger).Log(
-						"msg", "failed to retrieve cluster info from ES",
+						"msg", fmt.Sprintf("[%s] failed to retrieve cluster info from ES", r.url.Host),
 						"err", err,
 					)
 					r.updateMetrics(nil)
@@ -195,7 +195,7 @@ func (r *Retriever) Run(ctx context.Context) error {
 				r.updateMetrics(res)
 				for name, consumerCh := range r.consumerChannels {
 					level.Debug(r.logger).Log(
-						"msg", "sending update",
+						"msg", fmt.Sprintf("[%s] sending update", r.url.Host),
 						"consumer", name,
 						"res", fmt.Sprintf("%+v", res),
 					)
@@ -212,7 +212,7 @@ func (r *Retriever) Run(ctx context.Context) error {
 	}(ctx)
 	// trigger initial cluster info call
 	level.Info(r.logger).Log(
-		"msg", "triggering initial cluster info call",
+		"msg", fmt.Sprintf("[%s] triggering initial cluster info call", r.url.Host),
 	)
 	r.sync <- struct{}{}
 
@@ -220,7 +220,7 @@ func (r *Retriever) Run(ctx context.Context) error {
 	go func(ctx context.Context) {
 		if r.interval <= 0 {
 			level.Info(r.logger).Log(
-				"msg", "no periodic cluster info label update requested",
+				"msg", fmt.Sprintf("[%s] no periodic cluster info label update requested", r.url.Host),
 			)
 			return
 		}
@@ -229,13 +229,13 @@ func (r *Retriever) Run(ctx context.Context) error {
 		select {
 		case <-ctx.Done():
 			level.Info(r.logger).Log(
-				"msg", "context cancelled, exiting cluster info trigger loop",
+				"msg", fmt.Sprintf("[%s] context cancelled, exiting cluster info trigger loop", r.url.Host),
 				"err", ctx.Err(),
 			)
 			return
 		case <-ticker.C:
 			level.Debug(r.logger).Log(
-				"msg", "triggering periodic update",
+				"msg", fmt.Sprintf("[%s] triggering periodic update", r.url.Host),
 			)
 			r.sync <- struct{}{}
 		}
@@ -246,7 +246,7 @@ func (r *Retriever) Run(ctx context.Context) error {
 	select {
 	case <-startupComplete:
 		// first sync has been successful
sync succeeded") + level.Debug(r.logger).Log("msg", fmt.Sprintf("[%s]initial clusterinfo sync succeeded", r.url.Host)) return nil case <-time.After(initialTimeout): // initial call timed out @@ -265,7 +265,7 @@ func (r *Retriever) fetchAndDecodeClusterInfo() (*Response, error) { res, err := r.client.Get(u.String()) if err != nil { level.Error(r.logger).Log( - "msg", "failed to get cluster info", + "msg", fmt.Sprintf("[%s]failed to get cluster info", r.url.Host), "err", err, ) return nil, err @@ -275,14 +275,14 @@ func (r *Retriever) fetchAndDecodeClusterInfo() (*Response, error) { err = res.Body.Close() if err != nil { level.Warn(r.logger).Log( - "msg", "failed to close http.Client", + "msg", fmt.Sprintf("[%s]failed to close http.Client", r.url.Host), "err", err, ) } }() if res.StatusCode != http.StatusOK { - return nil, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode) + return nil, fmt.Errorf("[%s]HTTP Request failed with code %d", r.url.Host, res.StatusCode) } bts, err := io.ReadAll(res.Body) From 2f6ea86ba9c5affab2bc494592ec5122ee0c5208 Mon Sep 17 00:00:00 2001 From: lework Date: Fri, 30 Aug 2024 19:20:34 +0800 Subject: [PATCH 2/5] feat: support for scraping multiple elasticsearch instance Signed-off-by: lework --- README.md | 46 ++++++++++++++++++++++++++++++++++++++++++++++ main.go | 2 ++ 2 files changed, 48 insertions(+) diff --git a/README.md b/README.md index a7fa6e0e..2f87fbd4 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,9 @@ elasticsearch_exporter: - "127.0.0.1:9114:9114" ``` +- /metrics endpoint is used to scrape the elasticsearch-exporter. +- /probe endpoint is used to scrape multiple elasticsearch instances. + #### Kubernetes You can find a helm chart in the prometheus-community charts repository at @@ -103,6 +106,49 @@ Further Information - [Defining Roles](https://www.elastic.co/guide/en/elastic-stack-overview/7.3/defining-roles.html) - [Privileges](https://www.elastic.co/guide/en/elastic-stack-overview/7.3/security-privileges.html) + +### Multi exporter mode + +Pass the es url via the url parameter. + +``` +http://localhost:9114/probe?target=http://server1:9200&auth_module=TEST +``` + +- target: es url +- auth_module: es url对应的用户名密码环境变量 exp: ES_{auth_module}_USERNAME=elastic, ES_{auth_module}_PASSWORD + +``` +elasticsearch_exporter: + image: quay.io/prometheuscommunity/elasticsearch-exporter:latest + environment: + - ES_TEST_USERNAME=elastic + - ES_TEST_PASSWORD=changeme + restart: always + ports: + - "127.0.0.1:9114:9114" +``` + +On the prometheus side you can set a scrape config as follows +``` +- job_name: elasticsearch # To get metrics about the elasticsearch exporter’s targets + static_configs: + - targets: + # All elasticsearch hostnames to monitor. + - http://server1:9200 + - http://server2:9200 + relabel_configs: + - source_labels: [__address__] + target_label: __param_target + - source_labels: [__param_target] + target_label: instance + # The auth_module is used to find out the username and password from the environment variables. 
+    - target_label: __param_auth_module
+      replacement: test
+    - target_label: __address__
+      # The elasticsearch_exporter host:port
+      replacement: localhost:9114
+```
 ### Metrics
 
 | Name | Type | Cardinality | Help |
diff --git a/main.go b/main.go
index 9f7f47a1..536032b4 100644
--- a/main.go
+++ b/main.go
@@ -19,6 +19,7 @@ import (
 	"net/url"
 	"os"
 	"os/signal"
+	"strings"
 	"time"
 
 	"context"
@@ -194,6 +195,7 @@ func main() {
 
 			authModule := params.Get("auth_module")
 			if authModule != "" {
+				authModule = strings.ToUpper(authModule)
 				targetUsername = os.Getenv(fmt.Sprintf("ES_%s_USERNAME", authModule))
 				targetPassword = os.Getenv(fmt.Sprintf("ES_%s_PASSWORD", authModule))
 			}

From e89e697916891795f81d47d4b6d6b904d2726dce Mon Sep 17 00:00:00 2001
From: lework
Date: Fri, 30 Aug 2024 19:22:48 +0800
Subject: [PATCH 3/5] feat: support for scraping multiple elasticsearch
 instances

Signed-off-by: lework
---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 2f87fbd4..b47acf98 100644
--- a/README.md
+++ b/README.md
@@ -116,7 +116,7 @@ http://localhost:9114/probe?target=http://server1:9200&auth_module=TEST
 ```
 
 - target: the Elasticsearch URL to scrape
-- auth_module: es url对应的用户名密码环境变量 exp: ES_{auth_module}_USERNAME=elastic, ES_{auth_module}_PASSWORD
+- auth_module: the environment variables that hold the username and password for the target, e.g. `ES_{auth_module}_USERNAME`, `ES_{auth_module}_PASSWORD`

From 15c3337049870751e7e4af69c2ea33a579acaba9 Mon Sep 17 00:00:00 2001
From: lework
Date: Mon, 2 Sep 2024 11:11:24 +0800
Subject: [PATCH 4/5] feat: support for scraping multiple elasticsearch
 instances

Signed-off-by: lework
---
 main.go | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/main.go b/main.go
index 536032b4..91fc0271 100644
--- a/main.go
+++ b/main.go
@@ -236,7 +236,8 @@ func main() {
 			httpClient.Transport, err = roundtripper.NewAWSSigningTransport(httpTransport, *awsRegion, *awsRoleArn, logger)
 			if err != nil {
 				level.Error(logger).Log("msg", "failed to create AWS transport", "err", err)
-				os.Exit(1)
+				http.Error(w, "failed to create AWS transport", http.StatusInternalServerError)
+				return
 			}
 		}
 
@@ -252,7 +253,8 @@ func main() {
 		)
 		if err != nil {
 			level.Error(logger).Log("msg", "failed to create Elasticsearch collector", "err", err)
-			os.Exit(1)
+			http.Error(w, "failed to create Elasticsearch collector", http.StatusInternalServerError)
+			return
 		}
 		registry.MustRegister(exporter)
 
@@ -283,9 +285,9 @@ func main() {
 
 		if *esExportIndices || *esExportShards {
 			sC := collector.NewShards(logger, httpClient, esURL)
-			prometheus.MustRegister(sC)
+			registry.MustRegister(sC)
 			iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndexAliases)
-			prometheus.MustRegister(iC)
+			registry.MustRegister(iC)
 			if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {
 				level.Error(logger).Log("msg", "failed to register indices collector in cluster info")
 				os.Exit(1)

From f8b845df02bdc9114a24843333fdf02d86ca4811 Mon Sep 17 00:00:00 2001
From: lework
Date: Mon, 2 Sep 2024 11:44:22 +0800
Subject: [PATCH 5/5] fix: fix cluster info retrieval failure

Signed-off-by: lework
---
 main.go | 34 ++++++++++++++++++----------------
 1 file changed, 18 insertions(+), 16 deletions(-)

diff --git a/main.go b/main.go
index 91fc0271..229e6f0b 100644
--- a/main.go
+++ b/main.go
@@ -186,6 +186,7 @@ func main() {
 		if target != "" {
 			targetURL, err := url.Parse(target)
 			if err != nil {
+				level.Error(logger).Log("msg", "invalid target", "err", err)
 				http.Error(w, "invalid target", http.StatusBadRequest)
 				return
 			}
 
@@ -260,12 +261,28 @@ func main() {
 		// TODO(@sysadmind): Remove this when we have a better way to get the cluster name to downstream collectors.
 		// cluster info retriever
-
 		clusterInfoRetriever, ok := clusterRetrieverMap[target]
 		if !ok {
 			clusterInfoRetriever = clusterinfo.New(logger, httpClient, esURL, *esClusterInfoInterval)
 			clusterRetrieverMap[target] = clusterInfoRetriever
 
+			if *esExportIndices || *esExportShards {
+				sC := collector.NewShards(logger, httpClient, esURL)
+				registry.MustRegister(sC)
+				iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndexAliases)
+				registry.MustRegister(iC)
+				if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {
+					level.Error(logger).Log("msg", "failed to register indices collector in cluster info", "err", registerErr)
+					http.Error(w, "failed to register indices collector in cluster info", http.StatusInternalServerError)
+					return
+				}
+				if registerErr := clusterInfoRetriever.RegisterConsumer(sC); registerErr != nil {
+					level.Error(logger).Log("msg", "failed to register shards collector in cluster info", "err", registerErr)
+					http.Error(w, "failed to register shards collector in cluster info", http.StatusInternalServerError)
+					return
+				}
+			}
+
 			// start the cluster info retriever
 			switch runErr := clusterInfoRetriever.Run(ctx); runErr {
 			case nil:
 				level.Info(logger).Log(
@@ -283,21 +300,6 @@ func main() {
 		registry.MustRegister(collector.NewClusterHealth(logger, httpClient, esURL))
 		registry.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode))
 
-		if *esExportIndices || *esExportShards {
-			sC := collector.NewShards(logger, httpClient, esURL)
-			registry.MustRegister(sC)
-			iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndexAliases)
-			registry.MustRegister(iC)
-			if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {
-				level.Error(logger).Log("msg", "failed to register indices collector in cluster info")
-				os.Exit(1)
-			}
-			if registerErr := clusterInfoRetriever.RegisterConsumer(sC); registerErr != nil {
-				level.Error(logger).Log("msg", "failed to register shards collector in cluster info")
-				os.Exit(1)
-			}
-		}
-
 		if *esExportSLM {
 			registry.MustRegister(collector.NewSLM(logger, httpClient, esURL))
 		}
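
The handler these patches add follows the standard Prometheus multi-target exporter pattern (as used by the blackbox and snmp exporters): build a fresh `prometheus.Registry` for every `/probe` request, register the per-target collectors into it, and serve it exactly once with `promhttp.HandlerFor`, so one target's metrics can never leak into another target's scrape. A minimal, self-contained sketch of that pattern, assuming only `prometheus/client_golang`; the `probe_target_up` gauge is a hypothetical stand-in for the real per-target collectors the patches register:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	http.HandleFunc("/probe", func(w http.ResponseWriter, r *http.Request) {
		target := r.URL.Query().Get("target")
		if target == "" {
			http.Error(w, "missing target parameter", http.StatusBadRequest)
			return
		}

		// A fresh registry per request keeps each target's metrics isolated.
		registry := prometheus.NewRegistry()

		// Hypothetical stand-in for the exporter's real per-target collectors
		// (cluster health, nodes, indices, ...).
		up := prometheus.NewGauge(prometheus.GaugeOpts{
			Name:        "probe_target_up",
			Help:        "Whether the probe target was reachable.",
			ConstLabels: prometheus.Labels{"target": target},
		})
		registry.MustRegister(up)
		up.Set(1) // a real collector would set this from an actual health check

		// Serve exactly this request's registry, then discard it.
		promhttp.HandlerFor(registry, promhttp.HandlerOpts{}).ServeHTTP(w, r)
	})
	log.Fatal(http.ListenAndServe(":9114", nil))
}
```

Running this and requesting `http://localhost:9114/probe?target=http://server1:9200` returns only the metrics registered for that single request, which is why patches 4 and 5 route every collector registration through `registry` rather than the global default registry.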