Skip to content

Commit

Permalink
Merge branch 'lukebakken/interface-conversion-fixes' into gh-85-followup
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed May 26, 2022
2 parents 9f0d7de + 2fad9b4 commit 0294034
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 62 deletions.
26 changes: 13 additions & 13 deletions _examples/simple-consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,24 @@ import (
"log"
"os"
"os/signal"
"time"
"syscall"
"time"

amqp "github.com/rabbitmq/amqp091-go"
)

var (
uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
exchange = flag.String("exchange", "test-exchange", "Durable, non-auto-deleted AMQP exchange name")
exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
queue = flag.String("queue", "test-queue", "Ephemeral AMQP queue name")
bindingKey = flag.String("key", "test-key", "AMQP binding key")
consumerTag = flag.String("consumer-tag", "simple-consumer", "AMQP consumer tag (should not be blank)")
lifetime = flag.Duration("lifetime", 5*time.Second, "lifetime of process before shutdown (0s=infinite)")
verbose = flag.Bool("verbose", true, "enable verbose output of message data")
autoAck = flag.Bool("auto_ack", false, "enable message auto-ack")
ErrLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags|log.Lmsgprefix)
Log = log.New(os.Stdout, "[INFO] ", log.LstdFlags|log.Lmsgprefix)
uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
exchange = flag.String("exchange", "test-exchange", "Durable, non-auto-deleted AMQP exchange name")
exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
queue = flag.String("queue", "test-queue", "Ephemeral AMQP queue name")
bindingKey = flag.String("key", "test-key", "AMQP binding key")
consumerTag = flag.String("consumer-tag", "simple-consumer", "AMQP consumer tag (should not be blank)")
lifetime = flag.Duration("lifetime", 5*time.Second, "lifetime of process before shutdown (0s=infinite)")
verbose = flag.Bool("verbose", true, "enable verbose output of message data")
autoAck = flag.Bool("auto_ack", false, "enable message auto-ack")
ErrLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags|log.Lmsgprefix)
Log = log.New(os.Stdout, "[INFO] ", log.LstdFlags|log.Lmsgprefix)
deliveryCount int = 0
)

Expand Down Expand Up @@ -196,7 +196,7 @@ func handle(deliveries <-chan amqp.Delivery, done chan error) {
d.Body,
)
} else {
if deliveryCount % 65536 == 0 {
if deliveryCount%65536 == 0 {
Log.Printf("delivery count %d", deliveryCount)
}
}
Expand Down
130 changes: 88 additions & 42 deletions _examples/simple-producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

amqp "github.com/rabbitmq/amqp091-go"
Expand All @@ -30,25 +32,29 @@ func init() {
}

func main() {
for {
if err := publish(*uri, *exchangeName, *exchangeType, *routingKey, *body, *reliable); err != nil {
ErrLog.Fatalf("%s", err)
}
Log.Printf("published %dB OK", len(*body))
if (*continuous) {
time.Sleep(time.Second)
} else {
break
}
done := make(chan bool)

SetupCloseHandler(done)

if err := publish(done, *uri, *exchangeName, *exchangeType, *routingKey, *body, *reliable); err != nil {
ErrLog.Fatalf("%s", err)
}
}

func publish(amqpURI, exchange, exchangeType, routingKey, body string, reliable bool) error {
func SetupCloseHandler(done chan bool) {
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
done <- true
Log.Printf("Ctrl+C pressed in Terminal")
}()
}

func publish(done chan bool, amqpURI, exchange, exchangeType, routingKey, body string, reliable bool) error {
// This function dials, connects, declares, publishes, and tears down,
// all in one go. In a real service, you probably want to maintain a
// long-lived connection as state, and publish against that.

Log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
if err != nil {
Expand All @@ -75,50 +81,90 @@ func publish(amqpURI, exchange, exchangeType, routingKey, body string, reliable
return fmt.Errorf("Exchange Declare: %s", err)
}

var publishes chan uint64 = nil
var confirms chan amqp.Confirmation = nil

// Reliable publisher confirms require confirm.select support from the
// connection.
if reliable {
Log.Printf("enabling publisher confirms.")
if err := channel.Confirm(false); err != nil {
return fmt.Errorf("Channel could not be put into confirm mode: %s", err)
}
// We'll allow for a few outstanding publisher confirms
publishes = make(chan uint64, 8)
confirms = channel.NotifyPublish(make(chan amqp.Confirmation, 1))

confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 1))

defer confirmOne(confirms)
go confirmHandler(done, publishes, confirms)
}

Log.Printf("declared Exchange, publishing %dB body (%q)", len(body), body)
if err = channel.Publish(
exchange, // publish to an exchange
routingKey, // routing to 0 or more queues
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
Priority: 0, // 0-9
// a bunch of application/implementation-specific fields
},
); err != nil {
return fmt.Errorf("Exchange Publish: %s", err)
Log.Println("declared Exchange, publishing messages")

for {
seqNo := channel.GetNextPublishSeqNo()
Log.Printf("publishing %dB body (%q)", len(body), body)

if err := channel.Publish(
exchange, // publish to an exchange
routingKey, // routing to 0 or more queues
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
Priority: 0, // 0-9
// a bunch of application/implementation-specific fields
},
); err != nil {
return fmt.Errorf("Exchange Publish: %s", err)
}

Log.Printf("published %dB OK", len(body))
if reliable {
publishes <- seqNo
}

if *continuous {
select {
case <-done:
Log.Println("producer is stopping")
return nil
case <-time.After(time.Second):
continue
}
} else {
break
}
}

return nil
}

