Skip to content

Commit

Permalink
Tweak analysis (#140)
Browse files Browse the repository at this point in the history
* Load operations as stream (less memory required)
* When analyzing  only, simplify some operation parameters.
* Simplify hidden profiling. `github.com/pkg/profile` will only do one :(
* Add hidden `--analyze.offset` and `--analyze.limit` to allow loading partial sets.
* Always print duration in mixed mode.
* Print load progress.
  • Loading branch information
klauspost authored Oct 7, 2020
1 parent 869d612 commit 4475aa4
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 64 deletions.
33 changes: 24 additions & 9 deletions cli/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
package cli

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -66,6 +64,18 @@ var analyzeFlags = []cli.Flag{
Hidden: false,
Value: 0,
},
cli.IntFlag{
Name: "analyze.limit",
Usage: "Max operations to load for analysis.",
Hidden: true,
Value: 0,
},
cli.IntFlag{
Name: "analyze.offset",
Usage: "Skip this number of operations for analysis",
Hidden: true,
Value: 0,
},
cli.BoolFlag{
Name: "analyze.v",
Usage: "Display additional analysis data.",
Expand Down Expand Up @@ -123,10 +133,8 @@ func mainAnalyze(ctx *cli.Context) error {
input = f
}
err := zstdDec.Reset(input)
fatalIf(probe.NewError(err), "Unable to decompress input")
b, err := ioutil.ReadAll(zstdDec)
fatalIf(probe.NewError(err), "Unable to read input")
ops, err := bench.OperationsFromCSV(bytes.NewBuffer(b))
ops, err := bench.OperationsFromCSV(zstdDec, true, ctx.Int("analyze.offset"), ctx.Int("analyze.limit"))
fatalIf(probe.NewError(err), "Unable to parse input")

printAnalysis(ctx, ops)
Expand All @@ -137,7 +145,7 @@ func mainAnalyze(ctx *cli.Context) error {

func printMixedOpAnalysis(ctx *cli.Context, aggr aggregate.Aggregated, details bool) {
console.SetColor("Print", color.New(color.FgWhite))
console.Println("Mixed operations.")
console.Printf("Mixed operations.")

if aggr.MixedServerStats == nil {
console.Errorln("No mixed stats")
Expand All @@ -149,10 +157,11 @@ func printMixedOpAnalysis(ctx *cli.Context, aggr aggregate.Aggregated, details b
if aggr.MixedServerStats.Operations > 0 {
pct = 100.0 * float64(ops.Throughput.Operations) / float64(aggr.MixedServerStats.Operations)
}
duration := ops.EndTime.Sub(ops.StartTime).Truncate(time.Second)
if !details {
console.Printf("Operation: %v, %d%%\n", ops.Type, int(pct+0.5))
console.Printf("Operation: %v, %d%%, Concurrency: %d, Duration: %v.\n", ops.Type, int(pct+0.5), ops.Concurrency, duration)
} else {
console.Printf("Operation: %v - total: %v, %.01f%%\n", ops.Type, ops.Throughput.Operations, pct)
console.Printf("Operation: %v - total: %v, %.01f%%, Concurrency: %d, Duration: %v, starting %v\n", ops.Type, ops.Throughput.Operations, pct, ops.Concurrency, duration, ops.StartTime.Truncate(time.Millisecond))
}
console.SetColor("Print", color.New(color.FgWhite))

Expand Down Expand Up @@ -302,7 +311,12 @@ func printAnalysis(ctx *cli.Context, o bench.Operations) {

for ep, ops := range eps {
console.SetColor("Print", color.New(color.FgWhite))
console.Print(" * ", ep, ": Avg: ", ops.StringDetails(details), "\n")
console.Print(" * ", ep, ":")
if !details {
console.Print(" Avg: ", ops.StringDetails(details), "\n")
} else {
console.Print("\n")
}
if ops.Errors > 0 {
console.SetColor("Print", color.New(color.FgHiRed))
console.Println("Errors:", ops.Errors)
Expand All @@ -315,6 +329,7 @@ func printAnalysis(ctx *cli.Context, o bench.Operations) {
continue
}
console.SetColor("Print", color.New(color.FgWhite))
console.Println("\t- Average: ", ops.StringDetails(false))
console.Println("\t- Fastest:", aggregate.BPSorOPS(seg.FastestBPS, seg.FastestOPS))
console.Println("\t- 50% Median:", aggregate.BPSorOPS(seg.MedianBPS, seg.MedianOPS))
console.Println("\t- Slowest:", aggregate.BPSorOPS(seg.SlowestBPS, seg.SlowestOPS))
Expand Down
37 changes: 22 additions & 15 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,29 +159,36 @@ func registerApp(name string, appCmds []cli.Command) *cli.App {
}

app.Before = func(ctx *cli.Context) error {
var after []func()
if s := ctx.String("cpuprofile"); s != "" {
after = append(after, profile.Start(profile.CPUProfile, profile.ProfilePath(s)).Stop)
var profiles []func(*profile.Profile)
if ctx.Bool("cpu") {
profiles = append(profiles, profile.CPUProfile)
}
if s := ctx.String("memprofile"); s != "" {
after = append(after, profile.Start(profile.MemProfile, profile.ProfilePath(s)).Stop)
if ctx.Bool("mem") {
profiles = append(profiles, profile.MemProfile)
}
if s := ctx.String("blockprofile"); s != "" {
after = append(after, profile.Start(profile.BlockProfile, profile.ProfilePath(s)).Stop)
if ctx.Bool("block") {
profiles = append(profiles, profile.BlockProfile)
}
if s := ctx.String("mutexprofile"); s != "" {
after = append(after, profile.Start(profile.MutexProfile, profile.ProfilePath(s)).Stop)
if ctx.Bool("mutex") {
profiles = append(profiles, profile.MutexProfile)
}
if s := ctx.String("trace"); s != "" {
after = append(after, profile.Start(profile.TraceProfile, profile.ProfilePath(s)).Stop)
if ctx.Bool("trace") {
profiles = append(profiles, profile.TraceProfile)
}
if len(after) == 0 {
if ctx.Bool("threads") {
profiles = append(profiles, profile.ThreadcreationProfile)
}
if len(profiles) == 0 {
return nil
}
if len(profiles) > 1 {
fatal(nil, "sorry, only one type of profiling can be enabled concurrently")
}
profiles = append(profiles, profile.ProfilePath(ctx.String("profdir")))
stopper := profile.Start(profiles...)

afterExec = func(ctx *cli.Context) error {
for _, fn := range after {
fn()
}
stopper.Stop()
return nil
}
return nil
Expand Down
6 changes: 1 addition & 5 deletions cli/cmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package cli

import (
"bytes"
"io"
"io/ioutil"
"os"
"time"

Expand Down Expand Up @@ -64,10 +62,8 @@ func mainCmp(ctx *cli.Context) error {
fatalIf(probe.NewError(err), "Unable to open input file")
defer f.Close()
err = zstdDec.Reset(f)
fatalIf(probe.NewError(err), "Unable to decompress input")
b, err := ioutil.ReadAll(zstdDec)
fatalIf(probe.NewError(err), "Unable to read input")
ops, err := bench.OperationsFromCSV(bytes.NewBuffer(b))
ops, err := bench.OperationsFromCSV(zstdDec, true, ctx.Int("analyze.offset"), ctx.Int("analyze.limit"))
fatalIf(probe.NewError(err), "Unable to parse input")
return ops
}
Expand Down
44 changes: 25 additions & 19 deletions cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,35 +57,41 @@ var globalFlags = []cli.Flag{
}

var profileFlags = []cli.Flag{
// These flags mimmic the `go test` flags.
cli.StringFlag{
Name: "cpuprofile",
Value: "",
Usage: "Write a local CPU profile to the specified file before exiting.",
Name: "profdir",
Usage: "Write profiles to this folder",
Value: "pprof",
Hidden: true,
},
cli.StringFlag{
Name: "memprofile",
Value: "",
Usage: "Write an local allocation profile to the file after all tests have passed.",

cli.BoolFlag{
Name: "cpu",
Usage: "Write a local CPU profile",
Hidden: true,
},
cli.StringFlag{
Name: "blockprofile",
Value: "",
Usage: "Write a local goroutine blocking profile to the specified file when all tests are complete.",
cli.BoolFlag{
Name: "mem",
Usage: "Write an local allocation profile",
Hidden: true,
},
cli.StringFlag{
Name: "mutexprofile",
Value: "",
Usage: "Write a mutex contention profile to the specified file when all tests are complete.",
cli.BoolFlag{
Name: "block",
Usage: "Write a local goroutine blocking profile",
Hidden: true,
},
cli.StringFlag{
cli.BoolFlag{
Name: "mutex",
Usage: "Write a mutex contention profile",
Hidden: true,
},
cli.BoolFlag{
Name: "threads",
Usage: "Write a threas create profile",
Hidden: true,
},
cli.BoolFlag{
Name: "trace",
Value: "",
Usage: "Write an local execution trace to the specified file before exiting.",
Usage: "Write an local execution trace",
Hidden: true,
},
}
Expand Down
6 changes: 1 addition & 5 deletions cli/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
package cli

import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"os"
"time"

Expand Down Expand Up @@ -75,9 +73,7 @@ func mainMerge(ctx *cli.Context) error {
defer f.Close()
err = zstdDec.Reset(f)
fatalIf(probe.NewError(err), "Unable to decompress input")
b, err := ioutil.ReadAll(zstdDec)
fatalIf(probe.NewError(err), "Unable to read input")
ops, err := bench.OperationsFromCSV(bytes.NewBuffer(b))
ops, err := bench.OperationsFromCSV(zstdDec, false, ctx.Int("analyze.offset"), ctx.Int("analyze.limit"))
fatalIf(probe.NewError(err), "Unable to parse input")

threads = ops.OffsetThreads(threads)
Expand Down
15 changes: 8 additions & 7 deletions pkg/aggregate/throughput.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,22 @@ type Throughput struct {

// String returns a string representation of the segment
func (t Throughput) String() string {
return t.StringDetails(true)
return t.StringDetails(true) + " " + t.StringDuration()
}

// StringDuration returns a string representation of the segment duration
func (t Throughput) StringDuration() string {
return fmt.Sprintf("Duration: %v, starting %v", time.Duration(t.MeasureDurationMillis)*time.Millisecond, t.StartTime.Format("15:04:05 MST"))
}

// String returns a string representation of the segment
func (t Throughput) StringDetails(details bool) string {
speed := ""
detail := ""
if t.AverageBPS > 0 {
speed = fmt.Sprintf("%.02f MiB/s, ", t.AverageBPS/(1<<20))
}
if details {
detail = fmt.Sprintf(" (%v, starting %v)", time.Duration(t.MeasureDurationMillis)*time.Millisecond, t.StartTime.Format("15:04:05 MST"))
}
return fmt.Sprintf("%s%.02f obj/s%s",
speed, t.AverageOPS, detail)
return fmt.Sprintf("%s%.02f obj/s",
speed, t.AverageOPS)
}

func (t *Throughput) fill(total bench.Segment) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/bench/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestOperations_Segment(t *testing.T) {
if err != nil {
t.Fatal(err)
}
ops, err := OperationsFromCSV(bytes.NewBuffer(b))
ops, err := OperationsFromCSV(bytes.NewBuffer(b), false, 0, 0)
if err != nil {
t.Fatal(err)
}
Expand Down
34 changes: 31 additions & 3 deletions pkg/bench/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ func (o Operations) CSV(w io.Writer, comment string) error {
}

// OperationsFromCSV will load operations from CSV.
func OperationsFromCSV(r io.Reader) (Operations, error) {
func OperationsFromCSV(r io.Reader, analyzeOnly bool, offset, limit int) (Operations, error) {
var ops Operations
cr := csv.NewReader(r)
cr.Comma = '\t'
Expand All @@ -905,6 +905,19 @@ func OperationsFromCSV(r io.Reader) (Operations, error) {
for i, s := range header {
fieldIdx[s] = i
}
var clientMap = make(map[string]string, 16)
cb := byte('a')
getClient := func(c string) string {
if !analyzeOnly {
return c
}
if v, ok := clientMap[c]; ok {
return v
}
clientMap[c] = string([]byte{cb})
cb++
return clientMap[c]
}
for {
values, err := cr.Read()
if err == io.EOF {
Expand All @@ -916,6 +929,10 @@ func OperationsFromCSV(r io.Reader) (Operations, error) {
if len(values) == 0 {
continue
}
if offset > 0 {
offset--
continue
}
start, err := time.Parse(time.RFC3339Nano, values[fieldIdx["start"]])
if err != nil {
return nil, err
Expand Down Expand Up @@ -951,6 +968,10 @@ func OperationsFromCSV(r io.Reader) (Operations, error) {
if idx, ok := fieldIdx["client_id"]; ok {
clientID = values[idx]
}
var file string
if !analyzeOnly {
file = values[fieldIdx["file"]]
}
ops = append(ops, Operation{
OpType: values[fieldIdx["op"]],
ObjPerOp: int(objs),
Expand All @@ -959,11 +980,18 @@ func OperationsFromCSV(r io.Reader) (Operations, error) {
End: end,
Err: values[fieldIdx["error"]],
Size: size,
File: values[fieldIdx["file"]],
File: file,
Thread: uint16(thread),
Endpoint: endpoint,
ClientID: clientID,
ClientID: getClient(clientID),
})
if len(ops)%1000000 == 0 {
console.Printf("\r%d operations loaded...", len(ops))
}
if limit > 0 && len(ops) >= limit {
break
}
}
console.Printf("\r%d operations loaded... Done!\n", len(ops))
return ops, nil
}

0 comments on commit 4475aa4

Please sign in to comment.