Skip to content

Commit

Permalink
Add support for pausing consumers (#5066)
Browse files Browse the repository at this point in the history
This adds a new `pause_until` configuration option for pausing
consumers.

It can either be set when the consumer is created (but not via a
standard consumer update) or it can be changed later by using the new
`$JS.API.CONSUMER.PAUSE.*.*` API endpoint by sending:
```
{"pause_until": "2024-02-08T19:00:00Z"}
```

Any time that is in the past, or a zero timestamp, is considered as
"unpaused". Once the consumer reaches the `pause_until` time, messages
will start flowing again automatically.

The consumer info will additionally include `paused` (type `bool`) and,
if paused, a `pause_remaining` (type `time.Duration`) to report the
pause status.

Also adds `$JS.EVENT.ADVISORY.CONSUMER.PAUSE.*.*` advisory messages
which are sent when pausing and unpausing (i.e. reaching the pause
deadline).

Idle heartbeats continue to be sent while the consumer is paused to
satisfy liveness checks.

**Before merge:** nats-io/nats.go#1554 and `go.mod` updated.

Signed-off-by: Neil Twigg <neil@nats.io>

---------

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Feb 26, 2024
1 parent 5f6595b commit bb5a959
Show file tree
Hide file tree
Showing 8 changed files with 1,125 additions and 4 deletions.
110 changes: 106 additions & 4 deletions server/consumer.go
Expand Up @@ -55,6 +55,8 @@ type ConsumerInfo struct {
NumPending uint64 `json:"num_pending"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
PushBound bool `json:"push_bound,omitempty"`
Paused bool `json:"paused,omitempty"`
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
// TimeStamp indicates when the info was gathered
TimeStamp time.Time `json:"ts"`
}
Expand Down Expand Up @@ -104,6 +106,9 @@ type ConsumerConfig struct {

// Metadata is additional metadata for the Consumer.
Metadata map[string]string `json:"metadata,omitempty"`

// PauseUntil is for suspending the consumer until the deadline.
PauseUntil *time.Time `json:"pause_until,omitempty"`
}

// SequenceInfo has both the consumer and the stream sequence and last activity.
Expand Down Expand Up @@ -352,11 +357,12 @@ type consumer struct {
active bool
replay bool
dtmr *time.Timer
uptmr *time.Timer // Unpause timer
gwdtmr *time.Timer
dthresh time.Duration
mch chan struct{}
qch chan struct{}
inch chan bool
mch chan struct{} // Message channel
qch chan struct{} // Quit channel
inch chan bool // Interest change channel
sfreq int32
ackEventT string
nakEventT string
Expand Down Expand Up @@ -1072,6 +1078,34 @@ func (o *consumer) updateInactiveThreshold(cfg *ConsumerConfig) {
}
}

// Updates the paused state. If we are the leader and the pause deadline
// hasn't passed yet then we will start a timer to kick the consumer once
// that deadline is reached. Lock should be held.
func (o *consumer) updatePauseState(cfg *ConsumerConfig) {
if o.uptmr != nil {
stopAndClearTimer(&o.uptmr)
}
if !o.isLeader() {
// Only the leader will run the timer as only the leader will run
// loopAndGatherMsgs.
return
}
if cfg.PauseUntil == nil || cfg.PauseUntil.IsZero() || cfg.PauseUntil.Before(time.Now()) {
// Either the PauseUntil is unset (is effectively zero) or the
// deadline has already passed, in which case there is nothing
// to do.
return
}
o.uptmr = time.AfterFunc(time.Until(*cfg.PauseUntil), func() {
o.mu.Lock()
defer o.mu.Unlock()

stopAndClearTimer(&o.uptmr)
o.sendPauseAdvisoryLocked(&o.cfg)
o.signalNewMessages()
})
}

func (o *consumer) consumerAssignment() *consumerAssignment {
o.mu.RLock()
defer o.mu.RUnlock()
Expand Down Expand Up @@ -1265,6 +1299,9 @@ func (o *consumer) setLeader(isLeader bool) {
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}

// Update the consumer pause tracking.
o.updatePauseState(&o.cfg)

// If we are not in ReplayInstant mode mark us as in replay state until resolved.
if o.cfg.ReplayPolicy != ReplayInstant {
o.replay = true
Expand Down Expand Up @@ -1332,7 +1369,8 @@ func (o *consumer) setLeader(isLeader bool) {
}
// Stop any inactivity timers. Should only be running on leaders.
stopAndClearTimer(&o.dtmr)

// Stop any unpause timers. Should only be running on leaders.
stopAndClearTimer(&o.uptmr)
// Make sure to clear out any re-deliver queues
stopAndClearTimer(&o.ptmr)
o.rdq = nil
Expand Down Expand Up @@ -1449,6 +1487,32 @@ func (o *consumer) sendCreateAdvisory() {
o.sendAdvisory(subj, j)
}

func (o *consumer) sendPauseAdvisoryLocked(cfg *ConsumerConfig) {
e := JSConsumerPauseAdvisory{
TypedEvent: TypedEvent{
Type: JSConsumerPauseAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: o.stream,
Consumer: o.name,
Domain: o.srv.getOpts().JetStreamDomain,
}

if cfg.PauseUntil != nil {
e.PauseUntil = *cfg.PauseUntil
e.Paused = time.Now().Before(e.PauseUntil)
}

j, err := json.Marshal(e)
if err != nil {
return
}

subj := JSAdvisoryConsumerPausePre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
}

// Created returns created time.
func (o *consumer) createdTime() time.Time {
o.mu.Lock()
Expand Down Expand Up @@ -1812,6 +1876,12 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
return err
}

// Make sure we always store PauseUntil in UTC.
if cfg.PauseUntil != nil {
utc := (*cfg.PauseUntil).UTC()
cfg.PauseUntil = &utc
}

if o.store != nil {
// Update local state always.
if err := o.store.UpdateConfig(cfg); err != nil {
Expand Down Expand Up @@ -1860,6 +1930,22 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}
}
// Check whether the pause has changed
{
var old, new time.Time
if o.cfg.PauseUntil != nil {
old = *o.cfg.PauseUntil
}
if cfg.PauseUntil != nil {
new = *cfg.PauseUntil
}
if !old.Equal(new) {
o.updatePauseState(cfg)
if o.isLeader() {
o.sendPauseAdvisoryLocked(cfg)
}
}
}

// Check for Subject Filters update.
newSubjects := gatherSubjectFilters(cfg.FilterSubject, cfg.FilterSubjects)
Expand Down Expand Up @@ -2574,6 +2660,12 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
PushBound: o.isPushMode() && o.active,
TimeStamp: time.Now().UTC(),
}
if o.cfg.PauseUntil != nil {
p := *o.cfg.PauseUntil
if info.Paused = time.Now().Before(p); info.Paused {
info.PauseRemaining = time.Until(p)
}
}

// If we are replicated and we are not the leader we need to pull certain data from our store.
if rg != nil && rg.node != nil && !o.isLeader() && o.store != nil {
Expand Down Expand Up @@ -3841,6 +3933,8 @@ func (o *consumer) suppressDeletion() {
}
}

// loopAndGatherMsgs waits for messages for the consumer. qch is the quit channel,
// upch is the unpause channel which fires when the PauseUntil deadline is reached.
func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// On startup check to see if we are in a reply situation where replay policy is not instant.
var (
Expand Down Expand Up @@ -3907,6 +4001,13 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// Clear last error.
err = nil

// If the consumer is paused then stop sending.
if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) {
// If the consumer is paused and we haven't reached the deadline yet then
// go back to waiting.
goto waitForMsgs
}

// If we are in push mode and not active or under flowcontrol let's stop sending.
if o.isPushMode() {
if !o.active || (o.maxpb > 0 && o.pbytes > o.maxpb) {
Expand Down Expand Up @@ -5220,6 +5321,7 @@ func (o *consumer) switchToEphemeral() {
rr := o.acc.sl.Match(o.cfg.DeliverSubject)
// Setup dthresh.
o.updateInactiveThreshold(&o.cfg)
o.updatePauseState(&o.cfg)
o.mu.Unlock()

// Update interest
Expand Down

0 comments on commit bb5a959

Please sign in to comment.