Skip to content

Commit

Permalink
#186 Implement invalid ethereum block to prevent corrupted state
Browse files Browse the repository at this point in the history
  • Loading branch information
ggarri committed Jun 27, 2019
1 parent 78b3a3b commit 5334036
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 625 deletions.
47 changes: 44 additions & 3 deletions consensus/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

// maxTransactionSize is 32KB in order to prevent DOS attacks
const maxTransactionSize = 32768
const minBlockDurationInSeconds int64 = 1;

// TendermintABCI is the main hook of application layer (blockchain) for connecting to consensus (Tendermint) using ABCI.
//
Expand Down Expand Up @@ -100,7 +101,20 @@ func (abci *TendermintABCI) InitChain(req tmtAbciTypes.RequestInitChain) tmtAbci
// - Optional Key-Value tags for filtering and indexing
func (abci *TendermintABCI) BeginBlock(req tmtAbciTypes.RequestBeginBlock) tmtAbciTypes.ResponseBeginBlock {
abci.logger.Debug("Beginning new block", "hash", req.Hash)
abci.db.UpdateBlockState(&req.Header)

// TO REMOVE: test code
// if req.Header.Height == 3 {
// req.Header.Time = abci.db.ParentBlockTime()
// }

err := abci.db.UpdateBlockState(req.Header)

// Note: Tendermint does not expect errors from BeginBlock. In case this block is achieve consensus
// the database chain will end up corrupted, therefore skip block seems to be the only alternative
// ISSUE: https://github.com/tendermint/tendermint/issues/3755
if err != nil {
abci.logger.Error("Invalid block", "height", req.Header.Height, "msg", err.Error())
}

return tmtAbciTypes.ResponseBeginBlock{}
}
Expand All @@ -125,6 +139,12 @@ func (abci *TendermintABCI) BeginBlock(req tmtAbciTypes.RequestBeginBlock) tmtAb
func (abci *TendermintABCI) CheckTx(txBytes []byte) tmtAbciTypes.ResponseCheckTx {
abci.metrics.CheckTxsTotal.Add(1)

if abci.db.InvalidBlockState() {
abci.logger.Error("Cannot checkTX on invalid block")
abci.metrics.CheckErrTxsTotal.Add(1, "INVALID_BLOCK")
return tmtAbciTypes.ResponseCheckTx{Code: 1, Log: "INVALID_BLOCK"}
}

tx, err := decodeRLP(txBytes)
if err != nil {
abci.logger.Error("Unable to decode RLP TX", "err", err.Error())
Expand Down Expand Up @@ -219,6 +239,13 @@ func (abci *TendermintABCI) doMempoolValidation(tx *ethTypes.Transaction, from c
// E.g: ("account.owner": "Bob", "balance": "100.0", "time": "2018-01-02T12:30:00Z")
func (abci *TendermintABCI) DeliverTx(txBytes []byte) tmtAbciTypes.ResponseDeliverTx {
abci.metrics.DeliverTxsTotal.Add(1)

if abci.db.InvalidBlockState() {
abci.logger.Error("Cannot DeliverTx on invalid block")
abci.metrics.DeliverErrTxsTotal.Add(1, "INVALID_BLOCK")
return tmtAbciTypes.ResponseDeliverTx{Code: 1, Log: "INVALID_BLOCK"}
}

tx, err := decodeRLP(txBytes)
if err != nil {
abci.logger.Error(err.Error())
Expand Down Expand Up @@ -253,6 +280,13 @@ func (abci *TendermintABCI) DeliverTx(txBytes []byte) tmtAbciTypes.ResponseDeliv
func (abci *TendermintABCI) EndBlock(req tmtAbciTypes.RequestEndBlock) tmtAbciTypes.ResponseEndBlock {
abci.logger.Debug(fmt.Sprintf("Ending new block at height '%d'", req.Height))

// bDurationInSec := time.Now().Unix() - int64(abci.db.GetBlockStateHeader().Time)
// if bDurationInSec < minBlockDurationInSeconds {
// sleepDurationInSec := time.Duration(minBlockDurationInSeconds - bDurationInSec)
// abci.logger.Debug(fmt.Sprintf("Waiting for %d seconds to end block...", sleepDurationInSec))
// time.Sleep(sleepDurationInSec * time.Second)
// }

return tmtAbciTypes.ResponseEndBlock{}
}

Expand All @@ -263,14 +297,21 @@ func (abci *TendermintABCI) EndBlock(req tmtAbciTypes.RequestEndBlock) tmtAbciTy
// It's critical that all application instances return the same hash. If not, they will not be able
// to agree on the next block, because the hash is included in the next block!
func (abci *TendermintABCI) Commit() tmtAbciTypes.ResponseCommit {
abci.metrics.CommitBlockTotal.Add(1)
if abci.db.InvalidBlockState() {
abci.logger.Error("Invalid bloc, persisting an empty valid block...")
abci.ResetBlockState()
}

block, err := abci.db.Persist(abci.RewardReceiver())
if err != nil {
abci.logger.Error("Error getting latest database state", "err", err)
abci.metrics.CommitErrBlockTotal.Add(1, "UNABLE_TO_PERSIST")
panic(err)
// panic(err)
abci.ResetBlockState()
return tmtAbciTypes.ResponseCommit{Data: abci.db.ParentBlockRoot().Bytes()}
}

abci.metrics.CommitBlockTotal.Add(1)
ethState, err := abci.getCurrentDBState()
if err != nil {
abci.logger.Error("Error getting next latest state", "err", err)
Expand Down
15 changes: 15 additions & 0 deletions consensus/abci/application.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package abci

import "github.com/tendermint/tendermint/abci/types"

var _ types.Application = (*Application)(nil)

type Application struct {
types.BaseApplication
}

func (app *Application) BeginBlock(req types.Request_BeginBlock) (types.ResponseBeginBlock, error) {
return types.ResponseBeginBlock{}, nil
}


23 changes: 19 additions & 4 deletions database/blockstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type blockState struct {
txIndex int
transactions []*ethTypes.Transaction
receipts ethTypes.Receipts
isInvalid bool

totalUsedGas uint64
gp *core.GasPool
Expand All @@ -41,6 +42,12 @@ type blockState struct {
// Logic copied from `core/state_processor.go` `(p *StateProcessor) Process` that gets
// normally executed on block persist.
func (bs *blockState) execTx(bc *core.BlockChain, config *eth.Config, chainConfig *params.ChainConfig, blockHash common.Hash, tx *ethTypes.Transaction) tmtAbciTypes.ResponseDeliverTx {
if bs.isInvalid == true {
return tmtAbciTypes.ResponseDeliverTx{
Code: tmtCode.CodeTypeEncodingError,
Log: fmt.Sprintf("Cannot exec tx in this isInvalid block %d", bs.header.Number.Int64())}
}

// TODO: Investigate if snapshot should be used `snapshot := bs.state.Snapshot()`
bs.state.Prepare(tx.Hash(), blockHash, bs.txIndex)
receipt, _, err := core.ApplyTransaction(
Expand Down Expand Up @@ -72,6 +79,10 @@ func (bs *blockState) execTx(bc *core.BlockChain, config *eth.Config, chainConfi
//
// Returns the persisted Block.
func (bs *blockState) persist(bc *core.BlockChain, db ethdb.Database) (ethTypes.Block, error) {
if bs.isInvalid == true {
return ethTypes.Block{}, fmt.Errorf("Cannot persist an invalid block %d", bs.header.Number.Int64())
}

rootHash, err := bs.state.Commit(false)
if err != nil {
return ethTypes.Block{}, err
Expand All @@ -88,10 +99,14 @@ func (bs *blockState) persist(bc *core.BlockChain, db ethdb.Database) (ethTypes.
return *block, nil
}

func (bs *blockState) updateBlockState(config *params.ChainConfig, parentTime uint64, numTx uint64) {
func (bs *blockState) updateBlockState(config params.ChainConfig, blockTime uint64, numTx uint64) {
parentHeader := bs.parent.Header()
bs.header.Time = new(big.Int).SetUint64(parentTime).Uint64()
bs.header.Difficulty = ethash.CalcDifficulty(config, parentTime, parentHeader)
bs.header.Time = new(big.Int).SetUint64(blockTime).Uint64()
bs.header.Difficulty = ethash.CalcDifficulty(&config, blockTime, parentHeader)
bs.transactions = make([]*ethTypes.Transaction, 0, numTx)
bs.receipts = make([]*ethTypes.Receipt, 0, numTx)
}
}

func (bs *blockState) invalidate() {
bs.isInvalid = true
}
34 changes: 29 additions & 5 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/lightstreams-network/lightchain/database/metrics"
"github.com/lightstreams-network/lightchain/database/web3"
"time"
)

// Database manages the underlying ethereum state for storage and processing
Expand Down Expand Up @@ -88,23 +89,46 @@ func (db *Database) Persist(receiver common.Address) (ethTypes.Block, error) {
db.metrics.PersistedTxsTotal.Add(float64(len(db.ethState.blockState.transactions)))
db.metrics.ChaindbHeight.Set(float64(db.ethState.blockState.header.Number.Uint64()))

return db.ethState.Persist(receiver)
return db.ethState.Persist(*db.eth.APIBackend.ChainConfig(), receiver)
}

// ResetBlockState resets the in-memory block's processing state.
func (db *Database) ResetBlockState(receiver common.Address) error {
db.logger.Debug("Resetting DB BlockState", "receiver", receiver.Hex())
return db.ethState.ResetBlockState(receiver)
return db.ethState.ResetBlockState(*db.eth.APIBackend.ChainConfig(), receiver)
}

// UpdateBlockState uses the tendermint header to update the eth header.
func (db *Database) UpdateBlockState(tmHeader *tmtAbciTypes.Header) {
func (db *Database) UpdateBlockState(tmHeader tmtAbciTypes.Header) error {
db.logger.Debug("Updating DB BlockState")
db.ethState.UpdateBlockState(
db.eth.APIBackend.ChainConfig(),
err := db.ethState.UpdateBlockState(
*db.eth.APIBackend.ChainConfig(),
uint64(tmHeader.Time.Unix()),
uint64(tmHeader.GetNumTxs()),
)

if err != nil {
return err
}

return nil
}

func (db *Database) InvalidBlockState() bool {
return db.ethState.blockState.isInvalid == true
}

// TO REMOVE: test code
func (db *Database) ParentBlockTime() time.Time {
return time.Unix(int64(db.ethState.blockState.parent.Time()), 0)
}

func (db *Database) ParentBlockRoot() common.Hash {
return db.ethState.blockState.parent.Root()
}

func (db *Database) GetBlockStateHeader() ethTypes.Header {
return *db.ethState.blockState.header
}

// GasLimit returns the maximum gas per block.
Expand Down
35 changes: 23 additions & 12 deletions database/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
ethTypes "github.com/ethereum/go-ethereum/core/types"
tmtAbciTypes "github.com/tendermint/tendermint/abci/types"
tmtLog "github.com/tendermint/tendermint/libs/log"
"github.com/ethereum/go-ethereum/consensus/ethash"
)

//----------------------------------------------------------------------
// ----------------------------------------------------------------------
// EthState manages concurrent access to the intermediate blockState object
// The eth tx pool fires TxPreEvent in a go-routine,
// and the miner subscribes to this in another go-routine and processes the tx onto
Expand All @@ -40,7 +41,7 @@ func NewEthState(ethereum *eth.Ethereum, ethCfg *eth.Config, logger tmtLog.Logge
return &EthState{
ethereum: ethereum,
ethConfig: ethCfg,
logger: logger,
logger: logger,
}
}

Expand All @@ -63,7 +64,7 @@ func (es *EthState) ExecuteTx(tx *ethTypes.Transaction) tmtAbciTypes.ResponseDel
// Triggered by ABCI::Commit(), to persist changes introduced in latest block.
//
// Returns the persisted Block.
func (es *EthState) Persist(receiver common.Address) (ethTypes.Block, error) {
func (es *EthState) Persist(config params.ChainConfig, receiver common.Address) (ethTypes.Block, error) {
es.mtx.Lock()
defer es.mtx.Unlock()

Expand All @@ -72,30 +73,30 @@ func (es *EthState) Persist(receiver common.Address) (ethTypes.Block, error) {
return ethTypes.Block{}, err
}

err = es.resetBlockState(receiver)
err = es.resetBlockState(config, receiver)
if err != nil {
return ethTypes.Block{}, err
}

return block, err
}

func (es *EthState) ResetBlockState(receiver common.Address) error {
func (es *EthState) ResetBlockState(config params.ChainConfig, receiver common.Address) error {
es.mtx.Lock()
defer es.mtx.Unlock()

return es.resetBlockState(receiver)
return es.resetBlockState(config, receiver)
}

func (es *EthState) resetBlockState(receiver common.Address) error {
func (es *EthState) resetBlockState(config params.ChainConfig, receiver common.Address) error {
blockchain := es.ethereum.BlockChain()
bcState, err := blockchain.State()
if err != nil {
return err
}

currentBlock := blockchain.CurrentBlock()
ethHeader := newBlockHeader(receiver, currentBlock)
ethHeader := newBlockHeader(config, receiver, currentBlock)

es.blockState = blockState{
header: ethHeader,
Expand All @@ -108,11 +109,19 @@ func (es *EthState) resetBlockState(receiver common.Address) error {
return nil
}

func (es *EthState) UpdateBlockState(config *params.ChainConfig, parentTime uint64, numTx uint64) {
func (es *EthState) UpdateBlockState(config params.ChainConfig, blockTime uint64, numTx uint64) error {
es.mtx.Lock()
defer es.mtx.Unlock()

es.blockState.updateBlockState(config, parentTime, numTx)
es.blockState.updateBlockState(config, blockTime, numTx)
bc := es.ethereum.BlockChain()
err := bc.Engine().VerifyHeader(bc, es.blockState.header, false)
if err != nil {
es.blockState.invalidate()
return err
}

return nil
}

func (es *EthState) GasLimit() *core.GasPool {
Expand All @@ -134,11 +143,13 @@ func (es *EthState) Pending() (*ethTypes.Block, *state.StateDB) {
), es.blockState.state.Copy()
}

func newBlockHeader(receiver common.Address, prevBlock *ethTypes.Block) *ethTypes.Header {
func newBlockHeader(config params.ChainConfig, receiver common.Address, prevBlock *ethTypes.Block) *ethTypes.Header {
return &ethTypes.Header{
Number: prevBlock.Number().Add(prevBlock.Number(), big.NewInt(1)),
ParentHash: prevBlock.Hash(),
GasLimit: core.CalcGasLimit(prevBlock, prevBlock.GasLimit(), prevBlock.GasLimit()),
Difficulty: ethash.CalcDifficulty(&config, prevBlock.Time() + 1, prevBlock.Header()),
Coinbase: receiver,
Time: prevBlock.Time() + 1, // Adding min gap between consecutive blocks
}
}
}
Loading

0 comments on commit 5334036

Please sign in to comment.