Skip to content

Commit

Permalink
Merge pull request #98 from osmosis-labs/mergify/bp/osmo-v25/v0.37.4/…
Browse files Browse the repository at this point in the history
…pr-97

perf(p2p/connection): Lower wasted re-allocations in sendRoutine (bac… (backport #97)
  • Loading branch information
ValarDragon authored Jun 4, 2024
2 parents e6ab6d6 + bd944c1 commit ba57909
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[p2p/conn]` Speedup connection.WritePacketMsgTo, by reusing internal buffers rather than re-allocating.
([\#2986](https://github.com/cometbft/cometbft/pull/2986))
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# CHANGELOG

## Unreleased
## v0.37.4-v25-osmo-6

* [#83](https://github.com/osmosis-labs/cometbft/pull/83) perf(types): 3x speedup MakePartSet (#3117)
* [#85](https://github.com/osmosis-labs/cometbft/pull/85) perf(flowrate): Speedup flowrate.Clock (#3016)
* [#86](https://github.com/osmosis-labs/cometbft/pull/86) Comment out expensive debug logs
* [#91](https://github.com/osmosis-labs/cometbft/pull/91) perf(consensus): Minor improvement by making add vote only do one peer set mutex call, not 3 (#3156)
* [#93](https://github.com/osmosis-labs/cometbft/pull/93) perf(consensus): Make some consensus reactor messages take RLock's not WLock's (#3159)
* [#95](https://github.com/osmosis-labs/cometbft/pull/95) perf(types) Make a new method `GetByAddressMut` for `ValSet`, which does not copy the returned validator. (#3129)
* [#97](https://github.com/osmosis-labs/cometbft/pull/97) perf(p2p/connection): Lower wasted re-allocations in sendRoutine (#2986)


## v0.37.4-v25-osmo-5
Expand Down
32 changes: 17 additions & 15 deletions p2p/conn/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,10 @@ func (c *MConnection) FlushStop() {
// Send and flush all pending msgs.
// Since sendRoutine has exited, we can call this
// safely
eof := c.sendSomePacketMsgs()
w := protoio.NewDelimitedWriter(c.bufConnWriter)
eof := c.sendSomePacketMsgs(w)
for !eof {
eof = c.sendSomePacketMsgs()
eof = c.sendSomePacketMsgs(w)
}
c.flush()

Expand Down Expand Up @@ -473,7 +474,7 @@ FOR_LOOP:
break FOR_LOOP
case <-c.send:
// Send some PacketMsgs
eof := c.sendSomePacketMsgs()
eof := c.sendSomePacketMsgs(protoWriter)
if !eof {
// Keep sendRoutine awake.
select {
Expand All @@ -500,26 +501,26 @@ FOR_LOOP:

// Returns true if messages from channels were exhausted.
// Blocks in accordance to .sendMonitor throttling.
func (c *MConnection) sendSomePacketMsgs() bool {
func (c *MConnection) sendSomePacketMsgs(w protoio.Writer) bool {
// Block until .sendMonitor says we can write.
// Once we're ready we send more than we asked for,
// but amortized it should even out.
c.sendMonitor.Limit(c._maxPacketMsgSize, c.config.SendRate, true)

// Now send some PacketMsgs.
return c.sendBatchPacketMsgs(numBatchPacketMsgs)
return c.sendBatchPacketMsgs(w, numBatchPacketMsgs)
}

// Returns true if messages from channels were exhausted.
func (c *MConnection) sendBatchPacketMsgs(batchSize int) bool {
func (c *MConnection) sendBatchPacketMsgs(w protoio.Writer, batchSize int) bool {
// Send a batch of PacketMsgs.
for i := 0; i < batchSize; i++ {
channel := selectChannelToGossipOn(c.channels)
// nothing to send across any channel.
if channel == nil {
return true
}
err := c.sendPacketMsgOnChannel(channel)
err := c.sendPacketMsgOnChannel(w, channel)
if err {
return true
}
Expand Down Expand Up @@ -554,11 +555,9 @@ func selectChannelToGossipOn(channels []*Channel) *Channel {
return leastChannel
}

func (c *MConnection) sendPacketMsgOnChannel(sendChannel *Channel) bool {
// c.Logger.Info("Found a msgPacket to send")

func (c *MConnection) sendPacketMsgOnChannel(w protoio.Writer, sendChannel *Channel) bool {
// Make & send a PacketMsg from this channel
_n, err := sendChannel.writePacketMsgTo(c.bufConnWriter)
_n, err := sendChannel.writePacketMsgTo(w)
if err != nil {
c.Logger.Error("Failed to write PacketMsg", "err", err)
c.stopForError(err)
Expand Down Expand Up @@ -861,12 +860,15 @@ func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg {
}

// Writes next PacketMsg to w and updates c.recentlySent.
// Not goroutine-safe
func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) {
// Not goroutine-safe.
func (ch *Channel) writePacketMsgTo(w protoio.Writer) (n int, err error) {
packet := ch.nextPacketMsg()
n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet))
n, err = w.WriteMsg(mustWrapPacket(&packet))
if err != nil {
return 0, err
}
atomic.AddInt64(&ch.recentlySent, int64(n))
return
return n, nil
}

// Handles incoming PacketMsgs. It returns a message bytes if message is
Expand Down

0 comments on commit ba57909

Please sign in to comment.