Skip to content

Commit

Permalink
consensus/istanbul: add block lock
Browse files Browse the repository at this point in the history
  • Loading branch information
yutelin committed Jul 18, 2017
1 parent 21d9a78 commit 7daa006
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 22 deletions.
10 changes: 10 additions & 0 deletions consensus/istanbul/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package istanbul

import (
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -54,4 +55,13 @@ type Backend interface {
// CheckSignature verifies the signature by checking if it's signed by
// the given validator
CheckSignature(data []byte, addr common.Address, sig []byte) error

// HasBlock checks if the combination of the given hash and height matches any existing blocks
HasBlock(hash common.Hash, number *big.Int) bool

// GetProposer returns the proposer of the given block height
GetProposer(number uint64) common.Address

// ParentValidators returns the validator set of the given proposal's parent block
ParentValidators(proposal Proposal) ValidatorSet
}
37 changes: 32 additions & 5 deletions consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package backend

import (
"crypto/ecdsa"
"math/big"
"sync"
"time"

Expand Down Expand Up @@ -92,11 +93,7 @@ func (sb *backend) Address() common.Address {

// Validators implements istanbul.Backend.Validators
func (sb *backend) Validators(proposal istanbul.Proposal) istanbul.ValidatorSet {
snap, err := sb.snapshot(sb.chain, proposal.Number().Uint64(), proposal.Hash(), nil)
if err != nil {
return validator.NewSet(nil, sb.config.ProposerPolicy)
}
return snap.ValSet
return sb.getValidators(proposal.Number().Uint64(), proposal.Hash())
}

// Broadcast implements istanbul.Backend.Send
Expand Down Expand Up @@ -220,3 +217,33 @@ func (sb *backend) CheckSignature(data []byte, address common.Address, sig []byt
}
return nil
}

// HasBlock implements istanbul.Backend.HashBlock
func (sb *backend) HasBlock(hash common.Hash, number *big.Int) bool {
return sb.chain.GetHeader(hash, number.Uint64()) != nil
}

// GetProposer implements istanbul.Backend.GetProposer
func (sb *backend) GetProposer(number uint64) common.Address {
if h := sb.chain.GetHeaderByNumber(number); h != nil {
a, _ := sb.Author(h)
return a
}
return common.Address{}
}

// ParentValidators implements istanbul.Backend.GetParentValidators
func (sb *backend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet {
if block, ok := proposal.(*types.Block); ok {
return sb.getValidators(block.Number().Uint64()-1, block.ParentHash())
}
return validator.NewSet(nil, sb.config.ProposerPolicy)
}

func (sb *backend) getValidators(number uint64, hash common.Hash) istanbul.ValidatorSet {
snap, err := sb.snapshot(sb.chain, number, hash, nil)
if err != nil {
return validator.NewSet(nil, sb.config.ProposerPolicy)
}
return snap.ValSet
}
6 changes: 3 additions & 3 deletions consensus/istanbul/core/backlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestCheckMessage(t *testing.T) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4)),
}, newTestValidatorSet(4), common.Hash{}, nil),
}

// invalid view format
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestProcessFutureBacklog(t *testing.T) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4)),
}, newTestValidatorSet(4), common.Hash{}, nil),
state: StateAcceptRequest,
}
c.subscribeEvents()
Expand Down Expand Up @@ -294,7 +294,7 @@ func testProcessBacklog(t *testing.T, msg *message) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4)),
}, newTestValidatorSet(4), common.Hash{}, nil),
}
c.subscribeEvents()
defer c.unsubscribeEvents()
Expand Down
17 changes: 16 additions & 1 deletion consensus/istanbul/core/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,26 @@ package core
import (
"reflect"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)

func (c *core) sendCommit() {
sub := c.current.Subject()
c.broadcastCommit(sub)
}

func (c *core) sendCommitForOldBlock(view *istanbul.View, digest common.Hash) {
sub := &istanbul.Subject{
View: view,
Digest: digest,
}
c.broadcastCommit(sub)
}

