Skip to content

Commit

Permalink
Merge branch 'feat/trie-mutex-refactor' into refactor-trie-commit
Browse files Browse the repository at this point in the history
  • Loading branch information
BeniaminDrasovean authored Jan 14, 2025
2 parents 5a0e48a + 25ac93f commit d17c44c
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 33 deletions.
46 changes: 46 additions & 0 deletions trie/disabledGoroutinesManager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package trie

import "github.com/multiversx/mx-chain-go/common"

type disabledGoroutinesManager struct {
err error
}

// NewDisabledGoroutinesManager creates a new instance of disabledGoroutinesManager
func NewDisabledGoroutinesManager() *disabledGoroutinesManager {
return &disabledGoroutinesManager{}
}

// ShouldContinueProcessing returns true if there is no error
func (d *disabledGoroutinesManager) ShouldContinueProcessing() bool {
return d.err == nil
}

// CanStartGoRoutine returns false
func (d *disabledGoroutinesManager) CanStartGoRoutine() bool {
return false
}

// EndGoRoutineProcessing does nothing
func (d *disabledGoroutinesManager) EndGoRoutineProcessing() {
}

// SetNewErrorChannel does nothing
func (d *disabledGoroutinesManager) SetNewErrorChannel(_ common.BufferedErrChan) error {
return nil
}

// SetError sets the given error
func (d *disabledGoroutinesManager) SetError(err error) {
d.err = err
}

// GetError returns the error
func (d *disabledGoroutinesManager) GetError() error {
return d.err
}

// IsInterfaceNil returns true if there is no value under the interface
func (d *disabledGoroutinesManager) IsInterfaceNil() bool {
return d == nil
}
44 changes: 44 additions & 0 deletions trie/disabledGoroutinesManager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package trie

import (
"errors"
"testing"

"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/stretchr/testify/assert"
)

func TestNewDisabledGoroutinesManager(t *testing.T) {
t.Parallel()

d := NewDisabledGoroutinesManager()
assert.False(t, check.IfNil(d))
}

func TestDisabledGoroutinesManager_ShouldContinueProcessing(t *testing.T) {
t.Parallel()

d := NewDisabledGoroutinesManager()
assert.True(t, d.ShouldContinueProcessing())

d.SetError(errors.New("error"))
assert.False(t, d.ShouldContinueProcessing())
}

func TestDisabledGoroutinesManager_CanStartGoRoutine(t *testing.T) {
t.Parallel()

d := NewDisabledGoroutinesManager()
assert.False(t, d.CanStartGoRoutine())
}

func TestDisabledGoroutinesManager_SetAndGetError(t *testing.T) {
t.Parallel()

d := NewDisabledGoroutinesManager()
assert.Nil(t, d.GetError())

err := errors.New("error")
d.SetError(err)
assert.Equal(t, err, d.GetError())
}
5 changes: 3 additions & 2 deletions trie/extensionNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"context"
"encoding/hex"
"fmt"
"io"
"math"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/hashing"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/common"
vmcommon "github.com/multiversx/mx-chain-vm-common-go"
"io"
"math"
)

var _ = node(&extensionNode{})
Expand Down
5 changes: 3 additions & 2 deletions trie/leafNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"context"
"encoding/hex"
"fmt"
"io"
"math"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/core/keyValStorage"
"github.com/multiversx/mx-chain-core-go/hashing"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/common"
vmcommon "github.com/multiversx/mx-chain-vm-common-go"
"io"
"math"
)

var _ = node(&leafNode{})
Expand Down
6 changes: 1 addition & 5 deletions trie/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,7 @@ func concat(s1 []byte, s2 ...byte) []byte {

func hasValidHash(n node) bool {
childHash := n.getHash()
if childHash == nil {
return false
}

return true
return len(childHash) != 0
}

func decodeNode(encNode []byte, marshalizer marshal.Marshalizer, hasher hashing.Hasher) (node, error) {
Expand Down
18 changes: 7 additions & 11 deletions trie/patriciaMerkleTrie.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,19 +632,15 @@ func logMapWithTrace(message string, paramName string, hashes common.ModifiedHas

// GetProof computes a Merkle proof for the node that is present at the given key
func (tr *patriciaMerkleTrie) GetProof(key []byte, rootHash []byte) ([][]byte, []byte, error) {
trie, err := tr.Recreate(rootHash)
if err != nil {
return nil, nil, err
}

pmt, ok := trie.(*patriciaMerkleTrie)
if !ok {
return nil, nil, ErrWrongTypeAssertion
//TODO refactor this function to avoid encoding the node after it is retrieved from the DB.
// The encoded node is actually the value from db, thus we can use the retrieved value directly
if len(key) == 0 || bytes.Equal(rootHash, common.EmptyTrieHash) {
return nil, nil, ErrNilNode
}

rootNode := pmt.GetRootNode()
if check.IfNil(rootNode) {
return nil, nil, ErrNilNode
rootNode, err := getNodeFromDBAndDecode(rootHash, tr.trieStorage, tr.marshalizer, tr.hasher)
if err != nil {
return nil, nil, fmt.Errorf("trie get proof error: %w", err)
}

var proof [][]byte
Expand Down
122 changes: 122 additions & 0 deletions trie/patriciaMerkleTrie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1581,6 +1581,9 @@ func TestPatriciaMerkleTrie_AddBatchedDataToTrie(t *testing.T) {
time.Sleep(time.Millisecond * 100)
}
},
SetErrorCalled: func(err error) {
assert.Fail(t, "should not have called this function")
},
}
trie.SetGoRoutinesManager(tr, grm)

Expand Down Expand Up @@ -1646,6 +1649,9 @@ func TestPatriciaMerkleTrie_AddBatchedDataToTrie(t *testing.T) {
time.Sleep(time.Millisecond * 100)
}
},
SetErrorCalled: func(err error) {
assert.Fail(t, "should not have called this function")
},
}
trie.SetGoRoutinesManager(tr, grm)

Expand Down Expand Up @@ -1811,6 +1817,9 @@ func TestPatriciaMerkleTrie_Get(t *testing.T) {
time.Sleep(time.Millisecond * 100)
}
},
SetErrorCalled: func(err error) {
assert.Fail(t, "should not have called this function")
},
}
trie.SetGoRoutinesManager(tr, grm)

