Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Additional fix for #3734. #4166

Merged
merged 1 commit into from May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 28 additions & 9 deletions server/filestore.go
Expand Up @@ -989,6 +989,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.last.seq, mb.last.ts = 0, 0
firstNeedsSet := true

// Remove the .fss file from disk.
mb.removePerSubjectInfoLocked()

// Check if we need to decrypt.
if mb.bek != nil && len(buf) > 0 {
// Recreate to reset counter.
Expand Down Expand Up @@ -1186,6 +1189,13 @@ func (fs *fileStore) recoverMsgs() error {
return err
}
if mb, err := fs.recoverMsgBlock(finfo, index); err == nil && mb != nil {
// This is a truncate block with possibly no index. If the OS got shutdown
// out from underneath of us this is possible.
if mb.first.seq == 0 {
mb.dirtyCloseWithRemove(true)
fs.removeMsgBlockFromList(mb)
continue
}
if fs.state.FirstSeq == 0 || mb.first.seq < fs.state.FirstSeq {
fs.state.FirstSeq = mb.first.seq
fs.state.FirstTime = time.Unix(0, mb.first.ts).UTC()
Expand Down Expand Up @@ -2468,12 +2478,16 @@ func (fs *fileStore) rebuildFirst() {
if len(fs.blks) == 0 {
return
}
if fmb := fs.blks[0]; fmb != nil {
fmb.removeIndexFile()
fmb.rebuildState()
fmb.writeIndexInfo()
fs.selectNextFirst()
fmb := fs.blks[0]
if fmb == nil {
return
}

fmb.removeIndexFile()
ld, _ := fmb.rebuildState()
fmb.writeIndexInfo()
fs.selectNextFirst()
fs.rebuildStateLocked(ld)
}

// Optimized helper function to return first sequence.
Expand Down Expand Up @@ -5667,11 +5681,9 @@ func (fs *fileStore) addMsgBlock(mb *msgBlock) {
fs.bim[mb.index] = mb
}

// Removes the msgBlock
// Remove from our list of blks.
// Both locks should be held.
func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
mb.dirtyCloseWithRemove(true)

func (fs *fileStore) removeMsgBlockFromList(mb *msgBlock) {
// Remove from list.
for i, omb := range fs.blks {
if mb == omb {
Expand All @@ -5683,6 +5695,13 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
break
}
}
}

// Removes the msgBlock
// Both locks should be held.
func (fs *fileStore) removeMsgBlock(mb *msgBlock) {
mb.dirtyCloseWithRemove(true)
fs.removeMsgBlockFromList(mb)
// Check for us being last message block
if mb == fs.lmb {
// Creating a new message write block requires that the lmb lock is not held.
Expand Down
59 changes: 59 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -20002,3 +20002,62 @@ func TestJetStreamSnapshotRestoreStallAndHealthz(t *testing.T) {
t.Fatalf("Expected health to be ok, got %+v", hs)
}
}

// https://github.com/nats-io/nats-server/pull/4163
func TestJetStreamMaxBytesIgnored(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
MaxBytes: 10 * 1024 * 1024,
})
require_NoError(t, err)

msg := bytes.Repeat([]byte("A"), 1024*1024)

for i := 0; i < 10; i++ {
_, err := js.Publish("x", msg)
require_NoError(t, err)
}

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_True(t, si.State.Msgs == 9)

// Stop current
sd := s.JetStreamConfig().StoreDir
s.Shutdown()

// We will remove the idx file and truncate the blk and fss files.
mdir := filepath.Join(sd, "$G", "streams", "TEST", "msgs")
// Remove idx
err = os.Remove(filepath.Join(mdir, "1.idx"))
require_NoError(t, err)
// Truncate fss
err = os.WriteFile(filepath.Join(mdir, "1.fss"), nil, defaultFilePerms)
require_NoError(t, err)
// Truncate blk
err = os.WriteFile(filepath.Join(mdir, "1.blk"), nil, defaultFilePerms)
require_NoError(t, err)

// Restart.
s = RunJetStreamServerOnPort(-1, sd)
defer s.Shutdown()

nc, js = jsClientConnect(t, s)
defer nc.Close()

for i := 0; i < 10; i++ {
_, err := js.Publish("x", msg)
require_NoError(t, err)
}

si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_True(t, si.State.Bytes <= 10*1024*1024)
}