diff --git a/.changelog/unreleased/improvements/2986-lower-memory-allocation-in-packet-writing.md b/.changelog/unreleased/improvements/2986-lower-memory-allocation-in-packet-writing.md new file mode 100644 index 0000000000..9d26279880 --- /dev/null +++ b/.changelog/unreleased/improvements/2986-lower-memory-allocation-in-packet-writing.md @@ -0,0 +1,2 @@ +- `[p2p/conn]` Speedup connection.WritePacketMsgTo, by reusing internal buffers rather than re-allocating. + ([\#2986](https://github.com/cometbft/cometbft/pull/2986)) \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b9147c5cf..a7ad0523e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # CHANGELOG -## Unreleased +## v0.37.4-v25-osmo-6 * [#83](https://github.com/osmosis-labs/cometbft/pull/83) perf(types): 3x speedup MakePartSet (#3117) * [#85](https://github.com/osmosis-labs/cometbft/pull/85) perf(flowrate): Speedup flowrate.Clock (#3016) @@ -8,6 +8,7 @@ * [#91](https://github.com/osmosis-labs/cometbft/pull/91) perf(consensus): Minor improvement by making add vote only do one peer set mutex call, not 3 (#3156) * [#93](https://github.com/osmosis-labs/cometbft/pull/93) perf(consensus): Make some consensus reactor messages take RLock's not WLock's (#3159) * [#95](https://github.com/osmosis-labs/cometbft/pull/95) perf(types) Make a new method `GetByAddressMut` for `ValSet`, which does not copy the returned validator. (#3129) +* [#97](https://github.com/osmosis-labs/cometbft/pull/97) perf(p2p/connection): Lower wasted re-allocations in sendRoutine (#2986) ## v0.37.4-v25-osmo-5 diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 81201ac05e..5f1343d2c7 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -282,9 +282,10 @@ func (c *MConnection) FlushStop() { // Send and flush all pending msgs. // Since sendRoutine has exited, we can call this // safely - eof := c.sendSomePacketMsgs() + w := protoio.NewDelimitedWriter(c.bufConnWriter) + eof := c.sendSomePacketMsgs(w) for !eof { - eof = c.sendSomePacketMsgs() + eof = c.sendSomePacketMsgs(w) } c.flush() @@ -474,7 +475,7 @@ FOR_LOOP: break FOR_LOOP case <-c.send: // Send some PacketMsgs - eof := c.sendSomePacketMsgs() + eof := c.sendSomePacketMsgs(protoWriter) if !eof { // Keep sendRoutine awake. select { @@ -501,18 +502,18 @@ FOR_LOOP: // Returns true if messages from channels were exhausted. // Blocks in accordance to .sendMonitor throttling. -func (c *MConnection) sendSomePacketMsgs() bool { +func (c *MConnection) sendSomePacketMsgs(w protoio.Writer) bool { // Block until .sendMonitor says we can write. // Once we're ready we send more than we asked for, // but amortized it should even out. c.sendMonitor.Limit(c._maxPacketMsgSize, c.config.SendRate, true) // Now send some PacketMsgs. - return c.sendBatchPacketMsgs(numBatchPacketMsgs) + return c.sendBatchPacketMsgs(w, numBatchPacketMsgs) } // Returns true if messages from channels were exhausted. -func (c *MConnection) sendBatchPacketMsgs(batchSize int) bool { +func (c *MConnection) sendBatchPacketMsgs(w protoio.Writer, batchSize int) bool { // Send a batch of PacketMsgs. for i := 0; i < batchSize; i++ { channel := selectChannelToGossipOn(c.channels) @@ -520,7 +521,7 @@ func (c *MConnection) sendBatchPacketMsgs(batchSize int) bool { if channel == nil { return true } - err := c.sendPacketMsgOnChannel(channel) + err := c.sendPacketMsgOnChannel(w, channel) if err { return true } @@ -555,11 +556,9 @@ func selectChannelToGossipOn(channels []*Channel) *Channel { return leastChannel } -func (c *MConnection) sendPacketMsgOnChannel(sendChannel *Channel) bool { - // c.Logger.Info("Found a msgPacket to send") - +func (c *MConnection) sendPacketMsgOnChannel(w protoio.Writer, sendChannel *Channel) bool { // Make & send a PacketMsg from this channel - _n, err := sendChannel.writePacketMsgTo(c.bufConnWriter) + _n, err := sendChannel.writePacketMsgTo(w) if err != nil { c.Logger.Error("Failed to write PacketMsg", "err", err) c.stopForError(err) @@ -862,12 +861,15 @@ func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg { } // Writes next PacketMsg to w and updates c.recentlySent. -// Not goroutine-safe -func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) { +// Not goroutine-safe. +func (ch *Channel) writePacketMsgTo(w protoio.Writer) (n int, err error) { packet := ch.nextPacketMsg() - n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet)) + n, err = w.WriteMsg(mustWrapPacket(&packet)) + if err != nil { + return 0, err + } atomic.AddInt64(&ch.recentlySent, int64(n)) - return + return n, nil } // Handles incoming PacketMsgs. It returns a message bytes if message is