Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for InfluxDB v2 #2

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ Here are some of the important config-options:
|main|FieldSeperator|This char is used to separate the logical parts of the tablenames. This char has to be an char which is not allowed in one of those: host-, servicename, command, perfdata|
|main|FileBufferSize|This is the size of the buffer which is used to read files from disk, if you have huge checks or a lot of them you maybe recive error messages that your buffer is too small and that's the point to change it|
|Log|MinSeverity|INFO is default an enough for the most. DEBUG give you a lot more data but it's mostly just spamming|
|InfluxDBGlobal|Version|Currentliy the only supported Version of InfluxDB is 0.9+|
|Influx "name"|Version|**1.0** - for InfluxDB 0.9+ and 2.0 earlier versions<br>**2.0** - for InfluxDB 2.0 or later versions|
|Influx "name"|Address|The URL of the InfluxDB-API|
|Influx "name"|Arguments|Here you can set your user name and password as well as the database. **The precision has to be ms!**|
|Influx "name"|Arguments|Here you can set your user name and password as well as the database. **The precision has to be ms!**<br> Organization & Bucket details required for InfluxDB 2.0 or later versions|
|Influx "name"|AuthToken|InfluxDB API Token with required permissions|
|Influx "name"|NastyString/NastyStringToReplace|These keys are to avoid a bug in InfluxDB and should disappear when the bug is fixed|
|Influx "name"|StopPullingDataIfDown|This is used to tell Nagflux, if this Influxdb is down to stop reading new data. That's useful if you're using spoolfiles. But if you're using gearman set this always to false because by default gearman will not buffer the data endlessly|

Expand Down
8 changes: 8 additions & 0 deletions config.gcfg.example
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@
Arguments = "precision=ms&u=root&p=root&db=nagflux"
StopPullingDataIfDown = true

[InfluxDB "nagflux2"]
Enabled = true
Version = 2.0
Address = "http://127.0.0.1:8086"
Arguments = "precision=ms&org=nagflux&bucket=nagflux"
AuthToken = "ABCDEFGHIJLKMNOPQRSTUVWXYZ"
StopPullingDataIfDown = true

[InfluxDB "fast"]
Enabled = false
Version = 1.0
Expand Down
1 change: 1 addition & 0 deletions config/Config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Config struct {
Version string
StopPullingDataIfDown bool
HealthUrl string
AuthToken string
}
Livestatus struct {
Type string
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/kdar/factorlog v0.0.0-20140929220826-d5b6afb8b4fe
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/prometheus/client_golang v1.7.1
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
golang.org/x/tools v0.0.0-20200820180210-c8f393745106
gopkg.in/gcfg.v1 v1.2.3
gopkg.in/robfig/cron.v2 v2.0.0-20150107220207-be2e0b0deed5 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,8 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
Expand Down
24 changes: 24 additions & 0 deletions helper/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,27 @@ func getBody(resp *http.Response) string {
}
return ""
}

//GetHeaders makes an HEAD or GET request. If no errors, it will return all HTTP Response Headers
func GetHeaders(client http.Client, url, function string) map[string]string {
var resp *http.Response
var err error
r := make(map[string]string)
switch function {
case "HEAD":
resp, err = client.Head(url)
case "GET":
resp, err = client.Get(url)
default:
err = errors.New("Unknown Function")
}
if err == nil {
for name, values := range resp.Header {
// Loop over all values for the name.
for _, value := range values {
r[name] = value
}
}
}
return r
}
8 changes: 6 additions & 2 deletions nagflux.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,13 @@ For further informations / bugs reportes: https://github.com/ConSol/nagflux
resultQueues[target],
influxConfig.Address, influxConfig.Arguments, cfg.Main.DumpFile, influxConfig.Version,
cfg.Main.InfluxWorker, cfg.Main.MaxInfluxWorker, cfg.InfluxDBGlobal.CreateDatabaseIfNotExists,
influxConfig.StopPullingDataIfDown, target, cfg.InfluxDBGlobal.ClientTimeout, influxConfig.HealthUrl,
influxConfig.StopPullingDataIfDown, target, cfg.InfluxDBGlobal.ClientTimeout, influxConfig.HealthUrl, influxConfig.AuthToken,
)
stoppables = append(stoppables, influx)
if influx.IsAlive() {
stoppables = append(stoppables, influx)
} else {
log.Criticalf("Nagflux is disabled for InfluxDB(%s)", target.Name)
}
influxDumpFileCollector := nagflux.NewDumpfileCollector(resultQueues[target], cfg.Main.DumpFile, target, cfg.Main.FileBufferSize)
waitForDumpfileCollector(influxDumpFileCollector)
stoppables = append(stoppables, influxDumpFileCollector)
Expand Down
36 changes: 35 additions & 1 deletion target/influx/Connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ type Connector struct {
clientTimeout int
createDatabaseIfNotExists bool
healthUrl string
authToken string
}

