diff --git a/server/filestore.go b/server/filestore.go index 4a4b2d4741d..a1dacf7662c 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -214,7 +214,7 @@ type msgBlock struct { bytes uint64 // User visible bytes count. rbytes uint64 // Total bytes (raw) including deleted. Used for rolling to new blk. msgs uint64 // User visible message count. - fss map[string]*SimpleState + fss *stree.SubjectTree[SimpleState] kfn string lwts int64 llts int64 @@ -2063,11 +2063,13 @@ func (fs *fileStore) expireMsgsOnRecover() { } // Make sure we do subject cleanup as well. mb.ensurePerSubjectInfoLoaded() - for subj, ss := range mb.fss { + mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + subj := bytesToString(bsubj) for i := uint64(0); i < ss.Msgs; i++ { fs.removePerSubject(subj) } - } + return true + }) mb.dirtyCloseWithRemove(true) deleted++ } @@ -2315,8 +2317,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor mb.lsts = time.Now().UnixNano() // If we only have 1 subject currently and it matches our filter we can also set isAll. - if !isAll && len(mb.fss) == 1 { - _, isAll = mb.fss[filter] + if !isAll && mb.fss.Size() == 1 { + _, isAll = mb.fss.Find(stringToBytes(filter)) } // Make sure to start at mb.first.seq if fseq < mb.first.seq if seq := atomic.LoadUint64(&mb.first.seq); seq > fseq { @@ -2345,18 +2347,19 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor // 25th quantile of a match in a linear walk. Filter should be a wildcard. // We should consult fss if our cache is not loaded and we only have fss loaded. if !doLinearScan && wc && mb.cacheAlreadyLoaded() { - doLinearScan = len(mb.fss)*4 > int(lseq-fseq) + doLinearScan = mb.fss.Size()*4 > int(lseq-fseq) } if !doLinearScan { // If we have a wildcard match against all tracked subjects we know about. if wc { subs = subs[:0] - for subj := range mb.fss { - if isMatch(subj) { - subs = append(subs, subj) + mb.fss.Iter(func(bsubj []byte, _ *SimpleState) bool { + if subj := bytesToString(bsubj); isMatch(subj) { + subs = append(subs, string(bsubj)) } - } + return true + }) // Check if we matched anything if len(subs) == 0 { return nil, didLoad, ErrStoreMsgNotFound @@ -2364,7 +2367,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor } fseq = lseq + 1 for _, subj := range subs { - ss := mb.fss[subj] + ss, _ := mb.fss.Find(stringToBytes(subj)) if ss != nil && ss.firstNeedsUpdate { mb.recalculateFirstForSubj(subj, ss.First, ss) } @@ -2485,8 +2488,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) ( } var havePartial bool - for subj, ss := range mb.fss { - if isAll || isMatch(subj) { + mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + if subj := bytesToString(bsubj); isAll || isMatch(subj) { if ss.firstNeedsUpdate { mb.recalculateFirstForSubj(subj, ss.First, ss) } @@ -2495,10 +2498,11 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) ( } else if sseq <= ss.Last { // We matched but its a partial. havePartial = true - break + return false } } - } + return true + }) // If we did not encounter any partials we can return here. if !havePartial { @@ -2711,7 +2715,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { } // Mark fss activity. mb.lsts = time.Now().UnixNano() - for subj, ss := range mb.fss { + mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + subj := string(bsubj) if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) { if ss.firstNeedsUpdate { mb.recalculateFirstForSubj(subj, ss.First, ss) @@ -2725,7 +2730,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { fss[subj] = oss } } - } + return true + }) if shouldExpire { // Expire this cache before moving on. mb.tryForceExpireCacheLocked() @@ -2810,7 +2816,7 @@ func (fs *fileStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed i mb.mu.Lock() mb.ensurePerSubjectInfoLoaded() for subj, psi := range subs { - if ss := mb.fss[subj]; ss != nil { + if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil { if ss.Last <= maxSeq { seqs = append(seqs, ss.Last) delete(subs, subj) @@ -3008,7 +3014,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) mb.lsts = time.Now().UnixNano() var havePartial bool - for subj, ss := range mb.fss { + mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + subj := bytesToString(bsubj) if isMatch(subj) { if ss.firstNeedsUpdate { mb.recalculateFirstForSubj(subj, ss.First, ss) @@ -3018,10 +3025,11 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) } else if sseq <= ss.Last { // We matched but its a partial. havePartial = true - break + return false } } - } + return true + }) // See if we need to scan msgs here. if havePartial { @@ -3099,11 +3107,12 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) // Mark fss activity. mb.lsts = time.Now().UnixNano() - for subj, ss := range mb.fss { - if isMatch(subj) { + mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + if subj := bytesToString(bsubj); isMatch(subj) { adjust += ss.Msgs } - } + return true + }) } } else { // This is the last block. We need to scan per message here. @@ -3224,7 +3233,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { // Lock should be held to quiet race detector. mb.mu.Lock() mb.setupWriteCache(rbuf) - mb.fss = make(map[string]*SimpleState) + mb.fss = stree.NewSubjectTree[SimpleState]() // Set cache time to creation time to start. ts := time.Now().UnixNano() @@ -3676,10 +3685,11 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { // Mark fss activity. mb.lsts = time.Now().UnixNano() - if ss := mb.fss[subj]; ss != nil { + bsubj := stringToBytes(subj) + if ss, ok := mb.fss.Find(bsubj); ok && ss != nil { // Adjust first if it was not where we thought it should be. if i != start { - if info, ok := fs.psim.Find(stringToBytes(subj)); ok { + if info, ok := fs.psim.Find(bsubj); ok { info.fblk = i } } @@ -3812,8 +3822,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { // Grab the ss entry for this subject in case sparse. mb.mu.Lock() mb.ensurePerSubjectInfoLoaded() - ss := mb.fss[subj] - if ss != nil && ss.firstNeedsUpdate { + ss, ok := mb.fss.Find(stringToBytes(subj)) + if ok && ss != nil && ss.firstNeedsUpdate { mb.recalculateFirstForSubj(subj, ss.First, ss) } mb.mu.Unlock() @@ -4908,11 +4918,11 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte } // Mark fss activity. mb.lsts = time.Now().UnixNano() - if ss := mb.fss[subj]; ss != nil { + if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil { ss.Msgs++ ss.Last = seq } else { - mb.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq} + mb.fss.Insert(stringToBytes(subj), SimpleState{Msgs: 1, First: seq, Last: seq}) } } @@ -5513,7 +5523,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { // Create FSS if we should track. var popFss bool if mb.fssNotLoaded() { - mb.fss = make(map[string]*SimpleState) + mb.fss = stree.NewSubjectTree[SimpleState]() popFss = true } // Mark fss activity. @@ -5580,15 +5590,15 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { // Handle FSS inline here. if popFss && slen > 0 && !mb.noTrack && !erased && !mb.dmap.Exists(seq) { bsubj := buf[index+msgHdrSize : index+msgHdrSize+uint32(slen)] - if ss := mb.fss[string(bsubj)]; ss != nil { + if ss, ok := mb.fss.Find(bsubj); ok && ss != nil { ss.Msgs++ ss.Last = seq } else { - mb.fss[string(bsubj)] = &SimpleState{ + mb.fss.Insert(bsubj, SimpleState{ Msgs: 1, First: seq, Last: seq, - } + }) } } } @@ -6302,7 +6312,7 @@ func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err err var l uint64 // Optimize if subject is not a wildcard. if !wc { - if ss := mb.fss[subj]; ss != nil { + if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil { l = ss.Last } } @@ -7018,11 +7028,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { bytes += mb.bytes // Make sure we do subject cleanup as well. mb.ensurePerSubjectInfoLoaded() - for subj, ss := range mb.fss { + mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + subj := bytesToString(bsubj) for i := uint64(0); i < ss.Msgs; i++ { fs.removePerSubject(subj) } - } + return true + }) // Now close. mb.dirtyCloseWithRemove(true) mb.mu.Unlock() @@ -7423,13 +7435,17 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { // Lock should be held. func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { mb.ensurePerSubjectInfoLoaded() - ss := mb.fss[subj] - if ss == nil { + if mb.fss == nil { + return + } + bsubj := stringToBytes(subj) + ss, ok := mb.fss.Find(bsubj) + if !ok || ss == nil { return } if ss.Msgs == 1 { - delete(mb.fss, subj) + mb.fss.Delete(bsubj) return } @@ -7531,7 +7547,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error { } // Create new one regardless. - mb.fss = make(map[string]*SimpleState) + mb.fss = stree.NewSubjectTree[SimpleState]() var smv StoreMsg fseq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq) @@ -7548,16 +7564,16 @@ func (mb *msgBlock) generatePerSubjectInfo() error { return err } if sm != nil && len(sm.subj) > 0 { - if ss := mb.fss[sm.subj]; ss != nil { + if ss, ok := mb.fss.Find(stringToBytes(sm.subj)); ok && ss != nil { ss.Msgs++ ss.Last = seq } else { - mb.fss[sm.subj] = &SimpleState{Msgs: 1, First: seq, Last: seq} + mb.fss.Insert(stringToBytes(sm.subj), SimpleState{Msgs: 1, First: seq, Last: seq}) } } } - if len(mb.fss) > 0 { + if mb.fss.Size() > 0 { // Make sure we run the cache expire timer. mb.llts = time.Now().UnixNano() // Mark fss activity. @@ -7578,7 +7594,7 @@ func (mb *msgBlock) ensurePerSubjectInfoLoaded() error { return nil } if mb.msgs == 0 { - mb.fss = make(map[string]*SimpleState) + mb.fss = stree.NewSubjectTree[SimpleState]() return nil } return mb.generatePerSubjectInfo() @@ -7595,9 +7611,8 @@ func (fs *fileStore) populateGlobalPerSubjectInfo(mb *msgBlock) { } // Now populate psim. - for subj, ss := range mb.fss { - if len(subj) > 0 { - bsubj := stringToBytes(subj) + mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + if len(bsubj) > 0 { if info, ok := fs.psim.Find(bsubj); ok { info.total += ss.Msgs if mb.index > info.lblk { @@ -7605,10 +7620,11 @@ func (fs *fileStore) populateGlobalPerSubjectInfo(mb *msgBlock) { } } else { fs.psim.Insert(bsubj, psi{total: ss.Msgs, fblk: mb.index, lblk: mb.index}) - fs.tsl += len(subj) + fs.tsl += len(bsubj) } } - } + return true + }) } // Close the message block. diff --git a/server/filestore_test.go b/server/filestore_test.go index 302dce59180..a0ccde29c9f 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -4098,10 +4098,10 @@ func TestFileStoreNoFSSBugAfterRemoveFirst(t *testing.T) { mb := fs.blks[0] fs.mu.Unlock() mb.mu.RLock() - ss := mb.fss["foo.bar.0"] + ss, ok := mb.fss.Find([]byte("foo.bar.0")) mb.mu.RUnlock() - if ss != nil { + if ok && ss != nil { t.Fatalf("Expected no state for %q, but got %+v\n", "foo.bar.0", ss) } }) @@ -6883,7 +6883,7 @@ func TestFileStoreFSSExpireNumPending(t *testing.T) { require_True(t, elapsed > time.Since(start)) // Sleep enough so that all mb.fss should expire, which is 2s above. - time.Sleep(3 * time.Second) + time.Sleep(4 * time.Second) fs.mu.RLock() for i, mb := range fs.blks { mb.mu.RLock() @@ -6891,7 +6891,7 @@ func TestFileStoreFSSExpireNumPending(t *testing.T) { mb.mu.RUnlock() if fss != nil { fs.mu.RUnlock() - t.Fatalf("Detected loaded fss for mb %d", i) + t.Fatalf("Detected loaded fss for mb %d (size %d)", i, fss.Size()) } } fs.mu.RUnlock()