Cherry picks for v2.10.17 RC.2 (#5512)
wallyqs authored Jun 10, 2024
2 parents 73b2fd0 + 067b6aa commit 005cb53
Showing 19 changed files with 628 additions and 281 deletions.
4 changes: 2 additions & 2 deletions go.mod
@@ -10,7 +10,7 @@ require (
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.5.3
golang.org/x/crypto v0.23.0
golang.org/x/sys v0.20.0
golang.org/x/crypto v0.24.0
golang.org/x/sys v0.21.0
golang.org/x/time v0.5.0
)
8 changes: 4 additions & 4 deletions go.sum
@@ -16,11 +16,11 @@ github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
10 changes: 9 additions & 1 deletion locksordering.txt
@@ -21,4 +21,12 @@ A reloadMu lock was added to prevent newly connecting clients racing with the co
This must be taken out as soon as a reload is about to happen before any other locks:

reloadMu -> Server
reloadMu -> optsMu
reloadMu -> optsMu

The "jscmMu" lock in the Account is used to serialise calls to checkJetStreamMigrate and
clearObserverState so that they cannot interleave, which would leave Raft nodes in
inconsistent observer states.

jscmMu -> Account -> jsAccount
jscmMu -> stream.clsMu
jscmMu -> RaftNode
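
To make the ordering concrete, here is a minimal, self-contained sketch of the pattern described above. It is not the server's implementation: apart from the jscmMu field name and the two method names, everything in it (the trimmed-down Account, the observer flag, the method bodies) is illustrative. Both entry points take jscmMu before any other lock, so their bodies can never interleave even when called concurrently.

// Illustrative sketch only, not part of the server.
package main

import (
	"fmt"
	"sync"
)

type Account struct {
	// jscmMu is taken first, before the Account lock (and, in the real server,
	// before the jsAccount, stream.clsMu and RaftNode locks).
	jscmMu   sync.Mutex
	mu       sync.RWMutex // stands in for the Account lock in the ordering
	observer bool         // stands in for the Raft observer state
}

// checkJetStreamMigrate and clearObserverState both hold jscmMu for their
// whole duration, so one can never interleave with the other.
func (a *Account) checkJetStreamMigrate() {
	a.jscmMu.Lock()
	defer a.jscmMu.Unlock()
	a.mu.Lock()
	a.observer = true // e.g. move Raft nodes into observer mode
	a.mu.Unlock()
}

func (a *Account) clearObserverState() {
	a.jscmMu.Lock()
	defer a.jscmMu.Unlock()
	a.mu.Lock()
	a.observer = false // e.g. restore normal Raft participation
	a.mu.Unlock()
}

func main() {
	a := &Account{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			a.checkJetStreamMigrate()
			a.clearObserverState()
		}()
	}
	wg.Wait()
	// Every goroutine ends with clearObserverState, so this prints false.
	fmt.Println("observer:", a.observer)
}
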
11 changes: 11 additions & 0 deletions server/accounts.go
@@ -96,6 +96,9 @@ type Account struct {
nameTag string
lastLimErr int64
routePoolIdx int
// Guarantee that only one goroutine can be running either checkJetStreamMigrate
// or clearObserverState at a given time for this account to prevent interleaving.
jscmMu sync.Mutex
}

const (
@@ -1479,6 +1482,10 @@ func (a *Account) addServiceImportWithClaim(destination *Account, from, to strin
return err
}

if err := a.serviceImportFormsCycle(destination, to); err != nil {
return err
}

_, err := a.addServiceImport(destination, from, to, imClaim)

return err
@@ -2466,6 +2473,10 @@ func (a *Account) AddMappedStreamImportWithClaim(account *Account, from, to stri
return err
}

if err := a.streamImportFormsCycle(account, from); err != nil {
return err
}

var (
usePub bool
tr *subjectTransform
13 changes: 12 additions & 1 deletion server/filestore.go
@@ -1869,7 +1869,7 @@ func (mb *msgBlock) lastChecksum() []byte {
mb.rbytes = uint64(fi.Size())
}
if mb.rbytes < checksumSize {
return nil
return lchk[:]
}
// Encrypted?
// Check for encryption; we do not load keys on startup anymore, so we might need to load them here.
@@ -3340,6 +3340,17 @@ func (mb *msgBlock) skipMsg(seq uint64, now time.Time) {
mb.last.ts = nowts
atomic.StoreUint64(&mb.first.seq, seq+1)
mb.first.ts = nowts
needsRecord = mb == mb.fs.lmb
if needsRecord && mb.rbytes > 0 {
// Since we have no messages, make sure we write to the beginning,
// as we only need the last one.
mb.rbytes, mb.cache = 0, &cache{}
// If encrypted we need to reset counter since we just keep one.
if mb.bek != nil {
// Recreate to reset counter.
mb.bek, _ = genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
}
}
} else {
needsRecord = true
mb.dmap.Insert(seq)
53 changes: 53 additions & 0 deletions server/filestore_test.go
@@ -6843,6 +6843,59 @@ func TestFileStoreRecoverWithRemovesAndNoIndexDB(t *testing.T) {
require_Equal(t, ss.Msgs, 7)
}

func TestFileStoreReloadAndLoseLastSequence(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

for i := 0; i < 22; i++ {
fs.SkipMsg()
}

// Restart 5 times.
for i := 0; i < 5; i++ {
fs.Stop()
fs, err = newFileStore(
FileStoreConfig{StoreDir: sd},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()
var ss StreamState
fs.FastState(&ss)
require_Equal(t, ss.FirstSeq, 23)
require_Equal(t, ss.LastSeq, 22)
}
}

func TestFileStoreReloadAndLoseLastSequenceWithSkipMsgs(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

// Make sure the same works with SkipMsgs, which can kick in from delete blocks to replicas.
require_NoError(t, fs.SkipMsgs(0, 22))

// Restart 5 times.
for i := 0; i < 5; i++ {
fs.Stop()
fs, err = newFileStore(
FileStoreConfig{StoreDir: sd},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()
var ss StreamState
fs.FastState(&ss)
require_Equal(t, ss.FirstSeq, 23)
require_Equal(t, ss.LastSeq, 22)
}
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
71 changes: 56 additions & 15 deletions server/jetstream_cluster.go
@@ -534,12 +534,18 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
return false
}

// If we are catching up return false.
if mset.isCatchingUp() {
// If R1 we are good.
if node == nil {
return true
}

// Here we are a replicated stream.
// First make sure our monitor routine is running.
if !mset.isMonitorRunning() {
return false
}

if node == nil || node.Healthy() {
if node.Healthy() {
// Check if we are processing a snapshot and are catching up.
if !mset.isCatchingUp() {
return true
Expand All @@ -553,7 +559,6 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
js.restartStream(acc, sa)
}
}

return false
}

@@ -863,6 +868,8 @@ func (js *jetStream) setupMetaGroup() error {
atomic.StoreInt32(&js.clustered, 1)
c.registerWithAccount(sacc)

// Set to true before we start.
js.metaRecovering = true
js.srv.startGoRoutine(
js.monitorCluster,
pprofLabels{
@@ -2164,7 +2171,7 @@ func genPeerInfo(peers []string, split int) (newPeers, oldPeers []string, newPee
// Should only be called from monitorStream.
func (mset *stream) waitOnConsumerAssignments() {
mset.mu.RLock()
s, js, acc, sa, name := mset.srv, mset.js, mset.acc, mset.sa, mset.cfg.Name
s, js, acc, sa, name, replicas := mset.srv, mset.js, mset.acc, mset.sa, mset.cfg.Name, mset.cfg.Replicas
mset.mu.RUnlock()

if s == nil || js == nil || acc == nil || sa == nil {
@@ -2186,6 +2193,9 @@
for _, o := range mset.getConsumers() {
// Make sure we are registered with our consumer assignment.
if ca := o.consumerAssignment(); ca != nil {
if replicas > 1 && !o.isMonitorRunning() {
break
}
numReady++
} else {
break
@@ -2373,7 +2383,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// since we process streams first then consumers as an asset class.
mset.waitOnConsumerAssignments()
// Setup a periodic check here.
cist = time.NewTicker(30 * time.Second)
// We will fire in 5s the first time, then back off to 30s.
cist = time.NewTicker(5 * time.Second)
cistc = cist.C
}

@@ -2496,6 +2507,7 @@
}

case <-cistc:
cist.Reset(30 * time.Second)
// We may be adjusting some things with consumers so do this in its own goroutine.
go mset.checkInterestState()

@@ -4424,11 +4436,15 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, b)
}
} else {
js.mu.RLock()
node := rg.node
js.mu.RUnlock()

if didCreate {
o.setCreatedTime(ca.Created)
} else {
// Check for scale down to 1.
if rg.node != nil && len(rg.Peers) == 1 {
if node != nil && len(rg.Peers) == 1 {
o.clearNode()
o.setLeader(true)
// Need to clear from rg too.
@@ -4443,7 +4459,7 @@
}
}

if rg.node == nil {
if node == nil {
// Single replica consumer, process manually here.
js.mu.Lock()
// Force response in case we think this is an update.
@@ -4913,7 +4929,22 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
}
}
// Check our interest state if applicable.
o.checkStateForInterestStream()
if err := o.checkStateForInterestStream(); err == errAckFloorHigherThanLastSeq {
o.mu.RLock()
mset := o.mset
o.mu.RUnlock()
// Register pre-acks unless the stream has no state at all, in which case we would create a lot of pre-acks.
mset.mu.Lock()
var ss StreamState
mset.store.FastState(&ss)
// Only register if we have a valid FirstSeq.
if ss.FirstSeq > 0 {
for seq := ss.FirstSeq; seq < state.AckFloor.Stream; seq++ {
mset.registerPreAck(o, seq)
}
}
mset.mu.Unlock()
}
}

} else if e.Type == EntryRemovePeer {
@@ -7630,7 +7661,10 @@ func (mset *stream) stateSnapshot() []byte {
func (mset *stream) stateSnapshotLocked() []byte {
// Decide if we can support the new style of stream snapshots.
if mset.supportsBinarySnapshotLocked() {
snap, _ := mset.store.EncodedStreamState(mset.getCLFS())
snap, err := mset.store.EncodedStreamState(mset.getCLFS())
if err != nil {
return nil
}
return snap
}

@@ -8100,8 +8134,11 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
var sub *subscription
var err error

const activityInterval = 30 * time.Second
notActive := time.NewTimer(activityInterval)
const (
startInterval = 5 * time.Second
activityInterval = 30 * time.Second
)
notActive := time.NewTimer(startInterval)
defer notActive.Stop()

defer func() {
@@ -8184,7 +8221,7 @@ RETRY:
default:
}
}
notActive.Reset(activityInterval)
notActive.Reset(startInterval)

// Grab sync request again on failures.
if sreq == nil {
@@ -8229,8 +8266,10 @@ RETRY:
// Send our sync request.
b, _ := json.Marshal(sreq)
s.sendInternalMsgLocked(subject, reply, nil, b)

// Remember when we sent this out to avoid loop spins on errors below.
reqSendTime := time.Now()

// Clear our sync request.
sreq = nil

@@ -8779,7 +8818,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
done = maxOutMsgs-atomic.LoadInt32(&outm) > minBatchWait
if !done {
// Wait for a small bit.
time.Sleep(50 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
} else {
// GC friendly.
mw.Stop()
@@ -8868,7 +8907,9 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
mset.account(), mset.name(), seq, state)
// Try our best to redo our invalidated snapshot as well.
if n := mset.raftNode(); n != nil {
n.InstallSnapshot(mset.stateSnapshot())
if snap := mset.stateSnapshot(); snap != nil {
n.InstallSnapshot(snap)
}
}
// If we allow gap markers, check if we have one pending.
if drOk && dr.First > 0 {