Skip to content

Commit

Permalink
Run a check for ack floor drift. (#4086)
Browse files Browse the repository at this point in the history
Also check periodically. If everything is normal, the check will be very cheap.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Apr 21, 2023
2 parents 01041ca + da9a17f commit f9f4bf5
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 0 deletions.
71 changes: 71 additions & 0 deletions server/consumer.go
Expand Up @@ -3237,13 +3237,82 @@ func (o *consumer) hbTimer() (time.Duration, *time.Timer) {
return o.cfg.Heartbeat, time.NewTimer(o.cfg.Heartbeat)
}

// Check here for conditions when our ack floor may have drifted below the streams first sequence.
// In general this is accounted for in normal operations, but if the consumer misses the signal from
// the stream it will not clear the message and move the ack state.
// Should only be called from consumer leader.
func (o *consumer) checkAckFloor() {
	// Snapshot the fields we need under the read lock; the fast-path checks
	// below run against this snapshot without holding o.mu.
	o.mu.RLock()
	mset, closed, asflr := o.mset, o.closed, o.asflr
	o.mu.RUnlock()

	// Nothing to do if the consumer is closed or detached from its stream.
	if closed || mset == nil {
		return
	}

	// Grab a cheap view of the stream state; we only need FirstSeq here.
	var ss StreamState
	mset.store.FastState(&ss)

	// If our floor is equal or greater that is normal and nothing for us to do.
	if ss.FirstSeq == 0 || asflr >= ss.FirstSeq-1 {
		return
	}

	// Process all messages that no longer exist.
	for seq := asflr + 1; seq < ss.FirstSeq; seq++ {
		// Check if this message was pending.
		// NOTE(review): the lock is re-acquired per iteration and released
		// before processTerm — presumably processTerm takes o.mu itself; confirm.
		o.mu.RLock()
		p, isPending := o.pending[seq]
		// Default redelivery count to 1 when no redelivery map exists.
		var rdc uint64 = 1
		if o.rdc != nil {
			rdc = o.rdc[seq]
		}
		o.mu.RUnlock()
		// If it was pending for us, get rid of it.
		if isPending {
			o.processTerm(seq, p.Sequence, rdc)
		}
	}

	// Do one final check here.
	o.mu.Lock()
	defer o.mu.Unlock()

	// If we are here, and this should be rare, we still are off with our ack floor.
	// We will set it explicitly to 1 behind our current lowest in pending, or if
	// pending is empty, to our current delivered -1.
	if o.asflr < ss.FirstSeq-1 {
		var psseq, pdseq uint64
		// Find the lowest stream sequence still pending, carrying its
		// matching delivery sequence along.
		for seq, p := range o.pending {
			if psseq == 0 || seq < psseq {
				psseq, pdseq = seq, p.Sequence
			}
		}
		// If we still have none, set to current delivered -1.
		if psseq == 0 {
			psseq, pdseq = o.sseq-1, o.dseq-1
			// If still not adjusted.
			if psseq < ss.FirstSeq-1 {
				psseq, pdseq = ss.FirstSeq-1, ss.FirstSeq-1
			}
		}
		o.asflr, o.adflr = psseq, pdseq
	}
}

func (o *consumer) processInboundAcks(qch chan struct{}) {
// Grab the server lock to watch for server quit.
o.mu.RLock()
s := o.srv
hasInactiveThresh := o.cfg.InactiveThreshold > 0
o.mu.RUnlock()

// We will check this on entry and periodically.
o.checkAckFloor()

// How often we will check for ack floor drift.
var ackFloorCheck = 30 * time.Second

for {
select {
case <-o.ackMsgs.ch:
Expand All @@ -3257,6 +3326,8 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
if hasInactiveThresh {
o.suppressDeletion()
}
case <-time.After(ackFloorCheck):
o.checkAckFloor()
case <-qch:
return
case <-s.quitCh:
Expand Down
142 changes: 142 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -3511,3 +3511,145 @@ func TestJetStreamNoLeadersDuringLameDuck(t *testing.T) {
}
}
}

// If a consumer has not been registered (possible in heavily loaded systems with lots of assets)
// it could miss the signal of a message going away. If that message was pending and expires the
// ack floor could fall below the stream first sequence. This test will force that condition and
// make sure the system resolves itself.
func TestJetStreamConsumerAckFloorDrift(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Small, short-lived stream so messages age out quickly.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"*"},
		Replicas: 3,
		MaxAge:   200 * time.Millisecond,
		MaxMsgs:  10,
	})
	require_NoError(t, err)

	sub, err := js.PullSubscribe("foo", "C")
	require_NoError(t, err)

	for n := 0; n < 10; n++ {
		sendStreamMsg(t, nc, "foo", "HELLO")
	}

	// No-op but will surface as delivered.
	_, err = sub.Fetch(10)
	require_NoError(t, err)

	// We will grab the state with delivered being 10 and ackfloor being 0 directly.
	leader := c.consumerLeader(globalAccountName, "TEST", "C")
	require_NotNil(t, leader)

	lmset, err := leader.GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)
	lo := lmset.lookupConsumer("C")
	require_NotNil(t, lo)
	lo.mu.RLock()
	state, err := lo.store.State()
	lo.mu.RUnlock()
	require_NoError(t, err)
	require_NotNil(t, state)

	// Now let messages expire.
	checkFor(t, time.Second, 100*time.Millisecond, func() error {
		si, err := js.StreamInfo("TEST")
		require_NoError(t, err)
		if si.State.Msgs > 0 {
			return fmt.Errorf("stream still has msgs")
		}
		return nil
	})

	// Set state to ackfloor of 5 and no pending.
	state.AckFloor.Consumer = 5
	state.AckFloor.Stream = 5
	state.Pending = nil

	// Now put back the state underneath of the consumers.
	for _, srv := range c.servers {
		mset, err := srv.GlobalAccount().lookupStream("TEST")
		require_NoError(t, err)
		o := mset.lookupConsumer("C")
		require_NotNil(t, o)
		o.mu.Lock()
		err = o.setStoreState(state)
		cfs := o.store.(*consumerFileStore)
		o.mu.Unlock()
		require_NoError(t, err)
		// The lower layer will ignore, so set more directly.
		cfs.mu.Lock()
		cfs.state = *state
		cfs.mu.Unlock()
		// Also snapshot to remove any raft entries that could affect it.
		snap, err := o.store.EncodedState()
		require_NoError(t, err)
		require_NoError(t, o.raftNode().InstallSnapshot(snap))
	}

	leader.JetStreamStepdownConsumer(globalAccountName, "TEST", "C")
	c.waitOnConsumerLeader(globalAccountName, "TEST", "C")

	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		ci, err := js.ConsumerInfo("TEST", "C")
		require_NoError(t, err)
		// Make sure we catch this and adjust.
		if ci.AckFloor.Stream != 10 || ci.AckFloor.Consumer != 10 {
			return fmt.Errorf("AckFloor not correct, expected 10, got %+v", ci.AckFloor)
		}
		return nil
	})
}

func TestJetStreamClusterInterestStreamFilteredConsumersWithNoInterest(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R5S", 5)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Interest retention: messages only live while a consumer wants them.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"*"},
		Retention: nats.InterestPolicy,
		Replicas:  3,
	})
	require_NoError(t, err)

	// Create three subscribers.
	autoAck := func(m *nats.Msg) { m.Ack() }

	for _, subj := range []string{"foo", "bar", "baz"} {
		_, err = js.Subscribe(subj, autoAck, nats.BindStream("TEST"), nats.ManualAck())
		require_NoError(t, err)
	}

	// Now send 100 messages, randomly picking foo or bar, but never baz.
	for n := 0; n < 100; n++ {
		if rand.Intn(2) == 0 {
			sendStreamMsg(t, nc, "bar", "WORLD")
		} else {
			sendStreamMsg(t, nc, "foo", "HELLO")
		}
	}

	// Messages are expected to go to 0.
	checkFor(t, time.Second, 100*time.Millisecond, func() error {
		si, err := js.StreamInfo("TEST")
		require_NoError(t, err)
		if si.State.Msgs > 0 {
			return fmt.Errorf("stream still has msgs")
		}
		return nil
	})
}

0 comments on commit f9f4bf5

Please sign in to comment.