Skip to content

Commit

Permalink
Merge pull request #6717 from multiversx/equivalent-proofs-stabilizat…
Browse files Browse the repository at this point in the history
…ion-2

Equivalent proofs stabilization 2
  • Loading branch information
ssd04 authored Jan 29, 2025
2 parents dc29ec4 + a0a6bb5 commit 212798e
Show file tree
Hide file tree
Showing 33 changed files with 615 additions and 175 deletions.
3 changes: 2 additions & 1 deletion cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@

# ChainParametersByEpoch defines chain operation configurable values that can be modified based on epochs
ChainParametersByEpoch = [
{ EnableEpoch = 0, RoundDuration = 6000, ShardConsensusGroupSize = 7, ShardMinNumNodes = 10, MetachainConsensusGroupSize = 10, MetachainMinNumNodes = 10, Hysteresis = 0.2, Adaptivity = false }
{ EnableEpoch = 0, RoundDuration = 6000, ShardConsensusGroupSize = 7, ShardMinNumNodes = 10, MetachainConsensusGroupSize = 10, MetachainMinNumNodes = 10, Hysteresis = 0.2, Adaptivity = false },
{ EnableEpoch = 8, RoundDuration = 6000, ShardConsensusGroupSize = 10, ShardMinNumNodes = 10, MetachainConsensusGroupSize = 10, MetachainMinNumNodes = 10, Hysteresis = 0.2, Adaptivity = false }
]

[HardwareRequirements]
Expand Down
9 changes: 6 additions & 3 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ func IsEpochChangeBlockForFlagActivation(header data.HeaderHandler, enableEpochs
return isStartOfEpochBlock && isBlockInActivationEpoch
}

