diff --git a/consensus/istanbul/core/backlog_test.go b/consensus/istanbul/core/backlog_test.go index 21d055cef4..6e2c77f9a4 100644 --- a/consensus/istanbul/core/backlog_test.go +++ b/consensus/istanbul/core/backlog_test.go @@ -37,7 +37,7 @@ func TestCheckMessage(t *testing.T) { current: newRoundState(&istanbul.View{ Sequence: big.NewInt(1), Round: big.NewInt(0), - }, newTestValidatorSet(4), nil), + }, newTestValidatorSet(4), common.Hash{}, nil, nil), } // invalid view format @@ -209,7 +209,7 @@ func TestProcessFutureBacklog(t *testing.T) { current: newRoundState(&istanbul.View{ Sequence: big.NewInt(1), Round: big.NewInt(0), - }, newTestValidatorSet(4), nil), + }, newTestValidatorSet(4), common.Hash{}, nil, nil), state: StateAcceptRequest, } c.subscribeEvents() @@ -297,7 +297,7 @@ func testProcessBacklog(t *testing.T, msg *message) { current: newRoundState(&istanbul.View{ Sequence: big.NewInt(1), Round: big.NewInt(0), - }, newTestValidatorSet(4), nil), + }, newTestValidatorSet(4), common.Hash{}, nil, nil), } c.subscribeEvents() defer c.unsubscribeEvents() diff --git a/consensus/istanbul/core/commit.go b/consensus/istanbul/core/commit.go index fd5223cdbf..fa58102254 100644 --- a/consensus/istanbul/core/commit.go +++ b/consensus/istanbul/core/commit.go @@ -19,13 +19,26 @@ package core import ( "reflect" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" ) func (c *core) sendCommit() { + sub := c.current.Subject() + c.broadcastCommit(sub) +} + +func (c *core) sendCommitForOldBlock(view *istanbul.View, digest common.Hash) { + sub := &istanbul.Subject{ + View: view, + Digest: digest, + } + c.broadcastCommit(sub) +} + +func (c *core) broadcastCommit(sub *istanbul.Subject) { logger := c.logger.New("state", c.state) - sub := c.current.Subject() encodedSubject, err := Encode(sub) if err != nil { logger.Error("Failed to encode", "subject", sub) @@ -55,11 +68,13 @@ func (c *core) handleCommit(msg *message, src istanbul.Validator) error { c.acceptCommit(msg, src) - // Commit the proposal once we have enough commit messages and we are not in StateCommitted. + // Commit the proposal once we have enough COMMIT messages and we are not in the Committed state. // // If we already have a proposal, we may have chance to speed up the consensus process - // by committing the proposal without prepare messages. + // by committing the proposal without PREPARE messages. if c.current.Commits.Size() > 2*c.valSet.F() && c.state.Cmp(StateCommitted) < 0 { + // Still need to call LockHash here since state can skip Prepared state and jump directly to the Committed state. + c.current.LockHash() c.commit() } diff --git a/consensus/istanbul/core/commit_test.go b/consensus/istanbul/core/commit_test.go index 432c6c7be0..cca5cbbaca 100644 --- a/consensus/istanbul/core/commit_test.go +++ b/consensus/istanbul/core/commit_test.go @@ -178,6 +178,9 @@ OUTER: if err != test.expectedErr { t.Errorf("error mismatch: have %v, want %v", err, test.expectedErr) } + if r0.current.IsHashLocked() { + t.Errorf("block should not be locked") + } continue OUTER } } @@ -191,7 +194,9 @@ OUTER: if r0.current.Commits.Size() > 2*r0.valSet.F() { t.Errorf("the size of commit messages should be less than %v", 2*r0.valSet.F()+1) } - + if r0.current.IsHashLocked() { + t.Errorf("block should not be locked") + } continue } @@ -214,6 +219,9 @@ OUTER: if signedCount <= 2*r0.valSet.F() { t.Errorf("the expected signed count should be larger than %v, but got %v", 2*r0.valSet.F(), signedCount) } + if !r0.current.IsHashLocked() { + t.Errorf("block should be locked") + } } } diff --git a/consensus/istanbul/core/core.go b/consensus/istanbul/core/core.go index f85d8b65dc..942de4b9ff 100644 --- a/consensus/istanbul/core/core.go +++ b/consensus/istanbul/core/core.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" metrics "github.com/ethereum/go-ethereum/metrics" @@ -171,6 +172,7 @@ func (c *core) commit() { } if err := c.backend.Commit(proposal, committedSeals); err != nil { + c.current.UnlockHash() //Unlock block when insertion fails c.sendNextRoundChange() return } @@ -203,18 +205,20 @@ func (c *core) startNewRound(newView *istanbul.View, lastProposal istanbul.Propo // Clear invalid ROUND CHANGE messages c.roundChangeSet = newRoundChangeSet(c.valSet) // New snapshot for new round - if c.current != nil { - c.current = newRoundState(newView, c.valSet, c.current.pendingRequest) - } else { - c.current = newRoundState(newView, c.valSet, nil) - } - + c.updateRoundState(newView, c.valSet, roundChange) // Calculate new proposer c.valSet.CalcProposer(c.lastProposer, newView.Round.Uint64()) c.waitingForRoundChange = false c.setState(StateAcceptRequest) if roundChange && c.isProposer() && c.current != nil { - if c.current.pendingRequest != nil { + // If it is locked, propose the old proposal + // If we have pending request, propose pending request + if c.current.IsHashLocked() { + r := &istanbul.Request{ + Proposal: c.current.Proposal(), //c.current.Proposal would be the locked proposal by previous proposer, see updateRoundState + } + c.sendPreprepare(r) + } else if c.current.pendingRequest != nil { c.sendPreprepare(c.current.pendingRequest) } } @@ -230,13 +234,29 @@ func (c *core) catchUpRound(view *istanbul.View) { c.roundMeter.Mark(new(big.Int).Sub(view.Round, c.current.Round()).Int64()) } c.waitingForRoundChange = true - c.current = newRoundState(view, c.valSet) + + // Need to keep block locked for round catching up + c.updateRoundState(view, c.valSet, true) c.roundChangeSet.Clear(view.Round) c.newRoundChangeTimer() logger.Trace("Catch up round", "new_round", view.Round, "new_seq", view.Sequence, "new_proposer", c.valSet) } +// updateRoundState updates round state by checking if locking block is necessary +func (c *core) updateRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, roundChange bool) { + // Lock only if both roundChange is true and it is locked + if roundChange && c.current != nil { + if c.current.IsHashLocked() { + c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare, c.current.pendingRequest) + } else { + c.current = newRoundState(view, validatorSet, common.Hash{}, nil, c.current.pendingRequest) + } + } else { + c.current = newRoundState(view, validatorSet, common.Hash{}, nil, nil) + } +} + func (c *core) setState(state State) { if c.state != state { c.state = state diff --git a/consensus/istanbul/core/prepare.go b/consensus/istanbul/core/prepare.go index 7738ab069b..ed1fc2815f 100644 --- a/consensus/istanbul/core/prepare.go +++ b/consensus/istanbul/core/prepare.go @@ -49,15 +49,18 @@ func (c *core) handlePrepare(msg *message, src istanbul.Validator) error { return err } + // If it is locked, it can only process on the locked block. + // Passing verifyPrepare and checkMessage implies it is processing on the locked block since it was verified in the Preprepared state. if err := c.verifyPrepare(prepare, src); err != nil { return err } c.acceptPrepare(msg, src) - // Change to Prepared state if we've received enough PREPARE messages - // and we are in earlier state before Prepared state - if c.current.Prepares.Size() > 2*c.valSet.F() && c.state.Cmp(StatePrepared) < 0 { + // Change to Prepared state if we've received enough PREPARE messages or it is locked + // and we are in earlier state before Prepared state. + if (c.current.IsHashLocked() || c.current.Prepares.Size() > 2*c.valSet.F()) && c.state.Cmp(StatePrepared) < 0 { + c.current.LockHash() c.setState(StatePrepared) c.sendCommit() } diff --git a/consensus/istanbul/core/prepare_test.go b/consensus/istanbul/core/prepare_test.go index 2770fe4086..a2b495d85f 100644 --- a/consensus/istanbul/core/prepare_test.go +++ b/consensus/istanbul/core/prepare_test.go @@ -201,6 +201,9 @@ OUTER: if err != test.expectedErr { t.Errorf("error mismatch: have %v, want %v", err, test.expectedErr) } + if r0.current.IsHashLocked() { + t.Errorf("block should not be locked") + } continue OUTER } } @@ -214,6 +217,9 @@ OUTER: if r0.current.Prepares.Size() > 2*r0.valSet.F() { t.Errorf("the size of PREPARE messages should be less than %v", 2*r0.valSet.F()+1) } + if r0.current.IsHashLocked() { + t.Errorf("block should not be locked") + } continue } @@ -246,6 +252,9 @@ OUTER: if !reflect.DeepEqual(m, expectedSubject) { t.Errorf("subject mismatch: have %v, want %v", m, expectedSubject) } + if !r0.current.IsHashLocked() { + t.Errorf("block should be locked") + } } } diff --git a/consensus/istanbul/core/preprepare.go b/consensus/istanbul/core/preprepare.go index 2473c7d438..a79bfc21f9 100644 --- a/consensus/istanbul/core/preprepare.go +++ b/consensus/istanbul/core/preprepare.go @@ -86,9 +86,17 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error { // Here is about to accept the PRE-PREPARE if c.state == StateAcceptRequest { - c.acceptPreprepare(preprepare) - c.setState(StatePreprepared) - c.sendPrepare() + // Send ROUND CHANGE if the locked proposal and the received proposal are different + if c.current.IsHashLocked() && preprepare.Proposal.Hash() != c.current.GetLockedHash() { + c.sendNextRoundChange() + } else { + // Either + // 1. the locked proposal and the received proposal match + // 2. we have no locked proposal + c.acceptPreprepare(preprepare) + c.setState(StatePreprepared) + c.sendPrepare() + } } return nil diff --git a/consensus/istanbul/core/preprepare_test.go b/consensus/istanbul/core/preprepare_test.go index 8070211c7d..5ac6039913 100644 --- a/consensus/istanbul/core/preprepare_test.go +++ b/consensus/istanbul/core/preprepare_test.go @@ -39,6 +39,7 @@ func TestHandlePreprepare(t *testing.T) { system *testSystem expectedRequest istanbul.Proposal expectedErr error + existingBlock bool }{ { // normal case @@ -56,6 +57,7 @@ func TestHandlePreprepare(t *testing.T) { }(), newTestProposal(), nil, + false, }, { // future message @@ -84,6 +86,7 @@ func TestHandlePreprepare(t *testing.T) { }(), makeBlock(1), errFutureMessage, + false, }, { // non-proposer @@ -105,9 +108,10 @@ func TestHandlePreprepare(t *testing.T) { }(), makeBlock(1), errNotFromProposer, + false, }, { - // ErrInvalidMessage + // errOldMessage func() *testSystem { sys := NewTestSystemWithBackend(N, F) @@ -124,6 +128,7 @@ func TestHandlePreprepare(t *testing.T) { }(), makeBlock(1), errOldMessage, + false, }, } @@ -167,7 +172,7 @@ OUTER: t.Errorf("state mismatch: have %v, want %v", c.state, StatePreprepared) } - if !reflect.DeepEqual(c.current.Subject().View, curView) { + if !test.existingBlock && !reflect.DeepEqual(c.current.Subject().View, curView) { t.Errorf("view mismatch: have %v, want %v", c.current.Subject().View, curView) } @@ -178,17 +183,116 @@ OUTER: t.Errorf("error mismatch: have %v, want nil", err) } - if decodedMsg.Code != msgPrepare { - t.Errorf("message code mismatch: have %v, want %v", decodedMsg.Code, msgPrepare) + expectedCode := msgPrepare + if test.existingBlock { + expectedCode = msgCommit } + if decodedMsg.Code != expectedCode { + t.Errorf("message code mismatch: have %v, want %v", decodedMsg.Code, expectedCode) + } + var subject *istanbul.Subject err = decodedMsg.Decode(&subject) if err != nil { t.Errorf("error mismatch: have %v, want nil", err) } - if !reflect.DeepEqual(subject, c.current.Subject()) { + if !test.existingBlock && !reflect.DeepEqual(subject, c.current.Subject()) { t.Errorf("subject mismatch: have %v, want %v", subject, c.current.Subject()) } + + } + } +} + +func TestHandlePreprepareWithLock(t *testing.T) { + N := uint64(4) // replica 0 is the proposer, it will send messages to others + F := uint64(1) // F does not affect tests + proposal := newTestProposal() + mismatchProposal := makeBlock(10) + newSystem := func() *testSystem { + sys := NewTestSystemWithBackend(N, F) + + for i, backend := range sys.backends { + c := backend.engine.(*core) + c.valSet = backend.peers + if i != 0 { + c.state = StateAcceptRequest + } + c.roundChangeSet = newRoundChangeSet(c.valSet) + } + return sys + } + + testCases := []struct { + system *testSystem + proposal istanbul.Proposal + lockProposal istanbul.Proposal + }{ + { + newSystem(), + proposal, + proposal, + }, + { + newSystem(), + proposal, + mismatchProposal, + }, + } + + for _, test := range testCases { + test.system.Run(false) + v0 := test.system.backends[0] + r0 := v0.engine.(*core) + curView := r0.currentView() + preprepare := &istanbul.Preprepare{ + View: curView, + Proposal: test.proposal, + } + lockPreprepare := &istanbul.Preprepare{ + View: curView, + Proposal: test.lockProposal, + } + + for i, v := range test.system.backends { + // i == 0 is primary backend, it is responsible for send PRE-PREPARE messages to others. + if i == 0 { + continue + } + + c := v.engine.(*core) + c.current.SetPreprepare(lockPreprepare) + c.current.LockHash() + m, _ := Encode(preprepare) + _, val := r0.valSet.GetByAddress(v0.Address()) + if err := c.handlePreprepare(&message{ + Code: msgPreprepare, + Msg: m, + Address: v0.Address(), + }, val); err != nil { + t.Errorf("error mismatch: have %v, want nil", err) + } + if test.proposal == test.lockProposal { + if c.state != StatePreprepared { + t.Errorf("state mismatch: have %v, want %v", c.state, StatePreprepared) + } + if !reflect.DeepEqual(curView, c.currentView()) { + t.Errorf("view mismatch: have %v, want %v", c.currentView(), curView) + } + } else { + // Should stay at StateAcceptRequest + if c.state != StateAcceptRequest { + t.Errorf("state mismatch: have %v, want %v", c.state, StateAcceptRequest) + } + // Should have triggered a round change + expectedView := &istanbul.View{ + Sequence: curView.Sequence, + Round: big.NewInt(1), + } + if !reflect.DeepEqual(expectedView, c.currentView()) { + t.Errorf("view mismatch: have %v, want %v", c.currentView(), expectedView) + } + } } } } diff --git a/consensus/istanbul/core/request_test.go b/consensus/istanbul/core/request_test.go index 2c0b5ede9d..cbb585b95e 100644 --- a/consensus/istanbul/core/request_test.go +++ b/consensus/istanbul/core/request_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" @@ -35,7 +36,7 @@ func TestCheckRequestMsg(t *testing.T) { current: newRoundState(&istanbul.View{ Sequence: big.NewInt(1), Round: big.NewInt(0), - }, newTestValidatorSet(4), nil), + }, newTestValidatorSet(4), common.Hash{}, nil, nil), } // invalid request @@ -90,7 +91,7 @@ func TestStoreRequestMsg(t *testing.T) { current: newRoundState(&istanbul.View{ Sequence: big.NewInt(0), Round: big.NewInt(0), - }, newTestValidatorSet(4), nil), + }, newTestValidatorSet(4), common.Hash{}, nil, nil), pendingRequests: prque.New(), pendingRequestsMu: new(sync.Mutex), } diff --git a/consensus/istanbul/core/roundstate.go b/consensus/istanbul/core/roundstate.go index 3c091c9820..18f8ace225 100644 --- a/consensus/istanbul/core/roundstate.go +++ b/consensus/istanbul/core/roundstate.go @@ -21,18 +21,22 @@ import ( "math/big" "sync" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" "github.com/ethereum/go-ethereum/rlp" ) -func newRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, pendingRequest *istanbul.Request) *roundState { +// newRoundState creates a new roundState instance with the given view and validatorSet +// lockedHash and preprepare are for round change when lock exists, +// we need to keep a reference of preprepare in order to propose locked proposal when there is a lock and itself is the proposer +func newRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, lockedHash common.Hash, preprepare *istanbul.Preprepare, pendingRequest *istanbul.Request) *roundState { return &roundState{ round: view.Round, sequence: view.Sequence, - Preprepare: nil, + Preprepare: preprepare, Prepares: newMessageSet(validatorSet), Commits: newMessageSet(validatorSet), - Checkpoints: newMessageSet(validatorSet), + lockedHash: lockedHash, mu: new(sync.RWMutex), pendingRequest: pendingRequest, } @@ -45,7 +49,7 @@ type roundState struct { Preprepare *istanbul.Preprepare Prepares *messageSet Commits *messageSet - Checkpoints *messageSet + lockedHash common.Hash pendingRequest *istanbul.Request mu *sync.RWMutex @@ -114,6 +118,36 @@ func (s *roundState) Sequence() *big.Int { return s.sequence } +func (s *roundState) LockHash() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.Preprepare != nil { + s.lockedHash = s.Preprepare.Proposal.Hash() + } +} + +func (s *roundState) UnlockHash() { + s.mu.Lock() + defer s.mu.Unlock() + + s.lockedHash = common.Hash{} +} + +func (s *roundState) IsHashLocked() bool { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.lockedHash != common.Hash{} +} + +func (s *roundState) GetLockedHash() common.Hash { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.lockedHash +} + // The DecodeRLP method should read one value from the given // Stream. It is not forbidden to read less or more, but it might // be confusing. @@ -124,7 +158,7 @@ func (s *roundState) DecodeRLP(stream *rlp.Stream) error { Preprepare *istanbul.Preprepare Prepares *messageSet Commits *messageSet - Checkpoints *messageSet + lockedHash common.Hash pendingRequest *istanbul.Request } @@ -136,7 +170,7 @@ func (s *roundState) DecodeRLP(stream *rlp.Stream) error { s.Preprepare = ss.Preprepare s.Prepares = ss.Prepares s.Commits = ss.Commits - s.Checkpoints = ss.Checkpoints + s.lockedHash = ss.lockedHash s.pendingRequest = ss.pendingRequest s.mu = new(sync.RWMutex) @@ -161,7 +195,7 @@ func (s *roundState) EncodeRLP(w io.Writer) error { s.Preprepare, s.Prepares, s.Commits, - s.Checkpoints, + s.lockedHash, s.pendingRequest, }) } diff --git a/consensus/istanbul/core/roundstate_test.go b/consensus/istanbul/core/roundstate_test.go index 0e3ee0ef0f..cb8239ac28 100644 --- a/consensus/istanbul/core/roundstate_test.go +++ b/consensus/istanbul/core/roundstate_test.go @@ -17,19 +17,57 @@ package core import ( + "math/big" "sync" + "testing" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" ) func newTestRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet) *roundState { return &roundState{ - round: view.Round, - sequence: view.Sequence, - Preprepare: newTestPreprepare(view), - Prepares: newMessageSet(validatorSet), - Commits: newMessageSet(validatorSet), - Checkpoints: newMessageSet(validatorSet), - mu: new(sync.RWMutex), + round: view.Round, + sequence: view.Sequence, + Preprepare: newTestPreprepare(view), + Prepares: newMessageSet(validatorSet), + Commits: newMessageSet(validatorSet), + mu: new(sync.RWMutex), + } +} + +func TestLockHash(t *testing.T) { + sys := NewTestSystemWithBackend(1, 0) + rs := newTestRoundState( + &istanbul.View{ + Round: big.NewInt(0), + Sequence: big.NewInt(0), + }, + sys.backends[0].peers, + ) + if !common.EmptyHash(rs.GetLockedHash()) { + t.Errorf("error mismatch: have %v, want empty", rs.GetLockedHash()) + } + if rs.IsHashLocked() { + t.Error("IsHashLocked should return false") + } + + // Lock + expected := rs.Proposal().Hash() + rs.LockHash() + if expected != rs.GetLockedHash() { + t.Errorf("error mismatch: have %v, want %v", rs.GetLockedHash(), expected) + } + if !rs.IsHashLocked() { + t.Error("IsHashLocked should return true") + } + + // Unlock + rs.UnlockHash() + if !common.EmptyHash(rs.GetLockedHash()) { + t.Errorf("error mismatch: have %v, want empty", rs.GetLockedHash()) + } + if rs.IsHashLocked() { + t.Error("IsHashLocked should return false") } } diff --git a/consensus/istanbul/core/testbackend_test.go b/consensus/istanbul/core/testbackend_test.go index 4091841675..8d9e5f162e 100644 --- a/consensus/istanbul/core/testbackend_test.go +++ b/consensus/istanbul/core/testbackend_test.go @@ -186,7 +186,7 @@ func NewTestSystemWithBackend(n, f uint64) *testSystem { core.current = newRoundState(&istanbul.View{ Round: big.NewInt(0), Sequence: big.NewInt(1), - }, vset, nil) + }, vset, common.Hash{}, nil, nil) core.logger = testLogger core.validateFn = backend.CheckValidatorSignature