[IMPROVED] Consumer failing to deliver re-adjusts delivered count and any waiting request. #4472

Merged 2 commits on Sep 1, 2023
50 changes: 47 additions & 3 deletions server/consumer.go
@@ -3030,6 +3030,22 @@ func (o *consumer) incDeliveryCount(sseq uint64) uint64 {
return o.rdc[sseq] + 1
}

// Used if we have to adjust on failed delivery or bad lookups.
// Those failed attempts should not increase deliver count.
// Lock should be held.
func (o *consumer) decDeliveryCount(sseq uint64) {
if o.rdc == nil {
return
}
if dc, ok := o.rdc[sseq]; ok {
if dc == 1 {
delete(o.rdc, sseq)
} else {
o.rdc[sseq] -= 1
}
}
}
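
For orientation, since the scrape elides most of incDeliveryCount: rdc maps a stream sequence to its redelivery count, and the visible tail returns o.rdc[sseq] + 1, so a message with no entry has an implicit delivery count of one. Below is a minimal standalone sketch of how inc and dec pair up; the incDeliveryCount body is reconstructed to match that visible tail, and the main harness is purely illustrative.

package main

import "fmt"

type consumer struct {
	rdc map[uint64]uint64 // stream seq -> redelivery count
}

// Reconstructed from the visible tail: bump on a redelivery attempt
// and return the effective delivery count.
func (o *consumer) incDeliveryCount(sseq uint64) uint64 {
	if o.rdc == nil {
		o.rdc = make(map[uint64]uint64)
	}
	o.rdc[sseq]++
	return o.rdc[sseq] + 1
}

// As in the diff: roll back one failed attempt, deleting the entry
// when the count returns to the implicit one.
func (o *consumer) decDeliveryCount(sseq uint64) {
	if o.rdc == nil {
		return
	}
	if dc, ok := o.rdc[sseq]; ok {
		if dc == 1 {
			delete(o.rdc, sseq)
		} else {
			o.rdc[sseq]--
		}
	}
}

func main() {
	o := &consumer{}
	fmt.Println(o.incDeliveryCount(7)) // 2: counted as a redelivery
	o.decDeliveryCount(7)              // the attempt failed; undo it
	fmt.Println(o.incDeliveryCount(7)) // 2 again: the failure did not stick
}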

// send a delivery exceeded advisory.
func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) {
e := JSConsumerDeliveryExceededAdvisory{
@@ -3104,6 +3120,8 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
if sm == nil || err != nil {
pmsg.returnToPool()
pmsg, dc = nil, 0
// Adjust back deliver count.
o.decDeliveryCount(seq)
}
return pmsg, dc, err
}
@@ -3205,6 +3223,7 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
interest = true
}
}

// If interest, update batch pending requests counter and update fexp timer.
if interest {
brp += wr.n
@@ -3508,10 +3527,16 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
if err == ErrStoreEOF {
o.checkNumPendingOnEOF()
}
- if err == ErrStoreMsgNotFound || err == errDeletedMsg || err == ErrStoreEOF || err == errMaxAckPending || err == errPartialCache {
+ if err == ErrStoreMsgNotFound || err == errDeletedMsg || err == ErrStoreEOF || err == errMaxAckPending {
goto waitForMsgs
} else if err == errPartialCache {
s.Warnf("Unexpected partial cache error looking up message for consumer '%s > %s > %s'",
o.mset.acc, o.mset.cfg.Name, o.cfg.Name)
goto waitForMsgs

} else {
s.Errorf("Received an error looking up message for consumer: %v", err)
s.Errorf("Received an error looking up message for consumer '%s > %s > %s': %v",
o.mset.acc, o.mset.cfg.Name, o.cfg.Name, err)
goto waitForMsgs
}
}
@@ -3906,20 +3931,39 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
}
}

// Credit back a failed delivery.
// Lock should be held.
func (o *consumer) creditWaitingRequest(reply string) {
for i, rp := 0, o.waiting.rp; i < o.waiting.n; i++ {
if wr := o.waiting.reqs[rp]; wr != nil {
if wr.reply == reply {
wr.n++
wr.d--
return
}
}
rp = (rp + 1) % cap(o.waiting.reqs)
}
}
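
creditWaitingRequest walks the pull consumer's wait queue, a fixed-capacity ring buffer, starting at the read pointer rp and visiting the n live entries; the matching request gets one message back (wr.n++) and one delivery un-counted (wr.d--). A self-contained sketch of that traversal follows; the names rp, n, reqs, reply, and d follow the diff, while the waitQueue shape itself is an assumption.

package main

import "fmt"

type waitingRequest struct {
	reply string
	n     int // messages still owed to this request
	d     int // messages counted as delivered against it
}

type waitQueue struct {
	rp, n int               // read pointer and number of live requests
	reqs  []*waitingRequest // fixed-capacity ring
}

// credit undoes one delivery for the oldest pending request whose
// reply subject matches, mirroring creditWaitingRequest above.
func (wq *waitQueue) credit(reply string) {
	for i, rp := 0, wq.rp; i < wq.n; i++ {
		if wr := wq.reqs[rp]; wr != nil && wr.reply == reply {
			wr.n++
			wr.d--
			return
		}
		rp = (rp + 1) % cap(wq.reqs)
	}
}

func main() {
	wq := &waitQueue{n: 1, reqs: make([]*waitingRequest, 4)}
	wq.reqs[0] = &waitingRequest{reply: "_INBOX.req.1", n: 9, d: 1}
	wq.credit("_INBOX.req.1")
	fmt.Println(wq.reqs[0].n, wq.reqs[0].d) // 10 0: the message is owed again
}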

// didNotDeliver is called when a delivery for a consumer message failed.
// Depending on our state, we will process the failure.
- func (o *consumer) didNotDeliver(seq uint64) {
+ func (o *consumer) didNotDeliver(seq uint64, subj string) {
o.mu.Lock()
mset := o.mset
if mset == nil {
o.mu.Unlock()
return
}
// Adjust back deliver count.
o.decDeliveryCount(seq)

var checkDeliveryInterest bool
if o.isPushMode() {
o.active = false
checkDeliveryInterest = true
} else if o.pending != nil {
o.creditWaitingRequest(subj)
// pull mode and we have pending.
if _, ok := o.pending[seq]; ok {
// We found this message on pending, we need
96 changes: 96 additions & 0 deletions server/filestore_test.go
@@ -5727,3 +5727,99 @@ func TestFileStoreTombstoneBackwardCompatibility(t *testing.T) {

checkPurgeState()
}

// Test that loads from lmb under lots of writes do not return errPartialCache.
func TestFileStoreErrPartialLoad(t *testing.T) {
fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

put := func(num int) {
for i := 0; i < num; i++ {
fs.StoreMsg("Z", nil, []byte("ZZZZZZZZZZZZZ"))
}
}

put(100)

// Dump cache of lmb.
clearCache := func() {
fs.mu.RLock()
lmb := fs.lmb
fs.mu.RUnlock()
lmb.mu.Lock()
lmb.clearCache()
lmb.mu.Unlock()
}
clearCache()

qch := make(chan struct{})
defer close(qch)

for i := 0; i < 10; i++ {
go func() {
for {
select {
case <-qch:
return
default:
put(5)
}
}
}()
}

time.Sleep(100 * time.Millisecond)

var smv StoreMsg
for i := 0; i < 10_000; i++ {
fs.mu.RLock()
lmb := fs.lmb
fs.mu.RUnlock()
lmb.mu.Lock()
first, last := fs.lmb.first.seq, fs.lmb.last.seq
if i%100 == 0 {
lmb.clearCache()
}
lmb.mu.Unlock()

if spread := int(last - first); spread > 0 {
seq := first + uint64(rand.Intn(spread))
_, err = fs.LoadMsg(seq, &smv)
require_NoError(t, err)
}
}
}

func TestFileStoreErrPartialLoadOnSyncClose(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 500},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage},
)
require_NoError(t, err)
defer fs.Stop()

// This yields an internal record length of 50 bytes. So 10 msgs per blk.
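// (Assumed arithmetic, not spelled out in the PR: a 22-byte record header,
// a 1-byte subject, the 19-byte payload, and an 8-byte checksum give
// 22+1+19+8 = 50 bytes per record, so a 500-byte block holds 10 records.)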
msgLen := 19
msg := bytes.Repeat([]byte("A"), msgLen)

// Load up half the block.
for _, subj := range []string{"A", "B", "C", "D", "E"} {
fs.StoreMsg(subj, nil, msg)
}

// Now simulate the sync timer closing the last block.
fs.mu.RLock()
lmb := fs.lmb
fs.mu.RUnlock()
require_True(t, lmb != nil)

lmb.mu.Lock()
lmb.expireCacheLocked()
lmb.dirtyCloseWithRemove(false)
lmb.mu.Unlock()

fs.StoreMsg("Z", nil, msg)
_, err = fs.LoadMsg(1, nil)
require_NoError(t, err)
}
2 changes: 1 addition & 1 deletion server/stream.go
@@ -4455,7 +4455,7 @@ func (mset *stream) internalLoop() {
// Check to see if this is a delivery for a consumer and
// we failed to deliver the message. If so alert the consumer.
if pm.o != nil && pm.seq > 0 && !didDeliver {
- pm.o.didNotDeliver(pm.seq)
+ pm.o.didNotDeliver(pm.seq, pm.dsubj)
}
pm.returnToPool()
}
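
Net effect of the stream.go change: the internal loop now hands didNotDeliver the delivery subject of the failed message, which for a pull consumer is the request's reply subject, so the request can be credited. A hypothetical sketch of the path this wires together; only didNotDeliver, decDeliveryCount, and creditWaitingRequest appear in the diff, the rest is assumed glue.

package main

import "fmt"

type consumer struct {
	push    bool
	pending map[uint64]struct{}
}

func (o *consumer) decDeliveryCount(seq uint64)      { fmt.Println("dec count for", seq) }
func (o *consumer) creditWaitingRequest(subj string) { fmt.Println("credit request", subj) }

// didNotDeliver, per the new signature: the subject rides along so a
// pull request can be credited for the message it never received.
func (o *consumer) didNotDeliver(seq uint64, subj string) {
	o.decDeliveryCount(seq) // a failed attempt must not raise the count
	if !o.push && o.pending != nil {
		o.creditWaitingRequest(subj)
	}
}

func main() {
	o := &consumer{pending: map[uint64]struct{}{42: {}}}
	o.didNotDeliver(42, "_INBOX.req.1")
}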