Skip to content

Commit

Permalink
adding best practises for NotifyPublish for issue_21 scenario (#68)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniele Palaia <dpalaia@dpalaia-a02.vmware.com>
  • Loading branch information
DanielePalaia and Daniele Palaia committed Apr 11, 2022
1 parent 900561c commit 6cac2fa
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 18 deletions.
3 changes: 3 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,9 @@ or Channel while confirms are in-flight.
It's advisable to wait for all Confirmations to arrive before calling
Channel.Close() or Connection.Close().
It is also advisable for the caller to consume from the channel returned till it is closed
to avoid possible deadlocks
*/
func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation {
ch.notifyM.Lock()
Expand Down
58 changes: 40 additions & 18 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ encounters an amqp:// scheme.
SSL/TLS in RabbitMQ is documented here: http://www.rabbitmq.com/ssl.html
Best practises for Connections and Channels notifications.
Best practises to handle library notifications.
Best practises for Connections and Channels notifications:
In order to be notified when a connection or channel gets closed both the structures offer the possibility to register channels using the `notifyClose` function like:
notifyConnClose := make(chan *amqp.Error)
Expand All @@ -123,24 +125,44 @@ The error is sent synchronously to the channel so that the flow will wait until
To avoid deadlocks it is necessary to consume the messages from the channels.
This could be done inside a different goroutine with a select listening on the two channels inside a for loop like:
go func() {
for notifyConnClose != nil || notifyChanClose != nil {
select {
case err, ok := <-notifyConnClose:
if !(ok) {
notifyConnClose = nil
} else {
fmt.Printf("connection closed, error %s", err)
}
case err, ok := <-notifyChanClose:
if !(ok) {
notifyChanClose = nil
} else {
fmt.Printf("channel closed, error %s", err)
}
go func() {
for notifyConnClose != nil || notifyChanClose != nil {
select {
case err, ok := <-notifyConnClose:
if !(ok) {
notifyConnClose = nil
} else {
fmt.Printf("connection closed, error %s", err)
}
case err, ok := <-notifyChanClose:
if !(ok) {
notifyChanClose = nil
} else {
fmt.Printf("channel closed, error %s", err)
}
}
}
}
}()
}()
Best practises for NotifyPublish notifications:
Similary to the previous sceneario using the NotifyPublish method allows the caller of the library to be notified through a go channel when a message has been received
from the broker after Channel.Confirm has been set.
It's advisable to wait for all Confirmations to arrive before calling Channel.Close() or Connection.Close().
It is also necessary for the caller to always consume from this channel till it get closed from the library to avoid possible deadlocks.
Confirmations go channel are indeed notified inside the confirm function of the Confirm struct synchronously:
// confirm confirms one publishing, increments the expecting delivery tag, and
// removes bookkeeping for that delivery tag.
func (c *confirms) confirm(confirmation Confirmation) {
delete(c.sequencer, c.expecting)
c.expecting++
for _, l := range c.listeners {
l <- confirmation
}
}
It is so necessary to have a goroutine consuming from this channel till it get closed.
*/
package amqp091

0 comments on commit 6cac2fa

Please sign in to comment.