[IMPROVED] Consumer failing to deliver re-adjusts delivered count and any waiting request. (#4472)

When we fail to deliver a message for a consumer, either through
didNotDeliver() or a LoadMsg() failure, re-adjust the delivered count and
the waitingRequest accounting.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Sep 1, 2023
2 parents 45e6812 + c679f9d commit a2373d9
Showing 3 changed files with 144 additions and 4 deletions.
50 changes: 47 additions & 3 deletions server/consumer.go
@@ -3030,6 +3030,22 @@ func (o *consumer) incDeliveryCount(sseq uint64) uint64 {
return o.rdc[sseq] + 1
}

// Used if we have to adjust on failed delivery or bad lookups.
// Those failed attempts should not increase deliver count.
// Lock should be held.
func (o *consumer) decDeliveryCount(sseq uint64) {
if o.rdc == nil {
return
}
if dc, ok := o.rdc[sseq]; ok {
if dc == 1 {
delete(o.rdc, sseq)
} else {
o.rdc[sseq] -= 1
}
}
}

// send a delivery exceeded advisory.
func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) {
e := JSConsumerDeliveryExceededAdvisory{
@@ -3104,6 +3120,8 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
if sm == nil || err != nil {
pmsg.returnToPool()
pmsg, dc = nil, 0
// Adjust back deliver count.
o.decDeliveryCount(seq)
}
return pmsg, dc, err
}
@@ -3205,6 +3223,7 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
interest = true
}
}

// If interest, update batch pending requests counter and update fexp timer.
if interest {
brp += wr.n
@@ -3508,10 +3527,16 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
if err == ErrStoreEOF {
o.checkNumPendingOnEOF()
}
if err == ErrStoreMsgNotFound || err == errDeletedMsg || err == ErrStoreEOF || err == errMaxAckPending || err == errPartialCache {
if err == ErrStoreMsgNotFound || err == errDeletedMsg || err == ErrStoreEOF || err == errMaxAckPending {
goto waitForMsgs
} else if err == errPartialCache {
s.Warnf("Unexpected partial cache error looking up message for consumer '%s > %s > %s'",
o.mset.acc, o.mset.cfg.Name, o.cfg.Name)
goto waitForMsgs

} else {
s.Errorf("Received an error looking up message for consumer: %v", err)
s.Errorf("Received an error looking up message for consumer '%s > %s > %s': %v",
o.mset.acc, o.mset.cfg.Name, o.cfg.Name, err)
goto waitForMsgs
}
}
@@ -3906,20 +3931,39 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
}
}

// Credit back a failed delivery.
// lock should be held.
func (o *consumer) creditWaitingRequest(reply string) {
for i, rp := 0, o.waiting.rp; i < o.waiting.n; i++ {
if wr := o.waiting.reqs[rp]; wr != nil {
if wr.reply == reply {
wr.n++
wr.d--
return
}
}
rp = (rp + 1) % cap(o.waiting.reqs)
}
}

// didNotDeliver is called when a delivery for a consumer message failed.
// Depending on our state, we will process the failure.
func (o *consumer) didNotDeliver(seq uint64) {
func (o *consumer) didNotDeliver(seq uint64, subj string) {
o.mu.Lock()
mset := o.mset
if mset == nil {
o.mu.Unlock()
return
}
// Adjust back deliver count.
o.decDeliveryCount(seq)

var checkDeliveryInterest bool
if o.isPushMode() {
o.active = false
checkDeliveryInterest = true
} else if o.pending != nil {
o.creditWaitingRequest(subj)
// pull mode and we have pending.
if _, ok := o.pending[seq]; ok {
// We found this message on pending, we need
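
A minimal, self-contained sketch of how the two rollback helpers in this file cooperate on a failed delivery. The consumer struct, the waitingRequest fields, and the slice standing in for the request ring buffer are simplified placeholders for illustration, not the server's actual types; only the adjustment logic mirrors the diff above. For a pull consumer, the delivery subject of the failed message is the reply subject of the waiting request it was answering, which is why didNotDeliver now receives the subject and hands it to creditWaitingRequest.

package main

import "fmt"

// Simplified stand-ins for the server's types (illustration only).
type waitingRequest struct {
	reply string
	n     int // messages still owed to this pull request
	d     int // messages delivered against this pull request
}

type consumer struct {
	rdc     map[uint64]uint64 // redelivery counts keyed by stream sequence
	waiting []*waitingRequest // placeholder for the server's request ring buffer
}

// Mirrors decDeliveryCount: a failed attempt must not count as a delivery.
func (o *consumer) decDeliveryCount(sseq uint64) {
	if o.rdc == nil {
		return
	}
	if dc, ok := o.rdc[sseq]; ok {
		if dc == 1 {
			delete(o.rdc, sseq)
		} else {
			o.rdc[sseq]--
		}
	}
}

// Mirrors creditWaitingRequest: hand the batch slot back to the pull request
// whose reply subject matches the delivery subject of the failed message.
func (o *consumer) creditWaitingRequest(reply string) {
	for _, wr := range o.waiting {
		if wr != nil && wr.reply == reply {
			wr.n++
			wr.d--
			return
		}
	}
}

func main() {
	o := &consumer{
		rdc:     map[uint64]uint64{22: 1},
		waiting: []*waitingRequest{{reply: "_INBOX.pull.1", n: 4, d: 1}},
	}
	// Simulate a failed delivery of stream sequence 22 to that request.
	o.decDeliveryCount(22)
	o.creditWaitingRequest("_INBOX.pull.1")
	fmt.Println(o.rdc, o.waiting[0].n, o.waiting[0].d) // map[] 5 0
}
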
96 changes: 96 additions & 0 deletions server/filestore_test.go
@@ -5727,3 +5727,99 @@ func TestFileStoreTombstoneBackwardCompatibility(t *testing.T) {

checkPurgeState()
}

// Test that loads from lmb under lots of writes do not return errPartialCache.
func TestFileStoreErrPartialLoad(t *testing.T) {
fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

put := func(num int) {
for i := 0; i < num; i++ {
fs.StoreMsg("Z", nil, []byte("ZZZZZZZZZZZZZ"))
}
}

put(100)

// Dump cache of lmb.
clearCache := func() {
fs.mu.RLock()
lmb := fs.lmb
fs.mu.RUnlock()
lmb.mu.Lock()
lmb.clearCache()
lmb.mu.Unlock()
}
clearCache()

qch := make(chan struct{})
defer close(qch)

for i := 0; i < 10; i++ {
go func() {
for {
select {
case <-qch:
return
default:
put(5)
}
}
}()
}

time.Sleep(100 * time.Millisecond)

var smv StoreMsg
for i := 0; i < 10_000; i++ {
fs.mu.RLock()
lmb := fs.lmb
fs.mu.RUnlock()
lmb.mu.Lock()
first, last := fs.lmb.first.seq, fs.lmb.last.seq
if i%100 == 0 {
lmb.clearCache()
}
lmb.mu.Unlock()

if spread := int(last - first); spread > 0 {
seq := first + uint64(rand.Intn(spread))
_, err = fs.LoadMsg(seq, &smv)
require_NoError(t, err)
}
}
}

func TestFileStoreErrPartialLoadOnSyncClose(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 500},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage},
)
require_NoError(t, err)
defer fs.Stop()

// This yields an internal record length of 50 bytes. So 10 msgs per blk.
msgLen := 19
msg := bytes.Repeat([]byte("A"), msgLen)

// Load up half the block.
for _, subj := range []string{"A", "B", "C", "D", "E"} {
fs.StoreMsg(subj, nil, msg)
}

// Now simulate the sync timer closing the last block.
fs.mu.RLock()
lmb := fs.lmb
fs.mu.RUnlock()
require_True(t, lmb != nil)

lmb.mu.Lock()
lmb.expireCacheLocked()
lmb.dirtyCloseWithRemove(false)
lmb.mu.Unlock()

fs.StoreMsg("Z", nil, msg)
_, err = fs.LoadMsg(1, nil)
require_NoError(t, err)
}
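
For reference, the sizing comment in TestFileStoreErrPartialLoadOnSyncClose works out as follows; the 50-byte record figure comes from the comment itself, and the per-record framing overhead is inferred from it rather than from the store code:

	record length ≈ 19-byte payload + 1-byte subject + ~30 bytes assumed framing ≈ 50 bytes
	messages per block = 500-byte BlockSize / 50-byte record = 10

Both new tests can be run in isolation with something like go test -run TestFileStoreErrPartial ./server from the repository root; exact flags depend on the local test setup.
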
2 changes: 1 addition & 1 deletion server/stream.go
@@ -4455,7 +4455,7 @@ func (mset *stream) internalLoop() {
// Check to see if this is a delivery for a consumer and
// we failed to deliver the message. If so alert the consumer.
if pm.o != nil && pm.seq > 0 && !didDeliver {
pm.o.didNotDeliver(pm.seq)
pm.o.didNotDeliver(pm.seq, pm.dsubj)
}
pm.returnToPool()
}
