Skip to content

Commit

Permalink
Merge pull request #3970 from nats-io/ackfloor-fseq
Browse files Browse the repository at this point in the history
Remove msgs from interest based stream on consumer snapshot.
  • Loading branch information
derekcollison committed Mar 16, 2023
2 parents d1048ac + 5bb6f16 commit c4075dd
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 1 deletion.
2 changes: 1 addition & 1 deletion server/consumer.go
@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
15 changes: 15 additions & 0 deletions server/jetstream_cluster.go
Expand Up @@ -4315,7 +4315,22 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
if s != nil && mset != nil {
s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err)
}
} else if state, err := o.store.State(); err == nil {
// See if we need to process this update if our parent stream is not a limits policy stream.
o.mu.RLock()
mset := o.mset
shouldProcessAcks := mset != nil && o.retention != LimitsPolicy
o.mu.RUnlock()
// We should make sure to update the acks.
if shouldProcessAcks {
var ss StreamState
mset.store.FastState(&ss)
for seq := ss.FirstSeq; seq <= state.AckFloor.Stream; seq++ {
mset.ackMsg(o, seq)
}
}
}

} else if e.Type == EntryRemovePeer {
js.mu.RLock()
var ourID string
Expand Down
91 changes: 91 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -3016,3 +3016,94 @@ func TestJetStreamClusterWorkQueueAfterScaleUp(t *testing.T) {
require_NoError(t, err)
require_True(t, si.State.Msgs == 0)
}

// TestJetStreamClusterInterestBasedStreamAndConsumerSnapshots verifies that when a
// consumer on an interest-based stream installs a raft snapshot, a server that was
// down while all messages were acked removes those messages from the stream when it
// catches up via the snapshot (rather than only via replayed ack entries).
func TestJetStreamClusterInterestBasedStreamAndConsumerSnapshots(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Replicas:  3,
		Subjects:  []string{"foo"},
		Retention: nats.InterestPolicy,
	})
	require_NoError(t, err)

	sub, err := js.SubscribeSync("foo", nats.Durable("d22"))
	require_NoError(t, err)

	num := 200
	for i := 0; i < num; i++ {
		js.PublishAsync("foo", []byte("ok"))
	}
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive completion signal")
	}

	checkSubsPending(t, sub, num)

	// Shutdown one server so it misses the acks below and must catch up later.
	s := c.randomServer()
	s.Shutdown()

	c.waitOnStreamLeader(globalAccountName, "TEST")

	nc, js = jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Now ack all messages while the other server is down.
	for i := 0; i < num; i++ {
		m, err := sub.NextMsg(time.Second)
		require_NoError(t, err)
		// Use AckSync and check its result so we know each ack actually
		// reached the server before we proceed.
		require_NoError(t, m.AckSync())
	}

	// Wait for all message acks to be processed and all messages to be removed.
	checkFor(t, time.Second, 200*time.Millisecond, func() error {
		si, err := js.StreamInfo("TEST")
		if err != nil {
			// Return the error so checkFor retries on transient failures
			// (e.g. during leader transitions) instead of failing outright.
			return err
		}
		if si.State.Msgs == 0 {
			return nil
		}
		return fmt.Errorf("Still have %d msgs left", si.State.Msgs)
	})

	// Force a snapshot on the consumer leader before restarting the downed server.
	cl := c.consumerLeader(globalAccountName, "TEST", "d22")
	require_NotNil(t, cl)

	mset, err := cl.GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)

	o := mset.lookupConsumer("d22")
	require_NotNil(t, o)

	snap, err := o.store.EncodedState()
	require_NoError(t, err)

	n := o.raftNode()
	require_NotNil(t, n)
	require_NoError(t, n.InstallSnapshot(snap))

	// Now restart the downed server.
	s = c.restartServer(s)

	// Make the restarted server the eventual leader so its view of the stream,
	// rebuilt from the consumer snapshot, is what we observe below.
	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
		c.waitOnStreamLeader(globalAccountName, "TEST")
		if sl := c.streamLeader(globalAccountName, "TEST"); sl != s {
			sl.JetStreamStepdownStream(globalAccountName, "TEST")
			return fmt.Errorf("Server %q is not leader yet", s.Name())
		}
		return nil
	})

	// The restarted leader must have removed all acked messages as well.
	si, err := js.StreamInfo("TEST")
	require_NoError(t, err)
	require_True(t, si.State.Msgs == 0)
}

0 comments on commit c4075dd

Please sign in to comment.