Skip to content

Commit

Permalink
sweet: helper to run functions in parallel
Browse files Browse the repository at this point in the history
Change-Id: I0dcdbaecaef7a8b2f08291574229716c6b27079d
Reviewed-on: https://go-review.googlesource.com/c/benchmarks/+/600057
Reviewed-by: Michael Knyszek <[email protected]>
LUCI-TryBot-Result: Go LUCI <[email protected]>
Auto-Submit: Austin Clements <[email protected]>
  • Loading branch information
aclements authored and gopherbot committed Jul 22, 2024
1 parent bff3eae commit 13000c5
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 49 deletions.
34 changes: 9 additions & 25 deletions sweet/benchmarks/cockroachdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"golang.org/x/benchmarks/sweet/benchmarks/internal/driver"
"golang.org/x/benchmarks/sweet/benchmarks/internal/par"
"golang.org/x/benchmarks/sweet/benchmarks/internal/server"
"golang.org/x/benchmarks/sweet/common/diagnostics"
)
Expand Down Expand Up @@ -597,15 +597,16 @@ func run(cfg *config) (err error) {
}
return driver.RunBenchmark(cfg.bench.reportName, func(d *driver.B) error {
// Set up diagnostics.
var finishers []func() uint64
var stopAll par.Funcs
if driver.DiagnosticEnabled(diagnostics.CPUProfile) {
for _, inst := range instances {
finishers = append(finishers, server.PollDiagnostic(
stop := server.PollDiagnostic(
inst.httpAddr(),
cfg.tmpDir,
cfg.bench.reportName,
diagnostics.CPUProfile,
))
)
stopAll.Add(stop)
}
}
if driver.DiagnosticEnabled(diagnostics.Trace) {
Expand All @@ -617,10 +618,9 @@ func run(cfg *config) (err error) {
cfg.bench.reportName,
diagnostics.Trace,
)
finishers = append(finishers, func() uint64 {
stopAll.Add(func() {
n := stopTrace()
sum.Add(n)
return n
})
}
defer func() {
Expand All @@ -630,8 +630,8 @@ func run(cfg *config) (err error) {
if driver.DiagnosticEnabled(diagnostics.MemProfile) {
for _, inst := range instances {
inst := inst
finishers = append(finishers, func() uint64 {
n, err := server.CollectDiagnostic(
stopAll.Add(func() {
_, err := server.CollectDiagnostic(
inst.httpAddr(),
cfg.tmpDir,
cfg.bench.reportName,
Expand All @@ -640,26 +640,10 @@ func run(cfg *config) (err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "failed to read memprofile: %v", err)
}
return uint64(n)
})
}
}
if len(finishers) != 0 {
// Finish all the diagnostic collections in concurrently. Otherwise we could be waiting a while.
defer func() {
log.Println("running finishers")
var wg sync.WaitGroup
for _, finish := range finishers {
finish := finish
wg.Add(1)
go func() {
defer wg.Done()
finish()
}()
}
wg.Wait()
}()
}
defer stopAll.Run()
// Actually run the benchmark.
log.Println("running benchmark")
return runBenchmark(d, cfg, instances)
Expand Down
33 changes: 9 additions & 24 deletions sweet/benchmarks/etcd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"

clientv3 "go.etcd.io/etcd/client/v3"
"golang.org/x/benchmarks/sweet/benchmarks/internal/driver"
"golang.org/x/benchmarks/sweet/benchmarks/internal/par"
"golang.org/x/benchmarks/sweet/benchmarks/internal/server"
"golang.org/x/benchmarks/sweet/common/diagnostics"
)
Expand Down Expand Up @@ -362,15 +362,16 @@ func run(cfg *config) (err error) {
}
return driver.RunBenchmark(cfg.bench.reportName, func(d *driver.B) error {
// Set up diagnostics.
var finishers []func() uint64
var stopAll par.Funcs
if driver.DiagnosticEnabled(diagnostics.CPUProfile) {
for _, inst := range instances {
finishers = append(finishers, server.PollDiagnostic(
stop := server.PollDiagnostic(
inst.host(clientPort),
cfg.tmpDir,
cfg.bench.reportName,
diagnostics.CPUProfile,
))
)
stopAll.Add(stop)
}
}
if driver.DiagnosticEnabled(diagnostics.Trace) {
Expand All @@ -382,10 +383,9 @@ func run(cfg *config) (err error) {
cfg.bench.reportName,
diagnostics.Trace,
)
finishers = append(finishers, func() uint64 {
stopAll.Add(func() {
n := stopTrace()
sum.Add(n)
return n
})
}
defer func() {
Expand All @@ -395,8 +395,8 @@ func run(cfg *config) (err error) {
if driver.DiagnosticEnabled(diagnostics.MemProfile) {
for _, inst := range instances {
inst := inst
finishers = append(finishers, func() uint64 {
n, err := server.CollectDiagnostic(
stopAll.Add(func() {
_, err := server.CollectDiagnostic(
inst.host(clientPort),
cfg.tmpDir,
cfg.bench.reportName,
Expand All @@ -405,25 +405,10 @@ func run(cfg *config) (err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "failed to read memprofile: %v", err)
}
return uint64(n)
})
}
}
if len(finishers) != 0 {
// Finish all the diagnostic collections in concurrently. Otherwise we could be waiting a while.
defer func() {
var wg sync.WaitGroup
for _, finish := range finishers {
finish := finish
wg.Add(1)
go func() {
defer wg.Done()
finish()
}()
}
wg.Wait()
}()
}
defer stopAll.Run()
// Actually run the benchmark.
return runBenchmark(d, cfg, instances)
}, opts...)
Expand Down
41 changes: 41 additions & 0 deletions sweet/benchmarks/internal/par/funcs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package par

import (
"reflect"
"sync"
)

type Funcs struct {
funcs []func()
}

func (f *Funcs) Add(fn any) {
if fn, ok := fn.(func()); ok {
// Easy
f.funcs = append(f.funcs, fn)
return
}

rv := reflect.ValueOf(fn)
if rv.Kind() != reflect.Func || rv.Type().NumIn() != 0 {
panic("fn must be a function with zero arguments")
}
f.funcs = append(f.funcs, func() { rv.Call(nil) })
}

func (f *Funcs) Run() {
var wg sync.WaitGroup
for _, fn := range f.funcs {
wg.Add(1)
go func(fn func()) {
defer wg.Done()
fn()
}(fn)
}
f.funcs = nil
wg.Wait()
}

0 comments on commit 13000c5

Please sign in to comment.