Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sovereign] Bugfix notifier bootstrapper + cleanup notarized headers #6622

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions cmd/sovereignnode/chainSimulator/tests/esdt/issueSc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ func TestSovereignChainSimulator_SmartContract_IssueToken(t *testing.T) {
PathToInitialConfig: defaultPathToInitialConfig,
GenesisTimestamp: time.Now().Unix(),
RoundDurationInMillis: uint64(6000),
RoundsPerEpoch: core.OptionalUint64{},
ApiInterface: api.NewNoApiInterface(),
MinNodesPerShard: 2,
RoundsPerEpoch: core.OptionalUint64{
HasValue: true,
Value: 25,
},
ApiInterface: api.NewNoApiInterface(),
MinNodesPerShard: 2,
AlterConfigsFunction: func(cfg *config.Configs) {
cfg.SystemSCConfig.ESDTSystemSCConfig.BaseIssuingCost = issuePrice
},
Expand All @@ -59,10 +62,10 @@ func TestSovereignChainSimulator_SmartContract_IssueToken(t *testing.T) {

defer cs.Close()

time.Sleep(time.Second) // wait for VM to be ready for processing queries
err = cs.GenerateBlocksUntilEpochIsReached(6)
require.Nil(t, err)

nodeHandler := cs.GetNodeHandler(core.SovereignChainShardId)

systemScAddress := chainSim.GetSysAccBytesAddress(t, nodeHandler)

wallet, err := cs.GenerateAndMintWalletAddress(core.SovereignChainShardId, big.NewInt(0).Mul(chainSim.OneEGLD, big.NewInt(100)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,7 @@ func checkCrossNotarizedHeader(t *testing.T, round int, crossNotarizedHeader dat

if round == 1 { // first cross header is dummy with nonce 0
require.Equal(t, uint64(0), crossNotarizedHeader.GetNonce())
} else if round == 2 { // first cross notarized header is 10000000
} else { // cross header with nonce is notarized
require.Equal(t, headerNonce, crossNotarizedHeader.GetNonce())
} else { // cross header with nonce-1 is notarized
require.Equal(t, headerNonce-1, crossNotarizedHeader.GetNonce())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestSovereignChainSimulator_AddIncomingHeaderCase1(t *testing.T) {
previousExtendedHeader = getExtendedHeader(t, nodeHandler, incomingHdr)
}

require.Equal(t, uint32(2), nodeHandler.GetCoreComponents().EpochNotifier().CurrentEpoch())
require.Equal(t, uint32(9), nodeHandler.GetCoreComponents().EpochNotifier().CurrentEpoch())
}

// In this test we simulate:
Expand Down
72 changes: 55 additions & 17 deletions cmd/sovereignnode/notifier/notifierBootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ import (
"syscall"
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-go/errors"
"github.com/multiversx/mx-chain-go/process"
logger "github.com/multiversx/mx-chain-logger-go"
notifierProcess "github.com/multiversx/mx-chain-sovereign-notifier-go/process"

"github.com/multiversx/mx-chain-go/errors"
"github.com/multiversx/mx-chain-go/process"
)

const roundsThreshold = process.MaxRoundsWithoutNewBlockReceived + 1

var log = logger.GetOrCreate("notifier-bootstrap")

// ArgsNotifierBootstrapper defines args needed to create a new notifier bootstrapper
Expand All @@ -32,9 +34,9 @@ type notifierBootstrapper struct {
forkDetector process.ForkDetector
sigStopNode chan os.Signal

nodeSyncedChan chan bool
cancelFunc func()
roundDuration uint64
syncedRoundsChan chan int32
cancelFunc func()
roundDuration uint64
}

// NewNotifierBootstrapper creates a ws receiver connection registration bootstrapper
Expand All @@ -47,7 +49,7 @@ func NewNotifierBootstrapper(args ArgsNotifierBootstrapper) (*notifierBootstrapp
incomingHeaderHandler: args.IncomingHeaderHandler,
sovereignNotifier: args.SovereignNotifier,
forkDetector: args.ForkDetector,
nodeSyncedChan: make(chan bool, 1),
syncedRoundsChan: make(chan int32, 1),
cancelFunc: nil,
roundDuration: args.RoundDuration,
sigStopNode: args.SigStopNode,
Expand Down Expand Up @@ -80,14 +82,20 @@ func checkArgs(args ArgsNotifierBootstrapper) error {

func (nb *notifierBootstrapper) receivedSyncState(isNodeSynchronized bool) {
if isNodeSynchronized && nb.forkDetector.GetHighestFinalBlockNonce() != 0 {
select {
case nb.nodeSyncedChan <- true:
default:
}
nb.writeRoundsDeltaOnChan(1)
} else if !isNodeSynchronized {
nb.writeRoundsDeltaOnChan(-1)
}
}

func (nb *notifierBootstrapper) writeRoundsDeltaOnChan(delta int32) {
select {
case nb.syncedRoundsChan <- delta:
default:
}
}

// Start will start waiting on a go routine to be notified via nodeSyncedChan when the sovereign node is synced.
// Start will start waiting on a go routine to be notified via syncedRoundsChan when the sovereign node is synced.
// Meanwhile, it will print the current node state in log. When node is fully synced, it will register the incoming header
// processor to the websocket listener and exit the waiting loop.
func (nb *notifierBootstrapper) Start() {
Expand All @@ -97,22 +105,30 @@ func (nb *notifierBootstrapper) Start() {
}

func (nb *notifierBootstrapper) checkNodeState(ctx context.Context) {
timeToWaitReSync := (process.MaxRoundsWithoutNewBlockReceived + 1) * nb.roundDuration
timeToWaitReSync := uint64(roundsThreshold) * nb.roundDuration
ticker := time.NewTicker(time.Duration(timeToWaitReSync) * time.Millisecond)
defer ticker.Stop()

var syncedRounds uint32

for {
select {
case <-ctx.Done():
log.Debug("notifierBootstrapper.checkNodeState: worker's go routine is stopping...")
return
case <-nb.nodeSyncedChan:
case delta := <-nb.syncedRoundsChan:
syncedRounds = updateSyncedRounds(syncedRounds, delta)
if syncedRounds < uint32(roundsThreshold) {
log.Debug("notifierBootstrapper.checkNodeState", "syncedRounds", syncedRounds)
continue
}

err := nb.sovereignNotifier.RegisterHandler(nb.incomingHeaderHandler)
if err != nil {
log.Error("notifierBootstrapper: sovereignNotifier.RegisterHandler", "err", err)
nb.sigStopNode <- syscall.SIGTERM
} else {
log.Debug("notifierBootstrapper.checkNodeState", "is node synced", true)
log.Info("notifierBootstrapper.checkNodeState", "is node synced", true)
}

return
Expand All @@ -122,17 +138,39 @@ func (nb *notifierBootstrapper) checkNodeState(ctx context.Context) {
}
}

func updateSyncedRounds(syncedRounds uint32, delta int32) uint32 {
if delta > 0 {
syncedRounds += uint32(delta)
} else if syncedRounds > 0 {
syncedRounds--
}

return syncedRounds
}

// Close cancels current context and empties channel reads
func (nb *notifierBootstrapper) Close() error {
if nb.cancelFunc != nil {
nb.cancelFunc()
}

nrReads := core.EmptyChannel(nb.nodeSyncedChan)
log.Debug("notifierBootstrapper: emptied channel", "nodeSyncedChan nrReads", nrReads)
nrReads := emptyChannel(nb.syncedRoundsChan)
log.Debug("notifierBootstrapper: emptied channel", "syncedRoundsChan nrReads", nrReads)
return nil
}

func emptyChannel(ch chan int32) int {
readsCnt := 0
for {
select {
case <-ch:
readsCnt++
default:
return readsCnt
}
}
}

// IsInterfaceNil checks if the underlying pointer is nil
func (nb *notifierBootstrapper) IsInterfaceNil() bool {
return nb == nil
Expand Down
111 changes: 87 additions & 24 deletions cmd/sovereignnode/notifier/notifierBootstrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,25 @@ package notifier
import (
"context"
"errors"
"math/rand"
"os"
"reflect"
"runtime"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"

notifierProcess "github.com/multiversx/mx-chain-sovereign-notifier-go/process"
"github.com/multiversx/mx-chain-sovereign-notifier-go/testscommon"
"github.com/stretchr/testify/require"

errorsMx "github.com/multiversx/mx-chain-go/errors"
"github.com/multiversx/mx-chain-go/integrationTests/mock"
"github.com/multiversx/mx-chain-go/process"
processMocks "github.com/multiversx/mx-chain-go/process/mock"
"github.com/multiversx/mx-chain-go/testscommon/sovereign"
notifierProcess "github.com/multiversx/mx-chain-sovereign-notifier-go/process"
"github.com/multiversx/mx-chain-sovereign-notifier-go/testscommon"
"github.com/stretchr/testify/require"
)

func createArgs() ArgsNotifierBootstrapper {
Expand Down Expand Up @@ -93,23 +97,23 @@ func TestNotifierBootstrapper_Start(t *testing.T) {
},
}

registerCalledCt := 0
registerCalledCt := atomic.Int64{}
args.SovereignNotifier = &testscommon.SovereignNotifierStub{
RegisterHandlerCalled: func(handler notifierProcess.IncomingHeaderSubscriber) error {
require.Equal(t, args.IncomingHeaderHandler, handler)
registerCalledCt++
registerCalledCt.Add(1)
return nil
},
}

getHighestNonceCalledCt := 0
getHighestNonceCalledCt := atomic.Int64{}
args.ForkDetector = &mock.ForkDetectorStub{
GetHighestFinalBlockNonceCalled: func() uint64 {
defer func() {
getHighestNonceCalledCt++
getHighestNonceCalledCt.Add(1)
}()

return uint64(getHighestNonceCalledCt)
return uint64(getHighestNonceCalledCt.Load())
},
}

Expand All @@ -124,32 +128,90 @@ func TestNotifierBootstrapper_Start(t *testing.T) {
}()

time.Sleep(time.Millisecond * 50)
require.Zero(t, registerCalledCt)
require.Zero(t, getHighestNonceCalledCt)
require.Zero(t, registerCalledCt.Load())
require.Zero(t, getHighestNonceCalledCt.Load())

nb.receivedSyncState(false)
time.Sleep(time.Millisecond * 50)
require.Zero(t, registerCalledCt)
require.Zero(t, registerCalledCt)
require.Zero(t, registerCalledCt.Load())
require.Zero(t, getHighestNonceCalledCt.Load())

nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Zero(t, registerCalledCt)
require.Equal(t, 1, getHighestNonceCalledCt)
require.Zero(t, registerCalledCt.Load())
require.Equal(t, int64(1), getHighestNonceCalledCt.Load())

nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Equal(t, 1, registerCalledCt)
require.Equal(t, 2, getHighestNonceCalledCt)
require.Zero(t, registerCalledCt.Load())
require.Equal(t, int64(2), getHighestNonceCalledCt.Load())

for i := int64(0); i < 10; i++ {
nb.receivedSyncState(false)
time.Sleep(time.Millisecond * 50)
require.Zero(t, registerCalledCt.Load())
require.Equal(t, int64(2), getHighestNonceCalledCt.Load())
}
for i := int64(1); i < roundsThreshold; i++ {
nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Zero(t, registerCalledCt.Load())
require.Equal(t, i+2, getHighestNonceCalledCt.Load())
}

for i := 3; i < 10; i++ {
for i := roundsThreshold; i < roundsThreshold+10; i++ {
nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Equal(t, 1, registerCalledCt)
require.Equal(t, i, getHighestNonceCalledCt)
require.Equal(t, int64(1), registerCalledCt.Load())
require.Equal(t, int64(i+2), getHighestNonceCalledCt.Load())
}
}

func TestNotifierBootstrapper_Start_ConcurrencyTest(t *testing.T) {
t.Parallel()

args := createArgs()

getHighestNonceCalledCt := atomic.Int64{}
args.ForkDetector = &mock.ForkDetectorStub{
GetHighestFinalBlockNonceCalled: func() uint64 {
defer func() {
getHighestNonceCalledCt.Add(1)
}()

return uint64(getHighestNonceCalledCt.Load())
},
}

nb, _ := NewNotifierBootstrapper(args)
nb.Start()

defer func() {
err := nb.Close()
require.Nil(t, err)
}()

numGoRoutines := 1000
wg := sync.WaitGroup{}
wg.Add(numGoRoutines)

for i := 0; i < numGoRoutines; i++ {
go func(idx int) {
isSynced := false
if rand.Int31n(100) < 51 {
isSynced = true
}

nb.receivedSyncState(isSynced)
wg.Done()
}(i)
}

wg.Wait()
require.NotZero(t, getHighestNonceCalledCt.Load())
require.True(t, getHighestNonceCalledCt.Load() < int64(numGoRoutines))
}

func TestNotifierBootstrapper_StartWithRegisterFailing(t *testing.T) {
t.Parallel()

Expand All @@ -159,13 +221,13 @@ func TestNotifierBootstrapper_StartWithRegisterFailing(t *testing.T) {
args.SigStopNode = sigStopNodeMock
args.RoundDuration = 10

registerCalledCt := 0
registerCalledCt := atomic.Int64{}
args.SovereignNotifier = &testscommon.SovereignNotifierStub{
RegisterHandlerCalled: func(handler notifierProcess.IncomingHeaderSubscriber) error {
require.Equal(t, args.IncomingHeaderHandler, handler)

defer func() {
registerCalledCt++
registerCalledCt.Add(1)
}()

return errors.New("local error")
Expand All @@ -179,6 +241,7 @@ func TestNotifierBootstrapper_StartWithRegisterFailing(t *testing.T) {
}

nb, _ := NewNotifierBootstrapper(args)
nb.syncedRoundsChan <- roundsThreshold - 1

nb.Start()

Expand All @@ -188,11 +251,11 @@ func TestNotifierBootstrapper_StartWithRegisterFailing(t *testing.T) {
}()

time.Sleep(time.Millisecond * 200)
require.Zero(t, registerCalledCt)
require.Zero(t, registerCalledCt.Load())

nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Equal(t, 1, registerCalledCt)
require.Equal(t, int64(1), registerCalledCt.Load())

select {
case sig := <-sigStopNodeMock:
Expand All @@ -205,7 +268,7 @@ func TestNotifierBootstrapper_StartWithRegisterFailing(t *testing.T) {
for i := 0; i < 10; i++ {
nb.receivedSyncState(true)
time.Sleep(time.Millisecond * 50)
require.Equal(t, 1, registerCalledCt)
require.Equal(t, int64(1), registerCalledCt.Load())
}
}

Expand Down
Loading
Loading