Skip to content

Commit

Permalink
This adds a new pause_until configuration option for pausing consum…
Browse files Browse the repository at this point in the history
…ers.

It can either be set when the consumer is created (but not via a
standard consumer update) or it can be changed later by using the new
`$JS.API.CONSUMER.PAUSE.*.*` API endpoint by sending:
```
{"pause_until": "2024-02-08T19:00:00Z"}
```

Any time that is in the past, or a zero timestamp, is considered as
"unpaused". Once the consumer reaches the `pause_until` time, messages
will start flowing again automatically.

The consumer info will additionally include `paused` (type `bool`) and,
if paused, a `pause_remaining` (type `time.Duration`) to report the pause
status.

Also adds `$JS.EVENT.ADVISORY.CONSUMER.PAUSE.*.*` advisory messages which
are sent when pausing and unpausing (i.e. reaching the pause deadline).

Idle heartbeats continue to be sent while the consumer is paused to
satisfy liveness checks.

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Feb 14, 2024
1 parent bd7ef64 commit c79c539
Show file tree
Hide file tree
Showing 8 changed files with 841 additions and 7 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ module github.com/nats-io/nats-server/v2

go 1.20

replace github.com/nats-io/nats.go => github.com/nats-io/nats.go v1.32.1-0.20240208172212-5c945107cb81