func (c *core) broadcastCommit(sub *istanbul.Subject) {
logger := c.logger.New("state", c.state)

sub := c.current.Subject()
encodedSubject, err := Encode(sub)
if err != nil {
logger.Error("Failed to encode", "subject", sub)
Expand Down Expand Up @@ -60,6 +73,8 @@ func (c *core) handleCommit(msg *message, src istanbul.Validator) error {
// If we already have a proposal, we may have chance to speed up the consensus process
// by committing the proposal without prepare messages.
if c.current.Commits.Size() > 2*c.valSet.F() && c.state.Cmp(StateCommitted) < 0 {
// Still need to call LockBlock here since state can skip Prepared state and jump directly to Committed state.
c.current.LockHash()
c.commit()
}

Expand Down
27 changes: 24 additions & 3 deletions consensus/istanbul/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (c *core) commit() {
}

if err := c.backend.Commit(proposal, committedSeals); err != nil {
c.current.UnlockHash() //Unlock block when insertion fails
c.sendNextRoundChange()
return
}
Expand All @@ -187,13 +188,21 @@ func (c *core) startNewRound(newView *istanbul.View, roundChange bool) {
// Clear invalid round change messages
c.roundChangeSet = newRoundChangeSet(c.valSet)
// New snapshot for new round
c.current = newRoundState(newView, c.valSet)
c.updateRoundState(newView, c.valSet)
// Calculate new proposer
c.valSet.CalcProposer(c.lastProposer, newView.Round.Uint64())
c.waitingForRoundChange = false
c.setState(StateAcceptRequest)
if roundChange && c.isProposer() {
c.backend.NextRound()
// If it is locked, propose the old proposal
if c.current.IsHashLocked() {
r := &istanbul.Request{
Proposal: c.current.Proposal(), //c.current.Proposal would be the locked proposal by previous proposer, see updateRoundState
}
c.sendPreprepare(r)
} else {
c.backend.NextRound()
}
}
c.newRoundChangeTimer()

Expand All @@ -207,13 +216,25 @@ func (c *core) catchUpRound(view *istanbul.View) {
c.roundMeter.Mark(new(big.Int).Sub(view.Round, c.current.Round()).Int64())
}
c.waitingForRoundChange = true
c.current = newRoundState(view, c.valSet)

//Needs to keep block lock for round catching up
c.updateRoundState(view, c.valSet)
c.roundChangeSet.Clear(view.Round)
c.newRoundChangeTimer()

logger.Trace("Catch up round", "new_round", view.Round, "new_seq", view.Sequence, "new_proposer", c.valSet)
}

// updateRoundState updates round state by checking if locking block is necessary
func (c *core) updateRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet) {
// Lock only if both keepLock is true and it is locked
if c.current.IsHashLocked() {
c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare)
} else {
c.current = newRoundState(view, validatorSet, common.Hash{}, nil)
}
}

func (c *core) setState(state State) {
if c.state != state {
c.state = state
Expand Down
7 changes: 5 additions & 2 deletions consensus/istanbul/core/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,18 @@ func (c *core) handlePrepare(msg *message, src istanbul.Validator) error {
return err
}

// If it is locked, it can only process on the locked block.
// Passing verifyPrepare and checkMessage implies it is processing on the locked block since it was verified in the Preprepare step.
if err := c.verifyPrepare(prepare, src); err != nil {
return err
}

c.acceptPrepare(msg, src)

// Change to StatePrepared if we've received enough prepare messages
// Change to StatePrepared if we've received enough prepare messages or it is locked
// and we are in earlier state before StatePrepared
if c.current.Prepares.Size() > 2*c.valSet.F() && c.state.Cmp(StatePrepared) < 0 {
if (c.current.IsHashLocked() || c.current.Prepares.Size() > 2*c.valSet.F()) && c.state.Cmp(StatePrepared) < 0 {
c.current.LockHash()
c.setState(StatePrepared)
c.sendCommit()
}
Expand Down
37 changes: 34 additions & 3 deletions consensus/istanbul/core/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,21 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {
}

// Ensure we have the same view with the preprepare message
// If it is old message, see if we need to broadcast COMMIT
if err := c.checkMessage(msgPreprepare, preprepare.View); err != nil {
if err == errOldMessage {
// Get validator set for the given proposal
valSet := c.backend.ParentValidators(preprepare.Proposal).Copy()
previousProposer := c.backend.GetProposer(preprepare.Proposal.Number().Uint64() - 1)
valSet.CalcProposer(previousProposer, preprepare.View.Round.Uint64())
// Broadcast COMMIT if it is an existing block
// 1. The proposer needs to be a proposer matches the given (Sequence + Round)
// 2. The given block must exist
if valSet.IsProposer(src.Address()) && c.backend.HasBlock(preprepare.Proposal.Hash(), preprepare.Proposal.Number()) {
c.sendCommitForOldBlock(preprepare.View, preprepare.Proposal.Hash())
return nil
}
}
return err
}

Expand Down Expand Up @@ -84,10 +98,27 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {
return err
}

// Here is about to accept the preprepare
if c.state == StateAcceptRequest {
c.acceptPreprepare(preprepare)
c.setState(StatePreprepared)
c.sendPrepare()
// If it is locked, it can only process on the locked block
// Otherwise, broadcast PREPARE and enter Prepared state
if c.current.IsHashLocked() {
// Broadcast COMMIT directly if the proposal matches the locked block
// Otherwise, send ROUND CHANGE
if preprepare.Proposal.Hash() == c.current.GetLockedHash() {
// Broadcast COMMIT and enters Prepared state directly
c.acceptPreprepare(preprepare)
c.setState(StatePrepared)
c.sendCommit()
} else {
// Send round change
c.sendNextRoundChange()
}
} else {
c.acceptPreprepare(preprepare)
c.setState(StatePreprepared)
c.sendPrepare()
}
}

return nil
Expand Down
5 changes: 3 additions & 2 deletions consensus/istanbul/core/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -35,7 +36,7 @@ func TestCheckRequestMsg(t *testing.T) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4)),
}, newTestValidatorSet(4), common.Hash{}, nil),
}

