Support for scraping multiple Elasticsearch instances #920

Open · wants to merge 5 commits into master
46 changes: 46 additions & 0 deletions README.md
@@ -29,6 +29,9 @@ elasticsearch_exporter:
- "127.0.0.1:9114:9114"
```

- The `/metrics` endpoint is used to scrape the elasticsearch-exporter itself.
- The `/probe` endpoint is used to scrape multiple Elasticsearch instances, as shown in the example below.
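
For example, with the exporter published on `127.0.0.1:9114` as in the compose snippet above, the two endpoints are reachable at the following URLs (`server1` is a placeholder for an Elasticsearch host):

```
http://127.0.0.1:9114/metrics
http://127.0.0.1:9114/probe?target=http://server1:9200
```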

#### Kubernetes

You can find a helm chart in the prometheus-community charts repository at <https://github.com/prometheus-community/helm-charts/tree/main/charts/prometheus-elasticsearch-exporter>
@@ -103,6 +106,49 @@ Further Information
- [Defining Roles](https://www.elastic.co/guide/en/elastic-stack-overview/7.3/defining-roles.html)
- [Privileges](https://www.elastic.co/guide/en/elastic-stack-overview/7.3/security-privileges.html)


### Multi exporter mode

Pass the Elasticsearch URL via the `target` parameter:

```
http://localhost:9114/probe?target=http://server1:9200&auth_module=TEST
```

- `target`: the URL of the Elasticsearch instance to scrape.
- `auth_module`: selects the environment variables that hold the username and password for the target, i.e. `ES_{auth_module}_USERNAME` and `ES_{auth_module}_PASSWORD`. If omitted, the exporter falls back to `ES_USERNAME` and `ES_PASSWORD`.

For example, the following compose service provides the credentials for `auth_module=TEST`:
```
elasticsearch_exporter:
image: quay.io/prometheuscommunity/elasticsearch-exporter:latest
environment:
- ES_TEST_USERNAME=elastic
- ES_TEST_PASSWORD=changeme
restart: always
ports:
- "127.0.0.1:9114:9114"
```

On the Prometheus side, you can set up a scrape config as follows:
```
- job_name: elasticsearch # To get metrics about the elasticsearch exporter’s targets
static_configs:
- targets:
# All elasticsearch hostnames to monitor.
- http://server1:9200
- http://server2:9200
relabel_configs:
- source_labels: [__address__]
target_label: __param_target
- source_labels: [__param_target]
target_label: instance
# The auth_module determines which username and password environment variables are used for the target.
- target_label: __param_auth_module
replacement: test
- target_label: __address__
# The elasticsearch_exporter host:port
replacement: localhost:9114
```
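
Note that the exporter upper-cases the `auth_module` value before looking up the environment variables, so `replacement: test` above resolves to `ES_TEST_USERNAME` and `ES_TEST_PASSWORD`.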
### Metrics

| Name | Type | Cardinality | Help |
268 changes: 160 additions & 108 deletions main.go
@@ -19,6 +19,7 @@ import (
"net/url"
"os"
"os/signal"
"strings"
"time"

"context"
@@ -143,117 +144,13 @@ func main() {
esURL.User = url.UserPassword(esUsername, esPassword)
}

// returns nil if not provided and falls back to simple TCP.
tlsConfig := createTLSConfig(*esCA, *esClientCert, *esClientPrivateKey, *esInsecureSkipVerify)

var httpTransport http.RoundTripper

httpTransport = &http.Transport{
TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
}

esAPIKey := os.Getenv("ES_API_KEY")

if esAPIKey != "" {
httpTransport = &transportWithAPIKey{
underlyingTransport: httpTransport,
apiKey: esAPIKey,
}
}

httpClient := &http.Client{
Timeout: *esTimeout,
Transport: httpTransport,
}

if *awsRegion != "" {
httpClient.Transport, err = roundtripper.NewAWSSigningTransport(httpTransport, *awsRegion, *awsRoleArn, logger)
if err != nil {
level.Error(logger).Log("msg", "failed to create AWS transport", "err", err)
os.Exit(1)
}
}

// version metric
prometheus.MustRegister(version.NewCollector(name))

// create the exporter
exporter, err := collector.NewElasticsearchCollector(
logger,
[]string{},
collector.WithElasticsearchURL(esURL),
collector.WithHTTPClient(httpClient),
)
if err != nil {
level.Error(logger).Log("msg", "failed to create Elasticsearch collector", "err", err)
os.Exit(1)
}
prometheus.MustRegister(exporter)

// TODO(@sysadmind): Remove this when we have a better way to get the cluster name to downstream collectors.
// cluster info retriever
clusterInfoRetriever := clusterinfo.New(logger, httpClient, esURL, *esClusterInfoInterval)

prometheus.MustRegister(collector.NewClusterHealth(logger, httpClient, esURL))
prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode))

if *esExportIndices || *esExportShards {
sC := collector.NewShards(logger, httpClient, esURL)
prometheus.MustRegister(sC)
iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndexAliases)
prometheus.MustRegister(iC)
if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {
level.Error(logger).Log("msg", "failed to register indices collector in cluster info")
os.Exit(1)
}
if registerErr := clusterInfoRetriever.RegisterConsumer(sC); registerErr != nil {
level.Error(logger).Log("msg", "failed to register shards collector in cluster info")
os.Exit(1)
}
}

if *esExportSLM {
prometheus.MustRegister(collector.NewSLM(logger, httpClient, esURL))
}

if *esExportDataStream {
prometheus.MustRegister(collector.NewDataStream(logger, httpClient, esURL))
}

if *esExportIndicesSettings {
prometheus.MustRegister(collector.NewIndicesSettings(logger, httpClient, esURL))
}

if *esExportIndicesMappings {
prometheus.MustRegister(collector.NewIndicesMappings(logger, httpClient, esURL))
}

if *esExportILM {
prometheus.MustRegister(collector.NewIlmStatus(logger, httpClient, esURL))
prometheus.MustRegister(collector.NewIlmIndicies(logger, httpClient, esURL))
}
clusterRetrieverMap := make(map[string]*clusterinfo.Retriever)

// Create a context that is cancelled on SIGKILL or SIGINT.
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()

// start the cluster info retriever
switch runErr := clusterInfoRetriever.Run(ctx); runErr {
case nil:
level.Info(logger).Log(
"msg", "started cluster info retriever",
"interval", (*esClusterInfoInterval).String(),
)
case clusterinfo.ErrInitialCallTimeout:
level.Info(logger).Log("msg", "initial cluster info call timed out")
default:
level.Error(logger).Log("msg", "failed to run cluster info retriever", "err", err)
os.Exit(1)
}

// register cluster info retriever as prometheus collector
prometheus.MustRegister(clusterInfoRetriever)
probePath := "/probe"

http.Handle(*metricsPath, promhttp.Handler())
if *metricsPath != "/" && *metricsPath != "" {
@@ -263,8 +160,14 @@ func main() {
Version: version.Info(),
Links: []web.LandingLinks{
{
Address: *metricsPath,
Text: "Metrics",
Address: *metricsPath,
Text: "Metrics",
Description: "Metrics endpoint exposing elasticsearch-exporter metrics in the Prometheus exposition format.",
},
{
Address: probePath,
Text: "Probe",
Description: "Probe endpoint for testing the exporter against a specific Elasticsearch instance.",
},
},
}
@@ -276,6 +179,155 @@ func main() {
http.Handle("/", landingPage)
}

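// The /probe handler builds a fresh Prometheus registry for each request and scrapes the
// Elasticsearch instance given by the `target` query parameter; the optional `auth_module`
// parameter selects which credential environment variables are used for that target.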
http.HandleFunc("/probe", func(w http.ResponseWriter, r *http.Request) {
params := r.URL.Query()
target := params.Get("target")

if target != "" {
targetURL, err := url.Parse(target)
if err != nil {
level.Error(logger).Log("invalid target", err)
http.Error(w, "invalid target", http.StatusBadRequest)
return
}

targetUsername := os.Getenv("ES_USERNAME")
targetPassword := os.Getenv("ES_PASSWORD")

authModule := params.Get("auth_module")
if authModule != "" {
authModule = strings.ToUpper(authModule)
targetUsername = os.Getenv(fmt.Sprintf("ES_%s_USERNAME", authModule))
targetPassword = os.Getenv(fmt.Sprintf("ES_%s_PASSWORD", authModule))
}

if targetUsername != "" && targetPassword != "" {
targetURL.User = url.UserPassword(targetUsername, targetPassword)
}

esURL = targetURL
}

registry := prometheus.NewRegistry()
// returns nil if not provided and falls back to simple TCP.
tlsConfig := createTLSConfig(*esCA, *esClientCert, *esClientPrivateKey, *esInsecureSkipVerify)

var httpTransport http.RoundTripper

httpTransport = &http.Transport{
TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
}

esAPIKey := os.Getenv("ES_API_KEY")

if esAPIKey != "" {
httpTransport = &transportWithAPIKey{
underlyingTransport: httpTransport,
apiKey: esAPIKey,
}
}

httpClient := &http.Client{
Timeout: *esTimeout,
Transport: httpTransport,
}

if *awsRegion != "" {
httpClient.Transport, err = roundtripper.NewAWSSigningTransport(httpTransport, *awsRegion, *awsRoleArn, logger)
if err != nil {
level.Error(logger).Log("msg", "failed to create AWS transport", "err", err)
http.Error(w, "failed to create AWS transport", http.StatusInternalServerError)
return
}
}

// version metric
registry.MustRegister(version.NewCollector(name))

// create the exporter
exporter, err := collector.NewElasticsearchCollector(
logger,
[]string{},
collector.WithElasticsearchURL(esURL),
collector.WithHTTPClient(httpClient),
)
if err != nil {
level.Error(logger).Log("msg", "failed to create Elasticsearch collector", "err", err)
http.Error(w, "failed to create Elasticsearch collector", http.StatusInternalServerError)
return
}
registry.MustRegister(exporter)

// TODO(@sysadmind): Remove this when we have a better way to get the cluster name to downstream collectors.
// cluster info retriever
clusterInfoRetriever, ok := clusterRetrieverMap[target]
if !ok {
clusterInfoRetriever = clusterinfo.New(logger, httpClient, esURL, *esClusterInfoInterval)
clusterRetrieverMap[target] = clusterInfoRetriever

if *esExportIndices || *esExportShards {
sC := collector.NewShards(logger, httpClient, esURL)
registry.MustRegister(sC)
iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndexAliases)
registry.MustRegister(iC)
if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {
level.Error(logger).Log("msg", "failed to register indices collector in cluster info", registerErr)
http.Error(w, "failed to register indices collector in cluster info", http.StatusInternalServerError)
return
}
if registerErr := clusterInfoRetriever.RegisterConsumer(sC); registerErr != nil {
level.Error(logger).Log("msg", "failed to register shards collector in cluster info", registerErr)
http.Error(w, "failed to register shards collector in cluster info", http.StatusInternalServerError)
return
}
}

// start the cluster info retriever
switch runErr := clusterInfoRetriever.Run(ctx); runErr {
case nil:
level.Info(logger).Log(
"msg", fmt.Sprintf("[%s]started cluster info retriever", esURL.Host),
"interval", (*esClusterInfoInterval).String(),
)
case clusterinfo.ErrInitialCallTimeout:
level.Info(logger).Log("msg", fmt.Sprintf("[%s]initial cluster info call timed out", esURL.Host))
default:
level.Error(logger).Log("msg", fmt.Sprintf("[%s]failed to run cluster info retriever", esURL.Host), "err", err)
}
}

registry.MustRegister(collector.NewClusterHealth(logger, httpClient, esURL))
registry.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode))

if *esExportSLM {
registry.MustRegister(collector.NewSLM(logger, httpClient, esURL))
}

if *esExportDataStream {
registry.MustRegister(collector.NewDataStream(logger, httpClient, esURL))
}

if *esExportIndicesSettings {
registry.MustRegister(collector.NewIndicesSettings(logger, httpClient, esURL))
}

if *esExportIndicesMappings {
registry.MustRegister(collector.NewIndicesMappings(logger, httpClient, esURL))
}

if *esExportILM {
registry.MustRegister(collector.NewIlmStatus(logger, httpClient, esURL))
registry.MustRegister(collector.NewIlmIndicies(logger, httpClient, esURL))
}

// register cluster info retriever as prometheus collector
registry.MustRegister(clusterInfoRetriever)

h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
h.ServeHTTP(w, r)
})

// health endpoint
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
http.Error(w, http.StatusText(http.StatusOK), http.StatusOK)