diff --git a/docs/providers/aws/events/msk.md b/docs/providers/aws/events/msk.md new file mode 100644 index 00000000000..64407a51580 --- /dev/null +++ b/docs/providers/aws/events/msk.md @@ -0,0 +1,84 @@ + + + + +### [Read this on the main serverless docs site](https://www.serverless.com/framework/docs/providers/aws/events/msk) + + + +# MSK + +[Amazon Managed Streaming for Apache Kafka (Amazon MSK)](https://aws.amazon.com/msk/) is a fully managed streaming service that uses Apache Kafka. +Amazon MSK can be used as an event source for Lambda, which allows the Lambda service to internally poll it for new messages and invoke corresponding Lambda functions. + +## Simple event definition + +In the following example, we specify that the `compute` function should be triggered whenever there are new messages available to consume from the defined Kafka `topic`. + +In order to configure the `msk` event, you have to provide two required properties: `arn`, which represents the ARN of the MSK cluster, and `topic` to consume messages from. + +The ARN for the MSK cluster can be specified as a string, the reference to the ARN resource by a logical ID, or the import of an ARN that was exported by a different service or CloudFormation stack. + +```yml +functions: + compute: + handler: handler.compute + events: + # These are all possible formats + - msk: + arn: arn:aws:kafka:region:XXXXXX:cluster/MyCluster/xxxx-xxxxx-xxxx + topic: mytopic + - msk: + arn: + Fn::ImportValue: MyExportedMSKClusterArn + topic: mytopic + - msk: + arn: !Ref MyMSKCluster + topic: mytopic +``` + +## Setting the BatchSize and StartingPosition + +For the MSK event integration, you can set the `batchSize`, which affects how many messages can be processed in a single Lambda invocation. The default `batchSize` is 100, and the max `batchSize` is 10000. +In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from the MSK topic. 
It supports two possible values, `TRIM_HORIZON` and `LATEST`, with `TRIM_HORIZON` being the default. + +In the following example, we specify that the `compute` function should have an `msk` event configured with `batchSize` of 1000 and `startingPosition` equal to `LATEST`. + +```yml +functions: + compute: + handler: handler.compute + events: + - msk: + arn: arn:aws:kafka:region:XXXXXX:cluster/MyCluster/xxxx-xxxxx-xxxx + topic: mytopic + batchSize: 1000 + startingPosition: LATEST +``` + +## Enabling and disabling MSK event + +The `msk` event also supports the `enabled` parameter, which is used to control if the event source mapping is active. Setting it to `false` will pause polling for and processing new messages. + +In the following example, we specify that the `compute` function's `msk` event should be disabled. + +```yml +functions: + compute: + handler: handler.compute + events: + - msk: + arn: arn:aws:kafka:region:XXXXXX:cluster/MyCluster/xxxx-xxxxx-xxxx + topic: mytopic + enabled: false +``` + +## IAM Permissions + +The Serverless Framework will automatically configure the most minimal set of IAM permissions for you. However, you can still add additional permissions if you need to. Read the official [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) for more information about IAM Permissions for MSK events. 
diff --git a/lib/plugins/aws/lib/naming.js b/lib/plugins/aws/lib/naming.js index fe351f9a18b..4005ec742b9 100644 --- a/lib/plugins/aws/lib/naming.js +++ b/lib/plugins/aws/lib/naming.js @@ -408,6 +408,15 @@ module.exports = { )}EventSourceMappingSQS${this.normalizeNameToAlphaNumericOnly(queueName)}`; }, + // MSK + getMSKEventLogicalId(functionName, clusterName, topicName) { + const normalizedFunctionName = this.getNormalizedFunctionName(functionName); + // Both clusterName and topicName are trimmed to 79 chars to avoid going over 255 character limit + const normalizedClusterName = this.normalizeNameToAlphaNumericOnly(clusterName).slice(0, 79); + const normalizedTopicName = this.normalizeNameToAlphaNumericOnly(topicName).slice(0, 79); + return `${normalizedFunctionName}EventSourceMappingMSK${normalizedClusterName}${normalizedTopicName}`; + }, + // ALB getAlbTargetGroupLogicalId(functionName, albId, multiValueHeaders) { return `${this.getNormalizedFunctionName(functionName)}Alb${ diff --git a/lib/plugins/aws/lib/naming.test.js b/lib/plugins/aws/lib/naming.test.js index 0615f63bbb0..ffa03ab14c3 100644 --- a/lib/plugins/aws/lib/naming.test.js +++ b/lib/plugins/aws/lib/naming.test.js @@ -719,6 +719,14 @@ describe('#naming()', () => { }); }); + describe('#getMSKEventLogicalId()', () => { + it('should normalize the function name and append normalized cluster and topic names', () => { + expect( + sdk.naming.getMSKEventLogicalId('functionName', 'my-kafka-cluster', 'kafka-topic') + ).to.equal('FunctionNameEventSourceMappingMSKMykafkaclusterKafkatopic'); + }); + }); + describe('#getAlbTargetGroupLogicalId()', () => { it('should normalize the function name', () => { expect(sdk.naming.getAlbTargetGroupLogicalId('functionName', 'abc123')).to.equal( diff --git a/lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.js b/lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.js new file mode 100644 index 00000000000..ce0fdd5d37a --- /dev/null +++ 
b/lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.js @@ -0,0 +1,13 @@ +'use strict'; + +const getMskClusterNameToken = eventSourceArn => { + if (eventSourceArn['Fn::ImportValue']) { + return eventSourceArn['Fn::ImportValue']; + } else if (eventSourceArn.Ref) { + return eventSourceArn.Ref; + } + + return eventSourceArn.split('/')[1]; +}; + +module.exports = getMskClusterNameToken; diff --git a/lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.test.js b/lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.test.js new file mode 100644 index 00000000000..d401dfd71c1 --- /dev/null +++ b/lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.test.js @@ -0,0 +1,27 @@ +'use strict'; + +const chai = require('chai'); +const getMskClusterNameToken = require('./getMskClusterNameToken'); + +const { expect } = chai; + +describe('getMskClusterNameToken', () => { + it('with ARN', () => { + const eventSourceArn = + 'arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a'; + const result = getMskClusterNameToken(eventSourceArn); + expect(result).to.equal('ClusterName'); + }); + + it('with Fn::ImportValue', () => { + const eventSourceArn = { 'Fn::ImportValue': 'importvalue' }; + const result = getMskClusterNameToken(eventSourceArn); + expect(result).to.equal('importvalue'); + }); + + it('with Ref', () => { + const eventSourceArn = { Ref: 'ReferencedResource' }; + const result = getMskClusterNameToken(eventSourceArn); + expect(result).to.equal('ReferencedResource'); + }); +}); diff --git a/lib/plugins/aws/package/compile/events/msk/index.js b/lib/plugins/aws/package/compile/events/msk/index.js new file mode 100644 index 00000000000..134bb3f664a --- /dev/null +++ b/lib/plugins/aws/package/compile/events/msk/index.js @@ -0,0 +1,128 @@ +'use strict'; + +const getMskClusterNameToken = require('./getMskClusterNameToken'); + +class AwsCompileMSKEvents { + constructor(serverless) { + this.serverless = serverless; + 
this.provider = this.serverless.getProvider('aws'); + + this.hooks = { + 'package:compileEvents': this.compileMSKEvents.bind(this), + }; + + this.serverless.configSchemaHandler.defineFunctionEvent('aws', 'msk', { + type: 'object', + properties: { + arn: { + oneOf: [ + { $ref: '#/definitions/awsArnString' }, + { $ref: '#/definitions/awsCfImport' }, + { $ref: '#/definitions/awsCfRef' }, + ], + }, + batchSize: { + type: 'number', + minimum: 1, + maximum: 10000, + }, + enabled: { + type: 'boolean', + }, + startingPosition: { + type: 'string', + enum: ['LATEST', 'TRIM_HORIZON'], + }, + topic: { + type: 'string', + }, + }, + additionalProperties: false, + required: ['arn', 'topic'], + }); + } + + compileMSKEvents() { + this.serverless.service.getAllFunctions().forEach(functionName => { + const functionObj = this.serverless.service.getFunction(functionName); + const cfTemplate = this.serverless.service.provider.compiledCloudFormationTemplate; + + // It is required to add the following statement in order to be able to connect to MSK cluster + const ec2Statement = { + Effect: 'Allow', + Action: [ + 'ec2:CreateNetworkInterface', + 'ec2:DescribeNetworkInterfaces', + 'ec2:DescribeVpcs', + 'ec2:DeleteNetworkInterface', + 'ec2:DescribeSubnets', + 'ec2:DescribeSecurityGroups', + ], + Resource: '*', + }; + const mskStatement = { + Effect: 'Allow', + Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'], + Resource: [], + }; + + functionObj.events.forEach(event => { + if (event.msk) { + const eventSourceArn = event.msk.arn; + const topic = event.msk.topic; + const batchSize = event.msk.batchSize; + const enabled = event.msk.enabled; + const startingPosition = event.msk.startingPosition || 'TRIM_HORIZON'; + + const mskClusterNameToken = getMskClusterNameToken(eventSourceArn); + const mskEventLogicalId = this.provider.naming.getMSKEventLogicalId( + functionName, + mskClusterNameToken, + topic + ); + + const dependsOn = 
this.provider.resolveFunctionIamRoleResourceName(functionObj) || []; + + const lambdaLogicalId = this.provider.naming.getLambdaLogicalId(functionName); + + const mskResource = { + Type: 'AWS::Lambda::EventSourceMapping', + DependsOn: dependsOn, + Properties: { + EventSourceArn: eventSourceArn, + FunctionName: { + 'Fn::GetAtt': [lambdaLogicalId, 'Arn'], + }, + StartingPosition: startingPosition, + Topics: [topic], + }, + }; + + if (batchSize) { + mskResource.Properties.BatchSize = batchSize; + } + + if (enabled != null) { + mskResource.Properties.Enabled = enabled; + } + + mskStatement.Resource.push(eventSourceArn); + + cfTemplate.Resources[mskEventLogicalId] = mskResource; + } + }); + + if (cfTemplate.Resources.IamRoleLambdaExecution) { + const statement = + cfTemplate.Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument + .Statement; + if (mskStatement.Resource.length) { + statement.push(mskStatement); + statement.push(ec2Statement); + } + } + }); + } +} + +module.exports = AwsCompileMSKEvents; diff --git a/lib/plugins/aws/package/compile/events/msk/index.test.js b/lib/plugins/aws/package/compile/events/msk/index.test.js new file mode 100644 index 00000000000..be4f87b541d --- /dev/null +++ b/lib/plugins/aws/package/compile/events/msk/index.test.js @@ -0,0 +1,144 @@ +'use strict'; + +const chai = require('chai'); +const runServerless = require('../../../../../../../test/utils/run-serverless'); + +const { expect } = chai; + +chai.use(require('chai-as-promised')); + +describe('AwsCompileMSKEvents', () => { + const arn = 'arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a'; + const topic = 'TestingTopic'; + const enabled = false; + const startingPosition = 'LATEST'; + const batchSize = 5000; + + describe('when there are msk events defined', () => { + let minimalEventSourceMappingResource; + let allParamsEventSourceMappingResource; + let defaultIamRole; + let naming; + + before(async () => { + const { awsNaming, cfTemplate } = 
await runServerless({ + fixture: 'function', + configExt: { + functions: { + foo: { + events: [ + { + msk: { + topic, + arn, + }, + }, + ], + }, + other: { + events: [ + { + msk: { + topic, + arn, + batchSize, + enabled, + startingPosition, + }, + }, + ], + }, + }, + }, + cliArgs: ['package'], + }); + naming = awsNaming; + minimalEventSourceMappingResource = + cfTemplate.Resources[naming.getMSKEventLogicalId('foo', 'ClusterName', 'TestingTopic')]; + allParamsEventSourceMappingResource = + cfTemplate.Resources[naming.getMSKEventLogicalId('other', 'ClusterName', 'TestingTopic')]; + defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; + }); + + it('should correctly compile EventSourceMapping resource properties with minimal configuration', () => { + expect(minimalEventSourceMappingResource.Properties).to.deep.equal({ + EventSourceArn: arn, + StartingPosition: 'TRIM_HORIZON', + Topics: [topic], + FunctionName: { + 'Fn::GetAtt': [naming.getLambdaLogicalId('foo'), 'Arn'], + }, + }); + }); + + it('should update default IAM role with MSK statement', () => { + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).to.deep.include({ + Effect: 'Allow', + Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'], + Resource: [arn], + }); + }); + + it('should update default IAM role with EC2 statement', () => { + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).to.deep.include({ + Effect: 'Allow', + Action: [ + 'ec2:CreateNetworkInterface', + 'ec2:DescribeNetworkInterfaces', + 'ec2:DescribeVpcs', + 'ec2:DeleteNetworkInterface', + 'ec2:DescribeSubnets', + 'ec2:DescribeSecurityGroups', + ], + Resource: '*', + }); + }); + + it('should correctly compile EventSourceMapping resource DependsOn ', () => { + expect(minimalEventSourceMappingResource.DependsOn).to.equal('IamRoleLambdaExecution'); + expect(allParamsEventSourceMappingResource.DependsOn).to.equal('IamRoleLambdaExecution'); + }); + + it('should correctly complie 
EventSourceMapping resource with all parameters', () => { + expect(allParamsEventSourceMappingResource.Properties).to.deep.equal({ + BatchSize: batchSize, + Enabled: enabled, + EventSourceArn: arn, + StartingPosition: startingPosition, + Topics: [topic], + FunctionName: { + 'Fn::GetAtt': [naming.getLambdaLogicalId('other'), 'Arn'], + }, + }); + }); + }); + + describe('when no msk events are defined', () => { + it('should not modify the default IAM role', async () => { + const { cfTemplate } = await runServerless({ + fixture: 'function', + cliArgs: ['package'], + }); + + const defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({ + Effect: 'Allow', + Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'], + Resource: [], + }); + + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({ + Effect: 'Allow', + Action: [ + 'ec2:CreateNetworkInterface', + 'ec2:DescribeNetworkInterfaces', + 'ec2:DescribeVpcs', + 'ec2:DeleteNetworkInterface', + 'ec2:DescribeSubnets', + 'ec2:DescribeSecurityGroups', + ], + Resource: '*', + }); + }); + }); +}); diff --git a/lib/plugins/index.js b/lib/plugins/index.js index fdd7d41f7a9..79b2145bb5e 100644 --- a/lib/plugins/index.js +++ b/lib/plugins/index.js @@ -40,6 +40,7 @@ module.exports = [ require('./aws/package/compile/events/websockets/index.js'), require('./aws/package/compile/events/sns/index.js'), require('./aws/package/compile/events/stream/index.js'), + require('./aws/package/compile/events/msk/index.js'), require('./aws/package/compile/events/alb/index.js'), require('./aws/package/compile/events/alexaSkill/index.js'), require('./aws/package/compile/events/alexaSmartHome/index.js'), diff --git a/package.json b/package.json index 10377d4e188..e6432e8bbe6 100644 --- a/package.json +++ b/package.json @@ -86,6 +86,7 @@ "jszip": "^3.5.0", "lint-staged": "^10.4.0", "log": "^6.0.0", + 
"log-node": "^7.0.0", "mocha": "^6.2.3", "mock-require": "^3.0.3", "nyc": "^15.1.0", @@ -185,6 +186,8 @@ "integration-test-run-all": "mocha-isolated --pass-through-aws-creds --skip-fs-cleanup-check --max-workers=20 \"test/integration/**/*.test.js\"", "integration-test-run-basic": "mocha test/integrationBasic.test.js", "integration-test-run-package": "mocha-isolated --skip-fs-cleanup-check test/integrationPackage/**/*.tests.js", + "integration-test-setup": "node ./scripts/test/integration-setup/index.js", + "integration-test-teardown": "node ./scripts/test/integration-teardown.js", "lint": "eslint .", "lint:updated": "pipe-git-updated --ext=js -- eslint", "pkg:build": "node ./scripts/pkg/build.js", diff --git a/scripts/test/integration-setup/cloudformation.yml b/scripts/test/integration-setup/cloudformation.yml new file mode 100644 index 00000000000..812e2d75bcf --- /dev/null +++ b/scripts/test/integration-setup/cloudformation.yml @@ -0,0 +1,155 @@ +AWSTemplateFormatVersion: 2010-09-09 + +Parameters: + ClusterName: + Type: String + Description: Name of MSK Cluster + ClusterConfigurationArn: + Type: String + Description: MSK Cluster Configuration ARN + ClusterConfigurationRevision: + Type: Number + Description: MSK Cluster Configuration Revision number + Default: 1 + +Resources: + VPC: + Type: AWS::EC2::VPC + Properties: + CidrBlock: 172.31.0.0/16 + Tags: + - Key: Name + Value: !Ref AWS::StackName + + PublicSubnet: + Type: AWS::EC2::Subnet + Properties: + VpcId: !Ref VPC + AvailabilityZone: + Fn::Select: + - 0 + - Fn::GetAZs: '' + CidrBlock: 172.31.0.0/24 + MapPublicIpOnLaunch: true + + PrivateSubnetA: + Type: AWS::EC2::Subnet + Properties: + VpcId: !Ref VPC + AvailabilityZone: + Fn::Select: + - 0 + - Fn::GetAZs: '' + CidrBlock: 172.31.3.0/24 + MapPublicIpOnLaunch: false + + PrivateSubnetB: + Type: AWS::EC2::Subnet + Properties: + VpcId: !Ref VPC + AvailabilityZone: + Fn::Select: + - 1 + - Fn::GetAZs: '' + CidrBlock: 172.31.2.0/24 + MapPublicIpOnLaunch: false + + 
InternetGateway: + Type: AWS::EC2::InternetGateway + + GatewayAttachment: + Type: AWS::EC2::VPCGatewayAttachment + Properties: + VpcId: !Ref VPC + InternetGatewayId: !Ref InternetGateway + + PublicRouteTable: + Type: AWS::EC2::RouteTable + Properties: + VpcId: !Ref VPC + + PublicRoute: + Type: AWS::EC2::Route + DependsOn: GatewayAttachment + Properties: + RouteTableId: !Ref PublicRouteTable + DestinationCidrBlock: 0.0.0.0/0 + GatewayId: !Ref InternetGateway + + PublicSubnetRouteTableAssociation: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PublicSubnet + RouteTableId: !Ref PublicRouteTable + + NatGateway: + Type: AWS::EC2::NatGateway + DependsOn: NatPublicIP + Properties: + AllocationId: !GetAtt NatPublicIP.AllocationId + SubnetId: !Ref PublicSubnet + + NatPublicIP: + Type: AWS::EC2::EIP + DependsOn: VPC + Properties: + Domain: vpc + + PrivateRouteTable: + Type: AWS::EC2::RouteTable + Properties: + VpcId: !Ref VPC + + PrivateRoute: + Type: AWS::EC2::Route + Properties: + RouteTableId: !Ref PrivateRouteTable + DestinationCidrBlock: 0.0.0.0/0 + NatGatewayId: !Ref NatGateway + + PrivateSubnetARouteTableAssociation: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PrivateSubnetA + RouteTableId: !Ref PrivateRouteTable + + PrivateSubnetBRouteTableAssociation: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PrivateSubnetB + RouteTableId: !Ref PrivateRouteTable + + MSKCluster: + Type: 'AWS::MSK::Cluster' + Properties: + ClusterName: !Ref ClusterName + KafkaVersion: 2.2.1 + NumberOfBrokerNodes: 2 + BrokerNodeGroupInfo: + InstanceType: kafka.t3.small + ClientSubnets: + - !Ref PrivateSubnetA + - !Ref PrivateSubnetB + StorageInfo: + EBSStorageInfo: + VolumeSize: 1 + ConfigurationInfo: + Arn: !Ref ClusterConfigurationArn + Revision: !Ref ClusterConfigurationRevision + +Outputs: + VPC: + Description: VPC ID + Value: !Ref VPC + + PrivateSubnetA: + Description: Private Subnet A ID + Value: 
!Ref PrivateSubnetA + + SecurityGroup: + Description: Default security for Lambda VPC + Value: !GetAtt VPC.DefaultSecurityGroup + + MSKCluster: + Description: Created MSK Cluster + Value: !Ref MSKCluster diff --git a/scripts/test/integration-setup/index.js b/scripts/test/integration-setup/index.js new file mode 100755 index 00000000000..8bf49dc947d --- /dev/null +++ b/scripts/test/integration-setup/index.js @@ -0,0 +1,117 @@ +#!/usr/bin/env node + +'use strict'; + +require('essentials'); +require('log-node')(); + +const log = require('log').get('serverless'); +const awsRequest = require('@serverless/test/aws-request'); +const fsPromises = require('fs').promises; +const path = require('path'); +const { SHARED_INFRA_TESTS_CLOUDFORMATION_STACK } = require('../../../test/utils/cludformation'); + +async function handleInfrastructureCreation() { + const [cfnTemplate, kafkaServerProperties] = await Promise.all([ + fsPromises.readFile(path.join(__dirname, 'cloudformation.yml'), 'utf8'), + fsPromises.readFile(path.join(__dirname, 'kafka.server.properties')), + ]); + + const clusterName = 'integration-tests-msk-cluster'; + const clusterConfName = 'integration-tests-msk-cluster-configuration'; + + log.notice('Creating MSK Cluster configuration...'); + const clusterConfResponse = await awsRequest('Kafka', 'createConfiguration', { + Name: clusterConfName, + ServerProperties: kafkaServerProperties, + KafkaVersions: ['2.2.1'], + }); + + const clusterConfigurationArn = clusterConfResponse.Arn; + const clusterConfigurationRevision = clusterConfResponse.LatestRevision.Revision.toString(); + + log.notice('Deploying integration tests CloudFormation stack...'); + await awsRequest('CloudFormation', 'createStack', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + TemplateBody: cfnTemplate, + Parameters: [ + { ParameterKey: 'ClusterName', ParameterValue: clusterName }, + { ParameterKey: 'ClusterConfigurationArn', ParameterValue: clusterConfigurationArn }, + { + ParameterKey: 
'ClusterConfigurationRevision', + ParameterValue: clusterConfigurationRevision, + }, + ], + }); + + await awsRequest('CloudFormation', 'waitFor', 'stackCreateComplete', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + log.notice('Deployed integration tests CloudFormation stack!'); +} + +async function handleInfrastructureUpdate() { + log.notice('Updating integration tests CloudFormation stack...'); + + const cfnTemplate = await fsPromises.readFile(path.join(__dirname, 'cloudformation.yml'), 'utf8'); + + try { + await awsRequest('CloudFormation', 'updateStack', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + TemplateBody: cfnTemplate, + Parameters: [ + { ParameterKey: 'ClusterName', UsePreviousValue: true }, + { ParameterKey: 'ClusterConfigurationArn', UsePreviousValue: true }, + { + ParameterKey: 'ClusterConfigurationRevision', + UsePreviousValue: true, + }, + ], + }); + } catch (e) { + if (e.message === 'No updates are to be performed.') { + log.notice('No changes detected. 
Integration tests CloudFormation stack is up to date.'); + return; + } + throw e; + } + + await awsRequest('CloudFormation', 'waitFor', 'stackUpdateComplete', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + log.notice('Updated integration tests CloudFormation stack!'); +} + +(async () => { + log.notice('Starting setup of integration infrastructure...'); + + let describeResponse; + + log.notice('Checking if integration tests CloudFormation stack already exists...'); + try { + describeResponse = await awsRequest('CloudFormation', 'describeStacks', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + log.notice('Integration tests CloudFormation stack already exists'); + } catch (e) { + if (e.code !== 'ValidationError') { + throw e; + } + log.notice('Integration tests CloudFormation does not exist'); + } + + if (describeResponse) { + const stackStatus = describeResponse.Stacks[0].StackStatus; + + if (['CREATE_COMPLETE', 'UPDATE_COMPLETE'].includes(stackStatus)) { + await handleInfrastructureUpdate(); + } else { + log.error('Existing stack has status: {stackStatus} and it cannot be updated.'); + process.exitCode = 1; + } + } else { + await handleInfrastructureCreation(); + } + + log.notice('Setup of integration infrastructure finished'); +})(); diff --git a/scripts/test/integration-setup/kafka.server.properties b/scripts/test/integration-setup/kafka.server.properties new file mode 100644 index 00000000000..435f23ad9da --- /dev/null +++ b/scripts/test/integration-setup/kafka.server.properties @@ -0,0 +1,2 @@ +auto.create.topics.enable=true +default.replication.factor=2 diff --git a/scripts/test/integration-teardown.js b/scripts/test/integration-teardown.js new file mode 100755 index 00000000000..03680ee921f --- /dev/null +++ b/scripts/test/integration-teardown.js @@ -0,0 +1,61 @@ +#!/usr/bin/env node + +'use strict'; + +require('essentials'); +require('log-node')(); + +const log = require('log').get('serverless'); +const awsRequest = 
require('@serverless/test/aws-request'); +const { + SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + getDependencyStackOutputMap, +} = require('../../test/utils/cludformation'); + +(async () => { + log.notice('Starting teardown of integration infrastructure...'); + const describeClustersResponse = await awsRequest('Kafka', 'listClusters'); + const clusterConfArn = + describeClustersResponse.ClusterInfoList[0].CurrentBrokerSoftwareInfo.ConfigurationArn; + + const outputMap = await getDependencyStackOutputMap(); + + log.notice('Removing leftover ENI...'); + const describeResponse = await awsRequest('EC2', 'describeNetworkInterfaces', { + Filters: [ + { + Name: 'vpc-id', + Values: [outputMap.get('VPC')], + }, + { + Name: 'status', + Values: ['available'], + }, + ], + }); + try { + await Promise.all( + describeResponse.NetworkInterfaces.map(networkInterface => + awsRequest('EC2', 'deleteNetworkInterface', { + NetworkInterfaceId: networkInterface.NetworkInterfaceId, + }) + ) + ); + } catch (e) { + log.error(`Error: ${e} while trying to remove leftover ENIs\n`); + } + log.notice('Removing integration tests CloudFormation stack...'); + await awsRequest('CloudFormation', 'deleteStack', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + await awsRequest('CloudFormation', 'waitFor', 'stackDeleteComplete', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + log.notice('Removed integration tests CloudFormation stack!'); + log.notice('Removing MSK Cluster configuration...'); + await awsRequest('Kafka', 'deleteConfiguration', { + Arn: clusterConfArn, + }); + log.notice('Removed MSK Cluster configuration'); + log.notice('Teardown of integration infrastructure finished'); +})(); diff --git a/test/README.md b/test/README.md index 77d22f09c8b..0a14ca6006c 100644 --- a/test/README.md +++ b/test/README.md @@ -60,6 +60,19 @@ Pass test file to Mocha directly as follows AWS_ACCESS_KEY_ID=XXX AWS_SECRET_ACCESS_KEY=xxx npx mocha tests/integration/{chosen}.test.js ``` 
+### Tests that depend on shared infrastructure stack + +Due to the fact that some of the tests require a bit more complex infrastructure setup which might be lengthy, two additional commands have been made available: + +- `integration-test-setup` - used for setting up all needed infrastructure dependencies +- `integration-test-teardown` - used for tearing down the infrastructure set up by the above command + +Such tests take advantage of the `isDependencyStackAvailable` util to check if all needed dependencies are ready. If not, it skips the given test suite. + +Examples of such tests: + +- [MSK](./integration/msk.test.js) + ## Testing templates If you add a new template or want to test a template after changing it you can run the template integration tests. Make sure you have `docker` and `docker-compose` installed as they are required. The `docker` containers we're using through compose are automatically including your `$HOME/.aws` folder so you can deploy to AWS. 
diff --git a/test/fixtures/functionMsk/core.js b/test/fixtures/functionMsk/core.js new file mode 100644 index 00000000000..e00ec4ab8fd --- /dev/null +++ b/test/fixtures/functionMsk/core.js @@ -0,0 +1,42 @@ +'use strict'; + +// NOTE: `kafkajs` is bundled into the deployment package +// eslint-disable-next-line import/no-unresolved +const { Kafka } = require('kafkajs'); + +function consumer(event, context, callback) { + const functionName = 'consumer'; + const { records } = event; + const messages = Object.values(records)[0].map(record => + Buffer.from(record.value, 'base64').toString() + ); + // eslint-disable-next-line no-console + console.log(functionName, JSON.stringify(messages)); + return callback(null, event); +} + +async function producer() { + const kafkaBrokers = process.env.BROKER_URLS.split(','); + const kafkaTopic = process.env.TOPIC_NAME; + + const kafka = new Kafka({ + clientId: 'myapp', + brokers: kafkaBrokers, + ssl: true, + }); + + const kafkaProducer = kafka.producer(); + await kafkaProducer.connect(); + await kafkaProducer.send({ + topic: kafkaTopic, + messages: [{ value: 'Hello from MSK Integration test!' 
}], + }); + + await kafkaProducer.disconnect(); + + return { + statusCode: 200, + }; +} + +module.exports = { producer, consumer }; diff --git a/test/fixtures/functionMsk/package.json b/test/fixtures/functionMsk/package.json new file mode 100644 index 00000000000..86a9cdb040e --- /dev/null +++ b/test/fixtures/functionMsk/package.json @@ -0,0 +1,5 @@ +{ + "dependencies": { + "kafkajs": "^1.13.0" + } +} diff --git a/test/fixtures/functionMsk/serverless.yml b/test/fixtures/functionMsk/serverless.yml new file mode 100644 index 00000000000..02a66625baf --- /dev/null +++ b/test/fixtures/functionMsk/serverless.yml @@ -0,0 +1,18 @@ +service: service + +configValidationMode: error +frameworkVersion: '*' + +# VPC and Events configuration is added dynamically during test run +# Because it has to be provisioned separately via CloudFormation stack + +provider: + name: aws + runtime: nodejs12.x + versionFunctions: false + +functions: + producer: + handler: core.producer + consumer: + handler: core.consumer diff --git a/test/integration/infra-dependent/msk.test.js b/test/integration/infra-dependent/msk.test.js new file mode 100644 index 00000000000..57e0836b8e6 --- /dev/null +++ b/test/integration/infra-dependent/msk.test.js @@ -0,0 +1,96 @@ +'use strict'; + +const { expect } = require('chai'); +const log = require('log').get('serverless:test'); +const fixtures = require('../../fixtures'); +const { confirmCloudWatchLogs } = require('../../utils/misc'); +const { + isDependencyStackAvailable, + getDependencyStackOutputMap, +} = require('../../utils/cludformation'); + +const awsRequest = require('@serverless/test/aws-request'); +const crypto = require('crypto'); +const { deployService, removeService } = require('../../utils/integration'); + +describe('AWS - MSK Integration Test', function() { + this.timeout(1000 * 60 * 100); // Involves time-taking deploys + let stackName; + let servicePath; + const stage = 'dev'; + + const topicName = 
`msk-topic-${crypto.randomBytes(8).toString('hex')}`; + + before(async () => { + const isDepsStackAvailable = await isDependencyStackAvailable(); + if (!isDepsStackAvailable) { + throw new Error('CloudFormation stack with integration test dependencies not found.'); + } + + const outputMap = await getDependencyStackOutputMap(); + + log.notice('Getting MSK Boostrap Brokers URLs...'); + const getBootstrapBrokersResponse = await awsRequest('Kafka', 'getBootstrapBrokers', { + ClusterArn: outputMap.get('MSKCluster'), + }); + const brokerUrls = getBootstrapBrokersResponse.BootstrapBrokerStringTls; + + const serviceData = await fixtures.setup('functionMsk', { + configExt: { + functions: { + producer: { + vpc: { + subnetIds: [outputMap.get('PrivateSubnetA')], + securityGroupIds: [outputMap.get('SecurityGroup')], + }, + environment: { + TOPIC_NAME: topicName, + BROKER_URLS: brokerUrls, + }, + }, + consumer: { + events: [ + { + msk: { + arn: outputMap.get('MSKCluster'), + topic: topicName, + }, + }, + ], + }, + }, + }, + }); + + ({ servicePath } = serviceData); + + const serviceName = serviceData.serviceConfig.service; + stackName = `${serviceName}-${stage}`; + await deployService(servicePath); + }); + + after(async () => { + if (servicePath) { + await removeService(servicePath); + } + }); + + it('correctly processes messages from MSK topic', async () => { + const functionName = 'consumer'; + const message = 'Hello from MSK Integration test!'; + + const events = await confirmCloudWatchLogs( + `/aws/lambda/${stackName}-${functionName}`, + async () => + await awsRequest('Lambda', 'invoke', { + FunctionName: `${stackName}-producer`, + InvocationType: 'RequestResponse', + }), + { timeout: 120 * 1000 } + ); + + const logs = events.reduce((data, event) => data + event.message, ''); + expect(logs).to.include(functionName); + expect(logs).to.include(message); + }); +}); diff --git a/test/utils/cludformation.js b/test/utils/cludformation.js index d84c9cd8b07..243b55d69ca 100644 --- 
a/test/utils/cludformation.js +++ b/test/utils/cludformation.js @@ -2,6 +2,8 @@ const awsRequest = require('@serverless/test/aws-request'); +const SHARED_INFRA_TESTS_CLOUDFORMATION_STACK = 'integration-tests-deps-stack'; + function findStacks(name, status) { const params = {}; if (status) { @@ -57,9 +59,48 @@ function listStacks(status) { return awsRequest('CloudFormation', 'listStacks', params); } +async function getStackOutputMap(name) { + const describeStackResponse = await awsRequest('CloudFormation', 'describeStacks', { + StackName: name, + }); + + const outputsMap = new Map(); + for (const { OutputKey: key, OutputValue: value } of describeStackResponse.Stacks[0].Outputs) { + outputsMap.set(key, value); + } + return outputsMap; +} + +async function isDependencyStackAvailable() { + const validStatuses = ['CREATE_COMPLETE', 'UPDATE_COMPLETE']; + + try { + const describeStacksResponse = await awsRequest('CloudFormation', 'describeStacks', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + if (validStatuses.includes(describeStacksResponse.Stacks[0].StackStatus)) { + return true; + } + return false; + } catch (e) { + if (e.code === 'ValidationError') { + return false; + } + throw e; + } +} + +async function getDependencyStackOutputMap() { + return getStackOutputMap(SHARED_INFRA_TESTS_CLOUDFORMATION_STACK); +} + module.exports = { findStacks, deleteStack, listStackResources, listStacks, + getStackOutputMap, + SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + isDependencyStackAvailable, + getDependencyStackOutputMap, };