Skip to content

Commit

Permalink
feat(AWS Stream): Support tumblingWindowInSeconds (#9979)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmcrocetti committed Sep 22, 2021
1 parent 0e83462 commit af39fc0
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 2 deletions.
23 changes: 23 additions & 0 deletions docs/providers/aws/events/streams.md
Expand Up @@ -320,3 +320,26 @@ functions:
```

For more information, read this [AWS blog post](https://aws.amazon.com/blogs/compute/increasing-real-time-stream-processing-performance-with-amazon-kinesis-data-streams-enhanced-fan-out-and-aws-lambda/) or this [AWS documentation](https://docs.aws.amazon.com/streams/latest/dev/introduction-to-enhanced-consumers.html).

## Setting TumblingWindowInSeconds

This configuration allows customers to aggregate values in near-realtime, allowing state to by passed forward by Lambda invocations. A event source created with this property adds several new attributes to the events delivered to the Lambda function.

- **window**: beginning and ending timestamps of the tumbling window;
- **state**: an object containing state of a previous execution. Initially empty can contain up to **1mb** of data;
- **isFinalInvokeForWindow**: indicates if this is the last execution for the tumbling window;
- **isWindowTerminatedEarly**: happens only when the state object exceeds maximum allowed size of 1mb.

For more information and examples, read the [AWS release announcement](https://aws.amazon.com/blogs/compute/using-aws-lambda-for-streaming-analytics/)

Note: Serverless only sets this property if you explicitly add it to the stream configuration (see example below).

```yml
functions:
preprocess:
handler: handler.preprocess
events:
- stream:
arn: arn:aws:dynamodb:region:XXXXXX:table/foo/stream/1970-01-01T00:00:00.000
tumblingWindowInSeconds: 30
```
10 changes: 8 additions & 2 deletions lib/plugins/aws/package/compile/events/stream.js
Expand Up @@ -68,6 +68,7 @@ class AwsCompileStreamEvents {
additionalProperties: false,
required: ['onFailure'],
},
tumblingWindowInSeconds: { type: 'integer', minimum: 0, maximum: 900 },
},
additionalProperties: false,
anyOf: [
Expand Down Expand Up @@ -203,11 +204,11 @@ class AwsCompileStreamEvents {
DependsOn: dependsOn,
Properties: {
BatchSize,
ParallelizationFactor,
Enabled,
EventSourceArn,
FunctionName: resolveLambdaTarget(functionName, functionObj),
ParallelizationFactor,
StartingPosition,
Enabled,
},
};

Expand Down Expand Up @@ -242,6 +243,11 @@ class AwsCompileStreamEvents {
event.stream.maximumRecordAgeInSeconds;
}

if (event.stream.tumblingWindowInSeconds != null) {
streamResource.Properties.TumblingWindowInSeconds =
event.stream.tumblingWindowInSeconds;
}

if (event.stream.destinations) {
let OnFailureDestinationArn;

Expand Down
59 changes: 59 additions & 0 deletions test/unit/lib/plugins/aws/package/compile/events/stream.test.js
Expand Up @@ -1665,4 +1665,63 @@ describe('AwsCompileStreamEvents #2', () => {
]);
});
});
describe('with TumblingWindowInSeconds enabled', () => {
let eventSourceMappingKinesisResource;
let eventSourceMappingDynamoDBResource;
let eventSourceMappingNoTumblingResource;

before(async () => {
const { awsNaming, cfTemplate } = await runServerless({
fixture: 'function',
configExt: {
functions: {
foo: {
events: [
{
stream: {
arn: 'arn:aws:kinesis:us-east-1:123456789012:stream/myKinesisStream',
tumblingWindowInSeconds: 30,
},
},
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/myDDBstream/stream/1',
tumblingWindowInSeconds: 50,
},
},
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/noTumblingStream/stream/1',
},
},
],
},
},
},
command: 'package',
});
const kinesisLogicalId = awsNaming.getStreamLogicalId('foo', 'kinesis', 'myKinesisStream');
const dynamoLogicalId = awsNaming.getStreamLogicalId('foo', 'dynamodb', 'myDDBstream');
const noTumblingLogicalId = awsNaming.getStreamLogicalId(
'foo',
'dynamodb',
'noTumblingStream'
);

eventSourceMappingKinesisResource = cfTemplate.Resources[kinesisLogicalId];
eventSourceMappingDynamoDBResource = cfTemplate.Resources[dynamoLogicalId];
eventSourceMappingNoTumblingResource = cfTemplate.Resources[noTumblingLogicalId];
});

it('should have TumblingWindowInSeconds property', () => {
expect(eventSourceMappingKinesisResource.Properties.TumblingWindowInSeconds).to.equal(30);
expect(eventSourceMappingDynamoDBResource.Properties.TumblingWindowInSeconds).to.equal(50);
});

it('should not have TumblingWindowInSeconds property', () => {
expect(eventSourceMappingNoTumblingResource.Properties).to.not.have.property(
'TumblingWindowInSeconds'
);
});
});
});

0 comments on commit af39fc0

Please sign in to comment.