From 7731e97a2b53c4798b51697a41b422b442c7885a Mon Sep 17 00:00:00 2001
From: Klaus Post
Date: Wed, 1 Nov 2023 13:55:39 +0100
Subject: [PATCH] grid: Remove allocs

Removes some minor allocations, cleans up benchmarks, and adds typed
roundtrip benchmarks.

Updates the websocket library to include https://github.com/gobwas/ws/pull/189
---
 go.mod                           |   2 +-
 go.sum                           |   4 +-
 internal/grid/benchmark_test.go  | 170 ++++++++++++++++++++++---------
 internal/grid/connection.go      |   8 +-
 internal/grid/grid_test.go       |  22 +---
 internal/grid/grid_types_test.go |   8 ++
 internal/grid/handlers.go        |   1 +
 internal/grid/muxclient.go       |   2 -
 8 files changed, 143 insertions(+), 74 deletions(-)

diff --git a/go.mod b/go.mod
index cedbb19b8ecb94..dee4b1727938d5 100644
--- a/go.mod
+++ b/go.mod
@@ -26,7 +26,7 @@ require (
 	github.com/go-ldap/ldap/v3 v3.4.6
 	github.com/go-openapi/loads v0.21.2
 	github.com/go-sql-driver/mysql v1.7.1
-	github.com/gobwas/ws v1.2.1
+	github.com/gobwas/ws v1.3.1-0.20231030152437-516805a9f3b3
 	github.com/golang-jwt/jwt/v4 v4.5.0
 	github.com/gomodule/redigo v1.8.9
 	github.com/google/uuid v1.3.1
diff --git a/go.sum b/go.sum
index bcb1db8a5f980c..f53687bd48e67d 100644
--- a/go.sum
+++ b/go.sum
@@ -235,8 +235,8 @@ github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU
 github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
 github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
 github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
-github.com/gobwas/ws v1.2.1 h1:F2aeBZrm2NDsc7vbovKrWSogd4wvfAxg0FQ89/iqOTk=
-github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
+github.com/gobwas/ws v1.3.1-0.20231030152437-516805a9f3b3 h1:u5on5kZjHKikhx6d2IAGOxFf4BAcJhUb2v8VJFHBgFA=
+github.com/gobwas/ws v1.3.1-0.20231030152437-516805a9f3b3/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
 github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
 github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
diff --git a/internal/grid/benchmark_test.go b/internal/grid/benchmark_test.go
index a55462e5fdd416..4a314c0f22ee07 100644
--- a/internal/grid/benchmark_test.go
+++ b/internal/grid/benchmark_test.go
@@ -46,6 +46,7 @@ func benchmarkGridRequests(b *testing.B, n int) {
 			b.Fatal(err)
 		}
 	}
+	rpc := NewSingleHandler[*testRequest, *testResponse](handlerTest2, newTestRequest, newTestResponse)
 	grid, err := SetupTestGrid(n)
 	errFatal(err)
 	b.Cleanup(grid.Cleanup)
@@ -53,8 +54,16 @@ func benchmarkGridRequests(b *testing.B, n int) {
 	for _, remote := range grid.Managers {
 		// Register a single handler which echos the payload.
 		errFatal(remote.RegisterSingleHandler(handlerTest, func(payload []byte) ([]byte, *RemoteErr) {
+			defer PutByteBuffer(payload)
 			return append(GetByteBuffer()[:0], payload...), nil
 		}))
+		errFatal(rpc.Register(remote, func(req *testRequest) (resp *testResponse, err *RemoteErr) {
+			return &testResponse{
+				OrgNum:    req.Num,
+				OrgString: req.String,
+				Embedded:  *req,
+			}, nil
+		}))
 		errFatal(err)
 	}
 	const payloadSize = 512
@@ -65,61 +74,124 @@ func benchmarkGridRequests(b *testing.B, n int) {
 	// Wait for all to connect
 	// Parallel writes per server.
-	for par := 1; par <= 32; par *= 2 {
-		b.Run("par="+strconv.Itoa(par*runtime.GOMAXPROCS(0)), func(b *testing.B) {
-			defer timeout(30 * time.Second)()
-			b.ReportAllocs()
-			b.SetBytes(int64(len(payload) * 2))
-			b.ResetTimer()
-			t := time.Now()
-			var ops int64
-			var lat int64
-			b.SetParallelism(par)
-			b.RunParallel(func(pb *testing.PB) {
-				rng := rand.New(rand.NewSource(time.Now().UnixNano()))
-				n := 0
-				var latency int64
-				managers := grid.Managers
-				hosts := grid.Hosts
-				for pb.Next() {
-					// Pick a random manager.
-					src, dst := rng.Intn(len(managers)), rng.Intn(len(managers))
-					if src == dst {
-						dst = (dst + 1) % len(managers)
+	b.Run("bytes", func(b *testing.B) {
+		for par := 1; par <= 32; par *= 2 {
+			b.Run("par="+strconv.Itoa(par*runtime.GOMAXPROCS(0)), func(b *testing.B) {
+				defer timeout(60 * time.Second)()
+				ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+				defer cancel()
+				b.ReportAllocs()
+				b.SetBytes(int64(len(payload) * 2))
+				b.ResetTimer()
+				t := time.Now()
+				var ops int64
+				var lat int64
+				b.SetParallelism(par)
+				b.RunParallel(func(pb *testing.PB) {
+					rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+					n := 0
+					var latency int64
+					managers := grid.Managers
+					hosts := grid.Hosts
+					for pb.Next() {
+						// Pick a random manager.
+						src, dst := rng.Intn(len(managers)), rng.Intn(len(managers))
+						if src == dst {
+							dst = (dst + 1) % len(managers)
+						}
+						local := managers[src]
+						conn := local.Connection(hosts[dst])
+						if conn == nil {
+							b.Fatal("No connection")
+						}
+						// Send the payload.
+						t := time.Now()
+						resp, err := conn.Request(ctx, handlerTest, payload)
+						latency += time.Since(t).Nanoseconds()
+						if err != nil {
+							if debugReqs {
+								fmt.Println(err.Error())
+							}
+							b.Fatal(err.Error())
+						}
+						PutByteBuffer(resp)
+						n++
 					}
-					local := managers[src]
-					conn := local.Connection(hosts[dst])
-					if conn == nil {
-						b.Fatal("No connection")
+					atomic.AddInt64(&ops, int64(n))
+					atomic.AddInt64(&lat, latency)
+				})
+				spent := time.Since(t)
+				if spent > 0 && n > 0 {
+					// Since we are benchmarking n parallel servers we need to multiply by n.
+					// This will give an estimate of the total ops/s.
+					latency := float64(atomic.LoadInt64(&lat)) / float64(time.Millisecond)
+					b.ReportMetric(float64(n)*float64(ops)/spent.Seconds(), "vops/s")
+					b.ReportMetric(latency/float64(ops), "ms/op")
+				}
+			})
+		}
+	})
+	b.Run("rpc", func(b *testing.B) {
+		for par := 1; par <= 32; par *= 2 {
+			b.Run("par="+strconv.Itoa(par*runtime.GOMAXPROCS(0)), func(b *testing.B) {
+				defer timeout(60 * time.Second)()
+				ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+				defer cancel()
+				b.ReportAllocs()
+				b.ResetTimer()
+				t := time.Now()
+				var ops int64
+				var lat int64
+				b.SetParallelism(par)
+				b.RunParallel(func(pb *testing.PB) {
+					rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+					n := 0
+					var latency int64
+					managers := grid.Managers
+					hosts := grid.Hosts
+					req := testRequest{
+						Num:    rng.Int(),
+						String: "hello",
 					}
-					ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-					// Send the payload.
-					t := time.Now()
-					resp, err := conn.Request(ctx, handlerTest, payload)
-					latency += time.Since(t).Nanoseconds()
-					cancel()
-					if err != nil {
-						if debugReqs {
-							fmt.Println(err.Error())
+					for pb.Next() {
+						// Pick a random manager.
+						src, dst := rng.Intn(len(managers)), rng.Intn(len(managers))
+						if src == dst {
+							dst = (dst + 1) % len(managers)
 						}
-						b.Fatal(err.Error())
+						local := managers[src]
+						conn := local.Connection(hosts[dst])
+						if conn == nil {
+							b.Fatal("No connection")
+						}
+						// Send the payload.
+						t := time.Now()
+						resp, err := rpc.Call(ctx, conn, &req)
+						latency += time.Since(t).Nanoseconds()
+						if err != nil {
+							if debugReqs {
+								fmt.Println(err.Error())
+							}
+							b.Fatal(err.Error())
+						}
+						rpc.PutResponse(resp)
+						n++
 					}
-					PutByteBuffer(resp)
-					n++
+					atomic.AddInt64(&ops, int64(n))
+					atomic.AddInt64(&lat, latency)
+				})
+				spent := time.Since(t)
+				if spent > 0 && n > 0 {
+					// Since we are benchmarking n parallel servers we need to multiply by n.
+					// This will give an estimate of the total ops/s.
+					latency := float64(atomic.LoadInt64(&lat)) / float64(time.Millisecond)
+					b.ReportMetric(float64(n)*float64(ops)/spent.Seconds(), "vops/s")
+					b.ReportMetric(latency/float64(ops), "ms/op")
+				}
+			})
+		}
+	})
 }
 
 func BenchmarkStream(b *testing.B) {
diff --git a/internal/grid/connection.go b/internal/grid/connection.go
index 78991bd339034d..395a13fc93c22e 100644
--- a/internal/grid/connection.go
+++ b/internal/grid/connection.go
@@ -1187,6 +1187,10 @@ func (c *Connection) handleConnectMux(ctx context.Context, m message, subID *sub
 	// Stream:
 	var handler *StreamHandler
 	if subID == nil {
+		if !m.Handler.valid() {
+			logger.LogIf(ctx, c.queueMsg(m, muxConnectError{Error: "Invalid Handler"}))
+			return
+		}
 		handler = c.handlers.streams[m.Handler]
 	} else {
 		handler = c.handlers.subStreams[*subID]
@@ -1237,6 +1241,9 @@ func (c *Connection) handleRequest(ctx context.Context, m message, subID *subHan
 		logger.LogIf(ctx, c.queueMsg(m, muxConnectError{Error: "Invalid Handler for type"}))
 		return
 	}
+
+	// TODO: This causes allocations, but escape analysis doesn't really show the cause.
+	// If another faithful engineer wants to take a stab, feel free.
 	go func(m message) {
 		var start time.Time
 		if m.DeadlineMS > 0 {
@@ -1258,7 +1265,6 @@ func (c *Connection) handleRequest(ctx context.Context, m message, subID *subHan
 		}
 	}()
 
-	// TODO: Maybe recycle m.Payload - should be free here.
 	if m.DeadlineMS > 0 && time.Since(start).Milliseconds()+c.addDeadline.Milliseconds() > int64(m.DeadlineMS) {
 		if debugReqs {
 			fmt.Println(m.MuxID, c.StringReverse(), "DEADLINE EXCEEDED")
diff --git a/internal/grid/grid_test.go b/internal/grid/grid_test.go
index cfc2905333f02e..b8262fecb2daae 100644
--- a/internal/grid/grid_test.go
+++ b/internal/grid/grid_test.go
@@ -160,11 +160,7 @@ func TestSingleRoundtripGenerics(t *testing.T) {
 		return resp, nil
 	}
 	// Return error
-	h2 := NewSingleHandler[*testRequest, *testResponse](handlerTest2, func() *testRequest {
-		return &testRequest{}
-	}, func() *testResponse {
-		return &testResponse{}
-	})
+	h2 := NewSingleHandler[*testRequest, *testResponse](handlerTest2, newTestRequest, newTestResponse)
 	handler2 := func(req *testRequest) (resp *testResponse, err *RemoteErr) {
 		r := RemoteErr(req.String)
 		return nil, &r
@@ -682,13 +678,7 @@ func testGenericsStreamRoundtrip(t *testing.T, local, remote *Manager) {
 
 	// We fake a local and remote server.
 	remoteHost := remote.HostName()
-	handler := NewStream[*testRequest, *testRequest, *testResponse](handlerTest, func() *testRequest {
-		return &testRequest{}
-	}, func() *testRequest {
-		return &testRequest{}
-	}, func() *testResponse {
-		return &testResponse{}
-	})
+	handler := NewStream[*testRequest, *testRequest, *testResponse](handlerTest, newTestRequest, newTestRequest, newTestResponse)
 	handler.InCapacity = 1
 	handler.OutCapacity = 1
 	const payloads = 10
@@ -759,13 +749,7 @@ func testGenericsStreamRoundtripSubroute(t *testing.T, local, remote *Manager) {
 
 	// We fake a local and remote server.
 	remoteHost := remote.HostName()
-	handler := NewStream[*testRequest, *testRequest, *testResponse](handlerTest, func() *testRequest {
-		return &testRequest{}
-	}, func() *testRequest {
-		return &testRequest{}
-	}, func() *testResponse {
-		return &testResponse{}
-	})
+	handler := NewStream[*testRequest, *testRequest, *testResponse](handlerTest, newTestRequest, newTestRequest, newTestResponse)
 	handler.InCapacity = 1
 	handler.OutCapacity = 1
 	const payloads = 10
diff --git a/internal/grid/grid_types_test.go b/internal/grid/grid_types_test.go
index 99683c4b8b9cd3..1c3b3bf74e12ce 100644
--- a/internal/grid/grid_types_test.go
+++ b/internal/grid/grid_types_test.go
@@ -29,3 +29,11 @@ type testResponse struct {
 	OrgString string
 	Embedded  testRequest
 }
+
+func newTestRequest() *testRequest {
+	return &testRequest{}
+}
+
+func newTestResponse() *testResponse {
+	return &testResponse{}
+}
diff --git a/internal/grid/handlers.go b/internal/grid/handlers.go
index 182681a3bb59b5..da494138a4741d 100644
--- a/internal/grid/handlers.go
+++ b/internal/grid/handlers.go
@@ -147,6 +147,7 @@ type (
 	// A non-nil error value will be returned as RemoteErr(msg) to client.
 	// No client information or cancellation (deadline) is available.
 	// Include this in payload if needed.
+	// Payload should be recycled with PutByteBuffer if not needed after the call.
 	SingleHandlerFn func(payload []byte) ([]byte, *RemoteErr)
 
 	// StatelessHandlerFn must handle incoming stateless request.
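Note on the handlers.go change above: the new doc line makes the ownership of payload explicit. A minimal sketch of a conforming handler, mirroring the echo handler registered in benchmark_test.go; the echoHandler variable itself is illustrative and not part of this patch:

// echoHandler follows the pooling contract documented above: the handler
// owns payload and recycles it via PutByteBuffer, and the response is built
// in a fresh pooled buffer rather than aliasing the request buffer.
var echoHandler SingleHandlerFn = func(payload []byte) ([]byte, *RemoteErr) {
	defer PutByteBuffer(payload) // recycle the request buffer when done
	return append(GetByteBuffer()[:0], payload...), nil
}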
diff --git a/internal/grid/muxclient.go b/internal/grid/muxclient.go
index 2389cf0d304fb2..fb08cf15215c7c 100644
--- a/internal/grid/muxclient.go
+++ b/internal/grid/muxclient.go
@@ -35,7 +35,6 @@ type muxClient struct {
 	MuxID            uint64
 	SendSeq, RecvSeq uint32
 	LastPong         int64
-	Resp             chan []byte
 	BaseFlags        Flags
 	ctx              context.Context
 	cancelFn         context.CancelCauseFunc
@@ -62,7 +61,6 @@ func newMuxClient(ctx context.Context, muxID uint64, parent *Connection) *muxCli
 	ctx, cancelFn := context.WithCancelCause(ctx)
 	return &muxClient{
 		MuxID:    muxID,
-		Resp:     make(chan []byte, 1),
 		ctx:      ctx,
 		cancelFn: cancelFn,
 		parent:   parent,
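Note for reviewers: the typed roundtrip exercised by the new "rpc" benchmark composes as below. The types, constructors, and methods (testRequest, testResponse, newTestRequest, newTestResponse, NewSingleHandler, Register, Call, PutResponse) are the ones used in this patch; the wrapper function, its name, and the pre-established remote/conn values are assumptions for illustration only:

// typedRoundtrip is an illustrative sketch, not part of this patch.
// It assumes an established *Manager (server side) and *Connection (client side).
func typedRoundtrip(ctx context.Context, remote *Manager, conn *Connection) error {
	// One reusable typed handler; the constructors let the generic layer pool values.
	rpc := NewSingleHandler[*testRequest, *testResponse](handlerTest2, newTestRequest, newTestResponse)

	// Server side: register the typed handler on the remote manager.
	if err := rpc.Register(remote, func(req *testRequest) (*testResponse, *RemoteErr) {
		return &testResponse{OrgNum: req.Num, OrgString: req.String, Embedded: *req}, nil
	}); err != nil {
		return err
	}

	// Client side: issue the call and return the pooled response when done,
	// matching the rpc.Call/rpc.PutResponse pairing in the benchmark loop.
	resp, err := rpc.Call(ctx, conn, &testRequest{Num: 42, String: "hello"})
	if err != nil {
		return err
	}
	rpc.PutResponse(resp)
	return nil
}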