//ConnectorFactory Constructor which will create some workers if the connection is established.
func ConnectorFactory(jobs chan collector.Printable, connectionHost, connectionArgs, dumpFile, version string,
workerAmount, maxWorkers int, createDatabaseIfNotExists, stopReadingDataIfDown bool, target data.Target, clientTimeout int, healthUrl string) *Connector {
workerAmount, maxWorkers int, createDatabaseIfNotExists, stopReadingDataIfDown bool, target data.Target, clientTimeout int, healthUrl string, authToken string) *Connector {
parsedArgs := helper.StringToMap(connectionArgs, "&", "=")
var databaseName string
if db, found_db := parsedArgs["db"]; found_db {
Expand All @@ -56,6 +57,30 @@ func ConnectorFactory(jobs chan collector.Printable, connectionHost, connectionA
workers: make([]*Worker, workerAmount), maxWorkers: maxWorkers, jobs: jobs, quit: make(chan bool),
log: logging.GetLogger(), version: version, isAlive: false, databaseExists: false, databaseName: databaseName,
httpClient: client, target: target, stopReadingDataIfDown: stopReadingDataIfDown, clientTimeout: clientTimeout, createDatabaseIfNotExists: createDatabaseIfNotExists, healthUrl: healthUrl,
authToken: authToken,
}

// InfluxDB v2
if version == "2.0" {
// InfluxDB OSS requires either org or orgID
var orgInfo string
for _, v := range []string{"org", "orgID"} {
if o, ok := parsedArgs[v]; ok {
orgInfo = o
break
}
}
if orgInfo == "" {
result := helper.GetHeaders(s.httpClient, s.connectionHost+"/ping", "GET")
if len(result) != 0 && result["X-Influxdb-Build"] == "OSS" {
s.log.Critical("InfluxDB OSS requires Orgranization Details. Please provide either orgID or org")
s.isAlive = false
return s
}
}
// In InfluxDB 2.0 or later versions, databases no longer exist, they are replaced by buckets.
s.createDatabaseIfNotExists = false
createDatabaseIfNotExists = false
}

if createDatabaseIfNotExists && databaseName == "" {
Expand Down Expand Up @@ -90,6 +115,9 @@ func ConnectorFactory(jobs chan collector.Printable, connectionHost, connectionA
}

gen := WorkerGenerator(jobs, connectionHost+"/write?"+connectionArgs, dumpFile, version, s, target, stopReadingDataIfDown)
if s.version == "2.0" {
gen = WorkerGenerator(jobs, connectionHost+"/api/v2/write?"+connectionArgs, dumpFile, version, s, target, stopReadingDataIfDown)
}
s.TestIfIsAlive(stopReadingDataIfDown)
if !s.isAlive && !stopReadingDataIfDown {
s.log.Warnf("InfluxDB server(%s) is down but starting anyway due to 'stopReadingDataIfDown' = %t", target.Name, stopReadingDataIfDown)
Expand Down Expand Up @@ -135,6 +163,12 @@ func (connector *Connector) AddWorker() {
connector.jobs, connector.connectionHost+"/write?"+connector.connectionArgs,
connector.dumpFile, connector.version, connector, connector.target, connector.stopReadingDataIfDown,
)
if connector.version == "2.0" {
gen = WorkerGenerator(
connector.jobs, connector.connectionHost+"/api/v2/write?"+connector.connectionArgs,
connector.dumpFile, connector.version, connector, connector.target, connector.stopReadingDataIfDown,
)
}
connector.workers = append(connector.workers, gen(oldLength+2))
connector.log.Infof("Starting Worker: %d -> %d", oldLength, connector.AmountWorkers())
}
Expand Down
3 changes: 3 additions & 0 deletions target/influx/Worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ func (worker Worker) sendData(rawData []byte, log bool) error {
worker.log.Warn(err)
}
req.Header.Set("User-Agent", "Nagflux")
if worker.version == "2.0" {
req.Header.Set("Authorization", "Token "+worker.connector.authToken)
}
resp, err := worker.httpClient.Do(req)
if err != nil {
worker.log.Warn(err)
Expand Down