Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use goroutines manager for trie commit #6611

Merged
merged 14 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,9 @@ type Trie interface {
Update(key, value []byte) error
Delete(key []byte)
RootHash() ([]byte, error)
Commit() error
Commit(collector TrieHashesCollector) error
Recreate(root []byte) (Trie, error)
RecreateFromEpoch(options RootHashHolder) (Trie, error)
String() string
GetObsoleteHashes() [][]byte
GetDirtyHashes() (ModifiedHashes, error)
GetOldRoot() []byte
GetSerializedNodes([]byte, uint64) ([][]byte, uint64, error)
GetSerializedNode([]byte) ([]byte, error)
GetAllLeavesOnChannel(allLeavesChan *TrieIteratorChannels, ctx context.Context, rootHash []byte, keyBuilder KeyBuilder, trieLeafParser TrieLeafParser) error
Expand Down Expand Up @@ -410,3 +406,12 @@ type AtomicBytesSlice interface {
Append(data [][]byte)
Get() [][]byte
}

// TrieHashesCollector defines the methods needed for collecting trie hashes
type TrieHashesCollector interface {
AddDirtyHash(hash []byte)
GetDirtyHashes() ModifiedHashes
AddObsoleteHashes(oldRootHash []byte, oldHashes [][]byte)
GetCollectedData() ([]byte, ModifiedHashes, ModifiedHashes)
IsInterfaceNil() bool
}
3 changes: 2 additions & 1 deletion integrationTests/benchmarks/loadFromTrie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/multiversx/mx-chain-go/common"
disabledStatistics "github.com/multiversx/mx-chain-go/common/statistics/disabled"
"github.com/multiversx/mx-chain-go/integrationTests"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/storage"
"github.com/multiversx/mx-chain-go/storage/database"
"github.com/multiversx/mx-chain-go/storage/storageunit"
Expand Down Expand Up @@ -120,7 +121,7 @@ func insertKeysIntoTrie(t *testing.T, tr common.Trie, numTrieLevels int, numChil

_, depth, _ := tr.Get(key)
require.Equal(t, uint32(numTrieLevels), depth+1)
_ = tr.Commit()
_ = tr.Commit(hashesCollector.NewDisabledHashesCollector())
return key
}

Expand Down
3 changes: 2 additions & 1 deletion integrationTests/longTests/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/multiversx/mx-chain-core-go/hashing/blake2b"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/integrationTests"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock"
"github.com/multiversx/mx-chain-go/testscommon/storage"
"github.com/multiversx/mx-chain-go/trie"
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestWriteContinuouslyInTree(t *testing.T) {
if i%written == 0 {
endTime := time.Now()
diff := endTime.Sub(startTime)
_ = tr.Commit()
_ = tr.Commit(hashesCollector.NewDisabledHashesCollector())
fmt.Printf("Written %d, total %d in %f s\n", written, i, diff.Seconds())
startTime = time.Now()
}
Expand Down
17 changes: 11 additions & 6 deletions integrationTests/state/genesisState/genesisState_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type testPair struct {
const generate32ByteSlices = 0
const generate32HexByteSlices = 1

type trieWithToString interface {
common.Trie
ToString() string
}

func TestCreationOfTheGenesisState(t *testing.T) {
if testing.Short() {
t.Skip("this is not a short test")
Expand Down Expand Up @@ -85,7 +90,7 @@ func TestExtensionNodeToBranchEdgeCaseSet1(t *testing.T) {
_ = tr1.Update([]byte(key3), []byte(val))

fmt.Println()
strTr1 := tr1.String()
strTr1 := tr1.(trieWithToString).ToString()
fmt.Println(strTr1)

hash1, _ := tr1.RootHash()
Expand All @@ -98,7 +103,7 @@ func TestExtensionNodeToBranchEdgeCaseSet1(t *testing.T) {
fmt.Printf("root hash2: %s\n", base64.StdEncoding.EncodeToString(hash2))

fmt.Println()
strTr2 := tr2.String()
strTr2 := tr2.(trieWithToString).ToString()
fmt.Println(strTr2)

assert.Equal(t, hash1, hash2)
Expand Down Expand Up @@ -126,7 +131,7 @@ func TestExtensionNodeToBranchEdgeCaseSet2(t *testing.T) {
_ = tr1.Update([]byte(key4), []byte(val))

fmt.Println()
strTr1 := tr1.String()
strTr1 := tr1.(trieWithToString).ToString()
fmt.Println(strTr1)

hash1, _ := tr1.RootHash()
Expand All @@ -140,7 +145,7 @@ func TestExtensionNodeToBranchEdgeCaseSet2(t *testing.T) {
_ = tr2.Update([]byte(key6), []byte(val))

fmt.Println()
strTr2 := tr2.String()
strTr2 := tr2.(trieWithToString).ToString()
fmt.Println(strTr2)

hash2, _ := tr2.RootHash()
Expand Down Expand Up @@ -299,12 +304,12 @@ func printTestDebugLines(

fmt.Println()
fmt.Println("Reference trie:")
strRefTrie := referenceTrie.String()
strRefTrie := referenceTrie.(trieWithToString).ToString()
fmt.Println(strRefTrie)

fmt.Println()
fmt.Println("Actual trie:")
strTr := tr.String()
strTr := tr.(trieWithToString).ToString()
fmt.Println(strTr)
}

Expand Down
10 changes: 8 additions & 2 deletions integrationTests/state/stateTrie/stateTrie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/multiversx/mx-chain-go/sharding"
"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/state/factory"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/state/iteratorChannelsProvider"
"github.com/multiversx/mx-chain-go/state/lastSnapshotMarker"
"github.com/multiversx/mx-chain-go/state/storagePruningManager"
Expand Down Expand Up @@ -279,7 +280,7 @@ func TestTrieDB_RecreateFromStorageShouldWork(t *testing.T) {

_ = tr1.Update(key, value)
h1, _ := tr1.RootHash()
err := tr1.Commit()
err := tr1.Commit(hashesCollector.NewDisabledHashesCollector())
require.Nil(t, err)

tr2, err := tr1.Recreate(h1)
Expand Down Expand Up @@ -961,6 +962,11 @@ func TestAccountsDB_ExecALotOfBalanceTxOKorNOK(t *testing.T) {
integrationTests.PrintShardAccount(acntDest.(state.UserAccountHandler), "Destination")
}

type trieWithToString interface {
common.Trie
ToString() string
}

func BenchmarkCreateOneMillionAccountsWithMockDB(b *testing.B) {
nrOfAccounts := 1000000
balance := 1500000
Expand Down Expand Up @@ -994,7 +1000,7 @@ func BenchmarkCreateOneMillionAccountsWithMockDB(b *testing.B) {
core.ConvertBytes(rtm.Sys),
)

_ = tr.String()
_ = tr.(trieWithToString).ToString()
}

func BenchmarkCreateOneMillionAccounts(b *testing.B) {
Expand Down
7 changes: 4 additions & 3 deletions integrationTests/state/stateTrieClose/stateTrieClose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/common/errChan"
"github.com/multiversx/mx-chain-go/integrationTests"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/state/parsers"
"github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock"
"github.com/multiversx/mx-chain-go/testscommon/goroutines"
Expand All @@ -28,7 +29,7 @@ func TestPatriciaMerkleTrie_Close(t *testing.T) {
for i := 0; i < numLeavesToAdd; i++ {
_ = tr.Update([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))
}
_ = tr.Commit()
_ = tr.Commit(hashesCollector.NewDisabledHashesCollector())
time.Sleep(time.Second * 2) // allow the commit go routines to finish completely as to not alter the further counters

gc := goroutines.NewGoCounter(goroutines.TestsRelevantGoRoutines)
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestPatriciaMerkleTrie_Close(t *testing.T) {
assert.Nil(t, err)

_ = tr.Update([]byte("god"), []byte("puppy"))
_ = tr.Commit()
_ = tr.Commit(hashesCollector.NewDisabledHashesCollector())

rootHash, _ = tr.RootHash()
leavesChannel1 = &common.TrieIteratorChannels{
Expand All @@ -91,7 +92,7 @@ func TestPatriciaMerkleTrie_Close(t *testing.T) {
assert.Nil(t, err)

_ = tr.Update([]byte("eggod"), []byte("cat"))
_ = tr.Commit()
_ = tr.Commit(hashesCollector.NewDisabledHashesCollector())

rootHash, _ = tr.RootHash()
leavesChannel2 := &common.TrieIteratorChannels{
Expand Down
5 changes: 3 additions & 2 deletions integrationTests/state/stateTrieSync/stateTrieSync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/multiversx/mx-chain-go/integrationTests"
"github.com/multiversx/mx-chain-go/process/factory"
"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/state/parsers"
"github.com/multiversx/mx-chain-go/state/syncer"
"github.com/multiversx/mx-chain-go/storage"
Expand Down Expand Up @@ -103,7 +104,7 @@ func testNodeRequestInterceptTrieNodesWithMessenger(t *testing.T, version int) {
_ = resolverTrie.Update([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))
}

_ = resolverTrie.Commit()
_ = resolverTrie.Commit(hashesCollector.NewDisabledHashesCollector())
rootHash, _ := resolverTrie.RootHash()

numLeaves := getNumLeaves(t, resolverTrie, rootHash)
Expand Down Expand Up @@ -224,7 +225,7 @@ func testNodeRequestInterceptTrieNodesWithMessengerNotSyncingShouldErr(t *testin
_ = resolverTrie.Update([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))
}

_ = resolverTrie.Commit()
_ = resolverTrie.Commit(hashesCollector.NewDisabledHashesCollector())
rootHash, _ := resolverTrie.RootHash()

numLeaves := getNumLeaves(t, resolverTrie, rootHash)
Expand Down
5 changes: 3 additions & 2 deletions integrationTests/vm/txsFee/migrateDataTrie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/multiversx/mx-chain-go/integrationTests/vm"
"github.com/multiversx/mx-chain-go/sharding"
"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
vmcommon "github.com/multiversx/mx-chain-vm-common-go"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -110,7 +111,7 @@ func TestMigrateDataTrieBuiltInFunc(t *testing.T) {
migrateDataTrie(t, testContext, sndAddr, gasPrice, gasLimit, vmcommon.Ok)
testGasConsumed(t, testContext, gasLimit, 20000)

err = dtr.Commit()
err = dtr.Commit(hashesCollector.NewDisabledHashesCollector())
require.Nil(t, err)

rootHash, err = dtr.RootHash()
Expand Down Expand Up @@ -162,7 +163,7 @@ func TestMigrateDataTrieBuiltInFunc(t *testing.T) {
migrateDataTrie(t, testContext, sndAddr, gasPrice, gasLimit, vmcommon.Ok)
numMigrateDataTrieCalls++

err = dtr.Commit()
err = dtr.Commit(hashesCollector.NewDisabledHashesCollector())
require.Nil(t, err)

rootHash, err = dtr.RootHash()
Expand Down
49 changes: 18 additions & 31 deletions state/accountsDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/common/errChan"
"github.com/multiversx/mx-chain-go/common/holders"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/state/parsers"
"github.com/multiversx/mx-chain-go/trie/keyBuilder"
"github.com/multiversx/mx-chain-go/trie/statistics"
Expand Down Expand Up @@ -779,22 +780,30 @@ func (adb *AccountsDB) commit() ([]byte, error) {
log.Trace("accountsDB.Commit started")
adb.entries = make([]JournalEntry, 0)

oldHashes := make(common.ModifiedHashes)
newHashes := make(common.ModifiedHashes)
hc := hashesCollector.NewDisabledHashesCollector()
if adb.mainTrie.GetStorageManager().IsPruningEnabled() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make a separate function for this 3 lines -> and could we have without NEW at every single commit? like you could have a cleanup function here -> might be faster.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for this if you could have a factory - which creates the good structure by default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a cleanup function. There is no need for a factory after the changes since we create the component on the constructor.

hc = hashesCollector.NewDataTrieHashesCollector()
}

// Step 1. commit all data tries
dataTries := adb.dataTries.GetAll()
for i := 0; i < len(dataTries); i++ {
err := adb.commitTrie(dataTries[i], oldHashes, newHashes)
err := dataTries[i].Commit(hc)
if err != nil {
return nil, err
}
}
adb.dataTries.Reset()

oldRoot := adb.mainTrie.GetOldRoot()

// Step 2. commit main trie
err := adb.commitTrie(adb.mainTrie, oldHashes, newHashes)
if adb.mainTrie.GetStorageManager().IsPruningEnabled() {
wrappedHc, err := hashesCollector.NewHashesCollector(hc)
if err != nil {
return nil, err
}
hc = wrappedHc
}
err := adb.mainTrie.Commit(hc)
if err != nil {
return nil, err
}
Expand All @@ -805,7 +814,7 @@ func (adb *AccountsDB) commit() ([]byte, error) {
return nil, err
}

err = adb.markForEviction(oldRoot, newRoot, oldHashes, newHashes)
err = adb.markForEviction(newRoot, hc)
if err != nil {
return nil, err
}
Expand All @@ -820,36 +829,14 @@ func (adb *AccountsDB) commit() ([]byte, error) {
}

func (adb *AccountsDB) markForEviction(
oldRoot []byte,
newRoot []byte,
oldHashes common.ModifiedHashes,
newHashes common.ModifiedHashes,
collector common.TrieHashesCollector,
) error {
if !adb.mainTrie.GetStorageManager().IsPruningEnabled() {
return nil
}

return adb.storagePruningManager.MarkForEviction(oldRoot, newRoot, oldHashes, newHashes)
}

func (adb *AccountsDB) commitTrie(tr common.Trie, oldHashes common.ModifiedHashes, newHashes common.ModifiedHashes) error {
if adb.mainTrie.GetStorageManager().IsPruningEnabled() {
oldTrieHashes := tr.GetObsoleteHashes()
newTrieHashes, err := tr.GetDirtyHashes()
if err != nil {
return err
}

for _, hash := range oldTrieHashes {
oldHashes[string(hash)] = struct{}{}
}

for hash := range newTrieHashes {
newHashes[hash] = struct{}{}
}
}

return tr.Commit()
return adb.storagePruningManager.MarkForEviction(newRoot, collector)
}

// RootHash returns the main trie's root hash
Expand Down
Loading
Loading