Skip to content

Commit

Permalink
consensus/istanbul: add block lock
Browse files Browse the repository at this point in the history
  • Loading branch information
markya0616 authored and yutelin committed Sep 2, 2017
1 parent 9ebf73c commit 78a616e
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 43 deletions.
6 changes: 3 additions & 3 deletions consensus/istanbul/core/backlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestCheckMessage(t *testing.T) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4), nil),
}, newTestValidatorSet(4), common.Hash{}, nil, nil),
}

// invalid view format
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestProcessFutureBacklog(t *testing.T) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4), nil),
}, newTestValidatorSet(4), common.Hash{}, nil, nil),
state: StateAcceptRequest,
}
c.subscribeEvents()
Expand Down Expand Up @@ -297,7 +297,7 @@ func testProcessBacklog(t *testing.T, msg *message) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4), nil),
}, newTestValidatorSet(4), common.Hash{}, nil, nil),
}
c.subscribeEvents()
defer c.unsubscribeEvents()
Expand Down
21 changes: 18 additions & 3 deletions consensus/istanbul/core/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,26 @@ package core
import (
"reflect"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)

func (c *core) sendCommit() {
sub := c.current.Subject()
c.broadcastCommit(sub)
}

func (c *core) sendCommitForOldBlock(view *istanbul.View, digest common.Hash) {
sub := &istanbul.Subject{
View: view,
Digest: digest,
}
c.broadcastCommit(sub)
}

func (c *core) broadcastCommit(sub *istanbul.Subject) {
logger := c.logger.New("state", c.state)

sub := c.current.Subject()
encodedSubject, err := Encode(sub)
if err != nil {
logger.Error("Failed to encode", "subject", sub)
Expand Down Expand Up @@ -55,11 +68,13 @@ func (c *core) handleCommit(msg *message, src istanbul.Validator) error {

c.acceptCommit(msg, src)

// Commit the proposal once we have enough commit messages and we are not in StateCommitted.
// Commit the proposal once we have enough COMMIT messages and we are not in the Committed state.
//
// If we already have a proposal, we may have chance to speed up the consensus process
// by committing the proposal without prepare messages.
// by committing the proposal without PREPARE messages.
if c.current.Commits.Size() > 2*c.valSet.F() && c.state.Cmp(StateCommitted) < 0 {
// Still need to call LockHash here since state can skip Prepared state and jump directly to the Committed state.
c.current.LockHash()
c.commit()
}

Expand Down
10 changes: 9 additions & 1 deletion consensus/istanbul/core/commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ OUTER:
if err != test.expectedErr {
t.Errorf("error mismatch: have %v, want %v", err, test.expectedErr)
}
if r0.current.IsHashLocked() {
t.Errorf("block should not be locked")
}
continue OUTER
}
}
Expand All @@ -191,7 +194,9 @@ OUTER:
if r0.current.Commits.Size() > 2*r0.valSet.F() {
t.Errorf("the size of commit messages should be less than %v", 2*r0.valSet.F()+1)
}

if r0.current.IsHashLocked() {
t.Errorf("block should not be locked")
}
continue
}

Expand All @@ -214,6 +219,9 @@ OUTER:
if signedCount <= 2*r0.valSet.F() {
t.Errorf("the expected signed count should be larger than %v, but got %v", 2*r0.valSet.F(), signedCount)
}
if !r0.current.IsHashLocked() {
t.Errorf("block should be locked")
}
}
}

Expand Down
36 changes: 28 additions & 8 deletions consensus/istanbul/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
metrics "github.com/ethereum/go-ethereum/metrics"
Expand Down Expand Up @@ -171,6 +172,7 @@ func (c *core) commit() {
}

