Skip to content

Commit

Permalink
Merge pull request #127 from getamis/feature/add-block-lock
Browse files Browse the repository at this point in the history
consensus/istanbul: add block lock
  • Loading branch information
yutelin authored Jul 24, 2017
2 parents 265828f + 09ea540 commit d4ae30d
Show file tree
Hide file tree
Showing 15 changed files with 405 additions and 27 deletions.
10 changes: 10 additions & 0 deletions consensus/istanbul/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package istanbul

import (
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -54,4 +55,13 @@ type Backend interface {
// CheckSignature verifies the signature by checking if it's signed by
// the given validator
CheckSignature(data []byte, addr common.Address, sig []byte) error

// HasBlock checks if the combination of the given hash and height matches any existing blocks
HasBlock(hash common.Hash, number *big.Int) bool

// GetProposer returns the proposer of the given block height
GetProposer(number uint64) common.Address

// ParentValidators returns the validator set of the given proposal's parent block
ParentValidators(proposal Proposal) ValidatorSet
}
37 changes: 32 additions & 5 deletions consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package backend

import (
"crypto/ecdsa"
"math/big"
"sync"
"time"

Expand Down Expand Up @@ -92,11 +93,7 @@ func (sb *backend) Address() common.Address {

// Validators implements istanbul.Backend.Validators
func (sb *backend) Validators(proposal istanbul.Proposal) istanbul.ValidatorSet {
snap, err := sb.snapshot(sb.chain, proposal.Number().Uint64(), proposal.Hash(), nil)
if err != nil {
return validator.NewSet(nil, sb.config.ProposerPolicy)
}
return snap.ValSet
return sb.getValidators(proposal.Number().Uint64(), proposal.Hash())
}

// Broadcast implements istanbul.Backend.Send
Expand Down Expand Up @@ -220,3 +217,33 @@ func (sb *backend) CheckSignature(data []byte, address common.Address, sig []byt
}
return nil
}

// HasBlock implements istanbul.Backend.HashBlock
func (sb *backend) HasBlock(hash common.Hash, number *big.Int) bool {
return sb.chain.GetHeader(hash, number.Uint64()) != nil
}

// GetProposer implements istanbul.Backend.GetProposer
func (sb *backend) GetProposer(number uint64) common.Address {
if h := sb.chain.GetHeaderByNumber(number); h != nil {
a, _ := sb.Author(h)
return a
}
return common.Address{}
}

// ParentValidators implements istanbul.Backend.GetParentValidators
func (sb *backend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet {
if block, ok := proposal.(*types.Block); ok {
return sb.getValidators(block.Number().Uint64()-1, block.ParentHash())
}
return validator.NewSet(nil, sb.config.ProposerPolicy)
}

func (sb *backend) getValidators(number uint64, hash common.Hash) istanbul.ValidatorSet {
snap, err := sb.snapshot(sb.chain, number, hash, nil)
if err != nil {
return validator.NewSet(nil, sb.config.ProposerPolicy)
}
return snap.ValSet
}
38 changes: 38 additions & 0 deletions consensus/istanbul/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,44 @@ func TestCommit(t *testing.T) {
}
}

func TestHasBlock(t *testing.T) {
chain, engine := newBlockChain(1)
block := makeBlockWithoutSeal(chain, engine, chain.Genesis())
finalBlock, _ := engine.Seal(chain, block, nil)
chain.InsertChain(types.Blocks{finalBlock})
if engine.HasBlock(block.Hash(), finalBlock.Number()) {
t.Errorf("error mismatch: have true, want false")
}
if !engine.HasBlock(finalBlock.Hash(), finalBlock.Number()) {
t.Errorf("error mismatch: have false, want true")
}
}

func TestGetProposer(t *testing.T) {
chain, engine := newBlockChain(1)
block := makeBlock(chain, engine, chain.Genesis())
chain.InsertChain(types.Blocks{block})
expected := engine.GetProposer(1)
actual := engine.Address()
if actual != expected {
t.Errorf("proposer mismatch: have %v, want %v", actual.Hex(), expected.Hex())
}
}

func TestParentValidators(t *testing.T) {
chain, engine := newBlockChain(1)
block := makeBlock(chain, engine, chain.Genesis())
chain.InsertChain(types.Blocks{block})
expected := engine.Validators(block).List()
//Block without seal will make empty validator set
block = makeBlockWithoutSeal(chain, engine, block)
chain.InsertChain(types.Blocks{block})
actual := engine.ParentValidators(block).List()
if len(expected) != len(actual) || expected[0] != actual[0] {
t.Errorf("validator set mismatch: have %v, want %v", actual, expected)
}
}

/**
* SimpleBackend
* Private key: bb047e5940b6d83354d9432db7c449ac8fca2248008aaa7271369880f9f11cc1
Expand Down
6 changes: 3 additions & 3 deletions consensus/istanbul/core/backlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestCheckMessage(t *testing.T) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4)),
}, newTestValidatorSet(4), common.Hash{}, nil),
}

// invalid view format
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestProcessFutureBacklog(t *testing.T) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4)),
}, newTestValidatorSet(4), common.Hash{}, nil),
state: StateAcceptRequest,
}
c.subscribeEvents()
Expand Down Expand Up @@ -294,7 +294,7 @@ func testProcessBacklog(t *testing.T, msg *message) {
current: newRoundState(&istanbul.View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4)),
}, newTestValidatorSet(4), common.Hash{}, nil),
}
c.subscribeEvents()
defer c.unsubscribeEvents()
Expand Down
17 changes: 16 additions & 1 deletion consensus/istanbul/core/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,26 @@ package core
import (
"reflect"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)

func (c *core) sendCommit() {
sub := c.current.Subject()
c.broadcastCommit(sub)
}

func (c *core) sendCommitForOldBlock(view *istanbul.View, digest common.Hash) {
sub := &istanbul.Subject{
View: view,
Digest: digest,
}
c.broadcastCommit(sub)
}

func (c *core) broadcastCommit(sub *istanbul.Subject) {
logger := c.logger.New("state", c.state)

sub := c.current.Subject()
encodedSubject, err := Encode(sub)
if err != nil {
logger.Error("Failed to encode", "subject", sub)
Expand Down Expand Up @@ -60,6 +73,8 @@ func (c *core) handleCommit(msg *message, src istanbul.Validator) error {
// If we already have a proposal, we may have chance to speed up the consensus process
// by committing the proposal without prepare messages.
if c.current.Commits.Size() > 2*c.valSet.F() && c.state.Cmp(StateCommitted) < 0 {
// Still need to call LockBlock here since state can skip Prepared state and jump directly to Committed state.
c.current.LockHash()
c.commit()
}

Expand Down
10 changes: 9 additions & 1 deletion consensus/istanbul/core/commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ OUTER:
if err != test.expectedErr {
t.Errorf("error mismatch: have %v, want %v", err, test.expectedErr)
}
if r0.current.IsHashLocked() {
t.Errorf("block should not be locked")
}
continue OUTER
}
}
Expand All @@ -191,7 +194,9 @@ OUTER:
if r0.current.Commits.Size() > 2*r0.valSet.F() {
t.Errorf("the size of commit messages should be less than %v", 2*r0.valSet.F()+1)
}

if r0.current.IsHashLocked() {
t.Errorf("block should not be locked")
}
continue
}

Expand All @@ -214,6 +219,9 @@ OUTER:
if signedCount <= 2*r0.valSet.F() {
t.Errorf("the expected signed count should be larger than %v, but got %v", 2*r0.valSet.F(), signedCount)
}
if !r0.current.IsHashLocked() {
t.Errorf("block should be locked")
}
}
}

Expand Down
27 changes: 24 additions & 3 deletions consensus/istanbul/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (c *core) commit() {
}

if err := c.backend.Commit(proposal, committedSeals); err != nil {
c.current.UnlockHash() //Unlock block when insertion fails
c.sendNextRoundChange()
return
}
Expand All @@ -188,13 +189,21 @@ func (c *core) startNewRound(newView *istanbul.View, roundChange bool) {
// Clear invalid round change messages
c.roundChangeSet = newRoundChangeSet(c.valSet)
// New snapshot for new round
c.current = newRoundState(newView, c.valSet)
c.updateRoundState(newView, c.valSet, roundChange)
// Calculate new proposer
c.valSet.CalcProposer(c.lastProposer, newView.Round.Uint64())
c.waitingForRoundChange = false
c.setState(StateAcceptRequest)
if roundChange && c.isProposer() {
c.backend.NextRound()
// If it is locked, propose the old proposal
if c.current != nil && c.current.IsHashLocked() {
r := &istanbul.Request{
Proposal: c.current.Proposal(), //c.current.Proposal would be the locked proposal by previous proposer, see updateRoundState
}
c.sendPreprepare(r)
} else {
c.backend.NextRound()
}
}
c.newRoundChangeTimer()

Expand All @@ -208,13 +217,25 @@ func (c *core) catchUpRound(view *istanbul.View) {
c.roundMeter.Mark(new(big.Int).Sub(view.Round, c.current.Round()).Int64())
}
c.waitingForRoundChange = true
c.current = newRoundState(view, c.valSet)

//Needs to keep block lock for round catching up
c.updateRoundState(view, c.valSet, true)
c.roundChangeSet.Clear(view.Round)
c.newRoundChangeTimer()

logger.Trace("Catch up round", "new_round", view.Round, "new_seq", view.Sequence, "new_proposer", c.valSet)
}

// updateRoundState updates round state by checking if locking block is necessary
func (c *core) updateRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, roundChange bool) {
// Lock only if both roundChange is true and it is locked
if roundChange && c.current != nil && c.current.IsHashLocked() {
c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare)
} else {
c.current = newRoundState(view, validatorSet, common.Hash{}, nil)
}
}

func (c *core) setState(state State) {
if c.state != state {
c.state = state
Expand Down
7 changes: 5 additions & 2 deletions consensus/istanbul/core/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,18 @@ func (c *core) handlePrepare(msg *message, src istanbul.Validator) error {
return err
}

// If it is locked, it can only process on the locked block.
// Passing verifyPrepare and checkMessage implies it is processing on the locked block since it was verified in the Preprepare step.
if err := c.verifyPrepare(prepare, src); err != nil {
return err
}

c.acceptPrepare(msg, src)

// Change to StatePrepared if we've received enough prepare messages
// Change to StatePrepared if we've received enough prepare messages or it is locked
// and we are in earlier state before StatePrepared
if c.current.Prepares.Size() > 2*c.valSet.F() && c.state.Cmp(StatePrepared) < 0 {
if (c.current.IsHashLocked() || c.current.Prepares.Size() > 2*c.valSet.F()) && c.state.Cmp(StatePrepared) < 0 {
c.current.LockHash()
c.setState(StatePrepared)
c.sendCommit()
}
Expand Down
9 changes: 9 additions & 0 deletions consensus/istanbul/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ OUTER:
if err != test.expectedErr {
t.Errorf("error mismatch: have %v, want %v", err, test.expectedErr)
}
if r0.current.IsHashLocked() {
t.Errorf("block should not be locked")
}
continue OUTER
}
}
Expand All @@ -214,6 +217,9 @@ OUTER:
if r0.current.Prepares.Size() > 2*r0.valSet.F() {
t.Errorf("the size of prepare messages should be less than %v", 2*r0.valSet.F()+1)
}
if r0.current.IsHashLocked() {
t.Errorf("block should not be locked")
}

continue
}
Expand Down Expand Up @@ -246,6 +252,9 @@ OUTER:
if !reflect.DeepEqual(m, expectedSubject) {
t.Errorf("subject mismatch: have %v, want %v", m, expectedSubject)
}
if !r0.current.IsHashLocked() {
t.Errorf("block should be locked")
}
}
}

Expand Down
37 changes: 34 additions & 3 deletions consensus/istanbul/core/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,21 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {
}

// Ensure we have the same view with the preprepare message
// If it is old message, see if we need to broadcast COMMIT
if err := c.checkMessage(msgPreprepare, preprepare.View); err != nil {
if err == errOldMessage {
// Get validator set for the given proposal
valSet := c.backend.ParentValidators(preprepare.Proposal).Copy()
previousProposer := c.backend.GetProposer(preprepare.Proposal.Number().Uint64() - 1)
valSet.CalcProposer(previousProposer, preprepare.View.Round.Uint64())
// Broadcast COMMIT if it is an existing block
// 1. The proposer needs to be a proposer matches the given (Sequence + Round)
// 2. The given block must exist
if valSet.IsProposer(src.Address()) && c.backend.HasBlock(preprepare.Proposal.Hash(), preprepare.Proposal.Number()) {
c.sendCommitForOldBlock(preprepare.View, preprepare.Proposal.Hash())
return nil
}
}
return err
}

Expand Down Expand Up @@ -84,10 +98,27 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {
return err
}

// Here is about to accept the preprepare
if c.state == StateAcceptRequest {
c.acceptPreprepare(preprepare)
c.setState(StatePreprepared)
c.sendPrepare()
// If it is locked, it can only process on the locked block
// Otherwise, broadcast PREPARE and enter Prepared state
if c.current.IsHashLocked() {
// Broadcast COMMIT directly if the proposal matches the locked block
// Otherwise, send ROUND CHANGE
if preprepare.Proposal.Hash() == c.current.GetLockedHash() {
// Broadcast COMMIT and enters Prepared state directly
c.acceptPreprepare(preprepare)
c.setState(StatePrepared)
c.sendCommit()
} else {
// Send round change
c.sendNextRoundChange()
}
} else {
c.acceptPreprepare(preprepare)
c.setState(StatePreprepared)
c.sendPrepare()
}
}

return nil
Expand Down
Loading

0 comments on commit d4ae30d

Please sign in to comment.