Skip to content

Commit

Permalink
Add test case for concurrent expected last subject sequence (#4319)
Browse files Browse the repository at this point in the history
Resolves: #4320
  • Loading branch information
derekcollison committed Jul 18, 2023
2 parents 75ad503 + 244dda8 commit 80fb29f
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 1 deletion.
83 changes: 83 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -20282,3 +20282,86 @@ func TestJetStreamMaxBytesIgnored(t *testing.T) {
require_NoError(t, err)
require_True(t, si.State.Bytes <= 10*1024*1024)
}

func TestJetStreamLastSequenceBySubjectConcurrent(t *testing.T) {
for _, st := range []StorageType{FileStorage, MemoryStorage} {
t.Run(st.String(), func(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()

nc0, js0 := jsClientConnect(t, c.randomServer())
defer nc0.Close()

nc1, js1 := jsClientConnect(t, c.randomServer())
defer nc1.Close()

cfg := StreamConfig{
Name: "KV",
Subjects: []string{"kv.>"},
Storage: st,
Replicas: 3,
}

req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Do manually for now.
m, err := nc0.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
require_NoError(t, err)
si, err := js0.StreamInfo("KV")
if err != nil {
t.Fatalf("Unexpected error: %v, respmsg: %q", err, string(m.Data))
}
if si == nil || si.Config.Name != "KV" {
t.Fatalf("StreamInfo is not correct %+v", si)
}

pub := func(js nats.JetStreamContext, subj, data, seq string) {
t.Helper()
m := nats.NewMsg(subj)
m.Data = []byte(data)
m.Header.Set(JSExpectedLastSubjSeq, seq)
js.PublishMsg(m)
}

ready := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(2)

go func() {
<-ready
pub(js0, "kv.foo", "0-0", "0")
pub(js0, "kv.foo", "0-1", "1")
pub(js0, "kv.foo", "0-2", "2")
wg.Done()
}()

go func() {
<-ready
pub(js1, "kv.foo", "1-0", "0")
pub(js1, "kv.foo", "1-1", "1")
pub(js1, "kv.foo", "1-2", "2")
wg.Done()
}()

time.Sleep(50 * time.Millisecond)
close(ready)
wg.Wait()

// Read the messages.
sub, err := js0.PullSubscribe(_EMPTY_, _EMPTY_, nats.BindStream("KV"))
require_NoError(t, err)
msgs, err := sub.Fetch(10)
require_NoError(t, err)
if len(msgs) != 3 {
t.Errorf("Expected 3 messages, got %d", len(msgs))
}
for i, m := range msgs {
if m.Header.Get(JSExpectedLastSubjSeq) != fmt.Sprint(i) {
t.Errorf("Expected %d for last sequence, got %q", i, m.Header.Get(JSExpectedLastSubjSeq))
}
}
})
}
}
2 changes: 1 addition & 1 deletion server/stream.go
Expand Up @@ -3838,7 +3838,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
// Expected last sequence per subject.
// If we are clustered we have prechecked seq > 0.
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && (!isClustered || seq == 0) {
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists {
// TODO(dlc) - We could make a new store func that does this all in one.
var smv StoreMsg
var fseq uint64
Expand Down

0 comments on commit 80fb29f

Please sign in to comment.