Skip to content

Commit

Permalink
Merge pull request #3876 from nats-io/purge-accounting
Browse files Browse the repository at this point in the history
[FIXED] Extended Purge accounting was not updating account usage.
  • Loading branch information
derekcollison committed Feb 17, 2023
2 parents b3b9e88 + 0cb01f9 commit b91143a
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 2 deletions.
10 changes: 9 additions & 1 deletion server/filestore.go
@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -4663,6 +4663,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint

eq, wc := compareFn(subject), subjectHasWildcard(subject)
var firstSeqNeedsUpdate bool
var bytes uint64

// If we have a "keep" designation need to get full filtered state so we know how many to purge.
var maxp uint64
Expand Down Expand Up @@ -4712,6 +4713,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
mb.msgs--
mb.bytes -= rl
purged++
bytes += rl
}
// FSS updates.
mb.removeSeqPerSubject(sm.subj, seq, &smv)
Expand Down Expand Up @@ -4759,7 +4761,13 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
fs.selectNextFirst()
}

cb := fs.scb
fs.mu.Unlock()

if cb != nil {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}

return purged, nil
}

Expand Down
63 changes: 62 additions & 1 deletion server/jetstream_test.go
@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -19425,3 +19425,64 @@ func TestJetStreamConsumerFilterUpdate(t *testing.T) {
// and expect that numFilter reports correctly.
checkNumFilter(0)
}

func TestJetStreamPurgeExAndAccounting(t *testing.T) {
cases := []struct {
name string
cfg *nats.StreamConfig
}{
{name: "MemoryStore",
cfg: &nats.StreamConfig{
Name: "TEST",
Storage: nats.MemoryStorage,
Subjects: []string{"*"},
}},
{name: "FileStore",
cfg: &nats.StreamConfig{
Name: "TEST",
Storage: nats.FileStorage,
Subjects: []string{"*"},
}},
}
for _, c := range cases {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(c.cfg)
require_NoError(t, err)

msg := []byte("accounting")
for i := 0; i < 100; i++ {
_, err = js.Publish("foo", msg)
require_NoError(t, err)
_, err = js.Publish("bar", msg)
require_NoError(t, err)
}

info, err := js.AccountInfo()
require_NoError(t, err)

err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Subject: "foo"})
require_NoError(t, err)

ninfo, err := js.AccountInfo()
require_NoError(t, err)

// Make sure we did the proper accounting.
if c.cfg.Storage == nats.MemoryStorage {
if ninfo.Memory != info.Memory/2 {
t.Fatalf("Accounting information incorrect for Memory: %d vs %d",
ninfo.Memory, info.Memory/2)
}
} else {
if ninfo.Store != info.Store/2 {
t.Fatalf("Accounting information incorrect for FileStore: %d vs %d",
ninfo.Store, info.Store/2)
}
}
}
}

0 comments on commit b91143a

Please sign in to comment.