// IsFlagEnabledAfterEpochsStartBlock returns true if the flag is enabled for the header, but it is not the epoch start block
func IsFlagEnabledAfterEpochsStartBlock(header data.HeaderHandler, enableEpochsHandler EnableEpochsHandler, flag core.EnableEpochFlag) bool {
// isFlagEnabledAfterEpochsStartBlock returns true if the flag is enabled for the header, but it is not the epoch start block
func isFlagEnabledAfterEpochsStartBlock(header data.HeaderHandler, enableEpochsHandler EnableEpochsHandler, flag core.EnableEpochFlag) bool {
isFlagEnabled := enableEpochsHandler.IsFlagEnabledInEpoch(flag, header.GetEpoch())
isEpochStartBlock := IsEpochChangeBlockForFlagActivation(header, enableEpochsHandler, flag)
return isFlagEnabled && !isEpochStartBlock
}

// ShouldBlockHavePrevProof returns true if the block should have a proof
func ShouldBlockHavePrevProof(header data.HeaderHandler, enableEpochsHandler EnableEpochsHandler, flag core.EnableEpochFlag) bool {
return IsFlagEnabledAfterEpochsStartBlock(header, enableEpochsHandler, flag) && header.GetNonce() > 1
return isFlagEnabledAfterEpochsStartBlock(header, enableEpochsHandler, flag) && header.GetNonce() > 1
}

// VerifyProofAgainstHeader verifies the fields on the proof match the ones on the header
Expand All @@ -71,6 +71,9 @@ func VerifyProofAgainstHeader(proof data.HeaderProofHandler, header data.HeaderH
if proof.GetHeaderRound() != header.GetRound() {
return fmt.Errorf("%w, round mismatch", ErrInvalidHeaderProof)
}
if proof.GetIsStartOfEpoch() != header.IsStartOfEpochBlock() {
return fmt.Errorf("%w, is start of epoch mismatch", ErrInvalidHeaderProof)
}

return nil
}
Expand Down
1 change: 1 addition & 0 deletions consensus/chronology/chronology.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (chr *chronology) RemoveAllSubrounds() {

chr.subrounds = make(map[int]int)
chr.subroundHandlers = make([]consensus.SubroundHandler, 0)
chr.subroundId = srBeforeStartRound

chr.mutSubrounds.Unlock()
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/spos/bls/v2/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (sr *subroundEndRound) DoEndRoundJobByNode() bool {

// CreateAndBroadcastProof calls the unexported createAndBroadcastHeaderFinalInfo function
func (sr *subroundEndRound) CreateAndBroadcastProof(signature []byte, bitmap []byte) {
_, _ = sr.createAndBroadcastProof(signature, bitmap)
_ = sr.createAndBroadcastProof(signature, bitmap)
}

// ReceivedProof calls the unexported receivedProof function
Expand Down
2 changes: 1 addition & 1 deletion consensus/spos/bls/v2/subroundBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (sr *subroundBlock) saveProofForPreviousHeaderIfNeeded(header data.HeaderHa
proof := header.GetPreviousProof()
err := common.VerifyProofAgainstHeader(proof, prevHeader)
if err != nil {
log.Debug("saveProofForPreviousHeaderIfNeeded: invalid proof, %w", err)
log.Debug("saveProofForPreviousHeaderIfNeeded: invalid proof", "error", err.Error())
return
}

Expand Down
64 changes: 42 additions & 22 deletions consensus/spos/bls/v2/subroundEndRound.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-core-go/display"

Expand Down Expand Up @@ -247,10 +246,36 @@ func (sr *subroundEndRound) doEndRoundJobByNode() bool {
if !sr.waitForSignalSync() {
return false
}
sr.sendProof()
}

proof, ok := sr.sendProof()
if !ok {
return sr.finalizeConfirmedBlock()
}

func (sr *subroundEndRound) waitForProof() bool {
shardID := sr.ShardCoordinator().SelfId()
headerHash := sr.GetData()
if sr.EquivalentProofsPool().HasProof(shardID, headerHash) {
return true
}

ctx, cancel := context.WithTimeout(context.Background(), sr.RoundHandler().TimeDuration())
defer cancel()

for {
select {
case <-time.After(time.Millisecond):
if sr.EquivalentProofsPool().HasProof(shardID, headerHash) {
return true
}
case <-ctx.Done():
return false
}
}
}

func (sr *subroundEndRound) finalizeConfirmedBlock() bool {
if !sr.waitForProof() {
return false
}

Expand All @@ -259,14 +284,6 @@ func (sr *subroundEndRound) doEndRoundJobByNode() bool {
return false
}

// if proof not nil, it was created and broadcasted so it has to be added to the pool
if proof != nil {
ok := sr.EquivalentProofsPool().AddProof(proof)
if !ok {
log.Trace("doEndRoundJobByNode.AddProof", "added", ok)
}
}

sr.SetStatus(sr.Current(), spos.SsFinished)

sr.worker.DisplayStatistics()
Expand All @@ -281,42 +298,44 @@ func (sr *subroundEndRound) doEndRoundJobByNode() bool {
return true
}

func (sr *subroundEndRound) sendProof() (data.HeaderProofHandler, bool) {
func (sr *subroundEndRound) sendProof() {
if !sr.shouldSendProof() {
return nil, true
return
}

bitmap := sr.GenerateBitmap(bls.SrSignature)
err := sr.checkSignaturesValidity(bitmap)
if err != nil {
log.Debug("sendProof.checkSignaturesValidity", "error", err.Error())
return nil, false
return
}

// Aggregate signatures, handle invalid signers and send final info if needed
bitmap, sig, err := sr.aggregateSigsAndHandleInvalidSigners(bitmap)
if err != nil {
log.Debug("sendProof.aggregateSigsAndHandleInvalidSigners", "error", err.Error())
return nil, false
return
}

ok := sr.ScheduledProcessor().IsProcessedOKWithTimeout()
// placeholder for subroundEndRound.doEndRoundJobByLeader script
if !ok {
return nil, false
return
}

roundHandler := sr.RoundHandler()
if roundHandler.RemainingTime(roundHandler.TimeStamp(), roundHandler.TimeDuration()) < 0 {
log.Debug("sendProof: time is out -> cancel broadcasting final info and header",
"round time stamp", roundHandler.TimeStamp(),
"current time", time.Now())
return nil, false
return
}

// broadcast header proof
proof, err := sr.createAndBroadcastProof(sig, bitmap)
return proof, err == nil
err = sr.createAndBroadcastProof(sig, bitmap)
if err != nil {
log.Warn("sendProof.createAndBroadcastProof", "error", err.Error())
}
}

func (sr *subroundEndRound) shouldSendProof() bool {
Expand Down Expand Up @@ -524,7 +543,7 @@ func (sr *subroundEndRound) computeAggSigOnValidNodes() ([]byte, []byte, error)
return bitmap, sig, nil
}

func (sr *subroundEndRound) createAndBroadcastProof(signature []byte, bitmap []byte) (*block.HeaderProof, error) {
func (sr *subroundEndRound) createAndBroadcastProof(signature []byte, bitmap []byte) error {
headerProof := &block.HeaderProof{
PubKeysBitmap: bitmap,
AggregatedSignature: signature,
Expand All @@ -533,18 +552,19 @@ func (sr *subroundEndRound) createAndBroadcastProof(signature []byte, bitmap []b
HeaderNonce: sr.GetHeader().GetNonce(),
HeaderShardId: sr.GetHeader().GetShardID(),
HeaderRound: sr.GetHeader().GetRound(),
IsStartOfEpoch: sr.GetHeader().IsStartOfEpochBlock(),
}

err := sr.BroadcastMessenger().BroadcastEquivalentProof(headerProof, []byte(sr.SelfPubKey()))
if err != nil {
return nil, err
return err
}

log.Debug("step 3: block header proof has been sent",
"PubKeysBitmap", bitmap,
"AggregateSignature", signature)

return headerProof, nil
return nil
}

func (sr *subroundEndRound) createAndBroadcastInvalidSigners(invalidSigners []byte) {
Expand Down
33 changes: 20 additions & 13 deletions consensus/spos/bls/v2/subroundEndRound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,11 @@ func TestSubroundEndRound_DoEndRoundJobAllOK(t *testing.T) {
t.Parallel()

container := consensusMocks.InitConsensusCore()
container.SetEquivalentProofsPool(&dataRetriever.ProofsPoolMock{
HasProofCalled: func(shardID uint32, headerHash []byte) bool {
return true
},
})
sr := initSubroundEndRoundWithContainer(container, &statusHandler.AppStatusHandlerStub{})
sr.SetSelfPubKey("A")

Expand Down Expand Up @@ -1176,6 +1181,16 @@ func TestSubroundEndRound_DoEndRoundJobByNode(t *testing.T) {
t.Parallel()

container := consensusMocks.InitConsensusCore()
numCalls := 0
container.SetEquivalentProofsPool(&dataRetriever.ProofsPoolMock{
HasProofCalled: func(shardID uint32, headerHash []byte) bool {
if numCalls <= 1 {
numCalls++
return false
}
return true
},
})
sr := initSubroundEndRoundWithContainer(container, &statusHandler.AppStatusHandlerStub{})

verifySigShareNumCalls := 0
Expand Down Expand Up @@ -1227,31 +1242,24 @@ func TestSubroundEndRound_DoEndRoundJobByNode(t *testing.T) {
t.Run("should work with equivalent messages flag active", func(t *testing.T) {
t.Parallel()

providedPrevSig := []byte("prev sig")
providedPrevBitmap := []byte{1, 1, 1, 1, 1, 1, 1, 1, 1}
container := consensusMocks.InitConsensusCore()
container.SetBlockchain(&testscommon.ChainHandlerStub{
GetGenesisHeaderCalled: func() data.HeaderHandler {
return &block.HeaderV2{}
},
})
container.SetEquivalentProofsPool(&dataRetriever.ProofsPoolMock{
HasProofCalled: func(shardID uint32, headerHash []byte) bool {
return true
},
})
enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{
IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool {
return flag == common.EquivalentMessagesFlag
},
}
container.SetEnableEpochsHandler(enableEpochsHandler)

wasSetCurrentHeaderProofCalled := false
container.SetEquivalentProofsPool(&dataRetriever.ProofsPoolMock{
AddProofCalled: func(headerProof data.HeaderProofHandler) bool {
wasSetCurrentHeaderProofCalled = true
require.NotEqual(t, providedPrevSig, headerProof.GetAggregatedSignature())
require.NotEqual(t, providedPrevBitmap, headerProof.GetPubKeysBitmap())
return true
},
})

ch := make(chan bool, 1)
consensusState := initializers.InitConsensusState()
sr, _ := spos.NewSubround(
Expand Down Expand Up @@ -1295,7 +1303,6 @@ func TestSubroundEndRound_DoEndRoundJobByNode(t *testing.T) {

r := srEndRound.DoEndRoundJobByNode()
require.True(t, r)
require.True(t, wasSetCurrentHeaderProofCalled)
})
}

Expand Down
17 changes: 8 additions & 9 deletions dataRetriever/dataPool/proofsCache/proofsPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,22 @@ func (pp *proofsPool) AddProof(
}

pp.mutCache.Lock()
defer pp.mutCache.Unlock()

proofsPerShard, ok := pp.cache[shardID]
if !ok {
proofsPerShard = newProofsCache()
pp.cache[shardID] = proofsPerShard
}
pp.mutCache.Unlock()

log.Trace("added proof to pool",
log.Debug("added proof to pool",
"header hash", headerProof.GetHeaderHash(),
"epoch", headerProof.GetHeaderEpoch(),
"nonce", headerProof.GetHeaderNonce(),
"shardID", headerProof.GetHeaderShardId(),
"pubKeys bitmap", headerProof.GetPubKeysBitmap(),
"round", headerProof.GetHeaderRound(),
"nonce", headerProof.GetHeaderNonce(),
"isStartOfEpoch", headerProof.GetIsStartOfEpoch(),
)

proofsPerShard.addProof(headerProof)
Expand Down Expand Up @@ -98,9 +100,8 @@ func (pp *proofsPool) CleanupProofsBehindNonce(shardID uint32, nonce uint64) err
nonce -= pp.cleanupNonceDelta

pp.mutCache.RLock()
defer pp.mutCache.RUnlock()

proofsPerShard, ok := pp.cache[shardID]
pp.mutCache.RUnlock()
if !ok {
return fmt.Errorf("%w: proofs cache per shard not found, shard ID: %d", ErrMissingProof, shardID)
}
Expand All @@ -123,16 +124,14 @@ func (pp *proofsPool) GetProof(
if headerHash == nil {
return nil, fmt.Errorf("nil header hash")
}

pp.mutCache.RLock()
defer pp.mutCache.RUnlock()

log.Trace("trying to get proof",
"headerHash", headerHash,
"shardID", shardID,
)

pp.mutCache.RLock()
proofsPerShard, ok := pp.cache[shardID]
pp.mutCache.RUnlock()
if !ok {
return nil, fmt.Errorf("%w: proofs cache per shard not found, shard ID: %d", ErrMissingProof, shardID)
}
Expand Down
4 changes: 2 additions & 2 deletions epochStart/bootstrap/epochStartMetaBlockProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (e *epochStartMetaBlockProcessor) waitForConfMetaBlock(ctx context.Context,
return epochStart.ErrNilMetaBlock
}

err := e.requestConfirmationMetaBlock(metaBlock.GetNonce())
err := e.requestConfirmationMetaBlock(metaBlock.GetNonce() + 1)
if err != nil {
return err
}
Expand All @@ -278,7 +278,7 @@ func (e *epochStartMetaBlockProcessor) waitForConfMetaBlock(ctx context.Context,
case <-ctx.Done():
return epochStart.ErrTimeoutWaitingForMetaBlock
case <-chanRequests:
err = e.requestConfirmationMetaBlock(metaBlock.GetNonce())
err = e.requestConfirmationMetaBlock(metaBlock.GetNonce() + 1)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions epochStart/shardchain/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ func (t *trigger) receivedProof(headerProof data.HeaderProofHandler) {
if headerProof.GetHeaderShardId() != core.MetachainShardId {
return
}

t.mutTrigger.Lock()
defer t.mutTrigger.Unlock()

Expand Down
Loading

0 comments on commit 212798e

Please sign in to comment.