Skip to content

Commit

Permalink
Add integration tests for MSK integration
Browse files Browse the repository at this point in the history
  • Loading branch information
pgrzesik committed Sep 15, 2020
1 parent 0163e87 commit 8a88c9e
Show file tree
Hide file tree
Showing 8 changed files with 389 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -5,3 +5,6 @@
/node_modules
npm-debug.log
/package-lock.json

/test/fixtures/**/node_modules
/test/fixtures/**/package-lock.json
43 changes: 43 additions & 0 deletions test/fixtures/functionMsk/core.js
@@ -0,0 +1,43 @@
'use strict';

// NOTE: the `utils.js` file is bundled into the deployment package
// eslint-disable-next-line
const { log } = require('./utils');

const { Kafka } = require('kafkajs');

function consumer(event, context, callback) {
const functionName = 'consumer';
const { records } = event;
const messages = Object.values(records)[0].map(record =>
Buffer.from(record.value, 'base64').toString()
);
log(functionName, JSON.stringify(messages));
return callback(null, event);
}

async function producer() {
const kafkaBrokers = process.env.BROKER_URLS.split(',');
const kafkaTopic = process.env.TOPIC_NAME;

const kafka = new Kafka({
clientId: 'myapp',
brokers: kafkaBrokers,
ssl: true,
});

const kafkaProducer = kafka.producer();
await kafkaProducer.connect();
await kafkaProducer.send({
topic: kafkaTopic,
messages: [{ value: 'Hello from MSK Integration test!' }],
});

await kafkaProducer.disconnect();

return {
statusCode: 200,
};
}

module.exports = { producer, consumer };
14 changes: 14 additions & 0 deletions test/fixtures/functionMsk/package.json
@@ -0,0 +1,14 @@
{
"name": "functionMsk",
"version": "1.0.0",
"description": "",
"main": "core.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "MIT",
"dependencies": {
"kafkajs": "^1.13.0"
}
}
17 changes: 17 additions & 0 deletions test/fixtures/functionMsk/serverless.yml
@@ -0,0 +1,17 @@
service: service

configValidationMode: error

# VPC and Events configuration is added dynamically during test run
# Because it has to be provisioned separately via CloudFormation stack

provider:
name: aws
runtime: nodejs12.x
versionFunctions: false

functions:
producer:
handler: core.producer
consumer:
handler: core.consumer
22 changes: 22 additions & 0 deletions test/fixtures/functionMsk/utils.js
@@ -0,0 +1,22 @@
'use strict';

const logger = console;

function getMarkers(functionName) {
return {
start: `--- START ${functionName} ---`,
end: `--- END ${functionName} ---`,
};
}

function log(functionName, message) {
const markers = getMarkers(functionName);
logger.log(markers.start);
logger.log(message);
logger.log(markers.end);
}

module.exports = {
getMarkers,
log,
};
151 changes: 151 additions & 0 deletions test/integration/msk/cloudformation.yml
@@ -0,0 +1,151 @@
AWSTemplateFormatVersion: 2010-09-09

Parameters:
ClusterName:
Type: String
Description: Name of MSK Cluster
ClusterConfigurationArn:
Type: String
Description: MSK Cluster Configuration ARN
ClusterConfigurationRevision:
Type: Number
Description: MSK Cluster Configuration Revision number
Default: 1

Resources:
VPC:
Type: AWS::EC2::VPC
Properties:
CidrBlock: 172.31.0.0/16
Tags:
- Key: Name
Value: !Ref AWS::StackName

PublicSubnet:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
AvailabilityZone:
Fn::Select:
- 0
- Fn::GetAZs: ''
CidrBlock: 172.31.0.0/24
MapPublicIpOnLaunch: true

PrivateSubnetA:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
AvailabilityZone:
Fn::Select:
- 0
- Fn::GetAZs: ''
CidrBlock: 172.31.3.0/24
MapPublicIpOnLaunch: false

PrivateSubnetB:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
AvailabilityZone:
Fn::Select:
- 1
- Fn::GetAZs: ''
CidrBlock: 172.31.2.0/24
MapPublicIpOnLaunch: false

InternetGateway:
Type: AWS::EC2::InternetGateway

GatewayAttachment:
Type: AWS::EC2::VPCGatewayAttachment
Properties:
VpcId: !Ref VPC
InternetGatewayId: !Ref InternetGateway

PublicRouteTable:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref VPC

PublicRoute:
Type: AWS::EC2::Route
DependsOn: GatewayAttachment
Properties:
RouteTableId: !Ref PublicRouteTable
DestinationCidrBlock: 0.0.0.0/0
GatewayId: !Ref InternetGateway

PublicSubnetRouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PublicSubnet
RouteTableId: !Ref PublicRouteTable

NatGateway:
Type: AWS::EC2::NatGateway
DependsOn: NatPublicIP
Properties:
AllocationId: !GetAtt NatPublicIP.AllocationId
SubnetId: !Ref PublicSubnet

NatPublicIP:
Type: AWS::EC2::EIP
DependsOn: VPC
Properties:
Domain: vpc

PrivateRouteTable:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref VPC

PrivateRoute:
Type: AWS::EC2::Route
Properties:
RouteTableId: !Ref PrivateRouteTable
DestinationCidrBlock: 0.0.0.0/0
NatGatewayId: !Ref NatGateway

PrivateSubnetARouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PrivateSubnetA
RouteTableId: !Ref PrivateRouteTable

PrivateSubnetBRouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PrivateSubnetB
RouteTableId: !Ref PrivateRouteTable

