Skip to content

Commit

Permalink
chore: minor adjustments
Browse files Browse the repository at this point in the history
  • Loading branch information
pgrzesik committed Sep 21, 2020
1 parent f769789 commit 21fe37e
Show file tree
Hide file tree
Showing 16 changed files with 152 additions and 177 deletions.
3 changes: 0 additions & 3 deletions .gitignore
Expand Up @@ -5,6 +5,3 @@
/node_modules
npm-debug.log
/package-lock.json

/test/fixtures/**/node_modules
/test/fixtures/**/package-lock.json
5 changes: 5 additions & 0 deletions docs/providers/aws/events/msk.md
Expand Up @@ -14,6 +14,11 @@ layout: Doc

# MSK

[Amazon Managed Streaming for Apache Kafka (Amazon MSK)](https://aws.amazon.com/msk/) is a fully managed streaming service that uses Apache Kafka.
Amazon MSK can be used as event source for Lambda, which allows Lambda service to internally poll it for new messages and invoke corresponding Lambda functions.

## Simple event definition

In the following example, we specify that the `compute` function should be triggered whenever there are new messages available to consume from defined Kafka `topic`.

In order to configure `msk` event, you have to provide two required properties: `arn`, which represents an ARN of MSK cluster and `topic` to consume messages from.
Expand Down
7 changes: 2 additions & 5 deletions lib/plugins/aws/lib/naming.js
Expand Up @@ -412,11 +412,8 @@ module.exports = {
getMSKEventLogicalId(functionName, clusterName, topicName) {
const normalizedFunctionName = this.getNormalizedFunctionName(functionName);
// Both clusterName and topicName are trimmed to 79 chars to avoid going over 255 character limit
const normalizedClusterName = this.normalizeNameToAlphaNumericOnly(clusterName).substring(
0,
79
);
const normalizedTopicName = this.normalizeNameToAlphaNumericOnly(topicName).substring(0, 79);
const normalizedClusterName = this.normalizeNameToAlphaNumericOnly(clusterName).slice(0, 79);
const normalizedTopicName = this.normalizeNameToAlphaNumericOnly(topicName).slice(0, 79);
return `${normalizedFunctionName}EventSourceMappingMSK${normalizedClusterName}${normalizedTopicName}`;
},

Expand Down
63 changes: 31 additions & 32 deletions lib/plugins/aws/package/compile/events/msk/index.test.js
Expand Up @@ -20,8 +20,8 @@ describe('AwsCompileMSKEvents', () => {
let defaultIamRole;
let naming;

before(() =>
runServerless({
before(async () => {
const { awsNaming, cfTemplate } = await runServerless({
fixture: 'function',
configExt: {
functions: {
Expand Down Expand Up @@ -51,15 +51,14 @@ describe('AwsCompileMSKEvents', () => {
},
},
cliArgs: ['package'],
}).then(({ awsNaming, cfTemplate }) => {
naming = awsNaming;
minimalEventSourceMappingResource =
cfTemplate.Resources[naming.getMSKEventLogicalId('foo', 'ClusterName', 'TestingTopic')];
allParamsEventSourceMappingResource =
cfTemplate.Resources[naming.getMSKEventLogicalId('other', 'ClusterName', 'TestingTopic')];
defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution;
})
);
});
naming = awsNaming;
minimalEventSourceMappingResource =
cfTemplate.Resources[naming.getMSKEventLogicalId('foo', 'ClusterName', 'TestingTopic')];
allParamsEventSourceMappingResource =
cfTemplate.Resources[naming.getMSKEventLogicalId('other', 'ClusterName', 'TestingTopic')];
defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution;
});

it('should correctly compile EventSourceMapping resource properties with minimal configuration', () => {
expect(minimalEventSourceMappingResource.Properties).to.deep.equal({
Expand Down Expand Up @@ -115,30 +114,30 @@ describe('AwsCompileMSKEvents', () => {
});

describe('when no msk events are defined', () => {
it('should not modify the default IAM role', () => {
return runServerless({
it('should not modify the default IAM role', async () => {
const { cfTemplate } = await runServerless({
fixture: 'function',
cliArgs: ['package'],
}).then(({ cfTemplate }) => {
const defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution;
expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({
Effect: 'Allow',
Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'],
Resource: [],
});
});

expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({
Effect: 'Allow',
Action: [
'ec2:CreateNetworkInterface',
'ec2:DescribeNetworkInterfaces',
'ec2:DescribeVpcs',
'ec2:DeleteNetworkInterface',
'ec2:DescribeSubnets',
'ec2:DescribeSecurityGroups',
],
Resource: '*',
});
const defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution;
expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({
Effect: 'Allow',
Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'],
Resource: [],
});

expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({
Effect: 'Allow',
Action: [
'ec2:CreateNetworkInterface',
'ec2:DescribeNetworkInterfaces',
'ec2:DescribeVpcs',
'ec2:DeleteNetworkInterface',
'ec2:DescribeSubnets',
'ec2:DescribeSecurityGroups',
],
Resource: '*',
});
});
});
Expand Down
5 changes: 3 additions & 2 deletions package.json
Expand Up @@ -87,6 +87,7 @@
"jszip": "^3.5.0",
"lint-staged": "^10.4.0",
"log": "^6.0.0",
"log-node": "^7.0.0",
"mocha": "^6.2.3",
"mock-require": "^3.0.3",
"nyc": "^15.1.0",
Expand Down Expand Up @@ -186,8 +187,8 @@
"integration-test-run-all": "mocha-isolated --pass-through-aws-creds --skip-fs-cleanup-check --max-workers=20 \"test/integration/**/*.test.js\"",
"integration-test-run-basic": "mocha test/integrationBasic.test.js",
"integration-test-run-package": "mocha-isolated --skip-fs-cleanup-check test/integrationPackage/**/*.tests.js",
"integration-test-setup-infrastructure": "node ./scripts/test/setup-integration-infra.js",
"integration-test-teardown-infrastructure": "node ./scripts/test/teardown-integration-infra.js",
"integration-test-setup": "node ./scripts/test/integration-setup/index.js",
"integration-test-teardown": "node ./scripts/test/integration-teardown.js",
"lint": "eslint .",
"lint:updated": "pipe-git-updated --ext=js -- eslint",
"pkg:build": "node ./scripts/pkg/build.js",
Expand Down
File renamed without changes.
68 changes: 68 additions & 0 deletions scripts/test/integration-setup/index.js
@@ -0,0 +1,68 @@
#!/usr/bin/env node

'use strict';

require('essentials');
require('log-node')();

const log = require('log').get('serverless:scripts');
const awsRequest = require('@serverless/test/aws-request');
const fsPromises = require('fs').promises;
const path = require('path');
const { SHARED_INFRA_TESTS_CLOUDFORMATION_STACK } = require('../../../test/utils/cludformation');

(async () => {
log.notice('Starting setup of integration infrastructure...');

const [cfnTemplate, kafkaServerProperties] = await Promise.all([
fsPromises.readFile(path.join(__dirname, 'cloudformation.yml'), 'utf8'),
fsPromises.readFile(path.join(__dirname, 'kafka.server.properties')),
]);

log.notice('Checking if integration tests CloudFormation stack already exists...');
try {
await awsRequest('CloudFormation', 'describeStacks', {
StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK,
});
log.error('Integration tests CloudFormation stack already exists. Quitting.');
return;
} catch (e) {
if (e.code !== 'ValidationError') {
throw e;
}
}
log.notice('Integration tests CloudFormation does not exist. Continuing.');

const clusterName = 'integration-tests-msk-cluster';
const clusterConfName = 'integration-tests-msk-cluster-configuration';

log.notice('Creating MSK Cluster configuration...');
const clusterConfResponse = await awsRequest('Kafka', 'createConfiguration', {
Name: clusterConfName,
ServerProperties: kafkaServerProperties,
KafkaVersions: ['2.2.1'],
});

const clusterConfigurationArn = clusterConfResponse.Arn;
const clusterConfigurationRevision = clusterConfResponse.LatestRevision.Revision.toString();

log.notice('Deploying integration tests CloudFormation stack...');
await awsRequest('CloudFormation', 'createStack', {
StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK,
TemplateBody: cfnTemplate,
Parameters: [
{ ParameterKey: 'ClusterName', ParameterValue: clusterName },
{ ParameterKey: 'ClusterConfigurationArn', ParameterValue: clusterConfigurationArn },
{
ParameterKey: 'ClusterConfigurationRevision',
ParameterValue: clusterConfigurationRevision,
},
],
});

await awsRequest('CloudFormation', 'waitFor', 'stackCreateComplete', {
StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK,
});
log.notice('Deployed integration tests CloudFormation stack!');
log.notice('Setup of integration infrastructure finished');
})();
File renamed without changes.
@@ -1,25 +1,31 @@
#!/usr/bin/env node

'use strict';

require('essentials');
require('log-node')();

const log = require('log').get('serverless:scripts');
const awsRequest = require('@serverless/test/aws-request');
const {
SHARED_INFRA_TESTS_CLOUDFORMATION_STACK,
getDependencyStackOutputMap,
} = require('../../test/utils/cludformation');

(async () => {
process.stdout.write('Starting teardown of integration infrastructure...\n');
log.notice('Starting teardown of integration infrastructure...');
const describeClustersResponse = await awsRequest('Kafka', 'listClusters');
const clusterConfArn =
describeClustersResponse.ClusterInfoList[0].CurrentBrokerSoftwareInfo.ConfigurationArn;

const outputMap = await getDependencyStackOutputMap();

process.stdout.write('Removing leftover ENI...\n');
log.notice('Removing leftover ENI...');
const describeResponse = await awsRequest('EC2', 'describeNetworkInterfaces', {
Filters: [
{
Name: 'vpc-id',
Values: [outputMap.VPC],
Values: [outputMap.get('VPC')],
},
{
Name: 'status',
Expand All @@ -36,20 +42,20 @@ const {
)
);
} catch (e) {
process.stdout.write(`Error: ${e} while trying to remove leftover ENIs\n`);
log.error(`Error: ${e} while trying to remove leftover ENIs\n`);
}
process.stdout.write('Removing integration tests CloudFormation stack...\n');
log.notice('Removing integration tests CloudFormation stack...');
await awsRequest('CloudFormation', 'deleteStack', {
StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK,
});
await awsRequest('CloudFormation', 'waitFor', 'stackDeleteComplete', {
StackName: SHARED_INFRA_TESTS_CLOUDFORMATION_STACK,
});
process.stdout.write('Removed integration tests CloudFormation stack!\n');
process.stdout.write('Removing MSK Cluster configuration...\n');
log.notice('Removed integration tests CloudFormation stack!');
log.notice('Removing MSK Cluster configuration...');
await awsRequest('Kafka', 'deleteConfiguration', {
Arn: clusterConfArn,
});
process.stdout.write('Removed MSK Cluster configuration\n');
process.stdout.write('Teardown of integration infrastructure finished\n');
log.notice('Removed MSK Cluster configuration');
log.notice('Teardown of integration infrastructure finished');
})();
64 changes: 0 additions & 64 deletions scripts/test/setup-integration-infra.js

This file was deleted.

7 changes: 2 additions & 5 deletions test/fixtures/functionMsk/core.js
@@ -1,9 +1,5 @@
'use strict';

// NOTE: the `utils.js` file is bundled into the deployment package
// eslint-disable-next-line
const { log } = require('./utils');

// NOTE: `kafkajs` is bundled into the deployment package
// eslint-disable-next-line
const { Kafka } = require('kafkajs');
Expand All @@ -14,7 +10,8 @@ function consumer(event, context, callback) {
const messages = Object.values(records)[0].map(record =>
Buffer.from(record.value, 'base64').toString()
);
log(functionName, JSON.stringify(messages));
// eslint-disable-next-line
console.log(functionName, JSON.stringify(messages));
return callback(null, event);
}

Expand Down
9 changes: 0 additions & 9 deletions test/fixtures/functionMsk/package.json
@@ -1,13 +1,4 @@
{
"name": "functionMsk",
"version": "1.0.0",
"description": "",
"main": "core.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "MIT",
"dependencies": {
"kafkajs": "^1.13.0"
}
Expand Down
1 change: 1 addition & 0 deletions test/fixtures/functionMsk/serverless.yml
@@ -1,6 +1,7 @@
service: service

configValidationMode: error
frameworkVersion: '*'

# VPC and Events configuration is added dynamically during test run
# Because it has to be provisioned separately via CloudFormation stack
Expand Down
22 changes: 0 additions & 22 deletions test/fixtures/functionMsk/utils.js

This file was deleted.

0 comments on commit 21fe37e

Please sign in to comment.