From bb92040cdfdd5a70f8a7240068ff1616733ac288 Mon Sep 17 00:00:00 2001 From: Carl Zogheib Date: Mon, 25 Apr 2022 12:15:08 +0100 Subject: [PATCH] Add DynamoDB and Kinesis Time Window events and response models --- events/dynamodb.go | 15 ++- events/dynamodb_test.go | 24 ++++ events/epoch_time.go | 26 +++++ events/kinesis.go | 13 +++ events/kinesis_test.go | 24 ++++ .../testdata/dynamodb-time-window-event.json | 104 ++++++++++++++++++ .../testdata/kinesis-time-window-event.json | 33 ++++++ events/time_window.go | 42 +++++++ 8 files changed, 280 insertions(+), 1 deletion(-) create mode 100644 events/testdata/dynamodb-time-window-event.json create mode 100644 events/testdata/kinesis-time-window-event.json create mode 100644 events/time_window.go diff --git a/events/dynamodb.go b/events/dynamodb.go index 30afa756..d5eeeeef 100644 --- a/events/dynamodb.go +++ b/events/dynamodb.go @@ -8,7 +8,20 @@ type DynamoDBEvent struct { Records []DynamoDBEventRecord `json:"Records"` } -// DynamoDbEventRecord stores information about each record of a DynamoDb stream event +// DynamoDBTimeWindowEvent represents an Amazon Dynamodb event when using time windows +// ref. https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows +type DynamoDBTimeWindowEvent struct { + DynamoDBEvent + TimeWindowProperties +} + +// DynamoDBTimeWindowEventResponse is the outer structure to report batch item failures for DynamoDBTimeWindowEvent. +type DynamoDBTimeWindowEventResponse struct { + TimeWindowEventResponseProperties + BatchItemFailures []DynamoDBBatchItemFailure `json:"batchItemFailures"` +} + +// DynamoDBEventRecord stores information about each record of a DynamoDB stream event type DynamoDBEventRecord struct { // The region in which the GetRecords request was received. AWSRegion string `json:"awsRegion"` diff --git a/events/dynamodb_test.go b/events/dynamodb_test.go index 79608955..e364cb02 100644 --- a/events/dynamodb_test.go +++ b/events/dynamodb_test.go @@ -34,3 +34,27 @@ func TestDynamoDBEventMarshaling(t *testing.T) { func TestDynamoDBEventMarshalingMalformedJson(t *testing.T) { test.TestMalformedJson(t, DynamoDBEvent{}) } + +func TestDynamoDBTimeWindowEventMarshaling(t *testing.T) { + // 1. read JSON from file + inputJSON := test.ReadJSONFromFile(t, "./testdata/dynamodb-time-window-event.json") + + // 2. de-serialize into Go object + var inputEvent DynamoDBTimeWindowEvent + if err := json.Unmarshal(inputJSON, &inputEvent); err != nil { + t.Errorf("could not unmarshal event. details: %v", err) + } + + // 3. serialize to JSON + outputJSON, err := json.Marshal(inputEvent) + if err != nil { + t.Errorf("could not marshal event. details: %v", err) + } + + // 4. check result + assert.JSONEq(t, string(inputJSON), string(outputJSON)) +} + +func TestDynamoDBTimeWindowEventMarshalingMalformedJson(t *testing.T) { + test.TestMalformedJson(t, DynamoDBTimeWindowEvent{}) +} diff --git a/events/epoch_time.go b/events/epoch_time.go index b0e48a0e..8636cd5a 100644 --- a/events/epoch_time.go +++ b/events/epoch_time.go @@ -7,6 +7,11 @@ import ( "time" ) +// ISO8601EpochTime serializes a time.Time in JSON as an ISO 8601 string. +type ISO8601EpochTime struct { + time.Time +} + // SecondsEpochTime serializes a time.Time in JSON as a UNIX epoch time in seconds type SecondsEpochTime struct { time.Time @@ -57,3 +62,24 @@ func (e *MilliSecondsEpochTime) UnmarshalJSON(b []byte) error { *e = MilliSecondsEpochTime{time.Unix(epoch/1000, (epoch%1000)*1000000)} return nil } + +func (e ISO8601EpochTime) MarshalJSON() ([]byte, error) { + isoTimestampStr := e.Format(time.RFC3339) + return json.Marshal(isoTimestampStr) +} + +func (e *ISO8601EpochTime) UnmarshalJSON(b []byte) error { + var isoTimestampStr string + err := json.Unmarshal(b, &isoTimestampStr) + if err != nil { + return err + } + + parsed, _ := time.Parse(time.RFC3339, isoTimestampStr) + if err != nil { + return err + } + + *e = ISO8601EpochTime{parsed} + return nil +} diff --git a/events/kinesis.go b/events/kinesis.go index e95733c8..41bae7a9 100644 --- a/events/kinesis.go +++ b/events/kinesis.go @@ -6,6 +6,19 @@ type KinesisEvent struct { Records []KinesisEventRecord `json:"Records"` } +// KinesisTimeWindowEvent represents an Amazon Dynamodb event when using time windows +// ref. https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows +type KinesisTimeWindowEvent struct { + KinesisEvent + TimeWindowProperties +} + +// KinesisTimeWindowEventResponse is the outer structure to report batch item failures for KinesisTimeWindowEvent. +type KinesisTimeWindowEventResponse struct { + TimeWindowEventResponseProperties + BatchItemFailures []KinesisBatchItemFailure `json:"batchItemFailures"` +} + type KinesisEventRecord struct { AwsRegion string `json:"awsRegion"` //nolint: stylecheck EventID string `json:"eventID"` diff --git a/events/kinesis_test.go b/events/kinesis_test.go index 53a604d9..4ec2bbb1 100644 --- a/events/kinesis_test.go +++ b/events/kinesis_test.go @@ -33,3 +33,27 @@ func TestKinesisEventMarshaling(t *testing.T) { func TestKinesisMarshalingMalformedJson(t *testing.T) { test.TestMalformedJson(t, KinesisEvent{}) } + +func TestKinesisTimeWindowEventMarshaling(t *testing.T) { + // 1. read JSON from file + inputJSON := test.ReadJSONFromFile(t, "./testdata/kinesis-time-window-event.json") + + // 2. de-serialize into Go object + var inputEvent KinesisTimeWindowEvent + if err := json.Unmarshal(inputJSON, &inputEvent); err != nil { + t.Errorf("could not unmarshal event. details: %v", err) + } + + // 3. serialize to JSON + outputJSON, err := json.Marshal(inputEvent) + if err != nil { + t.Errorf("could not marshal event. details: %v", err) + } + + // 4. check result + assert.JSONEq(t, string(inputJSON), string(outputJSON)) +} + +func TestKinesisTimeWindowEventMarshalingMalformedJson(t *testing.T) { + test.TestMalformedJson(t, KinesisTimeWindowEvent{}) +} diff --git a/events/testdata/dynamodb-time-window-event.json b/events/testdata/dynamodb-time-window-event.json new file mode 100644 index 00000000..30aaf5a4 --- /dev/null +++ b/events/testdata/dynamodb-time-window-event.json @@ -0,0 +1,104 @@ +{ + "Records":[ + { + "eventID":"1", + "eventName":"INSERT", + "eventVersion":"1.0", + "eventSource":"aws:dynamodb", + "awsRegion":"us-east-1", + "dynamodb":{ + "ApproximateCreationDateTime": 1480642020, + "Keys":{ + "Id":{ + "N":"101" + } + }, + "NewImage":{ + "Message":{ + "S":"New item!" + }, + "Id":{ + "N":"101" + } + }, + "SequenceNumber":"111", + "SizeBytes":26, + "StreamViewType":"NEW_AND_OLD_IMAGES" + }, + "eventSourceARN":"stream-ARN" + }, + { + "eventID":"2", + "eventName":"MODIFY", + "eventVersion":"1.0", + "eventSource":"aws:dynamodb", + "awsRegion":"us-east-1", + "dynamodb":{ + "ApproximateCreationDateTime": 1480642020, + "Keys":{ + "Id":{ + "N":"101" + } + }, + "NewImage":{ + "Message":{ + "S":"This item has changed" + }, + "Id":{ + "N":"101" + } + }, + "OldImage":{ + "Message":{ + "S":"New item!" + }, + "Id":{ + "N":"101" + } + }, + "SequenceNumber":"222", + "SizeBytes":59, + "StreamViewType":"NEW_AND_OLD_IMAGES" + }, + "eventSourceARN":"stream-ARN" + }, + { + "eventID":"3", + "eventName":"REMOVE", + "eventVersion":"1.0", + "eventSource":"aws:dynamodb", + "awsRegion":"us-east-1", + "dynamodb":{ + "ApproximateCreationDateTime": 1480642020, + "Keys":{ + "Id":{ + "N":"101" + } + }, + "OldImage":{ + "Message":{ + "S":"This item has changed" + }, + "Id":{ + "N":"101" + } + }, + "SequenceNumber":"333", + "SizeBytes":38, + "StreamViewType":"NEW_AND_OLD_IMAGES" + }, + "eventSourceARN":"stream-ARN" + } + ], + "window": { + "start": "2020-07-30T17:00:00Z", + "end": "2020-07-30T17:05:00Z" + }, + "state": { + "1": "state1" + }, + "shardId": "shard123456789", + "eventSourceARN": "stream-ARN", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} diff --git a/events/testdata/kinesis-time-window-event.json b/events/testdata/kinesis-time-window-event.json new file mode 100644 index 00000000..4a630913 --- /dev/null +++ b/events/testdata/kinesis-time-window-event.json @@ -0,0 +1,33 @@ + +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", + "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "approximateArrivalTimestamp": 1607497475.000 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", + "awsRegion": "us-east-1", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" + } + ], + "window": { + "start": "2020-12-09T07:04:00Z", + "end": "2020-12-09T07:06:00Z" + }, + "state": { + "1": "state 1", + "2": "state 2" + }, + "shardId": "shardId-000000000006", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} diff --git a/events/time_window.go b/events/time_window.go new file mode 100644 index 00000000..a72ee61d --- /dev/null +++ b/events/time_window.go @@ -0,0 +1,42 @@ +package events + +// Window is the object that captures the time window for the records in the event when using the tumbling windows feature +// Kinesis: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows +// DDB: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows +type Window struct { + Start ISO8601EpochTime `json:"start"` + End ISO8601EpochTime `json:"end"` +} + +// TimeWindowProperties is the object that captures properties that relate to the tumbling windows feature +// Kinesis: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows +// DDB: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows +type TimeWindowProperties struct { + // Time window for the records in the event. + Window Window `json:"window"` + + // State being built up to this invoke in the time window. + State map[string]string `json:"state"` + + // Shard id of the records + ShardID string `json:"shardId"` + + // The event source ARN of the service that generated the event (eg. DynamoDB or Kinesis) + EventSourceARN string `json:"eventSourceARN"` + + // Set to true for the last invoke of the time window. + // Subsequent invoke will start a new time window along with a fresh state. + IsFinalInvokeForWindow bool `json:"isFinalInvokeForWindow"` + + // Set to true if window is terminated prematurely. + // Subsequent invoke will continue the same window with a fresh state. + IsWindowTerminatedEarly bool `json:"isWindowTerminatedEarly"` +} + +// TimeWindowEventResponseProperties is the object that captures response properties that relate to the tumbling windows feature +// Kinesis: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows +// DDB: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows +type TimeWindowEventResponseProperties struct { + // State being built up to this invoke in the time window. + State map[string]string `json:"state"` +}