diff --git a/server/filestore.go b/server/filestore.go index 67e3a3e33bf..556646c0edb 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2590,9 +2590,48 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState { return ss } +// This is used to see if we can selectively jump start blocks based on filter subject and a floor block index. +// Will return -1 if no matches at all. +func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) int { + start := uint32(math.MaxUint32) + if wc { + fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) { + if psi.fblk < start { + start = psi.fblk + } + }) + } else if psi, ok := fs.psim.Find(stringToBytes(filter)); ok { + start = psi.fblk + } + // Nothing found. + if start == uint32(math.MaxUint32) { + return -1 + } + // Here we need to translate this to index into fs.blks. + mb := fs.bim[start] + if mb == nil { + return -1 + } + bi, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq)) + return bi +} + // Optimized way for getting all num pending matching a filter subject. // Lock should be held. func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) { + fs.numFilteredPendingWithLast(filter, true, ss) +} + +// Optimized way for getting all num pending matching a filter subject and first sequence only. +// Lock should be held. +func (fs *fileStore) numFilteredPendingNoLast(filter string, ss *SimpleState) { + fs.numFilteredPendingWithLast(filter, false, ss) +} + +// Optimized way for getting all num pending matching a filter subject. +// Optionally look up last sequence. Sometimes do not need last and this avoids cost. +// Lock should be held. +func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *SimpleState) { isAll := filter == _EMPTY_ || filter == fwcs // If isAll we do not need to do anything special to calculate the first and last and total. @@ -2602,6 +2641,8 @@ func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) { ss.Msgs = fs.state.Msgs return } + // Always reset. + ss.First, ss.Last, ss.Msgs = 0, 0, 0 // We do need to figure out the first and last sequences. wc := subjectHasWildcard(filter) @@ -2625,7 +2666,6 @@ func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) { // Did not find anything. if stop == 0 { - ss.First, ss.Last, ss.Msgs = 0, 0, 0 return } @@ -2636,10 +2676,12 @@ func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) { ss.First = f } - // Hold this outside loop for psim fblk updates on misses. - i := start + 1 if ss.First == 0 { - // This is a miss. This can happen since psi.fblk is lazy, but should be very rare. + // This is a miss. This can happen since psi.fblk is lazy. + // We will make sure to update fblk. + + // Hold this outside loop for psim fblk updates when done. + i := start + 1 for ; i <= stop; i++ { mb := fs.bim[i] if mb == nil { @@ -2650,25 +2692,25 @@ func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) { break } } - } - // Update fblk if we missed matching some blocks, meaning fblk was outdated. - if i > start+1 { + // Update fblk since fblk was outdated. if !wc { if info, ok := fs.psim.Find(stringToBytes(filter)); ok { info.fblk = i } } else { - fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) { + fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) { if i > psi.fblk { psi.fblk = i } }) } } - // Now last - if mb = fs.bim[stop]; mb != nil { - _, _, l := mb.filteredPending(filter, wc, 0) - ss.Last = l + // Now gather last sequence if asked to do so. + if last { + if mb = fs.bim[stop]; mb != nil { + _, _, l := mb.filteredPending(filter, wc, 0) + ss.Last = l + } } } @@ -6396,7 +6438,7 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store // let's check the psim to see if we can skip ahead. if start <= fs.state.FirstSeq { var ss SimpleState - fs.numFilteredPending(filter, &ss) + fs.numFilteredPendingNoLast(filter, &ss) // Nothing available. if ss.Msgs == 0 { return nil, fs.state.LastSeq, ErrStoreEOF @@ -6422,16 +6464,15 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store // Similar to above if start <= first seq. // TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers. if i == bi { - var ss SimpleState - fs.numFilteredPending(filter, &ss) + nbi := fs.checkSkipFirstBlock(filter, wc) // Nothing available. - if ss.Msgs == 0 { + if nbi < 0 { return nil, fs.state.LastSeq, ErrStoreEOF } // See if we can jump ahead here. // Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded. // For v2 will track all blocks that have matches for psim. - if nbi, _ := fs.selectMsgBlockWithIndex(ss.First); nbi > i { + if nbi > i { i = nbi - 1 // For the iterator condition i++ } } diff --git a/server/filestore_test.go b/server/filestore_test.go index 302dce59180..3238c296a78 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -7107,7 +7107,7 @@ func TestFileStoreWildcardFilteredPendingPSIMFirstBlockUpdate(t *testing.T) { for i := 0; i < 1000; i++ { fs.StoreMsg("foo.1.foo", nil, msg) } - // Bookend with 3 more,twoe foo.baz and two foo.bar. + // Bookend with 3 more, two foo.baz and two foo.bar. fs.StoreMsg("foo.22.baz", nil, msg) fs.StoreMsg("foo.22.baz", nil, msg) fs.StoreMsg("foo.22.bar", nil, msg) @@ -7166,6 +7166,84 @@ func TestFileStoreWildcardFilteredPendingPSIMFirstBlockUpdate(t *testing.T) { require_Equal(t, psi.lblk, 92) } +// Make sure if we only miss by one for fblk that we still update it. +func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) { + sd := t.TempDir() + fs, err := newFileStore( + FileStoreConfig{StoreDir: sd, BlockSize: 128}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + msg := []byte("hello") + // Create 4 blocks, each block holds 2 msgs + for i := 0; i < 4; i++ { + fs.StoreMsg("foo.22.bar", nil, msg) + fs.StoreMsg("foo.22.baz", nil, msg) + } + require_Equal(t, fs.numMsgBlocks(), 4) + + fetch := func(subj string) *psi { + t.Helper() + fs.mu.RLock() + psi, ok := fs.psim.Find([]byte(subj)) + fs.mu.RUnlock() + require_True(t, ok) + return psi + } + + psi := fetch("foo.22.bar") + require_Equal(t, psi.total, 4) + require_Equal(t, psi.fblk, 1) + require_Equal(t, psi.lblk, 4) + + // Now remove first instance of "foo.22.bar" + removed, err := fs.RemoveMsg(1) + require_NoError(t, err) + require_True(t, removed) + + // Call into numFilterePending(), we want to make sure it updates fblk. + var ss SimpleState + fs.mu.Lock() + fs.numFilteredPending("foo.22.bar", &ss) + fs.mu.Unlock() + require_Equal(t, ss.Msgs, 3) + require_Equal(t, ss.First, 3) + require_Equal(t, ss.Last, 7) + + // Now make sure that we properly updated the psim entry. + psi = fetch("foo.22.bar") + require_Equal(t, psi.total, 3) + require_Equal(t, psi.fblk, 2) + require_Equal(t, psi.lblk, 4) + + // Now make sure wildcard calls into also update blks. + // First remove first "foo.22.baz" which will remove first block. + removed, err = fs.RemoveMsg(2) + require_NoError(t, err) + require_True(t, removed) + // Make sure 3 blks left + require_Equal(t, fs.numMsgBlocks(), 3) + + psi = fetch("foo.22.baz") + require_Equal(t, psi.total, 3) + require_Equal(t, psi.fblk, 1) + require_Equal(t, psi.lblk, 4) + + // Now call wildcard version of numFilteredPending to make sure it clears. + fs.mu.Lock() + fs.numFilteredPending("foo.*.baz", &ss) + fs.mu.Unlock() + require_Equal(t, ss.Msgs, 3) + require_Equal(t, ss.First, 4) + require_Equal(t, ss.Last, 8) + + psi = fetch("foo.22.baz") + require_Equal(t, psi.total, 3) + require_Equal(t, psi.fblk, 2) + require_Equal(t, psi.lblk, 4) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks ///////////////////////////////////////////////////////////////////////////