Skip to content

Commit

Permalink
[IMPROVED] Make health checks more consistent with stream health checks. (#4180)
Browse files Browse the repository at this point in the history

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison authored May 18, 2023
2 parents f63d63f + 7e3f3f4 commit 25d9762
Showing 1 changed file with 47 additions and 19 deletions.
66 changes: 47 additions & 19 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,10 +526,14 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
if !mset.isCatchingUp() {
return true
}
} else if node != nil && node != mset.raftNode() {
s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName)
node.Delete()
mset.resetClusteredState(nil)
} else if node != nil {
if node != mset.raftNode() {
s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName)
node.Delete()
mset.resetClusteredState(nil)
} else if node.State() == Closed {
js.restartStream(acc, sa)
}
}

return false
Expand All @@ -541,44 +545,65 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
if mset == nil {
return false
}

js.mu.RLock()
cc := js.cluster
js.mu.RUnlock()

if cc == nil {
// Non-clustered mode
js.mu.RUnlock()
return true
}
// These are required.
if ca == nil || ca.Group == nil {
js.mu.RUnlock()
return false
}
s := js.srv
js.mu.RUnlock()

// Capture RAFT node from assignment.
node := ca.Group.node

// When we try to restart we nil out the node if applicable
// and reprocess the consumer assignment.
restartConsumer := func() {
js.mu.Lock()
if ca.Group != nil {
// Make sure the node is stopped if still running.
if node := ca.Group.node; node != nil && node.State() != Closed {
node.Stop()
}
ca.Group.node = nil
// Make sure the node is stopped if still running.
if node != nil && node.State() != Closed {
node.Stop()
}
ca.Group.node = nil
deleted := ca.deleted
js.mu.Unlock()
if !deleted {
js.processConsumerAssignment(ca)
}
}

// Check if not running at all.
o := mset.lookupConsumer(consumer)
if o == nil {
restartConsumer()
return false
}
if node := o.raftNode(); node == nil || node.Healthy() {

// Check RAFT node state.
if node == nil || node.Healthy() {
return true
} else if node != nil && node.State() == Closed {
// We have a consumer, and it should have a running node but it is closed.
o.stop()
restartConsumer()
} else if node != nil {
if node != o.raftNode() {
mset.mu.RLock()
accName, streamName := mset.acc.GetName(), mset.cfg.Name
mset.mu.RUnlock()
s.Warnf("Detected consumer cluster node skew '%s > %s'", accName, streamName, consumer)
node.Delete()
o.deleteWithoutAdvisory()
restartConsumer()
} else if node.State() == Closed {
// We have a consumer, and it should have a running node but it is closed.
o.stop()
restartConsumer()
}
}
return false
}
Expand Down Expand Up @@ -4750,18 +4775,21 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err
return errors.New("failed to update consumer leader status")
}

if o == nil || o.isClosed() {
return stepDownIfLeader()
}

ca := o.consumerAssignment()
if ca == nil {
return stepDownIfLeader()
}
js.mu.Lock()
s, account, err := js.srv, ca.Client.serviceAccount(), ca.err
client, subject, reply := ca.Client, ca.Subject, ca.Reply
client, subject, reply, streamName, consumerName := ca.Client, ca.Subject, ca.Reply, ca.Stream, ca.Name
hasResponded := ca.responded
ca.responded = true
js.mu.Unlock()

streamName, consumerName := o.streamName(), o.String()
acc, _ := s.LookupAccount(account)
if acc == nil {
return stepDownIfLeader()
Expand Down

0 comments on commit 25d9762

Please sign in to comment.