Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Updating out of date fblks in psim entries. #5577

Merged
merged 3 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 58 additions & 17 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2590,9 +2590,48 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
return ss
}

// checkSkipFirstBlock is used to see if we can selectively jump ahead to the first
// block that may hold a message for the given filter subject, based on the psim.
// wc reports whether filter contains wildcards; if so every matching psim entry is
// consulted and the lowest first block across all of them is used.
// Returns an index into fs.blks, or -1 if no matches at all.
// fs.mu should be held by the caller since fs.psim and fs.bim are read here.
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) int {
	// Sentinel meaning no psim entry matched the filter.
	const notFound = uint32(math.MaxUint32)

	start, stop := notFound, uint32(0)
	if wc {
		fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
			if psi.fblk < start {
				start = psi.fblk
			}
			if psi.lblk > stop {
				stop = psi.lblk
			}
		})
	} else if psi, ok := fs.psim.Find(stringToBytes(filter)); ok {
		start, stop = psi.fblk, psi.lblk
	}
	// Nothing found.
	if start == notFound {
		return -1
	}
	// Here we need to translate this to index into fs.blks.
	// psi.fblk is maintained lazily, so it can still name a block that has since
	// been removed. Treating that as "no matches" would wrongly signal EOF to the
	// caller, so walk forward to the first block that still exists, bounded by the
	// last block known to hold a match. Landing on an earlier block than strictly
	// necessary is safe, the caller simply scans from there.
	mb := fs.bim[start]
	for i := start + 1; mb == nil && i <= stop; i++ {
		mb = fs.bim[i]
	}
	// Still nothing.
	if mb == nil {
		return -1
	}
	bi, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))
	return bi
}

// numFilteredPending fills ss with the pending state for messages matching filter,
// including the last matching sequence.
// Lock should be held.
func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) {
	// Callers of this variant want the last sequence resolved as well.
	const wantLast = true
	fs.numFilteredPendingWithLast(filter, wantLast, ss)
}

// numFilteredPendingNoLast fills ss with the pending state for messages matching
// filter, but does not ask for the last matching sequence to be looked up.
// Lock should be held.
func (fs *fileStore) numFilteredPendingNoLast(filter string, ss *SimpleState) {
	// Skip the last sequence lookup, callers of this variant do not need it.
	const wantLast = false
	fs.numFilteredPendingWithLast(filter, wantLast, ss)
}

// Optimized way for getting all num pending matching a filter subject.
// Optionally look up last sequence. Sometimes do not need last and this avoids cost.
// Lock should be held.
func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *SimpleState) {
isAll := filter == _EMPTY_ || filter == fwcs

// If isAll we do not need to do anything special to calculate the first and last and total.
Expand All @@ -2602,6 +2641,8 @@ func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) {
ss.Msgs = fs.state.Msgs
return
}
// Always reset.
ss.First, ss.Last, ss.Msgs = 0, 0, 0

// We do need to figure out the first and last sequences.
wc := subjectHasWildcard(filter)
Expand All @@ -2625,7 +2666,6 @@ func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) {

// Did not find anything.
if stop == 0 {
ss.First, ss.Last, ss.Msgs = 0, 0, 0
return
}

Expand All @@ -2636,10 +2676,12 @@ func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) {
ss.First = f
}

// Hold this outside loop for psim fblk updates on misses.
i := start + 1
if ss.First == 0 {
// This is a miss. This can happen since psi.fblk is lazy, but should be very rare.
// This is a miss. This can happen since psi.fblk is lazy.
// We will make sure to update fblk.

// Hold this outside loop for psim fblk updates when done.
i := start + 1
for ; i <= stop; i++ {
mb := fs.bim[i]
if mb == nil {
Expand All @@ -2650,25 +2692,25 @@ func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) {
break
}
}
}
// Update fblk if we missed matching some blocks, meaning fblk was outdated.
if i > start+1 {
// Update fblk since fblk was outdated.
if !wc {
if info, ok := fs.psim.Find(stringToBytes(filter)); ok {
info.fblk = i
}
} else {
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) {
if i > psi.fblk {
psi.fblk = i
}
})
}
}
// Now last
if mb = fs.bim[stop]; mb != nil {
_, _, l := mb.filteredPending(filter, wc, 0)
ss.Last = l
// Now gather last sequence if asked to do so.
if last {
if mb = fs.bim[stop]; mb != nil {
_, _, l := mb.filteredPending(filter, wc, 0)
ss.Last = l
}
}
}

