From 2ac521e807b8abdc41f2ad97df583802b79bff9c Mon Sep 17 00:00:00 2001 From: Piotr Grzesik Date: Mon, 31 Aug 2020 15:47:24 +0200 Subject: [PATCH 01/12] feat(aws): Add support for MSK event --- lib/plugins/aws/lib/naming.js | 9 + lib/plugins/aws/lib/naming.test.js | 8 + .../aws/package/compile/events/msk/index.js | 178 ++++++++++++++++++ .../package/compile/events/msk/index.test.js | 171 +++++++++++++++++ lib/plugins/index.js | 1 + 5 files changed, 367 insertions(+) create mode 100644 lib/plugins/aws/package/compile/events/msk/index.js create mode 100644 lib/plugins/aws/package/compile/events/msk/index.test.js diff --git a/lib/plugins/aws/lib/naming.js b/lib/plugins/aws/lib/naming.js index fe351f9a18b..4700a4033d3 100644 --- a/lib/plugins/aws/lib/naming.js +++ b/lib/plugins/aws/lib/naming.js @@ -408,6 +408,15 @@ module.exports = { )}EventSourceMappingSQS${this.normalizeNameToAlphaNumericOnly(queueName)}`; }, + // MSK + getMSKEventLogicalId(functionName, clusterName, topicName) { + return `${this.getNormalizedFunctionName( + functionName + )}EventSourceMappingMSK${this.normalizeNameToAlphaNumericOnly( + clusterName + )}${this.normalizeNameToAlphaNumericOnly(topicName)}`; + }, + // ALB getAlbTargetGroupLogicalId(functionName, albId, multiValueHeaders) { return `${this.getNormalizedFunctionName(functionName)}Alb${ diff --git a/lib/plugins/aws/lib/naming.test.js b/lib/plugins/aws/lib/naming.test.js index 0615f63bbb0..ffa03ab14c3 100644 --- a/lib/plugins/aws/lib/naming.test.js +++ b/lib/plugins/aws/lib/naming.test.js @@ -719,6 +719,14 @@ describe('#naming()', () => { }); }); + describe('#getMSKEventLogicalId()', () => { + it('should normalize the function name and append normalized cluster and topic names', () => { + expect( + sdk.naming.getMSKEventLogicalId('functionName', 'my-kafka-cluster', 'kafka-topic') + ).to.equal('FunctionNameEventSourceMappingMSKMykafkaclusterKafkatopic'); + }); + }); + describe('#getAlbTargetGroupLogicalId()', () => { it('should normalize the function name', () => { expect(sdk.naming.getAlbTargetGroupLogicalId('functionName', 'abc123')).to.equal( diff --git a/lib/plugins/aws/package/compile/events/msk/index.js b/lib/plugins/aws/package/compile/events/msk/index.js new file mode 100644 index 00000000000..25e77407d1a --- /dev/null +++ b/lib/plugins/aws/package/compile/events/msk/index.js @@ -0,0 +1,178 @@ +'use strict'; + +const _ = require('lodash'); + +class AwsCompileMSKEvents { + constructor(serverless) { + this.serverless = serverless; + this.provider = this.serverless.getProvider('aws'); + + this.hooks = { + 'package:compileEvents': this.compileMSKEvents.bind(this), + }; + + this.serverless.configSchemaHandler.defineFunctionEvent('aws', 'msk', { + type: 'object', + properties: { + arn: { + // TODO: Add possiblity to use Fn::ImportValue and Ref + type: 'string', + // TODO: Should it be even more detailed? + pattern: + '^arn:aws[a-zA-Z-]*:kafka:[a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-[1-9]{1}:[0-9]{12}:cluster', + }, + batchSize: { + type: 'number', + minimum: 1, + maximum: 10000, + }, + enabled: { + type: 'boolean', + }, + startingPosition: { + type: 'string', + enum: ['LATEST', 'TRIM_HORIZON'], + }, + topic: { + type: 'string', + }, + }, + additionalProperties: false, + required: ['arn', 'topic'], + }); + } + + // TODO: Copied from sqs/stream - see if it can be refactored + resolveDependsOn(funcRole) { + let dependsOn = 'IamRoleLambdaExecution'; + + if (funcRole) { + if ( + // check whether the custom role is an ARN + typeof funcRole === 'string' && + funcRole.indexOf(':') !== -1 + ) { + dependsOn = []; + } else if ( + // otherwise, check if we have an in-service reference to a role ARN + typeof funcRole === 'object' && + 'Fn::GetAtt' in funcRole && + Array.isArray(funcRole['Fn::GetAtt']) && + funcRole['Fn::GetAtt'].length === 2 && + typeof funcRole['Fn::GetAtt'][0] === 'string' && + typeof funcRole['Fn::GetAtt'][1] === 'string' && + funcRole['Fn::GetAtt'][1] === 'Arn' + ) { + dependsOn = funcRole['Fn::GetAtt'][0]; + } else if ( + // otherwise, check if we have an import or parameters ref + typeof funcRole === 'object' && + ('Fn::ImportValue' in funcRole || 'Ref' in funcRole) + ) { + dependsOn = []; + } else if (typeof funcRole === 'string') { + dependsOn = funcRole; + } + } + + return dependsOn; + } + + getMSKClusterName(eventSourceArn) { + // TODO: EVALUATE IF THIS WORKS PROPERLY + if (eventSourceArn['Fn::ImportValue']) { + return eventSourceArn['Fn::ImportValue']; + } else if (eventSourceArn.Ref) { + return eventSourceArn.Ref; + } + + return eventSourceArn.split('/')[1]; + } + + // TODO: Add documentation + compileMSKEvents() { + this.serverless.service.getAllFunctions().forEach(functionName => { + const functionObj = this.serverless.service.getFunction(functionName); + const cfTemplate = this.serverless.service.provider.compiledCloudFormationTemplate; + + if (functionObj.events) { + // It is required to add the following statement in order to be able to connect to MSK cluster + const ec2Statement = { + Effect: 'Allow', + Action: [ + 'ec2:CreateNetworkInterface', + 'ec2:DescribeNetworkInterfaces', + 'ec2:DescribeVpcs', + 'ec2:DeleteNetworkInterface', + 'ec2:DescribeSubnets', + 'ec2:DescribeSecurityGroups', + ], + Resource: '*', + }; + const mskStatement = { + Effect: 'Allow', + Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'], + Resource: [], + }; + + functionObj.events.forEach(event => { + if (event.msk) { + const EventSourceArn = event.msk.arn; + const Topic = event.msk.topic; + const BatchSize = event.msk.batchSize || 100; + const Enabled = event.msk.enabled != null ? event.msk.enabled : true; + const StartingPosition = event.msk.startingPosition || 'TRIM_HORIZON'; + + const mskClusterName = this.getMSKClusterName(EventSourceArn); + const mskEventLogicalId = this.provider.naming.getMSKEventLogicalId( + functionName, + mskClusterName, + Topic + ); + + const funcRole = functionObj.role || this.serverless.service.provider.role; + const dependsOn = this.resolveDependsOn(funcRole); + + const lambdaLogicalId = this.provider.naming.getLambdaLogicalId(functionName); + + const mskResource = { + Type: 'AWS::Lambda::EventSourceMapping', + DependsOn: dependsOn, + Properties: { + BatchSize, + EventSourceArn, + FunctionName: { + 'Fn::GetAtt': [lambdaLogicalId, 'Arn'], + }, + StartingPosition, + Enabled, + Topics: [Topic], + }, + }; + + mskStatement.Resource.push(EventSourceArn); + + const newMSKObject = { + [mskEventLogicalId]: mskResource, + }; + + // TODO: Potentially replace _merge with Object.assign + _.merge(cfTemplate.Resources, newMSKObject); + } + }); + + if (cfTemplate.Resources.IamRoleLambdaExecution) { + const statement = + cfTemplate.Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument + .Statement; + if (mskStatement.Resource.length) { + statement.push(mskStatement); + statement.push(ec2Statement); + } + } + } + }); + } +} + +module.exports = AwsCompileMSKEvents; diff --git a/lib/plugins/aws/package/compile/events/msk/index.test.js b/lib/plugins/aws/package/compile/events/msk/index.test.js new file mode 100644 index 00000000000..840faad9477 --- /dev/null +++ b/lib/plugins/aws/package/compile/events/msk/index.test.js @@ -0,0 +1,171 @@ +'use strict'; + +const AwsCompileMSKEvents = require('./index'); +const Serverless = require('../../../../../../Serverless'); +const AwsProvider = require('../../../../provider/awsProvider'); + +const chai = require('chai'); +chai.use(require('chai-as-promised')); +chai.use(require('sinon-chai')); + +const expect = chai.expect; + +const runServerless = require('../../../../../../../tests/utils/run-serverless'); +const fixtures = require('../../../../../../../tests/fixtures'); + +describe('AwsCompileMSKEvents', () => { + after(fixtures.cleanup); + + const arn = 'arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a'; + const topic = 'TestingTopic'; + + describe('when using default parameters', () => { + let eventSourceMappingResource; + let defaultIamRole; + let naming; + + before(() => + fixtures + .extend('function', { + functions: { + foo: { + events: [ + { + msk: { + topic, + arn, + }, + }, + ], + }, + }, + }) + .then(fixturePath => + runServerless({ + cwd: fixturePath, + cliArgs: ['package'], + }).then(({ awsNaming, cfTemplate }) => { + naming = awsNaming; + eventSourceMappingResource = + cfTemplate.Resources[ + naming.getMSKEventLogicalId('foo', 'ClusterName', 'TestingTopic') + ].Properties; + defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; + }) + ) + ); + + it('should correctly compile EventSourceMapping resource', () => { + expect(eventSourceMappingResource).to.deep.equal({ + BatchSize: 100, + Enabled: true, + EventSourceArn: arn, + StartingPosition: 'TRIM_HORIZON', + Topics: [topic], + FunctionName: { + 'Fn::GetAtt': [naming.getLambdaLogicalId('foo'), 'Arn'], + }, + }); + }); + + it('should update default IAM role with MSK statement', () => { + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).to.deep.include({ + Effect: 'Allow', + Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'], + Resource: [arn], + }); + }); + + it('should update default IAM role with EC2 statement', () => { + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).to.deep.include({ + Effect: 'Allow', + Action: [ + 'ec2:CreateNetworkInterface', + 'ec2:DescribeNetworkInterfaces', + 'ec2:DescribeVpcs', + 'ec2:DeleteNetworkInterface', + 'ec2:DescribeSubnets', + 'ec2:DescribeSecurityGroups', + ], + Resource: '*', + }); + }); + }); + + describe('when using all parameters', () => { + it('should correctly compile EventSourceMapping resource', () => { + const enabled = false; + const startingPosition = 'LATEST'; + const batchSize = 5000; + + return fixtures + .extend('function', { + functions: { + foo: { + events: [ + { + msk: { + topic, + arn, + batchSize, + enabled, + startingPosition, + }, + }, + ], + }, + }, + }) + .then(fixturePath => + runServerless({ + cwd: fixturePath, + cliArgs: ['package'], + }).then(({ awsNaming, cfTemplate }) => { + const resource = + cfTemplate.Resources[ + awsNaming.getMSKEventLogicalId('foo', 'ClusterName', 'TestingTopic') + ].Properties; + expect(resource).to.deep.equal({ + BatchSize: batchSize, + Enabled: enabled, + EventSourceArn: arn, + StartingPosition: startingPosition, + Topics: [topic], + FunctionName: { + 'Fn::GetAtt': [awsNaming.getLambdaLogicalId('foo'), 'Arn'], + }, + }); + }) + ); + }); + }); +}); + +describe('getMSKClusterName', () => { + let awsCompileMSKEvents; + + before(() => { + const serverless = new Serverless(); + serverless.setProvider('aws', new AwsProvider(serverless)); + awsCompileMSKEvents = new AwsCompileMSKEvents(serverless); + }); + + it('with ARN', () => { + const eventSourceArn = + 'arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a'; + const result = awsCompileMSKEvents.getMSKClusterName(eventSourceArn); + expect(result).to.equal('ClusterName'); + }); + + it('with Fn::ImportValue', () => { + const eventSourceArn = { 'Fn::ImportValue': 'importvalue' }; + const result = awsCompileMSKEvents.getMSKClusterName(eventSourceArn); + expect(result).to.equal('importvalue'); + }); + + it('with Ref', () => { + const eventSourceArn = { Ref: 'ReferencedResource' }; + const result = awsCompileMSKEvents.getMSKClusterName(eventSourceArn); + expect(result).to.equal('ReferencedResource'); + }); +}); diff --git a/lib/plugins/index.js b/lib/plugins/index.js index fdd7d41f7a9..79b2145bb5e 100644 --- a/lib/plugins/index.js +++ b/lib/plugins/index.js @@ -40,6 +40,7 @@ module.exports = [ require('./aws/package/compile/events/websockets/index.js'), require('./aws/package/compile/events/sns/index.js'), require('./aws/package/compile/events/stream/index.js'), + require('./aws/package/compile/events/msk/index.js'), require('./aws/package/compile/events/alb/index.js'), require('./aws/package/compile/events/alexaSkill/index.js'), require('./aws/package/compile/events/alexaSmartHome/index.js'), From 3d3e9efcc26d907cc393e5428b0ce25fa8f28a1e Mon Sep 17 00:00:00 2001 From: Piotr Grzesik Date: Mon, 31 Aug 2020 21:33:08 +0200 Subject: [PATCH 02/12] Add docs --- docs/providers/aws/events/msk.md | 79 ++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 docs/providers/aws/events/msk.md diff --git a/docs/providers/aws/events/msk.md b/docs/providers/aws/events/msk.md new file mode 100644 index 00000000000..322fbd9891a --- /dev/null +++ b/docs/providers/aws/events/msk.md @@ -0,0 +1,79 @@ + + + + +### [Read this on the main serverless docs site](https://www.serverless.com/framework/docs/providers/aws/events/cognito-user-pool) + + + +# MSK + +In the following example, we specify that the `compute` function should be triggered whenever there are new messages available to consume from defined Kafka `topic`. + +In order to configure `msk` event, you have to provide two required properties: `arn`, which represents an ARN of MSK cluster and `topic` to consume messages from. + +The ARN for the MSK cluster can be specified as a string, the reference to the ARN resource by a logical ID, or the import of an ARN that was exported by a different service or CloudFormation stack. + +```yml +functions: + compute: + handler: handler.compute + events: + # These are all possible formats + - msk: + arn: arn:aws:kafka:region:XXXXXX:cluster/MyCluster/xxxx-xxxxx-xxxx + topic: mytopic + - msk: + arn: + Fn::ImportValue: MyExportedMSKClusterArn + topic: mytopic + - msk: + arn: !Ref MyMSKCluster + topic: mytopic +``` + +## Setting the BatchSize and StartingPosition + +For the MSK event integration, you can set the `batchSize`, which effects how many messages can be processed in a single Lambda invocation. The default `batchSize` is 100, and the max `batchSize` is 10000. +In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from MSK topic. It supports two possible values, `TRIM_HORIZON` and `LATEST`, with `TRIM_HORIZON` being the default. + +In the following example, we specify that the `compute` function should have an `msk` event configured with `batchSize` of 1000 and `startingPosition` equal to `LATEST`. + +```yml +functions: + compute: + handler: handler.compute + events: + - msk: + arn: arn:aws:kafka:region:XXXXXX:cluster/MyCluster/xxxx-xxxxx-xxxx + topic: mytopic + batchSize: 1000 + startingPosition: LATEST +``` + +## Enabling and disabling MSK event + +The `msk` event also supports `enabled` parameter, which is used to control if the event source mapping is active. Setting it to `false` will pause polling for and processing new messages. + +In the following example, we specify that the `company` function's `msk` event should be disabled. + +```yml +functions: + compute: + handler: handler.compute + events: + - msk: + arn: arn:aws:kafka:region:XXXXXX:cluster/MyCluster/xxxx-xxxxx-xxxx + topic: mytopic + enabled: false +``` + +## IAM Permissions + +The Serverless Framework will automatically configure the most minimal set of IAM permissions for you. However you can still add additional permissions if you need to. Read the official [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) for more information about IAM Permissions for MSK events. From e7a3fd0949f6fb9e3527d3cc41584f46a1babe77 Mon Sep 17 00:00:00 2001 From: Piotr Grzesik Date: Tue, 1 Sep 2020 18:07:01 +0200 Subject: [PATCH 03/12] Add missing tests + cleanup --- .../aws/package/compile/events/msk/index.js | 22 ++--- .../package/compile/events/msk/index.test.js | 95 ++++++++++++++++--- 2 files changed, 93 insertions(+), 24 deletions(-) diff --git a/lib/plugins/aws/package/compile/events/msk/index.js b/lib/plugins/aws/package/compile/events/msk/index.js index 25e77407d1a..3737d4fc1b7 100644 --- a/lib/plugins/aws/package/compile/events/msk/index.js +++ b/lib/plugins/aws/package/compile/events/msk/index.js @@ -1,7 +1,5 @@ 'use strict'; -const _ = require('lodash'); - class AwsCompileMSKEvents { constructor(serverless) { this.serverless = serverless; @@ -15,11 +13,15 @@ class AwsCompileMSKEvents { type: 'object', properties: { arn: { - // TODO: Add possiblity to use Fn::ImportValue and Ref - type: 'string', - // TODO: Should it be even more detailed? - pattern: - '^arn:aws[a-zA-Z-]*:kafka:[a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-[1-9]{1}:[0-9]{12}:cluster', + oneOf: [ + { + type: 'string', + pattern: + '^arn:aws[a-zA-Z-]*:kafka:[a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-[1-9]{1}:[0-9]{12}:cluster', + }, + { $ref: '#/definitions/awsCfImport' }, + { $ref: '#/definitions/awsCfRef' }, + ], }, batchSize: { type: 'number', @@ -42,7 +44,6 @@ class AwsCompileMSKEvents { }); } - // TODO: Copied from sqs/stream - see if it can be refactored resolveDependsOn(funcRole) { let dependsOn = 'IamRoleLambdaExecution'; @@ -79,7 +80,6 @@ class AwsCompileMSKEvents { } getMSKClusterName(eventSourceArn) { - // TODO: EVALUATE IF THIS WORKS PROPERLY if (eventSourceArn['Fn::ImportValue']) { return eventSourceArn['Fn::ImportValue']; } else if (eventSourceArn.Ref) { @@ -89,7 +89,6 @@ class AwsCompileMSKEvents { return eventSourceArn.split('/')[1]; } - // TODO: Add documentation compileMSKEvents() { this.serverless.service.getAllFunctions().forEach(functionName => { const functionObj = this.serverless.service.getFunction(functionName); @@ -156,8 +155,7 @@ class AwsCompileMSKEvents { [mskEventLogicalId]: mskResource, }; - // TODO: Potentially replace _merge with Object.assign - _.merge(cfTemplate.Resources, newMSKObject); + Object.assign(cfTemplate.Resources, newMSKObject); } }); diff --git a/lib/plugins/aws/package/compile/events/msk/index.test.js b/lib/plugins/aws/package/compile/events/msk/index.test.js index 840faad9477..386c0bfb8e2 100644 --- a/lib/plugins/aws/package/compile/events/msk/index.test.js +++ b/lib/plugins/aws/package/compile/events/msk/index.test.js @@ -1,17 +1,15 @@ 'use strict'; +const chai = require('chai'); +const runServerless = require('../../../../../../../test/utils/run-serverless'); +const fixtures = require('../../../../../../../test/fixtures'); const AwsCompileMSKEvents = require('./index'); const Serverless = require('../../../../../../Serverless'); const AwsProvider = require('../../../../provider/awsProvider'); -const chai = require('chai'); -chai.use(require('chai-as-promised')); -chai.use(require('sinon-chai')); +const { expect } = chai; -const expect = chai.expect; - -const runServerless = require('../../../../../../../tests/utils/run-serverless'); -const fixtures = require('../../../../../../../tests/fixtures'); +chai.use(require('chai-as-promised')); describe('AwsCompileMSKEvents', () => { after(fixtures.cleanup); @@ -49,14 +47,14 @@ describe('AwsCompileMSKEvents', () => { eventSourceMappingResource = cfTemplate.Resources[ naming.getMSKEventLogicalId('foo', 'ClusterName', 'TestingTopic') - ].Properties; + ]; defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; }) ) ); - it('should correctly compile EventSourceMapping resource', () => { - expect(eventSourceMappingResource).to.deep.equal({ + it('should correctly compile EventSourceMapping resource properties', () => { + expect(eventSourceMappingResource.Properties).to.deep.equal({ BatchSize: 100, Enabled: true, EventSourceArn: arn, @@ -90,6 +88,10 @@ describe('AwsCompileMSKEvents', () => { Resource: '*', }); }); + + it('should correctly compile EventSourceMapping resource DependsOn ', () => { + expect(eventSourceMappingResource.DependsOn).to.equal('IamRoleLambdaExecution'); + }); }); describe('when using all parameters', () => { @@ -124,8 +126,8 @@ describe('AwsCompileMSKEvents', () => { const resource = cfTemplate.Resources[ awsNaming.getMSKEventLogicalId('foo', 'ClusterName', 'TestingTopic') - ].Properties; - expect(resource).to.deep.equal({ + ]; + expect(resource.Properties).to.deep.equal({ BatchSize: batchSize, Enabled: enabled, EventSourceArn: arn, @@ -139,6 +141,35 @@ describe('AwsCompileMSKEvents', () => { ); }); }); + + describe('when no msk events are defined', () => { + it('should not modify the default IAM role', () => { + return runServerless({ + cwd: fixtures.map.function, + cliArgs: ['package'], + }).then(({ cfTemplate }) => { + const defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({ + Effect: 'Allow', + Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'], + Resource: [], + }); + + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({ + Effect: 'Allow', + Action: [ + 'ec2:CreateNetworkInterface', + 'ec2:DescribeNetworkInterfaces', + 'ec2:DescribeVpcs', + 'ec2:DeleteNetworkInterface', + 'ec2:DescribeSubnets', + 'ec2:DescribeSecurityGroups', + ], + Resource: '*', + }); + }); + }); + }); }); describe('getMSKClusterName', () => { @@ -169,3 +200,43 @@ describe('getMSKClusterName', () => { expect(result).to.equal('ReferencedResource'); }); }); + +describe('resolveDependsOn', () => { + let awsCompileMSKEvents; + + before(() => { + const serverless = new Serverless(); + serverless.setProvider('aws', new AwsProvider(serverless)); + awsCompileMSKEvents = new AwsCompileMSKEvents(serverless); + }); + + it('with ARN', () => { + const funcRole = 'arn:aws:iam::xxxxxxxxxxxx:role/xxxx'; + const result = awsCompileMSKEvents.resolveDependsOn(funcRole); + expect(result).to.deep.equal([]); + }); + + it('with string', () => { + const funcRole = 'role'; + const result = awsCompileMSKEvents.resolveDependsOn(funcRole); + expect(result).to.equal(funcRole); + }); + + it('with Fn::ImportValue', () => { + const funcRole = { 'Fn::ImportValue': 'importvalue' }; + const result = awsCompileMSKEvents.resolveDependsOn(funcRole); + expect(result).to.deep.equal([]); + }); + + it('with Ref', () => { + const funcRole = { Ref: 'ReferencedResource' }; + const result = awsCompileMSKEvents.resolveDependsOn(funcRole); + expect(result).to.deep.equal([]); + }); + + it('with Fn::GetAtt reference to role ARN', () => { + const funcRole = { 'Fn::GetAtt': ['Resource', 'Arn'] }; + const result = awsCompileMSKEvents.resolveDependsOn(funcRole); + expect(result).to.equal('Resource'); + }); +}); From 5c2e367305f961c4837df6decc1cfa76456eba91 Mon Sep 17 00:00:00 2001 From: Piotr Grzesik Date: Wed, 2 Sep 2020 16:35:25 +0200 Subject: [PATCH 04/12] chore(aws): apply code review suggestions --- docs/providers/aws/events/msk.md | 2 +- lib/plugins/aws/lib/naming.js | 13 +- .../events/msk/getMskClusterNameToken.js | 13 ++ .../events/msk/getMskClusterNameToken.test.js | 27 +++ .../aws/package/compile/events/msk/index.js | 94 +++------- .../package/compile/events/msk/index.test.js | 165 +++++------------- 6 files changed, 112 insertions(+), 202 deletions(-) create mode 100644 lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.js create mode 100644 lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.test.js diff --git a/docs/providers/aws/events/msk.md b/docs/providers/aws/events/msk.md index 322fbd9891a..a5bd06c0118 100644 --- a/docs/providers/aws/events/msk.md +++ b/docs/providers/aws/events/msk.md @@ -8,7 +8,7 @@ layout: Doc -### [Read this on the main serverless docs site](https://www.serverless.com/framework/docs/providers/aws/events/cognito-user-pool) +### [Read this on the main serverless docs site](https://www.serverless.com/framework/docs/providers/aws/events/msk) diff --git a/lib/plugins/aws/lib/naming.js b/lib/plugins/aws/lib/naming.js index 4700a4033d3..60b1da1a8a3 100644 --- a/lib/plugins/aws/lib/naming.js +++ b/lib/plugins/aws/lib/naming.js @@ -410,11 +410,14 @@ module.exports = { // MSK getMSKEventLogicalId(functionName, clusterName, topicName) { - return `${this.getNormalizedFunctionName( - functionName - )}EventSourceMappingMSK${this.normalizeNameToAlphaNumericOnly( - clusterName - )}${this.normalizeNameToAlphaNumericOnly(topicName)}`; + const normalizedFunctionName = this.getNormalizedFunctionName(functionName); + // Both clusterName and topicName are trimmed to 79 chars to avoid going over 255 character limit + const normalizedClusterName = this.normalizeNameToAlphaNumericOnly(clusterName).substring( + 0, + 79 + ); + const normalizedTopicName = this.normalizeNameToAlphaNumericOnly(topicName).substring(0, 79); + return `${normalizedFunctionName}EventSourceMappingMSK${normalizedClusterName}${normalizedTopicName}`; }, // ALB diff --git a/lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.js b/lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.js new file mode 100644 index 00000000000..ce0fdd5d37a --- /dev/null +++ b/lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.js @@ -0,0 +1,13 @@ +'use strict'; + +const getMskClusterNameToken = eventSourceArn => { + if (eventSourceArn['Fn::ImportValue']) { + return eventSourceArn['Fn::ImportValue']; + } else if (eventSourceArn.Ref) { + return eventSourceArn.Ref; + } + + return eventSourceArn.split('/')[1]; +}; + +module.exports = getMskClusterNameToken; diff --git a/lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.test.js b/lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.test.js new file mode 100644 index 00000000000..d401dfd71c1 --- /dev/null +++ b/lib/plugins/aws/package/compile/events/msk/getMskClusterNameToken.test.js @@ -0,0 +1,27 @@ +'use strict'; + +const chai = require('chai'); +const getMskClusterNameToken = require('./getMskClusterNameToken'); + +const { expect } = chai; + +describe('getMskClusterNameToken', () => { + it('with ARN', () => { + const eventSourceArn = + 'arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a'; + const result = getMskClusterNameToken(eventSourceArn); + expect(result).to.equal('ClusterName'); + }); + + it('with Fn::ImportValue', () => { + const eventSourceArn = { 'Fn::ImportValue': 'importvalue' }; + const result = getMskClusterNameToken(eventSourceArn); + expect(result).to.equal('importvalue'); + }); + + it('with Ref', () => { + const eventSourceArn = { Ref: 'ReferencedResource' }; + const result = getMskClusterNameToken(eventSourceArn); + expect(result).to.equal('ReferencedResource'); + }); +}); diff --git a/lib/plugins/aws/package/compile/events/msk/index.js b/lib/plugins/aws/package/compile/events/msk/index.js index 3737d4fc1b7..abaf678446b 100644 --- a/lib/plugins/aws/package/compile/events/msk/index.js +++ b/lib/plugins/aws/package/compile/events/msk/index.js @@ -1,5 +1,7 @@ 'use strict'; +const getMskClusterNameToken = require('./getMskClusterNameToken'); + class AwsCompileMSKEvents { constructor(serverless) { this.serverless = serverless; @@ -14,11 +16,7 @@ class AwsCompileMSKEvents { properties: { arn: { oneOf: [ - { - type: 'string', - pattern: - '^arn:aws[a-zA-Z-]*:kafka:[a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-[1-9]{1}:[0-9]{12}:cluster', - }, + { $ref: '#/definitions/awsArnString' }, { $ref: '#/definitions/awsCfImport' }, { $ref: '#/definitions/awsCfRef' }, ], @@ -44,51 +42,6 @@ class AwsCompileMSKEvents { }); } - resolveDependsOn(funcRole) { - let dependsOn = 'IamRoleLambdaExecution'; - - if (funcRole) { - if ( - // check whether the custom role is an ARN - typeof funcRole === 'string' && - funcRole.indexOf(':') !== -1 - ) { - dependsOn = []; - } else if ( - // otherwise, check if we have an in-service reference to a role ARN - typeof funcRole === 'object' && - 'Fn::GetAtt' in funcRole && - Array.isArray(funcRole['Fn::GetAtt']) && - funcRole['Fn::GetAtt'].length === 2 && - typeof funcRole['Fn::GetAtt'][0] === 'string' && - typeof funcRole['Fn::GetAtt'][1] === 'string' && - funcRole['Fn::GetAtt'][1] === 'Arn' - ) { - dependsOn = funcRole['Fn::GetAtt'][0]; - } else if ( - // otherwise, check if we have an import or parameters ref - typeof funcRole === 'object' && - ('Fn::ImportValue' in funcRole || 'Ref' in funcRole) - ) { - dependsOn = []; - } else if (typeof funcRole === 'string') { - dependsOn = funcRole; - } - } - - return dependsOn; - } - - getMSKClusterName(eventSourceArn) { - if (eventSourceArn['Fn::ImportValue']) { - return eventSourceArn['Fn::ImportValue']; - } else if (eventSourceArn.Ref) { - return eventSourceArn.Ref; - } - - return eventSourceArn.split('/')[1]; - } - compileMSKEvents() { this.serverless.service.getAllFunctions().forEach(functionName => { const functionObj = this.serverless.service.getFunction(functionName); @@ -116,21 +69,20 @@ class AwsCompileMSKEvents { functionObj.events.forEach(event => { if (event.msk) { - const EventSourceArn = event.msk.arn; - const Topic = event.msk.topic; - const BatchSize = event.msk.batchSize || 100; - const Enabled = event.msk.enabled != null ? event.msk.enabled : true; - const StartingPosition = event.msk.startingPosition || 'TRIM_HORIZON'; + const eventSourceArn = event.msk.arn; + const topic = event.msk.topic; + const batchSize = event.msk.batchSize; + const enabled = event.msk.enabled; + const startingPosition = event.msk.startingPosition || 'TRIM_HORIZON'; - const mskClusterName = this.getMSKClusterName(EventSourceArn); + const mskClusterNameToken = getMskClusterNameToken(eventSourceArn); const mskEventLogicalId = this.provider.naming.getMSKEventLogicalId( functionName, - mskClusterName, - Topic + mskClusterNameToken, + topic ); - const funcRole = functionObj.role || this.serverless.service.provider.role; - const dependsOn = this.resolveDependsOn(funcRole); + const dependsOn = this.provider.resolveFunctionIamRoleResourceName(functionObj) || []; const lambdaLogicalId = this.provider.naming.getLambdaLogicalId(functionName); @@ -138,24 +90,26 @@ class AwsCompileMSKEvents { Type: 'AWS::Lambda::EventSourceMapping', DependsOn: dependsOn, Properties: { - BatchSize, - EventSourceArn, + EventSourceArn: eventSourceArn, FunctionName: { 'Fn::GetAtt': [lambdaLogicalId, 'Arn'], }, - StartingPosition, - Enabled, - Topics: [Topic], + StartingPosition: startingPosition, + Topics: [topic], }, }; - mskStatement.Resource.push(EventSourceArn); + if (batchSize) { + mskResource.Properties.BatchSize = batchSize; + } - const newMSKObject = { - [mskEventLogicalId]: mskResource, - }; + if (enabled != null) { + mskResource.Properties.Enabled = enabled; + } + + mskStatement.Resource.push(eventSourceArn); - Object.assign(cfTemplate.Resources, newMSKObject); + cfTemplate.Resources[mskEventLogicalId] = mskResource; } }); diff --git a/lib/plugins/aws/package/compile/events/msk/index.test.js b/lib/plugins/aws/package/compile/events/msk/index.test.js index 386c0bfb8e2..696e8137e86 100644 --- a/lib/plugins/aws/package/compile/events/msk/index.test.js +++ b/lib/plugins/aws/package/compile/events/msk/index.test.js @@ -3,9 +3,6 @@ const chai = require('chai'); const runServerless = require('../../../../../../../test/utils/run-serverless'); const fixtures = require('../../../../../../../test/fixtures'); -const AwsCompileMSKEvents = require('./index'); -const Serverless = require('../../../../../../Serverless'); -const AwsProvider = require('../../../../provider/awsProvider'); const { expect } = chai; @@ -16,9 +13,13 @@ describe('AwsCompileMSKEvents', () => { const arn = 'arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a'; const topic = 'TestingTopic'; + const enabled = false; + const startingPosition = 'LATEST'; + const batchSize = 5000; - describe('when using default parameters', () => { - let eventSourceMappingResource; + describe('when there are msk events defined', () => { + let minimalEventSourceMappingResource; + let allParamsEventSourceMappingResource; let defaultIamRole; let naming; @@ -36,6 +37,19 @@ describe('AwsCompileMSKEvents', () => { }, ], }, + other: { + events: [ + { + msk: { + topic, + arn, + batchSize, + enabled, + startingPosition, + }, + }, + ], + }, }, }) .then(fixturePath => @@ -44,19 +58,21 @@ describe('AwsCompileMSKEvents', () => { cliArgs: ['package'], }).then(({ awsNaming, cfTemplate }) => { naming = awsNaming; - eventSourceMappingResource = + minimalEventSourceMappingResource = cfTemplate.Resources[ naming.getMSKEventLogicalId('foo', 'ClusterName', 'TestingTopic') ]; + allParamsEventSourceMappingResource = + cfTemplate.Resources[ + naming.getMSKEventLogicalId('other', 'ClusterName', 'TestingTopic') + ]; defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; }) ) ); - it('should correctly compile EventSourceMapping resource properties', () => { - expect(eventSourceMappingResource.Properties).to.deep.equal({ - BatchSize: 100, - Enabled: true, + it('should correctly compile EventSourceMapping resource properties with minimal configuration', () => { + expect(minimalEventSourceMappingResource.Properties).to.deep.equal({ EventSourceArn: arn, StartingPosition: 'TRIM_HORIZON', Topics: [topic], @@ -90,55 +106,21 @@ describe('AwsCompileMSKEvents', () => { }); it('should correctly compile EventSourceMapping resource DependsOn ', () => { - expect(eventSourceMappingResource.DependsOn).to.equal('IamRoleLambdaExecution'); + expect(minimalEventSourceMappingResource.DependsOn).to.equal('IamRoleLambdaExecution'); + expect(allParamsEventSourceMappingResource.DependsOn).to.equal('IamRoleLambdaExecution'); }); - }); - - describe('when using all parameters', () => { - it('should correctly compile EventSourceMapping resource', () => { - const enabled = false; - const startingPosition = 'LATEST'; - const batchSize = 5000; - return fixtures - .extend('function', { - functions: { - foo: { - events: [ - { - msk: { - topic, - arn, - batchSize, - enabled, - startingPosition, - }, - }, - ], - }, - }, - }) - .then(fixturePath => - runServerless({ - cwd: fixturePath, - cliArgs: ['package'], - }).then(({ awsNaming, cfTemplate }) => { - const resource = - cfTemplate.Resources[ - awsNaming.getMSKEventLogicalId('foo', 'ClusterName', 'TestingTopic') - ]; - expect(resource.Properties).to.deep.equal({ - BatchSize: batchSize, - Enabled: enabled, - EventSourceArn: arn, - StartingPosition: startingPosition, - Topics: [topic], - FunctionName: { - 'Fn::GetAtt': [awsNaming.getLambdaLogicalId('foo'), 'Arn'], - }, - }); - }) - ); + it('should correctly complie EventSourceMapping resource with all parameters', () => { + expect(allParamsEventSourceMappingResource.Properties).to.deep.equal({ + BatchSize: batchSize, + Enabled: enabled, + EventSourceArn: arn, + StartingPosition: startingPosition, + Topics: [topic], + FunctionName: { + 'Fn::GetAtt': [naming.getLambdaLogicalId('other'), 'Arn'], + }, + }); }); }); @@ -171,72 +153,3 @@ describe('AwsCompileMSKEvents', () => { }); }); }); - -describe('getMSKClusterName', () => { - let awsCompileMSKEvents; - - before(() => { - const serverless = new Serverless(); - serverless.setProvider('aws', new AwsProvider(serverless)); - awsCompileMSKEvents = new AwsCompileMSKEvents(serverless); - }); - - it('with ARN', () => { - const eventSourceArn = - 'arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a'; - const result = awsCompileMSKEvents.getMSKClusterName(eventSourceArn); - expect(result).to.equal('ClusterName'); - }); - - it('with Fn::ImportValue', () => { - const eventSourceArn = { 'Fn::ImportValue': 'importvalue' }; - const result = awsCompileMSKEvents.getMSKClusterName(eventSourceArn); - expect(result).to.equal('importvalue'); - }); - - it('with Ref', () => { - const eventSourceArn = { Ref: 'ReferencedResource' }; - const result = awsCompileMSKEvents.getMSKClusterName(eventSourceArn); - expect(result).to.equal('ReferencedResource'); - }); -}); - -describe('resolveDependsOn', () => { - let awsCompileMSKEvents; - - before(() => { - const serverless = new Serverless(); - serverless.setProvider('aws', new AwsProvider(serverless)); - awsCompileMSKEvents = new AwsCompileMSKEvents(serverless); - }); - - it('with ARN', () => { - const funcRole = 'arn:aws:iam::xxxxxxxxxxxx:role/xxxx'; - const result = awsCompileMSKEvents.resolveDependsOn(funcRole); - expect(result).to.deep.equal([]); - }); - - it('with string', () => { - const funcRole = 'role'; - const result = awsCompileMSKEvents.resolveDependsOn(funcRole); - expect(result).to.equal(funcRole); - }); - - it('with Fn::ImportValue', () => { - const funcRole = { 'Fn::ImportValue': 'importvalue' }; - const result = awsCompileMSKEvents.resolveDependsOn(funcRole); - expect(result).to.deep.equal([]); - }); - - it('with Ref', () => { - const funcRole = { Ref: 'ReferencedResource' }; - const result = awsCompileMSKEvents.resolveDependsOn(funcRole); - expect(result).to.deep.equal([]); - }); - - it('with Fn::GetAtt reference to role ARN', () => { - const funcRole = { 'Fn::GetAtt': ['Resource', 'Arn'] }; - const result = awsCompileMSKEvents.resolveDependsOn(funcRole); - expect(result).to.equal('Resource'); - }); -}); From 8021233cdc79d25bc30a7986c932d444e241b957 Mon Sep 17 00:00:00 2001 From: Piotr Grzesik Date: Tue, 15 Sep 2020 17:05:06 +0200 Subject: [PATCH 05/12] Add integration tests for MSK integration --- .gitignore | 3 + test/fixtures/functionMsk/core.js | 43 ++++++ test/fixtures/functionMsk/package.json | 14 ++ test/fixtures/functionMsk/serverless.yml | 17 +++ test/fixtures/functionMsk/utils.js | 22 +++ test/integration/msk/cloudformation.yml | 151 +++++++++++++++++++ test/integration/msk/index.test.js | 137 +++++++++++++++++ test/integration/msk/kafka.server.properties | 2 + 8 files changed, 389 insertions(+) create mode 100644 test/fixtures/functionMsk/core.js create mode 100644 test/fixtures/functionMsk/package.json create mode 100644 test/fixtures/functionMsk/serverless.yml create mode 100644 test/fixtures/functionMsk/utils.js create mode 100644 test/integration/msk/cloudformation.yml create mode 100644 test/integration/msk/index.test.js create mode 100644 test/integration/msk/kafka.server.properties diff --git a/.gitignore b/.gitignore index 3e71f2a2d15..fac841ca046 100755 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,6 @@ /node_modules npm-debug.log /package-lock.json + +/test/fixtures/**/node_modules +/test/fixtures/**/package-lock.json diff --git a/test/fixtures/functionMsk/core.js b/test/fixtures/functionMsk/core.js new file mode 100644 index 00000000000..2d7db774edd --- /dev/null +++ b/test/fixtures/functionMsk/core.js @@ -0,0 +1,43 @@ +'use strict'; + +// NOTE: the `utils.js` file is bundled into the deployment package +// eslint-disable-next-line +const { log } = require('./utils'); + +const { Kafka } = require('kafkajs'); + +function consumer(event, context, callback) { + const functionName = 'consumer'; + const { records } = event; + const messages = Object.values(records)[0].map(record => + Buffer.from(record.value, 'base64').toString() + ); + log(functionName, JSON.stringify(messages)); + return callback(null, event); +} + +async function producer() { + const kafkaBrokers = process.env.BROKER_URLS.split(','); + const kafkaTopic = process.env.TOPIC_NAME; + + const kafka = new Kafka({ + clientId: 'myapp', + brokers: kafkaBrokers, + ssl: true, + }); + + const kafkaProducer = kafka.producer(); + await kafkaProducer.connect(); + await kafkaProducer.send({ + topic: kafkaTopic, + messages: [{ value: 'Hello from MSK Integration test!' }], + }); + + await kafkaProducer.disconnect(); + + return { + statusCode: 200, + }; +} + +module.exports = { producer, consumer }; diff --git a/test/fixtures/functionMsk/package.json b/test/fixtures/functionMsk/package.json new file mode 100644 index 00000000000..ab1a46cd75c --- /dev/null +++ b/test/fixtures/functionMsk/package.json @@ -0,0 +1,14 @@ +{ + "name": "functionMsk", + "version": "1.0.0", + "description": "", + "main": "core.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "", + "license": "MIT", + "dependencies": { + "kafkajs": "^1.13.0" + } +} diff --git a/test/fixtures/functionMsk/serverless.yml b/test/fixtures/functionMsk/serverless.yml new file mode 100644 index 00000000000..85d0254a8ea --- /dev/null +++ b/test/fixtures/functionMsk/serverless.yml @@ -0,0 +1,17 @@ +service: service + +configValidationMode: error + +# VPC and Events configuration is added dynamically during test run +# Because it has to be provisioned separately via CloudFormation stack + +provider: + name: aws + runtime: nodejs12.x + versionFunctions: false + +functions: + producer: + handler: core.producer + consumer: + handler: core.consumer diff --git a/test/fixtures/functionMsk/utils.js b/test/fixtures/functionMsk/utils.js new file mode 100644 index 00000000000..f4a99e00afa --- /dev/null +++ b/test/fixtures/functionMsk/utils.js @@ -0,0 +1,22 @@ +'use strict'; + +const logger = console; + +function getMarkers(functionName) { + return { + start: `--- START ${functionName} ---`, + end: `--- END ${functionName} ---`, + }; +} + +function log(functionName, message) { + const markers = getMarkers(functionName); + logger.log(markers.start); + logger.log(message); + logger.log(markers.end); +} + +module.exports = { + getMarkers, + log, +}; diff --git a/test/integration/msk/cloudformation.yml b/test/integration/msk/cloudformation.yml new file mode 100644 index 00000000000..789fea3667f --- /dev/null +++ b/test/integration/msk/cloudformation.yml @@ -0,0 +1,151 @@ +AWSTemplateFormatVersion: 2010-09-09 + +Parameters: + ClusterName: + Type: String + Description: Name of MSK Cluster + ClusterConfigurationArn: + Type: String + Description: MSK Cluster Configuration ARN + ClusterConfigurationRevision: + Type: Number + Description: MSK Cluster Configuration Revision number + Default: 1 + +Resources: + VPC: + Type: AWS::EC2::VPC + Properties: + CidrBlock: 172.31.0.0/16 + Tags: + - Key: Name + Value: !Ref AWS::StackName + + PublicSubnet: + Type: AWS::EC2::Subnet + Properties: + VpcId: !Ref VPC + AvailabilityZone: + Fn::Select: + - 0 + - Fn::GetAZs: '' + CidrBlock: 172.31.0.0/24 + MapPublicIpOnLaunch: true + + PrivateSubnetA: + Type: AWS::EC2::Subnet + Properties: + VpcId: !Ref VPC + AvailabilityZone: + Fn::Select: + - 0 + - Fn::GetAZs: '' + CidrBlock: 172.31.3.0/24 + MapPublicIpOnLaunch: false + + PrivateSubnetB: + Type: AWS::EC2::Subnet + Properties: + VpcId: !Ref VPC + AvailabilityZone: + Fn::Select: + - 1 + - Fn::GetAZs: '' + CidrBlock: 172.31.2.0/24 + MapPublicIpOnLaunch: false + + InternetGateway: + Type: AWS::EC2::InternetGateway + + GatewayAttachment: + Type: AWS::EC2::VPCGatewayAttachment + Properties: + VpcId: !Ref VPC + InternetGatewayId: !Ref InternetGateway + + PublicRouteTable: + Type: AWS::EC2::RouteTable + Properties: + VpcId: !Ref VPC + + PublicRoute: + Type: AWS::EC2::Route + DependsOn: GatewayAttachment + Properties: + RouteTableId: !Ref PublicRouteTable + DestinationCidrBlock: 0.0.0.0/0 + GatewayId: !Ref InternetGateway + + PublicSubnetRouteTableAssociation: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PublicSubnet + RouteTableId: !Ref PublicRouteTable + + NatGateway: + Type: AWS::EC2::NatGateway + DependsOn: NatPublicIP + Properties: + AllocationId: !GetAtt NatPublicIP.AllocationId + SubnetId: !Ref PublicSubnet + + NatPublicIP: + Type: AWS::EC2::EIP + DependsOn: VPC + Properties: + Domain: vpc + + PrivateRouteTable: + Type: AWS::EC2::RouteTable + Properties: + VpcId: !Ref VPC + + PrivateRoute: + Type: AWS::EC2::Route + Properties: + RouteTableId: !Ref PrivateRouteTable + DestinationCidrBlock: 0.0.0.0/0 + NatGatewayId: !Ref NatGateway + + PrivateSubnetARouteTableAssociation: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PrivateSubnetA + RouteTableId: !Ref PrivateRouteTable + + PrivateSubnetBRouteTableAssociation: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PrivateSubnetB + RouteTableId: !Ref PrivateRouteTable + + MSKCluster: + Type: 'AWS::MSK::Cluster' + Properties: + ClusterName: !Ref ClusterName + KafkaVersion: 2.2.1 + NumberOfBrokerNodes: 2 + BrokerNodeGroupInfo: + InstanceType: kafka.t3.small + ClientSubnets: + - !Ref PrivateSubnetA + - !Ref PrivateSubnetB + StorageInfo: + EBSStorageInfo: + VolumeSize: 1 + ConfigurationInfo: + Arn: !Ref ClusterConfigurationArn + Revision: !Ref ClusterConfigurationRevision + +Outputs: + PrivateSubnetA: + Description: Private Subnet A ID + Value: !Ref PrivateSubnetA + + SecurityGroup: + Description: Default security for Lambda VPC + Value: !GetAtt VPC.DefaultSecurityGroup + + MSKCluster: + Description: Created MSK Cluster + Value: !Ref MSKCluster diff --git a/test/integration/msk/index.test.js b/test/integration/msk/index.test.js new file mode 100644 index 00000000000..1f29565d2b3 --- /dev/null +++ b/test/integration/msk/index.test.js @@ -0,0 +1,137 @@ +'use strict'; + +const path = require('path'); +const { expect } = require('chai'); +const log = require('log').get('serverless:test'); +const fixtures = require('../../fixtures'); +const { confirmCloudWatchLogs } = require('../../utils/misc'); + +const awsRequest = require('@serverless/test/aws-request'); +const fs = require('fs'); +const crypto = require('crypto'); +const { deployService, removeService } = require('../../utils/integration'); + +describe('AWS - MSK Integration Test', function() { + this.timeout(1000 * 60 * 100); // Involves time-taking deploys + let stackName; + let servicePath; + let clusterConfigurationArn; + const stage = 'dev'; + + const suffix = crypto.randomBytes(8).toString('hex'); + const resourcesStackName = `msk-integration-tests-deps-stack-${suffix}`; + const clusterConfName = `msk-cluster-configuration-${suffix}`; + const topicName = `msk-topic-${suffix}`; + const clusterName = `msk-integration-tests-msk-cluster-${suffix}`; + + before(async () => { + const cfnTemplate = fs.readFileSync(path.join(__dirname, 'cloudformation.yml'), 'utf8'); + const kafkaServerProperties = fs.readFileSync(path.join(__dirname, 'kafka.server.properties')); + + log.notice('Creating MSK Cluster configuration...'); + const clusterConfResponse = await awsRequest('Kafka', 'createConfiguration', { + Name: clusterConfName, + ServerProperties: kafkaServerProperties, + KafkaVersions: ['2.2.1'], + }); + + clusterConfigurationArn = clusterConfResponse.Arn; + const clusterConfigurationRevision = clusterConfResponse.LatestRevision.Revision.toString(); + + log.notice('Deploying CloudFormation stack with required resources...'); + await awsRequest('CloudFormation', 'createStack', { + StackName: resourcesStackName, + TemplateBody: cfnTemplate, + Parameters: [ + { ParameterKey: 'ClusterName', ParameterValue: clusterName }, + { ParameterKey: 'ClusterConfigurationArn', ParameterValue: clusterConfigurationArn }, + { + ParameterKey: 'ClusterConfigurationRevision', + ParameterValue: clusterConfigurationRevision, + }, + ], + }); + + const waitForResult = await awsRequest('CloudFormation', 'waitFor', 'stackCreateComplete', { + StackName: resourcesStackName, + }); + + const outputMap = waitForResult.Stacks[0].Outputs.reduce((map, output) => { + map[output.OutputKey] = output.OutputValue; + return map; + }, {}); + + log.notice('Getting MSK Boostrap Brokers URLs...'); + const getBootstrapBrokersResponse = await awsRequest('Kafka', 'getBootstrapBrokers', { + ClusterArn: outputMap.MSKCluster, + }); + const brokerUrls = getBootstrapBrokersResponse.BootstrapBrokerStringTls; + + const serviceData = await fixtures.setup('functionMsk', { + configExt: { + functions: { + producer: { + vpc: { + subnetIds: [outputMap.PrivateSubnetA], + securityGroupIds: [outputMap.SecurityGroup], + }, + environment: { + TOPIC_NAME: topicName, + BROKER_URLS: brokerUrls, + }, + }, + consumer: { + events: [ + { + msk: { + arn: outputMap.MSKCluster, + topic: topicName, + }, + }, + ], + }, + }, + }, + }); + + ({ servicePath } = serviceData); + + const serviceName = serviceData.serviceConfig.service; + stackName = `${serviceName}-${stage}`; + log.notice(`Deploying "${stackName}" service...`); + await deployService(servicePath); + }); + + after(async () => { + log.notice('Removing service...'); + await removeService(servicePath); + log.notice('Removing CloudFormation stack with required resources...'); + await awsRequest('CloudFormation', 'deleteStack', { StackName: resourcesStackName }); + await awsRequest('CloudFormation', 'waitFor', 'stackDeleteComplete', { + StackName: resourcesStackName, + }); + log.notice('Removing MSK Cluster configuration...'); + return awsRequest('Kafka', 'deleteConfiguration', { + Arn: clusterConfigurationArn, + }); + }); + + it('correctly processes messages from MSK topic', async () => { + const functionName = 'consumer'; + const message = 'Hello from MSK Integration test!'; + + return confirmCloudWatchLogs( + `/aws/lambda/${stackName}-${functionName}`, + async () => + await awsRequest('Lambda', 'invoke', { + FunctionName: `${stackName}-producer`, + InvocationType: 'RequestResponse', + }), + { timeout: 120 * 1000 } + ).then(events => { + const logs = events.reduce((data, event) => data + event.message, ''); + expect(logs).to.include(functionName); + expect(logs).to.include(message); + }); + }); +}); diff --git a/test/integration/msk/kafka.server.properties b/test/integration/msk/kafka.server.properties new file mode 100644 index 00000000000..435f23ad9da --- /dev/null +++ b/test/integration/msk/kafka.server.properties @@ -0,0 +1,2 @@ +auto.create.topics.enable=true +default.replication.factor=2 From 7afdcbf5fd170992c2bf8913f33fb2487fbdd96a Mon Sep 17 00:00:00 2001 From: Piotr Grzesik Date: Tue, 15 Sep 2020 17:26:35 +0200 Subject: [PATCH 06/12] Fix previous tests --- .../package/compile/events/msk/index.test.js | 38 +++++++------------ test/fixtures/functionMsk/core.js | 2 + 2 files changed, 16 insertions(+), 24 deletions(-) diff --git a/lib/plugins/aws/package/compile/events/msk/index.test.js b/lib/plugins/aws/package/compile/events/msk/index.test.js index 696e8137e86..224e41d4f25 100644 --- a/lib/plugins/aws/package/compile/events/msk/index.test.js +++ b/lib/plugins/aws/package/compile/events/msk/index.test.js @@ -2,15 +2,12 @@ const chai = require('chai'); const runServerless = require('../../../../../../../test/utils/run-serverless'); -const fixtures = require('../../../../../../../test/fixtures'); const { expect } = chai; chai.use(require('chai-as-promised')); describe('AwsCompileMSKEvents', () => { - after(fixtures.cleanup); - const arn = 'arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a'; const topic = 'TestingTopic'; const enabled = false; @@ -24,8 +21,9 @@ describe('AwsCompileMSKEvents', () => { let naming; before(() => - fixtures - .extend('function', { + runServerless({ + fixture: 'function', + configExt: { functions: { foo: { events: [ @@ -51,24 +49,16 @@ describe('AwsCompileMSKEvents', () => { ], }, }, - }) - .then(fixturePath => - runServerless({ - cwd: fixturePath, - cliArgs: ['package'], - }).then(({ awsNaming, cfTemplate }) => { - naming = awsNaming; - minimalEventSourceMappingResource = - cfTemplate.Resources[ - naming.getMSKEventLogicalId('foo', 'ClusterName', 'TestingTopic') - ]; - allParamsEventSourceMappingResource = - cfTemplate.Resources[ - naming.getMSKEventLogicalId('other', 'ClusterName', 'TestingTopic') - ]; - defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; - }) - ) + }, + cliArgs: ['package'], + }).then(({ awsNaming, cfTemplate }) => { + naming = awsNaming; + minimalEventSourceMappingResource = + cfTemplate.Resources[naming.getMSKEventLogicalId('foo', 'ClusterName', 'TestingTopic')]; + allParamsEventSourceMappingResource = + cfTemplate.Resources[naming.getMSKEventLogicalId('other', 'ClusterName', 'TestingTopic')]; + defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; + }) ); it('should correctly compile EventSourceMapping resource properties with minimal configuration', () => { @@ -127,7 +117,7 @@ describe('AwsCompileMSKEvents', () => { describe('when no msk events are defined', () => { it('should not modify the default IAM role', () => { return runServerless({ - cwd: fixtures.map.function, + fixture: 'function', cliArgs: ['package'], }).then(({ cfTemplate }) => { const defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; diff --git a/test/fixtures/functionMsk/core.js b/test/fixtures/functionMsk/core.js index 2d7db774edd..6c7389e4a08 100644 --- a/test/fixtures/functionMsk/core.js +++ b/test/fixtures/functionMsk/core.js @@ -4,6 +4,8 @@ // eslint-disable-next-line const { log } = require('./utils'); +// NOTE: `kafkajs` is bundled into the deployment package +// eslint-disable-next-line const { Kafka } = require('kafkajs'); function consumer(event, context, callback) { From 6413b4e29f3da9bc4ac6d3b25742dedf80b48ce9 Mon Sep 17 00:00:00 2001 From: Piotr Grzesik Date: Wed, 16 Sep 2020 21:12:19 +0200 Subject: [PATCH 07/12] Add removal of "dangling" ENIs from MSK event integration --- test/integration/msk/cloudformation.yml | 4 ++++ test/integration/msk/index.test.js | 23 ++++++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/test/integration/msk/cloudformation.yml b/test/integration/msk/cloudformation.yml index 789fea3667f..812e2d75bcf 100644 --- a/test/integration/msk/cloudformation.yml +++ b/test/integration/msk/cloudformation.yml @@ -138,6 +138,10 @@ Resources: Revision: !Ref ClusterConfigurationRevision Outputs: + VPC: + Description: VPC ID + Value: !Ref VPC + PrivateSubnetA: Description: Private Subnet A ID Value: !Ref PrivateSubnetA diff --git a/test/integration/msk/index.test.js b/test/integration/msk/index.test.js index 1f29565d2b3..d5df3ef5690 100644 --- a/test/integration/msk/index.test.js +++ b/test/integration/msk/index.test.js @@ -16,6 +16,7 @@ describe('AWS - MSK Integration Test', function() { let stackName; let servicePath; let clusterConfigurationArn; + let outputMap; const stage = 'dev'; const suffix = crypto.randomBytes(8).toString('hex'); @@ -56,7 +57,7 @@ describe('AWS - MSK Integration Test', function() { StackName: resourcesStackName, }); - const outputMap = waitForResult.Stacks[0].Outputs.reduce((map, output) => { + outputMap = waitForResult.Stacks[0].Outputs.reduce((map, output) => { map[output.OutputKey] = output.OutputValue; return map; }, {}); @@ -105,6 +106,26 @@ describe('AWS - MSK Integration Test', function() { after(async () => { log.notice('Removing service...'); await removeService(servicePath); + log.notice('Removing leftover ENI...'); + const describeResponse = await awsRequest('EC2', 'describeNetworkInterfaces', { + Filters: [ + { + Name: 'vpc-id', + Values: [outputMap.VPC], + }, + { + Name: 'status', + Values: ['available'], + }, + ], + }); + await Promise.all( + describeResponse.NetworkInterfaces.map(networkInterface => + awsRequest('EC2', 'deleteNetworkInterface', { + NetworkInterfaceId: networkInterface.NetworkInterfaceId, + }) + ) + ); log.notice('Removing CloudFormation stack with required resources...'); await awsRequest('CloudFormation', 'deleteStack', { StackName: resourcesStackName }); await awsRequest('CloudFormation', 'waitFor', 'stackDeleteComplete', { From 6808be849581a015b8ac2aa0176f26c07d291d10 Mon Sep 17 00:00:00 2001 From: Piotr Grzesik Date: Fri, 18 Sep 2020 14:02:40 +0200 Subject: [PATCH 08/12] Introduce test based on setup/teardown infrastructure --- package.json | 2 + .../msk => scripts/test}/cloudformation.yml | 0 .../test}/kafka.server.properties | 0 scripts/test/setup-integration-infra.js | 64 +++++++ scripts/test/teardown-integration-infra.js | 55 ++++++ test/README.md | 13 ++ test/integration/msk.test.js | 101 +++++++++++ test/integration/msk/index.test.js | 158 ------------------ test/utils/cludformation.js | 43 +++++ 9 files changed, 278 insertions(+), 158 deletions(-) rename {test/integration/msk => scripts/test}/cloudformation.yml (100%) rename {test/integration/msk => scripts/test}/kafka.server.properties (100%) create mode 100755 scripts/test/setup-integration-infra.js create mode 100755 scripts/test/teardown-integration-infra.js create mode 100644 test/integration/msk.test.js delete mode 100644 test/integration/msk/index.test.js diff --git a/package.json b/package.json index 7c21bcc975d..08692608076 100644 --- a/package.json +++ b/package.json @@ -186,6 +186,8 @@ "integration-test-run-all": "mocha-isolated --pass-through-aws-creds --skip-fs-cleanup-check --max-workers=20 \"test/integration/**/*.test.js\"", "integration-test-run-basic": "mocha test/integrationBasic.test.js", "integration-test-run-package": "mocha-isolated --skip-fs-cleanup-check test/integrationPackage/**/*.tests.js", + "integration-test-setup-infrastructure": "node ./scripts/test/setup-integration-infra.js", + "integration-test-teardown-infrastructure": "node ./scripts/test/teardown-integration-infra.js", "lint": "eslint .", "lint:updated": "pipe-git-updated --ext=js -- eslint", "pkg:build": "node ./scripts/pkg/build.js", diff --git a/test/integration/msk/cloudformation.yml b/scripts/test/cloudformation.yml similarity index 100% rename from test/integration/msk/cloudformation.yml rename to scripts/test/cloudformation.yml diff --git a/test/integration/msk/kafka.server.properties b/scripts/test/kafka.server.properties similarity index 100% rename from test/integration/msk/kafka.server.properties rename to scripts/test/kafka.server.properties diff --git a/scripts/test/setup-integration-infra.js b/scripts/test/setup-integration-infra.js new file mode 100755 index 00000000000..e89036297d3 --- /dev/null +++ b/scripts/test/setup-integration-infra.js @@ -0,0 +1,64 @@ +'use strict'; + +const awsRequest = require('@serverless/test/aws-request'); +const fs = require('fs'); +const path = require('path'); +const { SHARED_INFRA_TESTS_CLOUDFORMATION_STACK } = require('../../test/utils/cludformation'); + +(async () => { + process.stdout.write('Starting setup of integration infrastructure...\n'); + const cfnTemplate = fs.readFileSync(path.join(__dirname, 'cloudformation.yml'), 'utf8'); + const kafkaServerProperties = fs.readFileSync(path.join(__dirname, 'kafka.server.properties')); + + process.stdout.write('Checking if integration tests CloudFormation stack already exists...\n'); + try { + await awsRequest('CloudFormation', 'describeStacks', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + process.stdout.write('Integration tests CloudFormation stack already exists. Quitting.\n'); + return; + } catch (e) { + process.stdout.write('Integration tests CloudFormation does not exist. Continuing.\n'); + } + + const clusterName = 'integration-tests-msk-cluster'; + const clusterConfName = 'integration-tests-msk-cluster-configuration'; + + process.stdout.write('Creating MSK Cluster configuration...\n'); + let clusterConfResponse; + try { + clusterConfResponse = await awsRequest('Kafka', 'createConfiguration', { + Name: clusterConfName, + ServerProperties: kafkaServerProperties, + KafkaVersions: ['2.2.1'], + }); + } catch (e) { + process.stdout.write( + `Error: ${e} while trying to create MSK Cluster configuration. Quitting. \n` + ); + return; + } + + const clusterConfigurationArn = clusterConfResponse.Arn; + const clusterConfigurationRevision = clusterConfResponse.LatestRevision.Revision.toString(); + + process.stdout.write('Deploying integration tests CloudFormation stack...\n'); + await awsRequest('CloudFormation', 'createStack', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + TemplateBody: cfnTemplate, + Parameters: [ + { ParameterKey: 'ClusterName', ParameterValue: clusterName }, + { ParameterKey: 'ClusterConfigurationArn', ParameterValue: clusterConfigurationArn }, + { + ParameterKey: 'ClusterConfigurationRevision', + ParameterValue: clusterConfigurationRevision, + }, + ], + }); + + await awsRequest('CloudFormation', 'waitFor', 'stackCreateComplete', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + process.stdout.write('Deployed integration tests CloudFormation stack!\n'); + process.stdout.write('Setup of integration infrastructure finished\n'); +})(); diff --git a/scripts/test/teardown-integration-infra.js b/scripts/test/teardown-integration-infra.js new file mode 100755 index 00000000000..eb7810e9531 --- /dev/null +++ b/scripts/test/teardown-integration-infra.js @@ -0,0 +1,55 @@ +'use strict'; + +const awsRequest = require('@serverless/test/aws-request'); +const { + SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + getDependencyStackOutputMap, +} = require('../../test/utils/cludformation'); + +(async () => { + process.stdout.write('Starting teardown of integration infrastructure...\n'); + const describeClustersResponse = await awsRequest('Kafka', 'listClusters'); + const clusterConfArn = + describeClustersResponse.ClusterInfoList[0].CurrentBrokerSoftwareInfo.ConfigurationArn; + + const outputMap = await getDependencyStackOutputMap(); + + process.stdout.write('Removing leftover ENI...\n'); + const describeResponse = await awsRequest('EC2', 'describeNetworkInterfaces', { + Filters: [ + { + Name: 'vpc-id', + Values: [outputMap.VPC], + }, + { + Name: 'status', + Values: ['available'], + }, + ], + }); + try { + await Promise.all( + describeResponse.NetworkInterfaces.map(networkInterface => + awsRequest('EC2', 'deleteNetworkInterface', { + NetworkInterfaceId: networkInterface.NetworkInterfaceId, + }) + ) + ); + } catch (e) { + process.stdout.write(`Error: ${e} while trying to remove leftover ENIs\n`); + } + process.stdout.write('Removing integration tests CloudFormation stack...\n'); + await awsRequest('CloudFormation', 'deleteStack', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + await awsRequest('CloudFormation', 'waitFor', 'stackDeleteComplete', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + process.stdout.write('Removed integration tests CloudFormation stack!\n'); + process.stdout.write('Removing MSK Cluster configuration...\n'); + await awsRequest('Kafka', 'deleteConfiguration', { + Arn: clusterConfArn, + }); + process.stdout.write('Removed MSK Cluster configuration\n'); + process.stdout.write('Teardown of integration infrastructure finished\n'); +})(); diff --git a/test/README.md b/test/README.md index 77d22f09c8b..9c963f82af0 100644 --- a/test/README.md +++ b/test/README.md @@ -60,6 +60,19 @@ Pass test file to Mocha directly as follows AWS_ACCESS_KEY_ID=XXX AWS_SECRET_ACCESS_KEY=xxx npx mocha tests/integration/{chosen}.test.js ``` +### Tests that depend on shared infrastructure stack + +Due to the fact that some of the tests require a bit more complex infrastructure setup which might be lengthy, two additional commands has been made available: + +- `integration-test-setup-infrastructure` - used for setting up all needed intrastructure dependencies +- `integration-test-teardown-infrastructure` - used for tearing down the infrastructure setup by the above command + +Such tests take advantage of `isDependencyStackAvailable` util to check if all needed dependencies are ready. If not, it skips the given test suite. + +Examples of such tests: + +- [MSK]('./integration/msk.test.js') + ## Testing templates If you add a new template or want to test a template after changing it you can run the template integration tests. Make sure you have `docker` and `docker-compose` installed as they are required. The `docker` containers we're using through compose are automatically including your `$HOME/.aws` folder so you can deploy to AWS. diff --git a/test/integration/msk.test.js b/test/integration/msk.test.js new file mode 100644 index 00000000000..d65f20d6d14 --- /dev/null +++ b/test/integration/msk.test.js @@ -0,0 +1,101 @@ +'use strict'; + +const { expect } = require('chai'); +const log = require('log').get('serverless:test'); +const fixtures = require('../fixtures'); +const { confirmCloudWatchLogs } = require('../utils/misc'); +const { + isDependencyStackAvailable, + getDependencyStackOutputMap, +} = require('../utils/cludformation'); + +const awsRequest = require('@serverless/test/aws-request'); +const crypto = require('crypto'); +const { deployService, removeService } = require('../utils/integration'); + +describe('AWS - MSK Integration Test', function() { + this.timeout(1000 * 60 * 100); // Involves time-taking deploys + let stackName; + let servicePath; + const stage = 'dev'; + + const topicName = `msk-topic-${crypto.randomBytes(8).toString('hex')}`; + + before(async function beforeHook() { + const isDepsStackAvailable = await isDependencyStackAvailable(); + if (!isDepsStackAvailable) { + log.notice( + 'CloudFormation stack with integration test dependencies not found. Skipping test.' + ); + this.skip(); + } + + const outputMap = await getDependencyStackOutputMap(); + + log.notice('Getting MSK Boostrap Brokers URLs...'); + const getBootstrapBrokersResponse = await awsRequest('Kafka', 'getBootstrapBrokers', { + ClusterArn: outputMap.MSKCluster, + }); + const brokerUrls = getBootstrapBrokersResponse.BootstrapBrokerStringTls; + + const serviceData = await fixtures.setup('functionMsk', { + configExt: { + functions: { + producer: { + vpc: { + subnetIds: [outputMap.PrivateSubnetA], + securityGroupIds: [outputMap.SecurityGroup], + }, + environment: { + TOPIC_NAME: topicName, + BROKER_URLS: brokerUrls, + }, + }, + consumer: { + events: [ + { + msk: { + arn: outputMap.MSKCluster, + topic: topicName, + }, + }, + ], + }, + }, + }, + }); + + ({ servicePath } = serviceData); + + const serviceName = serviceData.serviceConfig.service; + stackName = `${serviceName}-${stage}`; + log.notice(`Deploying "${stackName}" service...`); + await deployService(servicePath); + }); + + after(async () => { + if (servicePath) { + log.notice('Removing service...'); + await removeService(servicePath); + } + }); + + it('correctly processes messages from MSK topic', async () => { + const functionName = 'consumer'; + const message = 'Hello from MSK Integration test!'; + + return confirmCloudWatchLogs( + `/aws/lambda/${stackName}-${functionName}`, + async () => + await awsRequest('Lambda', 'invoke', { + FunctionName: `${stackName}-producer`, + InvocationType: 'RequestResponse', + }), + { timeout: 120 * 1000 } + ).then(events => { + const logs = events.reduce((data, event) => data + event.message, ''); + expect(logs).to.include(functionName); + expect(logs).to.include(message); + }); + }); +}); diff --git a/test/integration/msk/index.test.js b/test/integration/msk/index.test.js deleted file mode 100644 index d5df3ef5690..00000000000 --- a/test/integration/msk/index.test.js +++ /dev/null @@ -1,158 +0,0 @@ -'use strict'; - -const path = require('path'); -const { expect } = require('chai'); -const log = require('log').get('serverless:test'); -const fixtures = require('../../fixtures'); -const { confirmCloudWatchLogs } = require('../../utils/misc'); - -const awsRequest = require('@serverless/test/aws-request'); -const fs = require('fs'); -const crypto = require('crypto'); -const { deployService, removeService } = require('../../utils/integration'); - -describe('AWS - MSK Integration Test', function() { - this.timeout(1000 * 60 * 100); // Involves time-taking deploys - let stackName; - let servicePath; - let clusterConfigurationArn; - let outputMap; - const stage = 'dev'; - - const suffix = crypto.randomBytes(8).toString('hex'); - const resourcesStackName = `msk-integration-tests-deps-stack-${suffix}`; - const clusterConfName = `msk-cluster-configuration-${suffix}`; - const topicName = `msk-topic-${suffix}`; - const clusterName = `msk-integration-tests-msk-cluster-${suffix}`; - - before(async () => { - const cfnTemplate = fs.readFileSync(path.join(__dirname, 'cloudformation.yml'), 'utf8'); - const kafkaServerProperties = fs.readFileSync(path.join(__dirname, 'kafka.server.properties')); - - log.notice('Creating MSK Cluster configuration...'); - const clusterConfResponse = await awsRequest('Kafka', 'createConfiguration', { - Name: clusterConfName, - ServerProperties: kafkaServerProperties, - KafkaVersions: ['2.2.1'], - }); - - clusterConfigurationArn = clusterConfResponse.Arn; - const clusterConfigurationRevision = clusterConfResponse.LatestRevision.Revision.toString(); - - log.notice('Deploying CloudFormation stack with required resources...'); - await awsRequest('CloudFormation', 'createStack', { - StackName: resourcesStackName, - TemplateBody: cfnTemplate, - Parameters: [ - { ParameterKey: 'ClusterName', ParameterValue: clusterName }, - { ParameterKey: 'ClusterConfigurationArn', ParameterValue: clusterConfigurationArn }, - { - ParameterKey: 'ClusterConfigurationRevision', - ParameterValue: clusterConfigurationRevision, - }, - ], - }); - - const waitForResult = await awsRequest('CloudFormation', 'waitFor', 'stackCreateComplete', { - StackName: resourcesStackName, - }); - - outputMap = waitForResult.Stacks[0].Outputs.reduce((map, output) => { - map[output.OutputKey] = output.OutputValue; - return map; - }, {}); - - log.notice('Getting MSK Boostrap Brokers URLs...'); - const getBootstrapBrokersResponse = await awsRequest('Kafka', 'getBootstrapBrokers', { - ClusterArn: outputMap.MSKCluster, - }); - const brokerUrls = getBootstrapBrokersResponse.BootstrapBrokerStringTls; - - const serviceData = await fixtures.setup('functionMsk', { - configExt: { - functions: { - producer: { - vpc: { - subnetIds: [outputMap.PrivateSubnetA], - securityGroupIds: [outputMap.SecurityGroup], - }, - environment: { - TOPIC_NAME: topicName, - BROKER_URLS: brokerUrls, - }, - }, - consumer: { - events: [ - { - msk: { - arn: outputMap.MSKCluster, - topic: topicName, - }, - }, - ], - }, - }, - }, - }); - - ({ servicePath } = serviceData); - - const serviceName = serviceData.serviceConfig.service; - stackName = `${serviceName}-${stage}`; - log.notice(`Deploying "${stackName}" service...`); - await deployService(servicePath); - }); - - after(async () => { - log.notice('Removing service...'); - await removeService(servicePath); - log.notice('Removing leftover ENI...'); - const describeResponse = await awsRequest('EC2', 'describeNetworkInterfaces', { - Filters: [ - { - Name: 'vpc-id', - Values: [outputMap.VPC], - }, - { - Name: 'status', - Values: ['available'], - }, - ], - }); - await Promise.all( - describeResponse.NetworkInterfaces.map(networkInterface => - awsRequest('EC2', 'deleteNetworkInterface', { - NetworkInterfaceId: networkInterface.NetworkInterfaceId, - }) - ) - ); - log.notice('Removing CloudFormation stack with required resources...'); - await awsRequest('CloudFormation', 'deleteStack', { StackName: resourcesStackName }); - await awsRequest('CloudFormation', 'waitFor', 'stackDeleteComplete', { - StackName: resourcesStackName, - }); - log.notice('Removing MSK Cluster configuration...'); - return awsRequest('Kafka', 'deleteConfiguration', { - Arn: clusterConfigurationArn, - }); - }); - - it('correctly processes messages from MSK topic', async () => { - const functionName = 'consumer'; - const message = 'Hello from MSK Integration test!'; - - return confirmCloudWatchLogs( - `/aws/lambda/${stackName}-${functionName}`, - async () => - await awsRequest('Lambda', 'invoke', { - FunctionName: `${stackName}-producer`, - InvocationType: 'RequestResponse', - }), - { timeout: 120 * 1000 } - ).then(events => { - const logs = events.reduce((data, event) => data + event.message, ''); - expect(logs).to.include(functionName); - expect(logs).to.include(message); - }); - }); -}); diff --git a/test/utils/cludformation.js b/test/utils/cludformation.js index d84c9cd8b07..3bad72af176 100644 --- a/test/utils/cludformation.js +++ b/test/utils/cludformation.js @@ -2,6 +2,8 @@ const awsRequest = require('@serverless/test/aws-request'); +const SHARED_INFRA_TESTS_CLOUDFORMATION_STACK = 'integration-tests-deps-stack'; + function findStacks(name, status) { const params = {}; if (status) { @@ -57,9 +59,50 @@ function listStacks(status) { return awsRequest('CloudFormation', 'listStacks', params); } +async function doesStackWithNameAndStatusExists(name, status) { + try { + const describeStacksResponse = await awsRequest('CloudFormation', 'describeStacks', { + StackName: name, + }); + if (describeStacksResponse.Stacks[0].StackStatus === status) { + return true; + } + return false; + } catch (e) { + return false; + } +} + +async function getStackOutputMap(name) { + const describeStackResponse = await awsRequest('CloudFormation', 'describeStacks', { + StackName: name, + }); + + return describeStackResponse.Stacks[0].Outputs.reduce((map, output) => { + map[output.OutputKey] = output.OutputValue; + return map; + }, {}); +} + +async function isDependencyStackAvailable() { + return doesStackWithNameAndStatusExists( + SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + 'CREATE_COMPLETE' + ); +} + +async function getDependencyStackOutputMap() { + return getStackOutputMap(SHARED_INFRA_TESTS_CLOUDFORMATION_STACK); +} + module.exports = { findStacks, deleteStack, listStackResources, listStacks, + doesStackWithNameAndStatusExists, + getStackOutputMap, + SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + isDependencyStackAvailable, + getDependencyStackOutputMap, }; From f769789643fd7579f2c70b4c507c3846cf9b8817 Mon Sep 17 00:00:00 2001 From: Piotr Grzesik Date: Fri, 18 Sep 2020 14:06:11 +0200 Subject: [PATCH 09/12] update link to example test --- test/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/README.md b/test/README.md index 9c963f82af0..0a14ca6006c 100644 --- a/test/README.md +++ b/test/README.md @@ -71,7 +71,7 @@ Such tests take advantage of `isDependencyStackAvailable` util to check if all n Examples of such tests: -- [MSK]('./integration/msk.test.js') +- [MSK](./integration/msk.test.js) ## Testing templates From 21fe37e9dccfa0659ace57f14d822f7590db3ee6 Mon Sep 17 00:00:00 2001 From: Piotr Grzesik Date: Mon, 21 Sep 2020 16:51:45 +0200 Subject: [PATCH 10/12] chore: minor adjustments --- .gitignore | 3 - docs/providers/aws/events/msk.md | 5 ++ lib/plugins/aws/lib/naming.js | 7 +- .../package/compile/events/msk/index.test.js | 63 +++++++++-------- package.json | 5 +- .../cloudformation.yml | 0 scripts/test/integration-setup/index.js | 68 +++++++++++++++++++ .../kafka.server.properties | 0 ...ation-infra.js => integration-teardown.js} | 24 ++++--- scripts/test/setup-integration-infra.js | 64 ----------------- test/fixtures/functionMsk/core.js | 7 +- test/fixtures/functionMsk/package.json | 9 --- test/fixtures/functionMsk/serverless.yml | 1 + test/fixtures/functionMsk/utils.js | 22 ------ .../{ => infra-dependent}/msk.test.js | 37 +++++----- test/utils/cludformation.js | 14 ++-- 16 files changed, 152 insertions(+), 177 deletions(-) rename scripts/test/{ => integration-setup}/cloudformation.yml (100%) create mode 100755 scripts/test/integration-setup/index.js rename scripts/test/{ => integration-setup}/kafka.server.properties (100%) rename scripts/test/{teardown-integration-infra.js => integration-teardown.js} (66%) delete mode 100755 scripts/test/setup-integration-infra.js delete mode 100644 test/fixtures/functionMsk/utils.js rename test/integration/{ => infra-dependent}/msk.test.js (70%) diff --git a/.gitignore b/.gitignore index fac841ca046..3e71f2a2d15 100755 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,3 @@ /node_modules npm-debug.log /package-lock.json - -/test/fixtures/**/node_modules -/test/fixtures/**/package-lock.json diff --git a/docs/providers/aws/events/msk.md b/docs/providers/aws/events/msk.md index a5bd06c0118..64407a51580 100644 --- a/docs/providers/aws/events/msk.md +++ b/docs/providers/aws/events/msk.md @@ -14,6 +14,11 @@ layout: Doc # MSK +[Amazon Managed Streaming for Apache Kafka (Amazon MSK)](https://aws.amazon.com/msk/) is a fully managed streaming service that uses Apache Kafka. +Amazon MSK can be used as event source for Lambda, which allows Lambda service to internally poll it for new messages and invoke corresponding Lambda functions. + +## Simple event definition + In the following example, we specify that the `compute` function should be triggered whenever there are new messages available to consume from defined Kafka `topic`. In order to configure `msk` event, you have to provide two required properties: `arn`, which represents an ARN of MSK cluster and `topic` to consume messages from. diff --git a/lib/plugins/aws/lib/naming.js b/lib/plugins/aws/lib/naming.js index 60b1da1a8a3..4005ec742b9 100644 --- a/lib/plugins/aws/lib/naming.js +++ b/lib/plugins/aws/lib/naming.js @@ -412,11 +412,8 @@ module.exports = { getMSKEventLogicalId(functionName, clusterName, topicName) { const normalizedFunctionName = this.getNormalizedFunctionName(functionName); // Both clusterName and topicName are trimmed to 79 chars to avoid going over 255 character limit - const normalizedClusterName = this.normalizeNameToAlphaNumericOnly(clusterName).substring( - 0, - 79 - ); - const normalizedTopicName = this.normalizeNameToAlphaNumericOnly(topicName).substring(0, 79); + const normalizedClusterName = this.normalizeNameToAlphaNumericOnly(clusterName).slice(0, 79); + const normalizedTopicName = this.normalizeNameToAlphaNumericOnly(topicName).slice(0, 79); return `${normalizedFunctionName}EventSourceMappingMSK${normalizedClusterName}${normalizedTopicName}`; }, diff --git a/lib/plugins/aws/package/compile/events/msk/index.test.js b/lib/plugins/aws/package/compile/events/msk/index.test.js index 224e41d4f25..be4f87b541d 100644 --- a/lib/plugins/aws/package/compile/events/msk/index.test.js +++ b/lib/plugins/aws/package/compile/events/msk/index.test.js @@ -20,8 +20,8 @@ describe('AwsCompileMSKEvents', () => { let defaultIamRole; let naming; - before(() => - runServerless({ + before(async () => { + const { awsNaming, cfTemplate } = await runServerless({ fixture: 'function', configExt: { functions: { @@ -51,15 +51,14 @@ describe('AwsCompileMSKEvents', () => { }, }, cliArgs: ['package'], - }).then(({ awsNaming, cfTemplate }) => { - naming = awsNaming; - minimalEventSourceMappingResource = - cfTemplate.Resources[naming.getMSKEventLogicalId('foo', 'ClusterName', 'TestingTopic')]; - allParamsEventSourceMappingResource = - cfTemplate.Resources[naming.getMSKEventLogicalId('other', 'ClusterName', 'TestingTopic')]; - defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; - }) - ); + }); + naming = awsNaming; + minimalEventSourceMappingResource = + cfTemplate.Resources[naming.getMSKEventLogicalId('foo', 'ClusterName', 'TestingTopic')]; + allParamsEventSourceMappingResource = + cfTemplate.Resources[naming.getMSKEventLogicalId('other', 'ClusterName', 'TestingTopic')]; + defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; + }); it('should correctly compile EventSourceMapping resource properties with minimal configuration', () => { expect(minimalEventSourceMappingResource.Properties).to.deep.equal({ @@ -115,30 +114,30 @@ describe('AwsCompileMSKEvents', () => { }); describe('when no msk events are defined', () => { - it('should not modify the default IAM role', () => { - return runServerless({ + it('should not modify the default IAM role', async () => { + const { cfTemplate } = await runServerless({ fixture: 'function', cliArgs: ['package'], - }).then(({ cfTemplate }) => { - const defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; - expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({ - Effect: 'Allow', - Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'], - Resource: [], - }); + }); - expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({ - Effect: 'Allow', - Action: [ - 'ec2:CreateNetworkInterface', - 'ec2:DescribeNetworkInterfaces', - 'ec2:DescribeVpcs', - 'ec2:DeleteNetworkInterface', - 'ec2:DescribeSubnets', - 'ec2:DescribeSecurityGroups', - ], - Resource: '*', - }); + const defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({ + Effect: 'Allow', + Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'], + Resource: [], + }); + + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({ + Effect: 'Allow', + Action: [ + 'ec2:CreateNetworkInterface', + 'ec2:DescribeNetworkInterfaces', + 'ec2:DescribeVpcs', + 'ec2:DeleteNetworkInterface', + 'ec2:DescribeSubnets', + 'ec2:DescribeSecurityGroups', + ], + Resource: '*', }); }); }); diff --git a/package.json b/package.json index 08692608076..2aab03a3829 100644 --- a/package.json +++ b/package.json @@ -87,6 +87,7 @@ "jszip": "^3.5.0", "lint-staged": "^10.4.0", "log": "^6.0.0", + "log-node": "^7.0.0", "mocha": "^6.2.3", "mock-require": "^3.0.3", "nyc": "^15.1.0", @@ -186,8 +187,8 @@ "integration-test-run-all": "mocha-isolated --pass-through-aws-creds --skip-fs-cleanup-check --max-workers=20 \"test/integration/**/*.test.js\"", "integration-test-run-basic": "mocha test/integrationBasic.test.js", "integration-test-run-package": "mocha-isolated --skip-fs-cleanup-check test/integrationPackage/**/*.tests.js", - "integration-test-setup-infrastructure": "node ./scripts/test/setup-integration-infra.js", - "integration-test-teardown-infrastructure": "node ./scripts/test/teardown-integration-infra.js", + "integration-test-setup": "node ./scripts/test/integration-setup/index.js", + "integration-test-teardown": "node ./scripts/test/integration-teardown.js", "lint": "eslint .", "lint:updated": "pipe-git-updated --ext=js -- eslint", "pkg:build": "node ./scripts/pkg/build.js", diff --git a/scripts/test/cloudformation.yml b/scripts/test/integration-setup/cloudformation.yml similarity index 100% rename from scripts/test/cloudformation.yml rename to scripts/test/integration-setup/cloudformation.yml diff --git a/scripts/test/integration-setup/index.js b/scripts/test/integration-setup/index.js new file mode 100755 index 00000000000..e0d60e57e43 --- /dev/null +++ b/scripts/test/integration-setup/index.js @@ -0,0 +1,68 @@ +#!/usr/bin/env node + +'use strict'; + +require('essentials'); +require('log-node')(); + +const log = require('log').get('serverless:scripts'); +const awsRequest = require('@serverless/test/aws-request'); +const fsPromises = require('fs').promises; +const path = require('path'); +const { SHARED_INFRA_TESTS_CLOUDFORMATION_STACK } = require('../../../test/utils/cludformation'); + +(async () => { + log.notice('Starting setup of integration infrastructure...'); + + const [cfnTemplate, kafkaServerProperties] = await Promise.all([ + fsPromises.readFile(path.join(__dirname, 'cloudformation.yml'), 'utf8'), + fsPromises.readFile(path.join(__dirname, 'kafka.server.properties')), + ]); + + log.notice('Checking if integration tests CloudFormation stack already exists...'); + try { + await awsRequest('CloudFormation', 'describeStacks', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + log.error('Integration tests CloudFormation stack already exists. Quitting.'); + return; + } catch (e) { + if (e.code !== 'ValidationError') { + throw e; + } + } + log.notice('Integration tests CloudFormation does not exist. Continuing.'); + + const clusterName = 'integration-tests-msk-cluster'; + const clusterConfName = 'integration-tests-msk-cluster-configuration'; + + log.notice('Creating MSK Cluster configuration...'); + const clusterConfResponse = await awsRequest('Kafka', 'createConfiguration', { + Name: clusterConfName, + ServerProperties: kafkaServerProperties, + KafkaVersions: ['2.2.1'], + }); + + const clusterConfigurationArn = clusterConfResponse.Arn; + const clusterConfigurationRevision = clusterConfResponse.LatestRevision.Revision.toString(); + + log.notice('Deploying integration tests CloudFormation stack...'); + await awsRequest('CloudFormation', 'createStack', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + TemplateBody: cfnTemplate, + Parameters: [ + { ParameterKey: 'ClusterName', ParameterValue: clusterName }, + { ParameterKey: 'ClusterConfigurationArn', ParameterValue: clusterConfigurationArn }, + { + ParameterKey: 'ClusterConfigurationRevision', + ParameterValue: clusterConfigurationRevision, + }, + ], + }); + + await awsRequest('CloudFormation', 'waitFor', 'stackCreateComplete', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + log.notice('Deployed integration tests CloudFormation stack!'); + log.notice('Setup of integration infrastructure finished'); +})(); diff --git a/scripts/test/kafka.server.properties b/scripts/test/integration-setup/kafka.server.properties similarity index 100% rename from scripts/test/kafka.server.properties rename to scripts/test/integration-setup/kafka.server.properties diff --git a/scripts/test/teardown-integration-infra.js b/scripts/test/integration-teardown.js similarity index 66% rename from scripts/test/teardown-integration-infra.js rename to scripts/test/integration-teardown.js index eb7810e9531..8abea967da6 100755 --- a/scripts/test/teardown-integration-infra.js +++ b/scripts/test/integration-teardown.js @@ -1,5 +1,11 @@ +#!/usr/bin/env node + 'use strict'; +require('essentials'); +require('log-node')(); + +const log = require('log').get('serverless:scripts'); const awsRequest = require('@serverless/test/aws-request'); const { SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, @@ -7,19 +13,19 @@ const { } = require('../../test/utils/cludformation'); (async () => { - process.stdout.write('Starting teardown of integration infrastructure...\n'); + log.notice('Starting teardown of integration infrastructure...'); const describeClustersResponse = await awsRequest('Kafka', 'listClusters'); const clusterConfArn = describeClustersResponse.ClusterInfoList[0].CurrentBrokerSoftwareInfo.ConfigurationArn; const outputMap = await getDependencyStackOutputMap(); - process.stdout.write('Removing leftover ENI...\n'); + log.notice('Removing leftover ENI...'); const describeResponse = await awsRequest('EC2', 'describeNetworkInterfaces', { Filters: [ { Name: 'vpc-id', - Values: [outputMap.VPC], + Values: [outputMap.get('VPC')], }, { Name: 'status', @@ -36,20 +42,20 @@ const { ) ); } catch (e) { - process.stdout.write(`Error: ${e} while trying to remove leftover ENIs\n`); + log.error(`Error: ${e} while trying to remove leftover ENIs\n`); } - process.stdout.write('Removing integration tests CloudFormation stack...\n'); + log.notice('Removing integration tests CloudFormation stack...'); await awsRequest('CloudFormation', 'deleteStack', { StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, }); await awsRequest('CloudFormation', 'waitFor', 'stackDeleteComplete', { StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, }); - process.stdout.write('Removed integration tests CloudFormation stack!\n'); - process.stdout.write('Removing MSK Cluster configuration...\n'); + log.notice('Removed integration tests CloudFormation stack!'); + log.notice('Removing MSK Cluster configuration...'); await awsRequest('Kafka', 'deleteConfiguration', { Arn: clusterConfArn, }); - process.stdout.write('Removed MSK Cluster configuration\n'); - process.stdout.write('Teardown of integration infrastructure finished\n'); + log.notice('Removed MSK Cluster configuration'); + log.notice('Teardown of integration infrastructure finished'); })(); diff --git a/scripts/test/setup-integration-infra.js b/scripts/test/setup-integration-infra.js deleted file mode 100755 index e89036297d3..00000000000 --- a/scripts/test/setup-integration-infra.js +++ /dev/null @@ -1,64 +0,0 @@ -'use strict'; - -const awsRequest = require('@serverless/test/aws-request'); -const fs = require('fs'); -const path = require('path'); -const { SHARED_INFRA_TESTS_CLOUDFORMATION_STACK } = require('../../test/utils/cludformation'); - -(async () => { - process.stdout.write('Starting setup of integration infrastructure...\n'); - const cfnTemplate = fs.readFileSync(path.join(__dirname, 'cloudformation.yml'), 'utf8'); - const kafkaServerProperties = fs.readFileSync(path.join(__dirname, 'kafka.server.properties')); - - process.stdout.write('Checking if integration tests CloudFormation stack already exists...\n'); - try { - await awsRequest('CloudFormation', 'describeStacks', { - StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, - }); - process.stdout.write('Integration tests CloudFormation stack already exists. Quitting.\n'); - return; - } catch (e) { - process.stdout.write('Integration tests CloudFormation does not exist. Continuing.\n'); - } - - const clusterName = 'integration-tests-msk-cluster'; - const clusterConfName = 'integration-tests-msk-cluster-configuration'; - - process.stdout.write('Creating MSK Cluster configuration...\n'); - let clusterConfResponse; - try { - clusterConfResponse = await awsRequest('Kafka', 'createConfiguration', { - Name: clusterConfName, - ServerProperties: kafkaServerProperties, - KafkaVersions: ['2.2.1'], - }); - } catch (e) { - process.stdout.write( - `Error: ${e} while trying to create MSK Cluster configuration. Quitting. \n` - ); - return; - } - - const clusterConfigurationArn = clusterConfResponse.Arn; - const clusterConfigurationRevision = clusterConfResponse.LatestRevision.Revision.toString(); - - process.stdout.write('Deploying integration tests CloudFormation stack...\n'); - await awsRequest('CloudFormation', 'createStack', { - StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, - TemplateBody: cfnTemplate, - Parameters: [ - { ParameterKey: 'ClusterName', ParameterValue: clusterName }, - { ParameterKey: 'ClusterConfigurationArn', ParameterValue: clusterConfigurationArn }, - { - ParameterKey: 'ClusterConfigurationRevision', - ParameterValue: clusterConfigurationRevision, - }, - ], - }); - - await awsRequest('CloudFormation', 'waitFor', 'stackCreateComplete', { - StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, - }); - process.stdout.write('Deployed integration tests CloudFormation stack!\n'); - process.stdout.write('Setup of integration infrastructure finished\n'); -})(); diff --git a/test/fixtures/functionMsk/core.js b/test/fixtures/functionMsk/core.js index 6c7389e4a08..93ff87cb512 100644 --- a/test/fixtures/functionMsk/core.js +++ b/test/fixtures/functionMsk/core.js @@ -1,9 +1,5 @@ 'use strict'; -// NOTE: the `utils.js` file is bundled into the deployment package -// eslint-disable-next-line -const { log } = require('./utils'); - // NOTE: `kafkajs` is bundled into the deployment package // eslint-disable-next-line const { Kafka } = require('kafkajs'); @@ -14,7 +10,8 @@ function consumer(event, context, callback) { const messages = Object.values(records)[0].map(record => Buffer.from(record.value, 'base64').toString() ); - log(functionName, JSON.stringify(messages)); + // eslint-disable-next-line + console.log(functionName, JSON.stringify(messages)); return callback(null, event); } diff --git a/test/fixtures/functionMsk/package.json b/test/fixtures/functionMsk/package.json index ab1a46cd75c..86a9cdb040e 100644 --- a/test/fixtures/functionMsk/package.json +++ b/test/fixtures/functionMsk/package.json @@ -1,13 +1,4 @@ { - "name": "functionMsk", - "version": "1.0.0", - "description": "", - "main": "core.js", - "scripts": { - "test": "echo \"Error: no test specified\" && exit 1" - }, - "author": "", - "license": "MIT", "dependencies": { "kafkajs": "^1.13.0" } diff --git a/test/fixtures/functionMsk/serverless.yml b/test/fixtures/functionMsk/serverless.yml index 85d0254a8ea..02a66625baf 100644 --- a/test/fixtures/functionMsk/serverless.yml +++ b/test/fixtures/functionMsk/serverless.yml @@ -1,6 +1,7 @@ service: service configValidationMode: error +frameworkVersion: '*' # VPC and Events configuration is added dynamically during test run # Because it has to be provisioned separately via CloudFormation stack diff --git a/test/fixtures/functionMsk/utils.js b/test/fixtures/functionMsk/utils.js deleted file mode 100644 index f4a99e00afa..00000000000 --- a/test/fixtures/functionMsk/utils.js +++ /dev/null @@ -1,22 +0,0 @@ -'use strict'; - -const logger = console; - -function getMarkers(functionName) { - return { - start: `--- START ${functionName} ---`, - end: `--- END ${functionName} ---`, - }; -} - -function log(functionName, message) { - const markers = getMarkers(functionName); - logger.log(markers.start); - logger.log(message); - logger.log(markers.end); -} - -module.exports = { - getMarkers, - log, -}; diff --git a/test/integration/msk.test.js b/test/integration/infra-dependent/msk.test.js similarity index 70% rename from test/integration/msk.test.js rename to test/integration/infra-dependent/msk.test.js index d65f20d6d14..57e0836b8e6 100644 --- a/test/integration/msk.test.js +++ b/test/integration/infra-dependent/msk.test.js @@ -2,16 +2,16 @@ const { expect } = require('chai'); const log = require('log').get('serverless:test'); -const fixtures = require('../fixtures'); -const { confirmCloudWatchLogs } = require('../utils/misc'); +const fixtures = require('../../fixtures'); +const { confirmCloudWatchLogs } = require('../../utils/misc'); const { isDependencyStackAvailable, getDependencyStackOutputMap, -} = require('../utils/cludformation'); +} = require('../../utils/cludformation'); const awsRequest = require('@serverless/test/aws-request'); const crypto = require('crypto'); -const { deployService, removeService } = require('../utils/integration'); +const { deployService, removeService } = require('../../utils/integration'); describe('AWS - MSK Integration Test', function() { this.timeout(1000 * 60 * 100); // Involves time-taking deploys @@ -21,20 +21,17 @@ describe('AWS - MSK Integration Test', function() { const topicName = `msk-topic-${crypto.randomBytes(8).toString('hex')}`; - before(async function beforeHook() { + before(async () => { const isDepsStackAvailable = await isDependencyStackAvailable(); if (!isDepsStackAvailable) { - log.notice( - 'CloudFormation stack with integration test dependencies not found. Skipping test.' - ); - this.skip(); + throw new Error('CloudFormation stack with integration test dependencies not found.'); } const outputMap = await getDependencyStackOutputMap(); log.notice('Getting MSK Boostrap Brokers URLs...'); const getBootstrapBrokersResponse = await awsRequest('Kafka', 'getBootstrapBrokers', { - ClusterArn: outputMap.MSKCluster, + ClusterArn: outputMap.get('MSKCluster'), }); const brokerUrls = getBootstrapBrokersResponse.BootstrapBrokerStringTls; @@ -43,8 +40,8 @@ describe('AWS - MSK Integration Test', function() { functions: { producer: { vpc: { - subnetIds: [outputMap.PrivateSubnetA], - securityGroupIds: [outputMap.SecurityGroup], + subnetIds: [outputMap.get('PrivateSubnetA')], + securityGroupIds: [outputMap.get('SecurityGroup')], }, environment: { TOPIC_NAME: topicName, @@ -55,7 +52,7 @@ describe('AWS - MSK Integration Test', function() { events: [ { msk: { - arn: outputMap.MSKCluster, + arn: outputMap.get('MSKCluster'), topic: topicName, }, }, @@ -69,13 +66,11 @@ describe('AWS - MSK Integration Test', function() { const serviceName = serviceData.serviceConfig.service; stackName = `${serviceName}-${stage}`; - log.notice(`Deploying "${stackName}" service...`); await deployService(servicePath); }); after(async () => { if (servicePath) { - log.notice('Removing service...'); await removeService(servicePath); } }); @@ -84,7 +79,7 @@ describe('AWS - MSK Integration Test', function() { const functionName = 'consumer'; const message = 'Hello from MSK Integration test!'; - return confirmCloudWatchLogs( + const events = await confirmCloudWatchLogs( `/aws/lambda/${stackName}-${functionName}`, async () => await awsRequest('Lambda', 'invoke', { @@ -92,10 +87,10 @@ describe('AWS - MSK Integration Test', function() { InvocationType: 'RequestResponse', }), { timeout: 120 * 1000 } - ).then(events => { - const logs = events.reduce((data, event) => data + event.message, ''); - expect(logs).to.include(functionName); - expect(logs).to.include(message); - }); + ); + + const logs = events.reduce((data, event) => data + event.message, ''); + expect(logs).to.include(functionName); + expect(logs).to.include(message); }); }); diff --git a/test/utils/cludformation.js b/test/utils/cludformation.js index 3bad72af176..26fcd027870 100644 --- a/test/utils/cludformation.js +++ b/test/utils/cludformation.js @@ -69,7 +69,10 @@ async function doesStackWithNameAndStatusExists(name, status) { } return false; } catch (e) { - return false; + if (e.code === 'ValidationError') { + return false; + } + throw e; } } @@ -78,10 +81,11 @@ async function getStackOutputMap(name) { StackName: name, }); - return describeStackResponse.Stacks[0].Outputs.reduce((map, output) => { - map[output.OutputKey] = output.OutputValue; - return map; - }, {}); + const outputsMap = new Map(); + for (const { OutputKey: key, OutputValue: value } of describeStackResponse.Stacks[0].Outputs) { + outputsMap.set(key, value); + } + return outputsMap; } async function isDependencyStackAvailable() { From a2643e4e905c280c16b6e0c9878ae2256e5093ed Mon Sep 17 00:00:00 2001 From: Piotr Grzesik Date: Tue, 22 Sep 2020 15:46:39 +0200 Subject: [PATCH 11/12] feat(aws): allow to update shared infra stack --- .../aws/package/compile/events/msk/index.js | 136 +++++++++--------- scripts/test/integration-setup/index.js | 82 ++++++++--- test/fixtures/functionMsk/core.js | 2 +- test/utils/cludformation.js | 38 +++-- 4 files changed, 149 insertions(+), 109 deletions(-) diff --git a/lib/plugins/aws/package/compile/events/msk/index.js b/lib/plugins/aws/package/compile/events/msk/index.js index abaf678446b..134bb3f664a 100644 --- a/lib/plugins/aws/package/compile/events/msk/index.js +++ b/lib/plugins/aws/package/compile/events/msk/index.js @@ -47,80 +47,78 @@ class AwsCompileMSKEvents { const functionObj = this.serverless.service.getFunction(functionName); const cfTemplate = this.serverless.service.provider.compiledCloudFormationTemplate; - if (functionObj.events) { - // It is required to add the following statement in order to be able to connect to MSK cluster - const ec2Statement = { - Effect: 'Allow', - Action: [ - 'ec2:CreateNetworkInterface', - 'ec2:DescribeNetworkInterfaces', - 'ec2:DescribeVpcs', - 'ec2:DeleteNetworkInterface', - 'ec2:DescribeSubnets', - 'ec2:DescribeSecurityGroups', - ], - Resource: '*', - }; - const mskStatement = { - Effect: 'Allow', - Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'], - Resource: [], - }; - - functionObj.events.forEach(event => { - if (event.msk) { - const eventSourceArn = event.msk.arn; - const topic = event.msk.topic; - const batchSize = event.msk.batchSize; - const enabled = event.msk.enabled; - const startingPosition = event.msk.startingPosition || 'TRIM_HORIZON'; - - const mskClusterNameToken = getMskClusterNameToken(eventSourceArn); - const mskEventLogicalId = this.provider.naming.getMSKEventLogicalId( - functionName, - mskClusterNameToken, - topic - ); - - const dependsOn = this.provider.resolveFunctionIamRoleResourceName(functionObj) || []; - - const lambdaLogicalId = this.provider.naming.getLambdaLogicalId(functionName); - - const mskResource = { - Type: 'AWS::Lambda::EventSourceMapping', - DependsOn: dependsOn, - Properties: { - EventSourceArn: eventSourceArn, - FunctionName: { - 'Fn::GetAtt': [lambdaLogicalId, 'Arn'], - }, - StartingPosition: startingPosition, - Topics: [topic], + // It is required to add the following statement in order to be able to connect to MSK cluster + const ec2Statement = { + Effect: 'Allow', + Action: [ + 'ec2:CreateNetworkInterface', + 'ec2:DescribeNetworkInterfaces', + 'ec2:DescribeVpcs', + 'ec2:DeleteNetworkInterface', + 'ec2:DescribeSubnets', + 'ec2:DescribeSecurityGroups', + ], + Resource: '*', + }; + const mskStatement = { + Effect: 'Allow', + Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'], + Resource: [], + }; + + functionObj.events.forEach(event => { + if (event.msk) { + const eventSourceArn = event.msk.arn; + const topic = event.msk.topic; + const batchSize = event.msk.batchSize; + const enabled = event.msk.enabled; + const startingPosition = event.msk.startingPosition || 'TRIM_HORIZON'; + + const mskClusterNameToken = getMskClusterNameToken(eventSourceArn); + const mskEventLogicalId = this.provider.naming.getMSKEventLogicalId( + functionName, + mskClusterNameToken, + topic + ); + + const dependsOn = this.provider.resolveFunctionIamRoleResourceName(functionObj) || []; + + const lambdaLogicalId = this.provider.naming.getLambdaLogicalId(functionName); + + const mskResource = { + Type: 'AWS::Lambda::EventSourceMapping', + DependsOn: dependsOn, + Properties: { + EventSourceArn: eventSourceArn, + FunctionName: { + 'Fn::GetAtt': [lambdaLogicalId, 'Arn'], }, - }; + StartingPosition: startingPosition, + Topics: [topic], + }, + }; - if (batchSize) { - mskResource.Properties.BatchSize = batchSize; - } + if (batchSize) { + mskResource.Properties.BatchSize = batchSize; + } - if (enabled != null) { - mskResource.Properties.Enabled = enabled; - } + if (enabled != null) { + mskResource.Properties.Enabled = enabled; + } - mskStatement.Resource.push(eventSourceArn); + mskStatement.Resource.push(eventSourceArn); - cfTemplate.Resources[mskEventLogicalId] = mskResource; - } - }); - - if (cfTemplate.Resources.IamRoleLambdaExecution) { - const statement = - cfTemplate.Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument - .Statement; - if (mskStatement.Resource.length) { - statement.push(mskStatement); - statement.push(ec2Statement); - } + cfTemplate.Resources[mskEventLogicalId] = mskResource; + } + }); + + if (cfTemplate.Resources.IamRoleLambdaExecution) { + const statement = + cfTemplate.Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument + .Statement; + if (mskStatement.Resource.length) { + statement.push(mskStatement); + statement.push(ec2Statement); } } }); diff --git a/scripts/test/integration-setup/index.js b/scripts/test/integration-setup/index.js index e0d60e57e43..894eeb2f4d1 100755 --- a/scripts/test/integration-setup/index.js +++ b/scripts/test/integration-setup/index.js @@ -11,28 +11,12 @@ const fsPromises = require('fs').promises; const path = require('path'); const { SHARED_INFRA_TESTS_CLOUDFORMATION_STACK } = require('../../../test/utils/cludformation'); -(async () => { - log.notice('Starting setup of integration infrastructure...'); - +async function handleInfrastructureCreation() { const [cfnTemplate, kafkaServerProperties] = await Promise.all([ fsPromises.readFile(path.join(__dirname, 'cloudformation.yml'), 'utf8'), fsPromises.readFile(path.join(__dirname, 'kafka.server.properties')), ]); - log.notice('Checking if integration tests CloudFormation stack already exists...'); - try { - await awsRequest('CloudFormation', 'describeStacks', { - StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, - }); - log.error('Integration tests CloudFormation stack already exists. Quitting.'); - return; - } catch (e) { - if (e.code !== 'ValidationError') { - throw e; - } - } - log.notice('Integration tests CloudFormation does not exist. Continuing.'); - const clusterName = 'integration-tests-msk-cluster'; const clusterConfName = 'integration-tests-msk-cluster-configuration'; @@ -64,5 +48,69 @@ const { SHARED_INFRA_TESTS_CLOUDFORMATION_STACK } = require('../../../test/utils StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, }); log.notice('Deployed integration tests CloudFormation stack!'); +} + +async function handleInfrastructureUpdate() { + log.notice('Updating integration tests CloudFormation stack...'); + + const cfnTemplate = await fsPromises.readFile(path.join(__dirname, 'cloudformation.yml'), 'utf8'); + + try { + await awsRequest('CloudFormation', 'updateStack', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + TemplateBody: cfnTemplate, + Parameters: [ + { ParameterKey: 'ClusterName', UsePreviousValue: true }, + { ParameterKey: 'ClusterConfigurationArn', UsePreviousValue: true }, + { + ParameterKey: 'ClusterConfigurationRevision', + UsePreviousValue: true, + }, + ], + }); + } catch (e) { + if (e.message === 'No updates are to be performed.') { + log.notice('No changes detected. Integration tests CloudFormation stack is up to date.'); + return; + } + throw e; + } + + await awsRequest('CloudFormation', 'waitFor', 'stackUpdateComplete', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + log.notice('Updated integration tests CloudFormation stack!'); +} + +(async () => { + log.notice('Starting setup of integration infrastructure...'); + + let describeResponse; + + log.notice('Checking if integration tests CloudFormation stack already exists...'); + try { + describeResponse = await awsRequest('CloudFormation', 'describeStacks', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + log.notice('Integration tests CloudFormation stack already exists'); + } catch (e) { + if (e.code !== 'ValidationError') { + throw e; + } + log.notice('Integration tests CloudFormation does not exist'); + } + + if (describeResponse) { + const stackStatus = describeResponse.Stacks[0].StackStatus; + + if (['CREATE_COMPLETE', 'UPDATE_COMPLETE'].includes(stackStatus)) { + await handleInfrastructureUpdate(); + } else { + log.error('Existing stack has status: {stackStatus} and it cannot be updated.'); + } + } else { + await handleInfrastructureCreation(); + } + log.notice('Setup of integration infrastructure finished'); })(); diff --git a/test/fixtures/functionMsk/core.js b/test/fixtures/functionMsk/core.js index 93ff87cb512..ee5e4662bea 100644 --- a/test/fixtures/functionMsk/core.js +++ b/test/fixtures/functionMsk/core.js @@ -10,7 +10,7 @@ function consumer(event, context, callback) { const messages = Object.values(records)[0].map(record => Buffer.from(record.value, 'base64').toString() ); - // eslint-disable-next-line + // eslint-disable-next-line no-console console.log(functionName, JSON.stringify(messages)); return callback(null, event); } diff --git a/test/utils/cludformation.js b/test/utils/cludformation.js index 26fcd027870..243b55d69ca 100644 --- a/test/utils/cludformation.js +++ b/test/utils/cludformation.js @@ -59,23 +59,6 @@ function listStacks(status) { return awsRequest('CloudFormation', 'listStacks', params); } -async function doesStackWithNameAndStatusExists(name, status) { - try { - const describeStacksResponse = await awsRequest('CloudFormation', 'describeStacks', { - StackName: name, - }); - if (describeStacksResponse.Stacks[0].StackStatus === status) { - return true; - } - return false; - } catch (e) { - if (e.code === 'ValidationError') { - return false; - } - throw e; - } -} - async function getStackOutputMap(name) { const describeStackResponse = await awsRequest('CloudFormation', 'describeStacks', { StackName: name, @@ -89,10 +72,22 @@ async function getStackOutputMap(name) { } async function isDependencyStackAvailable() { - return doesStackWithNameAndStatusExists( - SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, - 'CREATE_COMPLETE' - ); + const validStatuses = ['CREATE_COMPLETE', 'UPDATE_COMPLETE']; + + try { + const describeStacksResponse = await awsRequest('CloudFormation', 'describeStacks', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + if (validStatuses.includes(describeStacksResponse.Stacks[0].StackStatus)) { + return true; + } + return false; + } catch (e) { + if (e.code === 'ValidationError') { + return false; + } + throw e; + } } async function getDependencyStackOutputMap() { @@ -104,7 +99,6 @@ module.exports = { deleteStack, listStackResources, listStacks, - doesStackWithNameAndStatusExists, getStackOutputMap, SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, isDependencyStackAvailable, From b5aeb05bd268b7c4d8940e0d9962b6ed1d8a9805 Mon Sep 17 00:00:00 2001 From: Piotr Grzesik Date: Wed, 23 Sep 2020 16:42:19 +0200 Subject: [PATCH 12/12] minor fixes --- scripts/test/integration-setup/index.js | 3 ++- scripts/test/integration-teardown.js | 2 +- test/fixtures/functionMsk/core.js | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/scripts/test/integration-setup/index.js b/scripts/test/integration-setup/index.js index 894eeb2f4d1..8bf49dc947d 100755 --- a/scripts/test/integration-setup/index.js +++ b/scripts/test/integration-setup/index.js @@ -5,7 +5,7 @@ require('essentials'); require('log-node')(); -const log = require('log').get('serverless:scripts'); +const log = require('log').get('serverless'); const awsRequest = require('@serverless/test/aws-request'); const fsPromises = require('fs').promises; const path = require('path'); @@ -107,6 +107,7 @@ async function handleInfrastructureUpdate() { await handleInfrastructureUpdate(); } else { log.error('Existing stack has status: {stackStatus} and it cannot be updated.'); + process.exitCode = 1; } } else { await handleInfrastructureCreation(); diff --git a/scripts/test/integration-teardown.js b/scripts/test/integration-teardown.js index 8abea967da6..03680ee921f 100755 --- a/scripts/test/integration-teardown.js +++ b/scripts/test/integration-teardown.js @@ -5,7 +5,7 @@ require('essentials'); require('log-node')(); -const log = require('log').get('serverless:scripts'); +const log = require('log').get('serverless'); const awsRequest = require('@serverless/test/aws-request'); const { SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, diff --git a/test/fixtures/functionMsk/core.js b/test/fixtures/functionMsk/core.js index ee5e4662bea..e00ec4ab8fd 100644 --- a/test/fixtures/functionMsk/core.js +++ b/test/fixtures/functionMsk/core.js @@ -1,7 +1,7 @@ 'use strict'; // NOTE: `kafkajs` is bundled into the deployment package -// eslint-disable-next-line +// eslint-disable-next-line import/no-unresolved const { Kafka } = require('kafkajs'); function consumer(event, context, callback) {