Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run a check for ack floor drift. #4086

Merged
merged 2 commits into from Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
71 changes: 71 additions & 0 deletions server/consumer.go
Expand Up @@ -3237,13 +3237,82 @@ func (o *consumer) hbTimer() (time.Duration, *time.Timer) {
return o.cfg.Heartbeat, time.NewTimer(o.cfg.Heartbeat)
}

// Check here for conditions when our ack floor may have drifted below the streams first sequence.
// In general this is accounted for in normal operations, but if the consumer misses the signal from
// the stream it will not clear the message and move the ack state.
// Should only be called from consumer leader.
func (o *consumer) checkAckFloor() {
	// Snapshot what we need under the read lock; we deliberately do not hold
	// the consumer lock while querying the stream store below.
	o.mu.RLock()
	mset, closed, asflr := o.mset, o.closed, o.asflr
	o.mu.RUnlock()

	// Nothing to do if we are closed or detached from the stream.
	if closed || mset == nil {
		return
	}

	// Get the current stream state to learn its first sequence.
	var ss StreamState
	mset.store.FastState(&ss)

	// If our floor is equal or greater that is normal and nothing for us to do.
	// The FirstSeq == 0 check also protects the unsigned FirstSeq-1 below.
	if ss.FirstSeq == 0 || asflr >= ss.FirstSeq-1 {
		return
	}

	// Process all messages that no longer exist.
	for seq := asflr + 1; seq < ss.FirstSeq; seq++ {
		// Check if this message was pending.
		// NOTE(review): lock is taken and released per iteration, and dropped
		// before processTerm — presumably processTerm acquires it itself; confirm.
		o.mu.RLock()
		p, isPending := o.pending[seq]
		// Default redelivery count to 1 when we have no record for this seq.
		var rdc uint64 = 1
		if o.rdc != nil {
			rdc = o.rdc[seq]
		}
		o.mu.RUnlock()
		// If it was pending for us, get rid of it.
		if isPending {
			o.processTerm(seq, p.Sequence, rdc)
		}
	}

	// Do one final check here.
	o.mu.Lock()
	defer o.mu.Unlock()

	// If we are here, and this should be rare, we still are off with our ack floor.
	// We will set it explicitly to 1 behind our current lowest in pending, or if
	// pending is empty, to our current delivered -1.
	if o.asflr < ss.FirstSeq-1 {
		var psseq, pdseq uint64
		// Find the lowest stream sequence still pending and its delivery sequence.
		for seq, p := range o.pending {
			if psseq == 0 || seq < psseq {
				psseq, pdseq = seq, p.Sequence
			}
		}
		// If we still have none, set to current delivered -1.
		if psseq == 0 {
			psseq, pdseq = o.sseq-1, o.dseq-1
			// If still not adjusted, clamp to one behind the stream's first sequence.
			if psseq < ss.FirstSeq-1 {
				psseq, pdseq = ss.FirstSeq-1, ss.FirstSeq-1
			}
		}
		// Commit the corrected ack floors (stream seq, delivery seq).
		o.asflr, o.adflr = psseq, pdseq
	}
}

