Skip to content

Commit

Permalink
Merge pull request #144 from aiven/kremenev/telegraf-v1.32.0
Browse files Browse the repository at this point in the history
Bump to v1.32.0
  • Loading branch information
RommelLayco authored Oct 15, 2024
2 parents dcfadf8 + 48266ed commit cd662c6
Show file tree
Hide file tree
Showing 35 changed files with 2,402 additions and 32 deletions.
39 changes: 39 additions & 0 deletions AIVEN_CHANGES.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Reasons for this fork

## Input Plugins

### ClickHouse

* Add extra metrics to monitor the replication queue

### Elasticsearch

* add cross cluster replication metrics ( they dont work for elasticsearch but its a first step until we have an opensearch plugin )

### Aiven Procstat

* basically a clone of procstat containing incompatible changes that are likely not upstreamable
* needed a way to parse multiple unit files in invocation of `systemctl` for performance Reasons
* the way that telegraf provides ( globbing ) does not fit our systemd unit structure
* we need to check units inside of containers

### MySQL

* added aggregated IOPerf Stats ( probably upstreamable )

## Output Plugins

### Aiven Postgresql

* added postgresql output plugin from scratch to work with timescaledb ( probably upstreamable, although influxdata is not keen on supporting timescaledb as it seems )
* predates the upstream postgresql plugin and was subsequently moved to the aiven prefix

### Prometheus Client

* added incompatible metric name replacements ( not sure exactely why it was needed, but its now our api and we have to keep it )

## Serializers

### Prometheus and Prometheus Remote Write

