From 415094170609148c64ef9feeead16794c7a8b46d Mon Sep 17 00:00:00 2001 From: Andrei Krasnitski Date: Wed, 18 Aug 2021 21:24:51 +0200 Subject: [PATCH] Add Amazon MQ for RabbitMQ event structure Signed-off-by: Andrei Krasnitski --- events/rabbitmq.go | 62 +++++++++++++++++++++++++++++ events/rabbitmq_test.go | 46 +++++++++++++++++++++ events/testdata/rabbitmq-event.json | 51 ++++++++++++++++++++++++ 3 files changed, 159 insertions(+) create mode 100644 events/rabbitmq.go create mode 100644 events/rabbitmq_test.go create mode 100644 events/testdata/rabbitmq-event.json diff --git a/events/rabbitmq.go b/events/rabbitmq.go new file mode 100644 index 00000000..41c01721 --- /dev/null +++ b/events/rabbitmq.go @@ -0,0 +1,62 @@ +package events + +import ( + "encoding/json" + "time" +) + +type RabbitMQEvent struct { + EventSource string `json:"eventSource"` + EventSourceARN string `json:"eventSourceArn"` + MessagesByQueue map[string][]RabbitMQMessage `json:"rmqMessagesByQueue"` +} + +type RabbitMQMessage struct { + BasicProperties RabbitMQBasicProperties `json:"basicProperties"` + Data string `json:"data"` + Redelivered bool `json:"redelivered"` +} + +type RabbitMQBasicProperties struct { + ContentType string `json:"contentType"` + ContentEncoding *string `json:"contentEncoding"` + Headers map[string]interface{} `json:"headers"` // Application or header exchange table + DeliveryMode uint8 `json:"deliveryMode"` + Priority uint8 `json:"priority"` + CorrelationId *string `json:"correlationId"` + ReplyTo *string `json:"replyTo"` + Expiration string `json:"expiration"` + MessageId *string `json:"messageId"` + Timestamp RabbitMQTimestamp `json:"timestamp"` + Type *string `json:"type"` + UserId string `json:"userId"` + AppId *string `json:"appId"` + ClusterId *string `json:"clusterId"` + BodySize uint64 `json:"bodySize"` +} + +// RabbitMQTimestamp represents the time of the event +type RabbitMQTimestamp time.Time + +const rabbitmqTimestampFormat = "Jan 2, 2006, 3:04:05 PM" + +// MarshalJSON converts a given RabbitMQTimestamp to json +func (t RabbitMQTimestamp) MarshalJSON() ([]byte, error) { + return json.Marshal(time.Time(t).Format(rabbitmqTimestampFormat)) +} + +// UnmarshalJSON converts a given json to a RabbitMQTimestamp +func (t *RabbitMQTimestamp) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + + ts, err := time.Parse(rabbitmqTimestampFormat, s) + if err != nil { + return err + } + + *t = RabbitMQTimestamp(ts) + return nil +} diff --git a/events/rabbitmq_test.go b/events/rabbitmq_test.go new file mode 100644 index 00000000..a70d2ff1 --- /dev/null +++ b/events/rabbitmq_test.go @@ -0,0 +1,46 @@ +package events + +import ( + "encoding/json" + "testing" + "time" + + "github.com/aws/aws-lambda-go/events/test" + "github.com/stretchr/testify/assert" +) + +func TestRabbitMQEventMarshaling(t *testing.T) { + // 1. read JSON from file + inputJson := test.ReadJSONFromFile(t, "./testdata/rabbitmq-event.json") + + // 2. de-serialize into Go object + var inputEvent RabbitMQEvent + if err := json.Unmarshal(inputJson, &inputEvent); err != nil { + t.Errorf("could not unmarshal event. details: %v", err) + } + + // 3. Verify values populated into Go Object, at least one validation per data type + assert.Equal(t, "aws:rmq", inputEvent.EventSource) + assert.Equal(t, "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", inputEvent.EventSourceARN) + assert.Equal(t, 1, len(inputEvent.MessagesByQueue)) + for _, messages := range inputEvent.MessagesByQueue { + for _, message := range messages { + assert.Equal(t, false, message.Redelivered) + assert.Equal(t, "text/plain", message.BasicProperties.ContentType) + assert.Equal(t, RabbitMQTimestamp(time.Date(1970, 1, 1, 00, 33, 41, 0, time.UTC)), message.BasicProperties.Timestamp) + } + } + + // 4. serialize to JSON + outputJson, err := json.Marshal(inputEvent) + if err != nil { + t.Errorf("could not marshal event. details: %v", err) + } + + // 5. check result + assert.JSONEq(t, string(inputJson), string(outputJson)) +} + +func TestRabbitMQMarshalingMalformedJson(t *testing.T) { + test.TestMalformedJson(t, RabbitMQEvent{}) +} diff --git a/events/testdata/rabbitmq-event.json b/events/testdata/rabbitmq-event.json new file mode 100644 index 00000000..0d10f5fa --- /dev/null +++ b/events/testdata/rabbitmq-event.json @@ -0,0 +1,51 @@ +{ + "eventSource": "aws:rmq", + "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", + "rmqMessagesByQueue": { + "test::/": [ + { + "basicProperties": { + "contentType": "text/plain", + "contentEncoding": null, + "headers": { + "header1": { + "bytes": [ + 118, + 97, + 108, + 117, + 101, + 49 + ] + }, + "header2": { + "bytes": [ + 118, + 97, + 108, + 117, + 101, + 50 + ] + }, + "numberInHeader": 10 + }, + "deliveryMode": 1, + "priority": 34, + "correlationId": null, + "replyTo": null, + "expiration": "60000", + "messageId": null, + "timestamp": "Jan 1, 1970, 12:33:41 AM", + "type": null, + "userId": "AIDACKCEVSQ6C2EXAMPLE", + "appId": null, + "clusterId": null, + "bodySize": 80 + }, + "redelivered": false, + "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==" + } + ] + } +}