Skip to content

Commit

Permalink
Cherry-picks for 2.10.17-RC.4 (#5550)
Browse files Browse the repository at this point in the history
Includes the following:

* #5524
* #5528
* #5533
* #5535
* #5538
* #5543
* #5546
* #5545
* #5547
* #5548
* #5530 (**BETA**)
* #5549

The following PRs were **NOT** included as they were later reverted:

* #5532

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
wallyqs authored Jun 17, 2024
2 parents f9fc9b2 + 3993345 commit e8d6600
Show file tree
Hide file tree
Showing 17 changed files with 1,136 additions and 98 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ script: ./scripts/runTestsOnTravis.sh $TEST_SUITE
deploy:
provider: script
cleanup: true
script: curl -sfL https://goreleaser.com/static/run | VERSION=v1.26.2 bash
script: curl -o goreleaser.tar.gz -sLf https://github.com/goreleaser/goreleaser/releases/download/v1.26.2/goreleaser_Linux_x86_64.tar.gz && tar -xvf goreleaser.tar.gz && ./goreleaser
on:
tags: true
condition: ($TRAVIS_GO_VERSION =~ 1.22) && ($TEST_SUITE = "compile")
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
module github.com/nats-io/nats-server/v2

go 1.20
go 1.21

require (
github.com/klauspost/compress v1.17.8
github.com/klauspost/compress v1.17.9
github.com/minio/highwayhash v1.0.2
github.com/nats-io/jwt/v2 v2.5.7
github.com/nats-io/nats.go v1.35.0
github.com/nats-io/nats.go v1.36.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.5.3
Expand Down
13 changes: 9 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c=
github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats.go v1.35.0 h1:XFNqNM7v5B+MQMKqVGAyHwYhyKb48jrenXNxIU20ULk=
github.com/nats-io/nats.go v1.35.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
Expand All @@ -24,3 +28,4 @@ golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
73 changes: 51 additions & 22 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2109,19 +2109,17 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
const maxBatch = 256 * 1024
var entries []*Entry
for sz := 0; proposal != nil; proposal = proposal.next {
entry := entryPool.Get().(*Entry)
entry.Type, entry.Data = EntryNormal, proposal.data
entries = append(entries, entry)
entries = append(entries, newEntry(EntryNormal, proposal.data))
sz += len(proposal.data)
if sz > maxBatch {
node.ProposeDirect(entries)
node.ProposeMulti(entries)
// We need to re-create `entries` because there is a reference
// to it in the node's pae map.
sz, entries = 0, nil
}
}
if len(entries) > 0 {
node.ProposeDirect(entries)
node.ProposeMulti(entries)
}
return nil
}
Expand Down Expand Up @@ -2627,17 +2625,24 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
TimeStamp: time.Now().UTC(),
}

// If we are replicated and we are not the leader we need to pull certain data from our store.
if rg != nil && rg.node != nil && !o.isLeader() && o.store != nil {
// If we are replicated and we are not the leader or we are filtered, we need to pull certain data from our store.
isLeader := o.isLeader()
if rg != nil && rg.node != nil && o.store != nil && (!isLeader || o.isFiltered()) {
state, err := o.store.BorrowState()
if err != nil {
o.mu.Unlock()
return nil
}
info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
info.NumAckPending = len(state.Pending)
info.NumRedelivered = len(state.Redelivered)
if !isLeader {
info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
info.NumAckPending = len(state.Pending)
info.NumRedelivered = len(state.Redelivered)
} else {
// Since we are filtered and we are the leader we could have o.sseq that is skipped ahead.
// To maintain consistency in reporting (e.g. jsz) we take the state for our delivered stream sequence.
info.Delivered.Stream = state.Delivered.Stream
}
}

// Adjust active based on non-zero etc. Also make UTC here.
Expand Down Expand Up @@ -2763,8 +2768,12 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
delete(o.pending, sseq)
// Use the original deliver sequence from our pending record.
dseq = p.Sequence

// Only move floors if we matched an existing pending.
if dseq == o.adflr+1 {
if len(o.pending) == 0 {
o.adflr = o.dseq - 1
o.asflr = o.sseq - 1
} else if dseq == o.adflr+1 {
o.adflr, o.asflr = dseq, sseq
for ss := sseq + 1; ss < o.sseq; ss++ {
if p, ok := o.pending[ss]; ok {
Expand All @@ -2775,11 +2784,6 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
}
}
}
// If nothing left set consumer to current delivered.
// Do not update stream.
if len(o.pending) == 0 {
o.adflr = o.dseq - 1
}
}
delete(o.rdc, sseq)
o.removeFromRedeliverQueue(sseq)
Expand Down Expand Up @@ -5218,18 +5222,19 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
// ignoreInterest marks whether the consumer should be ignored when determining interest.
// No lock held on entry.
func (o *consumer) cleanupNoInterestMessages(mset *stream, ignoreInterest bool) {
state := mset.state()
stop := state.LastSeq
o.mu.Lock()
if !o.isLeader() {
o.readStoredState(stop)
o.readStoredState(0)
}
start := o.asflr
o.mu.Unlock()

// Make sure we start at worst with first sequence in the stream.
state := mset.state()
if start < state.FirstSeq {
start = state.FirstSeq
}
stop := state.LastSeq

// Consumer's interests are ignored by default. If we should not ignore interest, unset.
co := o
Expand All @@ -5238,13 +5243,37 @@ func (o *consumer) cleanupNoInterestMessages(mset *stream, ignoreInterest bool)
}

var rmseqs []uint64
mset.mu.Lock()
mset.mu.RLock()

// If over this amount of messages to check, defer to checkInterestState() which
// will do the right thing since we are now removed.
// TODO(dlc) - Better way?
const bailThresh = 100_000

// Check if we would be spending too much time here and defer to separate go routine.
if len(mset.consumers) == 0 {
mset.mu.RUnlock()
mset.mu.Lock()
defer mset.mu.Unlock()
mset.store.Purge()
var state StreamState
mset.store.FastState(&state)
mset.lseq = state.LastSeq
// Also make sure we clear any pending acks.
mset.clearAllPreAcksBelowFloor(state.FirstSeq)
return
} else if stop-start > bailThresh {
mset.mu.RUnlock()
go mset.checkInterestState()
return
}

for seq := start; seq <= stop; seq++ {
if mset.noInterest(seq, co) {
rmseqs = append(rmseqs, seq)
}
}
mset.mu.Unlock()
mset.mu.RUnlock()

// These can be removed.
for _, seq := range rmseqs {
Expand Down
92 changes: 73 additions & 19 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2603,28 +2603,44 @@ func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) {
return
}

start, stop := uint32(math.MaxUint32), uint32(0)
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
ss.Msgs += psi.total
// Keep track of start and stop indexes for this subject.
if psi.fblk < start {
start = psi.fblk
}
if psi.lblk > stop {
stop = psi.lblk
}
})
// We do need to figure out the first and last sequences.
wc := subjectHasWildcard(filter)
start, stop := uint32(math.MaxUint32), uint32(0)

