get rid of InvertedIndexPos (#13431)
- Trying to solve #13432.
- Doesn't solve it completely; the next step is to link AccountsHistoryIdx, StorageHistoryIdx, etc. to AccountsDomain, StorageDomain, etc., which will make IndexRange a one-liner.
sudeepdino008 authored Jan 15, 2025
1 parent 1564283 commit b2f772b
Showing 12 changed files with 117 additions and 156 deletions.
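The heart of the change is in erigon-lib/state/aggregator.go: the fixed-size array indexed by kv.InvertedIdxPos constants becomes a map keyed by the string-typed kv.InvertedIdx names, so registration, pruning, and lookups no longer need a position enum or a switch over keysTable. Below is a minimal sketch of that registry shape; the InvertedIndex struct, registerII signature, and table names here are simplified placeholders, not the real erigon types.

```go
// Sketch only: the string-typed InvertedIdx and the map-keyed registry mirror
// the diff below; everything else is a simplified stand-in.
package main

import "fmt"

// InvertedIdx mirrors the string-typed name in erigon-lib/kv/tables.go.
type InvertedIdx string

const (
	LogTopicIdx   InvertedIdx = "LogTopicIdx"
	LogAddrIdx    InvertedIdx = "LogAddrIdx"
	TracesFromIdx InvertedIdx = "TracesFromIdx"
	TracesToIdx   InvertedIdx = "TracesToIdx"
)

// InvertedIndex stands in for the real state.InvertedIndex.
type InvertedIndex struct{ keysTable string }

// Aggregator keys its standalone indices by name instead of by a fixed
// InvertedIdxPos slot in an array.
type Aggregator struct {
	iis map[InvertedIdx]*InvertedIndex
}

func (a *Aggregator) registerII(idx InvertedIdx, keysTable string) error {
	if _, ok := a.iis[idx]; ok {
		return fmt.Errorf("inverted index %s already registered", idx)
	}
	a.iis[idx] = &InvertedIndex{keysTable: keysTable}
	return nil
}

func main() {
	a := &Aggregator{iis: make(map[InvertedIdx]*InvertedIndex)}
	_ = a.registerII(LogAddrIdx, "TblLogAddressKeys")
	_ = a.registerII(LogTopicIdx, "TblLogTopicsKeys")

	// Callers pass the kv.InvertedIdx name straight through and fall back to a
	// single map lookup instead of enumerating every standalone index.
	if ii, ok := a.iis[LogAddrIdx]; ok {
		fmt.Println("registered:", ii.keysTable)
	}
}
```

That single map lookup is what lets IndexRange's default branch handle every registered standalone index, replacing the per-position cases removed in this commit.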
2 changes: 1 addition & 1 deletion core/rawdb/rawdbreset/reset_stages.go
@@ -163,7 +163,7 @@ func ResetExec(ctx context.Context, db kv.RwDB, agg *state.Aggregator, chain str
cleanupList = append(cleanupList, stateBuckets...)
cleanupList = append(cleanupList, stateHistoryBuckets...)
cleanupList = append(cleanupList, agg.DomainTables(kv.AccountsDomain, kv.StorageDomain, kv.CodeDomain, kv.CommitmentDomain, kv.ReceiptDomain)...)
cleanupList = append(cleanupList, agg.InvertedIndexTables(kv.LogAddrIdxPos, kv.LogTopicIdxPos, kv.TracesFromIdxPos, kv.TracesToIdxPos)...)
cleanupList = append(cleanupList, agg.InvertedIndexTables(kv.LogAddrIdx, kv.LogTopicIdx, kv.TracesFromIdx, kv.TracesToIdx)...)

return db.Update(ctx, func(tx kv.RwTx) error {
if err := clearStageProgress(tx, stages.Execution); err != nil {
8 changes: 4 additions & 4 deletions core/state/rw_v3.go
@@ -211,23 +211,23 @@ func (rs *StateV3) ApplyState4(ctx context.Context, txTask *TxTask) error {

func (rs *StateV3) ApplyLogsAndTraces4(txTask *TxTask, domains *libstate.SharedDomains) error {
for addr := range txTask.TraceFroms {
if err := domains.IndexAdd(kv.TblTracesFromIdx, addr[:]); err != nil {
if err := domains.IndexAdd(kv.TracesFromIdx, addr[:]); err != nil {
return err
}
}

for addr := range txTask.TraceTos {
if err := domains.IndexAdd(kv.TblTracesToIdx, addr[:]); err != nil {
if err := domains.IndexAdd(kv.TracesToIdx, addr[:]); err != nil {
return err
}
}

for _, lg := range txTask.Logs {
if err := domains.IndexAdd(kv.TblLogAddressIdx, lg.Address[:]); err != nil {
if err := domains.IndexAdd(kv.LogAddrIdx, lg.Address[:]); err != nil {
return err
}
for _, topic := range lg.Topics {
if err := domains.IndexAdd(kv.TblLogTopicsIdx, topic[:]); err != nil {
if err := domains.IndexAdd(kv.LogTopicIdx, topic[:]); err != nil {
return err
}
}
2 changes: 0 additions & 2 deletions erigon-lib/kv/kv_interface.go
@@ -459,8 +459,6 @@ type (
Appendable uint16
History string
InvertedIdx string

InvertedIdxPos uint16
)

type TemporalGetter interface {
21 changes: 0 additions & 21 deletions erigon-lib/kv/tables.go
@@ -745,34 +745,13 @@ const (
LogAddrIdx InvertedIdx = "LogAddrIdx"
TracesFromIdx InvertedIdx = "TracesFromIdx"
TracesToIdx InvertedIdx = "TracesToIdx"

LogAddrIdxPos InvertedIdxPos = 0
LogTopicIdxPos InvertedIdxPos = 1
TracesFromIdxPos InvertedIdxPos = 2
TracesToIdxPos InvertedIdxPos = 3
StandaloneIdxLen InvertedIdxPos = 4
)

const (
ReceiptsAppendable Appendable = 0
AppendableLen Appendable = 0
)

func (iip InvertedIdxPos) String() string {
switch iip {
case LogAddrIdxPos:
return "logAddr"
case LogTopicIdxPos:
return "logTopic"
case TracesFromIdxPos:
return "traceFrom"
case TracesToIdxPos:
return "traceTo"
default:
return "unknown inverted index"
}
}

func (d Domain) String() string {
switch d {
case AccountsDomain:
93 changes: 48 additions & 45 deletions erigon-lib/state/aggregator.go
@@ -56,7 +56,7 @@ import (
type Aggregator struct {
db kv.RoDB
d [kv.DomainLen]*Domain
iis [kv.StandaloneIdxLen]*InvertedIndex
iis map[kv.InvertedIdx]*InvertedIndex
dirs datadir.Dirs
tmpdir string
aggregationStep uint64
@@ -141,6 +141,7 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6
logger: logger,
collateAndBuildWorkers: 1,
mergeWorkers: 1,
iis: make(map[kv.InvertedIdx]*InvertedIndex),

commitmentValuesTransform: AggregatorSqueezeCommitmentValues,

@@ -209,7 +210,7 @@ func (a *Aggregator) registerDomain(name kv.Domain, salt *uint32, dirs datadir.D
return nil
}

func (a *Aggregator) registerII(idx kv.InvertedIdxPos, salt *uint32, dirs datadir.Dirs, aggregationStep uint64, filenameBase, indexKeysTable, indexTable string, logger log.Logger) error {
func (a *Aggregator) registerII(idx kv.InvertedIdx, salt *uint32, dirs datadir.Dirs, aggregationStep uint64, filenameBase, indexKeysTable, indexTable string, logger log.Logger) error {
idxCfg := iiCfg{
salt: salt, dirs: dirs,
aggregationStep: aggregationStep,
@@ -218,7 +219,12 @@ func (a *Aggregator) registerII(idx kv.InvertedIdxPos, salt *uint32, dirs datadi
valuesTable: indexTable,
compression: seg.CompressNone,
}

if _, ok := a.iis[idx]; ok {
return fmt.Errorf("inverted index %s already registered", idx)
}
var err error

a.iis[idx], err = NewInvertedIndex(idxCfg, logger)
if err != nil {
return err
@@ -490,7 +496,13 @@ func (c AggV3Collation) Close() {

type AggV3StaticFiles struct {
d [kv.DomainLen]StaticFiles
ivfs [kv.StandaloneIdxLen]InvertedFiles
ivfs map[kv.InvertedIdx]InvertedFiles
}

func NewAggV3StaticFiles() *AggV3StaticFiles {
return &AggV3StaticFiles{
ivfs: make(map[kv.InvertedIdx]InvertedFiles),
}
}

// CleanupOnError - call it on collation fail. It's closing all files
@@ -512,7 +524,7 @@ func (a *Aggregator) buildFiles(ctx context.Context, step uint64) error {
txTo = a.FirstTxNumOfStep(step + 1)
stepStartedAt = time.Now()

static AggV3StaticFiles
static = NewAggV3StaticFiles()
closeCollations = true
collListMu = sync.Mutex{}
collations = make([]Collation, 0)
@@ -572,7 +584,7 @@ func (a *Aggregator) buildFiles(ctx context.Context, step uint64) error {
closeCollations = false

// indices are built concurrently
for _, ii := range a.iis {
for iikey, ii := range a.iis {
ii := ii
dc := ii.BeginFilesRo()
firstStepNotInFiles := dc.FirstStepNotInFiles()
@@ -599,18 +611,7 @@ func (a *Aggregator) buildFiles(ctx context.Context, step uint64) error {
return err
}

switch ii.keysTable {
case kv.TblLogTopicsKeys:
static.ivfs[kv.LogTopicIdxPos] = sf
case kv.TblLogAddressKeys:
static.ivfs[kv.LogAddrIdxPos] = sf
case kv.TblTracesFromKeys:
static.ivfs[kv.TracesFromIdxPos] = sf
case kv.TblTracesToKeys:
static.ivfs[kv.TracesToIdxPos] = sf
default:
panic("unknown index " + ii.keysTable)
}
static.ivfs[iikey] = sf
return nil
})
}
@@ -744,7 +745,7 @@ func (a *Aggregator) MergeLoop(ctx context.Context) error {
}
}

func (a *Aggregator) integrateDirtyFiles(sf AggV3StaticFiles, txNumFrom, txNumTo uint64) {
func (a *Aggregator) integrateDirtyFiles(sf *AggV3StaticFiles, txNumFrom, txNumTo uint64) {
a.dirtyFilesLock.Lock()
defer a.dirtyFilesLock.Unlock()

@@ -762,7 +763,7 @@ func (a *Aggregator) DomainTables(domains ...kv.Domain) (tables []string) {
}
return tables
}
func (a *Aggregator) InvertedIndexTables(indices ...kv.InvertedIdxPos) (tables []string) {
func (a *Aggregator) InvertedIndexTables(indices ...kv.InvertedIdx) (tables []string) {
for _, idx := range indices {
tables = append(tables, a.iis[idx].Tables()...)
}
@@ -1072,17 +1073,17 @@ func (ac *AggregatorRoTx) Prune(ctx context.Context, tx kv.RwTx, limit uint64, l
return aggStat, err
}
}
var stats [kv.StandaloneIdxLen]*InvertedIndexPruneStat
for i := 0; i < int(kv.StandaloneIdxLen); i++ {
stat, err := ac.iis[i].Prune(ctx, tx, txFrom, txTo, limit, logEvery, false, nil)

stats := make(map[kv.InvertedIdx]*InvertedIndexPruneStat)
for iikey := range ac.a.iis {
stat, err := ac.iis[iikey].Prune(ctx, tx, txFrom, txTo, limit, logEvery, false, nil)
if err != nil {
return nil, err
}
stats[i] = stat
stats[iikey] = stat
}

for i := 0; i < int(kv.StandaloneIdxLen); i++ {
aggStat.Indices[ac.iis[i].ii.filenameBase] = stats[i]
for iikey, _ := range ac.a.iis {
aggStat.Indices[ac.iis[iikey].ii.filenameBase] = stats[iikey]
}

return aggStat, nil
@@ -1232,7 +1233,7 @@ func (a *Aggregator) recalcVisibleFilesMinimaxTxNum() {

type RangesV3 struct {
domain [kv.DomainLen]DomainRanges
invertedIndex [kv.StandaloneIdxLen]*MergeRange
invertedIndex map[kv.InvertedIdx]*MergeRange
}

func NewRangesV3() *RangesV3 {
return &RangesV3{
invertedIndex: make(map[kv.InvertedIdx]*MergeRange),
}
}

func (r RangesV3) String() string {
Expand All @@ -1246,7 +1253,7 @@ func (r RangesV3) String() string {
aggStep := r.domain[kv.AccountsDomain].aggStep
for p, mr := range r.invertedIndex {
if mr != nil && mr.needMerge {
ss = append(ss, mr.String(kv.InvertedIdxPos(p).String(), aggStep))
ss = append(ss, mr.String(string(p), aggStep))
}
}
return strings.Join(ss, ", ")
@@ -1266,8 +1273,8 @@ func (r RangesV3) any() bool {
return false
}

func (ac *AggregatorRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) RangesV3 {
var r RangesV3
func (ac *AggregatorRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) *RangesV3 {
r := NewRangesV3()
if ac.a.commitmentValuesTransform {
lmrAcc := ac.d[kv.AccountsDomain].files.LatestMergedRange()
lmrSto := ac.d[kv.StorageDomain].files.LatestMergedRange()
@@ -1328,8 +1335,8 @@ func (ac *AggregatorRoTx) RestrictSubsetFileDeletions(b bool) {
ac.a.d[kv.CommitmentDomain].restrictSubsetFileDeletions = b
}

func (ac *AggregatorRoTx) mergeFiles(ctx context.Context, files SelectedStaticFilesV3, r RangesV3) (MergedFilesV3, error) {
var mf MergedFilesV3
func (ac *AggregatorRoTx) mergeFiles(ctx context.Context, files *SelectedStaticFilesV3, r *RangesV3) (*MergedFilesV3, error) {
mf := NewMergedFilesV3()
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(ac.a.mergeWorkers)
closeFiles := true
@@ -1404,7 +1411,7 @@ func (ac *AggregatorRoTx) mergeFiles(ctx context.Context, files SelectedStaticFi
return mf, err
}

func (a *Aggregator) integrateMergedDirtyFiles(outs SelectedStaticFilesV3, in MergedFilesV3) {
func (a *Aggregator) integrateMergedDirtyFiles(outs *SelectedStaticFilesV3, in *MergedFilesV3) {
a.dirtyFilesLock.Lock()
defer a.dirtyFilesLock.Unlock()

@@ -1418,7 +1425,7 @@ func (a *Aggregator) integrateMergedDirtyFiles(outs *SelectedStaticFilesV3, in Me

}

func (a *Aggregator) cleanAfterMerge(in MergedFilesV3) {
func (a *Aggregator) cleanAfterMerge(in *MergedFilesV3) {
at := a.BeginFilesRo()
defer at.Close()

@@ -1564,17 +1571,12 @@ func (ac *AggregatorRoTx) IndexRange(name kv.InvertedIdx, k []byte, fromTs, toTs
return ac.d[kv.StorageDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.ReceiptHistoryIdx:
return ac.d[kv.ReceiptDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
//case kv.GasUsedHistoryIdx:
// return ac.d[kv.GasUsedDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.LogTopicIdx:
return ac.iis[kv.LogTopicIdxPos].IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.LogAddrIdx:
return ac.iis[kv.LogAddrIdxPos].IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.TracesFromIdx:
return ac.iis[kv.TracesFromIdxPos].IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.TracesToIdx:
return ac.iis[kv.TracesToIdxPos].IdxRange(k, fromTs, toTs, asc, limit, tx)
default:
// check the ii
if v, ok := ac.iis[name]; ok {
return v.IdxRange(k, fromTs, toTs, asc, limit, tx)
}

return nil, fmt.Errorf("unexpected history name: %s", name)
}
}
@@ -1635,7 +1637,7 @@ func (ac *AggregatorRoTx) nastyFileRead(name kv.Domain, from, to uint64) (*seg.R
type AggregatorRoTx struct {
a *Aggregator
d [kv.DomainLen]*DomainRoTx
iis [kv.StandaloneIdxLen]*InvertedIndexRoTx
iis map[kv.InvertedIdx]*InvertedIndexRoTx

id uint64 // auto-increment id of ctx for logs
_leakID uint64 // set only if TRACE_AGG=true
@@ -1646,6 +1648,7 @@ func (a *Aggregator) BeginFilesRo() *AggregatorRoTx {
a: a,
id: a.ctxAutoIncrement.Add(1),
_leakID: a.leakDetector.Add(),
iis: make(map[kv.InvertedIdx]*InvertedIndexRoTx),
}

a.visibleFilesLock.RLock()
8 changes: 4 additions & 4 deletions erigon-lib/state/aggregator2.go
@@ -37,16 +37,16 @@ func NewAggregator2(ctx context.Context, dirs datadir.Dirs, aggregationStep uint
if err := a.registerDomain(kv.ReceiptDomain, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.LogAddrIdxPos, salt, dirs, aggregationStep, kv.FileLogAddressIdx, kv.TblLogAddressKeys, kv.TblLogAddressIdx, logger); err != nil {
if err := a.registerII(kv.LogAddrIdx, salt, dirs, aggregationStep, kv.FileLogAddressIdx, kv.TblLogAddressKeys, kv.TblLogAddressIdx, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.LogTopicIdxPos, salt, dirs, aggregationStep, kv.FileLogTopicsIdx, kv.TblLogTopicsKeys, kv.TblLogTopicsIdx, logger); err != nil {
if err := a.registerII(kv.LogTopicIdx, salt, dirs, aggregationStep, kv.FileLogTopicsIdx, kv.TblLogTopicsKeys, kv.TblLogTopicsIdx, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.TracesFromIdxPos, salt, dirs, aggregationStep, kv.FileTracesFromIdx, kv.TblTracesFromKeys, kv.TblTracesFromIdx, logger); err != nil {
if err := a.registerII(kv.TracesFromIdx, salt, dirs, aggregationStep, kv.FileTracesFromIdx, kv.TblTracesFromKeys, kv.TblTracesFromIdx, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.TracesToIdxPos, salt, dirs, aggregationStep, kv.FileTracesToIdx, kv.TblTracesToKeys, kv.TblTracesToIdx, logger); err != nil {
if err := a.registerII(kv.TracesToIdx, salt, dirs, aggregationStep, kv.FileTracesToIdx, kv.TblTracesToKeys, kv.TblTracesToIdx, logger); err != nil {
return nil, err
}
a.KeepRecentTxnsOfHistoriesWithDisabledSnapshots(100_000) // ~1k blocks of history