Skip to content

Commit

Permalink
Add first_seq support to memory store too
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Jul 25, 2023
1 parent 9538a18 commit 4d8d019
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 10 deletions.
11 changes: 5 additions & 6 deletions server/filestore.go
Expand Up @@ -417,12 +417,11 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}

// If the stream has an initial sequence number then make sure we
// have purged up until that point. Checking the number of blocks
// is a cheap way of checking if the stream looks new-ish, rather
// than re-running a potentially expensive set of dirty closes on
// an existing stream. A new stream should have 1 block (as the
// lmb has already been created) but account for 0 just in case.
if len(fs.blks) <= 1 && cfg.FirstSeq > 0 {
// have purged up until that point. We will do this only if the
// recovered first sequence number is before our configured first
// sequence.
fs.FastState(&fs.state)
if cfg.FirstSeq > 0 && fs.state.FirstSeq <= cfg.FirstSeq {
if _, err := fs.purge(cfg.FirstSeq); err != nil {
return nil, err
}
Expand Down
21 changes: 17 additions & 4 deletions server/memstore.go
Expand Up @@ -45,15 +45,17 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) {
if cfg.Storage != MemoryStorage {
return nil, fmt.Errorf("memStore requires memory storage type in config")
}
if cfg.FirstSeq > 0 {
return nil, fmt.Errorf("setting the initial sequence is not supported by memory store")
}
ms := &memStore{
msgs: make(map[uint64]*StoreMsg),
fss: make(map[string]*SimpleState),
maxp: cfg.MaxMsgsPer,
cfg: *cfg,
}
if cfg.FirstSeq > 0 {
if _, err := ms.purge(cfg.FirstSeq); err != nil {
return nil, err
}
}

return ms, nil
}
Expand Down Expand Up @@ -669,11 +671,22 @@ func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint6
// Purge will remove all messages from this store.
// Will return the number of purged messages.
func (ms *memStore) Purge() (uint64, error) {
ms.mu.RLock()
first := ms.state.LastSeq + 1
ms.mu.RUnlock()
return ms.purge(first)
}

func (ms *memStore) purge(fseq uint64) (uint64, error) {
ms.mu.Lock()
purged := uint64(len(ms.msgs))
cb := ms.scb
bytes := int64(ms.state.Bytes)
ms.state.FirstSeq = ms.state.LastSeq + 1
if fseq < ms.state.LastSeq {
return 0, fmt.Errorf("partial purges not supported on memory store")
}
ms.state.FirstSeq = fseq
ms.state.LastSeq = fseq - 1
ms.state.FirstTime = time.Time{}
ms.state.Bytes = 0
ms.state.Msgs = 0
Expand Down
33 changes: 33 additions & 0 deletions server/memstore_test.go
Expand Up @@ -721,3 +721,36 @@ func TestMemStoreNumPending(t *testing.T) {
}
}
}

func TestMemStoreInitialFirstSeq(t *testing.T) {
cfg := &StreamConfig{
Name: "zzz",
Storage: MemoryStorage,
FirstSeq: 1000,
}
ms, err := newMemStore(cfg)
require_NoError(t, err)

seq, _, err := ms.StoreMsg("A", nil, []byte("OK"))
require_NoError(t, err)
if seq != 1000 {
t.Fatalf("Message should have been sequence 1000 but was %d", seq)
}

seq, _, err = ms.StoreMsg("B", nil, []byte("OK"))
require_NoError(t, err)
if seq != 1001 {
t.Fatalf("Message should have been sequence 1001 but was %d", seq)
}

var state StreamState
ms.FastState(&state)
switch {
case state.Msgs != 2:
t.Fatalf("Expected 2 messages, got %d", state.Msgs)
case state.FirstSeq != 1000:
t.Fatalf("Expected first seq 1000, got %d", state.FirstSeq)
case state.LastSeq != 1001:
t.Fatalf("Expected last seq 1001, got %d", state.LastSeq)
}
}

0 comments on commit 4d8d019

Please sign in to comment.