diff --git a/.changelog/unreleased/improvements/3162-reuse-internal-buffer-for-block-building.md b/.changelog/unreleased/improvements/3162-reuse-internal-buffer-for-block-building.md new file mode 100644 index 0000000000..b85019f0a8 --- /dev/null +++ b/.changelog/unreleased/improvements/3162-reuse-internal-buffer-for-block-building.md @@ -0,0 +1,2 @@ +- [`consensus`] Reuse an internal buffer for block building to reduce memory allocation overhead. + ([\#3162](https://github.com/cometbft/cometbft/issues/3162)) diff --git a/consensus/state.go b/consensus/state.go index d712c91ad6..676d1d481d 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -140,6 +140,13 @@ type State struct { // for reporting metrics metrics *Metrics + + // offline state sync height indicating to which height the node synced offline + offlineStateSyncHeight int64 + + // a buffer to store the concatenated proposal block parts (serialization format) + // should only be accessed under the cs.mtx lock + serializedBlockBuffer []byte } // StateOption sets an optional parameter on the State. @@ -1900,6 +1907,27 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error { return nil } +func (cs *State) readSerializedBlockFromBlockParts() ([]byte, error) { + // reuse a serialized block buffer from cs + var serializedBlockBuffer []byte + if len(cs.serializedBlockBuffer) < int(cs.ProposalBlockParts.ByteSize()) { + serializedBlockBuffer = make([]byte, cs.ProposalBlockParts.ByteSize()) + cs.serializedBlockBuffer = serializedBlockBuffer + } else { + serializedBlockBuffer = cs.serializedBlockBuffer[:cs.ProposalBlockParts.ByteSize()] + } + + n, err := io.ReadFull(cs.ProposalBlockParts.GetReader(), serializedBlockBuffer) + if err != nil { + return nil, err + } + // Consistency check, should be impossible to fail. 
+ if n != len(serializedBlockBuffer) { + return nil, fmt.Errorf("unexpected error in reading block parts, expected to read %d bytes, read %d", len(serializedBlockBuffer), n) + } + return serializedBlockBuffer, nil +} + // NOTE: block is not necessarily valid. // Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit, // once we have the full block. @@ -1944,7 +1972,7 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (add ) } if added && cs.ProposalBlockParts.IsComplete() { - bz, err := io.ReadAll(cs.ProposalBlockParts.GetReader()) + bz, err := cs.readSerializedBlockFromBlockParts() if err != nil { return added, err } diff --git a/consensus/state_test.go b/consensus/state_test.go index 298dd90770..83eb92e25f 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -2067,3 +2067,32 @@ func findBlockSizeLimit(t *testing.T, height, maxBytes int64, cs *State, partSiz require.Fail(t, "We shouldn't hit the end of the loop") return nil, nil } + +// TestReadSerializedBlockFromBlockParts tests that the readSerializedBlockFromBlockParts function +// reads the block correctly from the block parts. +func TestReadSerializedBlockFromBlockParts(t *testing.T) { + sizes := []int{0, 5, 64, 70, 128, 200} + + // iterate through many initial buffer sizes and new block sizes. + // (Skip new block size = 0, as that is not valid construction) + // Ensure that we read back the correct block size, and the buffer is resized correctly. 
+ for i := 0; i < len(sizes); i++ { + for j := 1; j < len(sizes); j++ { + initialSize, newBlockSize := sizes[i], sizes[j] + testName := fmt.Sprintf("initialSize=%d,newBlockSize=%d", initialSize, newBlockSize) + t.Run(testName, func(t *testing.T) { + blockData := cmtrand.Bytes(newBlockSize) + ps := types.NewPartSetFromData(blockData, 64) + cs := &State{ + serializedBlockBuffer: make([]byte, initialSize), + } + cs.ProposalBlockParts = ps + + serializedBlock, err := cs.readSerializedBlockFromBlockParts() + require.NoError(t, err) + require.Equal(t, blockData, serializedBlock) + require.Equal(t, len(cs.serializedBlockBuffer), max(initialSize, newBlockSize)) + }) + } + } +}