Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Clustered consumer improvements #4107

Merged
merged 2 commits into from Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions server/consumer.go
Expand Up @@ -2314,6 +2314,16 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
NumPending: o.checkNumPending(),
PushBound: o.isPushMode() && o.active,
}

// If we are replicated and we are not the leader we need to pull certain data from our store.
if rg != nil && rg.node != nil && !o.isLeader() && o.store != nil {
state, _ := o.store.BorrowState()
info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
info.NumAckPending = len(state.Pending)
info.NumRedelivered = len(state.Redelivered)
}

// Adjust active based on non-zero etc. Also make UTC here.
if !o.ldt.IsZero() {
ldt := o.ldt.UTC() // This copies as well.
Expand Down
30 changes: 22 additions & 8 deletions server/filestore.go
Expand Up @@ -6536,6 +6536,10 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
o.qch = make(chan struct{})
go o.flushLoop(o.fch, o.qch)

// Make sure to load in our state from disk if needed.
o.loadState()

// Assign to filestore.
fs.AddConsumer(o)

return o, nil
Expand Down Expand Up @@ -6841,18 +6845,16 @@ const seqsHdrSize = 6*binary.MaxVarintLen64 + hdrLen
func (o *consumerFileStore) EncodedState() ([]byte, error) {
o.mu.Lock()
defer o.mu.Unlock()

if o.closed {
return nil, ErrStoreClosed
}
return encodeConsumerState(&o.state), nil
return o.encodeState()
}

func (o *consumerFileStore) encodeState() ([]byte, error) {
if o.closed {
return nil, ErrStoreClosed
// Grab reference to state, but make sure we load in if needed, so do not reference o.state directly.
state, err := o.stateWithCopyLocked(false)
if err != nil {
return nil, err
}
return encodeConsumerState(&o.state), nil
return encodeConsumerState(state), nil
}

func (o *consumerFileStore) UpdateConfig(cfg *ConsumerConfig) error {
Expand Down Expand Up @@ -7084,7 +7086,11 @@ func (o *consumerFileStore) BorrowState() (*ConsumerState, error) {
func (o *consumerFileStore) stateWithCopy(doCopy bool) (*ConsumerState, error) {
o.mu.Lock()
defer o.mu.Unlock()
return o.stateWithCopyLocked(doCopy)
}

// Lock should be held.
func (o *consumerFileStore) stateWithCopyLocked(doCopy bool) (*ConsumerState, error) {
if o.closed {
return nil, ErrStoreClosed
}
Expand Down Expand Up @@ -7163,6 +7169,14 @@ func (o *consumerFileStore) stateWithCopy(doCopy bool) (*ConsumerState, error) {
return state, nil
}

// Lock should be held. Called at startup.
func (o *consumerFileStore) loadState() {
if _, err := os.Stat(o.ifn); err == nil {
// This will load our state in from disk.
o.stateWithCopyLocked(false)
}
}

// Decode consumer state.
func decodeConsumerState(buf []byte) (*ConsumerState, error) {
version, err := checkConsumerHeader(buf)
Expand Down
35 changes: 35 additions & 0 deletions server/filestore_test.go
Expand Up @@ -5373,3 +5373,38 @@ func TestFileStoreSubjectsTotals(t *testing.T) {
t.Fatalf("Expected %d subjects for %q, got %d", expected, "*.*", len(st))
}
}

func TestFileStoreConsumerStoreEncodeAfterRestart(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
require_NoError(t, err)

state := &ConsumerState{}
state.Delivered.Consumer = 22
state.Delivered.Stream = 22
state.AckFloor.Consumer = 11
state.AckFloor.Stream = 11
err = o.Update(state)
require_NoError(t, err)

fs.Stop()

fs, err = newFileStore(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

o, err = fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
require_NoError(t, err)

if o.(*consumerFileStore).state.Delivered != state.Delivered {
t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state)
}
if o.(*consumerFileStore).state.AckFloor != state.AckFloor {
t.Fatalf("Consumer state is wrong %+v vs %+v", o.(*consumerFileStore).state, state)
}
})
}
50 changes: 50 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -3713,3 +3713,53 @@ func TestJetStreamClusterChangeClusterAfterStreamCreate(t *testing.T) {
})
require_NoError(t, err)
}

// The consumer info() call does not take into account whether a consumer
// is a leader or not, so results would be very different when asking servers
// that housed consumer followers vs leaders.
func TestJetStreamClusterConsumerInfoForJszForFollowers(t *testing.T) {
c := createJetStreamClusterExplicit(t, "NATS", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Replicas: 3,
})
require_NoError(t, err)

for i := 0; i < 1000; i++ {
sendStreamMsg(t, nc, "foo", "HELLO")
}

sub, err := js.PullSubscribe("foo", "d")
require_NoError(t, err)

fetch, ack := 122, 22
msgs, err := sub.Fetch(fetch, nats.MaxWait(10*time.Second))
require_NoError(t, err)
require_True(t, len(msgs) == fetch)
for _, m := range msgs[:ack] {
m.AckSync()
}
// Let acks propagate.
time.Sleep(100 * time.Millisecond)

for _, s := range c.servers {
jsz, err := s.Jsz(&JSzOptions{Accounts: true, Consumer: true})
require_NoError(t, err)
require_True(t, len(jsz.AccountDetails) == 1)
require_True(t, len(jsz.AccountDetails[0].Streams) == 1)
require_True(t, len(jsz.AccountDetails[0].Streams[0].Consumer) == 1)
consumer := jsz.AccountDetails[0].Streams[0].Consumer[0]
if consumer.Delivered.Consumer != uint64(fetch) || consumer.Delivered.Stream != uint64(fetch) {
t.Fatalf("Incorrect delivered for %v: %+v", s, consumer.Delivered)
}
if consumer.AckFloor.Consumer != uint64(ack) || consumer.AckFloor.Stream != uint64(ack) {
t.Fatalf("Incorrect ackfloor for %v: %+v", s, consumer.AckFloor)
}
}
}