Add InfluxDB metrics output (#267)
klauspost authored Jun 8, 2023
1 parent 7b79f48 commit 5821379
Showing 11 changed files with 503 additions and 69 deletions.
96 changes: 89 additions & 7 deletions README.md
@@ -728,20 +728,102 @@ The main reason for running the benchmark on several clients would be to help el

It is important to note that only data that strictly overlaps in absolute time will be considered for analysis.


## InfluxDB Output

Warp allows realtime statistics to be pushed to InfluxDB v2 or later.

This can be combined with the `--stress` parameter, which allows long-running tests without accumulating operation data in memory while still providing access to performance numbers.

Warp does not provide any analysis on the data sent to InfluxDB.

### Configuring

InfluxDB is enabled via the `--influxdb` parameter. Alternatively, the parameter can be set in the `WARP_INFLUXDB_CONNECT` environment variable.

The value must be formatted like a URL: `<schema>://<token>@<hostname>:<port>/<bucket>/<org>?<tag=value>`

| Part          | Description                                                                     |
|---------------|-------------------------------------------------------------------------------|
| `<schema>` | Connection type. Replace with `http` or `https` |
| `<token>` | Replace with the token needed to access the server |
| `<hostname>` | Replace with the host name or IP address of your server |
| `<port>` | Replace with the port of your server |
| `<bucket>` | Replace with the bucket in which to place the data |
| `<org>` | Replace with the organization to which the data should be associated (if any) |
| `<tag=value>` | One or more tags to add to each data point |

Each part can be URL encoded.

Example:

`--influxdb "http://shmRUvVjk0Ig2J9qU0_g349PF6l-GB1dmwXUXDh5qd19n1Nda_K7yvSIi9tGpax9jyOsmP2dUd-md8yPOoDNHg==@127.0.0.1:8086/mybucket/myorg?mytag=myvalue"`

This will connect to port 8086 on 127.0.0.1 using the provided token `shmRU...`.

Data will be placed in `mybucket` and associated with `myorg`. An additional tag `mytag` will be set to `myvalue` on all data points.

For distributed benchmarking, all clients will be sending data, so hosts like `localhost` and `127.0.0.1` should not be used.
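
To make the format concrete, the sketch below shows how such a connect string could be decomposed with Go's `net/url` package. This is illustrative only — warp's actual parsing lives in the `parseInfluxURL` helper referenced later in this diff, and the token and addresses here are placeholder values.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

func main() {
	// Placeholder connect string in the documented format.
	connect := "http://mytoken@127.0.0.1:8086/mybucket/myorg?mytag=myvalue"

	u, err := url.Parse(connect)
	if err != nil {
		panic(err)
	}

	token := u.User.Username() // <token>
	// The path carries "/<bucket>/<org>"; the org part is optional.
	parts := strings.Split(strings.Trim(u.Path, "/"), "/")
	bucket := parts[0]
	org := ""
	if len(parts) > 1 {
		org = parts[1]
	}

	fmt.Printf("scheme=%s host=%s token=%s bucket=%s org=%s\n",
		u.Scheme, u.Host, token, bucket, org)
	for tag, vals := range u.Query() { // extra <tag=value> pairs
		fmt.Printf("tag %s=%s\n", tag, vals[0])
	}
}
```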

### Data

All in-run measurements use the `warp` measurement name.

| Tag | Value |
|------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `warp_id` | Contains a random string value, unique per client.<br/>This can be used to identify individual runs or single warp clients when using distributed benchmarks. |
| `op` | Contains the operation type, for example GET, PUT, DELETE, etc. |
| `endpoint` | The endpoint to which the operation was sent.<br/>Measurements without this value are totals for the warp client.                               |


Fields are sent as accumulated totals per run per operation type.

New metrics are sent as each operation (request) completes; no progress is logged while an operation is in flight.
This means that bigger objects (and therefore fewer requests) will create bigger fluctuations, which is important to keep in mind when analyzing the data.

| Field | Value |
|---------------------------|--------------------------------------------------------------------------------|
| `requests` | Total number of requests performed |
| `objects` | Total number of objects affected |
| `bytes_total` | Total number of bytes affected |
| `errors` | Total errors encountered |
| `request_total_secs` | Total request time in seconds |
| `request_ttfb_total_secs` | Total time to first byte in seconds for relevant operations |
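
Putting the tags and fields together, a single in-run data point would look roughly like this in InfluxDB line protocol (all values, the `warp_id`, and the endpoint below are invented for illustration):

```
warp,warp_id=fGd3k1,op=GET,endpoint=http://minio-1:9000 requests=1520i,objects=1520i,bytes_total=15925248i,errors=0i,request_total_secs=84.1,request_ttfb_total_secs=11.9 1686222000000000000
```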

Since the statistics are provided as accumulated totals, "rates over time" must be calculated as differences (increases/positive derivatives) between data points.
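
As a sketch of what that calculation looks like in practice, the Flux query below — issued here through the official `influxdb-client-go/v2` client — turns the accumulated `bytes_total` counter into a bytes-per-second rate. The bucket, org, token, and address are the assumed values from the example above.

```go
package main

import (
	"context"
	"fmt"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
	// Assumed connection details; match them to your own server.
	client := influxdb2.NewClient("http://127.0.0.1:8086", "mytoken")
	defer client.Close()

	// Positive derivative of the accumulated byte counter = throughput.
	query := `from(bucket: "mybucket")
  |> range(start: -15m)
  |> filter(fn: (r) => r._measurement == "warp" and r._field == "bytes_total")
  |> derivative(unit: 1s, nonNegative: true)`

	result, err := client.QueryAPI("myorg").Query(context.Background(), query)
	if err != nil {
		panic(err)
	}
	for result.Next() {
		// Each record holds the rate at that point in time.
		fmt.Printf("%v: %.0f bytes/s\n", result.Record().Time(), result.Record().Value())
	}
	if result.Err() != nil {
		panic(result.Err())
	}
}
```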

### Summary

When a run has finished, a summary will be sent as a `warp_run_summary` measurement.
In addition to the fields above, it will contain:

| Field | Value |
|-------------------------|-----------------------------------|
| `request_avg_secs` | Average Request Time |
| `request_max_secs` | Longest Request Time |
| `request_min_secs` | Shortest Request Time |
| `request_ttfb_avg_secs` | Average Time To First Byte (TTFB) |
| `request_ttfb_max_secs` | Longest TTFB |
| `request_ttfb_min_secs` | Shortest TTFB |

All times are in floating-point seconds.

The summary will be sent for each host and operation type.

# Server Profiling

When running against a MinIO server, it is possible to enable profiling while the benchmark is running.

This is done by adding the `--serverprof=type` parameter with the type of profile you would like, for example `--serverprof=cpu`.
This requires that the credentials allow admin access for the first host.

| Type    | Description                                                                                                                              |
|---------|------------------------------------------------------------------------------------------------------------------------------------------|
| `cpu`   | CPU profile determines where a program spends its time while actively consuming CPU cycles (as opposed to sleeping or waiting for I/O).   |
| `mem`   | Heap profile reports the currently live allocations; used to monitor current memory usage or check for memory leaks.                      |
| `block` | Block profile shows where goroutines block waiting on synchronization primitives (including timer channels).                              |
| `mutex` | Mutex profile reports lock contention. When you think your CPU is not fully utilized due to mutex contention, use this profile.           |
| `trace` | A detailed trace of execution of the current program. This will include information about goroutine scheduling and garbage collection.    |

Profiles for all cluster members will be downloaded as a zip file.

1 change: 0 additions & 1 deletion cli/analyze.go
@@ -463,7 +463,6 @@ func printRequestAnalysis(_ *cli.Context, ops aggregate.Operation, details bool)
	console.SetColor("Print", color.New(color.FgWhite))

	if reqs.Skipped {
		console.Println("Not enough requests")
		return
	}
60 changes: 34 additions & 26 deletions cli/benchmark.go
@@ -93,6 +93,7 @@ var benchFlags = []cli.Flag{

// runBench will run the supplied benchmark and save/print the analysis.
func runBench(ctx *cli.Context, b bench.Benchmark) error {
	defer globalWG.Wait()
	activeBenchmarkMu.Lock()
	ab := activeBenchmark
	activeBenchmarkMu.Unlock()
@@ -246,21 +247,23 @@ func runBench(ctx *cli.Context, b bench.Benchmark) error {
	ops.SetClientID(cID)
	prof.stop(ctx2, ctx, fileName+".profiles.zip")

	if len(ops) > 0 {
		f, err := os.Create(fileName + ".csv.zst")
		if err != nil {
			monitor.Errorln("Unable to write benchmark data:", err)
		} else {
			func() {
				defer f.Close()
				enc, err := zstd.NewWriter(f, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
				fatalIf(probe.NewError(err), "Unable to compress benchmark output")

				defer enc.Close()
				err = ops.CSV(enc, commandLine(ctx))
				fatalIf(probe.NewError(err), "Unable to write benchmark output")

				monitor.InfoLn(fmt.Sprintf("Benchmark data written to %q\n", fileName+".csv.zst"))
			}()
		}
	}
	monitor.OperationsReady(ops, fileName, commandLine(ctx))
	printAnalysis(ctx, ops)
@@ -425,21 +428,23 @@ func runClientBenchmark(ctx *cli.Context, b bench.Benchmark, cb *clientBenchmark
	ops.SetClientID(cID)
	ops.SortByStartTime()

	if len(ops) > 0 {
		f, err := os.Create(fileName + ".csv.zst")
		if err != nil {
			console.Error("Unable to write benchmark data:", err)
		} else {
			func() {
				defer f.Close()
				enc, err := zstd.NewWriter(f, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
				fatalIf(probe.NewError(err), "Unable to compress benchmark output")

				defer enc.Close()
				err = ops.CSV(enc, commandLine(ctx))
				fatalIf(probe.NewError(err), "Unable to write benchmark output")

				console.Infof("Benchmark data written to %q\n", fileName+".csv.zst")
			}()
		}
	}

	err = cb.waitForStage(stageCleanup)
@@ -512,6 +517,9 @@ func checkBenchmark(ctx *cli.Context) {
		madmin.ProfilerTrace,
	}

	_, err := parseInfluxURL(ctx)
	fatalIf(probe.NewError(err), "invalid influx config")

	profs := strings.Split(ctx.String("serverprof"), ",")
	for _, profilerType := range profs {
		if len(profilerType) == 0 {
Expand Down
34 changes: 18 additions & 16 deletions cli/benchserver.go
@@ -201,22 +201,24 @@ func runServerBenchmark(ctx *cli.Context, b bench.Benchmark) (bool, error) {
}
}

	if len(allOps) > 0 {
		allOps.SortByStartTime()
		f, err := os.Create(fileName + ".csv.zst")
		if err != nil {
			errorLn("Unable to write benchmark data:", err)
		} else {
			func() {
				defer f.Close()
				enc, err := zstd.NewWriter(f, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
				fatalIf(probe.NewError(err), "Unable to compress benchmark output")

				defer enc.Close()
				err = allOps.CSV(enc, commandLine(ctx))
				fatalIf(probe.NewError(err), "Unable to write benchmark output")

				infoLn(fmt.Sprintf("Benchmark data written to %q\n", fileName+".csv.zst"))
			}()
		}
	}
	monitor.OperationsReady(allOps, fileName, commandLine(ctx))
	printAnalysis(ctx, allOps)
Expand Down
22 changes: 21 additions & 1 deletion cli/flags.go
@@ -20,8 +20,10 @@ package cli
import (
	"fmt"
	"os"
	"sync"

	"github.com/minio/cli"
	"github.com/minio/mc/pkg/probe"
	"github.com/minio/pkg/console"
	"github.com/minio/warp/pkg/bench"
	"github.com/minio/warp/pkg/generator"
@@ -97,6 +99,8 @@ var profileFlags = []cli.Flag{
	},
}

var globalWG sync.WaitGroup

// Set global states. NOTE: It is deliberately kept monolithic to ensure we don't miss out on any flags.
func setGlobalsFromContext(ctx *cli.Context) error {
	quiet := ctx.IsSet("quiet")
@@ -130,7 +134,7 @@ func commandLine(ctx *cli.Context) string {
		}
		name := flag.GetName()
		switch name {
		case "access-key", "secret-key", "influxdb":
			val = "*REDACTED*"
		}
		s += " --" + flag.GetName() + "=" + val
@@ -233,9 +237,24 @@ var ioFlags = []cli.Flag{
Name: "stress",
Usage: "stress test only and discard output",
},
cli.StringFlag{
Name: "influxdb",
EnvVar: appNameUC + "_INFLUXDB_CONNECT",
Usage: "Send operations to InfluxDB. Specify as 'http://<token>@<hostname>:<port>/<bucket>/<org>'",
},
}

func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common {
	var extra []chan<- bench.Operation
	u, err := parseInfluxURL(ctx)
	if err != nil {
		fatalIf(probe.NewError(err), "invalid influx config")
	}
	if u != nil {
		if in := newInfluxDB(ctx, &globalWG); in != nil {
			extra = append(extra, in)
		}
	}
	return bench.Common{
		Client:      newClient(ctx),
		Concurrency: ctx.Int("concurrent"),
@@ -244,5 +263,6 @@ func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common {
		Location:      ctx.String("region"),
		PutOpts:       putOpts(ctx),
		DiscardOutput: ctx.Bool("stress"),
		ExtraOut:      extra,
	}
}