// invalid request
Expand Down Expand Up @@ -90,7 +91,7 @@ func TestStoreRequestMsg(t *testing.T) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(0),
Round: big.NewInt(0),
}, newTestValidatorSet(4)),
}, newTestValidatorSet(4), common.Hash{}, nil),
pendingRequests: prque.New(),
pendingRequestsMu: new(sync.Mutex),
}
Expand Down
43 changes: 41 additions & 2 deletions consensus/istanbul/core/roundstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,23 @@ import (
"math/big"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/rlp"
)

func newRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet) *roundState {
// newRoundState creates a new roundState instance with the given view and validatorSet
// lockedHash and preprepare are for round change when lock exists,
// we need to keep a reference of preprepare in order to propose locked proposal when there is a lock and itself is the proposer
func newRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, lockedHash common.Hash, preprepare *istanbul.Preprepare) *roundState {
return &roundState{
round: view.Round,
sequence: view.Sequence,
Preprepare: nil,
Preprepare: preprepare,
Prepares: newMessageSet(validatorSet),
Commits: newMessageSet(validatorSet),
Checkpoints: newMessageSet(validatorSet),
lockedHash: lockedHash,
mu: new(sync.RWMutex),
}
}
Expand All @@ -45,6 +50,7 @@ type roundState struct {
Prepares *messageSet
Commits *messageSet
Checkpoints *messageSet
lockedHash common.Hash

mu *sync.RWMutex
}
Expand Down Expand Up @@ -112,6 +118,36 @@ func (s *roundState) Sequence() *big.Int {
return s.sequence
}

func (s *roundState) LockHash() {
s.mu.Lock()
defer s.mu.Unlock()

if s.Preprepare != nil {
s.lockedHash = s.Preprepare.Proposal.Hash()
}
}

func (s *roundState) UnlockHash() {
s.mu.Lock()
defer s.mu.Unlock()

s.lockedHash = common.Hash{}
}

func (s *roundState) IsHashLocked() bool {
s.mu.RLock()
defer s.mu.RUnlock()

return s.lockedHash != common.Hash{}
}

func (s *roundState) GetLockedHash() common.Hash {
s.mu.RLock()
defer s.mu.RUnlock()

return s.lockedHash
}

// The DecodeRLP method should read one value from the given
// Stream. It is not forbidden to read less or more, but it might
// be confusing.
Expand All @@ -123,6 +159,7 @@ func (s *roundState) DecodeRLP(stream *rlp.Stream) error {
Prepares *messageSet
Commits *messageSet
Checkpoints *messageSet
lockedHash common.Hash
}

if err := stream.Decode(&ss); err != nil {
Expand All @@ -134,6 +171,7 @@ func (s *roundState) DecodeRLP(stream *rlp.Stream) error {
s.Prepares = ss.Prepares
s.Commits = ss.Commits
s.Checkpoints = ss.Checkpoints
s.lockedHash = ss.lockedHash
s.mu = new(sync.RWMutex)

return nil
Expand All @@ -158,5 +196,6 @@ func (s *roundState) EncodeRLP(w io.Writer) error {
s.Prepares,
s.Commits,
s.Checkpoints,
s.lockedHash,
})
}
Loading

0 comments on commit 7daa006

Please sign in to comment.