grid: Remove allocs
Removes some minor allocations, cleans up the benchmarks, and adds typed roundtrip benchmarks.

Websocket library updated to include gobwas/ws#189
klauspost committed Nov 1, 2023
1 parent 85af24e commit 68cec68
Showing 8 changed files with 142 additions and 74 deletions.
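
For orientation, the typed roundtrip path added to the benchmarks boils down to registering a generic single handler on a manager and calling it over a connection, then recycling the response. The sketch below is assembled only from identifiers visible in the diff that follows (NewSingleHandler, handlerTest2, newTestRequest, newTestResponse, Register, Call, PutResponse, Manager, Connection, RemoteErr); typedRoundtripSketch is a hypothetical helper, not part of the commit, and it assumes it sits next to the test files in internal/grid with "context" imported.

func typedRoundtripSketch(ctx context.Context, remote *Manager, conn *Connection) error {
	// Typed handler, using the request/response factories added in grid_types_test.go.
	rpc := NewSingleHandler[*testRequest, *testResponse](handlerTest2, newTestRequest, newTestResponse)

	// Server side: register a handler that echoes the request into a typed response,
	// as the new benchmark does.
	if err := rpc.Register(remote, func(req *testRequest) (*testResponse, *RemoteErr) {
		return &testResponse{OrgNum: req.Num, OrgString: req.String, Embedded: *req}, nil
	}); err != nil {
		return err
	}

	// Client side: one typed roundtrip, then recycle the response.
	resp, err := rpc.Call(ctx, conn, &testRequest{Num: 42, String: "hello"})
	if err != nil {
		return err
	}
	rpc.PutResponse(resp)
	return nil
}
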
2 changes: 1 addition & 1 deletion go.mod
@@ -26,7 +26,7 @@ require (
github.com/go-ldap/ldap/v3 v3.4.6
github.com/go-openapi/loads v0.21.2
github.com/go-sql-driver/mysql v1.7.1
github.com/gobwas/ws v1.2.1
github.com/gobwas/ws v1.3.1-0.20231030152437-516805a9f3b3
github.com/golang-jwt/jwt/v4 v4.5.0
github.com/gomodule/redigo v1.8.9
github.com/google/uuid v1.3.1
4 changes: 2 additions & 2 deletions go.sum
@@ -235,8 +235,8 @@ github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.2.1 h1:F2aeBZrm2NDsc7vbovKrWSogd4wvfAxg0FQ89/iqOTk=
github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/gobwas/ws v1.3.1-0.20231030152437-516805a9f3b3 h1:u5on5kZjHKikhx6d2IAGOxFf4BAcJhUb2v8VJFHBgFA=
github.com/gobwas/ws v1.3.1-0.20231030152437-516805a9f3b3/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
169 changes: 120 additions & 49 deletions internal/grid/benchmark_test.go
@@ -46,15 +46,24 @@ func benchmarkGridRequests(b *testing.B, n int) {
b.Fatal(err)
}
}
rpc := NewSingleHandler[*testRequest, *testResponse](handlerTest2, newTestRequest, newTestResponse)
grid, err := SetupTestGrid(n)
errFatal(err)
b.Cleanup(grid.Cleanup)
// Create n managers.
for _, remote := range grid.Managers {
// Register a single handler which echos the payload.
errFatal(remote.RegisterSingleHandler(handlerTest, func(payload []byte) ([]byte, *RemoteErr) {
defer PutByteBuffer(payload)
return append(GetByteBuffer()[:0], payload...), nil
}))
errFatal(rpc.Register(remote, func(req *testRequest) (resp *testResponse, err *RemoteErr) {
return &testResponse{
OrgNum: req.Num,
OrgString: req.String,
Embedded: *req,
}, nil
}))
errFatal(err)
}
const payloadSize = 512
@@ -65,61 +74,123 @@ func benchmarkGridRequests(b *testing.B, n int) {

// Wait for all to connect
// Parallel writes per server.
for par := 1; par <= 32; par *= 2 {
b.Run("par="+strconv.Itoa(par*runtime.GOMAXPROCS(0)), func(b *testing.B) {
defer timeout(30 * time.Second)()
b.ReportAllocs()
b.SetBytes(int64(len(payload) * 2))
b.ResetTimer()
t := time.Now()
var ops int64
var lat int64
b.SetParallelism(par)
b.RunParallel(func(pb *testing.PB) {
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
n := 0
var latency int64
managers := grid.Managers
hosts := grid.Hosts
for pb.Next() {
// Pick a random manager.
src, dst := rng.Intn(len(managers)), rng.Intn(len(managers))
if src == dst {
dst = (dst + 1) % len(managers)
b.Run("bytes", func(b *testing.B) {
for par := 1; par <= 32; par *= 2 {
b.Run("par="+strconv.Itoa(par*runtime.GOMAXPROCS(0)), func(b *testing.B) {
defer timeout(60 * time.Second)()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
b.ReportAllocs()
b.SetBytes(int64(len(payload) * 2))
b.ResetTimer()
t := time.Now()
var ops int64
var lat int64
b.SetParallelism(par)
b.RunParallel(func(pb *testing.PB) {
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
n := 0
var latency int64
managers := grid.Managers
hosts := grid.Hosts
for pb.Next() {
// Pick a random manager.
src, dst := rng.Intn(len(managers)), rng.Intn(len(managers))
if src == dst {
dst = (dst + 1) % len(managers)
}
local := managers[src]
conn := local.Connection(hosts[dst])
if conn == nil {
b.Fatal("No connection")
}
// Send the payload.
t := time.Now()
resp, err := conn.Request(ctx, handlerTest, payload)
latency += time.Since(t).Nanoseconds()
if err != nil {
if debugReqs {
fmt.Println(err.Error())
}
b.Fatal(err.Error())
}
PutByteBuffer(resp)
n++
}
local := managers[src]
conn := local.Connection(hosts[dst])
if conn == nil {
b.Fatal("No connection")
atomic.AddInt64(&ops, int64(n))
atomic.AddInt64(&lat, latency)
})
spent := time.Since(t)
if spent > 0 && n > 0 {
// Since we are benchmarking n parallel servers we need to multiply by n.
// This will give an estimate of the total ops/s.
latency := float64(atomic.LoadInt64(&lat)) / float64(time.Millisecond)
b.ReportMetric(float64(n)*float64(ops)/spent.Seconds(), "vops/s")
b.ReportMetric(latency/float64(ops), "ms/op")
}
})
}
})
b.Run("rpc", func(b *testing.B) {
for par := 1; par <= 32; par *= 2 {
b.Run("par="+strconv.Itoa(par*runtime.GOMAXPROCS(0)), func(b *testing.B) {
defer timeout(60 * time.Second)()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
b.ReportAllocs()
b.ResetTimer()
t := time.Now()
var ops int64
var lat int64
b.SetParallelism(par)
b.RunParallel(func(pb *testing.PB) {
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
n := 0
var latency int64
managers := grid.Managers
hosts := grid.Hosts
req := testRequest{
Num: rng.Int(),
String: "hello",
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
// Send the payload.
t := time.Now()
resp, err := conn.Request(ctx, handlerTest, payload)
latency += time.Since(t).Nanoseconds()
cancel()
if err != nil {
if debugReqs {
fmt.Println(err.Error())
for pb.Next() {
// Pick a random manager.
src, dst := rng.Intn(len(managers)), rng.Intn(len(managers))
if src == dst {
dst = (dst + 1) % len(managers)
}
b.Fatal(err.Error())
local := managers[src]
conn := local.Connection(hosts[dst])
if conn == nil {
b.Fatal("No connection")
}
// Send the payload.
t := time.Now()
resp, err := rpc.Call(ctx, conn, &req)
latency += time.Since(t).Nanoseconds()
if err != nil {
if debugReqs {
fmt.Println(err.Error())
}
b.Fatal(err.Error())
}
rpc.PutResponse(resp)
n++
}
PutByteBuffer(resp)
n++
atomic.AddInt64(&ops, int64(n))
atomic.AddInt64(&lat, latency)
})
spent := time.Since(t)
if spent > 0 && n > 0 {
// Since we are benchmarking n parallel servers we need to multiply by n.
// This will give an estimate of the total ops/s.
latency := float64(atomic.LoadInt64(&lat)) / float64(time.Millisecond)
b.ReportMetric(float64(n)*float64(ops)/spent.Seconds(), "vops/s")
b.ReportMetric(latency/float64(ops), "ms/op")
}
atomic.AddInt64(&ops, int64(n))
atomic.AddInt64(&lat, latency)
})
spent := time.Since(t)
if spent > 0 && n > 0 {
// Since we are benchmarking n parallel servers we need to multiply by n.
// This will give an estimate of the total ops/s.
latency := float64(atomic.LoadInt64(&lat)) / float64(time.Millisecond)
b.ReportMetric(float64(n)*float64(ops)/spent.Seconds(), "vops/s")
b.ReportMetric(latency/float64(ops), "ms/op")
}
})
}
}
})
}

func BenchmarkStream(b *testing.B) {
8 changes: 7 additions & 1 deletion internal/grid/connection.go
@@ -1187,6 +1187,10 @@ func (c *Connection) handleConnectMux(ctx context.Context, m message, subID *sub
// Stream:
var handler *StreamHandler
if subID == nil {
if !m.Handler.valid() {
logger.LogIf(ctx, c.queueMsg(m, muxConnectError{Error: "Invalid Handler"}))
return
}
handler = c.handlers.streams[m.Handler]
} else {
handler = c.handlers.subStreams[*subID]
@@ -1237,6 +1241,9 @@ func (c *Connection) handleRequest(ctx context.Context, m message, subID *subHan
logger.LogIf(ctx, c.queueMsg(m, muxConnectError{Error: "Invalid Handler for type"}))
return
}

// TODO: This causes allocations, but escape analysis doesn't really show the cause.
// If another faithful engineer wants to take a stab, feel free.
go func(m message) {
var start time.Time
if m.DeadlineMS > 0 {
@@ -1258,7 +1265,6 @@
}
}()

// TODO: Maybe recycle m.Payload - should be free here.
if m.DeadlineMS > 0 && time.Since(start).Milliseconds()+c.addDeadline.Milliseconds() > int64(m.DeadlineMS) {
if debugReqs {
fmt.Println(m.MuxID, c.StringReverse(), "DEADLINE EXCEEDED")
22 changes: 3 additions & 19 deletions internal/grid/grid_test.go
@@ -160,11 +160,7 @@ func TestSingleRoundtripGenerics(t *testing.T) {
return resp, nil
}
// Return error
h2 := NewSingleHandler[*testRequest, *testResponse](handlerTest2, func() *testRequest {
return &testRequest{}
}, func() *testResponse {
return &testResponse{}
})
h2 := NewSingleHandler[*testRequest, *testResponse](handlerTest2, newTestRequest, newTestResponse)
handler2 := func(req *testRequest) (resp *testResponse, err *RemoteErr) {
r := RemoteErr(req.String)
return nil, &r
@@ -682,13 +678,7 @@ func testGenericsStreamRoundtrip(t *testing.T, local, remote *Manager) {

// We fake a local and remote server.
remoteHost := remote.HostName()
handler := NewStream[*testRequest, *testRequest, *testResponse](handlerTest, func() *testRequest {
return &testRequest{}
}, func() *testRequest {
return &testRequest{}
}, func() *testResponse {
return &testResponse{}
})
handler := NewStream[*testRequest, *testRequest, *testResponse](handlerTest, newTestRequest, newTestRequest, newTestResponse)
handler.InCapacity = 1
handler.OutCapacity = 1
const payloads = 10
@@ -759,13 +749,7 @@ func testGenericsStreamRoundtripSubroute(t *testing.T, local, remote *Manager) {

// We fake a local and remote server.
remoteHost := remote.HostName()
handler := NewStream[*testRequest, *testRequest, *testResponse](handlerTest, func() *testRequest {
return &testRequest{}
}, func() *testRequest {
return &testRequest{}
}, func() *testResponse {
return &testResponse{}
})
handler := NewStream[*testRequest, *testRequest, *testResponse](handlerTest, newTestRequest, newTestRequest, newTestResponse)
handler.InCapacity = 1
handler.OutCapacity = 1
const payloads = 10
8 changes: 8 additions & 0 deletions internal/grid/grid_types_test.go
@@ -29,3 +29,11 @@ type testResponse struct {
OrgString string
Embedded testRequest
}

func newTestRequest() *testRequest {
return &testRequest{}
}

func newTestResponse() *testResponse {
return &testResponse{}
}
1 change: 1 addition & 0 deletions internal/grid/handlers.go
@@ -147,6 +147,7 @@ type (
// A non-nil error value will be returned as RemoteErr(msg) to client.
// No client information or cancellation (deadline) is available.
// Include this in payload if needed.
// Payload should be recycled with PutByteBuffer if not needed after the call.
SingleHandlerFn func(payload []byte) ([]byte, *RemoteErr)

// StatelessHandlerFn must handle incoming stateless request.
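
Following the recycling guidance added to the SingleHandlerFn comment above, a minimal sketch of a conforming handler is shown here. GetByteBuffer and PutByteBuffer are the pooled-buffer helpers used by the handler removed from benchmark_test.go in this commit; echoHandler is an illustrative name only, not part of the change.

// Echo the payload from a pooled buffer, then recycle the request buffer.
var echoHandler SingleHandlerFn = func(payload []byte) ([]byte, *RemoteErr) {
	// Copy the request into a pooled buffer for the response.
	resp := append(GetByteBuffer()[:0], payload...)
	// The payload is not needed after the call, so recycle it.
	PutByteBuffer(payload)
	return resp, nil
}
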
2 changes: 0 additions & 2 deletions internal/grid/muxclient.go
@@ -35,7 +35,6 @@ type muxClient struct {
MuxID uint64
SendSeq, RecvSeq uint32
LastPong int64
Resp chan []byte
BaseFlags Flags
ctx context.Context
cancelFn context.CancelCauseFunc
@@ -62,7 +61,6 @@ func newMuxClient(ctx context.Context, muxID uint64, parent *Connection) *muxCli
ctx, cancelFn := context.WithCancelCause(ctx)
return &muxClient{
MuxID: muxID,
Resp: make(chan []byte, 1),
ctx: ctx,
cancelFn: cancelFn,
parent: parent,