// One would typically keep a channel of publishings, a sequence number, and a
// set of unacknowledged sequence numbers and loop until the publishing channel
// is closed.
func confirmOne(confirms <-chan amqp.Confirmation) {
Log.Printf("waiting for confirmation of one publishing")

if confirmed := <-confirms; confirmed.Ack {
Log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
} else {
ErrLog.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag)
func confirmHandler(done chan bool, publishes chan uint64, confirms chan amqp.Confirmation) {
m := make(map[uint64]bool)
for {
select {
case <-done:
Log.Println("confirmHandler is stopping")
return
case publishSeqNo := <-publishes:
Log.Printf("waiting for confirmation of %d", publishSeqNo)
m[publishSeqNo] = false
case confirmed := <-confirms:
if confirmed.DeliveryTag > 0 {
if confirmed.Ack {
Log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
} else {
ErrLog.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag)
}
delete(m, confirmed.DeliveryTag)
}
}
if len(m) > 1 {
Log.Printf("outstanding confirmations: %d", len(m))
}
}
}
12 changes: 5 additions & 7 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1567,8 +1567,7 @@ func TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46(t *testing.T) {
}
err := ch.Publish("not-existing-exchange", "some-key", false, false, Publishing{Body: []byte("some-data")})
if err != nil {
publishError := err.(*Error)
if publishError.Code != 504 {
if publishError, ok := err.(*Error); !ok || publishError.Code != 504 {
t.Fatalf("expected channel only exception i: %d j: %d error: %+v", i, j, publishError)
}
}
Expand Down Expand Up @@ -1760,13 +1759,12 @@ func TestExchangeDeclarePrecondition(t *testing.T) {

if err == nil {
t.Fatalf("Expected to fail a redeclare with different durability, didn't receive an error")
}

if err, ok := err.(Error); ok {
if err.Code != PreconditionFailed {
} else {
declareErr := err.(*Error)
if declareErr.Code != PreconditionFailed {
t.Fatalf("Expected precondition error")
}
if !err.Recover {
if !declareErr.Recover {
t.Fatalf("Expected to be able to recover")
}
}
Expand Down

0 comments on commit 0294034

Please sign in to comment.