Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(AWS MSK): Support MSK through "msk" event (#8164)
- Loading branch information
Showing
19 changed files
with
967 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
<!-- | ||
title: Serverless Framework - AWS Lambda Events - Managed Streaming for Apache Kafka (MSK) | ||
menuText: MSK | ||
menuOrder: 18 | ||
description: Setting up AWS MSK Events with AWS Lambda via the Serverless Framework | ||
layout: Doc | ||
--> | ||
|
||
<!-- DOCS-SITE-LINK:START automatically generated --> | ||
|
||
### [Read this on the main serverless docs site](https://www.serverless.com/framework/docs/providers/aws/events/msk) | ||
|
||
<!-- DOCS-SITE-LINK:END --> | ||
|
||
# MSK | ||
|
||
[Amazon Managed Streaming for Apache Kafka (Amazon MSK)](https://aws.amazon.com/msk/) is a fully managed streaming service that uses Apache Kafka. | ||
Amazon MSK can be used as event source for Lambda, which allows Lambda service to internally poll it for new messages and invoke corresponding Lambda functions. | ||
|
||
## Simple event definition | ||
|
||
In the following example, we specify that the `compute` function should be triggered whenever there are new messages available to consume from defined Kafka `topic`. | ||
|
||
In order to configure `msk` event, you have to provide two required properties: `arn`, which represents an ARN of MSK cluster and `topic` to consume messages from. | ||
|
||
The ARN for the MSK cluster can be specified as a string, the reference to the ARN resource by a logical ID, or the import of an ARN that was exported by a different service or CloudFormation stack. | ||
|
||
```yml | ||
functions: | ||
compute: | ||
handler: handler.compute | ||
events: | ||
# These are all possible formats | ||
- msk: | ||
arn: arn:aws:kafka:region:XXXXXX:cluster/MyCluster/xxxx-xxxxx-xxxx | ||
topic: mytopic | ||
- msk: | ||
arn: | ||
Fn::ImportValue: MyExportedMSKClusterArn | ||
topic: mytopic | ||
- msk: | ||
arn: !Ref MyMSKCluster | ||
topic: mytopic | ||
``` | ||
|
||
## Setting the BatchSize and StartingPosition | ||
|
||
For the MSK event integration, you can set the `batchSize`, which effects how many messages can be processed in a single Lambda invocation. The default `batchSize` is 100, and the max `batchSize` is 10000. | ||
In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from MSK topic. It supports two possible values, `TRIM_HORIZON` and `LATEST`, with `TRIM_HORIZON` being the default. | ||
|
||
In the following example, we specify that the `compute` function should have an `msk` event configured with `batchSize` of 1000 and `startingPosition` equal to `LATEST`. | ||
|
||
```yml | ||
functions: | ||
compute: | ||
handler: handler.compute | ||
events: | ||
- msk: | ||
arn: arn:aws:kafka:region:XXXXXX:cluster/MyCluster/xxxx-xxxxx-xxxx | ||
topic: mytopic | ||
batchSize: 1000 | ||
startingPosition: LATEST | ||
``` | ||
|
||
## Enabling and disabling MSK event | ||
|
||
The `msk` event also supports `enabled` parameter, which is used to control if the event source mapping is active. Setting it to `false` will pause polling for and processing new messages. | ||
|
||
In the following example, we specify that the `company` function's `msk` event should be disabled. | ||
|
||
```yml | ||
functions: | ||
compute: | ||
handler: handler.compute | ||
events: | ||
- msk: | ||
arn: arn:aws:kafka:region:XXXXXX:cluster/MyCluster/xxxx-xxxxx-xxxx | ||
topic: mytopic | ||
enabled: false | ||
``` | ||
|
||
## IAM Permissions | ||
|
||
The Serverless Framework will automatically configure the most minimal set of IAM permissions for you. However you can still add additional permissions if you need to. Read the official [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) for more information about IAM Permissions for MSK events. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
13 changes: 13 additions & 0 deletions
13
lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
'use strict'; | ||
|
||
const getMskClusterNameToken = eventSourceArn => { | ||
if (eventSourceArn['Fn::ImportValue']) { | ||
return eventSourceArn['Fn::ImportValue']; | ||
} else if (eventSourceArn.Ref) { | ||
return eventSourceArn.Ref; | ||
} | ||
|
||
return eventSourceArn.split('/')[1]; | ||
}; | ||
|
||
module.exports = getMskClusterNameToken; |
27 changes: 27 additions & 0 deletions
27
lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.test.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
'use strict'; | ||
|
||
const chai = require('chai'); | ||
const getMskClusterNameToken = require('./getMskClusterNameToken'); | ||
|
||
const { expect } = chai; | ||
|
||
describe('getMskClusterNameToken', () => { | ||
it('with ARN', () => { | ||
const eventSourceArn = | ||
'arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a'; | ||
const result = getMskClusterNameToken(eventSourceArn); | ||
expect(result).to.equal('ClusterName'); | ||
}); | ||
|
||
it('with Fn::ImportValue', () => { | ||
const eventSourceArn = { 'Fn::ImportValue': 'importvalue' }; | ||
const result = getMskClusterNameToken(eventSourceArn); | ||
expect(result).to.equal('importvalue'); | ||
}); | ||
|
||
it('with Ref', () => { | ||
const eventSourceArn = { Ref: 'ReferencedResource' }; | ||
const result = getMskClusterNameToken(eventSourceArn); | ||
expect(result).to.equal('ReferencedResource'); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
'use strict'; | ||
|
||
const getMskClusterNameToken = require('./getMskClusterNameToken'); | ||
|
||
class AwsCompileMSKEvents { | ||
constructor(serverless) { | ||
this.serverless = serverless; | ||
this.provider = this.serverless.getProvider('aws'); | ||
|
||
this.hooks = { | ||
'package:compileEvents': this.compileMSKEvents.bind(this), | ||
}; | ||
|
||
this.serverless.configSchemaHandler.defineFunctionEvent('aws', 'msk', { | ||
type: 'object', | ||
properties: { | ||
arn: { | ||
oneOf: [ | ||
{ $ref: '#/definitions/awsArnString' }, | ||
{ $ref: '#/definitions/awsCfImport' }, | ||
{ $ref: '#/definitions/awsCfRef' }, | ||
], | ||
}, | ||
batchSize: { | ||
type: 'number', | ||
minimum: 1, | ||
maximum: 10000, | ||
}, | ||
enabled: { | ||
type: 'boolean', | ||
}, | ||
startingPosition: { | ||
type: 'string', | ||
enum: ['LATEST', 'TRIM_HORIZON'], | ||
}, | ||
topic: { | ||
type: 'string', | ||
}, | ||
}, | ||
additionalProperties: false, | ||
required: ['arn', 'topic'], | ||
}); | ||
} | ||
|
||
compileMSKEvents() { | ||
this.serverless.service.getAllFunctions().forEach(functionName => { | ||
const functionObj = this.serverless.service.getFunction(functionName); | ||
const cfTemplate = this.serverless.service.provider.compiledCloudFormationTemplate; | ||
|
||
// It is required to add the following statement in order to be able to connect to MSK cluster | ||
const ec2Statement = { | ||
Effect: 'Allow', | ||
Action: [ | ||
'ec2:CreateNetworkInterface', | ||
'ec2:DescribeNetworkInterfaces', | ||
'ec2:DescribeVpcs', | ||
'ec2:DeleteNetworkInterface', | ||
'ec2:DescribeSubnets', | ||
'ec2:DescribeSecurityGroups', | ||
], | ||
Resource: '*', | ||
}; | ||
const mskStatement = { | ||
Effect: 'Allow', | ||
Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'], | ||
Resource: [], | ||
}; | ||
|
||
functionObj.events.forEach(event => { | ||
if (event.msk) { | ||
const eventSourceArn = event.msk.arn; | ||
const topic = event.msk.topic; | ||
const batchSize = event.msk.batchSize; | ||
const enabled = event.msk.enabled; | ||
const startingPosition = event.msk.startingPosition || 'TRIM_HORIZON'; | ||
|
||
const mskClusterNameToken = getMskClusterNameToken(eventSourceArn); | ||
const mskEventLogicalId = this.provider.naming.getMSKEventLogicalId( | ||
functionName, | ||
mskClusterNameToken, | ||
topic | ||
); | ||
|
||
const dependsOn = this.provider.resolveFunctionIamRoleResourceName(functionObj) || []; | ||
|
||
const lambdaLogicalId = this.provider.naming.getLambdaLogicalId(functionName); | ||
|
||
const mskResource = { | ||
Type: 'AWS::Lambda::EventSourceMapping', | ||
DependsOn: dependsOn, | ||
Properties: { | ||
EventSourceArn: eventSourceArn, | ||
FunctionName: { | ||
'Fn::GetAtt': [lambdaLogicalId, 'Arn'], | ||
}, | ||
StartingPosition: startingPosition, | ||
Topics: [topic], | ||
}, | ||
}; | ||
|
||
if (batchSize) { | ||
mskResource.Properties.BatchSize = batchSize; | ||
} | ||
|
||
if (enabled != null) { | ||
mskResource.Properties.Enabled = enabled; | ||
} | ||
|
||
mskStatement.Resource.push(eventSourceArn); | ||
|
||
cfTemplate.Resources[mskEventLogicalId] = mskResource; | ||
} | ||
}); | ||
|
||
if (cfTemplate.Resources.IamRoleLambdaExecution) { | ||
const statement = | ||
cfTemplate.Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument | ||
.Statement; | ||
if (mskStatement.Resource.length) { | ||
statement.push(mskStatement); | ||
statement.push(ec2Statement); | ||
} | ||
} | ||
}); | ||
} | ||
} | ||
|
||
module.exports = AwsCompileMSKEvents; |
Oops, something went wrong.