Skip to content

Commit

Permalink
[IMPROVED] Add in warnings for filestore recover state if happy path …
Browse files Browse the repository at this point in the history
…fails. (#4599)

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 27, 2023
2 parents c6b26ab + aeef0ef commit bc012d7
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 17 deletions.
34 changes: 27 additions & 7 deletions server/filestore.go
Expand Up @@ -60,6 +60,9 @@ type FileStoreConfig struct {
Cipher StoreCipher
// Compression is the algorithm to use when compressing.
Compression StoreCompression

// Internal reference to our server.
srv *Server
}

// FileStreamInfo allows us to remember created time.
Expand Down Expand Up @@ -387,6 +390,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
qch: make(chan struct{}),
fch: make(chan struct{}, 1),
fsld: make(chan struct{}),
srv: fcfg.srv,
}

// Set flush in place to AsyncFlush which by default is false.
Expand Down Expand Up @@ -527,12 +531,6 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
return fs, nil
}

func (fs *fileStore) registerServer(s *Server) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.srv = s
}

// Lock all existing message blocks.
// Lock held on entry.
func (fs *fileStore) lockAllMsgBlocks() {
Expand Down Expand Up @@ -1436,6 +1434,16 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
return nil, tombstones, nil
}

// For doing warn logging.
// Lock should be held.
func (fs *fileStore) warn(format string, args ...any) {
// No-op if no server configured.
if fs.srv == nil {
return
}
fs.srv.Warnf(fmt.Sprintf("Filestore [%s] %s", fs.cfg.Name, format), args...)
}

// recoverFullState will attempt to receover our last full state and re-process any state changes
// that happened afterwards.
func (fs *fileStore) recoverFullState() (rerr error) {
Expand All @@ -1455,12 +1463,16 @@ func (fs *fileStore) recoverFullState() (rerr error) {
dios <- struct{}{}

if err != nil {
if !os.IsNotExist(err) {
fs.warn("Could not read stream state file: %v", err)
}
return err
}

const minLen = 32
if len(buf) < minLen {
os.Remove(fn)
fs.warn("Stream state too short (%d bytes)", len(buf))
return errCorruptState
}

Expand All @@ -1471,6 +1483,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
fs.hh.Write(buf)
if !bytes.Equal(h, fs.hh.Sum(nil)) {
os.Remove(fn)
fs.warn("Stream state checksum did not match")
return errCorruptState
}

Expand All @@ -1482,13 +1495,15 @@ func (fs *fileStore) recoverFullState() (rerr error) {
ns := fs.aek.NonceSize()
buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:], nil)
if err != nil {
fs.warn("Stream state error reading encryption key: %v", err)
return err
}
}
}

if buf[0] != fullStateMagic || buf[1] != fullStateVersion {
os.Remove(fn)
fs.warn("Stream state magic and version mismatch")
return errCorruptState
}

Expand Down Expand Up @@ -1543,6 +1558,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
if lsubj := int(readU64()); lsubj > 0 {
if bi+lsubj > len(buf) {
os.Remove(fn)
fs.warn("Stream state bad subject len (%d)", lsubj)
return errCorruptState
}
subj := fs.subjString(buf[bi : bi+lsubj])
Expand Down Expand Up @@ -1573,6 +1589,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
dmap, n, err := avl.Decode(buf[bi:])
if err != nil {
os.Remove(fn)
fs.warn("Stream state error decoding avl dmap: %v", err)
return errCorruptState
}
mb.dmap = *dmap
Expand Down Expand Up @@ -1605,6 +1622,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
// Check if we had any errors.
if bi < 0 {
os.Remove(fn)
fs.warn("Stream state has no checksum present")
return errCorruptState
}

Expand All @@ -1621,9 +1639,9 @@ func (fs *fileStore) recoverFullState() (rerr error) {
if ld, _, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}
fs.warn("Stream state detected prior state, could not locate msg block %d", blkIndex)
return errPriorState
}