* changes to make `Plugins.Prometheus Client` work for the same reasons as stated there
2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func (a *Agent) testRunInputs(
// Run plugins that require multiple gathers to calculate rate
// and delta metrics twice.
switch input.Config.Name {
case "cpu", "mongodb", "procstat":
case "cpu", "mongodb", "procstat", "aiven-procstat":
nulAcc := NewAccumulator(input, nul)
nulAcc.SetPrecision(getPrecision(precision, interval))
if err := input.Input.Gather(nulAcc); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions plugins/inputs/aiven-procstat/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Aiven Procstat Input Plugin

Was copied from the procstat input. Divergences:

* add 'systemd_units' configuration parameter. A list that specifies the units to fetch the pids from
* to that end it parses the output from `systemctl status` in one go instead of invoking `systemctl status [...]` for every unit
* it is not possible to use the globbing feature of the original `procstat` input for several reasons, one being that the tags are not expanded with the glob, the other is that the units we are targeting are not named glob friendly
9 changes: 9 additions & 0 deletions plugins/inputs/aiven-procstat/dev/telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[agent]
interval="1s"
flush_interval="1s"

[[inputs.procstat]]
exe = "telegraf"

[[outputs.file]]
files = ["stdout"]
96 changes: 96 additions & 0 deletions plugins/inputs/aiven-procstat/native_finder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package aiven_procstat

import (
"fmt"
"io/ioutil"
"regexp"
"strconv"
"strings"

"github.com/shirou/gopsutil/v3/process"
)

//NativeFinder uses gopsutil to find processes
type NativeFinder struct {
}

//NewNativeFinder ...
func NewNativeFinder() (PIDFinder, error) {
return &NativeFinder{}, nil
}

//Uid will return all pids for the given user
func (pg *NativeFinder) Uid(user string) ([]PID, error) {
var dst []PID
procs, err := process.Processes()
if err != nil {
return dst, err
}
for _, p := range procs {
username, err := p.Username()
if err != nil {
//skip, this can happen if we don't have permissions or
//the pid no longer exists
continue
}
if username == user {
dst = append(dst, PID(p.Pid))
}
}
return dst, nil
}

//PidFile returns the pid from the pid file given.
func (pg *NativeFinder) PidFile(path string) ([]PID, error) {
var pids []PID
pidString, err := ioutil.ReadFile(path)
if err != nil {
return pids, fmt.Errorf("Failed to read pidfile '%s'. Error: '%s'",
path, err)
}
pid, err := strconv.ParseInt(strings.TrimSpace(string(pidString)), 10, 32)
if err != nil {
return pids, err
}
pids = append(pids, PID(pid))
return pids, nil

}

//FullPattern matches on the command line when the process was executed
func (pg *NativeFinder) FullPattern(pattern string) ([]PID, error) {
var pids []PID
regxPattern, err := regexp.Compile(pattern)
if err != nil {
return pids, err
}
procs, err := pg.FastProcessList()
if err != nil {
return pids, err
}
for _, p := range procs {
cmd, err := p.Cmdline()
if err != nil {
//skip, this can be caused by the pid no longer existing
//or you having no permissions to access it
continue
}
if regxPattern.MatchString(cmd) {
pids = append(pids, PID(p.Pid))
}
}
return pids, err
}

func (pg *NativeFinder) FastProcessList() ([]*process.Process, error) {
pids, err := process.Pids()
if err != nil {
return nil, err
}

result := make([]*process.Process, len(pids))
for i, pid := range pids {
result[i] = &process.Process{Pid: pid}
}
return result, nil
}
32 changes: 32 additions & 0 deletions plugins/inputs/aiven-procstat/native_finder_notwindows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// +build !windows

package aiven_procstat

import (
"regexp"
)

//Pattern matches on the process name
func (pg *NativeFinder) Pattern(pattern string) ([]PID, error) {
var pids []PID
regxPattern, err := regexp.Compile(pattern)
if err != nil {
return pids, err
}
procs, err := pg.FastProcessList()
if err != nil {
return pids, err
}
for _, p := range procs {
name, err := p.Exe()
if err != nil {
//skip, this can be caused by the pid no longer existing
//or you having no permissions to access it
continue
}
if regxPattern.MatchString(name) {
pids = append(pids, PID(p.Pid))
}
}
return pids, err
}
29 changes: 29 additions & 0 deletions plugins/inputs/aiven-procstat/native_finder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package aiven_procstat

import (
"testing"

"github.com/stretchr/testify/require"
)

func BenchmarkPattern(b *testing.B) {
f, err := NewNativeFinder()
require.NoError(b, err)
for n := 0; n < b.N; n++ {
_, err := f.Pattern(".*")
if err != nil {
panic(err)
}
}
}

func BenchmarkFullPattern(b *testing.B) {
f, err := NewNativeFinder()
require.NoError(b, err)
for n := 0; n < b.N; n++ {
_, err := f.FullPattern(".*")
if err != nil {
panic(err)
}
}
}
30 changes: 30 additions & 0 deletions plugins/inputs/aiven-procstat/native_finder_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package aiven_procstat

import (
"regexp"
)

// Pattern matches on the process name
func (pg *NativeFinder) Pattern(pattern string) ([]PID, error) {
var pids []PID
regxPattern, err := regexp.Compile(pattern)
if err != nil {
return pids, err
}
procs, err := pg.FastProcessList()
if err != nil {
return pids, err
}
for _, p := range procs {
name, err := p.Name()
if err != nil {
//skip, this can be caused by the pid no longer existing
//or you having no permissions to access it
continue
}
if regxPattern.MatchString(name) {
pids = append(pids, PID(p.Pid))
}
}
return pids, err
}
49 changes: 49 additions & 0 deletions plugins/inputs/aiven-procstat/native_finder_windows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package aiven_procstat

import (
"fmt"
"testing"

"os/user"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestGather_RealPattern(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
pg, err := NewNativeFinder()
require.NoError(t, err)
pids, err := pg.Pattern(`procstat`)
require.NoError(t, err)
fmt.Println(pids)
assert.Equal(t, len(pids) > 0, true)
}

func TestGather_RealFullPattern(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
pg, err := NewNativeFinder()
require.NoError(t, err)
pids, err := pg.FullPattern(`%procstat%`)
require.NoError(t, err)
fmt.Println(pids)
assert.Equal(t, len(pids) > 0, true)
}

func TestGather_RealUser(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
user, err := user.Current()
require.NoError(t, err)
pg, err := NewNativeFinder()
require.NoError(t, err)
pids, err := pg.Uid(user.Username)
require.NoError(t, err)
fmt.Println(pids)
assert.Equal(t, len(pids) > 0, true)
}
90 changes: 90 additions & 0 deletions plugins/inputs/aiven-procstat/pgrep.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package aiven_procstat

import (
"fmt"
"io/ioutil"
"os/exec"
"strconv"
"strings"

"github.com/influxdata/telegraf/internal"
)

// Implementation of PIDGatherer that execs pgrep to find processes
type Pgrep struct {
path string
}

func NewPgrep() (PIDFinder, error) {
path, err := exec.LookPath("pgrep")
if err != nil {
return nil, fmt.Errorf("Could not find pgrep binary: %s", err)
}
return &Pgrep{path}, nil
}

func (pg *Pgrep) PidFile(path string) ([]PID, error) {
var pids []PID
pidString, err := ioutil.ReadFile(path)
if err != nil {
return pids, fmt.Errorf("Failed to read pidfile '%s'. Error: '%s'",
path, err)
}
pid, err := strconv.ParseInt(strings.TrimSpace(string(pidString)), 10, 32)
if err != nil {
return pids, err
}
pids = append(pids, PID(pid))
return pids, nil
}

func (pg *Pgrep) Pattern(pattern string) ([]PID, error) {
args := []string{pattern}
return find(pg.path, args)
}

func (pg *Pgrep) Uid(user string) ([]PID, error) {
args := []string{"-u", user}
return find(pg.path, args)
}

func (pg *Pgrep) FullPattern(pattern string) ([]PID, error) {
args := []string{"-f", pattern}
return find(pg.path, args)
}

func find(path string, args []string) ([]PID, error) {
out, err := run(path, args)
if err != nil {
return nil, err
}

return parseOutput(out)
}

func run(path string, args []string) (string, error) {
out, err := exec.Command(path, args...).Output()

//if exit code 1, ie no processes found, do not return error
if i, _ := internal.ExitStatus(err); i == 1 {
return "", nil
}

if err != nil {
return "", fmt.Errorf("Error running %s: %s", path, err)
}
return string(out), err
}

func parseOutput(out string) ([]PID, error) {
pids := []PID{}
fields := strings.Fields(out)
for _, field := range fields {
pid, err := strconv.ParseInt(field, 10, 32)
if err != nil {
return nil, err
}
pids = append(pids, PID(pid))
}
return pids, nil
}
Loading

0 comments on commit cd662c6

Please sign in to comment.