Skip to content

Commit

Permalink
Add Amazon MQ for RabbitMQ event structure
Browse files Browse the repository at this point in the history
Signed-off-by: Andrei Krasnitski <andrei.krasnitski@mendix.com>
  • Loading branch information
Infra-Red committed Aug 18, 2021
1 parent 85e7bc7 commit 4150941
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 0 deletions.
62 changes: 62 additions & 0 deletions events/rabbitmq.go
@@ -0,0 +1,62 @@
package events

import (
"encoding/json"
"time"
)

type RabbitMQEvent struct {
EventSource string `json:"eventSource"`
EventSourceARN string `json:"eventSourceArn"`
MessagesByQueue map[string][]RabbitMQMessage `json:"rmqMessagesByQueue"`
}

type RabbitMQMessage struct {
BasicProperties RabbitMQBasicProperties `json:"basicProperties"`
Data string `json:"data"`
Redelivered bool `json:"redelivered"`
}

type RabbitMQBasicProperties struct {
ContentType string `json:"contentType"`
ContentEncoding *string `json:"contentEncoding"`
Headers map[string]interface{} `json:"headers"` // Application or header exchange table
DeliveryMode uint8 `json:"deliveryMode"`
Priority uint8 `json:"priority"`
CorrelationId *string `json:"correlationId"`
ReplyTo *string `json:"replyTo"`
Expiration string `json:"expiration"`
MessageId *string `json:"messageId"`
Timestamp RabbitMQTimestamp `json:"timestamp"`
Type *string `json:"type"`
UserId string `json:"userId"`
AppId *string `json:"appId"`
ClusterId *string `json:"clusterId"`
BodySize uint64 `json:"bodySize"`
}

// RabbitMQTimestamp represents the time of the event
type RabbitMQTimestamp time.Time

const rabbitmqTimestampFormat = "Jan 2, 2006, 3:04:05 PM"

// MarshalJSON converts a given RabbitMQTimestamp to json
func (t RabbitMQTimestamp) MarshalJSON() ([]byte, error) {
return json.Marshal(time.Time(t).Format(rabbitmqTimestampFormat))
}

// UnmarshalJSON converts a given json to a RabbitMQTimestamp
func (t *RabbitMQTimestamp) UnmarshalJSON(data []byte) error {
var s string
if err := json.Unmarshal(data, &s); err != nil {
return err
}

ts, err := time.Parse(rabbitmqTimestampFormat, s)
if err != nil {
return err
}

*t = RabbitMQTimestamp(ts)
return nil
}
46 changes: 46 additions & 0 deletions events/rabbitmq_test.go
@@ -0,0 +1,46 @@
package events

import (
"encoding/json"
"testing"
"time"

"github.com/aws/aws-lambda-go/events/test"
"github.com/stretchr/testify/assert"
)

func TestRabbitMQEventMarshaling(t *testing.T) {
// 1. read JSON from file
inputJson := test.ReadJSONFromFile(t, "./testdata/rabbitmq-event.json")

// 2. de-serialize into Go object
var inputEvent RabbitMQEvent
if err := json.Unmarshal(inputJson, &inputEvent); err != nil {
t.Errorf("could not unmarshal event. details: %v", err)
}

// 3. Verify values populated into Go Object, at least one validation per data type
assert.Equal(t, "aws:rmq", inputEvent.EventSource)
assert.Equal(t, "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", inputEvent.EventSourceARN)
assert.Equal(t, 1, len(inputEvent.MessagesByQueue))
for _, messages := range inputEvent.MessagesByQueue {
for _, message := range messages {
assert.Equal(t, false, message.Redelivered)
assert.Equal(t, "text/plain", message.BasicProperties.ContentType)
assert.Equal(t, RabbitMQTimestamp(time.Date(1970, 1, 1, 00, 33, 41, 0, time.UTC)), message.BasicProperties.Timestamp)
}
}

// 4. serialize to JSON
outputJson, err := json.Marshal(inputEvent)
if err != nil {
t.Errorf("could not marshal event. details: %v", err)
}

// 5. check result
assert.JSONEq(t, string(inputJson), string(outputJson))
}

func TestRabbitMQMarshalingMalformedJson(t *testing.T) {
test.TestMalformedJson(t, RabbitMQEvent{})
}
51 changes: 51 additions & 0 deletions events/testdata/rabbitmq-event.json
@@ -0,0 +1,51 @@
{
"eventSource": "aws:rmq",
"eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8",
"rmqMessagesByQueue": {
"test::/": [
{
"basicProperties": {
"contentType": "text/plain",
"contentEncoding": null,
"headers": {
"header1": {
"bytes": [
118,
97,
108,
117,
101,
49
]
},
"header2": {
"bytes": [
118,
97,
108,
117,
101,
50
]
},
"numberInHeader": 10
},
"deliveryMode": 1,
"priority": 34,
"correlationId": null,
"replyTo": null,
"expiration": "60000",
"messageId": null,
"timestamp": "Jan 1, 1970, 12:33:41 AM",
"type": null,
"userId": "AIDACKCEVSQ6C2EXAMPLE",
"appId": null,
"clusterId": null,
"bodySize": 80
},
"redelivered": false,
"data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ=="
}
]
}
}

0 comments on commit 4150941

Please sign in to comment.