if err := c.backend.Commit(proposal, committedSeals); err != nil {
c.current.UnlockHash() //Unlock block when insertion fails
c.sendNextRoundChange()
return
}
Expand Down Expand Up @@ -203,18 +205,20 @@ func (c *core) startNewRound(newView *istanbul.View, lastProposal istanbul.Propo
// Clear invalid ROUND CHANGE messages
c.roundChangeSet = newRoundChangeSet(c.valSet)
// New snapshot for new round
if c.current != nil {
c.current = newRoundState(newView, c.valSet, c.current.pendingRequest)
} else {
c.current = newRoundState(newView, c.valSet, nil)
}

c.updateRoundState(newView, c.valSet, roundChange)
// Calculate new proposer
c.valSet.CalcProposer(c.lastProposer, newView.Round.Uint64())
c.waitingForRoundChange = false
c.setState(StateAcceptRequest)
if roundChange && c.isProposer() && c.current != nil {
if c.current.pendingRequest != nil {
// If it is locked, propose the old proposal
// If we have pending request, propose pending request
if c.current.IsHashLocked() {
r := &istanbul.Request{
Proposal: c.current.Proposal(), //c.current.Proposal would be the locked proposal by previous proposer, see updateRoundState
}
c.sendPreprepare(r)
} else if c.current.pendingRequest != nil {
c.sendPreprepare(c.current.pendingRequest)
}
}
Expand All @@ -230,13 +234,29 @@ func (c *core) catchUpRound(view *istanbul.View) {
c.roundMeter.Mark(new(big.Int).Sub(view.Round, c.current.Round()).Int64())
}
c.waitingForRoundChange = true
c.current = newRoundState(view, c.valSet)

// Need to keep block locked for round catching up
c.updateRoundState(view, c.valSet, true)
c.roundChangeSet.Clear(view.Round)
c.newRoundChangeTimer()

logger.Trace("Catch up round", "new_round", view.Round, "new_seq", view.Sequence, "new_proposer", c.valSet)
}

// updateRoundState updates round state by checking if locking block is necessary
func (c *core) updateRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, roundChange bool) {
// Lock only if both roundChange is true and it is locked
if roundChange && c.current != nil {
if c.current.IsHashLocked() {
c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare, c.current.pendingRequest)
} else {
c.current = newRoundState(view, validatorSet, common.Hash{}, nil, c.current.pendingRequest)
}
} else {
c.current = newRoundState(view, validatorSet, common.Hash{}, nil, nil)
}
}

func (c *core) setState(state State) {
if c.state != state {
c.state = state
Expand Down
9 changes: 6 additions & 3 deletions consensus/istanbul/core/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,18 @@ func (c *core) handlePrepare(msg *message, src istanbul.Validator) error {
return err
}

// If it is locked, it can only process on the locked block.
// Passing verifyPrepare and checkMessage implies it is processing on the locked block since it was verified in the Preprepared state.
if err := c.verifyPrepare(prepare, src); err != nil {
return err
}

c.acceptPrepare(msg, src)

// Change to Prepared state if we've received enough PREPARE messages
// and we are in earlier state before Prepared state
if c.current.Prepares.Size() > 2*c.valSet.F() && c.state.Cmp(StatePrepared) < 0 {
// Change to Prepared state if we've received enough PREPARE messages or it is locked
// and we are in earlier state before Prepared state.
if (c.current.IsHashLocked() || c.current.Prepares.Size() > 2*c.valSet.F()) && c.state.Cmp(StatePrepared) < 0 {
c.current.LockHash()
c.setState(StatePrepared)
c.sendCommit()
}
Expand Down
9 changes: 9 additions & 0 deletions consensus/istanbul/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ OUTER:
if err != test.expectedErr {
t.Errorf("error mismatch: have %v, want %v", err, test.expectedErr)
}
if r0.current.IsHashLocked() {
t.Errorf("block should not be locked")
}
continue OUTER
}
}
Expand All @@ -214,6 +217,9 @@ OUTER:
if r0.current.Prepares.Size() > 2*r0.valSet.F() {
t.Errorf("the size of PREPARE messages should be less than %v", 2*r0.valSet.F()+1)
}
if r0.current.IsHashLocked() {
t.Errorf("block should not be locked")
}

continue
}
Expand Down Expand Up @@ -246,6 +252,9 @@ OUTER:
if !reflect.DeepEqual(m, expectedSubject) {
t.Errorf("subject mismatch: have %v, want %v", m, expectedSubject)
}
if !r0.current.IsHashLocked() {
t.Errorf("block should be locked")
}
}
}

Expand Down
14 changes: 11 additions & 3 deletions consensus/istanbul/core/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,17 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {

// Here is about to accept the PRE-PREPARE
if c.state == StateAcceptRequest {
c.acceptPreprepare(preprepare)
c.setState(StatePreprepared)
c.sendPrepare()
// Send ROUND CHANGE if the locked proposal and the received proposal are different
if c.current.IsHashLocked() && preprepare.Proposal.Hash() != c.current.GetLockedHash() {
c.sendNextRoundChange()
} else {
// Either
// 1. the locked proposal and the received proposal match
// 2. we have no locked proposal
c.acceptPreprepare(preprepare)
c.setState(StatePreprepared)
c.sendPrepare()
}
}

return nil
Expand Down
Loading

0 comments on commit 78a616e

Please sign in to comment.