[FIXED] Accounting drifts #4357

Merged
merged 3 commits into from Aug 2, 2023
11 changes: 9 additions & 2 deletions server/filestore.go
@@ -5421,13 +5421,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {

var purged, bytes uint64

// We have to delete interior messages.
fs.mu.Lock()
// Same as purge all.
if lseq := fs.state.LastSeq; seq > lseq {
fs.mu.Unlock()
return fs.purge(seq)
}

// We have to delete interior messages.
smb := fs.selectMsgBlock(seq)
if smb == nil {
fs.mu.Unlock()
@@ -6345,6 +6345,9 @@ func (fs *fileStore) Stop() error {
fs.cancelSyncTimer()
fs.cancelAgeChk()

// We should update the upper usage layer on a stop.
cb, bytes := fs.scb, int64(fs.state.Bytes)

var _cfs [256]ConsumerStore
cfs := append(_cfs[:0], fs.cfs...)
fs.cfs = nil
@@ -6354,6 +6357,10 @@ func (fs *fileStore) Stop() error {
o.Stop()
}

if bytes > 0 && cb != nil {
cb(0, -bytes, 0, _EMPTY_)
}

return nil
}

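The Stop() hunks above carry the core of the file store fix: the account usage callback and the store's remaining byte count are captured while fs.mu is held, and once the consumer stores have been stopped the callback fires with a negative byte delta so the upper usage layer stops charging the account for a store that is gone. A minimal sketch of that capture-then-report pattern, with hypothetical names standing in for fs.scb and _EMPTY_:

```go
package main

import (
	"fmt"
	"sync"
)

// usageCB mirrors the shape of the store usage callback seen in the diff:
// deltas for messages and bytes, plus a sequence and subject.
// (Hypothetical stand-in for the real callback type.)
type usageCB func(msgsDelta, bytesDelta int64, seq uint64, subj string)

type store struct {
	mu    sync.Mutex
	bytes uint64
	scb   usageCB
}

func (s *store) Stop() error {
	s.mu.Lock()
	// Capture the callback and remaining size while the lock is held.
	cb, bytes := s.scb, int64(s.bytes)
	s.mu.Unlock()

	// Fire outside the lock with a negative byte delta so the upper
	// accounting layer releases this store's usage instead of drifting.
	if bytes > 0 && cb != nil {
		cb(0, -bytes, 0, "")
	}
	return nil
}

func main() {
	s := &store{bytes: 32 * 1024}
	s.scb = func(_ int64, bytesDelta int64, _ uint64, _ string) {
		fmt.Println("usage delta on stop:", bytesDelta) // -32768
	}
	s.Stop()
}
```
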
4 changes: 2 additions & 2 deletions server/jetstream_api.go
@@ -1,4 +1,4 @@
// Copyright 2020-2022 The NATS Authors
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -2656,7 +2656,7 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac
}

// Request to have the meta leader stepdown.
// These will only be received the meta leaders, so less checking needed.
// These will only be received by the meta leader, so less checking needed.
func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
return
13 changes: 8 additions & 5 deletions server/jetstream_cluster.go
@@ -2808,12 +2808,15 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
}
panic(err.Error())
}
// Ignore if we are recovering and we have already processed.
if isRecovering && (sp.Request == nil || sp.Request.Sequence == 0) {
// If no explicit request, fill in with leader stamped last sequence to protect ourselves on replay during server start.
if sp.Request == nil || sp.Request.Sequence == 0 {
purgeSeq := sp.LastSeq + 1
if sp.Request == nil {
sp.Request = &JSApiStreamPurgeRequest{Sequence: sp.LastSeq}
} else {
sp.Request.Sequence = sp.LastSeq
sp.Request = &JSApiStreamPurgeRequest{Sequence: purgeSeq}
} else if sp.Request.Keep == 0 {
sp.Request.Sequence = purgeSeq
} else if isRecovering {
continue
}
}

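This hunk changes how an unqualified purge proposal is applied: if the proposal carries no explicit request (or a zero sequence), it is stamped with the leader's last sequence plus one, so replaying the same entry after a server restart purges up to the same point instead of wiping messages that arrived later, and a keep-based purge seen during recovery is skipped. A hedged sketch of that decision logic, using hypothetical stand-in types:

```go
package main

import "fmt"

// Hypothetical stand-ins for the purge proposal and request types.
type purgeRequest struct {
	Sequence uint64
	Keep     uint64
}

type purgeProposal struct {
	LastSeq uint64 // last sequence the leader stamped into the proposal
	Request *purgeRequest
}

// stampPurge normalizes an unqualified purge so it is safe to replay.
// It returns false when the entry should be skipped during recovery.
func stampPurge(sp *purgeProposal, isRecovering bool) bool {
	if sp.Request != nil && sp.Request.Sequence != 0 {
		return true // already explicit, replay-safe as-is
	}
	purgeSeq := sp.LastSeq + 1
	switch {
	case sp.Request == nil:
		sp.Request = &purgeRequest{Sequence: purgeSeq}
	case sp.Request.Keep == 0:
		sp.Request.Sequence = purgeSeq
	case isRecovering:
		// Keep-based purge during recovery: skip rather than re-run it.
		return false
	}
	return true
}

func main() {
	sp := &purgeProposal{LastSeq: 1000}
	fmt.Println(stampPurge(sp, true), sp.Request.Sequence) // true 1001
}
```
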
221 changes: 220 additions & 1 deletion server/jetstream_cluster_3_test.go
@@ -31,6 +31,7 @@ import (
"testing"
"time"

"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
)

@@ -399,7 +400,7 @@ func TestJetStreamClusterNegativeReplicas(t *testing.T) {
})
require_NoError(t, err)

// Check upadte now.
// Check update now.
_, err = js.UpdateStream(&nats.StreamConfig{
Name: name,
Replicas: -11,
@@ -4566,3 +4567,221 @@ func TestJetStreamClusterBadEncryptKey(t *testing.T) {
t.Fatalf("Expected server not to start")
}
}

func TestJetStreamAccountUsageDrifts(t *testing.T) {
tmpl := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
leaf {
listen: 127.0.0.1:-1
}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
`
opFrag := `
operator: %s
system_account: %s
resolver: { type: MEM }
resolver_preload = {
%s : %s
%s : %s
}
`

_, syspub := createKey(t)
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)

accKp, aExpPub := createKey(t)
accClaim := jwt.NewAccountClaims(aExpPub)
accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{
DiskStorage: -1, Consumer: 1, Streams: 1}
accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{
DiskStorage: -1, Consumer: 1, Streams: 1}
accJwt := encodeClaim(t, accClaim, aExpPub)
accCreds := newUser(t, accKp)

template := tmpl + fmt.Sprintf(opFrag, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt)
c := createJetStreamClusterWithTemplate(t, template, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer(), nats.UserCredentials(accCreds))
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST1",
Subjects: []string{"foo"},
MaxBytes: 1 * 1024 * 1024 * 1024,
MaxMsgs: 1000,
Replicas: 3,
})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST2",
Subjects: []string{"bar"},
})
require_NoError(t, err)

// These expected store values can come directly from stream info's state bytes.
// We will *= 3 for R3
checkAccount := func(r1u, r3u uint64) {
t.Helper()
r3u *= 3

// Remote usage updates can be delayed, so wait for a bit for values we want.
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
info, err := js.AccountInfo()
require_NoError(t, err)
require_True(t, len(info.Tiers) >= 2)
// These can move.
if u := info.Tiers["R1"].Store; u != r1u {
return fmt.Errorf("Expected R1 to be %v, got %v", friendlyBytes(r1u), friendlyBytes(u))
}
if u := info.Tiers["R3"].Store; u != r3u {
return fmt.Errorf("Expected R3 to be %v, got %v", friendlyBytes(r3u), friendlyBytes(u))
}
return nil
})
}

checkAccount(0, 0)

// Now add in some R3 data.
msg := bytes.Repeat([]byte("Z"), 32*1024) // 32k
smallMsg := bytes.Repeat([]byte("Z"), 4*1024) // 4k

for i := 0; i < 1000; i++ {
js.Publish("foo", msg)
}
sir3, err := js.StreamInfo("TEST1")
require_NoError(t, err)

checkAccount(0, sir3.State.Bytes)

// Now add in some R1 data.
for i := 0; i < 100; i++ {
js.Publish("bar", msg)
}

sir1, err := js.StreamInfo("TEST2")
require_NoError(t, err)

checkAccount(sir1.State.Bytes, sir3.State.Bytes)

// We will now test a bunch of scenarios to see that we are doing accounting correctly.

// Since our R3 has a limit of 1000 msgs, let's add in more msgs and drop older ones.
for i := 0; i < 100; i++ {
js.Publish("foo", smallMsg)
}
sir3, err = js.StreamInfo("TEST1")
require_NoError(t, err)

checkAccount(sir1.State.Bytes, sir3.State.Bytes)

// Move our R3 stream leader and make sure accounting is correct.
_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST1"), nil, time.Second)
require_NoError(t, err)

checkAccount(sir1.State.Bytes, sir3.State.Bytes)

// Now scale down.
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST1",
Subjects: []string{"foo"},
MaxBytes: 1 * 1024 * 1024 * 1024,
MaxMsgs: 1000,
Replicas: 1,
})
require_NoError(t, err)

checkAccount(sir1.State.Bytes+sir3.State.Bytes, 0)

// Add in more msgs which will replace the older and bigger ones.
for i := 0; i < 100; i++ {
js.Publish("foo", smallMsg)
}
sir3, err = js.StreamInfo("TEST1")
require_NoError(t, err)

// Now scale back up.
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST1",
Subjects: []string{"foo"},
MaxBytes: 1 * 1024 * 1024 * 1024,
MaxMsgs: 1000,
Replicas: 3,
})
require_NoError(t, err)

checkAccount(sir1.State.Bytes, sir3.State.Bytes)

// Test Purge.
err = js.PurgeStream("TEST1")
require_NoError(t, err)

checkAccount(sir1.State.Bytes, 0)

for i := 0; i < 1000; i++ {
js.Publish("foo", smallMsg)
}
sir3, err = js.StreamInfo("TEST1")
require_NoError(t, err)

checkAccount(sir1.State.Bytes, sir3.State.Bytes)

requestLeaderStepDown := func() {
ml := c.leader()
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
if cml := c.leader(); cml == ml {
nc.Request(JSApiLeaderStepDown, nil, time.Second)
return fmt.Errorf("Metaleader has not moved yet")
}
return nil
})
}

// Test meta leader stepdowns.
for i := 0; i < len(c.servers); i++ {
requestLeaderStepDown()
checkAccount(sir1.State.Bytes, sir3.State.Bytes)
}

// Now test cluster reset operations where we internally reset the NRG and optionally the stream too.
nl := c.randomNonStreamLeader(aExpPub, "TEST1")
acc, err := nl.LookupAccount(aExpPub)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST1")
require_NoError(t, err)
// NRG only
mset.resetClusteredState(nil)
checkAccount(sir1.State.Bytes, sir3.State.Bytes)
// Now NRG and Stream state itself.
mset.resetClusteredState(errFirstSequenceMismatch)
checkAccount(sir1.State.Bytes, sir3.State.Bytes)

// Now test server restart
for _, s := range c.servers {
s.Shutdown()
s.WaitForShutdown()
s = c.restartServer(s)

// Wait on healthz and leader etc.
checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
if hs := s.healthz(nil); hs.Error != _EMPTY_ {
return errors.New(hs.Error)
}
return nil
})
c.waitOnLeader()
c.waitOnStreamLeader(aExpPub, "TEST1")
c.waitOnStreamLeader(aExpPub, "TEST2")

// Now check account again.
checkAccount(sir1.State.Bytes, sir3.State.Bytes)
}
}
3 changes: 2 additions & 1 deletion server/memstore.go
@@ -1144,11 +1144,12 @@ func memStoreMsgSize(subj string, hdr, msg []byte) uint64 {

// Delete is same as Stop for memory store.
func (ms *memStore) Delete() error {
ms.Purge()
return ms.Stop()
}

func (ms *memStore) Stop() error {
// These can't come back, so stop is same as Delete.
ms.Purge()
ms.mu.Lock()
if ms.ageChk != nil {
ms.ageChk.Stop()
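
The memory store now purges on Stop as well as on Delete; since a stopped memstore cannot come back, its remaining bytes are reported through the storage-update callback instead of lingering in the account totals. The self-contained sketch below (all names hypothetical) illustrates the drift this PR is fixing: an account counter fed by store callbacks only returns to zero if the store reports its remaining bytes when it shuts down.

```go
package main

import "fmt"

// Hypothetical account-level usage counter fed by store callbacks.
type accountUsage struct{ bytes int64 }

func (a *accountUsage) update(_ int64, bytesDelta int64, _ uint64, _ string) {
	a.bytes += bytesDelta
}

type fakeStore struct {
	bytes int64
	cb    func(msgs, bytes int64, seq uint64, subj string)
}

// store adds a message and reports the positive delta to the account.
func (s *fakeStore) store(n int64) { s.bytes += n; s.cb(1, n, 0, "foo") }

// stop mirrors the fixed behavior: hand remaining bytes back on shutdown.
func (s *fakeStore) stop(reportRemaining bool) {
	if reportRemaining && s.bytes > 0 {
		s.cb(0, -s.bytes, 0, "")
	}
	s.bytes = 0
}

func main() {
	acct := &accountUsage{}
	s := &fakeStore{cb: acct.update}
	s.store(32 * 1024)

	// Old behavior: stopping without reporting leaves usage stuck at 32k.
	// Fixed behavior: stop reports the remaining bytes, usage returns to 0.
	s.stop(true)
	fmt.Println("account usage after stop:", acct.bytes) // 0
}
```
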
22 changes: 15 additions & 7 deletions server/opts.go
@@ -1186,17 +1186,22 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
*errors = append(*errors, &configErr{tk, err.Error()})
return
}
if dir == "" {
*errors = append(*errors, &configErr{tk, "dir has no value and needs to point to a directory"})
return
}
if info, _ := os.Stat(dir); info != nil && (!info.IsDir() || info.Mode().Perm()&(1<<(uint(7))) == 0) {
*errors = append(*errors, &configErr{tk, "dir needs to point to an accessible directory"})
return
}

checkDir := func() {
if dir == _EMPTY_ {
*errors = append(*errors, &configErr{tk, "dir has no value and needs to point to a directory"})
return
}
if info, _ := os.Stat(dir); info != nil && (!info.IsDir() || info.Mode().Perm()&(1<<(uint(7))) == 0) {
*errors = append(*errors, &configErr{tk, "dir needs to point to an accessible directory"})
return
}
}

var res AccountResolver
switch strings.ToUpper(dirType) {
case "CACHE":
checkDir()
if sync != 0 {
*errors = append(*errors, &configErr{tk, "CACHE does not accept sync"})
}
@@ -1208,6 +1213,7 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
}
res, err = NewCacheDirAccResolver(dir, limit, ttl, opts...)
case "FULL":
checkDir()
if ttl != 0 {
*errors = append(*errors, &configErr{tk, "FULL does not accept ttl"})
}
@@ -1223,6 +1229,8 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
}
}
res, err = NewDirAccResolver(dir, limit, sync, delete, opts...)
case "MEM", "MEMORY":
res = &MemAccResolver{}
}
if err != nil {
*errors = append(*errors, &configErr{tk, err.Error()})
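
Here the dir validation moves into a checkDir closure that only the CACHE and FULL resolver types invoke, and MEM/MEMORY becomes an accepted type in this parsing path, so a memory resolver no longer needs a directory. This is what the new test's opFrag relies on; a minimal fragment of that shape (the JWT and key values below are placeholders, not from this PR):

```
operator: <operator JWT>
system_account: <system account public key>
resolver: { type: MEM }
resolver_preload = {
  <system account public key>: <system account JWT>
  <account public key>: <account JWT>
}
```
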
4 changes: 2 additions & 2 deletions server/stream.go
@@ -3364,9 +3364,9 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
// Register our server.
fs.registerServer(s)
}
mset.mu.Unlock()

// This will fire the callback but we do not require the lock since md will be 0 here.
mset.store.RegisterStorageUpdates(mset.storeUpdates)
mset.mu.Unlock()

return nil
}
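
Finally, setupStore now releases mset.mu before calling RegisterStorageUpdates. With the store changes above, registration can fire the usage callback immediately; the diff's comment notes this is safe without the lock because the pending delta (md) is zero at that point, and presumably the callback may take stream-level locks itself, so holding mset.mu across the call would risk a deadlock. A small hedged sketch of the general pattern, with hypothetical names:

```go
package main

import "sync"

// Hypothetical sketch: register a callback only after releasing your own
// lock when the registration (or the callback itself) may call back into
// code that takes other locks.
type stream struct {
	mu    sync.Mutex
	store *fakeStore
}

type fakeStore struct{ cb func(bytesDelta int64) }

// RegisterStorageUpdates may invoke cb right away to flush pending deltas.
func (s *fakeStore) RegisterStorageUpdates(cb func(int64)) {
	s.cb = cb
	cb(0) // nothing pending in this sketch, so the delta is zero
}

func (m *stream) setupStore() {
	m.mu.Lock()
	m.store = &fakeStore{}
	m.mu.Unlock()

	// Safe to register here: the callback can take whatever locks it needs
	// without deadlocking against m.mu, which we no longer hold.
	m.store.RegisterStorageUpdates(func(delta int64) { _ = delta })
}

func main() {
	(&stream{}).setupStore()
}
```
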