if wc {
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
ss.Msgs += psi.total
// Keep track of start and stop indexes for this subject.
if psi.fblk < start {
start = psi.fblk
}
if psi.lblk > stop {
stop = psi.lblk
}
})
} else if psi, ok := fs.psim.Find(stringToBytes(filter)); ok {
ss.Msgs += psi.total
start, stop = psi.fblk, psi.lblk
}

// Did not find anything.
if stop == 0 {
ss.First, ss.Last, ss.Msgs = 0, 0, 0
return
}

// Do start
mb := fs.bim[start]
if mb != nil {
_, f, _ := mb.filteredPending(filter, wc, 0)
ss.First = f
}

// Hold this outside loop for psim fblk updates on misses.
i := start + 1
if ss.First == 0 {
// This is a miss. This can happen since psi.fblk is lazy, but should be very rare.
for i := start + 1; i <= stop; i++ {
for ; i <= stop; i++ {
mb := fs.bim[i]
if mb == nil {
continue
Expand All @@ -2635,6 +2651,20 @@ func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) {
}
}
}
// Update fblk if we missed matching some blocks, meaning fblk was outdated.
if i > start+1 {
if !wc {
if info, ok := fs.psim.Find(stringToBytes(filter)); ok {
info.fblk = i
}
} else {
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
if i > psi.fblk {
psi.fblk = i
}
})
}
}
// Now last
if mb = fs.bim[stop]; mb != nil {
_, _, l := mb.filteredPending(filter, wc, 0)
Expand Down Expand Up @@ -6117,15 +6147,29 @@ func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err err
return nil, ErrStoreMsgNotFound
}

start, stop := fs.lmb.index, fs.blks[0].index
wc := subjectHasWildcard(subj)
var start, stop uint32

// If literal subject check for presence.
if !wc {
if info, ok := fs.psim.Find(stringToBytes(subj)); !ok {
if wc {
start = fs.lmb.index
fs.psim.Match(stringToBytes(subj), func(_ []byte, psi *psi) {
// Keep track of start and stop indexes for this subject.
if psi.fblk < start {
start = psi.fblk
}
if psi.lblk > stop {
stop = psi.lblk
}
})
// None matched.
if stop == 0 {
return nil, ErrStoreMsgNotFound
} else {
start, stop = info.lblk, info.fblk
}
} else if info, ok := fs.psim.Find(stringToBytes(subj)); ok {
start, stop = info.lblk, info.fblk
} else {
return nil, ErrStoreMsgNotFound
}

// Walk blocks backwards.
Expand Down Expand Up @@ -8587,7 +8631,8 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
// Check for an update to a message already delivered.
if sseq <= o.state.Delivered.Stream {
if p = o.state.Pending[sseq]; p != nil {
p.Sequence, p.Timestamp = dseq, ts
// Do not update p.Sequence, that should be the original delivery sequence.
p.Timestamp = ts
}
} else {
// Add to pending.
Expand Down Expand Up @@ -8645,7 +8690,14 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
return nil
}

// Match leader logic on checking if ack is ahead of delivered.
// This could happen on a cooperative takeover with high speed deliveries.
if sseq > o.state.Delivered.Stream {
o.state.Delivered.Stream = sseq + 1
}

if len(o.state.Pending) == 0 || o.state.Pending[sseq] == nil {
delete(o.state.Redelivered, sseq)
return ErrStoreMsgNotFound
}

Expand Down Expand Up @@ -8676,7 +8728,9 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
// First delete from our pending state.
if p, ok := o.state.Pending[sseq]; ok {
delete(o.state.Pending, sseq)
dseq = p.Sequence // Use the original.
if dseq > p.Sequence && p.Sequence > 0 {
dseq = p.Sequence // Use the original.
}
}
if len(o.state.Pending) == 0 {
o.state.AckFloor.Consumer = o.state.Delivered.Consumer
Expand Down
Loading

0 comments on commit e8d6600

Please sign in to comment.