From 3d900f3aa5a2ae848a3fe50b115f82029dcc5a0b Mon Sep 17 00:00:00 2001 From: frozen <355847+Frozen@users.noreply.github.com> Date: Wed, 1 Jan 2025 18:17:09 -0400 Subject: [PATCH] Moved consensus fields to state struct. --- consensus/checks_test.go | 19 +++++ consensus/consensus.go | 26 ++----- consensus/consensus_service.go | 42 +++++------- consensus/consensus_service_test.go | 2 +- consensus/consensus_test.go | 4 +- consensus/consensus_v2.go | 5 +- consensus/construct.go | 30 ++++---- consensus/construct_test.go | 39 +++++------ consensus/debug.go | 2 +- consensus/leader.go | 8 +-- consensus/reward/schedule.go | 5 +- consensus/reward/schedule_test.go | 20 +++++- consensus/signature/signature_test.go | 27 ++++++++ consensus/state.go | 98 ++++++++++++++++++++++++++ consensus/state_test.go | 18 +++++ consensus/threshold.go | 6 +- consensus/validator.go | 22 +++--- consensus/view_change.go | 99 +++++++++++---------------- consensus/view_change_test.go | 12 ++-- consensus/votepower/roster_test.go | 8 +++ internal/chain/supply.go | 2 +- staking/network/reward.go | 2 +- 22 files changed, 320 insertions(+), 176 deletions(-) create mode 100644 consensus/checks_test.go create mode 100644 consensus/signature/signature_test.go create mode 100644 consensus/state.go create mode 100644 consensus/state_test.go diff --git a/consensus/checks_test.go b/consensus/checks_test.go new file mode 100644 index 0000000000..5114f64531 --- /dev/null +++ b/consensus/checks_test.go @@ -0,0 +1,19 @@ +package consensus + +import ( + "testing" + + msg_pb "github.com/harmony-one/harmony/api/proto/message" + "github.com/stretchr/testify/require" +) + +// verifyMessageSig modifies the message signature when returns error +func TestVerifyMessageSig(t *testing.T) { + message := &msg_pb.Message{ + Signature: []byte("signature"), + } + + err := verifyMessageSig(nil, message) + require.Error(t, err) + require.Empty(t, message.Signature) +} diff --git a/consensus/consensus.go b/consensus/consensus.go index d1291152e2..fd95bfb51d 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -5,7 +5,6 @@ import ( "sync" "sync/atomic" "time" - "unsafe" "github.com/harmony-one/abool" bls_core "github.com/harmony-one/bls/ffi/go/bls" @@ -65,8 +64,6 @@ type Consensus struct { decider quorum.Decider // FBFTLog stores the pbft messages and blocks during FBFT process fBFTLog *FBFTLog - // phase: different phase of FBFT protocol: pre-prepare, prepare, commit, finish etc - phase FBFTPhase // current indicates what state a node is in current State // isBackup declarative the node is in backup mode @@ -89,15 +86,7 @@ type Consensus struct { MinPeers int // private/public keys of current node priKey multibls.PrivateKeys - // the publickey of leader - leaderPubKey unsafe.Pointer //*bls.PublicKeyWrapper - // blockNum: the next blockNumber that FBFT is going to agree on, - // should be equal to the blockNumber of next block - blockNum uint64 - // Blockhash - 32 byte - blockHash [32]byte - // Block to run consensus on - block []byte + // Shard Id which this node belongs to ShardID uint32 // IgnoreViewIDCheck determines whether to ignore viewID check @@ -241,13 +230,11 @@ func (consensus *Consensus) getPublicKeys() multibls.PublicKeys { } func (consensus *Consensus) GetLeaderPubKey() *bls_cosi.PublicKeyWrapper { - consensus.mutex.RLock() - defer consensus.mutex.RUnlock() return consensus.getLeaderPubKey() } func (consensus *Consensus) getLeaderPubKey() *bls_cosi.PublicKeyWrapper { - return (*bls_cosi.PublicKeyWrapper)(atomic.LoadPointer(&consensus.leaderPubKey)) + return consensus.current.getLeaderPubKey() } func (consensus *Consensus) SetLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) { @@ -255,7 +242,7 @@ func (consensus *Consensus) SetLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) { } func (consensus *Consensus) setLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) { - atomic.StorePointer(&consensus.leaderPubKey, unsafe.Pointer(pub)) + consensus.current.setLeaderPubKey(pub) } func (consensus *Consensus) GetPrivateKeys() multibls.PrivateKeys { @@ -284,11 +271,11 @@ func (consensus *Consensus) IsBackup() bool { } func (consensus *Consensus) BlockNum() uint64 { - return atomic.LoadUint64(&consensus.blockNum) + return consensus.getBlockNum() } func (consensus *Consensus) getBlockNum() uint64 { - return atomic.LoadUint64(&consensus.blockNum) + return atomic.LoadUint64(&consensus.current.blockNum) } // New create a new Consensus record @@ -301,8 +288,7 @@ func New( mutex: &sync.RWMutex{}, ShardID: shard, fBFTLog: NewFBFTLog(), - phase: FBFTAnnounce, - current: NewState(Normal), + current: NewState(Normal, shard), decider: Decider, registry: registry, MinPeers: minPeers, diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index 9307aec36f..7007439823 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -2,7 +2,6 @@ package consensus import ( "math/big" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -65,7 +64,7 @@ var ( // Signs the consensus message and returns the marshaled message. func (consensus *Consensus) signAndMarshalConsensusMessage(message *msg_pb.Message, priKey *bls_core.SecretKey) ([]byte, error) { - if err := consensus.signConsensusMessage(message, priKey); err != nil { + if err := signConsensusMessage(message, priKey); err != nil { return empty, err } marshaledMessage, err := protobuf.Marshal(message) @@ -113,14 +112,14 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi } // Sign on the hash of the message -func (consensus *Consensus) signMessage(message []byte, priKey *bls_core.SecretKey) []byte { +func signMessage(message []byte, priKey *bls_core.SecretKey) []byte { hash := hash.Keccak256(message) signature := priKey.SignHash(hash[:]) return signature.Serialize() } // Sign on the consensus message signature field. -func (consensus *Consensus) signConsensusMessage(message *msg_pb.Message, +func signConsensusMessage(message *msg_pb.Message, priKey *bls_core.SecretKey) error { message.Signature = nil marshaledMessage, err := protobuf.Marshal(message) @@ -128,7 +127,7 @@ func (consensus *Consensus) signConsensusMessage(message *msg_pb.Message, return err } // 64 byte of signature on previous data - signature := consensus.signMessage(marshaledMessage, priKey) + signature := signMessage(marshaledMessage, priKey) message.Signature = signature return nil } @@ -136,7 +135,7 @@ func (consensus *Consensus) signConsensusMessage(message *msg_pb.Message, // UpdateBitmaps update the bitmaps for prepare and commit phase func (consensus *Consensus) updateBitmaps() { consensus.getLogger().Debug(). - Str("MessageType", consensus.phase.String()). + Str("MessageType", consensus.current.phase.String()). Msg("[UpdateBitmaps] Updating consensus bitmaps") members := consensus.decider.Participants() prepareBitmap := bls_cosi.NewMask(members) @@ -199,8 +198,8 @@ func (consensus *Consensus) sendLastSignPower() { func (consensus *Consensus) resetState() { consensus.switchPhase("ResetState", FBFTAnnounce) - consensus.blockHash = [32]byte{} - consensus.block = []byte{} + consensus.current.blockHash = [32]byte{} + consensus.current.block = []byte{} consensus.decider.ResetPrepareAndCommitVotes() if consensus.prepareBitmap != nil { consensus.prepareBitmap.Clear() @@ -295,12 +294,12 @@ func (consensus *Consensus) checkViewID(msg *FBFTMessage) error { // SetBlockNum sets the blockNum in consensus object, called at node bootstrap func (consensus *Consensus) SetBlockNum(blockNum uint64) { - atomic.StoreUint64(&consensus.blockNum, blockNum) + consensus.setBlockNum(blockNum) } // SetBlockNum sets the blockNum in consensus object, called at node bootstrap func (consensus *Consensus) setBlockNum(blockNum uint64) { - atomic.StoreUint64(&consensus.blockNum, blockNum) + consensus.current.setBlockNum(blockNum) } // ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading @@ -308,10 +307,10 @@ func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offse consensus.mutex.RLock() members := consensus.decider.Participants() consensus.mutex.RUnlock() - return consensus.readSignatureBitmapPayload(recvPayload, offset, members) + return readSignatureBitmapPayload(recvPayload, offset, members) } -func (consensus *Consensus) readSignatureBitmapPayload(recvPayload []byte, offset int, members multibls.PublicKeys) (*bls_core.Sign, *bls_cosi.Mask, error) { +func readSignatureBitmapPayload(recvPayload []byte, offset int, members multibls.PublicKeys) (*bls_core.Sign, *bls_cosi.Mask, error) { if offset+bls.BLSSignatureSizeInBytes > len(recvPayload) { return nil, nil, errors.New("payload not have enough length") } @@ -596,12 +595,7 @@ func (consensus *Consensus) GetFinality() int64 { // switchPhase will switch FBFTPhase to desired phase. func (consensus *Consensus) switchPhase(subject string, desired FBFTPhase) { - consensus.getLogger().Info(). - Str("from:", consensus.phase.String()). - Str("to:", desired.String()). - Str("switchPhase:", subject) - - consensus.phase = desired + consensus.current.switchPhase(subject, desired) } var ( @@ -623,7 +617,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error { return errGetPreparedBlock } - aggSig, mask, err := consensus.readSignatureBitmapPayload(payload, 32, consensus.decider.Participants()) + aggSig, mask, err := readSignatureBitmapPayload(payload, 32, consensus.decider.Participants()) if err != nil { return errReadBitmapPayload } @@ -631,7 +625,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error { // Have to keep the block hash so the leader can finish the commit phase of prepared block consensus.resetState() - copy(consensus.blockHash[:], blockHash[:]) + copy(consensus.current.blockHash[:], blockHash[:]) consensus.switchPhase("selfCommit", FBFTCommit) consensus.aggregatedPrepareSig = aggSig consensus.prepareBitmap = mask @@ -651,7 +645,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error { quorum.Commit, []*bls_cosi.PublicKeyWrapper{key.Pub}, key.Pri.SignHash(commitPayload), - common.BytesToHash(consensus.blockHash[:]), + common.BytesToHash(consensus.current.blockHash[:]), block.NumberU64(), block.Header().ViewID().Uint64(), ); err != nil { @@ -697,9 +691,9 @@ func (consensus *Consensus) GetLogger() *zerolog.Logger { func (consensus *Consensus) getLogger() *zerolog.Logger { logger := utils.Logger().With(). Uint32("shardID", consensus.ShardID). - Uint64("myBlock", consensus.blockNum). - Uint64("myViewID", consensus.getCurBlockViewID()). - Str("phase", consensus.phase.String()). + Uint64("myBlock", consensus.current.getBlockNum()). + Uint64("myViewID", consensus.current.getCurBlockViewID()). + Str("phase", consensus.current.phase.String()). Str("mode", consensus.current.Mode().String()). Logger() return &logger diff --git a/consensus/consensus_service_test.go b/consensus/consensus_service_test.go index b3e2e73ac9..7379ab105f 100644 --- a/consensus/consensus_service_test.go +++ b/consensus/consensus_service_test.go @@ -34,7 +34,7 @@ func TestSignAndMarshalConsensusMessage(t *testing.T) { t.Fatalf("Cannot craeate consensus: %v", err) } consensus.SetCurBlockViewID(2) - consensus.blockHash = [32]byte{} + consensus.current.blockHash = [32]byte{} msg := &msg_pb.Message{} marshaledMessage, err := consensus.signAndMarshalConsensusMessage(msg, blsPriKey) diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 794f8ecdb0..38e9719c5b 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -23,7 +23,7 @@ func TestConsensusInitialization(t *testing.T) { assert.NoError(t, err) messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec} - state := NewState(Normal) + state := NewState(Normal, consensus.ShardID) timeouts := createTimeout() expectedTimeouts := make(map[TimeoutType]time.Duration) @@ -37,7 +37,7 @@ func TestConsensusInitialization(t *testing.T) { // FBFTLog assert.NotNil(t, consensus.FBFTLog()) - assert.Equal(t, FBFTAnnounce, consensus.phase) + assert.Equal(t, FBFTAnnounce, consensus.current.phase) // State / consensus.current assert.Equal(t, state.mode, consensus.current.mode) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index f59fa5bd1e..947b392370 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -5,7 +5,6 @@ import ( "context" "encoding/hex" "math/big" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -179,7 +178,7 @@ func (consensus *Consensus) _finalCommit(isLeader bool) { ) consensus.fBFTLog.AddVerifiedMessage(FBFTMsg) // find correct block content - curBlockHash := consensus.blockHash + curBlockHash := consensus.current.blockHash block := consensus.fBFTLog.GetBlockByHash(curBlockHash) if block == nil { consensus.getLogger().Warn(). @@ -841,7 +840,7 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int, defaultKey *bls.PublicK // SetupForNewConsensus sets the state for new consensus func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) { - atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1) + consensus.setBlockNum(blk.NumberU64() + 1) consensus.setCurBlockViewID(committedMsg.ViewID + 1) var epoch *big.Int if blk.IsLastBlockInEpoch() { diff --git a/consensus/construct.go b/consensus/construct.go index 01d02a4d41..ebca0a5379 100644 --- a/consensus/construct.go +++ b/consensus/construct.go @@ -24,32 +24,32 @@ type NetworkMessage struct { } // Populates the common basic fields for all consensus message. -func (consensus *Consensus) populateMessageFields( +func (pm *State) populateMessageFields( request *msg_pb.ConsensusRequest, blockHash []byte, ) *msg_pb.ConsensusRequest { - request.ViewId = consensus.getCurBlockViewID() - request.BlockNum = consensus.getBlockNum() - request.ShardId = consensus.ShardID + request.ViewId = pm.getCurBlockViewID() + request.BlockNum = pm.getBlockNum() + request.ShardId = pm.ShardID // 32 byte block hash request.BlockHash = blockHash return request } // Populates the common basic fields for the consensus message and senders bitmap. -func (consensus *Consensus) populateMessageFieldsAndSendersBitmap( +func (pm *State) populateMessageFieldsAndSendersBitmap( request *msg_pb.ConsensusRequest, blockHash []byte, bitmap []byte, ) *msg_pb.ConsensusRequest { - consensus.populateMessageFields(request, blockHash) + pm.populateMessageFields(request, blockHash) // sender address request.SenderPubkeyBitmap = bitmap return request } // Populates the common basic fields for the consensus message and single sender. -func (consensus *Consensus) populateMessageFieldsAndSender( +func (pm *State) populateMessageFieldsAndSender( request *msg_pb.ConsensusRequest, blockHash []byte, pubKey bls.SerializedPublicKey, ) *msg_pb.ConsensusRequest { - consensus.populateMessageFields(request, blockHash) + pm.populateMessageFields(request, blockHash) // sender address request.SenderPubkey = pubKey[:] return request @@ -75,8 +75,8 @@ func (consensus *Consensus) construct( ) if len(priKeys) == 1 { - consensusMsg = consensus.populateMessageFieldsAndSender( - message.GetConsensus(), consensus.blockHash[:], priKeys[0].Pub.Bytes, + consensusMsg = consensus.current.populateMessageFieldsAndSender( + message.GetConsensus(), consensus.current.blockHash[:], priKeys[0].Pub.Bytes, ) } else { // TODO: use a persistent bitmap to report bitmap @@ -84,8 +84,8 @@ func (consensus *Consensus) construct( for _, key := range priKeys { mask.SetKey(key.Pub.Bytes, true) } - consensusMsg = consensus.populateMessageFieldsAndSendersBitmap( - message.GetConsensus(), consensus.blockHash[:], mask.Bitmap, + consensusMsg = consensus.current.populateMessageFieldsAndSendersBitmap( + message.GetConsensus(), consensus.current.blockHash[:], mask.Bitmap, ) } @@ -93,8 +93,8 @@ func (consensus *Consensus) construct( needMsgSig := true switch p { case msg_pb.MessageType_ANNOUNCE: - consensusMsg.Block = consensus.block - consensusMsg.Payload = consensus.blockHash[:] + consensusMsg.Block = consensus.current.block + consensusMsg.Payload = consensus.current.blockHash[:] case msg_pb.MessageType_PREPARE: needMsgSig = false sig := bls_core.Sign{} @@ -114,7 +114,7 @@ func (consensus *Consensus) construct( } consensusMsg.Payload = sig.Serialize() case msg_pb.MessageType_PREPARED: - consensusMsg.Block = consensus.block + consensusMsg.Block = consensus.current.block consensusMsg.Payload = consensus.constructQuorumSigAndBitmap(quorum.Prepare) case msg_pb.MessageType_COMMITTED: consensusMsg.Payload = consensus.constructQuorumSigAndBitmap(quorum.Commit) diff --git a/consensus/construct_test.go b/consensus/construct_test.go index 24cd9fc3bc..96bb21390c 100644 --- a/consensus/construct_test.go +++ b/consensus/construct_test.go @@ -2,7 +2,6 @@ package consensus import ( "bytes" - "sync/atomic" "testing" "github.com/ethereum/go-ethereum/common" @@ -45,7 +44,7 @@ func TestConstructAnnounceMessage(test *testing.T) { if err != nil { test.Fatalf("Cannot create consensus: %v", err) } - consensus.blockHash = [32]byte{} + consensus.current.blockHash = [32]byte{} pubKeyWrapper := bls.PublicKeyWrapper{Object: blsPriKey.GetPublicKey()} pubKeyWrapper.Bytes.FromLibBLSPublicKey(pubKeyWrapper.Object) priKeyWrapper := bls.PrivateKeyWrapper{Pri: blsPriKey, Pub: &pubKeyWrapper} @@ -80,7 +79,7 @@ func TestConstructPreparedMessage(test *testing.T) { } consensus.resetState() consensus.updateBitmaps() - consensus.blockHash = [32]byte{} + consensus.current.blockHash = [32]byte{} message := "test string" leaderKey := bls.SerializedPublicKey{} @@ -93,7 +92,7 @@ func TestConstructPreparedMessage(test *testing.T) { quorum.Prepare, []*bls.PublicKeyWrapper{&leaderKeyWrapper}, leaderPriKey.Sign(message), - common.BytesToHash(consensus.blockHash[:]), + common.BytesToHash(consensus.current.blockHash[:]), consensus.BlockNum(), consensus.GetCurBlockViewID(), ) @@ -101,7 +100,7 @@ func TestConstructPreparedMessage(test *testing.T) { quorum.Prepare, []*bls.PublicKeyWrapper{&validatorKeyWrapper}, validatorPriKey.Sign(message), - common.BytesToHash(consensus.blockHash[:]), + common.BytesToHash(consensus.current.blockHash[:]), consensus.BlockNum(), consensus.GetCurBlockViewID(), ); err != nil { @@ -162,11 +161,11 @@ func TestConstructPrepareMessage(test *testing.T) { consensus.UpdatePublicKeys([]bls.PublicKeyWrapper{pubKeyWrapper1, pubKeyWrapper2}, []bls.PublicKeyWrapper{}) consensus.SetCurBlockViewID(2) - consensus.blockHash = [32]byte{} - copy(consensus.blockHash[:], []byte("random")) - atomic.StoreUint64(&consensus.blockNum, 1000) + consensus.current.blockHash = [32]byte{} + copy(consensus.current.blockHash[:], []byte("random")) + consensus.setBlockNum(1000) - sig := priKeyWrapper1.Pri.SignHash(consensus.blockHash[:]) + sig := priKeyWrapper1.Pri.SignHash(consensus.current.blockHash[:]) network, err := consensus.construct(msg_pb.MessageType_PREPARE, nil, []*bls.PrivateKeyWrapper{&priKeyWrapper1}) if err != nil { @@ -175,7 +174,7 @@ func TestConstructPrepareMessage(test *testing.T) { if network.MessageType != msg_pb.MessageType_PREPARE { test.Errorf("MessageType is not populated correctly") } - if network.FBFTMsg.BlockHash != consensus.blockHash { + if network.FBFTMsg.BlockHash != consensus.current.blockHash { test.Errorf("BlockHash is not populated correctly") } if network.FBFTMsg.ViewID != consensus.GetCurBlockViewID() { @@ -191,7 +190,7 @@ func TestConstructPrepareMessage(test *testing.T) { keys := []*bls.PrivateKeyWrapper{&priKeyWrapper1, &priKeyWrapper2} aggSig := bls_core.Sign{} for _, priKey := range keys { - if s := priKey.Pri.SignHash(consensus.blockHash[:]); s != nil { + if s := priKey.Pri.SignHash(consensus.current.blockHash[:]); s != nil { aggSig.Add(s) } } @@ -203,7 +202,7 @@ func TestConstructPrepareMessage(test *testing.T) { if network.MessageType != msg_pb.MessageType_PREPARE { test.Errorf("MessageType is not populated correctly") } - if network.FBFTMsg.BlockHash != consensus.blockHash { + if network.FBFTMsg.BlockHash != consensus.current.blockHash { test.Errorf("BlockHash is not populated correctly") } if network.FBFTMsg.ViewID != consensus.GetCurBlockViewID() { @@ -252,9 +251,9 @@ func TestConstructCommitMessage(test *testing.T) { consensus.UpdatePublicKeys([]bls.PublicKeyWrapper{pubKeyWrapper1, pubKeyWrapper2}, []bls.PublicKeyWrapper{}) consensus.SetCurBlockViewID(2) - consensus.blockHash = [32]byte{} - copy(consensus.blockHash[:], []byte("random")) - atomic.StoreUint64(&consensus.blockNum, 1000) + consensus.current.blockHash = [32]byte{} + copy(consensus.current.blockHash[:], []byte("random")) + consensus.setBlockNum(1000) sigPayload := []byte("payload") @@ -267,7 +266,7 @@ func TestConstructCommitMessage(test *testing.T) { if network.MessageType != msg_pb.MessageType_COMMIT { test.Errorf("MessageType is not populated correctly") } - if network.FBFTMsg.BlockHash != consensus.blockHash { + if network.FBFTMsg.BlockHash != consensus.current.blockHash { test.Errorf("BlockHash is not populated correctly") } if network.FBFTMsg.ViewID != consensus.GetCurBlockViewID() { @@ -295,7 +294,7 @@ func TestConstructCommitMessage(test *testing.T) { if network.MessageType != msg_pb.MessageType_COMMIT { test.Errorf("MessageType is not populated correctly") } - if network.FBFTMsg.BlockHash != consensus.blockHash { + if network.FBFTMsg.BlockHash != consensus.current.blockHash { test.Errorf("BlockHash is not populated correctly") } if network.FBFTMsg.ViewID != consensus.GetCurBlockViewID() { @@ -334,7 +333,7 @@ func TestPopulateMessageFields(t *testing.T) { } consensus.SetCurBlockViewID(2) blockHash := [32]byte{} - consensus.blockHash = blockHash + consensus.current.blockHash = blockHash msg := &msg_pb.Message{ Request: &msg_pb.Message_Consensus{ @@ -344,7 +343,7 @@ func TestPopulateMessageFields(t *testing.T) { keyBytes := bls.SerializedPublicKey{} keyBytes.FromLibBLSPublicKey(blsPriKey.GetPublicKey()) - consensusMsg := consensus.populateMessageFieldsAndSender(msg.GetConsensus(), consensus.blockHash[:], + consensusMsg := consensus.current.populateMessageFieldsAndSender(msg.GetConsensus(), consensus.current.blockHash[:], keyBytes) if consensusMsg.ViewId != 2 { @@ -366,7 +365,7 @@ func TestPopulateMessageFields(t *testing.T) { }, } bitmap := []byte("random bitmap") - consensusMsg = consensus.populateMessageFieldsAndSendersBitmap(msg.GetConsensus(), consensus.blockHash[:], + consensusMsg = consensus.current.populateMessageFieldsAndSendersBitmap(msg.GetConsensus(), consensus.current.blockHash[:], bitmap) if consensusMsg.ViewId != 2 { diff --git a/consensus/debug.go b/consensus/debug.go index c195305809..2766ede208 100644 --- a/consensus/debug.go +++ b/consensus/debug.go @@ -9,7 +9,7 @@ func (consensus *Consensus) GetConsensusPhase() string { // GetConsensusPhase returns the current phase of the consensus. func (consensus *Consensus) getConsensusPhase() string { - return consensus.phase.String() + return consensus.current.phase.String() } // GetConsensusMode returns the current mode of the consensus diff --git a/consensus/leader.go b/consensus/leader.go index e536ca0369..09fe576bc5 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -27,8 +27,8 @@ func (consensus *Consensus) announce(block *types.Block) { return } - copy(consensus.blockHash[:], blockHash[:]) - consensus.block = encodedBlock // Must set block bytes before consensus.construct() + copy(consensus.current.blockHash[:], blockHash[:]) + consensus.current.block = encodedBlock // Must set block bytes before consensus.construct() key, err := consensus.getConsensusLeaderPrivateKey() if err != nil { @@ -65,7 +65,7 @@ func (consensus *Consensus) announce(block *types.Block) { if _, err := consensus.decider.AddNewVote( quorum.Prepare, []*bls.PublicKeyWrapper{key.Pub}, - key.Pri.SignHash(consensus.blockHash[:]), + key.Pri.SignHash(consensus.current.blockHash[:]), block.Hash(), block.NumberU64(), block.Header().ViewID().Uint64(), @@ -121,7 +121,7 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { return } - blockHash := consensus.blockHash[:] + blockHash := consensus.current.blockHash[:] prepareBitmap := consensus.prepareBitmap // proceed only when the message is not received before for _, signer := range recvMsg.SenderPubkeys { diff --git a/consensus/reward/schedule.go b/consensus/reward/schedule.go index d573ced74d..f9d4241679 100644 --- a/consensus/reward/schedule.go +++ b/consensus/reward/schedule.go @@ -7,7 +7,6 @@ import ( shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/numeric" - "github.com/harmony-one/harmony/shard" ) type pair struct { @@ -124,8 +123,8 @@ func mustParse(ts string) int64 { } // PercentageForTimeStamp .. -func PercentageForTimeStamp(ts int64) numeric.Dec { - if shard.Schedule.GetNetworkID() != shardingconfig.MainNet { +func PercentageForTimeStamp(schedule shardingconfig.Schedule, ts int64) numeric.Dec { + if schedule.GetNetworkID() != shardingconfig.MainNet { return numeric.MustNewDecFromStr("1") } diff --git a/consensus/reward/schedule_test.go b/consensus/reward/schedule_test.go index 5a4dcab4d6..8abf1712e0 100644 --- a/consensus/reward/schedule_test.go +++ b/consensus/reward/schedule_test.go @@ -3,7 +3,9 @@ package reward import ( "testing" + shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding" "github.com/harmony-one/harmony/numeric" + "github.com/harmony-one/harmony/shard" ) func TestPercentageForTimeStamp(t *testing.T) { @@ -21,7 +23,7 @@ func TestPercentageForTimeStamp(t *testing.T) { } for _, tc := range testCases { - result := PercentageForTimeStamp(mustParse(tc.time)) + result := PercentageForTimeStamp(shard.Schedule, mustParse(tc.time)) expect := numeric.MustNewDecFromStr(tc.expected) if !result.Equal(expect) { t.Errorf("Time: %s, Chosen bucket percent: %s, Expected: %s", @@ -29,3 +31,19 @@ func TestPercentageForTimeStamp(t *testing.T) { } } } + +// tests panic for invalid timestamp +func TestMustParse(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("mustParse did not panic") + } + }() + mustParse("invalid-timestamp") +} + +func TestPercentageForTimeStampForNonMainnet(t *testing.T) { + if p := PercentageForTimeStamp(shardingconfig.LocalnetSchedule, 0); !p.Equal(numeric.NewDec(1)) { + t.Errorf("Expected 1, got %s", p) + } +} diff --git a/consensus/signature/signature_test.go b/consensus/signature/signature_test.go new file mode 100644 index 0000000000..0db78e2699 --- /dev/null +++ b/consensus/signature/signature_test.go @@ -0,0 +1,27 @@ +package signature + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/harmony-one/harmony/internal/params" +) + +func TestConstructCommitPayload(t *testing.T) { + epoch := big.NewInt(0) + blockHash := common.Hash{} + blockNum := uint64(0) + viewID := uint64(0) + config := ¶ms.ChainConfig{ + StakingEpoch: new(big.Int).Set(epoch), + } + if rs := ConstructCommitPayload(config, epoch, blockHash, blockNum, viewID); len(rs) == 0 { + t.Error("ConstructCommitPayload failed") + } + + config.StakingEpoch = new(big.Int).Add(epoch, big.NewInt(1)) + if rs := ConstructCommitPayload(config, epoch, blockHash, blockNum, viewID); len(rs) == 0 { + t.Error("ConstructCommitPayload failed") + } +} diff --git a/consensus/state.go b/consensus/state.go new file mode 100644 index 0000000000..1175329c87 --- /dev/null +++ b/consensus/state.go @@ -0,0 +1,98 @@ +package consensus + +import ( + "sync/atomic" + "unsafe" + + bls_cosi "github.com/harmony-one/harmony/crypto/bls" + "github.com/harmony-one/harmony/internal/utils" + "github.com/rs/zerolog" +) + +// State contains current mode and current viewID +type State struct { + mode uint32 + + // blockNum: the next blockNumber that FBFT is going to agree on, + // should be equal to the blockNumber of next block + blockNum uint64 + + // current view id in normal mode + // it changes per successful consensus + blockViewID uint64 + + // view changing id is used during view change mode + // it is the next view id + viewChangingID uint64 + + // the publickey of leader + leaderPubKey unsafe.Pointer //*bls.PublicKeyWrapper + + // Blockhash - 32 byte + blockHash [32]byte + // Block to run consensus on + block []byte + + phase FBFTPhase + + ShardID uint32 +} + +func NewState(mode Mode, shardID uint32) State { + return State{ + mode: uint32(mode), + ShardID: shardID, + } +} + +func (pm *State) getBlockNum() uint64 { + return atomic.LoadUint64(&pm.blockNum) +} + +// SetBlockNum sets the blockNum in consensus object, called at node bootstrap +func (pm *State) setBlockNum(blockNum uint64) { + atomic.StoreUint64(&pm.blockNum, blockNum) +} + +// SetBlockNum sets the blockNum in consensus object, called at node bootstrap +func (pm *State) SetBlockNum(blockNum uint64) { + pm.setBlockNum(blockNum) +} + +func (pm *State) GetBlockNum() uint64 { + return pm.getBlockNum() +} + +func (pm *State) getLeaderPubKey() *bls_cosi.PublicKeyWrapper { + return (*bls_cosi.PublicKeyWrapper)(atomic.LoadPointer(&pm.leaderPubKey)) +} + +func (pm *State) setLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) { + atomic.StorePointer(&pm.leaderPubKey, unsafe.Pointer(pub)) +} + +func (pm *State) getLogger() *zerolog.Logger { + logger := utils.Logger().With(). + Uint32("shardID", pm.ShardID). + Uint64("myBlock", pm.getBlockNum()). + Uint64("myViewID", pm.GetCurBlockViewID()). + Str("phase", pm.phase.String()). + Str("mode", pm.Mode().String()). + Logger() + return &logger +} + +// switchPhase will switch FBFTPhase to desired phase. +func (pm *State) switchPhase(subject string, desired FBFTPhase) { + pm.getLogger().Info(). + Str("from:", pm.phase.String()). + Str("to:", desired.String()). + Str("switchPhase:", subject) + + pm.phase = desired +} + +// GetCurBlockViewID returns the current view ID of the consensus +func (pm *State) getCurBlockViewID() uint64 { + return atomic.LoadUint64(&pm.blockViewID) +} diff --git a/consensus/state_test.go b/consensus/state_test.go new file mode 100644 index 0000000000..5263705184 --- /dev/null +++ b/consensus/state_test.go @@ -0,0 +1,18 @@ +package consensus_test + +import ( + "testing" + + "github.com/harmony-one/harmony/consensus" +) + +func TestState_SetBlockNum(t *testing.T) { + state := consensus.NewState(consensus.Normal, 0) + if state.GetBlockNum() == 1 { + t.Errorf("GetBlockNum expected not to be 1") + } + state.SetBlockNum(1) + if state.GetBlockNum() != 1 { + t.Errorf("SetBlockNum failed") + } +} diff --git a/consensus/threshold.go b/consensus/threshold.go index 339f6d2a7e..37693b2568 100644 --- a/consensus/threshold.go +++ b/consensus/threshold.go @@ -39,7 +39,7 @@ func (consensus *Consensus) didReachPrepareQuorum() error { consensus.fBFTLog.AddVerifiedMessage(FBFTMsg) // Leader add commit phase signature var blockObj types.Block - if err := rlp.DecodeBytes(consensus.block, &blockObj); err != nil { + if err := rlp.DecodeBytes(consensus.current.block, &blockObj); err != nil { consensus.getLogger().Warn(). Err(err). Uint64("BlockNum", consensus.BlockNum()). @@ -78,7 +78,7 @@ func (consensus *Consensus) didReachPrepareQuorum() error { consensus.getLogger().Warn().Msg("[OnPrepare] Cannot send prepared message") } else { consensus.getLogger().Info(). - Hex("blockHash", consensus.blockHash[:]). + Hex("blockHash", consensus.current.blockHash[:]). Uint64("blockNum", consensus.BlockNum()). Msg("[OnPrepare] Sent Prepared Message!!") } @@ -87,7 +87,7 @@ func (consensus *Consensus) didReachPrepareQuorum() error { consensus.msgSender.StopRetry(msg_pb.MessageType_COMMITTED) consensus.getLogger().Debug(). - Str("From", consensus.phase.String()). + Str("From", consensus.current.phase.String()). Str("To", FBFTCommit.String()). Msg("[OnPrepare] Switching phase") diff --git a/consensus/validator.go b/consensus/validator.go index d76ef34b84..06e8ca2652 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -33,7 +33,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { consensus.getLogger().Info(). Uint64("myBlockNum", consensus.BlockNum()). Uint64("MsgBlockNum", recvMsg.BlockNum). - Hex("myBlockHash", consensus.blockHash[:]). + Hex("myBlockHash", consensus.current.blockHash[:]). Hex("MsgBlockHash", recvMsg.BlockHash[:]). Msg("[OnCommitted] low consensus block number. Spin up state sync") consensus.spinUpStateSync() @@ -47,7 +47,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { Uint64("MsgBlockNum", recvMsg.BlockNum). Msg("[OnAnnounce] Announce message Added") consensus.fBFTLog.AddVerifiedMessage(recvMsg) - consensus.blockHash = recvMsg.BlockHash + consensus.current.blockHash = recvMsg.BlockHash // we have already added message and block, skip check viewID // and send prepare message if is in ViewChanging mode if consensus.isViewChangingMode() { @@ -125,7 +125,7 @@ func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block // add block field blockPayload := make([]byte, len(recvMsg.Block)) copy(blockPayload[:], recvMsg.Block[:]) - consensus.block = blockPayload + consensus.current.block = blockPayload recvMsg.Block = []byte{} // save memory space consensus.fBFTLog.AddVerifiedMessage(recvMsg) consensus.getLogger().Debug(). @@ -158,7 +158,7 @@ func (consensus *Consensus) prepare() { consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message") } else { consensus.getLogger().Info(). - Str("blockHash", hex.EncodeToString(consensus.blockHash[:])). + Str("blockHash", hex.EncodeToString(consensus.current.blockHash[:])). Msg("[OnAnnounce] Sent Prepare Message!!") } } @@ -186,7 +186,7 @@ func (consensus *Consensus) sendCommitMessages(blockObj *types.Block) { } else { consensus.getLogger().Info(). Uint64("blockNum", consensus.BlockNum()). - Hex("blockHash", consensus.blockHash[:]). + Hex("blockHash", consensus.current.blockHash[:]). Msg("[sendCommitMessages] Sent Commit Message!!") } } @@ -208,7 +208,7 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) { consensus.getLogger().Warn(). Uint64("myBlockNum", consensus.BlockNum()). Uint64("MsgBlockNum", recvMsg.BlockNum). - Hex("myBlockHash", consensus.blockHash[:]). + Hex("myBlockHash", consensus.current.blockHash[:]). Hex("MsgBlockHash", recvMsg.BlockHash[:]). Msgf("[OnPrepared] low consensus block number. Spin sync") consensus.spinUpStateSync() @@ -216,7 +216,7 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) { // check validity of prepared signature blockHash := recvMsg.BlockHash - aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 0, consensus.decider.Participants()) + aggSig, mask, err := readSignatureBitmapPayload(recvMsg.Payload, 0, consensus.decider.Participants()) if err != nil { consensus.getLogger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!") return @@ -227,7 +227,7 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) { } if !aggSig.VerifyHash(mask.AggregatePublic, blockHash[:]) { myBlockHash := common.Hash{} - myBlockHash.SetBytes(consensus.blockHash[:]) + myBlockHash.SetBytes(consensus.current.blockHash[:]) consensus.getLogger().Warn(). Uint64("MsgBlockNum", recvMsg.BlockNum). Uint64("MsgViewID", recvMsg.ViewID). @@ -265,7 +265,7 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) { consensus.prepareBitmap = mask // Optimistically add blockhash field of prepare message - copy(consensus.blockHash[:], blockHash[:]) + copy(consensus.current.blockHash[:], blockHash[:]) // tryCatchup is also run in onCommitted(), so need to lock with commitMutex. if consensus.current.Mode() == Normal { @@ -314,7 +314,7 @@ func (consensus *Consensus) onCommitted(recvMsg *FBFTMessage) { consensus.getLogger().Info(). Uint64("myBlockNum", consensus.BlockNum()). Uint64("MsgBlockNum", recvMsg.BlockNum). - Hex("myBlockHash", consensus.blockHash[:]). + Hex("myBlockHash", consensus.current.blockHash[:]). Hex("MsgBlockHash", recvMsg.BlockHash[:]). Msg("[OnCommitted] low consensus block number. Spin up state sync") consensus.spinUpStateSync() @@ -385,7 +385,7 @@ func (consensus *Consensus) onCommitted(recvMsg *FBFTMessage) { consensus.getLogger().Info(). Uint64("myBlockNum", consensus.BlockNum()). Uint64("MsgBlockNum", recvMsg.BlockNum). - Hex("myBlockHash", consensus.blockHash[:]). + Hex("myBlockHash", consensus.current.blockHash[:]). Hex("MsgBlockHash", recvMsg.BlockHash[:]). Msg("[OnCommitted] OUT OF SYNC") return diff --git a/consensus/view_change.go b/consensus/view_change.go index 19a1e39151..a2a3f60cec 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -7,6 +7,8 @@ import ( "github.com/ethereum/go-ethereum/common" msg_pb "github.com/harmony-one/harmony/api/proto/message" + "github.com/harmony-one/harmony/block" + "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/consensus/quorum" "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/internal/chain" @@ -21,25 +23,6 @@ import ( // MaxViewIDDiff limits the received view ID to only 249 further from the current view ID const MaxViewIDDiff = 249 -// State contains current mode and current viewID -type State struct { - mode uint32 - - // current view id in normal mode - // it changes per successful consensus - blockViewID uint64 - - // view changing id is used during view change mode - // it is the next view id - viewChangingID uint64 -} - -func NewState(mode Mode) State { - return State{ - mode: uint32(mode), - } -} - // Mode return the current node mode func (pm *State) Mode() Mode { return Mode(atomic.LoadUint32(&pm.mode)) @@ -82,15 +65,15 @@ func (pm *State) GetViewChangeDuraion() time.Duration { // fallbackNextViewID return the next view ID and duration when there is an exception // to calculate the time-based viewId -func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) { - diff := int64(consensus.getViewChangingID() + 1 - consensus.getCurBlockViewID()) +func (pm *State) fallbackNextViewID() (uint64, time.Duration) { + diff := int64(pm.GetViewChangingID() + 1 - pm.GetCurBlockViewID()) if diff <= 0 { diff = int64(1) } - consensus.getLogger().Error(). + pm.getLogger().Error(). Int64("diff", diff). Msg("[fallbackNextViewID] use legacy viewID algorithm") - return consensus.getViewChangingID() + 1, time.Duration(diff * diff * int64(viewChangeDuration)) + return pm.GetViewChangingID() + 1, time.Duration(diff * diff * int64(viewChangeDuration)) } // getNextViewID return the next view ID based on the timestamp @@ -103,14 +86,9 @@ func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) { // The view change duration is a fixed duration now to avoid stuck into offline nodes during // the view change. // viewID is only used as the fallback mechansim to determine the nextViewID -func (consensus *Consensus) getNextViewID() (uint64, time.Duration) { - // handle corner case at first - if consensus.Blockchain() == nil { - return consensus.fallbackNextViewID() - } - curHeader := consensus.Blockchain().CurrentHeader() +func (pm *State) getNextViewID(curHeader *block.Header) (uint64, time.Duration) { if curHeader == nil { - return consensus.fallbackNextViewID() + return pm.fallbackNextViewID() } blockTimestamp := curHeader.Time().Int64() stuckBlockViewID := curHeader.ViewID().Uint64() + 1 @@ -118,18 +96,18 @@ func (consensus *Consensus) getNextViewID() (uint64, time.Duration) { // timestamp messed up in current validator node if curTimestamp <= blockTimestamp { - consensus.getLogger().Error(). + pm.getLogger().Error(). Int64("curTimestamp", curTimestamp). Int64("blockTimestamp", blockTimestamp). Msg("[getNextViewID] timestamp of block too high") - return consensus.fallbackNextViewID() + return pm.fallbackNextViewID() } // diff only increases, since view change timeout is shorter than // view change slot now, we want to make sure diff is always greater than 0 diff := uint64((curTimestamp-blockTimestamp)/viewChangeSlot + 1) nextViewID := diff + stuckBlockViewID - consensus.getLogger().Info(). + pm.getLogger().Info(). Int64("curTimestamp", curTimestamp). Int64("blockTimestamp", blockTimestamp). Uint64("nextViewID", nextViewID). @@ -144,34 +122,33 @@ func (consensus *Consensus) getNextViewID() (uint64, time.Duration) { // It reads the current leader's pubkey based on the blockchain data and returns // the next leader based on the gap of the viewID of the view change and the last // know view id of the block. -func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Committee) *bls.PublicKeyWrapper { +func (pm *State) getNextLeaderKey(blockchain engine.ChainReader, decider quorum.Decider, viewID uint64, committee *shard.Committee) *bls.PublicKeyWrapper { gap := 1 - cur := consensus.getCurBlockViewID() + cur := pm.GetCurBlockViewID() if viewID > cur { gap = int(viewID - cur) } var lastLeaderPubKey *bls.PublicKeyWrapper var err error - blockchain := consensus.Blockchain() epoch := big.NewInt(0) if blockchain == nil { - consensus.getLogger().Error().Msg("[getNextLeaderKey] Blockchain is nil. Use consensus.LeaderPubKey") - lastLeaderPubKey = consensus.getLeaderPubKey() + pm.getLogger().Error().Msg("[getNextLeaderKey] Blockchain is nil. Use consensus.LeaderPubKey") + lastLeaderPubKey = pm.getLeaderPubKey() } else { curHeader := blockchain.CurrentHeader() if curHeader == nil { - consensus.getLogger().Error().Msg("[getNextLeaderKey] Failed to get current header from blockchain") - lastLeaderPubKey = consensus.getLeaderPubKey() + pm.getLogger().Error().Msg("[getNextLeaderKey] Failed to get current header from blockchain") + lastLeaderPubKey = pm.getLeaderPubKey() } else { stuckBlockViewID := curHeader.ViewID().Uint64() + 1 gap = int(viewID - stuckBlockViewID) // this is the truth of the leader based on blockchain blocks lastLeaderPubKey, err = chain.GetLeaderPubKeyFromCoinbase(blockchain, curHeader) if err != nil || lastLeaderPubKey == nil { - consensus.getLogger().Error().Err(err). + pm.getLogger().Error().Err(err). Msg("[getNextLeaderKey] Unable to get leaderPubKey from coinbase. Set it to consensus.LeaderPubKey") - lastLeaderPubKey = consensus.getLeaderPubKey() + lastLeaderPubKey = pm.getLeaderPubKey() } epoch = curHeader.Epoch() // viewchange happened at the first block of new epoch @@ -181,17 +158,17 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com // so, when validator joined the view change process later in the epoch block // it can still sync with other validators. if curHeader.IsLastBlockInEpoch() { - consensus.getLogger().Info().Msg("[getNextLeaderKey] view change in the first block of new epoch") - lastLeaderPubKey = consensus.decider.FirstParticipant(shard.Schedule.InstanceForEpoch(epoch)) + pm.getLogger().Info().Msg("[getNextLeaderKey] view change in the first block of new epoch") + lastLeaderPubKey = decider.FirstParticipant(shard.Schedule.InstanceForEpoch(epoch)) } } } - consensus.getLogger().Info(). + pm.getLogger().Info(). Str("lastLeaderPubKey", lastLeaderPubKey.Bytes.Hex()). - Str("leaderPubKey", consensus.getLeaderPubKey().Bytes.Hex()). + Str("leaderPubKey", pm.getLeaderPubKey().Bytes.Hex()). Int("gap", gap). Uint64("newViewID", viewID). - Uint64("myCurBlockViewID", consensus.getCurBlockViewID()). + Uint64("myCurBlockViewID", pm.getCurBlockViewID()). Msg("[getNextLeaderKey] got leaderPubKey from coinbase") // wasFound, next := consensus.Decider.NthNext(lastLeaderPubKey, gap) // FIXME: rotate leader on harmony nodes only before fully externalization @@ -199,33 +176,33 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com var next *bls.PublicKeyWrapper if blockchain != nil && blockchain.Config().IsLeaderRotationInternalValidators(epoch) { if blockchain.Config().IsLeaderRotationV2Epoch(epoch) { - wasFound, next = consensus.decider.NthNextValidatorV2( + wasFound, next = decider.NthNextValidatorV2( committee.Slots, lastLeaderPubKey, gap) } else if blockchain.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) { - wasFound, next = consensus.decider.NthNextValidator( + wasFound, next = decider.NthNextValidator( committee.Slots, lastLeaderPubKey, gap) } else { - wasFound, next = consensus.decider.NthNextHmy( + wasFound, next = decider.NthNextHmy( shard.Schedule.InstanceForEpoch(epoch), lastLeaderPubKey, gap) } } else { - wasFound, next = consensus.decider.NthNextHmy( + wasFound, next = decider.NthNextHmy( shard.Schedule.InstanceForEpoch(epoch), lastLeaderPubKey, gap) } if !wasFound { - consensus.getLogger().Warn(). - Str("key", consensus.getLeaderPubKey().Bytes.Hex()). + pm.getLogger().Warn(). + Str("key", pm.getLeaderPubKey().Bytes.Hex()). Msg("[getNextLeaderKey] currentLeaderKey not found") } - consensus.getLogger().Info(). + pm.getLogger().Info(). Str("nextLeader", next.Bytes.Hex()). Msg("[getNextLeaderKey] next Leader") return next @@ -248,9 +225,10 @@ func (consensus *Consensus) startViewChange() { consensus.consensusTimeout[timeoutConsensus].Stop() consensus.consensusTimeout[timeoutBootstrap].Stop() consensus.current.SetMode(ViewChanging) - nextViewID, duration := consensus.getNextViewID() + curHeader := consensus.Blockchain().CurrentHeader() + nextViewID, duration := consensus.current.getNextViewID(curHeader) consensus.setViewChangingID(nextViewID) - epoch := consensus.Blockchain().CurrentHeader().Epoch() + epoch := curHeader.Epoch() ss, err := consensus.Blockchain().ReadShardState(epoch) if err != nil { utils.Logger().Error().Err(err).Msg("Failed to read shard state") @@ -267,7 +245,8 @@ func (consensus *Consensus) startViewChange() { // aganist the consensus.LeaderPubKey variable. // Ideally, we shall use another variable to keep track of the // leader pubkey in viewchange mode - consensus.setLeaderPubKey(consensus.getNextLeaderKey(nextViewID, committee)) + consensus.setLeaderPubKey( + consensus.current.getNextLeaderKey(consensus.Blockchain(), consensus.decider, nextViewID, committee)) consensus.getLogger().Warn(). Uint64("nextViewID", nextViewID). @@ -508,7 +487,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { utils.CountOneBits(m3Mask.Bitmap) > utils.CountOneBits(m2Mask.Bitmap)) { // m1 is not empty, check it's valid blockHash := recvMsg.Payload[:32] - aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32, consensus.decider.Participants()) + aggSig, mask, err := readSignatureBitmapPayload(recvMsg.Payload, 32, consensus.decider.Participants()) if err != nil { consensus.getLogger().Error().Err(err). Msg("[onNewView] ReadSignatureBitmapPayload Failed") @@ -519,7 +498,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { Msg("[onNewView] Failed to Verify Signature for M1 (prepare) message") return } - copy(consensus.blockHash[:], blockHash) + copy(consensus.current.blockHash[:], blockHash) consensus.aggregatedPrepareSig = aggSig consensus.prepareBitmap = mask @@ -581,7 +560,7 @@ func (consensus *Consensus) ResetViewChangeState() { // ResetViewChangeState resets the view change structure func (consensus *Consensus) resetViewChangeState() { consensus.getLogger().Info(). - Str("Phase", consensus.phase.String()). + Str("Phase", consensus.current.phase.String()). Msg("[ResetViewChangeState] Resetting view change state") consensus.current.SetMode(Normal) consensus.vc.Reset() diff --git a/consensus/view_change_test.go b/consensus/view_change_test.go index 42b44f7c78..3ab78bd2d6 100644 --- a/consensus/view_change_test.go +++ b/consensus/view_change_test.go @@ -14,7 +14,7 @@ func TestBasicViewChanging(t *testing.T) { _, _, consensus, _, err := GenerateConsensusForTesting() assert.NoError(t, err) - state := NewState(Normal) + state := NewState(Normal, consensus.ShardID) // Change Mode assert.Equal(t, state.mode, consensus.current.mode) @@ -42,7 +42,7 @@ func TestPhaseSwitching(t *testing.T) { _, _, consensus, _, err := GenerateConsensusForTesting() assert.NoError(t, err) - assert.Equal(t, FBFTAnnounce, consensus.phase) // It's a new consensus, we should be at the FBFTAnnounce phase. + assert.Equal(t, FBFTAnnounce, consensus.current.phase) // It's a new consensus, we should be at the FBFTAnnounce phase. switches := []phaseSwitch{ {start: FBFTAnnounce, end: FBFTPrepare}, @@ -72,10 +72,10 @@ func TestPhaseSwitching(t *testing.T) { func testPhaseGroupSwitching(t *testing.T, consensus *Consensus, phases []FBFTPhase, startPhase FBFTPhase, desiredPhase FBFTPhase) { for range phases { consensus.switchPhase("test", desiredPhase) - assert.Equal(t, desiredPhase, consensus.phase) + assert.Equal(t, desiredPhase, consensus.current.phase) } - assert.Equal(t, desiredPhase, consensus.phase) + assert.Equal(t, desiredPhase, consensus.current.phase) return } @@ -86,7 +86,7 @@ func TestGetNextLeaderKeyShouldFailForStandardGeneratedConsensus(t *testing.T) { // The below results in: "panic: runtime error: integer divide by zero" // This happens because there's no check for if there are any participants or not in https://github.com/harmony-one/harmony/blob/main/consensus/quorum/quorum.go#L188-L197 - assert.Panics(t, func() { consensus.getNextLeaderKey(uint64(1), nil) }) + assert.Panics(t, func() { consensus.current.getNextLeaderKey(consensus.Blockchain(), consensus.decider, uint64(1), nil) }) } func TestGetNextLeaderKeyShouldSucceed(t *testing.T) { @@ -114,7 +114,7 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) { assert.Equal(t, keyCount, consensus.Decider().ParticipantsCount()) consensus.setLeaderPubKey(&wrappedBLSKeys[0]) - nextKey := consensus.getNextLeaderKey(uint64(1), nil) + nextKey := consensus.current.getNextLeaderKey(consensus.Blockchain(), consensus.decider, uint64(1), nil) assert.Equal(t, nextKey, &wrappedBLSKeys[1]) } diff --git a/consensus/votepower/roster_test.go b/consensus/votepower/roster_test.go index 17d979301b..0e7148fa27 100644 --- a/consensus/votepower/roster_test.go +++ b/consensus/votepower/roster_test.go @@ -149,3 +149,11 @@ func compareStakedVoter(a, b *AccommodateHarmonyVote) bool { a.OverallPercent.Equal(b.OverallPercent) && a.EffectiveStake.Equal(b.EffectiveStake) } + +func TestNewRound(t *testing.T) { + // Test NewRound + round := NewRound() + if round == nil { + t.Error("NewRound failed") + } +} diff --git a/internal/chain/supply.go b/internal/chain/supply.go index 2890d88a37..b130315d1a 100644 --- a/internal/chain/supply.go +++ b/internal/chain/supply.go @@ -83,7 +83,7 @@ func getTotalCirculatingSupply(chain engine.ChainReader) (ret numeric.Dec, err e } releasedInitSupply := stakingReward.TotalInitialTokens.Mul( - reward.PercentageForTimeStamp(timestamp), + reward.PercentageForTimeStamp(shard.Schedule, timestamp), ) preStakingBlockRewards := stakingReward.GetTotalPreStakingTokens().Sub(stakingReward.TotalInitialTokens) return releasedInitSupply.Add(preStakingBlockRewards).Add( diff --git a/staking/network/reward.go b/staking/network/reward.go index c308a57aac..428a5079d6 100644 --- a/staking/network/reward.go +++ b/staking/network/reward.go @@ -126,7 +126,7 @@ func WhatPercentStakedNow( ) } percentage := stakedNow.Quo(stakingReward.TotalInitialTokens.Mul( - reward.PercentageForTimeStamp(timestamp), + reward.PercentageForTimeStamp(shard.Schedule, timestamp), ).Add(dole)) utils.Logger().Info(). Str("so-far-doled-out", dole.String()).