Skip to content

Commit

Permalink
feat(AWS Stream): Support filterPatterns (#10285)
Browse files Browse the repository at this point in the history
  • Loading branch information
fredericbarthelet committed Nov 30, 2021
1 parent 0714f7d commit fc00505
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 0 deletions.
19 changes: 19 additions & 0 deletions docs/providers/aws/events/streams.md
Expand Up @@ -329,3 +329,22 @@ functions:
arn: arn:aws:dynamodb:region:XXXXXX:table/foo/stream/1970-01-01T00:00:00.000
tumblingWindowInSeconds: 30
```

## Setting filter patterns

This configuration allows customers to filter event before lambda invocation. It accepts up to 5 filter criterion by default and up to 10 with quota extension. If one event matches at least 1 pattern, lambda will process it.

For more details and examples of filter patterns, please see the [AWS event filtering documentation](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)

Note: Serverless only sets this property if you explicitly add it to the stream configuration (see an example below). The following example will only process inserted items in the DynamoDB table (it will skip removed and modified items).

```yml
functions:
handleInsertedDynamoDBItem:
handler: handler.preprocess
events:
- stream:
arn: arn:aws:dynamodb:region:XXXXXX:table/foo/stream/1970-01-01T00:00:00.000
filterPatterns:
- eventName: [INSERT]
```
2 changes: 2 additions & 0 deletions docs/providers/aws/guide/serverless.yml.md
Expand Up @@ -458,6 +458,8 @@ functions:
startingPosition: LATEST
enabled: true
functionResponseType: ReportBatchItemFailures
filterPatterns:
- partitionKey: [ 1 ]
- msk:
arn: arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a # ARN of MSK Cluster
topic: kafkaTopic # name of Kafka topic to consume from
Expand Down
14 changes: 14 additions & 0 deletions lib/plugins/aws/package/compile/events/stream.js
Expand Up @@ -69,6 +69,12 @@ class AwsCompileStreamEvents {
required: ['onFailure'],
},
tumblingWindowInSeconds: { type: 'integer', minimum: 0, maximum: 900 },
filterPatterns: {
type: 'array',
minItems: 1,
maxItems: 10,
items: { type: 'object' },
},
},
additionalProperties: false,
anyOf: [
Expand Down Expand Up @@ -273,6 +279,14 @@ class AwsCompileStreamEvents {
};
}

if (event.stream.filterPatterns) {
streamResource.Properties.FilterCriteria = {
Filters: event.stream.filterPatterns.map((pattern) => ({
Pattern: JSON.stringify(pattern),
})),
};
}

const newStreamObject = {
[streamLogicalId]: streamResource,
};
Expand Down
39 changes: 39 additions & 0 deletions test/unit/lib/plugins/aws/package/compile/events/stream.test.js
Expand Up @@ -1724,4 +1724,43 @@ describe('AwsCompileStreamEvents #2', () => {
);
});
});
describe('with filterPatterns', () => {
let eventSourceMappingResource;

before(async () => {
const { awsNaming, cfTemplate } = await runServerless({
fixture: 'function',
configExt: {
functions: {
basic: {
events: [
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1',
filterPatterns: [{ eventName: ['INSERT'] }, { eventName: ['MODIFY'] }],
},
},
],
},
},
},
command: 'package',
});
const streamLogicalId = awsNaming.getStreamLogicalId('basic', 'dynamodb', 'foo');
eventSourceMappingResource = cfTemplate.Resources[streamLogicalId];
});

it('should wrap patterns within FilterCriteria property', () => {
expect(eventSourceMappingResource.Properties.FilterCriteria).to.deep.equal({
Filters: [
{
Pattern: JSON.stringify({ eventName: ['INSERT'] }),
},
{
Pattern: JSON.stringify({ eventName: ['MODIFY'] }),
},
],
});
});
});
});

0 comments on commit fc00505

Please sign in to comment.