Skip to content

Commit

Permalink
[FIXED] Additional fix for #3734. (#4166)
Browse files Browse the repository at this point in the history
When the first block was truncated and missing any index info we would
not properly rebuild the state.

Signed-off-by: Derek Collison <derek@nats.io>

Resolves #3734
  • Loading branch information
derekcollison committed May 15, 2023
2 parents ee38f8b + 3602ff5 commit 9434110
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 9 deletions.
37 changes: 28 additions & 9 deletions server/filestore.go
Expand Up @@ -989,6 +989,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.last.seq, mb.last.ts = 0, 0
firstNeedsSet := true

// Remove the .fss file from disk.
mb.removePerSubjectInfoLocked()

// Check if we need to decrypt.
if mb.bek != nil && len(buf) > 0 {
// Recreate to reset counter.
Expand Down Expand Up @@ -1186,6 +1189,13 @@ func (fs *fileStore) recoverMsgs() error {
return err
}
if mb, err := fs.recoverMsgBlock(finfo, index); err == nil && mb != nil {
// This is a truncated block with possibly no index. This can happen if the OS
// was shut down out from underneath us.
if mb.first.seq == 0 {
mb.dirtyCloseWithRemove(true)
fs.removeMsgBlockFromList(mb)
continue
}
if fs.state.FirstSeq == 0 || mb.first.seq < fs.state.FirstSeq {
fs.state.FirstSeq = mb.first.seq
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
Expand Down Expand Up @@ -2468,12 +2478,16 @@ func (fs *fileStore) rebuildFirst() {
if len(fs.blks) == 0 {
return
}
if fmb := fs.blks[0]; fmb != nil {
fmb.removeIndexFile()
fmb.rebuildState()
fmb.writeIndexInfo()
fs.selectNextFirst()
fmb := fs.blks[0]
if fmb == nil {
return
}

fmb.removeIndexFile()
ld, _ := fmb.rebuildState()
fmb.writeIndexInfo()
fs.selectNextFirst()
fs.rebuildStateLocked(ld)
}

// Optimized helper function to return first sequence.
Expand Down Expand Up @@ -5667,11 +5681,9 @@ func (fs *fileStore) addMsgBlock(mb *msgBlock) {
fs.bim[mb.index] = mb
}

// Removes the msgBlock
// Remove from our list of blks.
// Both locks should be held.
func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
mb.dirtyCloseWithRemove(true)

func (fs *fileStore) removeMsgBlockFromList(mb *msgBlock) {
// Remove from list.
for i, omb := range fs.blks {
if mb == omb {
Expand All @@ -5683,6 +5695,13 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
break
}
}
}

// Removes the msgBlock
// Both locks should be held.
func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
mb.dirtyCloseWithRemove(true)
fs.removeMsgBlockFromList(mb)
// Check for us being last message block
if mb == fs.lmb {
// Creating a new message write block requires that the lmb lock is not held.
Expand Down
59 changes: 59 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -20002,3 +20002,62 @@ func TestJetStreamSnapshotRestoreStallAndHealthz(t *testing.T) {
t.Fatalf("Expected health to be ok, got %+v", hs)
}
}

// TestJetStreamMaxBytesIgnored verifies that a stream's MaxBytes limit is still
// enforced after the server restarts with the first message block truncated,
// its per-subject (.fss) file truncated, and its index (.idx) file removed.
//
// https://github.com/nats-io/nats-server/pull/4163
func TestJetStreamMaxBytesIgnored(t *testing.T) {
	// Byte limit configured on the stream and checked again after the restart.
	const maxBytes = 10 * 1024 * 1024

	s := RunBasicJetStreamServer(t)
	defer s.Shutdown()

	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"*"},
		MaxBytes: maxBytes,
	})
	require_NoError(t, err)

	payload := bytes.Repeat([]byte("A"), 1024*1024)

	// publishBatch sends ten payloads on subject "x" using whatever JetStream
	// context js refers to at the time of the call (it is reassigned after the
	// restart below).
	publishBatch := func() {
		for n := 0; n < 10; n++ {
			_, perr := js.Publish("x", payload)
			require_NoError(t, perr)
		}
	}

	publishBatch()

	si, err := js.StreamInfo("TEST")
	require_NoError(t, err)
	require_True(t, si.State.Msgs == 9)

	// Stop the running server but remember where it kept its data.
	storeDir := s.JetStreamConfig().StoreDir
	s.Shutdown()

	// Simulate a damaged first block: drop the index file, then empty out the
	// per-subject info and the block itself.
	msgsDir := filepath.Join(storeDir, "$G", "streams", "TEST", "msgs")
	err = os.Remove(filepath.Join(msgsDir, "1.idx"))
	require_NoError(t, err)
	for _, name := range []string{"1.fss", "1.blk"} {
		err = os.WriteFile(filepath.Join(msgsDir, name), nil, defaultFilePerms)
		require_NoError(t, err)
	}

	// Bring the server back up on the same store directory.
	s = RunJetStreamServerOnPort(-1, storeDir)
	defer s.Shutdown()

	nc, js = jsClientConnect(t, s)
	defer nc.Close()

	publishBatch()

	// The recovered stream must not exceed its configured byte limit.
	si, err = js.StreamInfo("TEST")
	require_NoError(t, err)
	require_True(t, si.State.Bytes <= maxBytes)
}

0 comments on commit 9434110

Please sign in to comment.