diff --git a/.github/workflows/create_release.yml b/.github/workflows/create_release.yml
index 083f58809ee..9916e67d744 100644
--- a/.github/workflows/create_release.yml
+++ b/.github/workflows/create_release.yml
@@ -111,8 +111,8 @@ jobs:
         env:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
         run: |
-          hub release create --draft --message="Release draft from Github Actions" vNext
+          gh release create --draft --notes="Release draft from Github Actions" vNext
           sleep 10
           for i in $(find ./assets -name '*.tgz' -type f); do
-            hub release edit --attach=${i} --message="" vNext
+            gh release upload vNext ${i}
           done
diff --git a/cmd/node/config/gasSchedules/gasScheduleV1.toml b/cmd/node/config/gasSchedules/gasScheduleV1.toml
index a0664f4a13c..52175a228ee 100644
--- a/cmd/node/config/gasSchedules/gasScheduleV1.toml
+++ b/cmd/node/config/gasSchedules/gasScheduleV1.toml
@@ -18,7 +18,7 @@
     SetGuardian = 250000
     GuardAccount = 250000
     UnGuardAccount = 250000
-    TrieLoadPerNode = 20000
+    TrieLoadPerNode = 100000
     TrieStorePerNode = 50000
 
 [MetaChainSystemSCsCost]
diff --git a/cmd/node/config/gasSchedules/gasScheduleV2.toml b/cmd/node/config/gasSchedules/gasScheduleV2.toml
index b75b56cbb74..38157aebf7a 100644
--- a/cmd/node/config/gasSchedules/gasScheduleV2.toml
+++ b/cmd/node/config/gasSchedules/gasScheduleV2.toml
@@ -18,7 +18,7 @@
     SetGuardian = 250000
     GuardAccount = 250000
     UnGuardAccount = 250000
-    TrieLoadPerNode = 20000
+    TrieLoadPerNode = 100000
     TrieStorePerNode = 50000
 
 [MetaChainSystemSCsCost]
diff --git a/cmd/node/config/gasSchedules/gasScheduleV3.toml b/cmd/node/config/gasSchedules/gasScheduleV3.toml
index 2972ea5d953..3767f02833b 100644
--- a/cmd/node/config/gasSchedules/gasScheduleV3.toml
+++ b/cmd/node/config/gasSchedules/gasScheduleV3.toml
@@ -18,7 +18,7 @@
     SetGuardian = 250000
     GuardAccount = 250000
     UnGuardAccount = 250000
-    TrieLoadPerNode = 20000
+    TrieLoadPerNode = 100000
     TrieStorePerNode = 50000
 
 [MetaChainSystemSCsCost]
diff --git a/cmd/node/config/gasSchedules/gasScheduleV4.toml b/cmd/node/config/gasSchedules/gasScheduleV4.toml
index 2e4956f47e9..f7d8e3a0a1f 100644
--- a/cmd/node/config/gasSchedules/gasScheduleV4.toml
+++ b/cmd/node/config/gasSchedules/gasScheduleV4.toml
@@ -18,7 +18,7 @@
     SetGuardian = 250000
     GuardAccount = 250000
     UnGuardAccount = 250000
-    TrieLoadPerNode = 20000
+    TrieLoadPerNode = 100000
     TrieStorePerNode = 50000
 
 [MetaChainSystemSCsCost]
diff --git a/cmd/node/config/gasSchedules/gasScheduleV5.toml b/cmd/node/config/gasSchedules/gasScheduleV5.toml
index 65aa269d033..9e2b3ae7d2a 100644
--- a/cmd/node/config/gasSchedules/gasScheduleV5.toml
+++ b/cmd/node/config/gasSchedules/gasScheduleV5.toml
@@ -18,7 +18,7 @@
     SetGuardian = 250000
     GuardAccount = 250000
     UnGuardAccount = 250000
-    TrieLoadPerNode = 20000
+    TrieLoadPerNode = 100000
     TrieStorePerNode = 50000
 
 [MetaChainSystemSCsCost]
diff --git a/cmd/node/config/gasSchedules/gasScheduleV6.toml b/cmd/node/config/gasSchedules/gasScheduleV6.toml
index 1a4cac3b059..82c658a151a 100644
--- a/cmd/node/config/gasSchedules/gasScheduleV6.toml
+++ b/cmd/node/config/gasSchedules/gasScheduleV6.toml
@@ -18,7 +18,7 @@
     SetGuardian = 250000
     GuardAccount = 250000
    UnGuardAccount = 250000
-    TrieLoadPerNode = 20000
+    TrieLoadPerNode = 100000
     TrieStorePerNode = 50000
 
 [MetaChainSystemSCsCost]
diff --git a/cmd/node/config/gasSchedules/gasScheduleV7.toml b/cmd/node/config/gasSchedules/gasScheduleV7.toml
index 441bb321a22..f3930be81a1 100644
--- a/cmd/node/config/gasSchedules/gasScheduleV7.toml
+++ b/cmd/node/config/gasSchedules/gasScheduleV7.toml
@@ -19,7 +19,7 @@
    SetGuardian = 250000
    GuardAccount = 250000
    UnGuardAccount = 250000
-    TrieLoadPerNode = 20000
+    TrieLoadPerNode = 100000
    TrieStorePerNode = 50000
 
 [MetaChainSystemSCsCost]
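For context on the schedule change above: loading a trie node is now priced at 100000 gas (up from 20000) in all seven schedule versions, while storing stays at 50000. A quick back-of-the-envelope check in Go, using only the values visible in the schedules (the node count is an arbitrary illustration, not a real migration size):

package main

import "fmt"

func main() {
	const (
		trieLoadPerNode  = 100_000 // new value, identical in gasScheduleV1..V7
		trieStorePerNode = 50_000  // unchanged
	)
	nodes := uint64(10) // hypothetical number of touched trie nodes
	fmt.Println("gas to load+store 10 nodes:", nodes*(trieLoadPerNode+trieStorePerNode)) // 1500000
}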
diff --git a/examples/construction_test.go b/examples/construction_test.go
index 18151b7705a..150a9306033 100644
--- a/examples/construction_test.go
+++ b/examples/construction_test.go
@@ -157,20 +157,21 @@ func TestConstructTransaction_WithGuardianFields(t *testing.T) {
 		GasLimit: 150000,
 		Data:     []byte("test data field"),
 		ChainID:  []byte("local-testnet"),
-		Version:  1,
+		Version:  2,
+		Options:  2,
 	}
 
 	tx.GuardianAddr = getPubkeyOfAddress(t, "erd1x23lzn8483xs2su4fak0r0dqx6w38enpmmqf2yrkylwq7mfnvyhsxqw57y")
 	tx.GuardianSignature = bytes.Repeat([]byte{0}, 64)
 
 	tx.Signature = computeTransactionSignature(t, alicePrivateKeyHex, tx)
-	require.Equal(t, "540ad16e46f379f9adcb7b26c07b16a56f10c624c489103679e488c0a0cb996c71dbc0d765cf925e58cd493d09d8c1d619946618ebd8a2fb924b236b8137c706", hex.EncodeToString(tx.Signature))
+	require.Equal(t, "e574d78b19e1481a6b9575c162e66f2f906a3178aec537509356385c4f1a5330a9b73a87a456fc6d7041e93b5f8a1231a92fb390174872a104a0929215600c0c", hex.EncodeToString(tx.Signature))
 
 	data, _ := contentMarshalizer.Marshal(tx)
-	require.Equal(t, "085c120e00018ee90ff6181f3761632000001a208049d639e5a6980d1cd2392abcce41029cda74a1563523a202f09641cc2618f82a200139472eff6886771a982f3083da5d421f24c29181e63888228dc81ca60d69e1388094ebdc0340f093094a0f746573742064617461206669656c64520d6c6f63616c2d746573746e657458016240540ad16e46f379f9adcb7b26c07b16a56f10c624c489103679e488c0a0cb996c71dbc0d765cf925e58cd493d09d8c1d619946618ebd8a2fb924b236b8137c706722032a3f14cf53c4d0543954f6cf1bda0369d13e661dec095107627dc0f6d33612f7a4000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", hex.EncodeToString(data))
+	require.Equal(t, "085c120e00018ee90ff6181f3761632000001a208049d639e5a6980d1cd2392abcce41029cda74a1563523a202f09641cc2618f82a200139472eff6886771a982f3083da5d421f24c29181e63888228dc81ca60d69e1388094ebdc0340f093094a0f746573742064617461206669656c64520d6c6f63616c2d746573746e657458026240e574d78b19e1481a6b9575c162e66f2f906a3178aec537509356385c4f1a5330a9b73a87a456fc6d7041e93b5f8a1231a92fb390174872a104a0929215600c0c6802722032a3f14cf53c4d0543954f6cf1bda0369d13e661dec095107627dc0f6d33612f7a4000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", hex.EncodeToString(data))
 
 	txHash := contentHasher.Compute(string(data))
-	require.Equal(t, "a5e63b5bf3b7eeb347cad1aa742770a29c7a88e59ac99cdc60dc612ebdc8a7d4", hex.EncodeToString(txHash))
+	require.Equal(t, "242022e9dcfa0ee1d8199b0043314dbda8601619f70069ebc441b9f03349a35c", hex.EncodeToString(txHash))
 }
 
 func TestConstructTransaction_WithNonceZero(t *testing.T) {
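A note on the Version/Options change in the test above: the transaction options field is a bit mask, and setting Options to 2 flags the transaction as guarded; options are only honored from transaction version 2 onward, hence the paired Version bump. A minimal sketch of the bit arithmetic — the constant names are local stand-ins for the masks mx-chain-core-go defines, not identifiers from this PR:

package main

import "fmt"

const (
	maskSignedWithHash     = uint32(1) // bit 0 (assumed meaning, hedged)
	maskGuardedTransaction = uint32(2) // bit 1, matches Options: 2 in the test
)

func main() {
	options := uint32(2)
	fmt.Println("guarded:", options&maskGuardedTransaction != 0)      // true
	fmt.Println("signed with hash:", options&maskSignedWithHash != 0) // false
}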
diff --git a/state/accountsDB.go b/state/accountsDB.go
index bc41d151da1..2e3471edfb0 100644
--- a/state/accountsDB.go
+++ b/state/accountsDB.go
@@ -19,6 +19,7 @@ import (
 	"github.com/multiversx/mx-chain-go/common/errChan"
 	"github.com/multiversx/mx-chain-go/common/holders"
 	"github.com/multiversx/mx-chain-go/state/iteratorChannelsProvider"
+	"github.com/multiversx/mx-chain-go/state/lastSnapshotMarker"
 	"github.com/multiversx/mx-chain-go/state/parsers"
 	"github.com/multiversx/mx-chain-go/state/stateMetrics"
 	"github.com/multiversx/mx-chain-go/trie/keyBuilder"
@@ -28,11 +29,8 @@ import (
 )
 
 const (
-	leavesChannelSize             = 100
-	missingNodesChannelSize       = 100
-	lastSnapshot                  = "lastSnapshot"
-	waitTimeForSnapshotEpochCheck = time.Millisecond * 100
-	snapshotWaitTimeout           = time.Minute
+	leavesChannelSize       = 100
+	missingNodesChannelSize = 100
 )
 
 type loadingMeasurements struct {
@@ -138,6 +136,7 @@ func NewAccountsDB(args ArgsAccountsDB) (*AccountsDB, error) {
 		StateMetrics:       sm,
 		ChannelsProvider:   iteratorChannelsProvider.NewUserStateIteratorChannelsProvider(),
 		AccountFactory:     args.AccountFactory,
+		LastSnapshotMarker: lastSnapshotMarker.NewLastSnapshotMarker(),
 	}
 	snapshotManager, err := NewSnapshotsManager(argsSnapshotsManager)
 	if err != nil {
diff --git a/state/accountsDB_test.go b/state/accountsDB_test.go
index 98cbd657034..2083c1c8154 100644
--- a/state/accountsDB_test.go
+++ b/state/accountsDB_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/multiversx/mx-chain-go/state"
 	"github.com/multiversx/mx-chain-go/state/accounts"
 	"github.com/multiversx/mx-chain-go/state/factory"
+	"github.com/multiversx/mx-chain-go/state/lastSnapshotMarker"
 	"github.com/multiversx/mx-chain-go/state/parsers"
 	"github.com/multiversx/mx-chain-go/state/storagePruningManager"
 	"github.com/multiversx/mx-chain-go/state/storagePruningManager/disabled"
@@ -1011,7 +1012,7 @@ func TestAccountsDB_SnapshotStateOnAClosedStorageManagerShouldNotMarkActiveDB(t
 				activeDBWasPut = true
 			}
 
-			if string(key) == state.LastSnapshotStarted {
+			if string(key) == lastSnapshotMarker.LastSnapshot {
 				lastSnapshotStartedWasPut = true
 			}
 
@@ -1026,7 +1027,7 @@ func TestAccountsDB_SnapshotStateOnAClosedStorageManagerShouldNotMarkActiveDB(t
 	mut.RLock()
 	defer mut.RUnlock()
 
-	assert.True(t, lastSnapshotStartedWasPut)
+	assert.False(t, lastSnapshotStartedWasPut)
 	assert.False(t, activeDBWasPut)
 }
 
@@ -1059,17 +1060,20 @@ func TestAccountsDB_SnapshotStateWithErrorsShouldNotMarkActiveDB(t *testing.T) {
 				activeDBWasPut = true
 			}
 
-			if string(key) == state.LastSnapshotStarted {
+			if string(key) == lastSnapshotMarker.LastSnapshot {
 				lastSnapshotStartedWasPut = true
 			}
 
 			return nil
 		},
+		GetLatestStorageEpochCalled: func() (uint32, error) {
+			return 1, nil
+		},
 			}
 		},
 	}
 
 	adb := generateAccountDBFromTrie(trieStub)
-	adb.SnapshotState([]byte("roothash"), 0)
+	adb.SnapshotState([]byte("roothash"), 1)
 	time.Sleep(time.Second)
 
 	mut.RLock()
@@ -1106,14 +1110,14 @@ func TestAccountsDB_SnapshotStateSnapshotSameRootHash(t *testing.T) {
 	rootHash1 := []byte("rootHash1")
 	rootHash2 := []byte("rootHash2")
-	latestEpoch := uint32(0)
+	latestEpoch := atomic.Uint32{}
 	snapshotMutex := sync.RWMutex{}
 	takeSnapshotCalled := 0
 	trieStub := &trieMock.TrieStub{
 		GetStorageManagerCalled: func() common.StorageManager {
 			return &storageManager.StorageManagerStub{
 				GetLatestStorageEpochCalled: func() (uint32, error) {
-					return latestEpoch, nil
+					return latestEpoch.Get(), nil
 				},
 				TakeSnapshotCalled: func(_ string, _ []byte, _ []byte, iteratorChannels *common.TrieIteratorChannels, _ chan []byte, stats common.SnapshotStatisticsHandler, _ uint32) {
 					snapshotMutex.Lock()
@@ -1141,7 +1145,7 @@ func TestAccountsDB_SnapshotStateSnapshotSameRootHash(t *testing.T) {
 	snapshotMutex.Unlock()
 
 	// snapshot rootHash1 and epoch 1
-	latestEpoch = 1
+	latestEpoch.Set(1)
 	adb.SnapshotState(rootHash1, 1)
 	for adb.IsSnapshotInProgress() {
 		time.Sleep(waitForOpToFinish)
@@ -1151,7 +1155,7 @@ func TestAccountsDB_SnapshotStateSnapshotSameRootHash(t *testing.T) {
 	snapshotMutex.Unlock()
 
 	// snapshot rootHash1 and epoch 0 again
-	latestEpoch = 0
+	latestEpoch.Set(0)
 	adb.SnapshotState(rootHash1, 0)
 	for adb.IsSnapshotInProgress() {
 		time.Sleep(waitForOpToFinish)
@@ -1179,7 +1183,7 @@ func TestAccountsDB_SnapshotStateSnapshotSameRootHash(t *testing.T) {
 	snapshotMutex.Unlock()
 
 	// snapshot rootHash2 and epoch 1
-	latestEpoch = 1
+	latestEpoch.Set(1)
 	adb.SnapshotState(rootHash2, 1)
 	for adb.IsSnapshotInProgress() {
 		time.Sleep(waitForOpToFinish)
@@ -1189,7 +1193,7 @@ func TestAccountsDB_SnapshotStateSnapshotSameRootHash(t *testing.T) {
 	snapshotMutex.Unlock()
 
 	// snapshot rootHash2 and epoch 1 again
-	latestEpoch = 1
+	latestEpoch.Set(1)
 	adb.SnapshotState(rootHash2, 1)
 	for adb.IsSnapshotInProgress() {
 		time.Sleep(waitForOpToFinish)
@@ -1205,26 +1209,29 @@ func TestAccountsDB_SnapshotStateSkipSnapshotIfSnapshotInProgress(t *testing.T)
 	rootHashes := [][]byte{[]byte("rootHash1"), []byte("rootHash2"), []byte("rootHash3"), []byte("rootHash4")}
 	snapshotMutex := sync.RWMutex{}
 	takeSnapshotCalled := 0
-	numPutInEpochCalled := 0
+	numPutInEpochCalled := atomic.Counter{}
 	trieStub := &trieMock.TrieStub{
 		GetStorageManagerCalled: func() common.StorageManager {
 			return &storageManager.StorageManagerStub{
 				GetLatestStorageEpochCalled: func() (uint32, error) {
-					return 0, nil
+					return uint32(mathRand.Intn(5)), nil
 				},
 				TakeSnapshotCalled: func(_ string, _ []byte, _ []byte, iteratorChannels *common.TrieIteratorChannels, _ chan []byte, stats common.SnapshotStatisticsHandler, _ uint32) {
 					snapshotMutex.Lock()
 					takeSnapshotCalled++
 					close(iteratorChannels.LeavesChan)
 					stats.SnapshotFinished()
+					for numPutInEpochCalled.Get() != 4 {
+						time.Sleep(time.Millisecond * 10)
+					}
 					snapshotMutex.Unlock()
 				},
 				PutInEpochCalled: func(key []byte, val []byte, epoch uint32) error {
-					assert.Equal(t, []byte(state.LastSnapshotStarted), key)
-					assert.Equal(t, rootHashes[epoch], val)
+					assert.Equal(t, []byte(lastSnapshotMarker.LastSnapshot), key)
+					assert.Equal(t, rootHashes[epoch-1], val)
 
-					numPutInEpochCalled++
+					numPutInEpochCalled.Add(1)
 					return nil
 				},
 			}
@@ -1232,7 +1239,8 @@ func TestAccountsDB_SnapshotStateSkipSnapshotIfSnapshotInProgress(t *testing.T)
 	}
 
 	adb := generateAccountDBFromTrie(trieStub)
-	for epoch, rootHash := range rootHashes {
+	for i, rootHash := range rootHashes {
+		epoch := i + 1
 		adb.SnapshotState(rootHash, uint32(epoch))
 	}
 	for adb.IsSnapshotInProgress() {
@@ -1242,7 +1250,7 @@ func TestAccountsDB_SnapshotStateSkipSnapshotIfSnapshotInProgress(t *testing.T)
 	snapshotMutex.Lock()
 	assert.Equal(t, 1, takeSnapshotCalled)
 	snapshotMutex.Unlock()
-	assert.Equal(t, len(rootHashes), numPutInEpochCalled)
+	assert.Equal(t, len(rootHashes), int(numPutInEpochCalled.Get()))
 }
 
 func TestAccountsDB_SnapshotStateCallsRemoveFromAllActiveEpochs(t *testing.T) {
@@ -1263,7 +1271,7 @@ func TestAccountsDB_SnapshotStateCallsRemoveFromAllActiveEpochs(t *testing.T) {
 		},
 		RemoveFromAllActiveEpochsCalled: func(hash []byte) error {
 			removeFromAllActiveEpochsCalled = true
-			assert.Equal(t, []byte(state.LastSnapshotStarted), hash)
+			assert.Equal(t, []byte(lastSnapshotMarker.LastSnapshot), hash)
 			return nil
 		},
 	}
diff --git a/state/errors.go b/state/errors.go
index 5a56aff40ff..14c5ba77a54 100644
--- a/state/errors.go
+++ b/state/errors.go
@@ -144,3 +144,6 @@ var ErrNilStateMetrics = errors.New("nil state metrics")
 
 // ErrNilChannelsProvider signals that a nil channels provider has been given
 var ErrNilChannelsProvider = errors.New("nil channels provider")
+
+// ErrNilLastSnapshotMarker signals that a nil last snapshot marker has been given
+var ErrNilLastSnapshotMarker = errors.New("nil last snapshot marker")
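The new ErrNilLastSnapshotMarker is a package-level sentinel, so callers can match it either with direct equality (as the constructor tests below do) or through errors.Is when the error gets wrapped. A small runnable sketch — the wrapping message is illustrative only:

package main

import (
	"errors"
	"fmt"

	"github.com/multiversx/mx-chain-go/state"
)

func main() {
	// state.NewSnapshotsManager returns this sentinel when no marker is wired in.
	err := fmt.Errorf("creating snapshots manager: %w", state.ErrNilLastSnapshotMarker)
	fmt.Println(errors.Is(err, state.ErrNilLastSnapshotMarker)) // true
}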
"github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-go/common" - "github.com/multiversx/mx-chain-go/testscommon/storageManager" vmcommon "github.com/multiversx/mx-chain-vm-common-go" ) -// LastSnapshotStarted - -const LastSnapshotStarted = lastSnapshot - // LoadCode - func (adb *AccountsDB) LoadCode(accountHandler baseAccountHandler) error { return adb.loadCode(accountHandler) @@ -92,21 +86,6 @@ func (sm *snapshotsManager) GetLastSnapshotInfo() ([]byte, uint32) { return sm.lastSnapshot.rootHash, sm.lastSnapshot.epoch } -// GetStorageEpochChangeWaitArgs - -func GetStorageEpochChangeWaitArgs() storageEpochChangeWaitArgs { - return storageEpochChangeWaitArgs{ - Epoch: 1, - WaitTimeForSnapshotEpochCheck: time.Millisecond * 100, - SnapshotWaitTimeout: time.Second, - TrieStorageManager: &storageManager.StorageManagerStub{}, - } -} - -// WaitForStorageEpochChange -func (sm *snapshotsManager) WaitForStorageEpochChange(args storageEpochChangeWaitArgs) error { - return sm.waitForStorageEpochChange(args) -} - // NewNilSnapshotsManager - func NewNilSnapshotsManager() *snapshotsManager { return nil diff --git a/state/interface.go b/state/interface.go index 56dd0e1b8c4..e56ce456ad5 100644 --- a/state/interface.go +++ b/state/interface.go @@ -183,7 +183,7 @@ type DataTrie interface { } // PeerAccountHandler models a peer state account, which can journalize a normal account's data -// with some extra features like signing statistics or rating information +// with some extra features like signing statistics or rating information type PeerAccountHandler interface { SetBLSPublicKey([]byte) error GetRewardAddress() []byte @@ -265,3 +265,11 @@ type SignRate interface { GetNumSuccess() uint32 GetNumFailure() uint32 } + +// LastSnapshotMarker manages the lastSnapshot marker operations +type LastSnapshotMarker interface { + AddMarker(trieStorageManager common.StorageManager, epoch uint32, rootHash []byte) + RemoveMarker(trieStorageManager common.StorageManager, epoch uint32, rootHash []byte) + GetMarkerInfo(trieStorageManager common.StorageManager) ([]byte, error) + IsInterfaceNil() bool +} diff --git a/state/lastSnapshotMarker/lastSnapshotMarker.go b/state/lastSnapshotMarker/lastSnapshotMarker.go new file mode 100644 index 00000000000..852f36c4e0b --- /dev/null +++ b/state/lastSnapshotMarker/lastSnapshotMarker.go @@ -0,0 +1,79 @@ +package lastSnapshotMarker + +import ( + "sync" + + "github.com/multiversx/mx-chain-go/common" + "github.com/multiversx/mx-chain-go/storage/storageEpochChange" + logger "github.com/multiversx/mx-chain-logger-go" +) + +var log = logger.GetOrCreate("state/lastSnapshotMarker") + +const ( + // LastSnapshot is the marker for the last snapshot started + LastSnapshot = "lastSnapshot" +) + +type lastSnapshotMarker struct { + mutex sync.RWMutex + latestFinishedSnapshotEpoch uint32 +} + +// NewLastSnapshotMarker creates a new instance of lastSnapshotMarker +func NewLastSnapshotMarker() *lastSnapshotMarker { + return &lastSnapshotMarker{} +} + +// AddMarker adds a marker for the last snapshot started in the given epoch +func (lsm *lastSnapshotMarker) AddMarker(trieStorageManager common.StorageManager, epoch uint32, rootHash []byte) { + err := storageEpochChange.WaitForStorageEpochChange(storageEpochChange.StorageEpochChangeWaitArgs{ + TrieStorageManager: trieStorageManager, + Epoch: epoch, + WaitTimeForSnapshotEpochCheck: storageEpochChange.WaitTimeForSnapshotEpochCheck, + SnapshotWaitTimeout: storageEpochChange.SnapshotWaitTimeout, + }) + if err != nil { + 
log.Warn("err while waiting for storage epoch change", "err", err, "epoch", epoch, "rootHash", rootHash) + return + } + + lsm.mutex.Lock() + defer lsm.mutex.Unlock() + + if epoch <= lsm.latestFinishedSnapshotEpoch { + log.Debug("will not put lastSnapshot marker in epoch storage", + "epoch", epoch, + "latestFinishedSnapshotEpoch", lsm.latestFinishedSnapshotEpoch, + ) + return + } + + err = trieStorageManager.PutInEpoch([]byte(LastSnapshot), rootHash, epoch) + if err != nil { + log.Warn("could not set lastSnapshot", err, "rootHash", rootHash, "epoch", epoch, "rootHash", rootHash) + } +} + +// RemoveMarker removes the marker for the last snapshot started +func (lsm *lastSnapshotMarker) RemoveMarker(trieStorageManager common.StorageManager, epoch uint32, rootHash []byte) { + lsm.mutex.Lock() + defer lsm.mutex.Unlock() + + err := trieStorageManager.RemoveFromAllActiveEpochs([]byte(LastSnapshot)) + if err != nil { + log.Warn("could not remove lastSnapshot", err, "rootHash", rootHash, "epoch", epoch) + } + + lsm.latestFinishedSnapshotEpoch = epoch +} + +// GetMarkerInfo returns the root hash of the last snapshot started +func (lsm *lastSnapshotMarker) GetMarkerInfo(trieStorageManager common.StorageManager) ([]byte, error) { + return trieStorageManager.GetFromCurrentEpoch([]byte(LastSnapshot)) +} + +// IsInterfaceNil returns true if there is no value under the interface +func (lsm *lastSnapshotMarker) IsInterfaceNil() bool { + return lsm == nil +} diff --git a/state/lastSnapshotMarker/lastSnapshotMarker_test.go b/state/lastSnapshotMarker/lastSnapshotMarker_test.go new file mode 100644 index 00000000000..0cedf22a120 --- /dev/null +++ b/state/lastSnapshotMarker/lastSnapshotMarker_test.go @@ -0,0 +1,116 @@ +package lastSnapshotMarker + +import ( + "testing" + + "github.com/multiversx/mx-chain-go/testscommon/storageManager" + "github.com/stretchr/testify/assert" +) + +func TestNewLastSnapshotMarker(t *testing.T) { + t.Parallel() + + var lsm *lastSnapshotMarker + assert.True(t, lsm.IsInterfaceNil()) + + lsm = NewLastSnapshotMarker() + assert.False(t, lsm.IsInterfaceNil()) +} + +func TestLastSnapshotMarker_AddMarker(t *testing.T) { + t.Parallel() + + t.Run("err waiting for storage epoch change", func(t *testing.T) { + t.Parallel() + + trieStorageManager := &storageManager.StorageManagerStub{ + IsClosedCalled: func() bool { + return true + }, + PutInEpochCalled: func(_ []byte, _ []byte, _ uint32) error { + assert.Fail(t, "should not have been called") + return nil + }, + } + + lsm := NewLastSnapshotMarker() + lsm.AddMarker(trieStorageManager, 1, []byte("rootHash")) + }) + t.Run("epoch <= latestFinishedSnapshotEpoch", func(t *testing.T) { + t.Parallel() + + trieStorageManager := &storageManager.StorageManagerStub{ + PutInEpochCalled: func(_ []byte, _ []byte, _ uint32) error { + assert.Fail(t, "should not have been called") + return nil + }, + GetLatestStorageEpochCalled: func() (uint32, error) { + return 1, nil + }, + } + + lsm := NewLastSnapshotMarker() + lsm.latestFinishedSnapshotEpoch = 2 + lsm.AddMarker(trieStorageManager, 1, []byte("rootHash")) + }) + t.Run("lastSnapshot is saved in epoch", func(t *testing.T) { + t.Parallel() + + val := []byte("rootHash") + epoch := uint32(1) + putInEpochCalled := false + trieStorageManager := &storageManager.StorageManagerStub{ + PutInEpochCalled: func(key []byte, v []byte, e uint32) error { + putInEpochCalled = true + assert.Equal(t, []byte(LastSnapshot), key) + assert.Equal(t, val, v) + assert.Equal(t, epoch, e) + return nil + }, + GetLatestStorageEpochCalled: 
diff --git a/state/lastSnapshotMarker/lastSnapshotMarker_test.go b/state/lastSnapshotMarker/lastSnapshotMarker_test.go
new file mode 100644
index 00000000000..0cedf22a120
--- /dev/null
+++ b/state/lastSnapshotMarker/lastSnapshotMarker_test.go
@@ -0,0 +1,116 @@
+package lastSnapshotMarker
+
+import (
+	"testing"
+
+	"github.com/multiversx/mx-chain-go/testscommon/storageManager"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestNewLastSnapshotMarker(t *testing.T) {
+	t.Parallel()
+
+	var lsm *lastSnapshotMarker
+	assert.True(t, lsm.IsInterfaceNil())
+
+	lsm = NewLastSnapshotMarker()
+	assert.False(t, lsm.IsInterfaceNil())
+}
+
+func TestLastSnapshotMarker_AddMarker(t *testing.T) {
+	t.Parallel()
+
+	t.Run("err waiting for storage epoch change", func(t *testing.T) {
+		t.Parallel()
+
+		trieStorageManager := &storageManager.StorageManagerStub{
+			IsClosedCalled: func() bool {
+				return true
+			},
+			PutInEpochCalled: func(_ []byte, _ []byte, _ uint32) error {
+				assert.Fail(t, "should not have been called")
+				return nil
+			},
+		}
+
+		lsm := NewLastSnapshotMarker()
+		lsm.AddMarker(trieStorageManager, 1, []byte("rootHash"))
+	})
+	t.Run("epoch <= latestFinishedSnapshotEpoch", func(t *testing.T) {
+		t.Parallel()
+
+		trieStorageManager := &storageManager.StorageManagerStub{
+			PutInEpochCalled: func(_ []byte, _ []byte, _ uint32) error {
+				assert.Fail(t, "should not have been called")
+				return nil
+			},
+			GetLatestStorageEpochCalled: func() (uint32, error) {
+				return 1, nil
+			},
+		}
+
+		lsm := NewLastSnapshotMarker()
+		lsm.latestFinishedSnapshotEpoch = 2
+		lsm.AddMarker(trieStorageManager, 1, []byte("rootHash"))
+	})
+	t.Run("lastSnapshot is saved in epoch", func(t *testing.T) {
+		t.Parallel()
+
+		val := []byte("rootHash")
+		epoch := uint32(1)
+		putInEpochCalled := false
+		trieStorageManager := &storageManager.StorageManagerStub{
+			PutInEpochCalled: func(key []byte, v []byte, e uint32) error {
+				putInEpochCalled = true
+				assert.Equal(t, []byte(LastSnapshot), key)
+				assert.Equal(t, val, v)
+				assert.Equal(t, epoch, e)
+				return nil
+			},
+			GetLatestStorageEpochCalled: func() (uint32, error) {
+				return epoch, nil
+			},
+		}
+
+		lsm := NewLastSnapshotMarker()
+		lsm.AddMarker(trieStorageManager, epoch, val)
+		assert.True(t, putInEpochCalled)
+	})
+}
+
+func TestLastSnapshotMarker_RemoveMarker(t *testing.T) {
+	t.Parallel()
+
+	removeIsCalled := false
+	trieStorageManager := &storageManager.StorageManagerStub{
+		RemoveFromAllActiveEpochsCalled: func(_ []byte) error {
+			removeIsCalled = true
+			return nil
+		},
+	}
+
+	lsm := NewLastSnapshotMarker()
+	lsm.RemoveMarker(trieStorageManager, 5, []byte("rootHash"))
+	assert.True(t, removeIsCalled)
+	assert.Equal(t, uint32(5), lsm.latestFinishedSnapshotEpoch)
+}
+
+func TestLastSnapshotMarker_GetMarkerInfo(t *testing.T) {
+	t.Parallel()
+
+	getCalled := false
+	rootHash := []byte("rootHash")
+	trieStorageManager := &storageManager.StorageManagerStub{
+		GetFromCurrentEpochCalled: func(bytes []byte) ([]byte, error) {
+			getCalled = true
+			assert.Equal(t, []byte(LastSnapshot), bytes)
+			return rootHash, nil
+		},
+	}
+
+	lsm := NewLastSnapshotMarker()
+	val, err := lsm.GetMarkerInfo(trieStorageManager)
+	assert.Nil(t, err)
+	assert.True(t, getCalled)
+	assert.Equal(t, rootHash, val)
+}
diff --git a/state/peerAccountsDB.go b/state/peerAccountsDB.go
index 95a4d44cf25..58c8720e916 100644
--- a/state/peerAccountsDB.go
+++ b/state/peerAccountsDB.go
@@ -4,6 +4,7 @@ import (
 	"github.com/multiversx/mx-chain-core-go/core/check"
 	"github.com/multiversx/mx-chain-go/common"
 	"github.com/multiversx/mx-chain-go/state/iteratorChannelsProvider"
+	"github.com/multiversx/mx-chain-go/state/lastSnapshotMarker"
 	"github.com/multiversx/mx-chain-go/state/stateMetrics"
 )
 
@@ -38,6 +39,7 @@ func NewPeerAccountsDB(args ArgsAccountsDB) (*PeerAccountsDB, error) {
 		StateMetrics:       sm,
 		ChannelsProvider:   iteratorChannelsProvider.NewPeerStateIteratorChannelsProvider(),
 		AccountFactory:     args.AccountFactory,
+		LastSnapshotMarker: lastSnapshotMarker.NewLastSnapshotMarker(),
 	}
 	snapshotManager, err := NewSnapshotsManager(argsSnapshotsManager)
 	if err != nil {
diff --git a/state/peerAccountsDB_test.go b/state/peerAccountsDB_test.go
index 65beb8432dd..2d18cfb6b35 100644
--- a/state/peerAccountsDB_test.go
+++ b/state/peerAccountsDB_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/multiversx/mx-chain-go/common"
 	"github.com/multiversx/mx-chain-go/process/mock"
 	"github.com/multiversx/mx-chain-go/state"
+	"github.com/multiversx/mx-chain-go/state/lastSnapshotMarker"
 	testState "github.com/multiversx/mx-chain-go/testscommon/state"
 	"github.com/multiversx/mx-chain-go/testscommon/storageManager"
 	trieMock "github.com/multiversx/mx-chain-go/testscommon/trie"
@@ -433,7 +434,7 @@ func TestPeerAccountsDB_SnapshotStateOnAClosedStorageManagerShouldNotMarkActiveD
 				activeDBWasPut = true
 			}
 
-			if string(key) == state.LastSnapshotStarted {
+			if string(key) == lastSnapshotMarker.LastSnapshot {
 				lastSnapshotStartedWasPut = true
 			}
 
@@ -451,7 +452,7 @@ func TestPeerAccountsDB_SnapshotStateOnAClosedStorageManagerShouldNotMarkActiveD
 	mut.RLock()
 	defer mut.RUnlock()
 
-	assert.True(t, lastSnapshotStartedWasPut)
+	assert.False(t, lastSnapshotStartedWasPut)
 	assert.False(t, activeDBWasPut)
 }
"github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-go/common" + "github.com/multiversx/mx-chain-go/storage/storageEpochChange" "github.com/multiversx/mx-chain-go/trie/storageMarker" ) -// storageEpochChangeWaitArgs are the args needed for calling the WaitForStorageEpochChange function -type storageEpochChangeWaitArgs struct { - TrieStorageManager common.StorageManager - Epoch uint32 - WaitTimeForSnapshotEpochCheck time.Duration - SnapshotWaitTimeout time.Duration -} - // ArgsNewSnapshotsManager are the args needed for creating a new snapshots manager type ArgsNewSnapshotsManager struct { ShouldSerializeSnapshots bool @@ -33,6 +23,7 @@ type ArgsNewSnapshotsManager struct { StateMetrics StateMetrics AccountFactory AccountFactory ChannelsProvider IteratorChannelsProvider + LastSnapshotMarker LastSnapshotMarker } type snapshotsManager struct { @@ -42,6 +33,7 @@ type snapshotsManager struct { processingMode common.NodeProcessingMode stateMetrics StateMetrics + lastSnapshotMarker LastSnapshotMarker marshaller marshal.Marshalizer addressConverter core.PubkeyConverter trieSyncer AccountsDBSyncer @@ -71,6 +63,9 @@ func NewSnapshotsManager(args ArgsNewSnapshotsManager) (*snapshotsManager, error if check.IfNil(args.AccountFactory) { return nil, ErrNilAccountFactory } + if check.IfNil(args.LastSnapshotMarker) { + return nil, ErrNilLastSnapshotMarker + } return &snapshotsManager{ isSnapshotInProgress: atomic.Flag{}, @@ -85,6 +80,7 @@ func NewSnapshotsManager(args ArgsNewSnapshotsManager) (*snapshotsManager, error channelsProvider: args.ChannelsProvider, mutex: sync.RWMutex{}, accountFactory: args.AccountFactory, + lastSnapshotMarker: args.LastSnapshotMarker, }, nil } @@ -136,7 +132,7 @@ func (sm *snapshotsManager) StartSnapshotAfterRestartIfNeeded(trieStorageManager } func (sm *snapshotsManager) getSnapshotRootHashAndEpoch(trieStorageManager common.StorageManager) ([]byte, uint32, error) { - rootHash, err := trieStorageManager.GetFromCurrentEpoch([]byte(lastSnapshot)) + rootHash, err := sm.lastSnapshotMarker.GetMarkerInfo(trieStorageManager) if err != nil { return nil, 0, err } @@ -215,10 +211,9 @@ func (sm *snapshotsManager) prepareSnapshot(rootHash []byte, epoch uint32, trieS return nil, true } - defer func() { - err := trieStorageManager.PutInEpoch([]byte(lastSnapshot), rootHash, epoch) - handleLoggingWhenError("could not set lastSnapshot", err, "rootHash", rootHash) - }() + if sm.processingMode != common.ImportDb { + go sm.lastSnapshotMarker.AddMarker(trieStorageManager, epoch, rootHash) + } if sm.isSnapshotInProgress.IsSet() { return nil, true @@ -239,16 +234,18 @@ func (sm *snapshotsManager) snapshotState( trieStorageManager common.StorageManager, stats *snapshotStatistics, ) { - err := sm.waitForStorageEpochChange(storageEpochChangeWaitArgs{ - TrieStorageManager: trieStorageManager, - Epoch: epoch, - WaitTimeForSnapshotEpochCheck: waitTimeForSnapshotEpochCheck, - SnapshotWaitTimeout: snapshotWaitTimeout, - }) - if err != nil { - log.Error("error waiting for storage epoch change", "err", err) - sm.earlySnapshotCompletion(stats, trieStorageManager) - return + if sm.processingMode != common.ImportDb { + err := storageEpochChange.WaitForStorageEpochChange(storageEpochChange.StorageEpochChangeWaitArgs{ + TrieStorageManager: trieStorageManager, + Epoch: epoch, + WaitTimeForSnapshotEpochCheck: storageEpochChange.WaitTimeForSnapshotEpochCheck, + SnapshotWaitTimeout: storageEpochChange.SnapshotWaitTimeout, + }) + if err != nil { + log.Error("error waiting for storage epoch 
change", "err", err) + sm.earlySnapshotCompletion(stats, trieStorageManager) + return + } } if !trieStorageManager.ShouldTakeSnapshot() { @@ -286,46 +283,6 @@ func (sm *snapshotsManager) earlySnapshotCompletion(stats *snapshotStatistics, t trieStorageManager.ExitPruningBufferingMode() } -func (sm *snapshotsManager) waitForStorageEpochChange(args storageEpochChangeWaitArgs) error { - if sm.processingMode == common.ImportDb { - log.Debug("no need to wait for storage epoch change as the node is running in import-db mode") - return nil - } - - if args.SnapshotWaitTimeout < args.WaitTimeForSnapshotEpochCheck { - return fmt.Errorf("timeout (%s) must be greater than wait time between snapshot epoch check (%s)", args.SnapshotWaitTimeout, args.WaitTimeForSnapshotEpochCheck) - } - - ctx, cancel := context.WithTimeout(context.Background(), args.SnapshotWaitTimeout) - defer cancel() - - timer := time.NewTimer(args.WaitTimeForSnapshotEpochCheck) - defer timer.Stop() - - for { - timer.Reset(args.WaitTimeForSnapshotEpochCheck) - - if args.TrieStorageManager.IsClosed() { - return core.ErrContextClosing - } - - latestStorageEpoch, err := args.TrieStorageManager.GetLatestStorageEpoch() - if err != nil { - return err - } - - if latestStorageEpoch == args.Epoch { - return nil - } - - select { - case <-timer.C: - case <-ctx.Done(): - return fmt.Errorf("timeout waiting for storage epoch change, snapshot epoch %d", args.Epoch) - } - } -} - func (sm *snapshotsManager) snapshotUserAccountDataTrie( isSnapshot bool, mainTrieRootHash []byte, @@ -418,8 +375,7 @@ func (sm *snapshotsManager) processSnapshotCompletion( return } - err := trieStorageManager.RemoveFromAllActiveEpochs([]byte(lastSnapshot)) - handleLoggingWhenError("could not remove lastSnapshot", err, "rootHash", rootHash) + sm.lastSnapshotMarker.RemoveMarker(trieStorageManager, epoch, rootHash) log.Debug("set activeDB in epoch", "epoch", epoch) errPut := trieStorageManager.PutInEpochWithoutCache([]byte(common.ActiveDBKey), []byte(common.ActiveDBVal), epoch) diff --git a/state/snapshotsManager_test.go b/state/snapshotsManager_test.go index 70c2423ce51..b48fd2b1a95 100644 --- a/state/snapshotsManager_test.go +++ b/state/snapshotsManager_test.go @@ -6,12 +6,12 @@ import ( "testing" "time" - "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/atomic" "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/process/mock" "github.com/multiversx/mx-chain-go/state" "github.com/multiversx/mx-chain-go/state/iteratorChannelsProvider" + "github.com/multiversx/mx-chain-go/state/lastSnapshotMarker" "github.com/multiversx/mx-chain-go/testscommon" "github.com/multiversx/mx-chain-go/testscommon/marshallerMock" stateTest "github.com/multiversx/mx-chain-go/testscommon/state" @@ -29,6 +29,7 @@ func getDefaultSnapshotManagerArgs() state.ArgsNewSnapshotsManager { StateMetrics: &stateTest.StateMetricsStub{}, AccountFactory: &stateTest.AccountsFactoryStub{}, ChannelsProvider: iteratorChannelsProvider.NewUserStateIteratorChannelsProvider(), + LastSnapshotMarker: lastSnapshotMarker.NewLastSnapshotMarker(), } } @@ -95,6 +96,16 @@ func TestNewSnapshotsManager(t *testing.T) { assert.Nil(t, sm) assert.Equal(t, state.ErrNilAccountFactory, err) }) + t.Run("nil last snapshot marker", func(t *testing.T) { + t.Parallel() + + args := getDefaultSnapshotManagerArgs() + args.LastSnapshotMarker = nil + + sm, err := state.NewSnapshotsManager(args) + assert.Nil(t, sm) + assert.Equal(t, state.ErrNilLastSnapshotMarker, err) + }) t.Run("ok", 
@@ -260,7 +271,7 @@ func TestSnapshotsManager_SnapshotState(t *testing.T) {
 	t.Run("should not start snapshot if another snapshot is in progress, lastSnapshot should be saved", func(t *testing.T) {
 		t.Parallel()
 
-		putInEpochCalled := false
+		putInEpochCalled := atomic.Flag{}
 
 		args := getDefaultSnapshotManagerArgs()
 		args.StateMetrics = &stateTest.StateMetricsStub{
@@ -276,28 +287,33 @@ func TestSnapshotsManager_SnapshotState(t *testing.T) {
 				assert.Equal(t, []byte("lastSnapshot"), key)
 				assert.Equal(t, rootHash, val)
 				assert.Equal(t, epoch, e)
-				putInEpochCalled = true
+				putInEpochCalled.SetValue(true)
 				return nil
 			},
 			EnterPruningBufferingModeCalled: func() {
 				assert.Fail(t, "the func should have returned before this is called")
 			},
+			GetLatestStorageEpochCalled: func() (uint32, error) {
+				return epoch, nil
+			},
 		}
 
 		sm.SnapshotState(rootHash, epoch, tsm)
-		assert.True(t, putInEpochCalled)
+		for !putInEpochCalled.IsSet() {
+			time.Sleep(10 * time.Millisecond)
+		}
 	})
 	t.Run("starting snapshot sets some parameters", func(t *testing.T) {
 		t.Parallel()
 
-		putInEpochCalled := false
+		putInEpochCalled := atomic.Flag{}
 		enterPruningBufferingModeCalled := false
-		getSnapshotMessageCalled := false
+		getSnapshotMessageCalled := atomic.Flag{}
 
 		args := getDefaultSnapshotManagerArgs()
 		args.StateMetrics = &stateTest.StateMetricsStub{
 			GetSnapshotMessageCalled: func() string {
-				getSnapshotMessageCalled = true
+				getSnapshotMessageCalled.SetValue(true)
 				return ""
 			},
 		}
@@ -307,17 +323,23 @@ func TestSnapshotsManager_SnapshotState(t *testing.T) {
 				assert.Equal(t, []byte("lastSnapshot"), key)
 				assert.Equal(t, rootHash, val)
 				assert.Equal(t, epoch, e)
-				putInEpochCalled = true
+				putInEpochCalled.SetValue(true)
 				return nil
 			},
 			EnterPruningBufferingModeCalled: func() {
 				enterPruningBufferingModeCalled = true
+				for !putInEpochCalled.IsSet() {
+					time.Sleep(10 * time.Millisecond)
+				}
 			},
+			GetLatestStorageEpochCalled: func() (uint32, error) {
+				return epoch, nil
+			},
 		}
 
 		sm.SnapshotState(rootHash, epoch, tsm)
-		assert.True(t, getSnapshotMessageCalled)
-		assert.True(t, putInEpochCalled)
+		assert.True(t, getSnapshotMessageCalled.IsSet())
+		assert.True(t, putInEpochCalled.IsSet())
 		assert.True(t, enterPruningBufferingModeCalled)
 		assert.True(t, sm.IsSnapshotInProgress())
 
@@ -329,15 +351,17 @@ func TestSnapshotsManager_SnapshotState(t *testing.T) {
 		t.Parallel()
 
 		expectedErr := errors.New("some error")
-		getLatestStorageEpochCalled := false
+		getLatestStorageEpochCalled := atomic.Flag{}
 
 		sm, _ := state.NewSnapshotsManager(getDefaultSnapshotManagerArgs())
 		enterPruningBufferingModeCalled := atomic.Flag{}
 		exitPruningBufferingModeCalled := atomic.Flag{}
 		tsm := &storageManager.StorageManagerStub{
 			GetLatestStorageEpochCalled: func() (uint32, error) {
-				getLatestStorageEpochCalled = true
-				assert.True(t, sm.IsSnapshotInProgress())
+				for !sm.IsSnapshotInProgress() {
+					time.Sleep(10 * time.Millisecond)
+				}
+				getLatestStorageEpochCalled.SetValue(true)
 				return 0, expectedErr
 			},
 			ShouldTakeSnapshotCalled: func() bool {
@@ -357,7 +381,7 @@ func TestSnapshotsManager_SnapshotState(t *testing.T) {
 			time.Sleep(10 * time.Millisecond)
 		}
 
-		assert.True(t, getLatestStorageEpochCalled)
+		assert.True(t, getLatestStorageEpochCalled.IsSet())
 		assert.True(t, enterPruningBufferingModeCalled.IsSet())
 		assert.True(t, exitPruningBufferingModeCalled.IsSet())
 	})
@@ -471,99 +495,3 @@ func TestSnapshotsManager_SnapshotState(t *testing.T) {
 		assert.True(t, removeFromAllActiveEpochsCalled)
 	})
 }
-
-func TestSnapshotsManager_WaitForStorageEpochChange(t *testing.T) {
-	t.Parallel()
-
-	t.Run("invalid args", func(t *testing.T) {
-		t.Parallel()
-
-		args := state.GetStorageEpochChangeWaitArgs()
-		args.SnapshotWaitTimeout = time.Millisecond
-
-		sm, _ := state.NewSnapshotsManager(getDefaultSnapshotManagerArgs())
-		err := sm.WaitForStorageEpochChange(args)
-		assert.Error(t, err)
-	})
-	t.Run("getLatestStorageEpoch error", func(t *testing.T) {
-		t.Parallel()
-
-		expectedError := errors.New("getLatestStorageEpoch error")
-
-		args := state.GetStorageEpochChangeWaitArgs()
-		args.TrieStorageManager = &storageManager.StorageManagerStub{
-			GetLatestStorageEpochCalled: func() (uint32, error) {
-				return 0, expectedError
-			},
-		}
-		sm, _ := state.NewSnapshotsManager(getDefaultSnapshotManagerArgs())
-
-		err := sm.WaitForStorageEpochChange(args)
-		assert.Equal(t, expectedError, err)
-	})
-	t.Run("storage manager closed error", func(t *testing.T) {
-		t.Parallel()
-
-		args := state.GetStorageEpochChangeWaitArgs()
-		args.TrieStorageManager = &storageManager.StorageManagerStub{
-			GetLatestStorageEpochCalled: func() (uint32, error) {
-				return 0, nil
-			},
-			IsClosedCalled: func() bool {
-				return true
-			},
-		}
-		sm, _ := state.NewSnapshotsManager(getDefaultSnapshotManagerArgs())
-
-		err := sm.WaitForStorageEpochChange(args)
-		assert.Equal(t, core.ErrContextClosing, err)
-	})
-	t.Run("storage epoch change timeout", func(t *testing.T) {
-		t.Parallel()
-
-		args := state.GetStorageEpochChangeWaitArgs()
-		args.WaitTimeForSnapshotEpochCheck = time.Millisecond
-		args.SnapshotWaitTimeout = time.Millisecond * 5
-		args.TrieStorageManager = &storageManager.StorageManagerStub{
-			GetLatestStorageEpochCalled: func() (uint32, error) {
-				return 0, nil
-			},
-		}
-		sm, _ := state.NewSnapshotsManager(getDefaultSnapshotManagerArgs())
-
-		err := sm.WaitForStorageEpochChange(args)
-		assert.Error(t, err)
-	})
-	t.Run("is in import-db mode should not return error on timeout condition", func(t *testing.T) {
-		t.Parallel()
-
-		args := state.GetStorageEpochChangeWaitArgs()
-		args.WaitTimeForSnapshotEpochCheck = time.Millisecond
-		args.SnapshotWaitTimeout = time.Millisecond * 5
-		args.TrieStorageManager = &storageManager.StorageManagerStub{
-			GetLatestStorageEpochCalled: func() (uint32, error) {
-				return 0, nil
-			},
-		}
-		argsSnapshotManager := getDefaultSnapshotManagerArgs()
-		argsSnapshotManager.ProcessingMode = common.ImportDb
-		sm, _ := state.NewSnapshotsManager(argsSnapshotManager)
-
-		err := sm.WaitForStorageEpochChange(args)
-		assert.Nil(t, err)
-	})
-	t.Run("returns when latestStorageEpoch == snapshotEpoch", func(t *testing.T) {
-		t.Parallel()
-
-		args := state.GetStorageEpochChangeWaitArgs()
-		args.TrieStorageManager = &storageManager.StorageManagerStub{
-			GetLatestStorageEpochCalled: func() (uint32, error) {
-				return 1, nil
-			},
-		}
-		sm, _ := state.NewSnapshotsManager(getDefaultSnapshotManagerArgs())
-
-		err := sm.WaitForStorageEpochChange(args)
-		assert.Nil(t, err)
-	})
-}
diff --git a/state/trackableDataTrie/trackableDataTrie.go b/state/trackableDataTrie/trackableDataTrie.go
index 3115f662a2e..e7c874e7dbf 100644
--- a/state/trackableDataTrie/trackableDataTrie.go
+++ b/state/trackableDataTrie/trackableDataTrie.go
@@ -131,6 +131,7 @@ func (tdt *trackableDataTrie) MigrateDataTrieLeaves(args vmcommon.ArgsMigrateDat
 	}
 
 	dataToBeMigrated := args.TrieMigrator.GetLeavesToBeMigrated()
+	log.Debug("num leaves to be migrated", "num", len(dataToBeMigrated), "account", tdt.identifier)
 	for _, leafData := range dataToBeMigrated {
 		dataEntry := dirtyData{
 			value: leafData.Value,
diff --git a/storage/storageEpochChange/storageEpochChange.go b/storage/storageEpochChange/storageEpochChange.go
new file mode 100644
index 00000000000..9c6857706d8
--- /dev/null
+++ b/storage/storageEpochChange/storageEpochChange.go
@@ -0,0 +1,67 @@
+package storageEpochChange
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/multiversx/mx-chain-core-go/core"
+	"github.com/multiversx/mx-chain-go/common"
+	logger "github.com/multiversx/mx-chain-logger-go"
+)
+
+var log = logger.GetOrCreate("storage/storageEpochChange")
+
+const (
+	// WaitTimeForSnapshotEpochCheck is the time to wait before checking the storage epoch
+	WaitTimeForSnapshotEpochCheck = time.Millisecond * 100
+
+	// SnapshotWaitTimeout is the timeout for waiting for the storage epoch to change
+	SnapshotWaitTimeout = time.Minute * 3
+)
+
+// StorageEpochChangeWaitArgs are the args needed for calling the WaitForStorageEpochChange function
+type StorageEpochChangeWaitArgs struct {
+	TrieStorageManager            common.StorageManager
+	Epoch                         uint32
+	WaitTimeForSnapshotEpochCheck time.Duration
+	SnapshotWaitTimeout           time.Duration
+}
+
+// WaitForStorageEpochChange waits for the storage epoch to change to the given epoch
+func WaitForStorageEpochChange(args StorageEpochChangeWaitArgs) error {
+	log.Debug("waiting for storage epoch change", "epoch", args.Epoch, "wait timeout", args.SnapshotWaitTimeout)
+
+	if args.SnapshotWaitTimeout < args.WaitTimeForSnapshotEpochCheck {
+		return fmt.Errorf("timeout (%s) must be greater than wait time between snapshot epoch check (%s)", args.SnapshotWaitTimeout, args.WaitTimeForSnapshotEpochCheck)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), args.SnapshotWaitTimeout)
+	defer cancel()
+
+	timer := time.NewTimer(args.WaitTimeForSnapshotEpochCheck)
+	defer timer.Stop()
+
+	for {
+		timer.Reset(args.WaitTimeForSnapshotEpochCheck)
+
+		if args.TrieStorageManager.IsClosed() {
+			return core.ErrContextClosing
+		}
+
+		latestStorageEpoch, err := args.TrieStorageManager.GetLatestStorageEpoch()
+		if err != nil {
+			return err
+		}
+
+		if latestStorageEpoch == args.Epoch {
+			return nil
+		}
+
+		select {
+		case <-timer.C:
+		case <-ctx.Done():
+			return fmt.Errorf("timeout waiting for storage epoch change, snapshot epoch %d", args.Epoch)
+		}
+	}
+}
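This helper is the former snapshotsManager.waitForStorageEpochChange, extracted verbatim minus the import-db short-circuit (now handled by callers) and with the timeout raised from the old one-minute snapshotWaitTimeout to three minutes. A runnable sketch of the polling contract, again leaning on the repo's StorageManagerStub with shortened timings for the demo:

package main

import (
	"fmt"
	"time"

	"github.com/multiversx/mx-chain-go/storage/storageEpochChange"
	"github.com/multiversx/mx-chain-go/testscommon/storageManager"
)

func main() {
	calls := 0
	tsm := &storageManager.StorageManagerStub{
		// Report the old epoch twice, then the target one, simulating an
		// epoch change that completes while the caller is waiting.
		GetLatestStorageEpochCalled: func() (uint32, error) {
			calls++
			if calls < 3 {
				return 1, nil
			}
			return 2, nil
		},
	}

	err := storageEpochChange.WaitForStorageEpochChange(storageEpochChange.StorageEpochChangeWaitArgs{
		TrieStorageManager:            tsm,
		Epoch:                         2,
		WaitTimeForSnapshotEpochCheck: 10 * time.Millisecond, // default is 100ms
		SnapshotWaitTimeout:           time.Second,           // default is 3m
	})
	fmt.Println("err:", err, "polls:", calls) // err: <nil> polls: 3
}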
diff --git a/storage/storageEpochChange/storageEpochChange_test.go b/storage/storageEpochChange/storageEpochChange_test.go
new file mode 100644
index 00000000000..8146c49b8ef
--- /dev/null
+++ b/storage/storageEpochChange/storageEpochChange_test.go
@@ -0,0 +1,93 @@
+package storageEpochChange
+
+import (
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/multiversx/mx-chain-core-go/core"
+	"github.com/multiversx/mx-chain-go/testscommon/storageManager"
+	"github.com/stretchr/testify/assert"
+)
+
+func getDefaultArgs() StorageEpochChangeWaitArgs {
+	return StorageEpochChangeWaitArgs{
+		Epoch:                         1,
+		WaitTimeForSnapshotEpochCheck: time.Millisecond * 100,
+		SnapshotWaitTimeout:           time.Second,
+		TrieStorageManager:            &storageManager.StorageManagerStub{},
+	}
+}
+
+func TestSnapshotsManager_WaitForStorageEpochChange(t *testing.T) {
+	t.Parallel()
+
+	t.Run("invalid args", func(t *testing.T) {
+		t.Parallel()
+
+		args := getDefaultArgs()
+		args.SnapshotWaitTimeout = time.Millisecond
+
+		err := WaitForStorageEpochChange(args)
+		assert.Error(t, err)
+	})
+	t.Run("getLatestStorageEpoch error", func(t *testing.T) {
+		t.Parallel()
+
+		expectedError := errors.New("getLatestStorageEpoch error")
+
+		args := getDefaultArgs()
+		args.TrieStorageManager = &storageManager.StorageManagerStub{
+			GetLatestStorageEpochCalled: func() (uint32, error) {
+				return 0, expectedError
+			},
+		}
+
+		err := WaitForStorageEpochChange(args)
+		assert.Equal(t, expectedError, err)
+	})
+	t.Run("storage manager closed error", func(t *testing.T) {
+		t.Parallel()
+
+		args := getDefaultArgs()
+		args.TrieStorageManager = &storageManager.StorageManagerStub{
+			GetLatestStorageEpochCalled: func() (uint32, error) {
+				return 0, nil
+			},
+			IsClosedCalled: func() bool {
+				return true
+			},
+		}
+
+		err := WaitForStorageEpochChange(args)
+		assert.Equal(t, core.ErrContextClosing, err)
+	})
+	t.Run("storage epoch change timeout", func(t *testing.T) {
+		t.Parallel()
+
+		args := getDefaultArgs()
+		args.WaitTimeForSnapshotEpochCheck = time.Millisecond
+		args.SnapshotWaitTimeout = time.Millisecond * 5
+		args.TrieStorageManager = &storageManager.StorageManagerStub{
+			GetLatestStorageEpochCalled: func() (uint32, error) {
+				return 0, nil
+			},
+		}
+
+		err := WaitForStorageEpochChange(args)
+		assert.Error(t, err)
+	})
+	t.Run("returns when latestStorageEpoch == snapshotEpoch", func(t *testing.T) {
+		t.Parallel()
+
+		args := getDefaultArgs()
+		args.TrieStorageManager = &storageManager.StorageManagerStub{
+			GetLatestStorageEpochCalled: func() (uint32, error) {
+				return 1, nil
+			},
+		}
+
+		err := WaitForStorageEpochChange(args)
+		assert.Nil(t, err)
+	})
+}