Skip to content

Commit

Permalink
[FIXED] A stream raft node could stay running after a stop(). (#4118)
Browse files Browse the repository at this point in the history
This can happen when we reset a stream internally and the stream had a
prior snapshot.
Also make sure to always release resources back to the account
regardless if the store is no longer present.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed May 1, 2023
2 parents 1eed0e8 + f098c25 commit ff6c803
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 13 deletions.
10 changes: 10 additions & 0 deletions server/filestore.go
Expand Up @@ -6104,6 +6104,16 @@ func (fs *fileStore) Delete() error {
if fs.isClosed() {
// Always attempt to remove since we could have been closed beforehand.
os.RemoveAll(fs.fcfg.StoreDir)
// Since we did remove, if we did have anything remaining make sure to
// call into any storage updates that had been registered.
fs.mu.Lock()
cb, msgs, bytes := fs.scb, int64(fs.state.Msgs), int64(fs.state.Bytes)
// Guard against double accounting if called twice.
fs.state.Msgs, fs.state.Bytes = 0, 0
fs.mu.Unlock()
if msgs > 0 && cb != nil {
cb(-msgs, -bytes, 0, _EMPTY_)
}
return ErrStoreClosed
}
fs.Purge()
Expand Down
6 changes: 5 additions & 1 deletion server/jetstream.go
Expand Up @@ -2582,9 +2582,13 @@ func (jsa *jsAccount) checkTemplateOwnership(tname, sname string) bool {
return false
}

type Number interface {
int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64 | float32 | float64
}

// friendlyBytes returns a string with the given bytes int64
// represented as a size, such as 1KB, 10MB, etc...
func friendlyBytes(bytes int64) string {
func friendlyBytes[T Number](bytes T) string {
fbytes := float64(bytes)
base := 1024
pre := []string{"K", "M", "G", "T", "P", "E"}
Expand Down
87 changes: 87 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -3849,3 +3849,90 @@ func TestJetStreamClusterHealthzCheckForStoppedAssets(t *testing.T) {
return nil
})
}

// Make sure that stopping a stream shutdowns down it's raft node.
func TestJetStreamClusterStreamNodeShutdownBugOnStop(t *testing.T) {
c := createJetStreamClusterExplicit(t, "NATS", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Replicas: 3,
})
require_NoError(t, err)

for i := 0; i < 100; i++ {
sendStreamMsg(t, nc, "foo", "HELLO")
}

s := c.randomServer()
numNodesStart := s.numRaftNodes()
mset, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
node := mset.raftNode()
require_NotNil(t, node)
node.InstallSnapshot(mset.stateSnapshot())
// Stop the stream
mset.stop(false, false)

if numNodes := s.numRaftNodes(); numNodes != numNodesStart-1 {
t.Fatalf("RAFT nodes after stream stop incorrect: %d vs %d", numNodesStart, numNodes)
}
}

func TestJetStreamClusterStreamAccountingOnStoreError(t *testing.T) {
c := createJetStreamClusterWithTemplate(t, jsClusterMaxBytesAccountLimitTempl, "NATS", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
MaxBytes: 1 * 1024 * 1024 * 1024,
Replicas: 3,
})
require_NoError(t, err)

msg := strings.Repeat("Z", 32*1024)
for i := 0; i < 100; i++ {
sendStreamMsg(t, nc, "foo", msg)
}
s := c.randomServer()
acc, err := s.LookupAccount("$U")
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
mset.mu.Lock()
mset.store.Stop()
sjs := mset.js
mset.mu.Unlock()

// Now delete the stream
js.DeleteStream("TEST")

// Wait for this to propgate.
// The bug will have us not release reserved resources properly.
time.Sleep(time.Second)
info, err := js.AccountInfo()
require_NoError(t, err)

// Default tier
if info.Store != 0 {
t.Fatalf("Expected store to be 0 but got %v", friendlyBytes(info.Store))
}

// Now check js from server directly regarding reserved.
sjs.mu.RLock()
reserved := sjs.storeReserved
sjs.mu.RUnlock()
// Under bug will show 1GB
if reserved != 0 {
t.Fatalf("Expected store reserved to be 0 after stream delete, got %v", friendlyBytes(reserved))
}
}
24 changes: 12 additions & 12 deletions server/stream.go
Expand Up @@ -4548,9 +4548,11 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
if deleteFlag {
n.Delete()
sa = mset.sa
} else if n.NeedSnapshot() {
// Attempt snapshot on clean exit.
n.InstallSnapshot(mset.stateSnapshotLocked())
} else {
if n.NeedSnapshot() {
// Attempt snapshot on clean exit.
n.InstallSnapshot(mset.stateSnapshotLocked())
}
n.Stop()
}
}
Expand Down Expand Up @@ -4642,23 +4644,21 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
sysc.closeConnection(ClientClosed)
}

if store == nil {
return nil
}

if deleteFlag {
if err := store.Delete(); err != nil {
return err
if store != nil {
// Ignore errors.
store.Delete()
}
// Release any resources.
js.releaseStreamResources(&mset.cfg)

// cleanup directories after the stream
accDir := filepath.Join(js.config.StoreDir, accName)
// no op if not empty
os.Remove(filepath.Join(accDir, streamsDir))
os.Remove(accDir)
} else if err := store.Stop(); err != nil {
return err
} else if store != nil {
// Ignore errors.
store.Stop()
}

return nil
Expand Down

0 comments on commit ff6c803

Please sign in to comment.