Expand Down Expand Up @@ -6396,7 +6438,7 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// let's check the psim to see if we can skip ahead.
if start <= fs.state.FirstSeq {
var ss SimpleState
fs.numFilteredPending(filter, &ss)
fs.numFilteredPendingNoLast(filter, &ss)
// Nothing available.
if ss.Msgs == 0 {
return nil, fs.state.LastSeq, ErrStoreEOF
Expand All @@ -6422,16 +6464,15 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// Similar to above if start <= first seq.
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
if i == bi {
var ss SimpleState
fs.numFilteredPending(filter, &ss)
nbi := fs.checkSkipFirstBlock(filter, wc)
// Nothing available.
if ss.Msgs == 0 {
if nbi < 0 {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// See if we can jump ahead here.
// Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded.
// For v2 will track all blocks that have matches for psim.
if nbi, _ := fs.selectMsgBlockWithIndex(ss.First); nbi > i {
if nbi > i {
i = nbi - 1 // For the iterator condition i++
}
}
Expand Down
80 changes: 79 additions & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7107,7 +7107,7 @@ func TestFileStoreWildcardFilteredPendingPSIMFirstBlockUpdate(t *testing.T) {
for i := 0; i < 1000; i++ {
fs.StoreMsg("foo.1.foo", nil, msg)
}
// Bookend with 3 more,twoe foo.baz and two foo.bar.
// Bookend with 3 more, two foo.baz and two foo.bar.
fs.StoreMsg("foo.22.baz", nil, msg)
fs.StoreMsg("foo.22.baz", nil, msg)
fs.StoreMsg("foo.22.bar", nil, msg)
Expand Down Expand Up @@ -7166,6 +7166,84 @@ func TestFileStoreWildcardFilteredPendingPSIMFirstBlockUpdate(t *testing.T) {
require_Equal(t, psi.lblk, 92)
}

// Make sure that if fblk is stale by only a single block we still update it.
// Covers both the literal subject path and the wildcard path of numFilteredPending().
func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 128},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := []byte("hello")
// Create 4 blocks, each block holds 2 msgs.
// "foo.22.bar" lands on the odd sequences and "foo.22.baz" on the even ones.
for i := 0; i < 4; i++ {
fs.StoreMsg("foo.22.bar", nil, msg)
fs.StoreMsg("foo.22.baz", nil, msg)
}
require_Equal(t, fs.numMsgBlocks(), 4)

// fetch returns the psim entry for subj, failing the test if it is missing.
fetch := func(subj string) *psi {
t.Helper()
fs.mu.RLock()
psi, ok := fs.psim.Find([]byte(subj))
fs.mu.RUnlock()
require_True(t, ok)
return psi
}

psi := fetch("foo.22.bar")
require_Equal(t, psi.total, 4)
require_Equal(t, psi.fblk, 1)
require_Equal(t, psi.lblk, 4)

// Now remove the first instance of "foo.22.bar".
removed, err := fs.RemoveMsg(1)
require_NoError(t, err)
require_True(t, removed)

// Call into numFilteredPending(), we want to make sure it updates fblk.
var ss SimpleState
fs.mu.Lock()
fs.numFilteredPending("foo.22.bar", &ss)
fs.mu.Unlock()
require_Equal(t, ss.Msgs, 3)
require_Equal(t, ss.First, 3)
require_Equal(t, ss.Last, 7)

// Now make sure that we properly updated the psim entry.
psi = fetch("foo.22.bar")
require_Equal(t, psi.total, 3)
require_Equal(t, psi.fblk, 2)
require_Equal(t, psi.lblk, 4)

// Now make sure wildcard calls also update fblk.
// First remove the first "foo.22.baz", which will remove the first block.
removed, err = fs.RemoveMsg(2)
require_NoError(t, err)
require_True(t, removed)
// Make sure 3 blks are left.
require_Equal(t, fs.numMsgBlocks(), 3)

// fblk is lazy, so it still references the block that was just removed.
psi = fetch("foo.22.baz")
require_Equal(t, psi.total, 3)
require_Equal(t, psi.fblk, 1)
require_Equal(t, psi.lblk, 4)

// Now call the wildcard version of numFilteredPending to make sure the stale fblk gets corrected.
fs.mu.Lock()
fs.numFilteredPending("foo.*.baz", &ss)
fs.mu.Unlock()
require_Equal(t, ss.Msgs, 3)
require_Equal(t, ss.First, 4)
require_Equal(t, ss.Last, 8)

psi = fetch("foo.22.baz")
require_Equal(t, psi.total, 3)
require_Equal(t, psi.fblk, 2)
require_Equal(t, psi.lblk, 4)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down
Loading