require (
github.com/klauspost/compress v1.17.6
github.com/minio/highwayhash v1.0.2
github.com/nats-io/jwt/v2 v2.5.3
github.com/nats-io/nats.go v1.32.0
github.com/nats-io/nats.go v1.32.1-0.20240212165827-a448ed8b765f
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.5.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nats.go v1.32.1-0.20240208172212-5c945107cb81 h1:YssbQnYFHCqWevnA3EqR4VsJjHnxLOkLgHPBPjSOCOM=
github.com/nats-io/nats.go v1.32.1-0.20240208172212-5c945107cb81/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
94 changes: 90 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type ConsumerInfo struct {
NumPending uint64 `json:"num_pending"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
PushBound bool `json:"push_bound,omitempty"`
Paused bool `json:"paused,omitempty"`
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
// TimeStamp indicates when the info was gathered
TimeStamp time.Time `json:"ts"`
}
Expand Down Expand Up @@ -104,6 +106,9 @@ type ConsumerConfig struct {

// Metadata is additional metadata for the Consumer.
Metadata map[string]string `json:"metadata,omitempty"`

// PauseUntil is for suspending the consumer until the deadline.
PauseUntil time.Time `json:"pause_until,omitempty"`
}

// SequenceInfo has both the consumer and the stream sequence and last activity.
Expand Down Expand Up @@ -352,11 +357,12 @@ type consumer struct {
active bool
replay bool
dtmr *time.Timer
uptmr *time.Timer // Unpause timer
gwdtmr *time.Timer
dthresh time.Duration
mch chan struct{}
qch chan struct{}
inch chan bool
mch chan struct{} // Message channel
qch chan struct{} // Quit channel
inch chan bool // Interest change channel
sfreq int32
ackEventT string
nakEventT string
Expand Down Expand Up @@ -1002,6 +1008,10 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Check/update the inactive threshold
o.updateInactiveThreshold(&o.cfg)

// Check/update the pause state
o.updatePauseState(&o.cfg)
o.sendPauseAdvisoryLocked(&o.cfg)

if o.isPushMode() {
// Check if we are running only 1 replica and that the delivery subject has interest.
// Check in place here for interest. Will setup properly in setLeader.
Expand Down Expand Up @@ -1072,6 +1082,34 @@ func (o *consumer) updateInactiveThreshold(cfg *ConsumerConfig) {
}
}

// Updates the paused state. If we are the leader and the pause deadline
// hasn't passed yet then we will start a timer to kick the consumer once
// that deadline is reached. Lock should be held.
func (o *consumer) updatePauseState(cfg *ConsumerConfig) {
if o.uptmr != nil {
stopAndClearTimer(&o.uptmr)
}
if !o.isLeader() {
// Only the leader will run the timer as only the leader will run
// loopAndGatherMsgs.
return
}
if cfg.PauseUntil.Before(time.Now()) {
// Either the PauseUntil is unset (is effectively zero) or the
// deadline has already passed, in which case there is nothing
// to do.
return
}
o.uptmr = time.AfterFunc(time.Until(cfg.PauseUntil), func() {
o.mu.Lock()
defer o.mu.Unlock()

stopAndClearTimer(&o.uptmr)
o.signalNewMessages()
o.sendPauseAdvisoryLocked(&o.cfg)
})
}

func (o *consumer) consumerAssignment() *consumerAssignment {
o.mu.RLock()
defer o.mu.RUnlock()
Expand Down Expand Up @@ -1265,6 +1303,9 @@ func (o *consumer) setLeader(isLeader bool) {
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}

// Update the consumer pause tracking.
o.updatePauseState(&o.cfg)

// If we are not in ReplayInstant mode mark us as in replay state until resolved.
if o.cfg.ReplayPolicy != ReplayInstant {
o.replay = true
Expand Down Expand Up @@ -1332,7 +1373,8 @@ func (o *consumer) setLeader(isLeader bool) {
}
// Stop any inactivity timers. Should only be running on leaders.
stopAndClearTimer(&o.dtmr)

// Stop any unpause timers. Should only be running on leaders.
stopAndClearTimer(&o.uptmr)
// Make sure to clear out any re-deliver queues
stopAndClearTimer(&o.ptmr)
o.rdq = nil
Expand Down Expand Up @@ -1449,6 +1491,29 @@ func (o *consumer) sendCreateAdvisory() {
o.sendAdvisory(subj, j)
}

func (o *consumer) sendPauseAdvisoryLocked(cfg *ConsumerConfig) {
e := JSConsumerPauseAdvisory{
TypedEvent: TypedEvent{
Type: JSConsumerPauseAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: o.stream,
Consumer: o.name,
Paused: time.Now().Before(cfg.PauseUntil),
PauseUntil: cfg.PauseUntil,
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
return
}

subj := JSAdvisoryConsumerPausePre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
}

// Created returns created time.
func (o *consumer) createdTime() time.Time {
o.mu.Lock()
Expand Down Expand Up @@ -1812,6 +1877,9 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
return err
}

// Make sure we always store PauseUntil in UTC.
cfg.PauseUntil = cfg.PauseUntil.UTC()

if o.store != nil {
// Update local state always.
if err := o.store.UpdateConfig(cfg); err != nil {
Expand Down Expand Up @@ -1860,6 +1928,10 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}
}
if !cfg.PauseUntil.Equal(o.cfg.PauseUntil) {
o.updatePauseState(cfg)
o.sendPauseAdvisoryLocked(cfg)
}

// Check for Subject Filters update.
newSubjects := gatherSubjectFilters(cfg.FilterSubject, cfg.FilterSubjects)
Expand Down Expand Up @@ -2573,6 +2645,10 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
NumPending: o.checkNumPending(),
PushBound: o.isPushMode() && o.active,
TimeStamp: time.Now().UTC(),
Paused: time.Now().Before(o.cfg.PauseUntil),
}
if info.Paused {
info.PauseRemaining = time.Until(o.cfg.PauseUntil)
}

// If we are replicated and we are not the leader we need to pull certain data from our store.
Expand Down Expand Up @@ -3841,6 +3917,8 @@ func (o *consumer) suppressDeletion() {
}
}

// loopAndGatherMsgs waits for messages for the consumer. qch is the quit channel,
// upch is the unpause channel which fires when the PauseUntil deadline is reached.
func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// On startup check to see if we are in a reply situation where replay policy is not instant.
var (
Expand Down Expand Up @@ -3907,6 +3985,13 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// Clear last error.
err = nil

// If the consumer is paused then stop sending.
if time.Now().Before(o.cfg.PauseUntil) {
// If the consumer is paused and we haven't reached the deadline yet then
// go back to waiting.
goto waitForMsgs
}

// If we are in push mode and not active or under flowcontrol let's stop sending.
if o.isPushMode() {
if !o.active || (o.maxpb > 0 && o.pbytes > o.maxpb) {
Expand Down Expand Up @@ -5203,6 +5288,7 @@ func (o *consumer) switchToEphemeral() {
rr := o.acc.sl.Match(o.cfg.DeliverSubject)
// Setup dthresh.
o.updateInactiveThreshold(&o.cfg)
o.updatePauseState(&o.cfg)
o.mu.Unlock()

// Update interest
Expand Down
Loading

0 comments on commit c79c539

Please sign in to comment.