Skip to content

Commit

Permalink
[FIXED] Possible panic in consumer, needed to recheck if consumer was…
Browse files Browse the repository at this point in the history
… closed (#4541)

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 15, 2023
2 parents 8f84ea4 + 097e409 commit d7c66e7
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
9 changes: 9 additions & 0 deletions server/consumer.go
Expand Up @@ -3794,7 +3794,9 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
sz int
wrn, wrb int
)

o.mu.Lock()

// consumer is closed when mset is set to nil.
if o.mset == nil {
o.mu.Unlock()
Expand All @@ -3817,6 +3819,13 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// Grab our next msg.
pmsg, dc, err = o.getNextMsg()

// We can release the lock now under getNextMsg so need to check this condition again here.
// consumer is closed when mset is set to nil.
if o.mset == nil {
o.mu.Unlock()
return
}

// On error either wait or return.
if err != nil || pmsg == nil {
// On EOF we can optionally fast sync num pending state.
Expand Down
2 changes: 1 addition & 1 deletion server/filestore_test.go
Expand Up @@ -5104,7 +5104,7 @@ func TestFileStoreNumPendingLargeNumBlks(t *testing.T) {

start = time.Now()
total, _ = fs.NumPending(6000, "zzz", false)
require_LessThan(t, time.Since(start), 15*time.Millisecond)
require_LessThan(t, time.Since(start), 25*time.Millisecond)
require_Equal(t, total, 4001)

// Now delete a message in first half and second half.
Expand Down

0 comments on commit d7c66e7

Please sign in to comment.