Skip to content

Commit

Permalink
Merge pull request #847 from nats-io/js_fix_ordered_cons_auto_unsub
Browse files Browse the repository at this point in the history
[FIXED] JetStream: ordered consumers handling of auto unsub
  • Loading branch information
kozlovic committed Oct 13, 2021
2 parents 0f47b1c + 51d43e4 commit 85f8574
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 0 deletions.
23 changes: 23 additions & 0 deletions js.go
Expand Up @@ -1644,6 +1644,26 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
return
}

var maxStr string
// If there was an AUTO_UNSUB done, we need to adjust the new value
// to send after the SUB for the new sid.
if sub.max > 0 {
if sub.jsi.fciseq < sub.max {
adjustedMax := sub.max - sub.jsi.fciseq
maxStr = strconv.Itoa(int(adjustedMax))
} else {
// We are already at the max, so we should just unsub the
// existing sub and be done
go func(sid int64) {
nc.mu.Lock()
nc.bw.appendString(fmt.Sprintf(unsubProto, sid, _EMPTY_))
nc.kickFlusher()
nc.mu.Unlock()
}(sub.sid)
return
}
}

// Quick unsubscribe. Since we know this is a simple push subscriber we do in place.
osid := sub.applyNewSID()

Expand All @@ -1663,6 +1683,9 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
nc.mu.Lock()
nc.bw.appendString(fmt.Sprintf(unsubProto, osid, _EMPTY_))
nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, nsid))
if maxStr != _EMPTY_ {
nc.bw.appendString(fmt.Sprintf(unsubProto, nsid, maxStr))
}
nc.kickFlusher()
nc.mu.Unlock()

Expand Down
113 changes: 113 additions & 0 deletions js_test.go
Expand Up @@ -377,6 +377,119 @@ func TestJetStreamOrderedConsumerWithErrors(t *testing.T) {
testSubError(deleteConsumer)
}

func TestJetStreamOrderedConsumerWithAutoUnsub(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

_, err = js.AddStream(&StreamConfig{
Name: "OBJECT",
Subjects: []string{"a"},
Storage: MemoryStorage,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

count := int32(0)
sub, err := js.Subscribe("a", func(m *Msg) {
atomic.AddInt32(&count, 1)
}, OrderedConsumer(), IdleHeartbeat(250*time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Ask to auto-unsub after 10 messages.
sub.AutoUnsubscribe(10)

// Set a message filter that will drop 1 message
dm := 0
singleLoss := func(m *Msg) *Msg {
if m.Header.Get("data") != _EMPTY_ {
dm++
if dm == 5 {
nc.removeMsgFilter("a")
return nil
}
}
return m
}
nc.addMsgFilter("a", singleLoss)

// Now produce 20 messages
for i := 0; i < 20; i++ {
msg := NewMsg("a")
msg.Data = []byte(fmt.Sprintf("msg_%d", i+1))
msg.Header.Set("data", "true")
js.PublishMsgAsync(msg)
}

select {
case <-js.PublishAsyncComplete():
case <-time.After(time.Second):
t.Fatalf("Did not receive completion signal")
}

// Wait for the subscription to be marked as invalid
deadline := time.Now().Add(time.Second)
ok := false
for time.Now().Before(deadline) {
if !sub.IsValid() {
ok = true
break
}
}
if !ok {
t.Fatalf("Subscription still valid")
}

// Wait a bit to make sure we are not receiving more than expected,
// and give a chance for the server to process the auto-unsub
// protocol.
time.Sleep(500 * time.Millisecond)

if n := atomic.LoadInt32(&count); n != 10 {
t.Fatalf("Sub should have received only 10 messages, got %v", n)
}

// Now capture the in msgs count for the connection
inMsgs := nc.Stats().InMsgs

// Send one more message and this count should not increase if the
// server had properly processed the auto-unsub after the
// reset of the ordered consumer. Use a different connection
// to send.
nc2, err := Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc2.Close()
js2, err := nc2.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
js2.Publish("a", []byte("should not be received"))

newInMsgs := nc.Stats().InMsgs
if inMsgs != newInMsgs {
t.Fatal("Seems that AUTO-UNSUB was not properly handled")
}
}

// We want to make sure we do the right thing with lots of concurrent queue durable consumer requests.
// One should win and the others should share the delivery subject with the first one who wins.
func TestJetStreamConcurrentQueueDurablePushConsumers(t *testing.T) {
Expand Down

0 comments on commit 85f8574

Please sign in to comment.