[IMPROVED] Protect against usage drift #4131

Merged
merged 1 commit into from May 4, 2023
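The diff below applies one recurring guard: every decrement of the unsigned message and byte counters (fs.state, mb, ms.state) is clamped so it can never wrap below zero, and a new resync path corrects accumulated drift when local usage goes negative. A minimal sketch of that clamping pattern, using a hypothetical helper name (the server inlines the checks rather than calling a helper):

```go
package main

import "fmt"

// subClamped decrements an unsigned counter without wrapping below zero,
// mirroring the inline checks added throughout the diff.
func subClamped(counter, delta uint64) uint64 {
	if delta > counter {
		return 0
	}
	return counter - delta
}

func main() {
	fmt.Println(subClamped(100, 40)) // 60
	fmt.Println(subClamped(10, 40))  // 0 instead of wrapping to ~1.8e19
}
```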
67 changes: 59 additions & 8 deletions server/filestore.go
@@ -1364,6 +1364,9 @@ func (fs *fileStore) expireMsgsOnRecover() {
if mb.msgs > 0 {
mb.first.seq, needNextFirst = seq, true
sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
if sz > mb.bytes {
sz = mb.bytes
}
mb.bytes -= sz
bytes += sz
mb.msgs--
@@ -1411,8 +1414,16 @@ func (fs *fileStore) expireMsgsOnRecover() {
}
}
// Update top level accounting.
fs.state.Msgs -= purged
fs.state.Bytes -= bytes
if purged < fs.state.Msgs {
fs.state.Msgs -= purged
} else {
fs.state.Msgs = 0
}
if bytes < fs.state.Bytes {
fs.state.Bytes -= bytes
} else {
fs.state.Bytes = 0
}
// Make sure we properly set the fs first sequence and timestamp.
fs.selectNextFirst()
}
@@ -2757,12 +2768,24 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
mb.lrts = time.Now().UnixNano()

// Global stats
fs.state.Msgs--
fs.state.Bytes -= msz
if fs.state.Msgs > 0 {
fs.state.Msgs--
}
if msz < fs.state.Bytes {
fs.state.Bytes -= msz
} else {
fs.state.Bytes = 0
}

// Now local mb updates.
mb.msgs--
mb.bytes -= msz
if mb.msgs > 0 {
mb.msgs--
}
if msz < mb.bytes {
mb.bytes -= msz
} else {
mb.bytes = 0
}

// If we are tracking subjects here make sure we update that accounting.
mb.ensurePerSubjectInfoLoaded()
@@ -3240,6 +3263,9 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
if mb.msgs > 0 {
rl := fileStoreMsgSize(m.subj, m.hdr, m.msg)
mb.msgs--
if rl > mb.bytes {
rl = mb.bytes
}
mb.bytes -= rl
mb.rbytes -= rl
// For return accounting.
@@ -5137,10 +5163,19 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
// Do fast in place remove.
// Stats
if mb.msgs > 0 {
// Msgs
fs.state.Msgs--
fs.state.Bytes -= rl
mb.msgs--
// Bytes, make sure to not go negative.
if rl > fs.state.Bytes {
rl = fs.state.Bytes
}
if rl > mb.bytes {
rl = mb.bytes
}
fs.state.Bytes -= rl
mb.bytes -= rl
// Totals
purged++
bytes += rl
}
@@ -5347,9 +5382,12 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
} else if sm != nil {
sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
if smb.msgs > 0 {
smb.msgs--
if sz > smb.bytes {
sz = smb.bytes
}
smb.bytes -= sz
bytes += sz
smb.msgs--
purged++
}
// Update fss
@@ -5433,7 +5471,14 @@ SKIP:
}

// Update top level accounting.
if purged > fs.state.Msgs {
purged = fs.state.Msgs
}
fs.state.Msgs -= purged

if bytes > fs.state.Bytes {
bytes = fs.state.Bytes
}
fs.state.Bytes -= bytes

cb := fs.scb
@@ -5555,7 +5600,13 @@ func (fs *fileStore) Truncate(seq uint64) error {
fs.state.LastSeq = lsm.seq
fs.state.LastTime = time.Unix(0, lsm.ts).UTC()
// Update msgs and bytes.
if purged > fs.state.Msgs {
purged = fs.state.Msgs
}
fs.state.Msgs -= purged
if bytes > fs.state.Bytes {
bytes = fs.state.Bytes
}
fs.state.Bytes -= bytes

// Reset our subject lookup info.
98 changes: 89 additions & 9 deletions server/jetstream.go
@@ -1791,6 +1791,71 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account
jsa.usageMu.Unlock()
}

// When we detect a skew of some sort this will verify the usage reporting is correct.
// No locks should be held.
func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType) {
// Hold the account read lock and the usage lock while we calculate.
// We scope by tier and storage type, but if R3 File has 200 streams etc. this could
// show a pause. I did test with > 100 non-active streams and it was 80-200ns or so.
// It should also be rare that this gets called.
jsa.mu.RLock()
defer jsa.mu.RUnlock()
js := jsa.js
if js == nil {
return
}
s := js.srv

// Now range and qualify, hold usage lock to prevent updates.
jsa.usageMu.Lock()
defer jsa.usageMu.Unlock()

usage, ok := jsa.usage[tierName]
if !ok {
return
}

var total int64
var state StreamState
for _, mset := range jsa.streams {
mset.mu.RLock()
if mset.tier == tierName && mset.stype == storeType {
mset.store.FastState(&state)
total += int64(state.Bytes)
}
mset.mu.RUnlock()
}

var needClusterUpdate bool
// If we do not match on our calculations compute delta and adjust.
if storeType == MemoryStorage {
if total != usage.local.mem {
s.Warnf("MemStore usage drift of %v vs %v detected for account %q",
friendlyBytes(total), friendlyBytes(usage.local.mem), jsa.account.GetName())
delta := total - usage.local.mem
usage.local.mem += delta
usage.total.mem += delta
atomic.AddInt64(&js.memUsed, delta)
needClusterUpdate = true
}
} else {
if total != usage.local.store {
s.Warnf("FileStore usage drift of %v vs %v detected for account %q",
friendlyBytes(total), friendlyBytes(usage.local.store), jsa.account.GetName())
delta := total - usage.local.store
usage.local.store += delta
usage.total.store += delta
atomic.AddInt64(&js.storeUsed, delta)
needClusterUpdate = true
}
}

// Publish our local updates if in clustered mode.
if needClusterUpdate && js.isClusteredNoLock() {
jsa.sendClusterUsageUpdate()
}
}

// Updates accounting on in-use memory and storage. This is called locally
// by the lower storage layers.
func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta int64) {
@@ -1801,9 +1866,8 @@ func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta
// use of an atomic to do the check without having data race reports.
isClustered := js.isClusteredNoLock()

var needsCheck bool
jsa.usageMu.Lock()
defer jsa.usageMu.Unlock()

s, ok := jsa.usage[tierName]
if !ok {
s = &jsaStorage{}
Expand All @@ -1813,15 +1877,22 @@ func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta
s.local.mem += delta
s.total.mem += delta
atomic.AddInt64(&js.memUsed, delta)
needsCheck = s.local.mem < 0
} else {
s.local.store += delta
s.total.store += delta
atomic.AddInt64(&js.storeUsed, delta)
needsCheck = s.local.store < 0
}
// Publish our local updates if in clustered mode.
if isClustered {
jsa.sendClusterUsageUpdate()
}
jsa.usageMu.Unlock()

if needsCheck {
jsa.checkAndSyncUsage(tierName, storeType)
}
}

var usageTick = 1500 * time.Millisecond
@@ -2108,15 +2179,24 @@ func (js *jetStream) usageStats() *JetStreamStats {
var stats JetStreamStats
js.mu.RLock()
stats.Accounts = len(js.accounts)
stats.ReservedMemory = (uint64)(js.memReserved)
stats.ReservedStore = (uint64)(js.storeReserved)
stats.ReservedMemory = uint64(js.memReserved)
stats.ReservedStore = uint64(js.storeReserved)
s := js.srv
js.mu.RUnlock()
stats.API.Total = (uint64)(atomic.LoadInt64(&js.apiTotal))
stats.API.Errors = (uint64)(atomic.LoadInt64(&js.apiErrors))
stats.API.Inflight = (uint64)(atomic.LoadInt64(&js.apiInflight))
stats.Memory = (uint64)(atomic.LoadInt64(&js.memUsed))
stats.Store = (uint64)(atomic.LoadInt64(&js.storeUsed))
stats.API.Total = uint64(atomic.LoadInt64(&js.apiTotal))
stats.API.Errors = uint64(atomic.LoadInt64(&js.apiErrors))
stats.API.Inflight = uint64(atomic.LoadInt64(&js.apiInflight))
// Make sure we do not report negative.
used := atomic.LoadInt64(&js.memUsed)
if used < 0 {
used = 0
}
stats.Memory = uint64(used)
used = atomic.LoadInt64(&js.storeUsed)
if used < 0 {
used = 0
}
stats.Store = uint64(used)
stats.HAAssets = s.numRaftNodes()
return &stats
}
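The new checkAndSyncUsage above recomputes a tier's real usage from each stream's store state and applies the delta when the tracked value has drifted; updateUsage now triggers it whenever local usage goes negative. A simplified sketch of that recompute-and-adjust flow, using hypothetical stand-in types in place of jsAccount, its locks, and the cluster update:

```go
package main

import "fmt"

// streamInfo is a stand-in for a stream: its tier name plus the bytes its
// store would report via FastState.
type streamInfo struct {
	tier  string
	bytes uint64
}

// tierUsage is a stand-in for a tier's tracked local usage.
type tierUsage struct {
	store int64
}

// checkAndSync recomputes actual usage for a tier and, if it disagrees
// with the tracked value, applies the delta so the two converge. The real
// code also scopes by storage type, holds the account and usage locks,
// adjusts the server totals, and publishes a cluster usage update.
func checkAndSync(streams []streamInfo, tier string, u *tierUsage) (delta int64) {
	var total int64
	for _, s := range streams {
		if s.tier == tier {
			total += int64(s.bytes)
		}
	}
	if total != u.store {
		delta = total - u.store
		u.store += delta // now equals total
	}
	return delta
}

func main() {
	streams := []streamInfo{{tier: "R3", bytes: 1024}, {tier: "R3", bytes: 2048}}
	u := &tierUsage{store: -100} // drifted negative
	fmt.Println(checkAndSync(streams, "R3", u), u.store) // 3172 3072
}
```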
60 changes: 60 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -3936,3 +3936,63 @@ func TestJetStreamClusterStreamAccountingOnStoreError(t *testing.T) {
t.Fatalf("Expected store reserved to be 0 after stream delete, got %v", friendlyBytes(reserved))
}
}

func TestJetStreamClusterStreamAccountingDriftFixups(t *testing.T) {
c := createJetStreamClusterWithTemplate(t, jsClusterMaxBytesAccountLimitTempl, "NATS", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
MaxBytes: 2 * 1024 * 1024,
Replicas: 3,
})
require_NoError(t, err)

msg := strings.Repeat("Z", 32*1024)
for i := 0; i < 100; i++ {
sendStreamMsg(t, nc, "foo", msg)
}

err = js.PurgeStream("TEST")
require_NoError(t, err)

checkFor(t, time.Second, 200*time.Millisecond, func() error {
info, err := js.AccountInfo()
require_NoError(t, err)
if info.Store != 0 {
return fmt.Errorf("Store usage not 0: %d", info.Store)
}
return nil
})

s := c.leader()
jsz, err := s.Jsz(nil)
require_NoError(t, err)
require_True(t, jsz.JetStreamStats.Store == 0)

acc, err := s.LookupAccount("$U")
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
mset.mu.RLock()
jsa, tier, stype := mset.jsa, mset.tier, mset.stype
mset.mu.RUnlock()
// Drift the usage.
jsa.updateUsage(tier, stype, -100)

checkFor(t, time.Second, 200*time.Millisecond, func() error {
info, err := js.AccountInfo()
require_NoError(t, err)
if info.Store != 0 {
return fmt.Errorf("Store usage not 0: %d", info.Store)
}
return nil
})
jsz, err = s.Jsz(nil)
require_NoError(t, err)
require_True(t, jsz.JetStreamStats.Store == 0)
}
21 changes: 19 additions & 2 deletions server/memstore.go
@@ -715,7 +715,13 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
ms.removeSeqPerSubject(sm.subj, seq)
}
}
if purged > ms.state.Msgs {
purged = ms.state.Msgs
}
ms.state.Msgs -= purged
if bytes > ms.state.Bytes {
bytes = ms.state.Bytes
}
ms.state.Bytes -= bytes
} else {
// We are compacting past the end of our range. Do purge and set sequences correctly
@@ -800,7 +806,13 @@ func (ms *memStore) Truncate(seq uint64) error {
ms.state.LastSeq = lsm.seq
ms.state.LastTime = time.Unix(0, lsm.ts).UTC()
// Update msgs and bytes.
if purged > ms.state.Msgs {
purged = ms.state.Msgs
}
ms.state.Msgs -= purged
if bytes > ms.state.Bytes {
bytes = ms.state.Bytes
}
ms.state.Bytes -= bytes

cb := ms.scb
@@ -1033,8 +1045,13 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg)

delete(ms.msgs, seq)
ms.state.Msgs--
ms.state.Bytes -= ss
if ms.state.Msgs > 0 {
ms.state.Msgs--
if ss > ms.state.Bytes {
ss = ms.state.Bytes
}
ms.state.Bytes -= ss
}
ms.updateFirstSeq(seq)

if secure {
10 changes: 5 additions & 5 deletions server/stream.go
@@ -1615,14 +1615,14 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)

if targetTier := tierName(cfg); mset.tier != targetTier {
// In cases such as R1->R3, only one update is needed
mset.jsa.usageMu.RLock()
_, ok := mset.jsa.limits[targetTier]
mset.jsa.usageMu.RUnlock()
jsa.usageMu.RLock()
_, ok := jsa.limits[targetTier]
jsa.usageMu.RUnlock()
if ok {
// error never set
_, reported, _ := mset.store.Utilization()
mset.jsa.updateUsage(mset.tier, mset.stype, -int64(reported))
mset.jsa.updateUsage(targetTier, mset.stype, int64(reported))
jsa.updateUsage(mset.tier, mset.stype, -int64(reported))
jsa.updateUsage(targetTier, mset.stype, int64(reported))
mset.tier = targetTier
}
// else in case the new tier does not exist (say on move), keep the old tier around
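The stream.go change above switches to the captured jsa variable and, when the stream's tier changes (e.g. R1 to R3), moves the store's reported utilization from the old tier to the new one with a matched pair of updateUsage calls. A condensed sketch of that transfer, using a hypothetical map in place of the per-tier usage tracking:

```go
package main

import "fmt"

// moveTierUsage transfers a stream's reported bytes from its old tier to
// the target tier, mirroring the paired updateUsage calls in
// updateWithAdvisory. (Hypothetical map-based accounting for illustration.)
func moveTierUsage(usage map[string]int64, oldTier, newTier string, reported int64) {
	usage[oldTier] -= reported
	usage[newTier] += reported
}

func main() {
	usage := map[string]int64{"R1": 4096, "R3": 0}
	moveTierUsage(usage, "R1", "R3", 4096)
	fmt.Println(usage) // map[R1:0 R3:4096]
}
```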