From 62273dae8310c17d3243dd38e20b84e497f3927b Mon Sep 17 00:00:00 2001
From: Michael Hoffmann
Date: Fri, 21 Jan 2022 09:29:26 +0100
Subject: [PATCH 1/3] aiven: aiven patches

see AIVEN_CHANGES.md for details
---
 AIVEN_CHANGES.md                              |  36 ++
 agent/agent.go                                |   2 +-
 plugins/inputs/aiven-procstat/README.md       |   8 +
 .../inputs/aiven-procstat/dev/telegraf.conf   |   9 +
 .../inputs/aiven-procstat/native_finder.go    |  96 ++++
 .../native_finder_notwindows.go               |  32 ++
 .../aiven-procstat/native_finder_test.go      |  29 +
 .../aiven-procstat/native_finder_windows.go   |  30 ++
 .../native_finder_windows_test.go             |  49 ++
 plugins/inputs/aiven-procstat/pgrep.go        |  90 ++++
 plugins/inputs/aiven-procstat/process.go      |  77 +++
 plugins/inputs/aiven-procstat/procstat.go     | 507 ++++++++++++++++++
 .../inputs/aiven-procstat/procstat_test.go    | 465 ++++++++++++++++
 .../aiven-procstat/win_service_notwindows.go  |  11 +
 .../aiven-procstat/win_service_windows.go     |  48 ++
 plugins/inputs/all/aiven_procstat.go          |   5 +
 plugins/inputs/elasticsearch/README.md        |   5 +
 plugins/inputs/elasticsearch/elasticsearch.go | 104 ++++
 .../elasticsearch/elasticsearch_test.go       |  41 ++
 plugins/inputs/elasticsearch/sample.conf      |   6 +
 plugins/inputs/elasticsearch/testdata_test.go | 106 ++++
 plugins/inputs/mysql/README.md                |   3 +
 plugins/inputs/mysql/mysql.go                 |  25 +-
 plugins/inputs/mysql/sample.conf              |   3 +
 plugins/outputs/aiven-postgresql/README.md    |  33 ++
 .../outputs/aiven-postgresql/postgresql.go    | 427 +++++++++++++++
 .../aiven-postgresql/postgresql_test.go       |  68 +++
 plugins/outputs/all/aiven_postgresql.go       |   5 +
 .../outputs/prometheus_client/v1/collector.go |   2 +-
 plugins/serializers/prometheus/convert.go     |   3 +-
 .../serializers/prometheus/prometheus_test.go |  20 +-
 .../prometheusremotewrite_test.go             |   8 +-
 32 files changed, 2334 insertions(+), 19 deletions(-)
 create mode 100644 AIVEN_CHANGES.md
 create mode 100644 plugins/inputs/aiven-procstat/README.md
 create mode 100644 plugins/inputs/aiven-procstat/dev/telegraf.conf
 create mode 100644 plugins/inputs/aiven-procstat/native_finder.go
 create mode 100644 plugins/inputs/aiven-procstat/native_finder_notwindows.go
 create mode 100644 plugins/inputs/aiven-procstat/native_finder_test.go
 create mode 100644 plugins/inputs/aiven-procstat/native_finder_windows.go
 create mode 100644 plugins/inputs/aiven-procstat/native_finder_windows_test.go
 create mode 100644 plugins/inputs/aiven-procstat/pgrep.go
 create mode 100644 plugins/inputs/aiven-procstat/process.go
 create mode 100644 plugins/inputs/aiven-procstat/procstat.go
 create mode 100644 plugins/inputs/aiven-procstat/procstat_test.go
 create mode 100644 plugins/inputs/aiven-procstat/win_service_notwindows.go
 create mode 100644 plugins/inputs/aiven-procstat/win_service_windows.go
 create mode 100644 plugins/inputs/all/aiven_procstat.go
 create mode 100644 plugins/outputs/aiven-postgresql/README.md
 create mode 100644 plugins/outputs/aiven-postgresql/postgresql.go
 create mode 100644 plugins/outputs/aiven-postgresql/postgresql_test.go
 create mode 100644 plugins/outputs/all/aiven_postgresql.go

diff --git a/AIVEN_CHANGES.md b/AIVEN_CHANGES.md
new file mode 100644
index 0000000000000..83df7637d8888
--- /dev/null
+++ b/AIVEN_CHANGES.md
@@ -0,0 +1,36 @@
+# Reasons for this fork
+
+## Input Plugins
+
+### Elasticsearch
+
+* add cross cluster replication metrics (they don't work for Elasticsearch, but it's a first step until we have an OpenSearch plugin)
+
+### Aiven Procstat
+
+* essentially a clone of procstat, containing incompatible changes that are likely not upstreamable
+* we needed a way to parse multiple unit
files in a single invocation of `systemctl` for performance reasons
+* the globbing mechanism that telegraf provides does not fit our systemd unit structure
+* we need to check units inside of containers
+
+### MySQL
+
+* added aggregated IOPerf stats (probably upstreamable)
+
+## Output Plugins
+
+### Aiven Postgresql
+
+* added a postgresql output plugin from scratch to work with timescaledb (probably upstreamable, although influxdata does not seem keen on supporting timescaledb)
+* predates the upstream postgresql plugin and was subsequently moved to the aiven prefix
+
+### Prometheus Client
+
+* added incompatible metric name replacements (not sure exactly why it was needed, but it's now our API and we have to keep it)
+
+## Serializers
+
+### Prometheus and Prometheus Remote Write
+
+* changes to make `Plugins.Prometheus Client` work, for the same reasons as stated there
+
diff --git a/agent/agent.go b/agent/agent.go
index 5d7cc033fe5b9..8d4b5c684967d 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -503,7 +503,7 @@ func (a *Agent) testRunInputs(
 		// Run plugins that require multiple gathers to calculate rate
 		// and delta metrics twice.
 		switch input.Config.Name {
-		case "cpu", "mongodb", "procstat":
+		case "cpu", "mongodb", "procstat", "aiven-procstat":
 			nulAcc := NewAccumulator(input, nul)
 			nulAcc.SetPrecision(getPrecision(precision, interval))
 			if err := input.Input.Gather(nulAcc); err != nil {
diff --git a/plugins/inputs/aiven-procstat/README.md b/plugins/inputs/aiven-procstat/README.md
new file mode 100644
index 0000000000000..4c767130d817b
--- /dev/null
+++ b/plugins/inputs/aiven-procstat/README.md
@@ -0,0 +1,8 @@
+# Aiven Procstat Input Plugin
+
+Copied from the procstat input. Divergences:
+
+* adds a 'systemd_units' configuration parameter: a list of the units whose PIDs should be fetched
+* to that end it parses the output of a single `systemctl status` call in one go instead of invoking `systemctl status [...]` for every unit
+* the globbing feature of the original `procstat` input is not usable here for several reasons: the tags are not expanded with the glob, and the units we are targeting are not named in a glob-friendly way
+
diff --git a/plugins/inputs/aiven-procstat/dev/telegraf.conf b/plugins/inputs/aiven-procstat/dev/telegraf.conf
new file mode 100644
index 0000000000000..63b150d7cc125
--- /dev/null
+++ b/plugins/inputs/aiven-procstat/dev/telegraf.conf
@@ -0,0 +1,9 @@
+[agent]
+  interval="1s"
+  flush_interval="1s"
+
+[[inputs.aiven-procstat]]
+  exe = "telegraf"
+
+[[outputs.file]]
+  files = ["stdout"]
diff --git a/plugins/inputs/aiven-procstat/native_finder.go b/plugins/inputs/aiven-procstat/native_finder.go
new file mode 100644
index 0000000000000..b6ccc564b2476
--- /dev/null
+++ b/plugins/inputs/aiven-procstat/native_finder.go
@@ -0,0 +1,96 @@
+package aiven_procstat
+
+import (
+	"fmt"
+	"io/ioutil"
+	"regexp"
+	"strconv"
+	"strings"
+
+	"github.com/shirou/gopsutil/v3/process"
+)
+
+//NativeFinder uses gopsutil to find processes
+type NativeFinder struct {
+}
+
+//NewNativeFinder ...
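+//(returns a PIDFinder that walks the process table via gopsutil instead of shelling out to pgrep)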
+func NewNativeFinder() (PIDFinder, error) { + return &NativeFinder{}, nil +} + +//Uid will return all pids for the given user +func (pg *NativeFinder) Uid(user string) ([]PID, error) { + var dst []PID + procs, err := process.Processes() + if err != nil { + return dst, err + } + for _, p := range procs { + username, err := p.Username() + if err != nil { + //skip, this can happen if we don't have permissions or + //the pid no longer exists + continue + } + if username == user { + dst = append(dst, PID(p.Pid)) + } + } + return dst, nil +} + +//PidFile returns the pid from the pid file given. +func (pg *NativeFinder) PidFile(path string) ([]PID, error) { + var pids []PID + pidString, err := ioutil.ReadFile(path) + if err != nil { + return pids, fmt.Errorf("Failed to read pidfile '%s'. Error: '%s'", + path, err) + } + pid, err := strconv.ParseInt(strings.TrimSpace(string(pidString)), 10, 32) + if err != nil { + return pids, err + } + pids = append(pids, PID(pid)) + return pids, nil + +} + +//FullPattern matches on the command line when the process was executed +func (pg *NativeFinder) FullPattern(pattern string) ([]PID, error) { + var pids []PID + regxPattern, err := regexp.Compile(pattern) + if err != nil { + return pids, err + } + procs, err := pg.FastProcessList() + if err != nil { + return pids, err + } + for _, p := range procs { + cmd, err := p.Cmdline() + if err != nil { + //skip, this can be caused by the pid no longer existing + //or you having no permissions to access it + continue + } + if regxPattern.MatchString(cmd) { + pids = append(pids, PID(p.Pid)) + } + } + return pids, err +} + +func (pg *NativeFinder) FastProcessList() ([]*process.Process, error) { + pids, err := process.Pids() + if err != nil { + return nil, err + } + + result := make([]*process.Process, len(pids)) + for i, pid := range pids { + result[i] = &process.Process{Pid: pid} + } + return result, nil +} diff --git a/plugins/inputs/aiven-procstat/native_finder_notwindows.go b/plugins/inputs/aiven-procstat/native_finder_notwindows.go new file mode 100644 index 0000000000000..83b3d5407541b --- /dev/null +++ b/plugins/inputs/aiven-procstat/native_finder_notwindows.go @@ -0,0 +1,32 @@ +// +build !windows + +package aiven_procstat + +import ( + "regexp" +) + +//Pattern matches on the process name +func (pg *NativeFinder) Pattern(pattern string) ([]PID, error) { + var pids []PID + regxPattern, err := regexp.Compile(pattern) + if err != nil { + return pids, err + } + procs, err := pg.FastProcessList() + if err != nil { + return pids, err + } + for _, p := range procs { + name, err := p.Exe() + if err != nil { + //skip, this can be caused by the pid no longer existing + //or you having no permissions to access it + continue + } + if regxPattern.MatchString(name) { + pids = append(pids, PID(p.Pid)) + } + } + return pids, err +} diff --git a/plugins/inputs/aiven-procstat/native_finder_test.go b/plugins/inputs/aiven-procstat/native_finder_test.go new file mode 100644 index 0000000000000..cd9701e4eb968 --- /dev/null +++ b/plugins/inputs/aiven-procstat/native_finder_test.go @@ -0,0 +1,29 @@ +package aiven_procstat + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func BenchmarkPattern(b *testing.B) { + f, err := NewNativeFinder() + require.NoError(b, err) + for n := 0; n < b.N; n++ { + _, err := f.Pattern(".*") + if err != nil { + panic(err) + } + } +} + +func BenchmarkFullPattern(b *testing.B) { + f, err := NewNativeFinder() + require.NoError(b, err) + for n := 0; n < b.N; n++ { + _, err := 
f.FullPattern(".*") + if err != nil { + panic(err) + } + } +} diff --git a/plugins/inputs/aiven-procstat/native_finder_windows.go b/plugins/inputs/aiven-procstat/native_finder_windows.go new file mode 100644 index 0000000000000..82389f3d34f23 --- /dev/null +++ b/plugins/inputs/aiven-procstat/native_finder_windows.go @@ -0,0 +1,30 @@ +package aiven_procstat + +import ( + "regexp" +) + +// Pattern matches on the process name +func (pg *NativeFinder) Pattern(pattern string) ([]PID, error) { + var pids []PID + regxPattern, err := regexp.Compile(pattern) + if err != nil { + return pids, err + } + procs, err := pg.FastProcessList() + if err != nil { + return pids, err + } + for _, p := range procs { + name, err := p.Name() + if err != nil { + //skip, this can be caused by the pid no longer existing + //or you having no permissions to access it + continue + } + if regxPattern.MatchString(name) { + pids = append(pids, PID(p.Pid)) + } + } + return pids, err +} diff --git a/plugins/inputs/aiven-procstat/native_finder_windows_test.go b/plugins/inputs/aiven-procstat/native_finder_windows_test.go new file mode 100644 index 0000000000000..e4798fc937272 --- /dev/null +++ b/plugins/inputs/aiven-procstat/native_finder_windows_test.go @@ -0,0 +1,49 @@ +package aiven_procstat + +import ( + "fmt" + "testing" + + "os/user" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGather_RealPattern(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + pg, err := NewNativeFinder() + require.NoError(t, err) + pids, err := pg.Pattern(`procstat`) + require.NoError(t, err) + fmt.Println(pids) + assert.Equal(t, len(pids) > 0, true) +} + +func TestGather_RealFullPattern(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + pg, err := NewNativeFinder() + require.NoError(t, err) + pids, err := pg.FullPattern(`%procstat%`) + require.NoError(t, err) + fmt.Println(pids) + assert.Equal(t, len(pids) > 0, true) +} + +func TestGather_RealUser(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + user, err := user.Current() + require.NoError(t, err) + pg, err := NewNativeFinder() + require.NoError(t, err) + pids, err := pg.Uid(user.Username) + require.NoError(t, err) + fmt.Println(pids) + assert.Equal(t, len(pids) > 0, true) +} diff --git a/plugins/inputs/aiven-procstat/pgrep.go b/plugins/inputs/aiven-procstat/pgrep.go new file mode 100644 index 0000000000000..ba511ee473e1f --- /dev/null +++ b/plugins/inputs/aiven-procstat/pgrep.go @@ -0,0 +1,90 @@ +package aiven_procstat + +import ( + "fmt" + "io/ioutil" + "os/exec" + "strconv" + "strings" + + "github.com/influxdata/telegraf/internal" +) + +// Implementation of PIDGatherer that execs pgrep to find processes +type Pgrep struct { + path string +} + +func NewPgrep() (PIDFinder, error) { + path, err := exec.LookPath("pgrep") + if err != nil { + return nil, fmt.Errorf("Could not find pgrep binary: %s", err) + } + return &Pgrep{path}, nil +} + +func (pg *Pgrep) PidFile(path string) ([]PID, error) { + var pids []PID + pidString, err := ioutil.ReadFile(path) + if err != nil { + return pids, fmt.Errorf("Failed to read pidfile '%s'. 
Error: '%s'", + path, err) + } + pid, err := strconv.ParseInt(strings.TrimSpace(string(pidString)), 10, 32) + if err != nil { + return pids, err + } + pids = append(pids, PID(pid)) + return pids, nil +} + +func (pg *Pgrep) Pattern(pattern string) ([]PID, error) { + args := []string{pattern} + return find(pg.path, args) +} + +func (pg *Pgrep) Uid(user string) ([]PID, error) { + args := []string{"-u", user} + return find(pg.path, args) +} + +func (pg *Pgrep) FullPattern(pattern string) ([]PID, error) { + args := []string{"-f", pattern} + return find(pg.path, args) +} + +func find(path string, args []string) ([]PID, error) { + out, err := run(path, args) + if err != nil { + return nil, err + } + + return parseOutput(out) +} + +func run(path string, args []string) (string, error) { + out, err := exec.Command(path, args...).Output() + + //if exit code 1, ie no processes found, do not return error + if i, _ := internal.ExitStatus(err); i == 1 { + return "", nil + } + + if err != nil { + return "", fmt.Errorf("Error running %s: %s", path, err) + } + return string(out), err +} + +func parseOutput(out string) ([]PID, error) { + pids := []PID{} + fields := strings.Fields(out) + for _, field := range fields { + pid, err := strconv.ParseInt(field, 10, 32) + if err != nil { + return nil, err + } + pids = append(pids, PID(pid)) + } + return pids, nil +} diff --git a/plugins/inputs/aiven-procstat/process.go b/plugins/inputs/aiven-procstat/process.go new file mode 100644 index 0000000000000..5cc0f4496d04b --- /dev/null +++ b/plugins/inputs/aiven-procstat/process.go @@ -0,0 +1,77 @@ +package aiven_procstat + +import ( + "fmt" + "time" + + "github.com/shirou/gopsutil/v3/cpu" + "github.com/shirou/gopsutil/v3/process" +) + +type Process interface { + PID() PID + Tags() map[string]string + + PageFaults() (*process.PageFaultsStat, error) + IOCounters() (*process.IOCountersStat, error) + MemoryInfo() (*process.MemoryInfoStat, error) + Name() (string, error) + Cmdline() (string, error) + NumCtxSwitches() (*process.NumCtxSwitchesStat, error) + NumFDs() (int32, error) + NumThreads() (int32, error) + Percent(interval time.Duration) (float64, error) + MemoryPercent() (float32, error) + Times() (*cpu.TimesStat, error) + RlimitUsage(bool) ([]process.RlimitStat, error) + Username() (string, error) + CreateTime() (int64, error) +} + +type PIDFinder interface { + PidFile(path string) ([]PID, error) + Pattern(pattern string) ([]PID, error) + Uid(user string) ([]PID, error) + FullPattern(path string) ([]PID, error) +} + +type Proc struct { + hasCPUTimes bool + tags map[string]string + *process.Process +} + +func NewProc(pid PID) (Process, error) { + process, err := process.NewProcess(int32(pid)) + if err != nil { + return nil, err + } + + proc := &Proc{ + Process: process, + hasCPUTimes: false, + tags: make(map[string]string), + } + return proc, nil +} + +func (p *Proc) Tags() map[string]string { + return p.tags +} + +func (p *Proc) PID() PID { + return PID(p.Process.Pid) +} + +func (p *Proc) Username() (string, error) { + return p.Process.Username() +} + +func (p *Proc) Percent(interval time.Duration) (float64, error) { + cpu_perc, err := p.Process.Percent(time.Duration(0)) + if !p.hasCPUTimes && err == nil { + p.hasCPUTimes = true + return 0, fmt.Errorf("Must call Percent twice to compute percent cpu.") + } + return cpu_perc, err +} diff --git a/plugins/inputs/aiven-procstat/procstat.go b/plugins/inputs/aiven-procstat/procstat.go new file mode 100644 index 0000000000000..4289337231c9a --- /dev/null +++ 
b/plugins/inputs/aiven-procstat/procstat.go
@@ -0,0 +1,507 @@
+package aiven_procstat
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"os/exec"
+	"path/filepath"
+	"regexp"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/plugins/inputs"
+	"github.com/shirou/gopsutil/v3/process"
+)
+
+var (
+	defaultPIDFinder = NewPgrep
+	defaultProcess   = NewProc
+)
+
+type PID int32
+
+type Procstat struct {
+	PidFinder    string `toml:"pid_finder"`
+	PidFile      string `toml:"pid_file"`
+	Exe          string
+	Pattern      string
+	Prefix       string
+	CmdLineTag   bool `toml:"cmdline_tag"`
+	ProcessName  string
+	User         string
+	SystemdUnit  string
+	SystemdUnits []string
+	CGroup       string `toml:"cgroup"`
+	PidTag       bool
+	WinService   string `toml:"win_service"`
+
+	finder PIDFinder
+
+	createPIDFinder func() (PIDFinder, error)
+	procs           map[PID]Process
+	createProcess   func(PID) (Process, error)
+}
+
+var sampleConfig = `
+  ## PID file to monitor process
+  pid_file = "/var/run/nginx.pid"
+  ## executable name (ie, pgrep <exe>)
+  # exe = "nginx"
+  ## pattern as argument for pgrep (ie, pgrep -f <pattern>)
+  # pattern = "nginx"
+  ## user as argument for pgrep (ie, pgrep -u <user>)
+  # user = "nginx"
+  ## Systemd unit name. Use systemd_units when getting metrics
+  ## for several units.
+  # systemd_unit = "nginx.service"
+  ## Systemd unit name array
+  # systemd_units = ["nginx.service", "haproxy.service"]
+  ## CGroup name or path
+  # cgroup = "systemd/system.slice/nginx.service"
+
+  ## Windows service name
+  # win_service = ""
+
+  ## override for process_name
+  ## This is optional; default is sourced from /proc/<pid>/status
+  # process_name = "bar"
+
+  ## Field name prefix
+  # prefix = ""
+
+  ## When true add the full cmdline as a tag.
+  # cmdline_tag = false
+
+  ## Add the PID as a tag instead of as a field. When collecting multiple
+  ## processes with otherwise matching tags this setting should be enabled to
+  ## ensure each process has a unique identity.
+  ##
+  ## Enabling this option may result in a large number of series, especially
+  ## when processes have a short lifetime.
+  # pid_tag = false
+
+  ## Method to use when finding process IDs. Can be one of 'pgrep', or
+  ## 'native'. The pgrep finder calls the pgrep executable in the PATH while
+  ## the native finder performs the search directly in a manner dependent on the
+  ## platform. Default is 'pgrep'
+  # pid_finder = "pgrep"
+`
+
+func (_ *Procstat) SampleConfig() string {
+	return sampleConfig
+}
+
+func (_ *Procstat) Description() string {
+	return "Monitor process cpu and memory usage"
+}
+
+func (p *Procstat) Gather(acc telegraf.Accumulator) error {
+	if p.createPIDFinder == nil {
+		switch p.PidFinder {
+		case "native":
+			p.createPIDFinder = NewNativeFinder
+		case "pgrep":
+			p.createPIDFinder = NewPgrep
+		default:
+			p.PidFinder = "pgrep"
+			p.createPIDFinder = defaultPIDFinder
+		}
+
+	}
+	if p.createProcess == nil {
+		p.createProcess = defaultProcess
+	}
+
+	procs, err := p.updateProcesses(acc, p.procs)
+	if err != nil {
+		acc.AddError(fmt.Errorf("E! 
Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s", + p.Exe, p.PidFile, p.Pattern, p.User, err.Error())) + } + p.procs = procs + + for _, proc := range p.procs { + p.addMetrics(proc, acc) + } + + return nil +} + +// Add metrics a single Process +func (p *Procstat) addMetrics(proc Process, acc telegraf.Accumulator) { + var prefix string + if p.Prefix != "" { + prefix = p.Prefix + "_" + } + + fields := map[string]interface{}{} + + //If process_name tag is not already set, set to actual name + if _, nameInTags := proc.Tags()["process_name"]; !nameInTags { + name, err := proc.Name() + if err == nil { + proc.Tags()["process_name"] = name + } + } + + //If user tag is not already set, set to actual name + if _, ok := proc.Tags()["user"]; !ok { + user, err := proc.Username() + if err == nil { + proc.Tags()["user"] = user + } + } + + //If pid is not present as a tag, include it as a field. + if _, pidInTags := proc.Tags()["pid"]; !pidInTags { + fields["pid"] = int32(proc.PID()) + } + + //If cmd_line tag is true and it is not already set add cmdline as a tag + if p.CmdLineTag { + if _, ok := proc.Tags()["cmdline"]; !ok { + Cmdline, err := proc.Cmdline() + if err == nil { + proc.Tags()["cmdline"] = Cmdline + } + } + } + + numThreads, err := proc.NumThreads() + if err == nil { + fields[prefix+"num_threads"] = numThreads + } + + fds, err := proc.NumFDs() + if err == nil { + fields[prefix+"num_fds"] = fds + } + + ctx, err := proc.NumCtxSwitches() + if err == nil { + fields[prefix+"voluntary_context_switches"] = ctx.Voluntary + fields[prefix+"involuntary_context_switches"] = ctx.Involuntary + } + + io, err := proc.IOCounters() + if err == nil { + fields[prefix+"read_count"] = io.ReadCount + fields[prefix+"write_count"] = io.WriteCount + fields[prefix+"read_bytes"] = io.ReadBytes + fields[prefix+"write_bytes"] = io.WriteBytes + } + + createdAt, err := proc.CreateTime() //Returns epoch in ms + if err == nil { + fields[prefix+"created_at"] = createdAt * 1000000 //Convert ms to ns + } + + cpu_time, err := proc.Times() + if err == nil { + fields[prefix+"cpu_time_user"] = cpu_time.User + fields[prefix+"cpu_time_system"] = cpu_time.System + fields[prefix+"cpu_time_idle"] = cpu_time.Idle + fields[prefix+"cpu_time_nice"] = cpu_time.Nice + fields[prefix+"cpu_time_iowait"] = cpu_time.Iowait + fields[prefix+"cpu_time_irq"] = cpu_time.Irq + fields[prefix+"cpu_time_soft_irq"] = cpu_time.Softirq + fields[prefix+"cpu_time_steal"] = cpu_time.Steal + fields[prefix+"cpu_time_guest"] = cpu_time.Guest + fields[prefix+"cpu_time_guest_nice"] = cpu_time.GuestNice + } + + cpu_perc, err := proc.Percent(time.Duration(0)) + if err == nil { + fields[prefix+"cpu_usage"] = cpu_perc + } + + mem, err := proc.MemoryInfo() + if err == nil { + fields[prefix+"memory_rss"] = mem.RSS + fields[prefix+"memory_vms"] = mem.VMS + fields[prefix+"memory_swap"] = mem.Swap + fields[prefix+"memory_data"] = mem.Data + fields[prefix+"memory_stack"] = mem.Stack + fields[prefix+"memory_locked"] = mem.Locked + } + + mem_perc, err := proc.MemoryPercent() + if err == nil { + fields[prefix+"memory_usage"] = mem_perc + } + + rlims, err := proc.RlimitUsage(true) + if err == nil { + for _, rlim := range rlims { + var name string + switch rlim.Resource { + case process.RLIMIT_CPU: + name = "cpu_time" + case process.RLIMIT_DATA: + name = "memory_data" + case process.RLIMIT_STACK: + name = "memory_stack" + case process.RLIMIT_RSS: + name = "memory_rss" + case process.RLIMIT_NOFILE: + name = "num_fds" + case process.RLIMIT_MEMLOCK: 
+ name = "memory_locked" + case process.RLIMIT_AS: + name = "memory_vms" + case process.RLIMIT_LOCKS: + name = "file_locks" + case process.RLIMIT_SIGPENDING: + name = "signals_pending" + case process.RLIMIT_NICE: + name = "nice_priority" + case process.RLIMIT_RTPRIO: + name = "realtime_priority" + default: + continue + } + + fields[prefix+"rlimit_"+name+"_soft"] = rlim.Soft + fields[prefix+"rlimit_"+name+"_hard"] = rlim.Hard + if name != "file_locks" { // gopsutil doesn't currently track the used file locks count + fields[prefix+name] = rlim.Used + } + } + } + + acc.AddFields("procstat", fields, proc.Tags()) +} + +// Update monitored Processes +func (p *Procstat) updateProcesses(acc telegraf.Accumulator, prevInfo map[PID]Process) (map[PID]Process, error) { + pidsArray, tagsArray, err := p.findPids() + if err != nil { + return nil, err + } + + procs := make(map[PID]Process, len(prevInfo)) + + for index, pids := range pidsArray { + tags := tagsArray[index] + + finderTags := make(map[string]string) + for k, v := range tags { + finderTags[k] = v + } + + // Add metric for the number of matched pids + fields := make(map[string]interface{}) + finderTags["pid_finder"] = p.PidFinder + fields["pid_count"] = len(pids) + acc.AddFields("procstat_lookup", fields, tags) + + for _, pid := range pids { + info, ok := prevInfo[pid] + if ok { + // Assumption: if a process has no name, it probably does not exist + if name, _ := info.Name(); name == "" { + continue + } + procs[pid] = info + } else { + proc, err := p.createProcess(pid) + if err != nil { + // No problem; process may have ended after we found it + continue + } + // Assumption: if a process has no name, it probably does not exist + if name, _ := proc.Name(); name == "" { + continue + } + procs[pid] = proc + + // Add initial tags + for k, v := range tags { + proc.Tags()[k] = v + } + + // Add pid tag if needed + if p.PidTag { + proc.Tags()["pid"] = strconv.Itoa(int(pid)) + } + if p.ProcessName != "" { + proc.Tags()["process_name"] = p.ProcessName + } + } + } + } + return procs, nil +} + +// Create and return PIDGatherer lazily +func (p *Procstat) getPIDFinder() (PIDFinder, error) { + if p.finder == nil { + f, err := p.createPIDFinder() + if err != nil { + return nil, err + } + p.finder = f + } + return p.finder, nil +} + +// Get matching PIDs and their initial tags +func (p *Procstat) findPids() ([][]PID, []map[string]string, error) { + var pidsArray [][]PID + var tagsArray []map[string]string + var pids []PID + tags := make(map[string]string) + var err error + + f, err := p.getPIDFinder() + if err != nil { + return nil, nil, err + } + + if p.PidFile != "" { + pids, err = f.PidFile(p.PidFile) + tags = map[string]string{"pidfile": p.PidFile} + } else if p.Exe != "" { + pids, err = f.Pattern(p.Exe) + tags = map[string]string{"exe": p.Exe} + } else if p.Pattern != "" { + pids, err = f.FullPattern(p.Pattern) + tags = map[string]string{"pattern": p.Pattern} + } else if p.User != "" { + pids, err = f.Uid(p.User) + tags = map[string]string{"user": p.User} + } else if p.SystemdUnit != "" { + pidsArray, tagsArray, err = p.systemdUnitPIDs([]string{p.SystemdUnit}) + } else if p.SystemdUnits != nil { + pidsArray, tagsArray, err = p.systemdUnitPIDs(p.SystemdUnits) + } else if p.CGroup != "" { + pids, err = p.cgroupPIDs() + tags = map[string]string{"cgroup": p.CGroup} + } else if p.WinService != "" { + pids, err = p.winServicePIDs() + tags = map[string]string{"win_service": p.WinService} + } else { + err = fmt.Errorf("Either exe, pid_file, user, pattern, 
systemd_unit, systemd_units, cgroup, or win_service must be specified")
+	}
+
+	if pids != nil {
+		pidsArray = [][]PID{pids}
+		tagsArray = []map[string]string{tags}
+	}
+
+	return pidsArray, tagsArray, err
+}
+
+// execCommand is so tests can mock out exec.Command usage.
+var execCommand = exec.Command
+
+func (p *Procstat) systemdUnitPIDs(units []string) ([][]PID, []map[string]string, error) {
+	var pidsArray [][]PID
+	var tagsArray []map[string]string
+	var pids []PID
+	var tags map[string]string
+	// Lines with PID look like "  ├─ 123 /usr/bin/foo" or "  └─4567 /usr/bin/bar"
+	// (possibly with some non-whitespace leading characters)
+	pidMatcher, err := regexp.Compile(`.*?[├└]─\s*(\d+)\s+\S+.*`)
+	if err != nil {
+		return nil, nil, err
+	}
+	// Use systemctl status and parse the pids from there. This provides output that is
+	// slightly more tedious to parse than "systemctl show <unit>" output but this allows
+	// getting all pids for the unit and it works for systemd containers. Also, when
+	// several systemd units are defined this performs much better as we can make do with
+	// just a single systemctl invocation instead of one per unit.
+	cmd := execCommand("systemctl", "status")
+	out, err := cmd.Output()
+	if err != nil {
+		return nil, nil, err
+	}
+LINES:
+	for _, line := range bytes.Split(out, []byte{'\n'}) {
+		line := bytes.TrimRight(line, "\t ")
+		for _, unit := range units {
+			// Lines with unit name look like "  ├─foo.service" or "  └─bar.service"
+			unitWithSuffix := unit
+			if !strings.HasSuffix(unit, ".service") {
+				unitWithSuffix = unit + ".service"
+			}
+			if bytes.HasSuffix(line, []byte("─"+unitWithSuffix)) {
+				if tags != nil && pids != nil {
+					pidsArray = append(pidsArray, pids)
+					tagsArray = append(tagsArray, tags)
+					pids = nil
+				}
+				tags = map[string]string{"systemd_unit": unit}
+				continue LINES
+			}
+		}
+
+		matches := pidMatcher.FindSubmatch(line)
+		if matches == nil {
+			// This was not a pid line. Presumably a new unit, but not one we're interested in.
+			// If we had matched some pids for a unit, record those.
+			if tags != nil && pids != nil {
+				pidsArray = append(pidsArray, pids)
+				tagsArray = append(tagsArray, tags)
+			}
+			pids = nil
+			tags = nil
+		} else if tags != nil {
+			// The previous line started a section for a unit we're tracking, or it was another
+			// pid for that unit. Get the pid from this line too.
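+			// ParseInt with bitSize 32 matches the int32 that backs the PID type.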
+ pid, err := strconv.ParseInt(string(matches[1]), 10, 32) + if err != nil { + return nil, nil, fmt.Errorf("invalid pid '%s'", matches[1]) + } + pids = append(pids, PID(pid)) + } + } + return pidsArray, tagsArray, nil +} + +func (p *Procstat) cgroupPIDs() ([]PID, error) { + var pids []PID + + procsPath := p.CGroup + if procsPath[0] != '/' { + procsPath = "/sys/fs/cgroup/" + procsPath + } + procsPath = filepath.Join(procsPath, "cgroup.procs") + out, err := ioutil.ReadFile(procsPath) + if err != nil { + return nil, err + } + for _, pidBS := range bytes.Split(out, []byte{'\n'}) { + if len(pidBS) == 0 { + continue + } + pid, err := strconv.ParseInt(string(pidBS), 10, 32) + if err != nil { + return nil, fmt.Errorf("invalid pid '%s'", pidBS) + } + pids = append(pids, PID(pid)) + } + + return pids, nil +} + +func (p *Procstat) winServicePIDs() ([]PID, error) { + var pids []PID + + pid, err := queryPidWithWinServiceName(p.WinService) + if err != nil { + return pids, err + } + + pids = append(pids, PID(pid)) + + return pids, nil +} + +func init() { + inputs.Add("aiven-procstat", func() telegraf.Input { + return &Procstat{} + }) +} diff --git a/plugins/inputs/aiven-procstat/procstat_test.go b/plugins/inputs/aiven-procstat/procstat_test.go new file mode 100644 index 0000000000000..8204845f5fca3 --- /dev/null +++ b/plugins/inputs/aiven-procstat/procstat_test.go @@ -0,0 +1,465 @@ +package aiven_procstat + +import ( + "fmt" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + "testing" + "time" + + "github.com/influxdata/telegraf/testutil" + "github.com/shirou/gopsutil/v3/cpu" + "github.com/shirou/gopsutil/v3/process" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func init() { + execCommand = mockExecCommand +} +func mockExecCommand(arg0 string, args ...string) *exec.Cmd { + args = append([]string{"-test.run=TestMockExecCommand", "--", arg0}, args...) + cmd := exec.Command(os.Args[0], args...) 
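+	// The test binary re-executes itself; TestMockExecCommand intercepts the args
+	// and replays canned `systemctl status` output instead of running the real command.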
+ cmd.Stderr = os.Stderr + return cmd +} +func TestMockExecCommand(t *testing.T) { + var cmd []string + for _, arg := range os.Args { + if string(arg) == "--" { + cmd = []string{} + continue + } + if cmd == nil { + continue + } + cmd = append(cmd, string(arg)) + } + if cmd == nil { + return + } + cmdline := strings.Join(cmd, " ") + + if cmdline == "systemctl status" { + fmt.Printf(`● hostname + State: degraded + Jobs: 0 queued + Failed: 1 units + Since: Fri 2019-09-13 06:08:15 UTC; 50min ago + CGroup: / + ├─user.slice + │ └─user-1002.slice + │ ├─user@1002.service + │ │ └─init.scope + │ │ ├─3790 /usr/lib/systemd/systemd --user + │ │ └─3792 (sd-pam) + │ └─session-5.scope + │ ├─3819 sshd: root [priv] + │ ├─3822 sshd: root@pts/1 + │ ├─3823 -bash + │ ├─3858 sudo -i + │ ├─3860 -bash + │ ├─7445 systemctl status + │ └─7446 less + ├─init.scope + │ └─1 /usr/lib/systemd/systemd --switched-root --system --deserialize 31 + ├─system.slice + │ ├─systemd-udevd.service + │ │ └─455 /usr/lib/systemd/systemd-udevd + │ ├─dbus-broker.service + │ │ ├─538 /usr/bin/dbus-broker-launch --scope system --audit + │ │ └─539 dbus-broker --log 4 --controller 9 --machine-id 5305bf75e3144e22a1c4d297f26ec42d --max-bytes 536870912 --max-fds 4096 --max-matches 131072 --audit + │ ├─system-serial\x2dgetty.slice + │ │ └─serial-getty@ttyS0.service + │ │ └─739 /sbin/agetty -o -p -- \u --keep-baud 115200,38400,9600 ttyS0 vt220 + │ ├─TestGather_systemdUnitPIDs.service + │ │ └─11408 /usr/bin/foo + │ │ └─11420 /usr/bin/bar + │ ├─TestTrailingSpaces_systemdUnitPIDs.service + │ │ └─11428 /usr/bin/foo + │ ├─chronyd.service + │ │ └─1931 /usr/sbin/chronyd + └─machine.slice + └─machine-foo\x2dmgmt\x2d1.scope + └─payload + ├─init.scope + │ └─2334 /usr/lib/systemd/systemd + └─system.slice + └─foo.service + └─2371 /bin/python3 -m aiven.almond.almond +`) + os.Exit(0) + } + + fmt.Printf("command not found\n") + os.Exit(1) +} + +type testPgrep struct { + pids []PID + err error +} + +func pidFinder(pids []PID, err error) func() (PIDFinder, error) { + return func() (PIDFinder, error) { + return &testPgrep{ + pids: pids, + err: err, + }, nil + } +} + +func (pg *testPgrep) PidFile(path string) ([]PID, error) { + return pg.pids, pg.err +} + +func (p *testProc) Cmdline() (string, error) { + return "test_proc", nil +} + +func (pg *testPgrep) Pattern(pattern string) ([]PID, error) { + return pg.pids, pg.err +} + +func (pg *testPgrep) Uid(user string) ([]PID, error) { + return pg.pids, pg.err +} + +func (pg *testPgrep) FullPattern(pattern string) ([]PID, error) { + return pg.pids, pg.err +} + +type testProc struct { + pid PID + tags map[string]string +} + +func newTestProc(pid PID) (Process, error) { + proc := &testProc{ + tags: make(map[string]string), + } + return proc, nil +} + +func (p *testProc) PID() PID { + return p.pid +} + +func (p *testProc) Username() (string, error) { + return "testuser", nil +} + +func (p *testProc) Tags() map[string]string { + return p.tags +} + +func (p *testProc) PageFaults() (*process.PageFaultsStat, error) { + return &process.PageFaultsStat{}, nil +} + +func (p *testProc) IOCounters() (*process.IOCountersStat, error) { + return &process.IOCountersStat{}, nil +} + +func (p *testProc) MemoryInfo() (*process.MemoryInfoStat, error) { + return &process.MemoryInfoStat{}, nil +} + +func (p *testProc) Name() (string, error) { + return "test_proc", nil +} + +func (p *testProc) NumCtxSwitches() (*process.NumCtxSwitchesStat, error) { + return &process.NumCtxSwitchesStat{}, nil +} + +func (p *testProc) NumFDs() (int32, error) { + 
return 0, nil +} + +func (p *testProc) NumThreads() (int32, error) { + return 0, nil +} + +func (p *testProc) Percent(interval time.Duration) (float64, error) { + return 0, nil +} + +func (p *testProc) MemoryPercent() (float32, error) { + return 0, nil +} + +func (p *testProc) CreateTime() (int64, error) { + return 0, nil +} + +func (p *testProc) Times() (*cpu.TimesStat, error) { + return &cpu.TimesStat{}, nil +} + +func (p *testProc) RlimitUsage(gatherUsage bool) ([]process.RlimitStat, error) { + return []process.RlimitStat{}, nil +} + +var pid PID = PID(42) +var exe string = "foo" + +func TestGather_CreateProcessErrorOk(t *testing.T) { + var acc testutil.Accumulator + + p := Procstat{ + Exe: exe, + createPIDFinder: pidFinder([]PID{pid}, nil), + createProcess: func(PID) (Process, error) { + return nil, fmt.Errorf("createProcess error") + }, + } + require.NoError(t, acc.GatherError(p.Gather)) +} + +func TestGather_CreatePIDFinderError(t *testing.T) { + var acc testutil.Accumulator + + p := Procstat{ + createPIDFinder: func() (PIDFinder, error) { + return nil, fmt.Errorf("createPIDFinder error") + }, + createProcess: newTestProc, + } + require.Error(t, acc.GatherError(p.Gather)) +} + +func TestGather_ProcessName(t *testing.T) { + var acc testutil.Accumulator + + p := Procstat{ + Exe: exe, + ProcessName: "custom_name", + createPIDFinder: pidFinder([]PID{pid}, nil), + createProcess: newTestProc, + } + require.NoError(t, acc.GatherError(p.Gather)) + + assert.Equal(t, "custom_name", acc.TagValue("procstat", "process_name")) +} + +func TestGather_NoProcessNameUsesReal(t *testing.T) { + var acc testutil.Accumulator + pid := PID(os.Getpid()) + + p := Procstat{ + Exe: exe, + createPIDFinder: pidFinder([]PID{pid}, nil), + createProcess: newTestProc, + } + require.NoError(t, acc.GatherError(p.Gather)) + + assert.True(t, acc.HasTag("procstat", "process_name")) +} + +func TestGather_NoPidTag(t *testing.T) { + var acc testutil.Accumulator + + p := Procstat{ + Exe: exe, + createPIDFinder: pidFinder([]PID{pid}, nil), + createProcess: newTestProc, + } + require.NoError(t, acc.GatherError(p.Gather)) + assert.True(t, acc.HasInt32Field("procstat", "pid")) + assert.False(t, acc.HasTag("procstat", "pid")) +} + +func TestGather_PidTag(t *testing.T) { + var acc testutil.Accumulator + + p := Procstat{ + Exe: exe, + PidTag: true, + createPIDFinder: pidFinder([]PID{pid}, nil), + createProcess: newTestProc, + } + require.NoError(t, acc.GatherError(p.Gather)) + assert.Equal(t, "42", acc.TagValue("procstat", "pid")) + assert.False(t, acc.HasInt32Field("procstat", "pid")) +} + +func TestGather_Prefix(t *testing.T) { + var acc testutil.Accumulator + + p := Procstat{ + Exe: exe, + Prefix: "custom_prefix", + createPIDFinder: pidFinder([]PID{pid}, nil), + createProcess: newTestProc, + } + require.NoError(t, acc.GatherError(p.Gather)) + assert.True(t, acc.HasInt32Field("procstat", "custom_prefix_num_fds")) +} + +func TestGather_Exe(t *testing.T) { + var acc testutil.Accumulator + + p := Procstat{ + Exe: exe, + createPIDFinder: pidFinder([]PID{pid}, nil), + createProcess: newTestProc, + } + require.NoError(t, acc.GatherError(p.Gather)) + + assert.Equal(t, exe, acc.TagValue("procstat", "exe")) +} + +func TestGather_User(t *testing.T) { + var acc testutil.Accumulator + user := "ada" + + p := Procstat{ + User: user, + createPIDFinder: pidFinder([]PID{pid}, nil), + createProcess: newTestProc, + } + require.NoError(t, acc.GatherError(p.Gather)) + + assert.Equal(t, user, acc.TagValue("procstat", "user")) +} + +func 
TestGather_Pattern(t *testing.T) { + var acc testutil.Accumulator + pattern := "foo" + + p := Procstat{ + Pattern: pattern, + createPIDFinder: pidFinder([]PID{pid}, nil), + createProcess: newTestProc, + } + require.NoError(t, acc.GatherError(p.Gather)) + + assert.Equal(t, pattern, acc.TagValue("procstat", "pattern")) +} + +func TestGather_MissingPidMethod(t *testing.T) { + var acc testutil.Accumulator + + p := Procstat{ + createPIDFinder: pidFinder([]PID{pid}, nil), + createProcess: newTestProc, + } + require.Error(t, acc.GatherError(p.Gather)) +} + +func TestGather_PidFile(t *testing.T) { + var acc testutil.Accumulator + pidfile := "/path/to/pidfile" + + p := Procstat{ + PidFile: pidfile, + createPIDFinder: pidFinder([]PID{pid}, nil), + createProcess: newTestProc, + } + require.NoError(t, acc.GatherError(p.Gather)) + + assert.Equal(t, pidfile, acc.TagValue("procstat", "pidfile")) +} + +func TestGather_PercentFirstPass(t *testing.T) { + var acc testutil.Accumulator + pid := PID(os.Getpid()) + + p := Procstat{ + Pattern: "foo", + PidTag: true, + createPIDFinder: pidFinder([]PID{pid}, nil), + createProcess: NewProc, + } + require.NoError(t, acc.GatherError(p.Gather)) + + assert.True(t, acc.HasFloatField("procstat", "cpu_time_user")) + assert.False(t, acc.HasFloatField("procstat", "cpu_usage")) +} + +func TestGather_PercentSecondPass(t *testing.T) { + var acc testutil.Accumulator + pid := PID(os.Getpid()) + + p := Procstat{ + Pattern: "foo", + PidTag: true, + createPIDFinder: pidFinder([]PID{pid}, nil), + createProcess: NewProc, + } + require.NoError(t, acc.GatherError(p.Gather)) + require.NoError(t, acc.GatherError(p.Gather)) + + assert.True(t, acc.HasFloatField("procstat", "cpu_time_user")) + assert.True(t, acc.HasFloatField("procstat", "cpu_usage")) +} + +func TestGather_systemdUnitPIDs(t *testing.T) { + p := Procstat{ + createPIDFinder: pidFinder([]PID{}, nil), + SystemdUnit: "TestGather_systemdUnitPIDs", + } + pidsArray, tagsArray, err := p.findPids() + require.NoError(t, err) + assert.Equal(t, []PID{11408, 11420}, pidsArray[0]) + assert.Equal(t, "TestGather_systemdUnitPIDs", tagsArray[0]["systemd_unit"]) + + p = Procstat{ + createPIDFinder: pidFinder([]PID{}, nil), + SystemdUnits: []string{"TestGather_systemdUnitPIDs", "foo.service"}, + } + pidsArray, tagsArray, err = p.findPids() + require.NoError(t, err) + assert.Equal(t, []PID{11408, 11420}, pidsArray[0]) + assert.Equal(t, "TestGather_systemdUnitPIDs", tagsArray[0]["systemd_unit"]) + assert.Equal(t, []PID{2371}, pidsArray[1]) + assert.Equal(t, "foo.service", tagsArray[1]["systemd_unit"]) +} + +func TestTrailingSpaces_systemdUnitPIDs(t *testing.T) { + p := Procstat{ + createPIDFinder: pidFinder([]PID{}, nil), + SystemdUnit: "TestTrailingSpaces_systemdUnitPIDs", + } + pidsArray, tagsArray, err := p.findPids() + require.NoError(t, err) + assert.Equal(t, []PID{11428}, pidsArray[0]) + assert.Equal(t, "TestTrailingSpaces_systemdUnitPIDs", tagsArray[0]["systemd_unit"]) +} + +func TestGather_cgroupPIDs(t *testing.T) { + //no cgroups in windows + if runtime.GOOS == "windows" { + t.Skip("no cgroups in windows") + } + td, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(td) + err = ioutil.WriteFile(filepath.Join(td, "cgroup.procs"), []byte("1234\n5678\n"), 0644) + require.NoError(t, err) + + p := Procstat{ + createPIDFinder: pidFinder([]PID{}, nil), + CGroup: td, + } + pidsArray, tagsArray, err := p.findPids() + require.NoError(t, err) + assert.Equal(t, []PID{1234, 5678}, pidsArray[0]) + assert.Equal(t, td, 
tagsArray[0]["cgroup"])
+}
+
+func TestProcstatLookupMetric(t *testing.T) {
+	p := Procstat{
+		createPIDFinder: pidFinder([]PID{543}, nil),
+		Exe:             "-Gsys",
+	}
+	var acc testutil.Accumulator
+	err := acc.GatherError(p.Gather)
+	require.NoError(t, err)
+	require.Equal(t, len(p.procs)+1, len(acc.Metrics))
+}
diff --git a/plugins/inputs/aiven-procstat/win_service_notwindows.go b/plugins/inputs/aiven-procstat/win_service_notwindows.go
new file mode 100644
index 0000000000000..5f083c44db22b
--- /dev/null
+++ b/plugins/inputs/aiven-procstat/win_service_notwindows.go
@@ -0,0 +1,11 @@
+// +build !windows
+
+package aiven_procstat
+
+import (
+	"fmt"
+)
+
+func queryPidWithWinServiceName(winServiceName string) (uint32, error) {
+	return 0, fmt.Errorf("the win_service option is not supported on this platform")
+}
diff --git a/plugins/inputs/aiven-procstat/win_service_windows.go b/plugins/inputs/aiven-procstat/win_service_windows.go
new file mode 100644
index 0000000000000..5ecedd885023f
--- /dev/null
+++ b/plugins/inputs/aiven-procstat/win_service_windows.go
@@ -0,0 +1,48 @@
+// +build windows
+
+package aiven_procstat
+
+import (
+	"unsafe"
+
+	"golang.org/x/sys/windows"
+	"golang.org/x/sys/windows/svc/mgr"
+)
+
+func getService(name string) (*mgr.Service, error) {
+	m, err := mgr.Connect()
+	if err != nil {
+		return nil, err
+	}
+	defer m.Disconnect()
+
+	srv, err := m.OpenService(name)
+	if err != nil {
+		return nil, err
+	}
+
+	return srv, nil
+}
+
+func queryPidWithWinServiceName(winServiceName string) (uint32, error) {
+	srv, err := getService(winServiceName)
+	if err != nil {
+		return 0, err
+	}
+
+	var p *windows.SERVICE_STATUS_PROCESS
+	var bytesNeeded uint32
+	var buf []byte
+
+	if err := windows.QueryServiceStatusEx(srv.Handle, windows.SC_STATUS_PROCESS_INFO, nil, 0, &bytesNeeded); err != windows.ERROR_INSUFFICIENT_BUFFER {
+		return 0, err
+	}
+
+	buf = make([]byte, bytesNeeded)
+	p = (*windows.SERVICE_STATUS_PROCESS)(unsafe.Pointer(&buf[0]))
+	if err := windows.QueryServiceStatusEx(srv.Handle, windows.SC_STATUS_PROCESS_INFO, &buf[0], uint32(len(buf)), &bytesNeeded); err != nil {
+		return 0, err
+	}
+
+	return p.ProcessId, nil
+}
diff --git a/plugins/inputs/all/aiven_procstat.go b/plugins/inputs/all/aiven_procstat.go
new file mode 100644
index 0000000000000..6d9b1deb5590f
--- /dev/null
+++ b/plugins/inputs/all/aiven_procstat.go
@@ -0,0 +1,5 @@
+//go:build !custom || inputs || inputs.aiven_procstat
+
+package all
+
+import _ "github.com/influxdata/telegraf/plugins/inputs/aiven-procstat" // register plugin
diff --git a/plugins/inputs/elasticsearch/README.md b/plugins/inputs/elasticsearch/README.md
index c75586710a1a8..4d4c72a7dd3b6 100644
--- a/plugins/inputs/elasticsearch/README.md
+++ b/plugins/inputs/elasticsearch/README.md
@@ -76,6 +76,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
   ## Gather stats from the enrich API
   # enrich_stats = false
 
+  ## Set ccr_stats to true when you want to obtain cross cluster replication stats.
+  ccr_stats = false
+
+  ## Only gather ccr_stats from the master node. To work this requires local = true
+  ccr_stats_only_from_master = true
   ## Indices to collect; can be one or more indices names or _all
   ## Use of wildcards is allowed.
Use a wildcard at the end to retrieve index
diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go
index 9341abc56d8ae..666d146de8d8d 100644
--- a/plugins/inputs/elasticsearch/elasticsearch.go
+++ b/plugins/inputs/elasticsearch/elasticsearch.go
@@ -104,6 +104,44 @@ type clusterStats struct {
 	Nodes interface{} `json:"nodes"`
 }
 
+type ccrLeaderStats struct {
+	ccrLeaderIndexStats
+	NumReplicatedIndices int                            `json:"num_replicated_indices"`
+	IndexStats           map[string]ccrLeaderIndexStats `json:"index_stats"`
+}
+
+type ccrLeaderIndexStats struct {
+	TranslogSizeBytes           int `json:"translog_size_bytes"`
+	OperationsRead              int `json:"operations_read"`
+	OperationsReadLucene        int `json:"operations_read_lucene"`
+	OperationsReadTranslog      int `json:"operations_read_translog"`
+	TotalReadTimeLuceneMillis   int `json:"total_read_time_lucene_millis"`
+	TotalReadTimeTranslogMillis int `json:"total_read_time_translog_millis"`
+	BytesRead                   int `json:"bytes_read"`
+}
+
+type ccrFollowerStats struct {
+	ccrFollowerIndexStats
+	NumSyncingIndices       int `json:"num_syncing_indices"`
+	NumBootstrappingIndices int `json:"num_bootstrapping_indices"`
+	NumPausedIndices        int `json:"num_paused_indices"`
+	NumFailedIndices        int `json:"num_failed_indices"`
+	NumShardTasks           int `json:"num_shard_tasks"`
+	NumIndexTasks           int `json:"num_index_tasks"`
+}
+
+type ccrFollowerIndexStats struct {
+	OperationsWritten      int `json:"operations_written"`
+	OperationsRead         int `json:"operations_read"`
+	FailedReadRequests     int `json:"failed_read_requests"`
+	ThrottledReadRequests  int `json:"throttled_read_requests"`
+	FailedWriteRequests    int `json:"failed_write_requests"`
+	ThrottledWriteRequests int `json:"throttled_write_requests"`
+	FollowerCheckpoint     int `json:"follower_checkpoint"`
+	LeaderCheckpoint       int `json:"leader_checkpoint"`
+	TotalWriteTimeMillis   int `json:"total_write_time_millis"`
+}
+
 type indexStat struct {
 	Primaries interface{} `json:"primaries"`
 	Total     interface{} `json:"total"`
@@ -121,6 +159,8 @@ type Elasticsearch struct {
 	ClusterHealthLevel         string   `toml:"cluster_health_level"`
 	ClusterStats               bool     `toml:"cluster_stats"`
 	ClusterStatsOnlyFromMaster bool     `toml:"cluster_stats_only_from_master"`
+	CCRStats                   bool     `toml:"ccr_stats"`
+	CCRStatsOnlyFromMaster     bool     `toml:"ccr_stats_only_from_master"`
 	EnrichStats                bool     `toml:"enrich_stats"`
 	IndicesInclude             []string `toml:"indices_include"`
 	IndicesLevel               string   `toml:"indices_level"`
@@ -151,6 +191,7 @@ func (i serverInfo) isMaster() bool {
 func NewElasticsearch() *Elasticsearch {
 	return &Elasticsearch{
 		ClusterStatsOnlyFromMaster: true,
+		CCRStatsOnlyFromMaster:     true,
 		ClusterHealthLevel:         "indices",
 		HTTPClientConfig: httpconfig.HTTPClientConfig{
 			ResponseHeaderTimeout: config.Duration(5 * time.Second),
@@ -285,6 +326,17 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
 			}
 		}
 
+		if e.CCRStats && (e.serverInfo[s].isMaster() || !e.CCRStatsOnlyFromMaster || !e.Local) {
+			if err := e.gatherCCRLeaderStats(s+"/_plugins/_replication/leader_stats", acc); err != nil {
+				acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
+				return
+			}
+			if err := e.gatherCCRFollowerStats(s+"/_plugins/_replication/follower_stats", acc); err != nil {
+				acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
+				return
+			}
+		}
+
 		if len(e.IndicesInclude) > 0 && (e.serverInfo[s].isMaster() || !e.ClusterStatsOnlyFromMaster || !e.Local) {
 			if e.IndicesLevel != "shards" {
 				if err := 
e.gatherIndicesStats(s+"/"+strings.Join(e.IndicesInclude, ",")+"/_stats", acc); err != nil {
@@ -535,6 +587,58 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator)
 	return nil
 }
 
+func (e *Elasticsearch) gatherCCRLeaderStats(url string, acc telegraf.Accumulator) error {
+	ccrStats := &ccrLeaderStats{}
+	if err := e.gatherJSONData(url, ccrStats); err != nil {
+		return err
+	}
+	now := time.Now()
+
+	stats := map[string]interface{}{
+		"num_replicated_indices":          float64(ccrStats.NumReplicatedIndices),
+		"translog_size_bytes":             float64(ccrStats.TranslogSizeBytes),
+		"bytes_read":                      float64(ccrStats.BytesRead),
+		"operations_read":                 float64(ccrStats.OperationsRead),
+		"operations_read_lucene":          float64(ccrStats.OperationsReadLucene),
+		"operations_read_translog":        float64(ccrStats.OperationsReadTranslog),
+		"total_read_time_lucene_millis":   float64(ccrStats.TotalReadTimeLuceneMillis),
+		"total_read_time_translog_millis": float64(ccrStats.TotalReadTimeTranslogMillis),
+	}
+
+	acc.AddFields("elasticsearch_ccr_stats_leader", stats, map[string]string{}, now)
+
+	return nil
+}
+
+func (e *Elasticsearch) gatherCCRFollowerStats(url string, acc telegraf.Accumulator) error {
+	ccrStats := &ccrFollowerStats{}
+	if err := e.gatherJSONData(url, ccrStats); err != nil {
+		return err
+	}
+	now := time.Now()
+
+	stats := map[string]interface{}{
+		"num_syncing_indices":       float64(ccrStats.NumSyncingIndices),
+		"num_bootstrapping_indices": float64(ccrStats.NumBootstrappingIndices),
+		"num_paused_indices":        float64(ccrStats.NumPausedIndices),
+		"num_failed_indices":        float64(ccrStats.NumFailedIndices),
+		"num_shard_tasks":           float64(ccrStats.NumShardTasks),
+		"num_index_tasks":           float64(ccrStats.NumIndexTasks),
+		"operations_written":        float64(ccrStats.OperationsWritten),
+		"operations_read":           float64(ccrStats.OperationsRead),
+		"failed_read_requests":      float64(ccrStats.FailedReadRequests),
+		"throttled_read_requests":   float64(ccrStats.ThrottledReadRequests),
+		"failed_write_requests":     float64(ccrStats.FailedWriteRequests),
+		"throttled_write_requests":  float64(ccrStats.ThrottledWriteRequests),
+		"follower_checkpoint":       float64(ccrStats.FollowerCheckpoint),
+		"leader_checkpoint":         float64(ccrStats.LeaderCheckpoint),
+		"total_write_time_millis":   float64(ccrStats.TotalWriteTimeMillis),
+	}
+	acc.AddFields("elasticsearch_ccr_stats_follower", stats, map[string]string{}, now)
+
+	return nil
+}
+
 func (e *Elasticsearch) gatherIndicesStats(url string, acc telegraf.Accumulator) error {
 	indicesStats := &struct {
 		Shards map[string]interface{} `json:"_shards"`
diff --git a/plugins/inputs/elasticsearch/elasticsearch_test.go b/plugins/inputs/elasticsearch/elasticsearch_test.go
index 2ee1160492fdb..978d16d427b22 100644
--- a/plugins/inputs/elasticsearch/elasticsearch_test.go
+++ b/plugins/inputs/elasticsearch/elasticsearch_test.go
@@ -274,6 +274,47 @@ func TestGatherClusterStatsNonMaster(t *testing.T) {
 	checkNodeStatsResult(t, &acc)
 }
 
+func TestGatherCCRStatsMaster(t *testing.T) {
+	// This needs multiple steps to replicate the multiple calls internally.
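+	// First resolve the master node id via catMaster, then gather node stats so this
+	// server is marked as master, and only then exercise the two ccr endpoints.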
+ es := newElasticsearchWithClient() + es.CCRStats = true + es.Servers = []string{"http://example.com:9200"} + es.serverInfo = make(map[string]serverInfo) + info := serverInfo{nodeID: "SDFsfSDFsdfFSDSDfSFDSDF", masterID: ""} + + // first get catMaster + es.client.Transport = newTransportMock(IsMasterResult) + masterID, err := es.getCatMaster("junk") + require.NoError(t, err) + info.masterID = masterID + es.serverInfo["http://example.com:9200"] = info + + isMasterResultTokens := strings.Split(IsMasterResult, " ") + require.Equal(t, masterID, isMasterResultTokens[0], "catmaster is incorrect") + + // now get node status, which determines whether we're master + var acc testutil.Accumulator + es.Local = true + es.client.Transport = newTransportMock(nodeStatsResponse) + require.NoError(t, es.gatherNodeStats("junk", &acc)) + require.True(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly") + checkNodeStatsResult(t, &acc) + + tags := map[string]string{} + + // now test the ccr leader method + es.client.Transport = newTransportMock(ccrLeaderResponse) + require.NoError(t, es.gatherCCRLeaderStats("junk", &acc)) + + acc.AssertContainsTaggedFields(t, "elasticsearch_ccr_stats_leader", ccrLeaderStatsExpected, tags) + + // now test the ccr follower method + es.client.Transport = newTransportMock(ccrFollowerResponse) + require.NoError(t, es.gatherCCRFollowerStats("junk", &acc)) + + acc.AssertContainsTaggedFields(t, "elasticsearch_ccr_stats_follower", ccrFollowerStatsExpected, tags) +} + func TestGatherClusterIndicesStats(t *testing.T) { es := newElasticsearchWithClient() es.IndicesInclude = []string{"_all"} diff --git a/plugins/inputs/elasticsearch/sample.conf b/plugins/inputs/elasticsearch/sample.conf index 759017f639a73..ea8b6f5b65f21 100644 --- a/plugins/inputs/elasticsearch/sample.conf +++ b/plugins/inputs/elasticsearch/sample.conf @@ -33,6 +33,12 @@ ## To work this require local = true cluster_stats_only_from_master = true + ## Set ccr_stats to true when you want to obtain cross cluster replication stats. + ccr_stats = false + + ## Only gather ccr_stats from the master node. 
To work this require local = true + ccr_stats_only_from_master = true + ## Gather stats from the enrich API # enrich_stats = false diff --git a/plugins/inputs/elasticsearch/testdata_test.go b/plugins/inputs/elasticsearch/testdata_test.go index a0928f57e2dc9..51323ccfdabbc 100644 --- a/plugins/inputs/elasticsearch/testdata_test.go +++ b/plugins/inputs/elasticsearch/testdata_test.go @@ -1648,6 +1648,112 @@ const IsMasterResult = "SDFsfSDFsdfFSDSDfSFDSDF 10.206.124.66 10.206.124.66 test const IsNotMasterResult = "junk 10.206.124.66 10.206.124.66 test.junk.com " +const ccrLeaderResponse = ` +{ + "num_replicated_indices": 2, + "operations_read": 15, + "translog_size_bytes": 1355, + "operations_read_lucene": 0, + "operations_read_translog": 15, + "total_read_time_lucene_millis": 0, + "total_read_time_translog_millis": 659, + "bytes_read": 1000, + "index_stats":{ + "leader-index-1":{ + "operations_read": 7, + "translog_size_bytes": 639, + "operations_read_lucene": 0, + "operations_read_translog": 7, + "total_read_time_lucene_millis": 0, + "total_read_time_translog_millis": 353, + "bytes_read":466 + }, + "leader-index-2":{ + "operations_read": 8, + "translog_size_bytes": 716, + "operations_read_lucene": 0, + "operations_read_translog": 8, + "total_read_time_lucene_millis": 0, + "total_read_time_translog_millis": 306, + "bytes_read": 534 + } + } +} +` + +var ccrLeaderStatsExpected = map[string]interface{}{ + "num_replicated_indices": float64(2), + "translog_size_bytes": float64(1355), + "operations_read": float64(15), + "operations_read_lucene": float64(0), + "operations_read_translog": float64(15), + "total_read_time_lucene_millis": float64(0), + "total_read_time_translog_millis": float64(659), + "bytes_read": float64(1000), +} + +const ccrFollowerResponse = ` +{ + "num_syncing_indices": 2, + "num_bootstrapping_indices": 0, + "num_paused_indices": 0, + "num_failed_indices": 0, + "num_shard_tasks": 2, + "num_index_tasks": 2, + "operations_written": 3, + "operations_read": 3, + "failed_read_requests": 0, + "throttled_read_requests": 0, + "failed_write_requests": 0, + "throttled_write_requests": 0, + "follower_checkpoint": 1, + "leader_checkpoint": 1, + "total_write_time_millis": 2290, + "index_stats":{ + "follower-index-1":{ + "operations_written": 2, + "operations_read": 2, + "failed_read_requests": 0, + "throttled_read_requests": 0, + "failed_write_requests": 0, + "throttled_write_requests": 0, + "follower_checkpoint": 1, + "leader_checkpoint": 1, + "total_write_time_millis": 1355 + }, + "follower-index-2":{ + "operations_written": 1, + "operations_read": 1, + "failed_read_requests": 0, + "throttled_read_requests": 0, + "failed_write_requests": 0, + "throttled_write_requests": 0, + "follower_checkpoint": 0, + "leader_checkpoint": 0, + "total_write_time_millis": 935 + } + } +} +` + +var ccrFollowerStatsExpected = map[string]interface{}{ + "num_syncing_indices": float64(2), + "num_bootstrapping_indices": float64(0), + "num_paused_indices": float64(0), + "num_failed_indices": float64(0), + "num_shard_tasks": float64(2), + "num_index_tasks": float64(2), + "operations_written": float64(3), + "operations_read": float64(3), + "failed_read_requests": float64(0), + "throttled_read_requests": float64(0), + "failed_write_requests": float64(0), + "throttled_write_requests": float64(0), + "follower_checkpoint": float64(1), + "leader_checkpoint": float64(1), + "total_write_time_millis": float64(2290), +} + const clusterIndicesResponse = ` { "_shards": { diff --git a/plugins/inputs/mysql/README.md 
b/plugins/inputs/mysql/README.md index 22aa69271df40..2a2de4f08036b 100644 --- a/plugins/inputs/mysql/README.md +++ b/plugins/inputs/mysql/README.md @@ -138,6 +138,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false + + ## Aggregate table_io_waits + # aggregate_table_io_waits = true ``` ### String Data diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index be3b3e85b8243..9e9b4c0b76ab3 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -34,6 +34,7 @@ type Mysql struct { PerfEventsStatementsDigestTextLimit int64 `toml:"perf_events_statements_digest_text_limit"` PerfEventsStatementsLimit int64 `toml:"perf_events_statements_limit"` PerfEventsStatementsTimeLimit int64 `toml:"perf_events_statements_time_limit"` + AggregateTableIOWaits bool `toml:"aggregate_table_io_waits"` TableSchemaDatabases []string `toml:"table_schema_databases"` GatherProcessList bool `toml:"gather_process_list"` GatherUserStatistics bool `toml:"gather_user_statistics"` @@ -298,6 +299,22 @@ const ( SUM_TIMER_FETCH, SUM_TIMER_INSERT, SUM_TIMER_UPDATE, SUM_TIMER_DELETE FROM performance_schema.table_io_waits_summary_by_table WHERE OBJECT_SCHEMA NOT IN ('mysql', 'performance_schema') + ` + perfTableIOWaitsAggregateQuery = ` + SELECT + OBJECT_SCHEMA, + OBJECT_SCHEMA AS OBJECT_NAME, + SUM(COUNT_FETCH) AS COUNT_FETCH, + SUM(COUNT_INSERT) AS COUNT_INSERT, + SUM(COUNT_UPDATE) AS COUNT_UPDATE, + SUM(COUNT_DELETE) AS COUNT_DELETE, + SUM(SUM_TIMER_FETCH) AS SUM_TIMER_FETCH, + SUM(SUM_TIMER_INSERT) AS SUM_TIMER_INSERT, + SUM(SUM_TIMER_UPDATE) AS SUM_TIMER_UPDATE, + SUM(SUM_TIMER_DELETE) AS SUM_TIMER_DELETE + FROM performance_schema.table_io_waits_summary_by_table + WHERE OBJECT_SCHEMA NOT IN ('mysql', 'performance_schema') + GROUP BY OBJECT_SCHEMA ` perfIndexIOWaitsQuery = ` SELECT OBJECT_SCHEMA, OBJECT_NAME, ifnull(INDEX_NAME, 'NONE') as INDEX_NAME, @@ -1178,7 +1195,13 @@ func getColSlice(rows *sql.Rows) ([]interface{}, error) { // gatherPerfTableIOWaits can be used to get total count and time // of I/O wait event for each table and process func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, servtag string, acc telegraf.Accumulator) error { - rows, err := db.Query(perfTableIOWaitsQuery) + var queryStr string + if m.AggregateTableIOWaits { + queryStr = perfTableIOWaitsAggregateQuery + } else { + queryStr = perfTableIOWaitsQuery + } + rows, err := db.Query(queryStr) if err != nil { return err } diff --git a/plugins/inputs/mysql/sample.conf b/plugins/inputs/mysql/sample.conf index a3a439c90f7eb..ff3c4296d6fcf 100644 --- a/plugins/inputs/mysql/sample.conf +++ b/plugins/inputs/mysql/sample.conf @@ -101,3 +101,6 @@ # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false + + ## Aggregate table_io_waits + # aggregate_table_io_waits = true diff --git a/plugins/outputs/aiven-postgresql/README.md b/plugins/outputs/aiven-postgresql/README.md new file mode 100644 index 0000000000000..441920924954c --- /dev/null +++ b/plugins/outputs/aiven-postgresql/README.md @@ -0,0 +1,33 @@ +# Aiven PostgreSQL Output Plugin + +This output plugin writes all metrics to PostgreSQL. + +### Configuration: + +```toml +# Send metrics to postgres +[[outputs.aiven-postgresql]] + address = "host=localhost user=postgres sslmode=verify-full" + + ## Store tags as foreign keys in the metrics table. Default is false. 
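+  ## When enabled, each distinct tag set is written once to a companion
+  ## "<measurement>_tag" table and referenced from the metrics table via a
+  ## generated "tag_id" column.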
+ # tags_as_foreignkeys = false + + ## Template to use for generating tables + ## Available Variables: + ## {TABLE} - tablename as identifier + ## {TABLELITERAL} - tablename as string literal + ## {COLUMNS} - column definitions + ## {KEY_COLUMNS} - comma-separated list of key columns (time + tags) + + ## Default template + # table_template = "CREATE TABLE IF NOT EXISTS {TABLE}({COLUMNS})" + ## Example for timescaledb + # table_template = "CREATE TABLE IF NOT EXISTS {TABLE}({COLUMNS}); SELECT create_hypertable({TABLELITERAL},'time',chunk_time_interval := '1 week'::interval, if_not_exists := true);" + + ## Use jsonb datatype for tags. Default is true. + # tags_as_jsonb = true + + ## Use jsonb datatype for fields. Default is true. + # fields_as_jsonb = true + +``` diff --git a/plugins/outputs/aiven-postgresql/postgresql.go b/plugins/outputs/aiven-postgresql/postgresql.go new file mode 100644 index 0000000000000..b90ddf1931855 --- /dev/null +++ b/plugins/outputs/aiven-postgresql/postgresql.go @@ -0,0 +1,427 @@ +package aiven_postgresql + +import ( + "database/sql" + "encoding/json" + "fmt" + "log" + "sort" + "strings" + + "github.com/jackc/pgx/v4" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" +) + +type Postgresql struct { + db *sql.DB + Address string + Schema string + TagsAsForeignkeys bool + TagsAsJsonb bool + FieldsAsJsonb bool + TableTemplate string + TagTableSuffix string + Tables map[string]bool +} + +func (p *Postgresql) Connect() error { + db, err := sql.Open("pgx", p.Address) + if err != nil { + return err + } + p.db = db + p.Tables = make(map[string]bool) + + return nil +} + +func (p *Postgresql) Close() error { + return p.db.Close() +} + +func contains(haystack []string, needle string) bool { + for _, key := range haystack { + if key == needle { + return true + } + } + return false +} + +func quoteIdent(name string) string { + return pgx.Identifier{name}.Sanitize() +} + +func quoteLiteral(name string) string { + return "'" + strings.Replace(name, "'", "''", -1) + "'" +} + +func (p *Postgresql) fullTableName(name string) string { + return quoteIdent(p.Schema) + "." + quoteIdent(name) +} + +func deriveDatatype(value interface{}) string { + var datatype string + + switch value.(type) { + case bool: + datatype = "boolean" + case uint64: + datatype = "int8" + case int64: + datatype = "int8" + case float64: + datatype = "float8" + case string: + datatype = "text" + default: + datatype = "text" + log.Printf("E! Unknown datatype %T(%v)", value, value) + } + return datatype +} + +var sampleConfig = ` + ## specify address via a url matching: + ## postgres://[pqgotest[:password]]@localhost[/dbname]\ + ## ?sslmode=[disable|verify-ca|verify-full] + ## or a simple string: + ## host=localhost user=pqotest password=... sslmode=... dbname=app_production + ## + ## All connection parameters are optional. + ## + ## Without the dbname parameter, the driver will default to a database + ## with the same name as the user. This dbname is just for instantiating a + ## connection with the server and doesn't restrict the databases we are trying + ## to grab metrics for. + ## + address = "host=localhost user=postgres sslmode=verify-full" + + ## Store tags as foreign keys in the metrics table. Default is false. 
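+  ## When enabled, each distinct tag set is written once to a companion
+  ## "<measurement>_tag" table and referenced from the metrics table via a
+  ## generated "tag_id" column.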
+ # tags_as_foreignkeys = false + + ## Template to use for generating tables + ## Available Variables: + ## {TABLE} - tablename as identifier + ## {TABLELITERAL} - tablename as string literal + ## {COLUMNS} - column definitions + ## {KEY_COLUMNS} - comma-separated list of key columns (time + tags) + + ## Default template + # table_template = "CREATE TABLE IF NOT EXISTS {TABLE}({COLUMNS})" + ## Example for timescaledb + # table_template = "CREATE TABLE IF NOT EXISTS {TABLE}({COLUMNS}); SELECT create_hypertable({TABLELITERAL},'time',chunk_time_interval := '1 week'::interval,if_not_exists := true);" + + ## Schema to create the tables into + # schema = "public" + + ## Use jsonb datatype for tags + # tags_as_jsonb = true + + ## Use jsonb datatype for fields + # fields_as_jsonb = true + +` + +func (p *Postgresql) SampleConfig() string { return sampleConfig } +func (p *Postgresql) Description() string { return "Send metrics to PostgreSQL" } + +func (p *Postgresql) generateCreateTable(metric telegraf.Metric) string { + var columns []string + var pk []string + var sql []string + + pk = append(pk, quoteIdent("time")) + columns = append(columns, "time timestamptz") + + // handle tags if necessary + if len(metric.Tags()) > 0 { + if p.TagsAsForeignkeys { + // tags in separate table + var tag_columns []string + var tag_columndefs []string + columns = append(columns, "tag_id int") + + if p.TagsAsJsonb { + tag_columns = append(tag_columns, "tags") + tag_columndefs = append(tag_columndefs, "tags jsonb") + } else { + for column, _ := range metric.Tags() { + tag_columns = append(tag_columns, quoteIdent(column)) + tag_columndefs = append(tag_columndefs, fmt.Sprintf("%s text", quoteIdent(column))) + } + } + table := quoteIdent(metric.Name() + p.TagTableSuffix) + sql = append(sql, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s(tag_id serial primary key,%s,UNIQUE(%s))", table, strings.Join(tag_columndefs, ","), strings.Join(tag_columns, ","))) + } else { + // tags in measurement table + if p.TagsAsJsonb { + columns = append(columns, "tags jsonb") + } else { + for column, _ := range metric.Tags() { + pk = append(pk, quoteIdent(column)) + columns = append(columns, fmt.Sprintf("%s text", quoteIdent(column))) + } + } + } + } + + if p.FieldsAsJsonb { + columns = append(columns, "fields jsonb") + } else { + var datatype string + for column, v := range metric.Fields() { + datatype = deriveDatatype(v) + columns = append(columns, fmt.Sprintf("%s %s", quoteIdent(column), datatype)) + } + } + + query := strings.Replace(p.TableTemplate, "{TABLE}", p.fullTableName(metric.Name()), -1) + query = strings.Replace(query, "{TABLELITERAL}", quoteLiteral(p.fullTableName(metric.Name())), -1) + query = strings.Replace(query, "{COLUMNS}", strings.Join(columns, ","), -1) + query = strings.Replace(query, "{KEY_COLUMNS}", strings.Join(pk, ","), -1) + + sql = append(sql, query) + return strings.Join(sql, ";") +} + +func (p *Postgresql) generateInsert(tablename string, columns []string) string { + var placeholder, quoted []string + for i, column := range columns { + placeholder = append(placeholder, fmt.Sprintf("$%d", i+1)) + quoted = append(quoted, quoteIdent(column)) + } + + return fmt.Sprintf("INSERT INTO %s(%s) VALUES(%s)", p.fullTableName(tablename), strings.Join(quoted, ","), strings.Join(placeholder, ",")) +} + +func (p *Postgresql) tableExists(tableName string) bool { + stmt := "SELECT tablename FROM pg_tables WHERE tablename = $1 AND schemaname = $2;" + result, err := p.db.Exec(stmt, tableName, p.Schema) + if err != nil { + 
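+		// A failed lookup is logged and treated as "table does not exist",
+		// so Write falls back to CREATE TABLE IF NOT EXISTS.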
log.Printf("E! Error checking for existence of metric table %s: %v", tableName, err) + return false + } + if count, _ := result.RowsAffected(); count == 1 { + p.Tables[tableName] = true + return true + } + return false +} + +func (p *Postgresql) Write(metrics []telegraf.Metric) error { + batches := make(map[string][]interface{}) + params := make(map[string][]string) + colmap := make(map[string][]string) + tabmap := make(map[string]string) + + for _, metric := range metrics { + tablename := metric.Name() + + // create table if needed + if p.Tables[tablename] == false && p.tableExists(tablename) == false { + createStmt := p.generateCreateTable(metric) + _, err := p.db.Exec(createStmt) + if err != nil { + log.Printf("E! Creating table failed: statement: %v, error: %v", createStmt, err) + return err + } + p.Tables[tablename] = true + } + + columns := []string{"time"} + values := []interface{}{metric.Time()} + var js map[string]interface{} + + if len(metric.Tags()) > 0 { + if p.TagsAsForeignkeys { + // tags in separate table + var tag_id int + var where_columns []string + var where_values []interface{} + + if p.TagsAsJsonb { + js = make(map[string]interface{}) + for column, value := range metric.Tags() { + js[column] = value + } + + if len(js) > 0 { + d, err := json.Marshal(js) + if err != nil { + return err + } + + where_columns = append(where_columns, "tags") + where_values = append(where_values, d) + } + } else { + for column, value := range metric.Tags() { + where_columns = append(where_columns, column) + where_values = append(where_values, value) + } + } + + var where_parts []string + for i, column := range where_columns { + where_parts = append(where_parts, fmt.Sprintf("%s = $%d", quoteIdent(column), i+1)) + } + query := fmt.Sprintf("SELECT tag_id FROM %s WHERE %s", p.fullTableName(tablename+p.TagTableSuffix), strings.Join(where_parts, " AND ")) + + err := p.db.QueryRow(query, where_values...).Scan(&tag_id) + if err != nil { + query := p.generateInsert(tablename+p.TagTableSuffix, where_columns) + " RETURNING tag_id" + err := p.db.QueryRow(query, where_values...).Scan(&tag_id) + if err != nil { + return err + } + } + + columns = append(columns, "tag_id") + values = append(values, tag_id) + } else { + // tags in measurement table + if p.TagsAsJsonb { + js = make(map[string]interface{}) + for column, value := range metric.Tags() { + js[column] = value + } + + if len(js) > 0 { + d, err := json.Marshal(js) + if err != nil { + return err + } + + columns = append(columns, "tags") + values = append(values, d) + } + } else { + var keys []string + fields := metric.Tags() + for column := range fields { + keys = append(keys, column) + } + sort.Strings(keys) + for _, column := range keys { + columns = append(columns, column) + values = append(values, fields[column]) + } + } + } + } + + if p.FieldsAsJsonb { + js = make(map[string]interface{}) + for column, value := range metric.Fields() { + js[column] = value + } + + d, err := json.Marshal(js) + if err != nil { + return err + } + + columns = append(columns, "fields") + values = append(values, d) + } else { + var keys []string + fields := metric.Fields() + for column := range fields { + keys = append(keys, column) + } + sort.Strings(keys) + for _, column := range keys { + columns = append(columns, column) + values = append(values, fields[column]) + } + } + + var table_and_cols string + var placeholder, quoted_columns []string + for _, column := range columns { + quoted_columns = append(quoted_columns, quoteIdent(column)) + } + table_and_cols = 
fmt.Sprintf("%s(%s)", p.fullTableName(tablename), strings.Join(quoted_columns, ",")) + batches[table_and_cols] = append(batches[table_and_cols], values...) + for i, _ := range columns { + i += len(params[table_and_cols]) * len(columns) + placeholder = append(placeholder, fmt.Sprintf("$%d", i+1)) + } + params[table_and_cols] = append(params[table_and_cols], strings.Join(placeholder, ",")) + colmap[table_and_cols] = columns + tabmap[table_and_cols] = tablename + } + + for table_and_cols, values := range batches { + sql := fmt.Sprintf("INSERT INTO %s VALUES (%s)", table_and_cols, strings.Join(params[table_and_cols], "),(")) + _, err := p.db.Exec(sql, values...) + if err != nil { + // check if insert error was caused by column mismatch + retry := false + if p.FieldsAsJsonb == false { + log.Printf("E! Error during insert: %v", err) + tablename := tabmap[table_and_cols] + columns := colmap[table_and_cols] + var quoted_columns []string + for _, column := range columns { + quoted_columns = append(quoted_columns, quoteLiteral(column)) + } + query := "SELECT c FROM unnest(array[%s]) AS c WHERE NOT EXISTS(SELECT 1 FROM information_schema.columns WHERE column_name=c AND table_schema=$1 AND table_name=$2)" + query = fmt.Sprintf(query, strings.Join(quoted_columns, ",")) + result, err := p.db.Query(query, p.Schema, tablename) + if err != nil { + return err + } + defer result.Close() + + // some columns are missing + var column, datatype string + for result.Next() { + err := result.Scan(&column) + if err != nil { + log.Println(err) + } + for i, name := range columns { + if name == column { + datatype = deriveDatatype(values[i]) + } + } + query := "ALTER TABLE %s ADD COLUMN IF NOT EXISTS %s %s;" + _, err = p.db.Exec(fmt.Sprintf(query, p.fullTableName(tablename), quoteIdent(column), datatype)) + if err != nil { + return err + } + retry = true + } + } + + // We added some columns and insert might work now. Try again immediately to + // avoid long lead time in getting metrics when there are several columns missing + // from the original create statement and they get added in small drops. + if retry { + _, err = p.db.Exec(sql, values...) 
+ } + if err != nil { + return err + } + } + } + return nil +} + +func init() { + outputs.Add("aiven-postgresql", func() telegraf.Output { return newPostgresql() }) +} + +func newPostgresql() *Postgresql { + return &Postgresql{ + Schema: "public", + TableTemplate: "CREATE TABLE IF NOT EXISTS {TABLE}({COLUMNS})", + TagsAsJsonb: true, + TagTableSuffix: "_tag", + FieldsAsJsonb: true, + } +} diff --git a/plugins/outputs/aiven-postgresql/postgresql_test.go b/plugins/outputs/aiven-postgresql/postgresql_test.go new file mode 100644 index 0000000000000..6b286b5b8e9d6 --- /dev/null +++ b/plugins/outputs/aiven-postgresql/postgresql_test.go @@ -0,0 +1,68 @@ +package aiven_postgresql + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + + "github.com/stretchr/testify/assert" +) + +func TestPostgresqlQuote(t *testing.T) { + assert.Equal(t, `"foo"`, quoteIdent("foo")) + assert.Equal(t, `"fo'o"`, quoteIdent("fo'o")) + assert.Equal(t, `"fo""o"`, quoteIdent("fo\"o")) + + assert.Equal(t, "'foo'", quoteLiteral("foo")) + assert.Equal(t, "'fo''o'", quoteLiteral("fo'o")) + assert.Equal(t, "'fo\"o'", quoteLiteral("fo\"o")) +} + +func TestPostgresqlCreateStatement(t *testing.T) { + p := newPostgresql() + timestamp := time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC) + + var m telegraf.Metric + m = metric.New("m", nil, map[string]interface{}{"f": float64(3.14)}, timestamp) + assert.Equal(t, `CREATE TABLE IF NOT EXISTS "public"."m"(time timestamptz,fields jsonb)`, p.generateCreateTable(m)) + + m = metric.New("m", map[string]string{"k": "v"}, map[string]interface{}{"i": int(3)}, timestamp) + assert.Equal(t, `CREATE TABLE IF NOT EXISTS "public"."m"(time timestamptz,tags jsonb,fields jsonb)`, p.generateCreateTable(m)) + + p.TagsAsJsonb = false + p.FieldsAsJsonb = false + + m = metric.New("m", nil, map[string]interface{}{"f": float64(3.14)}, timestamp) + assert.Equal(t, `CREATE TABLE IF NOT EXISTS "public"."m"(time timestamptz,"f" float8)`, p.generateCreateTable(m)) + + m = metric.New("m", nil, map[string]interface{}{"i": int(3)}, timestamp) + assert.Equal(t, `CREATE TABLE IF NOT EXISTS "public"."m"(time timestamptz,"i" int8)`, p.generateCreateTable(m)) + + m = metric.New("m", map[string]string{"k": "v"}, map[string]interface{}{"i": int(3)}, timestamp) + assert.Equal(t, `CREATE TABLE IF NOT EXISTS "public"."m"(time timestamptz,"k" text,"i" int8)`, p.generateCreateTable(m)) + +} + +func TestPostgresqlInsertStatement(t *testing.T) { + p := newPostgresql() + + p.TagsAsJsonb = false + p.FieldsAsJsonb = false + + sql := p.generateInsert("m", []string{"time", "f"}) + assert.Equal(t, `INSERT INTO "public"."m"("time","f") VALUES($1,$2)`, sql) + + sql = p.generateInsert("m", []string{"time", "i"}) + assert.Equal(t, `INSERT INTO "public"."m"("time","i") VALUES($1,$2)`, sql) + + sql = p.generateInsert("m", []string{"time", "f", "i"}) + assert.Equal(t, `INSERT INTO "public"."m"("time","f","i") VALUES($1,$2,$3)`, sql) + + sql = p.generateInsert("m", []string{"time", "k", "i"}) + assert.Equal(t, `INSERT INTO "public"."m"("time","k","i") VALUES($1,$2,$3)`, sql) + + sql = p.generateInsert("m", []string{"time", "k1", "k2", "i"}) + assert.Equal(t, `INSERT INTO "public"."m"("time","k1","k2","i") VALUES($1,$2,$3,$4)`, sql) +} diff --git a/plugins/outputs/all/aiven_postgresql.go b/plugins/outputs/all/aiven_postgresql.go new file mode 100644 index 0000000000000..197f924aac286 --- /dev/null +++ b/plugins/outputs/all/aiven_postgresql.go @@ -0,0 +1,5 @@ +//go:build !custom || 
outputs || outputs.aiven_postgresql + +package all + +import _ "github.com/influxdata/telegraf/plugins/outputs/aiven-postgresql" // register plugin diff --git a/plugins/outputs/prometheus_client/v1/collector.go b/plugins/outputs/prometheus_client/v1/collector.go index c1a635d058fd1..74383e255c347 100644 --- a/plugins/outputs/prometheus_client/v1/collector.go +++ b/plugins/outputs/prometheus_client/v1/collector.go @@ -15,7 +15,7 @@ import ( ) var ( - invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_:]`) + invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) validNameCharRE = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*`) ) diff --git a/plugins/serializers/prometheus/convert.go b/plugins/serializers/prometheus/convert.go index 178c0b9cb8252..b4652666f82ff 100644 --- a/plugins/serializers/prometheus/convert.go +++ b/plugins/serializers/prometheus/convert.go @@ -18,7 +18,6 @@ type Table struct { var MetricNameTable = Table{ First: &unicode.RangeTable{ R16: []unicode.Range16{ - {0x003A, 0x003A, 1}, // : {0x0041, 0x005A, 1}, // A-Z {0x005F, 0x005F, 1}, // _ {0x0061, 0x007A, 1}, // a-z @@ -27,7 +26,7 @@ var MetricNameTable = Table{ }, Rest: &unicode.RangeTable{ R16: []unicode.Range16{ - {0x0030, 0x003A, 1}, // 0-: + {0x0030, 0x0039, 1}, // 0-9 {0x0041, 0x005A, 1}, // A-Z {0x005F, 0x005F, 1}, // _ {0x0061, 0x007A, 1}, // a-z diff --git a/plugins/serializers/prometheus/prometheus_test.go b/plugins/serializers/prometheus/prometheus_test.go index ca643c92041f9..29e96ae35e64f 100644 --- a/plugins/serializers/prometheus/prometheus_test.go +++ b/plugins/serializers/prometheus/prometheus_test.go @@ -486,7 +486,7 @@ cpu_time_idle 43 `), }, { - name: "colons are not replaced in metric name from measurement", + name: "colons are replaced in metric name from measurement", metrics: []telegraf.Metric{ testutil.MustMetric( "cpu::xyzzy", @@ -498,13 +498,13 @@ cpu_time_idle 43 ), }, expected: []byte(` -# HELP cpu::xyzzy_time_idle Telegraf collected metric -# TYPE cpu::xyzzy_time_idle untyped -cpu::xyzzy_time_idle 42 +# HELP cpu__xyzzy_time_idle Telegraf collected metric +# TYPE cpu__xyzzy_time_idle untyped +cpu__xyzzy_time_idle 42 `), }, { - name: "colons are not replaced in metric name from field", + name: "colons are replaced in metric name from field", metrics: []telegraf.Metric{ testutil.MustMetric( "cpu", @@ -516,21 +516,21 @@ cpu::xyzzy_time_idle 42 ), }, expected: []byte(` -# HELP cpu_time:idle Telegraf collected metric -# TYPE cpu_time:idle untyped -cpu_time:idle 42 +# HELP cpu_time_idle Telegraf collected metric +# TYPE cpu_time_idle untyped +cpu_time_idle 42 `), }, { name: "invalid label", metrics: []telegraf.Metric{ testutil.MustMetric( - "cpu", + "cpu_time", map[string]string{ "host-name": "example.org", }, map[string]interface{}{ - "time_idle": 42.0, + "idle": 42.0, }, time.Unix(0, 0), ), diff --git a/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go b/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go index 3c3ba2bb34ad0..a9e8379ac9033 100644 --- a/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go +++ b/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go @@ -442,7 +442,7 @@ cpu_time_idle 43 ), }, expected: []byte(` -cpu::xyzzy_time_idle 42 +cpu__xyzzy_time_idle 42 `), }, { @@ -458,19 +458,19 @@ cpu::xyzzy_time_idle 42 ), }, expected: []byte(` -cpu_time:idle 42 +cpu_time_idle 42 `), }, { name: "invalid label", metrics: []telegraf.Metric{ testutil.MustMetric( - "cpu", + "cpu_time", map[string]string{ "host-name": 
"example.org", }, map[string]interface{}{ - "time_idle": 42.0, + "idle": 42.0, }, time.Unix(0, 0), ), From e91e48a9019b739799db9c3ba06bd43a1bf0cff0 Mon Sep 17 00:00:00 2001 From: Kevin Michel Date: Mon, 30 Oct 2023 07:52:13 +0100 Subject: [PATCH 2/3] feat(inputs.clickhouse): Add replication queue metrics The extra metrics give a better view of what is happening in the cluster. The description of the too_many_tries_replicas metrics was not accurate, this is fixed. Ideally the name should be changed to, but that would break compatibility. --- AIVEN_CHANGES.md | 5 ++- plugins/inputs/clickhouse/README.md | 10 +++++- plugins/inputs/clickhouse/clickhouse.go | 36 ++++++++++++++++---- plugins/inputs/clickhouse/clickhouse_test.go | 33 ++++++++++++++---- 4 files changed, 70 insertions(+), 14 deletions(-) diff --git a/AIVEN_CHANGES.md b/AIVEN_CHANGES.md index 83df7637d8888..ac6b955f70bef 100644 --- a/AIVEN_CHANGES.md +++ b/AIVEN_CHANGES.md @@ -2,6 +2,10 @@ ## Input Plugins +### ClickHouse + +* Add extra metrics to monitor the replication queue + ### Elasticsearch * add cross cluster replication metrics ( they dont work for elasticsearch but its a first step until we have an opensearch plugin ) @@ -33,4 +37,3 @@ ### Prometheus and Prometheus Remote Write * changes to make `Plugins.Prometheus Client` work for the same reasons as stated there - diff --git a/plugins/inputs/clickhouse/README.md b/plugins/inputs/clickhouse/README.md index c76499b9eae26..ae3358b15ec33 100644 --- a/plugins/inputs/clickhouse/README.md +++ b/plugins/inputs/clickhouse/README.md @@ -142,7 +142,15 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. - cluster (Name of the cluster [optional]) - shard_num (Shard number in the cluster [optional]) - fields: - - too_many_tries_replicas (count of replicas which have `num_tries > 1`) + - num_total (Number of replication queue items) + - num_get_part (Number of GET_PART replication queue items) + - num_attach_part (Number of ATTACH_PART replication queue items) + - num_merge_parts (Number of regular MERGE_PARTS replication queue items) + - num_merge_parts_ttl_delete (Number of TTLDelete MERGE_PARTS replication queue items) + - num_merge_parts_ttl_recompress (Number of TTLRecompress MERGE_PARTS replication queue items) + - num_mutate_part (Number of MUTATE_PART replication queue items) + - too_many_tries_replicas (Number of replication queue items with num_tries > 100) + - num_tries_replicas (Number of replication queue items with num_tries > 1) - clickhouse_detached_parts (see [system.detached_parts][] for details) - tags: diff --git a/plugins/inputs/clickhouse/clickhouse.go b/plugins/inputs/clickhouse/clickhouse.go index e1b16912037ac..e924f8b77d0e9 100644 --- a/plugins/inputs/clickhouse/clickhouse.go +++ b/plugins/inputs/clickhouse/clickhouse.go @@ -287,8 +287,15 @@ func (ch *ClickHouse) replicationQueue(acc telegraf.Accumulator, conn *connect) if len(replicationQueueExists) > 0 && replicationQueueExists[0].ReplicationQueueExists > 0 { var replicationTooManyTries []struct { - NumTriesReplicas chUInt64 `json:"replication_num_tries_replicas"` - TooManyTriesReplicas chUInt64 `json:"replication_too_many_tries_replicas"` + NumTotal chUInt64 `json:"replication_num_total"` + NumGetPart chUInt64 `json:"replication_num_get_part"` + NumAttachPart chUInt64 `json:"replication_num_attach_part"` + NumMergeParts chUInt64 `json:"replication_num_merge_parts"` + NumMergePartsTtlDelete chUInt64 `json:"replication_num_merge_parts_ttl_delete"` + NumMergePartsTtlRecompress chUInt64 
`json:"replication_num_merge_parts_ttl_recompress"` + NumMutatePart chUInt64 `json:"replication_num_mutate_part"` + NumTriesReplicas chUInt64 `json:"replication_num_tries_replicas"` + TooManyTriesReplicas chUInt64 `json:"replication_too_many_tries_replicas"` } if err := ch.execQuery(conn.url, systemReplicationNumTriesSQL, &replicationTooManyTries); err != nil { return err @@ -296,8 +303,15 @@ func (ch *ClickHouse) replicationQueue(acc telegraf.Accumulator, conn *connect) acc.AddFields("clickhouse_replication_queue", map[string]interface{}{ - "too_many_tries_replicas": uint64(replicationTooManyTries[0].TooManyTriesReplicas), - "num_tries_replicas": uint64(replicationTooManyTries[0].NumTriesReplicas), + "num_total": uint64(replicationTooManyTries[0].NumTotal), + "num_get_part": uint64(replicationTooManyTries[0].NumGetPart), + "num_attach_part": uint64(replicationTooManyTries[0].NumAttachPart), + "num_merge_parts": uint64(replicationTooManyTries[0].NumMergeParts), + "num_merge_parts_ttl_delete": uint64(replicationTooManyTries[0].NumMergePartsTtlDelete), + "num_merge_parts_ttl_recompress": uint64(replicationTooManyTries[0].NumMergePartsTtlRecompress), + "num_mutate_part": uint64(replicationTooManyTries[0].NumMutatePart), + "too_many_tries_replicas": uint64(replicationTooManyTries[0].TooManyTriesReplicas), + "num_tries_replicas": uint64(replicationTooManyTries[0].NumTriesReplicas), }, tags, ) @@ -604,8 +618,18 @@ const ( systemZookeeperRootNodesSQL = "SELECT count() AS zk_root_nodes FROM system.zookeeper WHERE path='/'" systemReplicationExistsSQL = "SELECT count() AS replication_queue_exists FROM system.tables WHERE database='system' AND name='replication_queue'" - systemReplicationNumTriesSQL = "SELECT countIf(num_tries>1) AS replication_num_tries_replicas, countIf(num_tries>100) " + - "AS replication_too_many_tries_replicas FROM system.replication_queue SETTINGS empty_result_for_aggregation_by_empty_set=0" + systemReplicationNumTriesSQL = ` + SELECT + count() as replication_num_total, + countIf(type='GET_PART') AS replication_num_get_part, + countIf(type='ATTACH_PART') AS replication_num_attach_part, + countIf(type='MERGE_PARTS' AND merge_type='Regular') AS replication_num_merge_parts, + countIf(type='MERGE_PARTS' AND merge_type='TTLDelete') AS replication_num_merge_parts_ttl_delete, + countIf(type='MERGE_PARTS' AND merge_type='TTLRecompress') AS replication_num_merge_parts_ttl_recompress, + countIf(type='MUTATE_PART') AS replication_num_mutate_part, + countIf(num_tries>1) AS replication_num_tries_replicas, + countIf(num_tries>100) AS replication_too_many_tries_replicas + FROM system.replication_queue SETTINGS empty_result_for_aggregation_by_empty_set=0` systemDetachedPartsSQL = "SELECT count() AS detached_parts FROM system.detached_parts SETTINGS empty_result_for_aggregation_by_empty_set=0" diff --git a/plugins/inputs/clickhouse/clickhouse_test.go b/plugins/inputs/clickhouse/clickhouse_test.go index d57cee00e01c2..85f9f05971e9c 100644 --- a/plugins/inputs/clickhouse/clickhouse_test.go +++ b/plugins/inputs/clickhouse/clickhouse_test.go @@ -162,12 +162,26 @@ func TestGather(t *testing.T) { case strings.Contains(query, "replication_too_many_tries_replicas"): err := enc.Encode(result{ Data: []struct { - TooManyTriesReplicas chUInt64 `json:"replication_too_many_tries_replicas"` - NumTriesReplicas chUInt64 `json:"replication_num_tries_replicas"` + NumTotal chUInt64 `json:"replication_num_total"` + NumGetPart chUInt64 `json:"replication_num_get_part"` + NumAttachPart chUInt64 
`json:"replication_num_attach_part"` + NumMergeParts chUInt64 `json:"replication_num_merge_parts"` + NumMergePartsTtlDelete chUInt64 `json:"replication_num_merge_parts_ttl_delete"` + NumMergePartsTtlRecompress chUInt64 `json:"replication_num_merge_parts_ttl_recompress"` + NumMutatePart chUInt64 `json:"replication_num_mutate_part"` + TooManyTriesReplicas chUInt64 `json:"replication_too_many_tries_replicas"` + NumTriesReplicas chUInt64 `json:"replication_num_tries_replicas"` }{ { - TooManyTriesReplicas: 10, - NumTriesReplicas: 100, + NumTotal: 1000, + NumGetPart: 20, + NumAttachPart: 30, + NumMergeParts: 40, + NumMergePartsTtlDelete: 50, + NumMergePartsTtlRecompress: 60, + NumMutatePart: 70, + TooManyTriesReplicas: 10, + NumTriesReplicas: 100, }, }, }) @@ -348,8 +362,15 @@ func TestGather(t *testing.T) { ) acc.AssertContainsFields(t, "clickhouse_replication_queue", map[string]interface{}{ - "too_many_tries_replicas": uint64(10), - "num_tries_replicas": uint64(100), + "num_total": uint64(1000), + "num_get_part": uint64(20), + "num_attach_part": uint64(30), + "num_merge_parts": uint64(40), + "num_merge_parts_ttl_delete": uint64(50), + "num_merge_parts_ttl_recompress": uint64(60), + "num_mutate_part": uint64(70), + "too_many_tries_replicas": uint64(10), + "num_tries_replicas": uint64(100), }, ) acc.AssertContainsFields(t, "clickhouse_detached_parts", From 48266ed057546f64e5a013b2e48fb83c24a2880d Mon Sep 17 00:00:00 2001 From: Maksim Kremenev Date: Mon, 14 Oct 2024 16:26:03 +0200 Subject: [PATCH 3/3] plugins: fix markdown lint issues --- plugins/inputs/aiven-procstat/README.md | 3 +-- plugins/outputs/aiven-postgresql/README.md | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/aiven-procstat/README.md b/plugins/inputs/aiven-procstat/README.md index 4c767130d817b..2b83f97a69ebc 100644 --- a/plugins/inputs/aiven-procstat/README.md +++ b/plugins/inputs/aiven-procstat/README.md @@ -1,8 +1,7 @@ # Aiven Procstat Input Plugin -Was copied from the procstat input. Divergences: +Was copied from the procstat input. Divergences: * add 'systemd_units' configuration parameter. A list that specifies the units to fetch the pids from * to that end it parses the output from `systemctl status` in one go instead of invoking `systemctl status [...]` for every unit * it is not possible to use the globbing feature of the original `procstat` input for several reasons, one being that the tags are not expanded with the glob, the other is that the units we are targeting are not named glob friendly - diff --git a/plugins/outputs/aiven-postgresql/README.md b/plugins/outputs/aiven-postgresql/README.md index 441920924954c..262ea1250dc49 100644 --- a/plugins/outputs/aiven-postgresql/README.md +++ b/plugins/outputs/aiven-postgresql/README.md @@ -2,7 +2,7 @@ This output plugin writes all metrics to PostgreSQL. -### Configuration: +## Configuration ```toml # Send metrics to postgres