Skip to content

Commit

Permalink
Make health checks more consistent with stream health checks.
Browse files Browse the repository at this point in the history
Check for closed state on leader change for consumers.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison committed May 18, 2023
1 parent f63d63f commit 7e3f3f4
Showing 1 changed file with 47 additions and 19 deletions.
66 changes: 47 additions & 19 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,10 +526,14 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
if !mset.isCatchingUp() {
return true
}
} else if node != nil && node != mset.raftNode() {
s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName)
node.Delete()
mset.resetClusteredState(nil)
} else if node != nil {
if node != mset.raftNode() {
s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName)
node.Delete()
mset.resetClusteredState(nil)
} else if node.State() == Closed {
js.restartStream(acc, sa)
}
}

return false
Expand All @@ -541,44 +545,65 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
if mset == nil {
return false
}

js.mu.RLock()
cc := js.cluster
js.mu.RUnlock()

if cc == nil {
// Non-clustered mode
js.mu.RUnlock()
return true
}
// These are required.
if ca == nil || ca.Group == nil {
js.mu.RUnlock()
return false
}
s := js.srv
js.mu.RUnlock()

// Capture RAFT node from assignment.
node := ca.Group.node

// When we try to restart we nil out the node if applicable
// and reprocess the consumer assignment.
restartConsumer := func() {
js.mu.Lock()
if ca.Group != nil {
// Make sure the node is stopped if still running.
if node := ca.Group.node; node != nil && node.State() != Closed {
node.Stop()
}
ca.Group.node = nil
// Make sure the node is stopped if still running.
if node != nil && node.State() != Closed {
node.Stop()
}
ca.Group.node = nil
deleted := ca.deleted
js.mu.Unlock()
if !deleted {
js.processConsumerAssignment(ca)
}
}

// Check if not running at all.
o := mset.lookupConsumer(consumer)
if o == nil {
restartConsumer()
return false
}
if node := o.raftNode(); node == nil || node.Healthy() {

// Check RAFT node state.
if node == nil || node.Healthy() {
return true
} else if node != nil && node.State() == Closed {
// We have a consumer, and it should have a running node but it is closed.
o.stop()
restartConsumer()
} else if node != nil {
if node != o.raftNode() {
mset.mu.RLock()
accName, streamName := mset.acc.GetName(), mset.cfg.Name
mset.mu.RUnlock()
s.Warnf("Detected consumer cluster node skew '%s > %s'", accName, streamName, consumer)
node.Delete()
o.deleteWithoutAdvisory()
restartConsumer()
} else if node.State() == Closed {
// We have a consumer, and it should have a running node but it is closed.
o.stop()
restartConsumer()
}
}
return false
}
Expand Down Expand Up @@ -4750,18 +4775,21 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err
return errors.New("failed to update consumer leader status")
}

if o == nil || o.isClosed() {
return stepDownIfLeader()
}

ca := o.consumerAssignment()
if ca == nil {
return stepDownIfLeader()
}
js.mu.Lock()
s, account, err := js.srv, ca.Client.serviceAccount(), ca.err
client, subject, reply := ca.Client, ca.Subject, ca.Reply
client, subject, reply, streamName, consumerName := ca.Client, ca.Subject, ca.Reply, ca.Stream, ca.Name
hasResponded := ca.responded
ca.responded = true
js.mu.Unlock()

streamName, consumerName := o.streamName(), o.String()
acc, _ := s.LookupAccount(account)
if acc == nil {
return stepDownIfLeader()
Expand Down

0 comments on commit 7e3f3f4

Please sign in to comment.