Skip to content

Commit

Permalink
Do not embed context in DeferredConfirmation
Browse files Browse the repository at this point in the history
This change removes embedded context.Context (which is generally an
antipattern) from DeferredConfirmation. Instead, we use a simple channel
to wait for ack/nack status. This approach is more flexible since it can
be combined with timers, tickers, other contexts and channels in general
using select{} statements and there is no overhead from context
cancellation setup.

Note that rabbitmq#96 introduced a behavior where Wait would unblock and return
false once the context passed to Publish expires. This commit reverts
this (arguably breaking) behavior in favor of WaitContext function.
  • Loading branch information
tie committed Dec 16, 2022
1 parent 1e67c9e commit efd6da1
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 117 deletions.
3 changes: 1 addition & 2 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,6 @@ func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation {
}

return confirm

}

/*
Expand Down Expand Up @@ -1458,7 +1457,7 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex
}

if ch.confirming {
return ch.confirms.Publish(ctx), nil
return ch.confirms.Publish(), nil
}

return nil, nil
Expand Down
73 changes: 52 additions & 21 deletions confirms.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package amqp091
import (
"context"
"sync"
"sync/atomic"
)

// confirms resequences and notifies one or multiple publisher confirmation listeners
Expand Down Expand Up @@ -39,12 +40,12 @@ func (c *confirms) Listen(l chan Confirmation) {
}

// Publish increments the publishing counter
func (c *confirms) Publish(ctx context.Context) *DeferredConfirmation {
func (c *confirms) Publish() *DeferredConfirmation {
c.publishedMut.Lock()
defer c.publishedMut.Unlock()

c.published++
return c.deferredConfirmations.Add(ctx, c.published)
return c.deferredConfirmations.Add(c.published)
}

// confirm confirms one publishing, increments the expecting delivery tag, and
Expand Down Expand Up @@ -125,12 +126,12 @@ func newDeferredConfirmations() *deferredConfirmations {
}
}

func (d *deferredConfirmations) Add(ctx context.Context, tag uint64) *DeferredConfirmation {
func (d *deferredConfirmations) Add(tag uint64) *DeferredConfirmation {
d.m.Lock()
defer d.m.Unlock()

dc := &DeferredConfirmation{DeliveryTag: tag}
dc.ctx, dc.cancel = context.WithCancel(ctx)
dc.done = make(chan struct{})
d.confirmations[tag] = dc
return dc
}
Expand All @@ -141,10 +142,11 @@ func (d *deferredConfirmations) Confirm(confirmation Confirmation) {

dc, found := d.confirmations[confirmation.DeliveryTag]
if !found {
// we should never receive a confirmation for a tag that hasn't been published, but a test causes this to happen
// We should never receive a confirmation for a tag that hasn't
// been published, but a test causes this to happen.
return
}
dc.Confirm(confirmation.Ack)
dc.setAck(confirmation.Ack)
delete(d.confirmations, confirmation.DeliveryTag)
}

Expand All @@ -154,37 +156,66 @@ func (d *deferredConfirmations) ConfirmMultiple(confirmation Confirmation) {

for k, v := range d.confirmations {
if k <= confirmation.DeliveryTag {
v.Confirm(confirmation.Ack)
v.setAck(confirmation.Ack)
delete(d.confirmations, k)
}
}
}

// Nacks all pending DeferredConfirmations being blocked by dc.Wait()
// Close nacks all pending DeferredConfirmations being blocked by dc.Wait().
func (d *deferredConfirmations) Close() {
d.m.Lock()
defer d.m.Unlock()

for k, v := range d.confirmations {
v.Confirm(false)
v.setAck(false)
delete(d.confirmations, k)
}
}

// Confirm ack confirmation.
func (d *DeferredConfirmation) Confirm(ack bool) {
d.m.Lock()
defer d.m.Unlock()
// Confirm does nothing. It exists for backwards compatibility.
//
// Deprecated: do not use.
func (d *DeferredConfirmation) Confirm(_ bool) {}

d.confirmation.Ack = ack
d.cancel()
// setAck sets the acknowledgement status of the confirmation. Note that it is
// not safe for concurrent use and must not be called more than once.
func (d *DeferredConfirmation) setAck(ack bool) {
if ack {
atomic.StoreInt32(&d.ack, 1)
}
close(d.done)
}

// Waits for publisher confirmation. Returns true if server successfully received the publishing.
func (d *DeferredConfirmation) Wait() bool {
<-d.ctx.Done()
// Done returns the channel that can be used to wait for the publisher
// confirmation.
func (d *DeferredConfirmation) Done() chan struct{} {
return d.done
}

d.m.Lock()
defer d.m.Unlock()
return d.confirmation.Ack
// Acked returns the publisher confirmation in a non-blocking manner. It returns
// false if the confirmation was not confirmed yet or was not acknowledged.
func (d *DeferredConfirmation) Acked() bool {
return atomic.LoadInt32(&d.ack) != 0
}

// Wait blocks until the publisher confirmation. Returns true if server
// successfully received the publishing.
func (d *DeferredConfirmation) Wait() bool {
<-d.done
// NB we do not use atomics here since setAck is called at most once and
// ack is already guarded by done channel. That is, ack is guaranteed to
// be read-only at this point.
return d.ack != 0
}

// WaitContext is like Wait but returns ctx.Err() if the given context expires.
func (d *DeferredConfirmation) WaitContext(ctx context.Context) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
case <-d.done:
}
// See comment in Wait regarding non-atomic ack access.
return d.ack != 0, nil
}

0 comments on commit efd6da1

Please sign in to comment.