if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
Expand All @@ -1644,12 +1662,14 @@ func (fs *fileStore) recoverFullState() (rerr error) {
return nil
}
os.Remove(fn)
fs.warn("Stream state could not recover msg block %d", bi)
return err
}
if nmb != nil {
// Check if we have to account for a partial message block.
if !matched && mb != nil && mb.index == nmb.index {
if err := fs.adjustAccounting(mb, nmb); err != nil {
fs.warn("Stream state could not adjust accounting: %v", err)
return err
}
}
Expand Down
69 changes: 69 additions & 0 deletions server/filestore_test.go
Expand Up @@ -6096,6 +6096,75 @@ func TestFileStoreRemoveLastNoDoubleTombstones(t *testing.T) {
require_Equal(t, rbytes, emptyRecordLen)
}

func TestFileStoreFullStateMultiBlockPastWAL(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fcfg.BlockSize = 100
scfg := StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}

prf := func(context []byte) ([]byte, error) {
h := hmac.New(sha256.New, []byte("dlc22"))
if _, err := h.Write(context); err != nil {
return nil, err
}
return h.Sum(nil), nil
}
if fcfg.Cipher == NoCipher {
prf = nil
}

fs, err := newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
require_NoError(t, err)
defer fs.Stop()

// This yields an internal record length of 50 bytes. So 2 msgs per blk.
msgLen := 19
msgA := bytes.Repeat([]byte("A"), msgLen)
msgZ := bytes.Repeat([]byte("Z"), msgLen)

// Store 2 msgs
fs.StoreMsg("A", nil, msgA)
fs.StoreMsg("B", nil, msgZ)
require_Equal(t, fs.numMsgBlocks(), 1)
fs.Stop()

// Grab the state from this stop.
sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile)
buf, err := os.ReadFile(sfile)
require_NoError(t, err)

fs, err = newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
require_NoError(t, err)
defer fs.Stop()

// Store 2 more msgs and delete 2 & 4, then another 2 msgs.
fs.StoreMsg("C", nil, msgA)
fs.StoreMsg("D", nil, msgZ)
fs.StoreMsg("E", nil, msgA)
fs.StoreMsg("F", nil, msgZ)
fs.StoreMsg("G", nil, msgA)
fs.StoreMsg("H", nil, msgZ)
require_Equal(t, fs.numMsgBlocks(), 4)
state := fs.State()
fs.Stop()

// Put back old stream state.
// This will test that we properly walk multiple blocks past where we snapshotted state.
fs.Stop()
err = os.WriteFile(sfile, buf, defaultFilePerms)
require_NoError(t, err)

fs, err = newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
require_NoError(t, err)
defer fs.Stop()

if newState := fs.State(); !reflect.DeepEqual(state, newState) {
t.Fatalf("Restore state does not match:\n%+v\n%+v",
state, newState)
}
require_True(t, !state.FirstTime.IsZero())
})
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down
9 changes: 2 additions & 7 deletions server/jetstream_cluster.go
Expand Up @@ -755,7 +755,7 @@ func (js *jetStream) setupMetaGroup() error {
storeDir := filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, defaultMetaGroupName)

fs, err := newFileStoreWithCreated(
FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false},
FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false, srv: s},
StreamConfig{Name: defaultMetaGroupName, Storage: FileStorage},
time.Now().UTC(),
s.jsKeyGen(s.getOpts().JetStreamKey, defaultMetaGroupName),
Expand All @@ -766,9 +766,6 @@ func (js *jetStream) setupMetaGroup() error {
return err
}

// Register our server.
fs.registerServer(s)

cfg := &RaftConfig{Name: defaultMetaGroupName, Store: storeDir, Log: fs}

// If we are soliciting leafnode connections and we are sharing a system account and do not disable it with a hint,
Expand Down Expand Up @@ -2032,7 +2029,7 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor
var store StreamStore
if storage == FileStorage {
fs, err := newFileStoreWithCreated(
FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute},
FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute, srv: s},
StreamConfig{Name: rg.Name, Storage: FileStorage},
time.Now().UTC(),
s.jsKeyGen(s.getOpts().JetStreamKey, rg.Name),
Expand All @@ -2042,8 +2039,6 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor
s.Errorf("Error creating filestore WAL: %v", err)
return err
}
// Register our server.
fs.registerServer(s)
store = fs
} else {
ms, err := newMemStore(&StreamConfig{Name: rg.Name, Storage: MemoryStorage})
Expand Down
6 changes: 3 additions & 3 deletions server/stream.go
Expand Up @@ -3665,14 +3665,14 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
fsCfg.Cipher = s.getOpts().JetStreamCipher
}
oldprf := s.jsKeyGen(s.getOpts().JetStreamOldKey, mset.acc.Name)
fs, err := newFileStoreWithCreated(*fsCfg, mset.cfg, mset.created, prf, oldprf)
cfg := *fsCfg
cfg.srv = s
fs, err := newFileStoreWithCreated(cfg, mset.cfg, mset.created, prf, oldprf)
if err != nil {
mset.mu.Unlock()
return err
}
mset.store = fs
// Register our server.
fs.registerServer(s)
}
// This will fire the callback but we do not require the lock since md will be 0 here.
mset.store.RegisterStorageUpdates(mset.storeUpdates)
Expand Down

0 comments on commit bc012d7

Please sign in to comment.