Expand Down Expand Up @@ -1843,6 +1852,119 @@ func TestPatriciaMerkleTrie_Get(t *testing.T) {
})
}

func TestPatriciaMerkleTrie_RootHash(t *testing.T) {
t.Parallel()

t.Run("set root hash with batched data commits batch", func(t *testing.T) {
t.Parallel()

tr := emptyTrie()
numOperations := 1000
for i := 0; i < numOperations; i++ {
_ = tr.Update([]byte("dog"+strconv.Itoa(i)), []byte("reindeer"))
}

rootHash, err := tr.RootHash()
assert.Nil(t, err)
assert.NotEqual(t, emptyTrieHash, rootHash)
})
t.Run("set root hash and update trie concurrently should serialize operations", func(t *testing.T) {
t.Parallel()

// create trie with some data
tr := emptyTrie()
numOperations := 1000
for i := 0; i < numOperations; i++ {
_ = tr.Update([]byte("dog"+strconv.Itoa(i)), []byte("reindeer"))
}
trie.ExecuteUpdatesFromBatch(tr)

// compute rootHash
waitForSignal := atomic.Bool{}
waitForSignal.Store(true)
startedComputingRootHash := atomic.Bool{}
grm := &mock.GoroutinesManagerStub{
CanStartGoRoutineCalled: func() bool {
startedComputingRootHash.Store(true)
return true
},
EndGoRoutineProcessingCalled: func() {
for waitForSignal.Load() {
time.Sleep(time.Millisecond * 100)
}
},
SetErrorCalled: func(err error) {
assert.Fail(t, "should not have called this function")
},
}
trie.SetGoRoutinesManager(tr, grm)

go func() {
rootHash1, err := tr.RootHash()
assert.Nil(t, err)
assert.NotEqual(t, emptyTrieHash, rootHash1)
}()

// wait for start of the computation of the root hash
for !startedComputingRootHash.Load() {
time.Sleep(time.Millisecond * 100)
}

for i := numOperations; i < numOperations*2; i++ {
_ = tr.Update([]byte("dog"+strconv.Itoa(i)), []byte("reindeer"))
}
setNewErrChanCalled := atomic.Bool{}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
grm.SetNewErrorChannelCalled = func(common.BufferedErrChan) error {
setNewErrChanCalled.Store(true)
return nil
}
trie.ExecuteUpdatesFromBatch(tr)
wg.Done()
}()

// commit batch to trie does not start until root hash is fully computed
time.Sleep(time.Millisecond * 500)
assert.False(t, setNewErrChanCalled.Load())

waitForSignal.Store(false)
wg.Wait()
assert.True(t, setNewErrChanCalled.Load())
})
t.Run("set root hash and get from trie concurrently", func(t *testing.T) {
t.Parallel()

tr := emptyTrie()
numOperations := 100000
for i := 0; i < numOperations; i++ {
_ = tr.Update([]byte("dog"+strconv.Itoa(i)), []byte("reindeer"))
}
trie.ExecuteUpdatesFromBatch(tr)

wg := sync.WaitGroup{}
wg.Add(1)
setRootHashFinished := atomic.Bool{}
go func() {
for !setRootHashFinished.Load() {
index := rand.Intn(numOperations)
val, _, err := tr.Get([]byte("dog" + strconv.Itoa(index)))
assert.Nil(t, err)
assert.Equal(t, []byte("reindeer"), val)
}
wg.Done()
}()

rootHash, err := tr.RootHash()
assert.Nil(t, err)
assert.NotEqual(t, emptyTrieHash, rootHash)

setRootHashFinished.Store(true)
wg.Wait()
})
}

func TestPatricianMerkleTrie_ConcurrentOperations(t *testing.T) {
t.Parallel()

Expand Down
16 changes: 3 additions & 13 deletions trie/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"bytes"
"context"
"fmt"
"github.com/multiversx/mx-chain-core-go/core/throttler"
"github.com/multiversx/mx-chain-go/common/errChan"
"sync"
"time"

Expand Down Expand Up @@ -359,17 +357,9 @@ func trieNode(
return nil, err
}

th, err := throttler.NewNumGoRoutinesThrottler(1)
if err != nil {
return nil, err
}
goRoutinesManager, err := NewGoroutinesManager(th, errChan.NewErrChanWrapper(), make(chan struct{}))
if err != nil {
return nil, err
}

decodedNode.setHash(goRoutinesManager)
err = goRoutinesManager.GetError()
manager := NewDisabledGoroutinesManager()
decodedNode.setHash(manager)
err = manager.GetError()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit d17c44c

Please sign in to comment.