[IMPROVED] Protect against usage drift (#4131)
If we detect a drift for any unforeseen reason, correct it.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed May 4, 2023
2 parents 793db74 + 2123902 commit 413486f
Showing 5 changed files with 232 additions and 24 deletions.
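
The guards added across these files share one motivation: the store-level counters (fs.state.Msgs, fs.state.Bytes, mb.msgs, mb.bytes) are unsigned, so subtracting a stale or double-counted size does not go negative, it wraps around and the reported usage explodes. A minimal standalone illustration of the failure mode and of the clamp pattern the commit applies (not code from the patch):

package main

import "fmt"

func main() {
	var bytes uint64 = 100 // counter as tracked by the store
	sz := uint64(150)      // stale or double-counted message size
	// Unchecked subtraction wraps around: prints 18446744073709551566.
	fmt.Println(bytes - sz)
	// The guard used throughout this commit: clamp the delta first.
	if sz > bytes {
		sz = bytes
	}
	bytes -= sz
	fmt.Println(bytes) // 0
}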
67 changes: 59 additions & 8 deletions server/filestore.go
@@ -1364,6 +1364,9 @@ func (fs *fileStore) expireMsgsOnRecover() {
if mb.msgs > 0 {
mb.first.seq, needNextFirst = seq, true
sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
if sz > mb.bytes {
sz = mb.bytes
}
mb.bytes -= sz
bytes += sz
mb.msgs--
@@ -1411,8 +1414,16 @@ func (fs *fileStore) expireMsgsOnRecover() {
}
}
// Update top level accounting.
fs.state.Msgs -= purged
fs.state.Bytes -= bytes
if purged < fs.state.Msgs {
fs.state.Msgs -= purged
} else {
fs.state.Msgs = 0
}
if bytes < fs.state.Bytes {
fs.state.Bytes -= bytes
} else {
fs.state.Bytes = 0
}
// Make sure we properly set the fs first sequence and timestamp.
fs.selectNextFirst()
}
@@ -2757,12 +2768,24 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
mb.lrts = time.Now().UnixNano()

// Global stats
fs.state.Msgs--
fs.state.Bytes -= msz
if fs.state.Msgs > 0 {
fs.state.Msgs--
}
if msz < fs.state.Bytes {
fs.state.Bytes -= msz
} else {
fs.state.Bytes = 0
}

// Now local mb updates.
mb.msgs--
mb.bytes -= msz
if mb.msgs > 0 {
mb.msgs--
}
if msz < mb.bytes {
mb.bytes -= msz
} else {
mb.bytes = 0
}

// If we are tracking subjects here make sure we update that accounting.
mb.ensurePerSubjectInfoLoaded()
@@ -3240,6 +3263,9 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
if mb.msgs > 0 {
rl := fileStoreMsgSize(m.subj, m.hdr, m.msg)
mb.msgs--
if rl > mb.bytes {
rl = mb.bytes
}
mb.bytes -= rl
mb.rbytes -= rl
// For return accounting.
@@ -5137,10 +5163,19 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
// Do fast in place remove.
// Stats
if mb.msgs > 0 {
// Msgs
fs.state.Msgs--
fs.state.Bytes -= rl
mb.msgs--
// Bytes, make sure we do not go negative.
if rl > fs.state.Bytes {
rl = fs.state.Bytes
}
if rl > mb.bytes {
rl = mb.bytes
}
fs.state.Bytes -= rl
mb.bytes -= rl
// Totals
purged++
bytes += rl
}
@@ -5347,9 +5382,12 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
} else if sm != nil {
sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
if smb.msgs > 0 {
smb.msgs--
if sz > smb.bytes {
sz = smb.bytes
}
smb.bytes -= sz
bytes += sz
smb.msgs--
purged++
}
// Update fss
@@ -5433,7 +5471,14 @@ SKIP:
}

// Update top level accounting.
if purged > fs.state.Msgs {
purged = fs.state.Msgs
}
fs.state.Msgs -= purged

if bytes > fs.state.Bytes {
bytes = fs.state.Bytes
}
fs.state.Bytes -= bytes

cb := fs.scb
@@ -5555,7 +5600,13 @@ func (fs *fileStore) Truncate(seq uint64) error {
fs.state.LastSeq = lsm.seq
fs.state.LastTime = time.Unix(0, lsm.ts).UTC()
// Update msgs and bytes.
if purged > fs.state.Msgs {
purged = fs.state.Msgs
}
fs.state.Msgs -= purged
if bytes > fs.state.Bytes {
bytes = fs.state.Bytes
}
fs.state.Bytes -= bytes

// Reset our subject lookup info.
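
The filestore changes repeat the same inline guard at every subtraction site. A hypothetical helper, not part of this commit, that captures the pattern in one place:

// subtractClamped decrements *counter by delta, clamping at zero instead of
// letting uint64 arithmetic wrap around.
func subtractClamped(counter *uint64, delta uint64) {
	if delta > *counter {
		*counter = 0
		return
	}
	*counter -= delta
}

The commit keeps the checks inline instead, which also lets it clamp the delta itself (sz, rl) so the same adjusted value is reused for both the per-block and the top-level counters.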
98 changes: 89 additions & 9 deletions server/jetstream.go
@@ -1791,6 +1791,71 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account
jsa.usageMu.Unlock()
}

// When we detect a skew of some sort, this will verify that the usage reporting is correct.
// No locks should be held.
func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType) {
// Hold the account read lock and the usage lock while we calculate.
// We scope by tier and storage type, but if R3 File has 200 streams etc. this could
// show a pause. I did test with > 100 non-active streams and it was 80-200ns or so.
// This should rarely be called as well.
jsa.mu.RLock()
defer jsa.mu.RUnlock()
js := jsa.js
if js == nil {
return
}
s := js.srv

// Now range and qualify, hold usage lock to prevent updates.
jsa.usageMu.Lock()
defer jsa.usageMu.Unlock()

usage, ok := jsa.usage[tierName]
if !ok {
return
}

var total int64
var state StreamState
for _, mset := range jsa.streams {
mset.mu.RLock()
if mset.tier == tierName && mset.stype == storeType {
mset.store.FastState(&state)
total += int64(state.Bytes)
}
mset.mu.RUnlock()
}

var needClusterUpdate bool
// If we do not match on our calculations, compute the delta and adjust.
if storeType == MemoryStorage {
if total != usage.local.mem {
s.Warnf("MemStore usage drift of %v vs %v detected for account %q",
friendlyBytes(total), friendlyBytes(usage.local.mem), jsa.account.GetName())
delta := total - usage.local.mem
usage.local.mem += delta
usage.total.mem += delta
atomic.AddInt64(&js.memUsed, delta)
needClusterUpdate = true
}
} else {
if total != usage.local.store {
s.Warnf("FileStore usage drift of %v vs %v detected for account %q",
friendlyBytes(total), friendlyBytes(usage.local.store), jsa.account.GetName())
delta := total - usage.local.store
usage.local.store += delta
usage.total.store += delta
atomic.AddInt64(&js.storeUsed, delta)
needClusterUpdate = true
}
}

// Publish our local updates if in clustered mode.
if needClusterUpdate && js.isClusteredNoLock() {
jsa.sendClusterUsageUpdate()
}
}

// Updates accounting on in-use memory and storage. This is called locally
// by the lower storage layers.
func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta int64) {
@@ -1801,9 +1866,8 @@ func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta
// use of an atomic to do the check without having data race reports.
isClustered := js.isClusteredNoLock()

var needsCheck bool
jsa.usageMu.Lock()
defer jsa.usageMu.Unlock()

s, ok := jsa.usage[tierName]
if !ok {
s = &jsaStorage{}
@@ -1813,15 +1877,22 @@ func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta
s.local.mem += delta
s.total.mem += delta
atomic.AddInt64(&js.memUsed, delta)
needsCheck = s.local.mem < 0
} else {
s.local.store += delta
s.total.store += delta
atomic.AddInt64(&js.storeUsed, delta)
needsCheck = s.local.store < 0
}
// Publish our local updates if in clustered mode.
if isClustered {
jsa.sendClusterUsageUpdate()
}
jsa.usageMu.Unlock()

if needsCheck {
jsa.checkAndSyncUsage(tierName, storeType)
}
}

var usageTick = 1500 * time.Millisecond
@@ -2108,15 +2179,24 @@ func (js *jetStream) usageStats() *JetStreamStats {
var stats JetStreamStats
js.mu.RLock()
stats.Accounts = len(js.accounts)
stats.ReservedMemory = (uint64)(js.memReserved)
stats.ReservedStore = (uint64)(js.storeReserved)
stats.ReservedMemory = uint64(js.memReserved)
stats.ReservedStore = uint64(js.storeReserved)
s := js.srv
js.mu.RUnlock()
stats.API.Total = (uint64)(atomic.LoadInt64(&js.apiTotal))
stats.API.Errors = (uint64)(atomic.LoadInt64(&js.apiErrors))
stats.API.Inflight = (uint64)(atomic.LoadInt64(&js.apiInflight))
stats.Memory = (uint64)(atomic.LoadInt64(&js.memUsed))
stats.Store = (uint64)(atomic.LoadInt64(&js.storeUsed))
stats.API.Total = uint64(atomic.LoadInt64(&js.apiTotal))
stats.API.Errors = uint64(atomic.LoadInt64(&js.apiErrors))
stats.API.Inflight = uint64(atomic.LoadInt64(&js.apiInflight))
// Make sure we do not report negative.
used := atomic.LoadInt64(&js.memUsed)
if used < 0 {
used = 0
}
stats.Memory = uint64(used)
used = atomic.LoadInt64(&js.storeUsed)
if used < 0 {
used = 0
}
stats.Store = uint64(used)
stats.HAAssets = s.numRaftNodes()
return &stats
}
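
The two pieces above work together: updateUsage now flags needsCheck whenever a tier's local counter dips below zero, releases the usage lock, and then calls checkAndSyncUsage, which re-derives the tier's usage from each stream's FastState and applies the difference as a correction (plus a cluster usage update when clustered). A condensed sketch of that flow using hypothetical stand-in types, not the server's actual structures:

package main

import "fmt"

// usage and account are simplified stand-ins for jsaStorage and jsAccount.
type usage struct{ local, total int64 }

type account struct {
	usage       usage
	streamBytes []int64 // what each stream's store would report via FastState
}

func (a *account) updateUsage(delta int64) {
	a.usage.local += delta
	a.usage.total += delta
	// Recorded usage can never legitimately be negative, so treat it as drift.
	if a.usage.local < 0 {
		a.checkAndSyncUsage()
	}
}

func (a *account) checkAndSyncUsage() {
	var actual int64
	for _, b := range a.streamBytes {
		actual += b // recompute from the authoritative store state
	}
	d := actual - a.usage.local
	a.usage.local += d // now matches the stores exactly
	a.usage.total += d
}

func main() {
	a := &account{streamBytes: []int64{0}}
	a.updateUsage(-100)        // inject drift, much like the new cluster test does
	fmt.Println(a.usage.local) // 0 after the sync
}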
60 changes: 60 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -3936,3 +3936,63 @@ func TestJetStreamClusterStreamAccountingOnStoreError(t *testing.T) {
t.Fatalf("Expected store reserved to be 0 after stream delete, got %v", friendlyBytes(reserved))
}
}

func TestJetStreamClusterStreamAccountingDriftFixups(t *testing.T) {
c := createJetStreamClusterWithTemplate(t, jsClusterMaxBytesAccountLimitTempl, "NATS", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
MaxBytes: 2 * 1024 * 1024,
Replicas: 3,
})
require_NoError(t, err)

msg := strings.Repeat("Z", 32*1024)
for i := 0; i < 100; i++ {
sendStreamMsg(t, nc, "foo", msg)
}

err = js.PurgeStream("TEST")
require_NoError(t, err)

checkFor(t, time.Second, 200*time.Millisecond, func() error {
info, err := js.AccountInfo()
require_NoError(t, err)
if info.Store != 0 {
return fmt.Errorf("Store usage not 0: %d", info.Store)
}
return nil
})

s := c.leader()
jsz, err := s.Jsz(nil)
require_NoError(t, err)
require_True(t, jsz.JetStreamStats.Store == 0)

acc, err := s.LookupAccount("$U")
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
mset.mu.RLock()
jsa, tier, stype := mset.jsa, mset.tier, mset.stype
mset.mu.RUnlock()
// Drift the usage.
jsa.updateUsage(tier, stype, -100)

checkFor(t, time.Second, 200*time.Millisecond, func() error {
info, err := js.AccountInfo()
require_NoError(t, err)
if info.Store != 0 {
return fmt.Errorf("Store usage not 0: %d", info.Store)
}
return nil
})
jsz, err = s.Jsz(nil)
require_NoError(t, err)
require_True(t, jsz.JetStreamStats.Store == 0)
}
21 changes: 19 additions & 2 deletions server/memstore.go
@@ -715,7 +715,13 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
ms.removeSeqPerSubject(sm.subj, seq)
}
}
if purged > ms.state.Msgs {
purged = ms.state.Msgs
}
ms.state.Msgs -= purged
if bytes > ms.state.Bytes {
bytes = ms.state.Bytes
}
ms.state.Bytes -= bytes
} else {
// We are compacting past the end of our range. Do purge and set sequences correctly
@@ -800,7 +806,13 @@ func (ms *memStore) Truncate(seq uint64) error {
ms.state.LastSeq = lsm.seq
ms.state.LastTime = time.Unix(0, lsm.ts).UTC()
// Update msgs and bytes.
if purged > ms.state.Msgs {
purged = ms.state.Msgs
}
ms.state.Msgs -= purged
if bytes > ms.state.Bytes {
bytes = ms.state.Bytes
}
ms.state.Bytes -= bytes

cb := ms.scb
@@ -1033,8 +1045,13 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg)

delete(ms.msgs, seq)
ms.state.Msgs--
ms.state.Bytes -= ss
if ms.state.Msgs > 0 {
ms.state.Msgs--
if ss > ms.state.Bytes {
ss = ms.state.Bytes
}
ms.state.Bytes -= ss
}
ms.updateFirstSeq(seq)

if secure {
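
memstore.go receives the same inline guards as filestore.go. A hypothetical table-driven test, not part of this commit, showing one way the clamped-subtraction invariant could be pinned down:

package sketch

import "testing"

func TestClampedSubtract(t *testing.T) {
	cases := []struct{ have, sub, want uint64 }{
		{have: 100, sub: 40, want: 60},
		{have: 100, sub: 100, want: 0},
		{have: 40, sub: 100, want: 0}, // would wrap to ~1.8e19 without the guard
	}
	for _, c := range cases {
		got, d := c.have, c.sub
		if d > got {
			d = got
		}
		got -= d
		if got != c.want {
			t.Fatalf("have=%d sub=%d: got %d, want %d", c.have, c.sub, got, c.want)
		}
	}
}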
10 changes: 5 additions & 5 deletions server/stream.go
@@ -1615,14 +1615,14 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)

if targetTier := tierName(cfg); mset.tier != targetTier {
// In cases such as R1->R3, only one update is needed
mset.jsa.usageMu.RLock()
_, ok := mset.jsa.limits[targetTier]
mset.jsa.usageMu.RUnlock()
jsa.usageMu.RLock()
_, ok := jsa.limits[targetTier]
jsa.usageMu.RUnlock()
if ok {
// error never set
_, reported, _ := mset.store.Utilization()
mset.jsa.updateUsage(mset.tier, mset.stype, -int64(reported))
mset.jsa.updateUsage(targetTier, mset.stype, int64(reported))
jsa.updateUsage(mset.tier, mset.stype, -int64(reported))
jsa.updateUsage(targetTier, mset.stype, int64(reported))
mset.tier = targetTier
}
// else in case the new tier does not exist (say on move), keep the old tier around
