Skip to content

Commit

Permalink
Add first_seq to StreamConfig (#4322)
Browse files Browse the repository at this point in the history
This allows specifying `first_seq` in the stream config when creating a
stream.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Jul 26, 2023
2 parents ff33dd2 + 65cb4b9 commit 2cb1512
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 1 deletion.
15 changes: 15 additions & 0 deletions server/filestore.go
Expand Up @@ -416,6 +416,21 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
return nil, err
}

// If the stream has an initial sequence number then make sure we
// have purged up until that point. We will do this only if the
// recovered first sequence number is before our configured first
// sequence. Need to do this locked as by now the age check timer
// has started.
var st StreamState
fs.mu.RLock()
fs.FastState(&st)
fs.mu.RUnlock()
if cfg.FirstSeq > 0 && st.FirstSeq <= cfg.FirstSeq {
if _, err := fs.purge(cfg.FirstSeq); err != nil {
return nil, err
}
}

// Write our meta data if it does not exist or is zero'd out.
meta := filepath.Join(fcfg.StoreDir, JetStreamMetaFile)
fi, err := os.Stat(meta)
Expand Down
31 changes: 31 additions & 0 deletions server/filestore_test.go
Expand Up @@ -5642,3 +5642,34 @@ func TestFileStoreRestoreEncryptedWithNoKeyFuncFails(t *testing.T) {
)
require_Error(t, err, errNoMainKey)
}

func TestFileStoreInitialFirstSeq(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage, FirstSeq: 1000})
require_NoError(t, err)
defer fs.Stop()

seq, _, err := fs.StoreMsg("A", nil, []byte("OK"))
require_NoError(t, err)
if seq != 1000 {
t.Fatalf("Message should have been sequence 1000 but was %d", seq)
}

seq, _, err = fs.StoreMsg("B", nil, []byte("OK"))
require_NoError(t, err)
if seq != 1001 {
t.Fatalf("Message should have been sequence 1001 but was %d", seq)
}

var state StreamState
fs.FastState(&state)
switch {
case state.Msgs != 2:
t.Fatalf("Expected 2 messages, got %d", state.Msgs)
case state.FirstSeq != 1000:
t.Fatalf("Expected first seq 1000, got %d", state.FirstSeq)
case state.LastSeq != 1001:
t.Fatalf("Expected last seq 1001, got %d", state.LastSeq)
}
})
}
18 changes: 17 additions & 1 deletion server/memstore.go
Expand Up @@ -51,6 +51,11 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) {
maxp: cfg.MaxMsgsPer,
cfg: *cfg,
}
if cfg.FirstSeq > 0 {
if _, err := ms.purge(cfg.FirstSeq); err != nil {
return nil, err
}
}

return ms, nil
}
Expand Down Expand Up @@ -666,11 +671,22 @@ func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint6
// Purge will remove all messages from this store.
// Will return the number of purged messages.
func (ms *memStore) Purge() (uint64, error) {
ms.mu.RLock()
first := ms.state.LastSeq + 1
ms.mu.RUnlock()
return ms.purge(first)
}

func (ms *memStore) purge(fseq uint64) (uint64, error) {
ms.mu.Lock()
purged := uint64(len(ms.msgs))
cb := ms.scb
bytes := int64(ms.state.Bytes)
ms.state.FirstSeq = ms.state.LastSeq + 1
if fseq < ms.state.LastSeq {
return 0, fmt.Errorf("partial purges not supported on memory store")
}
ms.state.FirstSeq = fseq
ms.state.LastSeq = fseq - 1
ms.state.FirstTime = time.Time{}
ms.state.Bytes = 0
ms.state.Msgs = 0
Expand Down
33 changes: 33 additions & 0 deletions server/memstore_test.go
Expand Up @@ -721,3 +721,36 @@ func TestMemStoreNumPending(t *testing.T) {
}
}
}

func TestMemStoreInitialFirstSeq(t *testing.T) {
cfg := &StreamConfig{
Name: "zzz",
Storage: MemoryStorage,
FirstSeq: 1000,
}
ms, err := newMemStore(cfg)
require_NoError(t, err)

seq, _, err := ms.StoreMsg("A", nil, []byte("OK"))
require_NoError(t, err)
if seq != 1000 {
t.Fatalf("Message should have been sequence 1000 but was %d", seq)
}

seq, _, err = ms.StoreMsg("B", nil, []byte("OK"))
require_NoError(t, err)
if seq != 1001 {
t.Fatalf("Message should have been sequence 1001 but was %d", seq)
}

var state StreamState
ms.FastState(&state)
switch {
case state.Msgs != 2:
t.Fatalf("Expected 2 messages, got %d", state.Msgs)
case state.FirstSeq != 1000:
t.Fatalf("Expected first seq 1000, got %d", state.FirstSeq)
case state.LastSeq != 1001:
t.Fatalf("Expected last seq 1001, got %d", state.LastSeq)
}
}
1 change: 1 addition & 0 deletions server/stream.go
Expand Up @@ -58,6 +58,7 @@ type StreamConfig struct {
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Compression StoreCompression `json:"compression"`
FirstSeq uint64 `json:"first_seq,omitempty"`

// Allow applying a subject transform to incoming messages before doing anything else
SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`
Expand Down

0 comments on commit 2cb1512

Please sign in to comment.