From 21fe37e9dccfa0659ace57f14d822f7590db3ee6 Mon Sep 17 00:00:00 2001 From: Piotr Grzesik Date: Mon, 21 Sep 2020 16:51:45 +0200 Subject: [PATCH] chore: minor adjustments --- .gitignore | 3 - docs/providers/aws/events/msk.md | 5 ++ lib/plugins/aws/lib/naming.js | 7 +- .../package/compile/events/msk/index.test.js | 63 +++++++++-------- package.json | 5 +- .../cloudformation.yml | 0 scripts/test/integration-setup/index.js | 68 +++++++++++++++++++ .../kafka.server.properties | 0 ...ation-infra.js => integration-teardown.js} | 24 ++++--- scripts/test/setup-integration-infra.js | 64 ----------------- test/fixtures/functionMsk/core.js | 7 +- test/fixtures/functionMsk/package.json | 9 --- test/fixtures/functionMsk/serverless.yml | 1 + test/fixtures/functionMsk/utils.js | 22 ------ .../{ => infra-dependent}/msk.test.js | 37 +++++----- test/utils/cludformation.js | 14 ++-- 16 files changed, 152 insertions(+), 177 deletions(-) rename scripts/test/{ => integration-setup}/cloudformation.yml (100%) create mode 100755 scripts/test/integration-setup/index.js rename scripts/test/{ => integration-setup}/kafka.server.properties (100%) rename scripts/test/{teardown-integration-infra.js => integration-teardown.js} (66%) delete mode 100755 scripts/test/setup-integration-infra.js delete mode 100644 test/fixtures/functionMsk/utils.js rename test/integration/{ => infra-dependent}/msk.test.js (70%) diff --git a/.gitignore b/.gitignore index fac841ca046..3e71f2a2d15 100755 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,3 @@ /node_modules npm-debug.log /package-lock.json - -/test/fixtures/**/node_modules -/test/fixtures/**/package-lock.json diff --git a/docs/providers/aws/events/msk.md b/docs/providers/aws/events/msk.md index a5bd06c0118..64407a51580 100644 --- a/docs/providers/aws/events/msk.md +++ b/docs/providers/aws/events/msk.md @@ -14,6 +14,11 @@ layout: Doc # MSK +[Amazon Managed Streaming for Apache Kafka (Amazon MSK)](https://aws.amazon.com/msk/) is a fully managed streaming service that uses Apache Kafka. +Amazon MSK can be used as event source for Lambda, which allows Lambda service to internally poll it for new messages and invoke corresponding Lambda functions. + +## Simple event definition + In the following example, we specify that the `compute` function should be triggered whenever there are new messages available to consume from defined Kafka `topic`. In order to configure `msk` event, you have to provide two required properties: `arn`, which represents an ARN of MSK cluster and `topic` to consume messages from. diff --git a/lib/plugins/aws/lib/naming.js b/lib/plugins/aws/lib/naming.js index 60b1da1a8a3..4005ec742b9 100644 --- a/lib/plugins/aws/lib/naming.js +++ b/lib/plugins/aws/lib/naming.js @@ -412,11 +412,8 @@ module.exports = { getMSKEventLogicalId(functionName, clusterName, topicName) { const normalizedFunctionName = this.getNormalizedFunctionName(functionName); // Both clusterName and topicName are trimmed to 79 chars to avoid going over 255 character limit - const normalizedClusterName = this.normalizeNameToAlphaNumericOnly(clusterName).substring( - 0, - 79 - ); - const normalizedTopicName = this.normalizeNameToAlphaNumericOnly(topicName).substring(0, 79); + const normalizedClusterName = this.normalizeNameToAlphaNumericOnly(clusterName).slice(0, 79); + const normalizedTopicName = this.normalizeNameToAlphaNumericOnly(topicName).slice(0, 79); return `${normalizedFunctionName}EventSourceMappingMSK${normalizedClusterName}${normalizedTopicName}`; }, diff --git a/lib/plugins/aws/package/compile/events/msk/index.test.js b/lib/plugins/aws/package/compile/events/msk/index.test.js index 224e41d4f25..be4f87b541d 100644 --- a/lib/plugins/aws/package/compile/events/msk/index.test.js +++ b/lib/plugins/aws/package/compile/events/msk/index.test.js @@ -20,8 +20,8 @@ describe('AwsCompileMSKEvents', () => { let defaultIamRole; let naming; - before(() => - runServerless({ + before(async () => { + const { awsNaming, cfTemplate } = await runServerless({ fixture: 'function', configExt: { functions: { @@ -51,15 +51,14 @@ describe('AwsCompileMSKEvents', () => { }, }, cliArgs: ['package'], - }).then(({ awsNaming, cfTemplate }) => { - naming = awsNaming; - minimalEventSourceMappingResource = - cfTemplate.Resources[naming.getMSKEventLogicalId('foo', 'ClusterName', 'TestingTopic')]; - allParamsEventSourceMappingResource = - cfTemplate.Resources[naming.getMSKEventLogicalId('other', 'ClusterName', 'TestingTopic')]; - defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; - }) - ); + }); + naming = awsNaming; + minimalEventSourceMappingResource = + cfTemplate.Resources[naming.getMSKEventLogicalId('foo', 'ClusterName', 'TestingTopic')]; + allParamsEventSourceMappingResource = + cfTemplate.Resources[naming.getMSKEventLogicalId('other', 'ClusterName', 'TestingTopic')]; + defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; + }); it('should correctly compile EventSourceMapping resource properties with minimal configuration', () => { expect(minimalEventSourceMappingResource.Properties).to.deep.equal({ @@ -115,30 +114,30 @@ describe('AwsCompileMSKEvents', () => { }); describe('when no msk events are defined', () => { - it('should not modify the default IAM role', () => { - return runServerless({ + it('should not modify the default IAM role', async () => { + const { cfTemplate } = await runServerless({ fixture: 'function', cliArgs: ['package'], - }).then(({ cfTemplate }) => { - const defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; - expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({ - Effect: 'Allow', - Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'], - Resource: [], - }); + }); - expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({ - Effect: 'Allow', - Action: [ - 'ec2:CreateNetworkInterface', - 'ec2:DescribeNetworkInterfaces', - 'ec2:DescribeVpcs', - 'ec2:DeleteNetworkInterface', - 'ec2:DescribeSubnets', - 'ec2:DescribeSecurityGroups', - ], - Resource: '*', - }); + const defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({ + Effect: 'Allow', + Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'], + Resource: [], + }); + + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({ + Effect: 'Allow', + Action: [ + 'ec2:CreateNetworkInterface', + 'ec2:DescribeNetworkInterfaces', + 'ec2:DescribeVpcs', + 'ec2:DeleteNetworkInterface', + 'ec2:DescribeSubnets', + 'ec2:DescribeSecurityGroups', + ], + Resource: '*', }); }); }); diff --git a/package.json b/package.json index 08692608076..2aab03a3829 100644 --- a/package.json +++ b/package.json @@ -87,6 +87,7 @@ "jszip": "^3.5.0", "lint-staged": "^10.4.0", "log": "^6.0.0", + "log-node": "^7.0.0", "mocha": "^6.2.3", "mock-require": "^3.0.3", "nyc": "^15.1.0", @@ -186,8 +187,8 @@ "integration-test-run-all": "mocha-isolated --pass-through-aws-creds --skip-fs-cleanup-check --max-workers=20 \"test/integration/**/*.test.js\"", "integration-test-run-basic": "mocha test/integrationBasic.test.js", "integration-test-run-package": "mocha-isolated --skip-fs-cleanup-check test/integrationPackage/**/*.tests.js", - "integration-test-setup-infrastructure": "node ./scripts/test/setup-integration-infra.js", - "integration-test-teardown-infrastructure": "node ./scripts/test/teardown-integration-infra.js", + "integration-test-setup": "node ./scripts/test/integration-setup/index.js", + "integration-test-teardown": "node ./scripts/test/integration-teardown.js", "lint": "eslint .", "lint:updated": "pipe-git-updated --ext=js -- eslint", "pkg:build": "node ./scripts/pkg/build.js", diff --git a/scripts/test/cloudformation.yml b/scripts/test/integration-setup/cloudformation.yml similarity index 100% rename from scripts/test/cloudformation.yml rename to scripts/test/integration-setup/cloudformation.yml diff --git a/scripts/test/integration-setup/index.js b/scripts/test/integration-setup/index.js new file mode 100755 index 00000000000..e0d60e57e43 --- /dev/null +++ b/scripts/test/integration-setup/index.js @@ -0,0 +1,68 @@ +#!/usr/bin/env node + +'use strict'; + +require('essentials'); +require('log-node')(); + +const log = require('log').get('serverless:scripts'); +const awsRequest = require('@serverless/test/aws-request'); +const fsPromises = require('fs').promises; +const path = require('path'); +const { SHARED_INFRA_TESTS_CLOUDFORMATION_STACK } = require('../../../test/utils/cludformation'); + +(async () => { + log.notice('Starting setup of integration infrastructure...'); + + const [cfnTemplate, kafkaServerProperties] = await Promise.all([ + fsPromises.readFile(path.join(__dirname, 'cloudformation.yml'), 'utf8'), + fsPromises.readFile(path.join(__dirname, 'kafka.server.properties')), + ]); + + log.notice('Checking if integration tests CloudFormation stack already exists...'); + try { + await awsRequest('CloudFormation', 'describeStacks', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + log.error('Integration tests CloudFormation stack already exists. Quitting.'); + return; + } catch (e) { + if (e.code !== 'ValidationError') { + throw e; + } + } + log.notice('Integration tests CloudFormation does not exist. Continuing.'); + + const clusterName = 'integration-tests-msk-cluster'; + const clusterConfName = 'integration-tests-msk-cluster-configuration'; + + log.notice('Creating MSK Cluster configuration...'); + const clusterConfResponse = await awsRequest('Kafka', 'createConfiguration', { + Name: clusterConfName, + ServerProperties: kafkaServerProperties, + KafkaVersions: ['2.2.1'], + }); + + const clusterConfigurationArn = clusterConfResponse.Arn; + const clusterConfigurationRevision = clusterConfResponse.LatestRevision.Revision.toString(); + + log.notice('Deploying integration tests CloudFormation stack...'); + await awsRequest('CloudFormation', 'createStack', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + TemplateBody: cfnTemplate, + Parameters: [ + { ParameterKey: 'ClusterName', ParameterValue: clusterName }, + { ParameterKey: 'ClusterConfigurationArn', ParameterValue: clusterConfigurationArn }, + { + ParameterKey: 'ClusterConfigurationRevision', + ParameterValue: clusterConfigurationRevision, + }, + ], + }); + + await awsRequest('CloudFormation', 'waitFor', 'stackCreateComplete', { + StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, + }); + log.notice('Deployed integration tests CloudFormation stack!'); + log.notice('Setup of integration infrastructure finished'); +})(); diff --git a/scripts/test/kafka.server.properties b/scripts/test/integration-setup/kafka.server.properties similarity index 100% rename from scripts/test/kafka.server.properties rename to scripts/test/integration-setup/kafka.server.properties diff --git a/scripts/test/teardown-integration-infra.js b/scripts/test/integration-teardown.js similarity index 66% rename from scripts/test/teardown-integration-infra.js rename to scripts/test/integration-teardown.js index eb7810e9531..8abea967da6 100755 --- a/scripts/test/teardown-integration-infra.js +++ b/scripts/test/integration-teardown.js @@ -1,5 +1,11 @@ +#!/usr/bin/env node + 'use strict'; +require('essentials'); +require('log-node')(); + +const log = require('log').get('serverless:scripts'); const awsRequest = require('@serverless/test/aws-request'); const { SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, @@ -7,19 +13,19 @@ const { } = require('../../test/utils/cludformation'); (async () => { - process.stdout.write('Starting teardown of integration infrastructure...\n'); + log.notice('Starting teardown of integration infrastructure...'); const describeClustersResponse = await awsRequest('Kafka', 'listClusters'); const clusterConfArn = describeClustersResponse.ClusterInfoList[0].CurrentBrokerSoftwareInfo.ConfigurationArn; const outputMap = await getDependencyStackOutputMap(); - process.stdout.write('Removing leftover ENI...\n'); + log.notice('Removing leftover ENI...'); const describeResponse = await awsRequest('EC2', 'describeNetworkInterfaces', { Filters: [ { Name: 'vpc-id', - Values: [outputMap.VPC], + Values: [outputMap.get('VPC')], }, { Name: 'status', @@ -36,20 +42,20 @@ const { ) ); } catch (e) { - process.stdout.write(`Error: ${e} while trying to remove leftover ENIs\n`); + log.error(`Error: ${e} while trying to remove leftover ENIs\n`); } - process.stdout.write('Removing integration tests CloudFormation stack...\n'); + log.notice('Removing integration tests CloudFormation stack...'); await awsRequest('CloudFormation', 'deleteStack', { StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, }); await awsRequest('CloudFormation', 'waitFor', 'stackDeleteComplete', { StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, }); - process.stdout.write('Removed integration tests CloudFormation stack!\n'); - process.stdout.write('Removing MSK Cluster configuration...\n'); + log.notice('Removed integration tests CloudFormation stack!'); + log.notice('Removing MSK Cluster configuration...'); await awsRequest('Kafka', 'deleteConfiguration', { Arn: clusterConfArn, }); - process.stdout.write('Removed MSK Cluster configuration\n'); - process.stdout.write('Teardown of integration infrastructure finished\n'); + log.notice('Removed MSK Cluster configuration'); + log.notice('Teardown of integration infrastructure finished'); })(); diff --git a/scripts/test/setup-integration-infra.js b/scripts/test/setup-integration-infra.js deleted file mode 100755 index e89036297d3..00000000000 --- a/scripts/test/setup-integration-infra.js +++ /dev/null @@ -1,64 +0,0 @@ -'use strict'; - -const awsRequest = require('@serverless/test/aws-request'); -const fs = require('fs'); -const path = require('path'); -const { SHARED_INFRA_TESTS_CLOUDFORMATION_STACK } = require('../../test/utils/cludformation'); - -(async () => { - process.stdout.write('Starting setup of integration infrastructure...\n'); - const cfnTemplate = fs.readFileSync(path.join(__dirname, 'cloudformation.yml'), 'utf8'); - const kafkaServerProperties = fs.readFileSync(path.join(__dirname, 'kafka.server.properties')); - - process.stdout.write('Checking if integration tests CloudFormation stack already exists...\n'); - try { - await awsRequest('CloudFormation', 'describeStacks', { - StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, - }); - process.stdout.write('Integration tests CloudFormation stack already exists. Quitting.\n'); - return; - } catch (e) { - process.stdout.write('Integration tests CloudFormation does not exist. Continuing.\n'); - } - - const clusterName = 'integration-tests-msk-cluster'; - const clusterConfName = 'integration-tests-msk-cluster-configuration'; - - process.stdout.write('Creating MSK Cluster configuration...\n'); - let clusterConfResponse; - try { - clusterConfResponse = await awsRequest('Kafka', 'createConfiguration', { - Name: clusterConfName, - ServerProperties: kafkaServerProperties, - KafkaVersions: ['2.2.1'], - }); - } catch (e) { - process.stdout.write( - `Error: ${e} while trying to create MSK Cluster configuration. Quitting. \n` - ); - return; - } - - const clusterConfigurationArn = clusterConfResponse.Arn; - const clusterConfigurationRevision = clusterConfResponse.LatestRevision.Revision.toString(); - - process.stdout.write('Deploying integration tests CloudFormation stack...\n'); - await awsRequest('CloudFormation', 'createStack', { - StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, - TemplateBody: cfnTemplate, - Parameters: [ - { ParameterKey: 'ClusterName', ParameterValue: clusterName }, - { ParameterKey: 'ClusterConfigurationArn', ParameterValue: clusterConfigurationArn }, - { - ParameterKey: 'ClusterConfigurationRevision', - ParameterValue: clusterConfigurationRevision, - }, - ], - }); - - await awsRequest('CloudFormation', 'waitFor', 'stackCreateComplete', { - StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, - }); - process.stdout.write('Deployed integration tests CloudFormation stack!\n'); - process.stdout.write('Setup of integration infrastructure finished\n'); -})(); diff --git a/test/fixtures/functionMsk/core.js b/test/fixtures/functionMsk/core.js index 6c7389e4a08..93ff87cb512 100644 --- a/test/fixtures/functionMsk/core.js +++ b/test/fixtures/functionMsk/core.js @@ -1,9 +1,5 @@ 'use strict'; -// NOTE: the `utils.js` file is bundled into the deployment package -// eslint-disable-next-line -const { log } = require('./utils'); - // NOTE: `kafkajs` is bundled into the deployment package // eslint-disable-next-line const { Kafka } = require('kafkajs'); @@ -14,7 +10,8 @@ function consumer(event, context, callback) { const messages = Object.values(records)[0].map(record => Buffer.from(record.value, 'base64').toString() ); - log(functionName, JSON.stringify(messages)); + // eslint-disable-next-line + console.log(functionName, JSON.stringify(messages)); return callback(null, event); } diff --git a/test/fixtures/functionMsk/package.json b/test/fixtures/functionMsk/package.json index ab1a46cd75c..86a9cdb040e 100644 --- a/test/fixtures/functionMsk/package.json +++ b/test/fixtures/functionMsk/package.json @@ -1,13 +1,4 @@ { - "name": "functionMsk", - "version": "1.0.0", - "description": "", - "main": "core.js", - "scripts": { - "test": "echo \"Error: no test specified\" && exit 1" - }, - "author": "", - "license": "MIT", "dependencies": { "kafkajs": "^1.13.0" } diff --git a/test/fixtures/functionMsk/serverless.yml b/test/fixtures/functionMsk/serverless.yml index 85d0254a8ea..02a66625baf 100644 --- a/test/fixtures/functionMsk/serverless.yml +++ b/test/fixtures/functionMsk/serverless.yml @@ -1,6 +1,7 @@ service: service configValidationMode: error +frameworkVersion: '*' # VPC and Events configuration is added dynamically during test run # Because it has to be provisioned separately via CloudFormation stack diff --git a/test/fixtures/functionMsk/utils.js b/test/fixtures/functionMsk/utils.js deleted file mode 100644 index f4a99e00afa..00000000000 --- a/test/fixtures/functionMsk/utils.js +++ /dev/null @@ -1,22 +0,0 @@ -'use strict'; - -const logger = console; - -function getMarkers(functionName) { - return { - start: `--- START ${functionName} ---`, - end: `--- END ${functionName} ---`, - }; -} - -function log(functionName, message) { - const markers = getMarkers(functionName); - logger.log(markers.start); - logger.log(message); - logger.log(markers.end); -} - -module.exports = { - getMarkers, - log, -}; diff --git a/test/integration/msk.test.js b/test/integration/infra-dependent/msk.test.js similarity index 70% rename from test/integration/msk.test.js rename to test/integration/infra-dependent/msk.test.js index d65f20d6d14..57e0836b8e6 100644 --- a/test/integration/msk.test.js +++ b/test/integration/infra-dependent/msk.test.js @@ -2,16 +2,16 @@ const { expect } = require('chai'); const log = require('log').get('serverless:test'); -const fixtures = require('../fixtures'); -const { confirmCloudWatchLogs } = require('../utils/misc'); +const fixtures = require('../../fixtures'); +const { confirmCloudWatchLogs } = require('../../utils/misc'); const { isDependencyStackAvailable, getDependencyStackOutputMap, -} = require('../utils/cludformation'); +} = require('../../utils/cludformation'); const awsRequest = require('@serverless/test/aws-request'); const crypto = require('crypto'); -const { deployService, removeService } = require('../utils/integration'); +const { deployService, removeService } = require('../../utils/integration'); describe('AWS - MSK Integration Test', function() { this.timeout(1000 * 60 * 100); // Involves time-taking deploys @@ -21,20 +21,17 @@ describe('AWS - MSK Integration Test', function() { const topicName = `msk-topic-${crypto.randomBytes(8).toString('hex')}`; - before(async function beforeHook() { + before(async () => { const isDepsStackAvailable = await isDependencyStackAvailable(); if (!isDepsStackAvailable) { - log.notice( - 'CloudFormation stack with integration test dependencies not found. Skipping test.' - ); - this.skip(); + throw new Error('CloudFormation stack with integration test dependencies not found.'); } const outputMap = await getDependencyStackOutputMap(); log.notice('Getting MSK Boostrap Brokers URLs...'); const getBootstrapBrokersResponse = await awsRequest('Kafka', 'getBootstrapBrokers', { - ClusterArn: outputMap.MSKCluster, + ClusterArn: outputMap.get('MSKCluster'), }); const brokerUrls = getBootstrapBrokersResponse.BootstrapBrokerStringTls; @@ -43,8 +40,8 @@ describe('AWS - MSK Integration Test', function() { functions: { producer: { vpc: { - subnetIds: [outputMap.PrivateSubnetA], - securityGroupIds: [outputMap.SecurityGroup], + subnetIds: [outputMap.get('PrivateSubnetA')], + securityGroupIds: [outputMap.get('SecurityGroup')], }, environment: { TOPIC_NAME: topicName, @@ -55,7 +52,7 @@ describe('AWS - MSK Integration Test', function() { events: [ { msk: { - arn: outputMap.MSKCluster, + arn: outputMap.get('MSKCluster'), topic: topicName, }, }, @@ -69,13 +66,11 @@ describe('AWS - MSK Integration Test', function() { const serviceName = serviceData.serviceConfig.service; stackName = `${serviceName}-${stage}`; - log.notice(`Deploying "${stackName}" service...`); await deployService(servicePath); }); after(async () => { if (servicePath) { - log.notice('Removing service...'); await removeService(servicePath); } }); @@ -84,7 +79,7 @@ describe('AWS - MSK Integration Test', function() { const functionName = 'consumer'; const message = 'Hello from MSK Integration test!'; - return confirmCloudWatchLogs( + const events = await confirmCloudWatchLogs( `/aws/lambda/${stackName}-${functionName}`, async () => await awsRequest('Lambda', 'invoke', { @@ -92,10 +87,10 @@ describe('AWS - MSK Integration Test', function() { InvocationType: 'RequestResponse', }), { timeout: 120 * 1000 } - ).then(events => { - const logs = events.reduce((data, event) => data + event.message, ''); - expect(logs).to.include(functionName); - expect(logs).to.include(message); - }); + ); + + const logs = events.reduce((data, event) => data + event.message, ''); + expect(logs).to.include(functionName); + expect(logs).to.include(message); }); }); diff --git a/test/utils/cludformation.js b/test/utils/cludformation.js index 3bad72af176..26fcd027870 100644 --- a/test/utils/cludformation.js +++ b/test/utils/cludformation.js @@ -69,7 +69,10 @@ async function doesStackWithNameAndStatusExists(name, status) { } return false; } catch (e) { - return false; + if (e.code === 'ValidationError') { + return false; + } + throw e; } } @@ -78,10 +81,11 @@ async function getStackOutputMap(name) { StackName: name, }); - return describeStackResponse.Stacks[0].Outputs.reduce((map, output) => { - map[output.OutputKey] = output.OutputValue; - return map; - }, {}); + const outputsMap = new Map(); + for (const { OutputKey: key, OutputValue: value } of describeStackResponse.Stacks[0].Outputs) { + outputsMap.set(key, value); + } + return outputsMap; } async function isDependencyStackAvailable() {