Skip to content

Commit

Permalink
Merge pull request #6611 from multiversx/refactor-trie-commit
Browse files Browse the repository at this point in the history
use goroutines manager for trie commit
  • Loading branch information
BeniaminDrasovean authored Jan 14, 2025
2 parents 25ac93f + d17c44c commit 3e43c9c
Show file tree
Hide file tree
Showing 45 changed files with 850 additions and 644 deletions.
16 changes: 11 additions & 5 deletions common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,9 @@ type Trie interface {
Update(key, value []byte) error
Delete(key []byte)
RootHash() ([]byte, error)
Commit() error
Commit(collector TrieHashesCollector) error
Recreate(root []byte) (Trie, error)
RecreateFromEpoch(options RootHashHolder) (Trie, error)
String() string
GetObsoleteHashes() [][]byte
GetDirtyHashes() (ModifiedHashes, error)
GetOldRoot() []byte
GetSerializedNodes([]byte, uint64) ([][]byte, uint64, error)
GetSerializedNode([]byte) ([]byte, error)
GetAllLeavesOnChannel(allLeavesChan *TrieIteratorChannels, ctx context.Context, rootHash []byte, keyBuilder KeyBuilder, trieLeafParser TrieLeafParser) error
Expand Down Expand Up @@ -410,3 +406,13 @@ type AtomicBytesSlice interface {
Append(data [][]byte)
Get() [][]byte
}

// TrieHashesCollector defines the methods needed for collecting trie hashes
type TrieHashesCollector interface {
AddDirtyHash(hash []byte)
GetDirtyHashes() ModifiedHashes
AddObsoleteHashes(oldRootHash []byte, oldHashes [][]byte)
GetCollectedData() ([]byte, ModifiedHashes, ModifiedHashes)
Clean()
IsInterfaceNil() bool
}
3 changes: 2 additions & 1 deletion integrationTests/benchmarks/loadFromTrie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/multiversx/mx-chain-go/common"
disabledStatistics "github.com/multiversx/mx-chain-go/common/statistics/disabled"
"github.com/multiversx/mx-chain-go/integrationTests"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/storage"
"github.com/multiversx/mx-chain-go/storage/database"
"github.com/multiversx/mx-chain-go/storage/storageunit"
Expand Down Expand Up @@ -120,7 +121,7 @@ func insertKeysIntoTrie(t *testing.T, tr common.Trie, numTrieLevels int, numChil

_, depth, _ := tr.Get(key)
require.Equal(t, uint32(numTrieLevels), depth+1)
_ = tr.Commit()
_ = tr.Commit(hashesCollector.NewDisabledHashesCollector())
return key
}

Expand Down
3 changes: 2 additions & 1 deletion integrationTests/longTests/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/multiversx/mx-chain-core-go/hashing/blake2b"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/integrationTests"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock"
"github.com/multiversx/mx-chain-go/testscommon/storage"
"github.com/multiversx/mx-chain-go/trie"
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestWriteContinuouslyInTree(t *testing.T) {
if i%written == 0 {
endTime := time.Now()
diff := endTime.Sub(startTime)
_ = tr.Commit()
_ = tr.Commit(hashesCollector.NewDisabledHashesCollector())
fmt.Printf("Written %d, total %d in %f s\n", written, i, diff.Seconds())
startTime = time.Now()
}
Expand Down
17 changes: 11 additions & 6 deletions integrationTests/state/genesisState/genesisState_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type testPair struct {
const generate32ByteSlices = 0
const generate32HexByteSlices = 1

type trieWithToString interface {
common.Trie
ToString() string
}

func TestCreationOfTheGenesisState(t *testing.T) {
if testing.Short() {
t.Skip("this is not a short test")
Expand Down Expand Up @@ -85,7 +90,7 @@ func TestExtensionNodeToBranchEdgeCaseSet1(t *testing.T) {
_ = tr1.Update([]byte(key3), []byte(val))

fmt.Println()
strTr1 := tr1.String()
strTr1 := tr1.(trieWithToString).ToString()
fmt.Println(strTr1)

hash1, _ := tr1.RootHash()
Expand All @@ -98,7 +103,7 @@ func TestExtensionNodeToBranchEdgeCaseSet1(t *testing.T) {
fmt.Printf("root hash2: %s\n", base64.StdEncoding.EncodeToString(hash2))

fmt.Println()
strTr2 := tr2.String()
strTr2 := tr2.(trieWithToString).ToString()
fmt.Println(strTr2)

assert.Equal(t, hash1, hash2)
Expand Down Expand Up @@ -126,7 +131,7 @@ func TestExtensionNodeToBranchEdgeCaseSet2(t *testing.T) {
_ = tr1.Update([]byte(key4), []byte(val))

fmt.Println()
strTr1 := tr1.String()
strTr1 := tr1.(trieWithToString).ToString()
fmt.Println(strTr1)

hash1, _ := tr1.RootHash()
Expand All @@ -140,7 +145,7 @@ func TestExtensionNodeToBranchEdgeCaseSet2(t *testing.T) {
_ = tr2.Update([]byte(key6), []byte(val))

fmt.Println()
strTr2 := tr2.String()
strTr2 := tr2.(trieWithToString).ToString()
fmt.Println(strTr2)

hash2, _ := tr2.RootHash()
Expand Down Expand Up @@ -299,12 +304,12 @@ func printTestDebugLines(

fmt.Println()
fmt.Println("Reference trie:")
strRefTrie := referenceTrie.String()
strRefTrie := referenceTrie.(trieWithToString).ToString()
fmt.Println(strRefTrie)

fmt.Println()
fmt.Println("Actual trie:")
strTr := tr.String()
strTr := tr.(trieWithToString).ToString()
fmt.Println(strTr)
}

Expand Down
10 changes: 8 additions & 2 deletions integrationTests/state/stateTrie/stateTrie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/multiversx/mx-chain-go/sharding"
"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/state/factory"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/state/iteratorChannelsProvider"
"github.com/multiversx/mx-chain-go/state/lastSnapshotMarker"
"github.com/multiversx/mx-chain-go/state/storagePruningManager"
Expand Down Expand Up @@ -279,7 +280,7 @@ func TestTrieDB_RecreateFromStorageShouldWork(t *testing.T) {

_ = tr1.Update(key, value)
h1, _ := tr1.RootHash()
err := tr1.Commit()
err := tr1.Commit(hashesCollector.NewDisabledHashesCollector())
require.Nil(t, err)

tr2, err := tr1.Recreate(h1)
Expand Down Expand Up @@ -961,6 +962,11 @@ func TestAccountsDB_ExecALotOfBalanceTxOKorNOK(t *testing.T) {
integrationTests.PrintShardAccount(acntDest.(state.UserAccountHandler), "Destination")
}

type trieWithToString interface {
common.Trie
ToString() string
}

func BenchmarkCreateOneMillionAccountsWithMockDB(b *testing.B) {
nrOfAccounts := 1000000
balance := 1500000
Expand Down Expand Up @@ -994,7 +1000,7 @@ func BenchmarkCreateOneMillionAccountsWithMockDB(b *testing.B) {
core.ConvertBytes(rtm.Sys),
)

_ = tr.String()
_ = tr.(trieWithToString).ToString()
}

func BenchmarkCreateOneMillionAccounts(b *testing.B) {
Expand Down
7 changes: 4 additions & 3 deletions integrationTests/state/stateTrieClose/stateTrieClose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/common/errChan"
"github.com/multiversx/mx-chain-go/integrationTests"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/state/parsers"
"github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock"
"github.com/multiversx/mx-chain-go/testscommon/goroutines"
Expand All @@ -28,7 +29,7 @@ func TestPatriciaMerkleTrie_Close(t *testing.T) {
for i := 0; i < numLeavesToAdd; i++ {
_ = tr.Update([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))
}
_ = tr.Commit()
_ = tr.Commit(hashesCollector.NewDisabledHashesCollector())
time.Sleep(time.Second * 2) // allow the commit go routines to finish completely as to not alter the further counters

gc := goroutines.NewGoCounter(goroutines.TestsRelevantGoRoutines)
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestPatriciaMerkleTrie_Close(t *testing.T) {
assert.Nil(t, err)

_ = tr.Update([]byte("god"), []byte("puppy"))
_ = tr.Commit()
_ = tr.Commit(hashesCollector.NewDisabledHashesCollector())

rootHash, _ = tr.RootHash()
leavesChannel1 = &common.TrieIteratorChannels{
Expand All @@ -91,7 +92,7 @@ func TestPatriciaMerkleTrie_Close(t *testing.T) {
assert.Nil(t, err)

_ = tr.Update([]byte("eggod"), []byte("cat"))
_ = tr.Commit()
_ = tr.Commit(hashesCollector.NewDisabledHashesCollector())

rootHash, _ = tr.RootHash()
leavesChannel2 := &common.TrieIteratorChannels{
Expand Down
5 changes: 3 additions & 2 deletions integrationTests/state/stateTrieSync/stateTrieSync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/multiversx/mx-chain-go/integrationTests"
"github.com/multiversx/mx-chain-go/process/factory"
"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/state/parsers"
"github.com/multiversx/mx-chain-go/state/syncer"
"github.com/multiversx/mx-chain-go/storage"
Expand Down Expand Up @@ -103,7 +104,7 @@ func testNodeRequestInterceptTrieNodesWithMessenger(t *testing.T, version int) {
_ = resolverTrie.Update([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))
}

_ = resolverTrie.Commit()
_ = resolverTrie.Commit(hashesCollector.NewDisabledHashesCollector())
rootHash, _ := resolverTrie.RootHash()

numLeaves := getNumLeaves(t, resolverTrie, rootHash)
Expand Down Expand Up @@ -224,7 +225,7 @@ func testNodeRequestInterceptTrieNodesWithMessengerNotSyncingShouldErr(t *testin
_ = resolverTrie.Update([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))
}

_ = resolverTrie.Commit()
_ = resolverTrie.Commit(hashesCollector.NewDisabledHashesCollector())
rootHash, _ := resolverTrie.RootHash()

numLeaves := getNumLeaves(t, resolverTrie, rootHash)
Expand Down
5 changes: 3 additions & 2 deletions integrationTests/vm/txsFee/migrateDataTrie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/multiversx/mx-chain-go/integrationTests/vm"
"github.com/multiversx/mx-chain-go/sharding"
"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
vmcommon "github.com/multiversx/mx-chain-vm-common-go"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -110,7 +111,7 @@ func TestMigrateDataTrieBuiltInFunc(t *testing.T) {
migrateDataTrie(t, testContext, sndAddr, gasPrice, gasLimit, vmcommon.Ok)
testGasConsumed(t, testContext, gasLimit, 20000)

err = dtr.Commit()
err = dtr.Commit(hashesCollector.NewDisabledHashesCollector())
require.Nil(t, err)

rootHash, err = dtr.RootHash()
Expand Down Expand Up @@ -162,7 +163,7 @@ func TestMigrateDataTrieBuiltInFunc(t *testing.T) {
migrateDataTrie(t, testContext, sndAddr, gasPrice, gasLimit, vmcommon.Ok)
numMigrateDataTrieCalls++

err = dtr.Commit()
err = dtr.Commit(hashesCollector.NewDisabledHashesCollector())
require.Nil(t, err)

rootHash, err = dtr.RootHash()
Expand Down
59 changes: 25 additions & 34 deletions state/accountsDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/common/errChan"
"github.com/multiversx/mx-chain-go/common/holders"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/state/parsers"
"github.com/multiversx/mx-chain-go/trie/keyBuilder"
"github.com/multiversx/mx-chain-go/trie/statistics"
Expand Down Expand Up @@ -79,9 +80,10 @@ type AccountsDB struct {
storagePruningManager StoragePruningManager
snapshotsManger SnapshotsManager

lastRootHash []byte
dataTries common.TriesHolder
entries []JournalEntry
lastRootHash []byte
dataTries common.TriesHolder
entries []JournalEntry
hashesCollector common.TrieHashesCollector

mutOp sync.RWMutex
loadCodeMeasurements *loadingMeasurements
Expand Down Expand Up @@ -128,9 +130,18 @@ func createAccountsDb(args ArgsAccountsDB) *AccountsDB {
},
addressConverter: args.AddressConverter,
snapshotsManger: args.SnapshotsManager,
hashesCollector: createTrieHashesCollector(args.Trie),
}
}

func createTrieHashesCollector(mainTrie common.Trie) common.TrieHashesCollector {
if mainTrie.GetStorageManager().IsPruningEnabled() {
return hashesCollector.NewDataTrieHashesCollector()
}

return hashesCollector.NewDisabledHashesCollector()
}

func checkArgsAccountsDB(args ArgsAccountsDB) error {
if check.IfNil(args.Trie) {
return ErrNilTrie
Expand Down Expand Up @@ -776,25 +787,27 @@ func (adb *AccountsDB) Commit() ([]byte, error) {
}

func (adb *AccountsDB) commit() ([]byte, error) {
defer adb.hashesCollector.Clean()
log.Trace("accountsDB.Commit started")
adb.entries = make([]JournalEntry, 0)

oldHashes := make(common.ModifiedHashes)
newHashes := make(common.ModifiedHashes)
// Step 1. commit all data tries
dataTries := adb.dataTries.GetAll()
for i := 0; i < len(dataTries); i++ {
err := adb.commitTrie(dataTries[i], oldHashes, newHashes)
err := dataTries[i].Commit(adb.hashesCollector)
if err != nil {
return nil, err
}
}
adb.dataTries.Reset()

oldRoot := adb.mainTrie.GetOldRoot()

// Step 2. commit main trie
err := adb.commitTrie(adb.mainTrie, oldHashes, newHashes)
hc, err := hashesCollector.NewHashesCollector(adb.hashesCollector)
if err != nil {
return nil, err
}

err = adb.mainTrie.Commit(hc)
if err != nil {
return nil, err
}
Expand All @@ -805,7 +818,7 @@ func (adb *AccountsDB) commit() ([]byte, error) {
return nil, err
}

err = adb.markForEviction(oldRoot, newRoot, oldHashes, newHashes)
err = adb.markForEviction(newRoot, hc)
if err != nil {
return nil, err
}
Expand All @@ -820,36 +833,14 @@ func (adb *AccountsDB) commit() ([]byte, error) {
}

func (adb *AccountsDB) markForEviction(
oldRoot []byte,
newRoot []byte,
oldHashes common.ModifiedHashes,
newHashes common.ModifiedHashes,
collector common.TrieHashesCollector,
) error {
if !adb.mainTrie.GetStorageManager().IsPruningEnabled() {
return nil
}

return adb.storagePruningManager.MarkForEviction(oldRoot, newRoot, oldHashes, newHashes)
}

func (adb *AccountsDB) commitTrie(tr common.Trie, oldHashes common.ModifiedHashes, newHashes common.ModifiedHashes) error {
if adb.mainTrie.GetStorageManager().IsPruningEnabled() {
oldTrieHashes := tr.GetObsoleteHashes()
newTrieHashes, err := tr.GetDirtyHashes()
if err != nil {
return err
}

for _, hash := range oldTrieHashes {
oldHashes[string(hash)] = struct{}{}
}

for hash := range newTrieHashes {
newHashes[hash] = struct{}{}
}
}

return tr.Commit()
return adb.storagePruningManager.MarkForEviction(newRoot, collector)
}

// RootHash returns the main trie's root hash
Expand Down
Loading

0 comments on commit 3e43c9c

Please sign in to comment.