func (o *consumer) processInboundAcks(qch chan struct{}) {
// Grab the server lock to watch for server quit.
o.mu.RLock()
s := o.srv
hasInactiveThresh := o.cfg.InactiveThreshold > 0
o.mu.RUnlock()

// We will check this on entry and periodically.
o.checkAckFloor()

// How often we will check for ack floor drift.
var ackFloorCheck = 30 * time.Second

for {
select {
case <-o.ackMsgs.ch:
Expand All @@ -3257,6 +3326,8 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
if hasInactiveThresh {
o.suppressDeletion()
}
case <-time.After(ackFloorCheck):
o.checkAckFloor()
case <-qch:
return
case <-s.quitCh:
Expand Down
142 changes: 142 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -3511,3 +3511,145 @@ func TestJetStreamNoLeadersDuringLameDuck(t *testing.T) {
}
}
}

// If a consumer has not been registered (possible in heavily loaded systems with lots of assets)
// it could miss the signal of a message going away. If that message was pending and expires the
// ack floor could fall below the stream first sequence. This test will force that condition and
// make sure the system resolves itself.
func TestJetStreamConsumerAckFloorDrift(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Short MaxAge so messages expire quickly after publish.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"*"},
		Replicas: 3,
		MaxAge:   200 * time.Millisecond,
		MaxMsgs:  10,
	})
	require_NoError(t, err)

	sub, err := js.PullSubscribe("foo", "C")
	require_NoError(t, err)

	for i := 0; i < 10; i++ {
		sendStreamMsg(t, nc, "foo", "HELLO")
	}

	// No-op but will surface as delivered.
	_, err = sub.Fetch(10)
	require_NoError(t, err)

	// We will grab the state with delivered being 10 and ackfloor being 0 directly.
	cl := c.consumerLeader(globalAccountName, "TEST", "C")
	require_NotNil(t, cl)

	mset, err := cl.GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)
	o := mset.lookupConsumer("C")
	require_NotNil(t, o)
	// Read the persisted consumer state under the consumer's read lock.
	o.mu.RLock()
	state, err := o.store.State()
	o.mu.RUnlock()
	require_NoError(t, err)
	require_NotNil(t, state)

	// Now let messages expire.
	checkFor(t, time.Second, 100*time.Millisecond, func() error {
		si, err := js.StreamInfo("TEST")
		require_NoError(t, err)
		if si.State.Msgs == 0 {
			return nil
		}
		return fmt.Errorf("stream still has msgs")
	})

	// Set state to ackfloor of 5 and no pending.
	// This manufactures the drift: stream first seq has moved past 5 already.
	state.AckFloor.Consumer = 5
	state.AckFloor.Stream = 5
	state.Pending = nil

	// Now put back the state underneath of the consumers.
	for _, s := range c.servers {
		mset, err := s.GlobalAccount().lookupStream("TEST")
		require_NoError(t, err)
		o := mset.lookupConsumer("C")
		require_NotNil(t, o)
		o.mu.Lock()
		err = o.setStoreState(state)
		cfs := o.store.(*consumerFileStore)
		o.mu.Unlock()
		require_NoError(t, err)
		// The lower layer will ignore, so set more directly.
		cfs.mu.Lock()
		cfs.state = *state
		cfs.mu.Unlock()
		// Also snapshot to remove any raft entries that could affect it.
		snap, err := o.store.EncodedState()
		require_NoError(t, err)
		require_NoError(t, o.raftNode().InstallSnapshot(snap))
	}

	// Force a leader change so the new leader starts from the doctored state.
	cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C")
	c.waitOnConsumerLeader(globalAccountName, "TEST", "C")

	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		ci, err := js.ConsumerInfo("TEST", "C")
		require_NoError(t, err)
		// Make sure we catch this and adjust.
		if ci.AckFloor.Stream == 10 && ci.AckFloor.Consumer == 10 {
			return nil
		}
		return fmt.Errorf("AckFloor not correct, expected 10, got %+v", ci.AckFloor)
	})
}

// With interest retention and multiple filtered consumers, a subject that
// never receives traffic ("baz") must not cause messages on other subjects
// to be retained once their consumers have ack'd them.
func TestJetStreamClusterInterestStreamFilteredConsumersWithNoInterest(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R5S", 5)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"*"},
		Retention: nats.InterestPolicy,
		Replicas:  3,
	})
	require_NoError(t, err)

	// Create three subscribers, each acking everything it receives.
	ack := func(m *nats.Msg) { m.Ack() }
	for _, subj := range []string{"foo", "bar", "baz"} {
		_, err = js.Subscribe(subj, ack, nats.BindStream("TEST"), nats.ManualAck())
		require_NoError(t, err)
	}

	// Now send 100 messages, randomly picking foo or bar, but never baz.
	payloads := map[string]string{"foo": "HELLO", "bar": "WORLD"}
	for i := 0; i < 100; i++ {
		subj := "bar"
		if rand.Intn(2) > 0 {
			subj = "foo"
		}
		sendStreamMsg(t, nc, subj, payloads[subj])
	}

	// Messages are expected to go to 0.
	checkFor(t, time.Second, 100*time.Millisecond, func() error {
		si, err := js.StreamInfo("TEST")
		require_NoError(t, err)
		if si.State.Msgs != 0 {
			return fmt.Errorf("stream still has msgs")
		}
		return nil
	})
}