Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1039 Migrate azure/go-amqp to version 1.0.5 #1040

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions protocol/amqp/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ go 1.18
replace github.com/cloudevents/sdk-go/v2 => ../../../v2

require (
github.com/Azure/go-amqp v0.17.0
github.com/Azure/go-amqp v1.0.5
github.com/cloudevents/sdk-go/v2 v2.5.0
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.9.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.5.1 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/json-iterator/go v1.1.10 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
Expand Down
18 changes: 6 additions & 12 deletions protocol/amqp/v2/go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4=
github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU=
github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand All @@ -19,18 +18,13 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
2 changes: 1 addition & 1 deletion protocol/amqp/v2/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const prefix = "cloudEvents:" // Name prefix for AMQP properties that hold CE at

var (
// Use the package path as AMQP error condition name
condition = amqp.ErrorCondition(reflect.TypeOf(Message{}).PkgPath())
condition = amqp.ErrCond(reflect.TypeOf(Message{}).PkgPath())
specs = spec.WithPrefix(prefix)
)

Expand Down
53 changes: 0 additions & 53 deletions protocol/amqp/v2/options.go

This file was deleted.

127 changes: 60 additions & 67 deletions protocol/amqp/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,8 @@ import (
)

type Protocol struct {
connOpts []amqp.ConnOption
sessionOpts []amqp.SessionOption
senderLinkOpts []amqp.LinkOption
receiverLinkOpts []amqp.LinkOption

// AMQP
Client *amqp.Client
Client *amqp.Conn
Session *amqp.Session
ownedClient bool
Node string
Expand All @@ -35,54 +30,60 @@ type Protocol struct {
}

// NewProtocolFromClient creates a new amqp transport.
func NewProtocolFromClient(client *amqp.Client, session *amqp.Session, queue string, opts ...Option) (*Protocol, error) {
func NewProtocolFromClient(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: are we ok with this breaking change on this signature? I understand why we technically might not need options anymore (since those are now structs), but you might want to keep the options pattern to not break this signature and allow for future expansions. You could introduce a new option to set sender/receiver options on the protocol.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or are you worried that those two structs are always needed and thus an options pattern doesn't really apply?

ctx context.Context,
client *amqp.Conn,
session *amqp.Session,
queue string,
senderOptions amqp.SenderOptions,
receiverOptions amqp.ReceiverOptions,
) (*Protocol, error) {
t := &Protocol{
Node: queue,
senderLinkOpts: []amqp.LinkOption(nil),
receiverLinkOpts: []amqp.LinkOption(nil),
Client: client,
Session: session,
}
if err := t.applyOptions(opts...); err != nil {
return nil, err
Node: queue,
Client: client,
Session: session,
}

t.senderLinkOpts = append(t.senderLinkOpts, amqp.LinkTargetAddress(queue))

// Create a sender
amqpSender, err := session.NewSender(t.senderLinkOpts...)
amqpSender, err := session.NewSender(ctx, queue, &senderOptions)
if err != nil {
_ = client.Close()
_ = session.Close(context.Background())
return nil, err
}
t.Sender = NewSender(amqpSender).(*sender)
t.Sender = NewSender(amqpSender, &amqp.SendOptions{}).(*sender)
t.SenderContextDecorators = []func(context.Context) context.Context{}

t.receiverLinkOpts = append(t.receiverLinkOpts, amqp.LinkSourceAddress(t.Node))
amqpReceiver, err := t.Session.NewReceiver(t.receiverLinkOpts...)
amqpReceiver, err := t.Session.NewReceiver(ctx, t.Node, &receiverOptions)
if err != nil {
return nil, err
}
t.Receiver = NewReceiver(amqpReceiver).(*receiver)
t.Receiver = NewReceiver(amqpReceiver, amqp.ReceiveOptions{}).(*receiver)
return t, nil
}

// NewProtocol creates a new amqp transport.
func NewProtocol(server, queue string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(server, connOption...)
func NewProtocol(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question as above

ctx context.Context,
server, queue string,
connOptions amqp.ConnOptions,
sessionOptions amqp.SessionOptions,
senderOptions amqp.SenderOptions,
receiverOptions amqp.ReceiverOptions,
) (*Protocol, error) {
client, err := amqp.Dial(ctx, server, &connOptions)
if err != nil {
return nil, err
}

// Open a session
session, err := client.NewSession(sessionOption...)
session, err := client.NewSession(ctx, &sessionOptions)
if err != nil {
_ = client.Close()
return nil, err
}

p, err := NewProtocolFromClient(client, session, queue, opts...)
p, err := NewProtocolFromClient(ctx, client, session, queue, senderOptions, receiverOptions)
if err != nil {
return nil, err
}
Expand All @@ -92,69 +93,70 @@ func NewProtocol(server, queue string, connOption []amqp.ConnOption, sessionOpti
}

// NewSenderProtocolFromClient creates a new amqp sender transport.
func NewSenderProtocolFromClient(client *amqp.Client, session *amqp.Session, address string, opts ...Option) (*Protocol, error) {
func NewSenderProtocolFromClient(
ctx context.Context,
client *amqp.Conn,
session *amqp.Session,
address string,
senderOptions amqp.SenderOptions,
) (*Protocol, error) {
t := &Protocol{
Node: address,
senderLinkOpts: []amqp.LinkOption(nil),
receiverLinkOpts: []amqp.LinkOption(nil),
Client: client,
Session: session,
Node: address,
Client: client,
Session: session,
}
if err := t.applyOptions(opts...); err != nil {
return nil, err
}
t.senderLinkOpts = append(t.senderLinkOpts, amqp.LinkTargetAddress(address))

// Create a sender
amqpSender, err := session.NewSender(t.senderLinkOpts...)
amqpSender, err := session.NewSender(ctx, address, &senderOptions)
if err != nil {
_ = client.Close()
_ = session.Close(context.Background())
return nil, err
}
t.Sender = NewSender(amqpSender).(*sender)
t.Sender = NewSender(amqpSender, &amqp.SendOptions{}).(*sender)
t.SenderContextDecorators = []func(context.Context) context.Context{}

return t, nil
}

// NewReceiverProtocolFromClient creates a new receiver amqp transport.
func NewReceiverProtocolFromClient(client *amqp.Client, session *amqp.Session, address string, opts ...Option) (*Protocol, error) {
func NewReceiverProtocolFromClient(
ctx context.Context,
client *amqp.Conn,
session *amqp.Session,
address string,
receiverOptions amqp.ReceiverOptions,
) (*Protocol, error) {
t := &Protocol{
Node: address,
senderLinkOpts: []amqp.LinkOption(nil),
receiverLinkOpts: []amqp.LinkOption(nil),
Client: client,
Session: session,
}
if err := t.applyOptions(opts...); err != nil {
return nil, err
Node: address,
Client: client,
Session: session,
}

t.Node = address
t.receiverLinkOpts = append(t.receiverLinkOpts, amqp.LinkSourceAddress(address))
amqpReceiver, err := t.Session.NewReceiver(t.receiverLinkOpts...)
amqpReceiver, err := t.Session.NewReceiver(ctx, address, &receiverOptions)
if err != nil {
return nil, err
}
t.Receiver = NewReceiver(amqpReceiver).(*receiver)
t.Receiver = NewReceiver(amqpReceiver, amqp.ReceiveOptions{}).(*receiver)
return t, nil
}

// NewSenderProtocol creates a new sender amqp transport.
func NewSenderProtocol(server, address string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(server, connOption...)
func NewSenderProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, senderOptions amqp.SenderOptions) (*Protocol, error) {
client, err := amqp.Dial(ctx, server, &connOptions)
if err != nil {
return nil, err
}

// Open a session
session, err := client.NewSession(sessionOption...)
session, err := client.NewSession(ctx, &sessionOptions)
if err != nil {
_ = client.Close()
return nil, err
}

p, err := NewSenderProtocolFromClient(client, session, address, opts...)
p, err := NewSenderProtocolFromClient(ctx, client, session, address, senderOptions)
if err != nil {
return nil, err
}
Expand All @@ -164,20 +166,20 @@ func NewSenderProtocol(server, address string, connOption []amqp.ConnOption, ses
}

// NewReceiverProtocol creates a new receiver amqp transport.
func NewReceiverProtocol(server, address string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(server, connOption...)
func NewReceiverProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, receiverOptions amqp.ReceiverOptions) (*Protocol, error) {
client, err := amqp.Dial(ctx, server, &connOptions)
if err != nil {
return nil, err
}

// Open a session
session, err := client.NewSession(sessionOption...)
session, err := client.NewSession(ctx, &sessionOptions)
if err != nil {
_ = client.Close()
return nil, err
}

p, err := NewReceiverProtocolFromClient(client, session, address, opts...)
p, err := NewReceiverProtocolFromClient(ctx, client, session, address, receiverOptions)

if err != nil {
return nil, err
Expand All @@ -187,15 +189,6 @@ func NewReceiverProtocol(server, address string, connOption []amqp.ConnOption, s
return p, nil
}

func (t *Protocol) applyOptions(opts ...Option) error {
for _, fn := range opts {
if err := fn(t); err != nil {
return err
}
}
return nil
}

func (t *Protocol) Close(ctx context.Context) (err error) {
if t.ownedClient {
// Closing the client will close at cascade sender and receiver
Expand Down
11 changes: 7 additions & 4 deletions protocol/amqp/v2/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
const serverDown = "session ended by server"

// receiver wraps an amqp.Receiver as a binding.Receiver
type receiver struct{ amqp *amqp.Receiver }
type receiver struct {
amqp *amqp.Receiver
options amqp.ReceiveOptions
}

func (r *receiver) Receive(ctx context.Context) (binding.Message, error) {
m, err := r.amqp.Receive(ctx)
m, err := r.amqp.Receive(ctx, &r.options)
if err != nil {
if err == ctx.Err() {
return nil, io.EOF
Expand All @@ -38,6 +41,6 @@ func (r *receiver) Receive(ctx context.Context) (binding.Message, error) {
}

// NewReceiver create a new Receiver which wraps an amqp.Receiver in a binding.Receiver
func NewReceiver(amqp *amqp.Receiver) protocol.Receiver {
return &receiver{amqp: amqp}
func NewReceiver(amqp *amqp.Receiver, options amqp.ReceiveOptions) protocol.Receiver {
return &receiver{amqp: amqp, options: options}
}