MSKCluster:
Type: 'AWS::MSK::Cluster'
Properties:
ClusterName: !Ref ClusterName
KafkaVersion: 2.2.1
NumberOfBrokerNodes: 2
BrokerNodeGroupInfo:
InstanceType: kafka.t3.small
ClientSubnets:
- !Ref PrivateSubnetA
- !Ref PrivateSubnetB
StorageInfo:
EBSStorageInfo:
VolumeSize: 1
ConfigurationInfo:
Arn: !Ref ClusterConfigurationArn
Revision: !Ref ClusterConfigurationRevision

Outputs:
PrivateSubnetA:
Description: Private Subnet A ID
Value: !Ref PrivateSubnetA

SecurityGroup:
Description: Default security for Lambda VPC
Value: !GetAtt VPC.DefaultSecurityGroup

MSKCluster:
Description: Created MSK Cluster
Value: !Ref MSKCluster
137 changes: 137 additions & 0 deletions test/integration/msk/index.test.js
@@ -0,0 +1,137 @@
'use strict';

const path = require('path');
const { expect } = require('chai');
const log = require('log').get('serverless:test');
const fixtures = require('../../fixtures');
const { confirmCloudWatchLogs } = require('../../utils/misc');

const awsRequest = require('@serverless/test/aws-request');
const fs = require('fs');
const crypto = require('crypto');
const { deployService, removeService } = require('../../utils/integration');

describe('AWS - MSK Integration Test', function() {
this.timeout(1000 * 60 * 100); // Involves time-taking deploys
let stackName;
let servicePath;
let clusterConfigurationArn;
const stage = 'dev';

const suffix = crypto.randomBytes(8).toString('hex');
const resourcesStackName = `msk-integration-tests-deps-stack-${suffix}`;
const clusterConfName = `msk-cluster-configuration-${suffix}`;
const topicName = `msk-topic-${suffix}`;
const clusterName = `msk-integration-tests-msk-cluster-${suffix}`;

before(async () => {
const cfnTemplate = fs.readFileSync(path.join(__dirname, 'cloudformation.yml'), 'utf8');
const kafkaServerProperties = fs.readFileSync(path.join(__dirname, 'kafka.server.properties'));

log.notice('Creating MSK Cluster configuration...');
const clusterConfResponse = await awsRequest('Kafka', 'createConfiguration', {
Name: clusterConfName,
ServerProperties: kafkaServerProperties,
KafkaVersions: ['2.2.1'],
});

clusterConfigurationArn = clusterConfResponse.Arn;
const clusterConfigurationRevision = clusterConfResponse.LatestRevision.Revision.toString();

log.notice('Deploying CloudFormation stack with required resources...');
await awsRequest('CloudFormation', 'createStack', {
StackName: resourcesStackName,
TemplateBody: cfnTemplate,
Parameters: [
{ ParameterKey: 'ClusterName', ParameterValue: clusterName },
{ ParameterKey: 'ClusterConfigurationArn', ParameterValue: clusterConfigurationArn },
{
ParameterKey: 'ClusterConfigurationRevision',
ParameterValue: clusterConfigurationRevision,
},
],
});

const waitForResult = await awsRequest('CloudFormation', 'waitFor', 'stackCreateComplete', {
StackName: resourcesStackName,
});

const outputMap = waitForResult.Stacks[0].Outputs.reduce((map, output) => {
map[output.OutputKey] = output.OutputValue;
return map;
}, {});

log.notice('Getting MSK Boostrap Brokers URLs...');
const getBootstrapBrokersResponse = await awsRequest('Kafka', 'getBootstrapBrokers', {
ClusterArn: outputMap.MSKCluster,
});
const brokerUrls = getBootstrapBrokersResponse.BootstrapBrokerStringTls;

const serviceData = await fixtures.setup('functionMsk', {
configExt: {
functions: {
producer: {
vpc: {
subnetIds: [outputMap.PrivateSubnetA],
securityGroupIds: [outputMap.SecurityGroup],
},
environment: {
TOPIC_NAME: topicName,
BROKER_URLS: brokerUrls,
},
},
consumer: {
events: [
{
msk: {
arn: outputMap.MSKCluster,
topic: topicName,
},
},
],
},
},
},
});

({ servicePath } = serviceData);

const serviceName = serviceData.serviceConfig.service;
stackName = `${serviceName}-${stage}`;
log.notice(`Deploying "${stackName}" service...`);
await deployService(servicePath);
});

after(async () => {
log.notice('Removing service...');
await removeService(servicePath);
log.notice('Removing CloudFormation stack with required resources...');
await awsRequest('CloudFormation', 'deleteStack', { StackName: resourcesStackName });
await awsRequest('CloudFormation', 'waitFor', 'stackDeleteComplete', {
StackName: resourcesStackName,
});
log.notice('Removing MSK Cluster configuration...');
return awsRequest('Kafka', 'deleteConfiguration', {
Arn: clusterConfigurationArn,
});
});

it('correctly processes messages from MSK topic', async () => {
const functionName = 'consumer';
const message = 'Hello from MSK Integration test!';

return confirmCloudWatchLogs(
`/aws/lambda/${stackName}-${functionName}`,
async () =>
await awsRequest('Lambda', 'invoke', {
FunctionName: `${stackName}-producer`,
InvocationType: 'RequestResponse',
}),
{ timeout: 120 * 1000 }
).then(events => {
const logs = events.reduce((data, event) => data + event.message, '');
expect(logs).to.include(functionName);
expect(logs).to.include(message);
});
});
});
2 changes: 2 additions & 0 deletions test/integration/msk/kafka.server.properties
@@ -0,0 +1,2 @@
auto.create.topics.enable=true
default.replication.factor=2

0 comments on commit 8a88c9e

Please sign in to comment.