Skip to content

Commit

Permalink
Add DynamoDB and Kinesis Time Window events and response models
Browse files Browse the repository at this point in the history
  • Loading branch information
carlzogh committed Apr 25, 2022
1 parent a0b8eda commit 327ba2b
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 1 deletion.
15 changes: 14 additions & 1 deletion events/dynamodb.go
Expand Up @@ -8,7 +8,20 @@ type DynamoDBEvent struct {
Records []DynamoDBEventRecord `json:"Records"`
}

// DynamoDbEventRecord stores information about each record of a DynamoDb stream event
// DynamoDBTimeWindowEvent represents an Amazon Dynamodb event when using time windows
// ref. https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
type DynamoDBTimeWindowEvent struct {
DynamoDBEvent
TimeWindowProperties
}

// DynamoDBTimeWindowEventResponse is the outer structure to report batch item failures for DynamoDBTimeWindowEvent.
type DynamoDBTimeWindowEventResponse struct {
TimeWindowEventResponseProperties
BatchItemFailures []DynamoDBBatchItemFailure `json:"batchItemFailures"`
}

// DynamoDBEventRecord stores information about each record of a DynamoDB stream event
type DynamoDBEventRecord struct {
// The region in which the GetRecords request was received.
AWSRegion string `json:"awsRegion"`
Expand Down
24 changes: 24 additions & 0 deletions events/dynamodb_test.go
Expand Up @@ -34,3 +34,27 @@ func TestDynamoDBEventMarshaling(t *testing.T) {
func TestDynamoDBEventMarshalingMalformedJson(t *testing.T) {
test.TestMalformedJson(t, DynamoDBEvent{})
}

func TestDynamoDBTimeWindowEventMarshaling(t *testing.T) {
// 1. read JSON from file
inputJSON := test.ReadJSONFromFile(t, "./testdata/dynamodb-time-window-event.json")

// 2. de-serialize into Go object
var inputEvent DynamoDBTimeWindowEvent
if err := json.Unmarshal(inputJSON, &inputEvent); err != nil {
t.Errorf("could not unmarshal event. details: %v", err)
}

// 3. serialize to JSON
outputJSON, err := json.Marshal(inputEvent)
if err != nil {
t.Errorf("could not marshal event. details: %v", err)
}

// 4. check result
assert.JSONEq(t, string(inputJSON), string(outputJSON))
}

func TestDynamoDBTimeWindowEventMarshalingMalformedJson(t *testing.T) {
test.TestMalformedJson(t, DynamoDBTimeWindowEvent{})
}
26 changes: 26 additions & 0 deletions events/epoch_time.go
Expand Up @@ -7,6 +7,11 @@ import (
"time"
)

// RFC3339EpochTime serializes a time.Time in JSON as an ISO 8601 string.
type RFC3339EpochTime struct {
time.Time
}

// SecondsEpochTime serializes a time.Time in JSON as a UNIX epoch time in seconds
type SecondsEpochTime struct {
time.Time
Expand Down Expand Up @@ -57,3 +62,24 @@ func (e *MilliSecondsEpochTime) UnmarshalJSON(b []byte) error {
*e = MilliSecondsEpochTime{time.Unix(epoch/1000, (epoch%1000)*1000000)}
return nil
}

func (e RFC3339EpochTime) MarshalJSON() ([]byte, error) {
isoTimestampStr := e.Format(time.RFC3339)
return json.Marshal(isoTimestampStr)
}

func (e *RFC3339EpochTime) UnmarshalJSON(b []byte) error {
var isoTimestampStr string
err := json.Unmarshal(b, &isoTimestampStr)
if err != nil {
return err
}

parsed, err := time.Parse(time.RFC3339, isoTimestampStr)
if err != nil {
return err
}

*e = RFC3339EpochTime{parsed}
return nil
}
13 changes: 13 additions & 0 deletions events/kinesis.go
Expand Up @@ -6,6 +6,19 @@ type KinesisEvent struct {
Records []KinesisEventRecord `json:"Records"`
}

// KinesisTimeWindowEvent represents an Amazon Dynamodb event when using time windows
// ref. https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows
type KinesisTimeWindowEvent struct {
KinesisEvent
TimeWindowProperties
}

// KinesisTimeWindowEventResponse is the outer structure to report batch item failures for KinesisTimeWindowEvent.
type KinesisTimeWindowEventResponse struct {
TimeWindowEventResponseProperties
BatchItemFailures []KinesisBatchItemFailure `json:"batchItemFailures"`
}

type KinesisEventRecord struct {
AwsRegion string `json:"awsRegion"` //nolint: stylecheck
EventID string `json:"eventID"`
Expand Down
24 changes: 24 additions & 0 deletions events/kinesis_test.go
Expand Up @@ -33,3 +33,27 @@ func TestKinesisEventMarshaling(t *testing.T) {
func TestKinesisMarshalingMalformedJson(t *testing.T) {
test.TestMalformedJson(t, KinesisEvent{})
}

func TestKinesisTimeWindowEventMarshaling(t *testing.T) {
// 1. read JSON from file
inputJSON := test.ReadJSONFromFile(t, "./testdata/kinesis-time-window-event.json")

// 2. de-serialize into Go object
var inputEvent KinesisTimeWindowEvent
if err := json.Unmarshal(inputJSON, &inputEvent); err != nil {
t.Errorf("could not unmarshal event. details: %v", err)
}

// 3. serialize to JSON
outputJSON, err := json.Marshal(inputEvent)
if err != nil {
t.Errorf("could not marshal event. details: %v", err)
}

// 4. check result
assert.JSONEq(t, string(inputJSON), string(outputJSON))
}

func TestKinesisTimeWindowEventMarshalingMalformedJson(t *testing.T) {
test.TestMalformedJson(t, KinesisTimeWindowEvent{})
}
104 changes: 104 additions & 0 deletions events/testdata/dynamodb-time-window-event.json
@@ -0,0 +1,104 @@
{
"Records":[
{
"eventID":"1",
"eventName":"INSERT",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"ApproximateCreationDateTime": 1480642020,
"Keys":{
"Id":{
"N":"101"
}
},
"NewImage":{
"Message":{
"S":"New item!"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"111",
"SizeBytes":26,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
},
{
"eventID":"2",
"eventName":"MODIFY",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"ApproximateCreationDateTime": 1480642020,
"Keys":{
"Id":{
"N":"101"
}
},
"NewImage":{
"Message":{
"S":"This item has changed"
},
"Id":{
"N":"101"
}
},
"OldImage":{
"Message":{
"S":"New item!"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"222",
"SizeBytes":59,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
},
{
"eventID":"3",
"eventName":"REMOVE",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"ApproximateCreationDateTime": 1480642020,
"Keys":{
"Id":{
"N":"101"
}
},
"OldImage":{
"Message":{
"S":"This item has changed"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"333",
"SizeBytes":38,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
}
],
"window": {
"start": "2020-07-30T17:00:00Z",
"end": "2020-07-30T17:05:00Z"
},
"state": {
"1": "state1"
},
"shardId": "shard123456789",
"eventSourceARN": "stream-ARN",
"isFinalInvokeForWindow": false,
"isWindowTerminatedEarly": false
}
33 changes: 33 additions & 0 deletions events/testdata/kinesis-time-window-event.json
@@ -0,0 +1,33 @@

{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1607497475.000
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
"awsRegion": "us-east-1",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"
}
],
"window": {
"start": "2020-12-09T07:04:00Z",
"end": "2020-12-09T07:06:00Z"
},
"state": {
"1": "state 1",
"2": "state 2"
},
"shardId": "shardId-000000000006",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
"isFinalInvokeForWindow": false,
"isWindowTerminatedEarly": false
}
42 changes: 42 additions & 0 deletions events/time_window.go
@@ -0,0 +1,42 @@
package events

// Window is the object that captures the time window for the records in the event when using the tumbling windows feature
// Kinesis: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows
// DDB: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
type Window struct {
Start RFC3339EpochTime `json:"start"`
End RFC3339EpochTime `json:"end"`
}

// TimeWindowProperties is the object that captures properties that relate to the tumbling windows feature
// Kinesis: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows
// DDB: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
type TimeWindowProperties struct {
// Time window for the records in the event.
Window Window `json:"window"`

// State being built up to this invoke in the time window.
State map[string]string `json:"state"`

// Shard id of the records
ShardID string `json:"shardId"`

// The event source ARN of the service that generated the event (eg. DynamoDB or Kinesis)
EventSourceARN string `json:"eventSourceARN"`

// Set to true for the last invoke of the time window.
// Subsequent invoke will start a new time window along with a fresh state.
IsFinalInvokeForWindow bool `json:"isFinalInvokeForWindow"`

// Set to true if window is terminated prematurely.
// Subsequent invoke will continue the same window with a fresh state.
IsWindowTerminatedEarly bool `json:"isWindowTerminatedEarly"`
}

// TimeWindowEventResponseProperties is the object that captures response properties that relate to the tumbling windows feature
// Kinesis: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows
// DDB: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
type TimeWindowEventResponseProperties struct {
// State being built up to this invoke in the time window.
State map[string]string `json:"state"`
}

0 comments on commit 327ba2b

Please sign in to comment.