Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] JetStream: ordered consumers handling of auto unsub #847

Merged
merged 1 commit into from Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 23 additions & 0 deletions js.go
Expand Up @@ -1644,6 +1644,26 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
return
}

var maxStr string
// If there was an AUTO_UNSUB done, we need to adjust the new value
// to send after the SUB for the new sid.
if sub.max > 0 {
if sub.jsi.fciseq < sub.max {
adjustedMax := sub.max - sub.jsi.fciseq
maxStr = strconv.Itoa(int(adjustedMax))
} else {
// We are already at the max, so we should just unsub the
// existing sub and be done
go func(sid int64) {
nc.mu.Lock()
nc.bw.appendString(fmt.Sprintf(unsubProto, sid, _EMPTY_))
nc.kickFlusher()
nc.mu.Unlock()
}(sub.sid)
return
}
}

// Quick unsubscribe. Since we know this is a simple push subscriber we do in place.
osid := sub.applyNewSID()

Expand All @@ -1663,6 +1683,9 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
nc.mu.Lock()
nc.bw.appendString(fmt.Sprintf(unsubProto, osid, _EMPTY_))
nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, nsid))
if maxStr != _EMPTY_ {
nc.bw.appendString(fmt.Sprintf(unsubProto, nsid, maxStr))
}
nc.kickFlusher()
nc.mu.Unlock()

Expand Down
113 changes: 113 additions & 0 deletions js_test.go
Expand Up @@ -377,6 +377,119 @@ func TestJetStreamOrderedConsumerWithErrors(t *testing.T) {
testSubError(deleteConsumer)
}

func TestJetStreamOrderedConsumerWithAutoUnsub(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

_, err = js.AddStream(&StreamConfig{
Name: "OBJECT",
Subjects: []string{"a"},
Storage: MemoryStorage,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

count := int32(0)
sub, err := js.Subscribe("a", func(m *Msg) {
atomic.AddInt32(&count, 1)
}, OrderedConsumer(), IdleHeartbeat(250*time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Ask to auto-unsub after 10 messages.
sub.AutoUnsubscribe(10)

// Set a message filter that will drop 1 message
dm := 0
singleLoss := func(m *Msg) *Msg {
if m.Header.Get("data") != _EMPTY_ {
dm++
if dm == 5 {
nc.removeMsgFilter("a")
return nil
}
}
return m
}
nc.addMsgFilter("a", singleLoss)

// Now produce 20 messages
for i := 0; i < 20; i++ {
msg := NewMsg("a")
msg.Data = []byte(fmt.Sprintf("msg_%d", i+1))
msg.Header.Set("data", "true")
js.PublishMsgAsync(msg)
}

select {
case <-js.PublishAsyncComplete():
case <-time.After(time.Second):
t.Fatalf("Did not receive completion signal")
}

// Wait for the subscription to be marked as invalid
deadline := time.Now().Add(time.Second)
ok := false
for time.Now().Before(deadline) {
if !sub.IsValid() {
ok = true
break
}
}
if !ok {
t.Fatalf("Subscription still valid")
}

// Wait a bit to make sure we are not receiving more than expected,
// and give a chance for the server to process the auto-unsub
// protocol.
time.Sleep(500 * time.Millisecond)

if n := atomic.LoadInt32(&count); n != 10 {
t.Fatalf("Sub should have received only 10 messages, got %v", n)
}

// Now capture the in msgs count for the connection
inMsgs := nc.Stats().InMsgs

// Send one more message and this count should not increase if the
// server had properly processed the auto-unsub after the
// reset of the ordered consumer. Use a different connection
// to send.
nc2, err := Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc2.Close()
js2, err := nc2.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
js2.Publish("a", []byte("should not be received"))

newInMsgs := nc.Stats().InMsgs
if inMsgs != newInMsgs {
t.Fatal("Seems that AUTO-UNSUB was not properly handled")
}
}

// We want to make sure we do the right thing with lots of concurrent queue durable consumer requests.
// One should win and the others should share the delivery subject with the first one who wins.
func TestJetStreamConcurrentQueueDurablePushConsumers(t *testing.T) {
Expand Down