diff --git a/protocol/pubsub/v2/attributes.go b/protocol/pubsub/v2/attributes.go new file mode 100644 index 00000000..714f9c04 --- /dev/null +++ b/protocol/pubsub/v2/attributes.go @@ -0,0 +1,25 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package pubsub + +import ( + "context" + + "github.com/cloudevents/sdk-go/v2/binding" +) + +type withCustomAttributes struct{} + +func AttributesFrom(ctx context.Context) map[string]string { + return binding.GetOrDefaultFromCtx(ctx, withCustomAttributes{}, make(map[string]string)).(map[string]string) +} + +// WithCustomAttributes sets Message Attributes without any CloudEvent logic. +// Note that this function is not intended for CloudEvent Extensions or any `ce-`-prefixed Attributes. +// For these please see `Event` and `Event.SetExtension`. +func WithCustomAttributes(ctx context.Context, attrs map[string]string) context.Context { + return context.WithValue(ctx, withCustomAttributes{}, attrs) +} diff --git a/protocol/pubsub/v2/doc.go b/protocol/pubsub/v2/doc.go index 4ef4e6fd..db8467b0 100644 --- a/protocol/pubsub/v2/doc.go +++ b/protocol/pubsub/v2/doc.go @@ -5,5 +5,8 @@ /* Package pubsub implements a Pub/Sub binding using google.cloud.com/go/pubsub module + +PubSub Messages can be modified beyond what CloudEvents cover by using `WithOrderingKey` +or `WithCustomAttributes`. See function docs for more details. */ package pubsub diff --git a/protocol/pubsub/v2/protocol.go b/protocol/pubsub/v2/protocol.go index f8e70e2c..e3b257d7 100644 --- a/protocol/pubsub/v2/protocol.go +++ b/protocol/pubsub/v2/protocol.go @@ -110,7 +110,9 @@ func (t *Protocol) Send(ctx context.Context, in binding.Message, transformers .. conn := t.getOrCreateConnection(ctx, topic, "", "") - msg := &pubsub.Message{} + msg := &pubsub.Message{ + Attributes: AttributesFrom(ctx), + } if key, ok := ctx.Value(withOrderingKey{}).(string); ok { if !t.MessageOrdering { diff --git a/protocol/pubsub/v2/protocol_test.go b/protocol/pubsub/v2/protocol_test.go index 8350ac0f..c383c2a1 100644 --- a/protocol/pubsub/v2/protocol_test.go +++ b/protocol/pubsub/v2/protocol_test.go @@ -3,14 +3,17 @@ package pubsub import ( "context" "fmt" + "reflect" "testing" "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/apiv1/pubsubpb" "cloud.google.com/go/pubsub/pstest" "github.com/stretchr/testify/require" "google.golang.org/api/option" pb "google.golang.org/genproto/googleapis/pubsub/v1" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/cloudevents/sdk-go/v2/test" ) @@ -20,6 +23,32 @@ type testPubsubClient struct { conn *grpc.ClientConn } +func (pc *testPubsubClient) NewWithAttributesInterceptor(ctx context.Context, projectID, orderingKey string) (*pubsub.Client, error) { + pc.srv = pstest.NewServer() + conn, err := grpc.Dial(pc.srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(customAttributesInterceptor(map[string]string{ + "Content-Type": "text/json", + "ce-dataschema": "http://example.com/schema", + "ce-exbinary": "AAECAw==", + "ce-exbool": "true", + "ce-exint": "42", + "ce-exstring": "exstring", + "ce-extime": "2020-03-21T12:34:56.78Z", + "ce-exurl": "http://example.com/source", + "ce-id": "full-event", + "ce-source": "http://example.com/source", + "ce-specversion": "1.0", + "ce-subject": "topic", + "ce-time": "2020-03-21T12:34:56.78Z", + "ce-type": "com.example.FullEvent", + "Proxy-Authorization": "YWxhZGRpbjpvcGVuc2VzYW1l", + }))) + if err != nil { + return nil, err + } + pc.conn = conn + return pubsub.NewClient(ctx, projectID, option.WithGRPCConn(conn)) +} + func (pc *testPubsubClient) NewWithOrderInterceptor(ctx context.Context, projectID, orderingKey string) (*pubsub.Client, error) { pc.srv = pstest.NewServer() conn, err := grpc.Dial(pc.srv.Addr, grpc.WithInsecure(), grpc.WithUnaryInterceptor(orderingKeyInterceptor(orderingKey))) @@ -35,6 +64,20 @@ func (pc *testPubsubClient) Close() { pc.conn.Close() } +func customAttributesInterceptor(attrs map[string]string) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + if method == "/google.pubsub.v1.Publisher/Publish" { + pr, _ := req.(*pubsubpb.PublishRequest) + for _, m := range pr.Messages { + if !reflect.DeepEqual(m.Attributes, attrs) { + return fmt.Errorf("expecting Attributes %q, got %q", attrs, m.Attributes) + } + } + } + return invoker(ctx, method, req, reply, cc, opts...) + } +} + func orderingKeyInterceptor(orderingKey string) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { if method == "/google.pubsub.v1.Publisher/Publish" { @@ -49,6 +92,32 @@ func orderingKeyInterceptor(orderingKey string) grpc.UnaryClientInterceptor { } } +func TestPublishMessageHasCustomAttributes(t *testing.T) { + require := require.New(t) + ctx := context.Background() + pc := &testPubsubClient{} + defer pc.Close() + + projectID, topicID, orderingKey := "test-project", "test-topic", "foobar" + + client, err := pc.NewWithAttributesInterceptor(ctx, projectID, orderingKey) + require.NoError(err, "create pubsub client") + defer client.Close() + + prot, err := New(ctx, + WithClient(client), + WithProjectID(projectID), + WithTopicID(topicID), + AllowCreateTopic(true), + ) + require.NoError(err, "create protocol") + + err = prot.Send(WithCustomAttributes(ctx, map[string]string{ + "Proxy-Authorization": "YWxhZGRpbjpvcGVuc2VzYW1l", + }), test.FullMessage()) + require.NoError(err) +} + func TestPublishMessageHasOrderingKey(t *testing.T) { require := require.New(t) ctx := context.Background() diff --git a/protocol/pubsub/v2/write_pubsub_message.go b/protocol/pubsub/v2/write_pubsub_message.go index 4cdb1e31..ef5e9990 100644 --- a/protocol/pubsub/v2/write_pubsub_message.go +++ b/protocol/pubsub/v2/write_pubsub_message.go @@ -46,7 +46,6 @@ func (b *pubsubMessagePublisher) SetStructuredEvent(ctx context.Context, f forma } func (b *pubsubMessagePublisher) Start(ctx context.Context) error { - b.Attributes = make(map[string]string) return nil }