From 8a88c9efdb145b3441f1bed351e1519feb1bceb0 Mon Sep 17 00:00:00 2001 From: Piotr Grzesik Date: Tue, 15 Sep 2020 17:05:06 +0200 Subject: [PATCH] Add integration tests for MSK integration --- .gitignore | 3 + test/fixtures/functionMsk/core.js | 43 ++++++ test/fixtures/functionMsk/package.json | 14 ++ test/fixtures/functionMsk/serverless.yml | 17 +++ test/fixtures/functionMsk/utils.js | 22 +++ test/integration/msk/cloudformation.yml | 151 +++++++++++++++++++ test/integration/msk/index.test.js | 137 +++++++++++++++++ test/integration/msk/kafka.server.properties | 2 + 8 files changed, 389 insertions(+) create mode 100644 test/fixtures/functionMsk/core.js create mode 100644 test/fixtures/functionMsk/package.json create mode 100644 test/fixtures/functionMsk/serverless.yml create mode 100644 test/fixtures/functionMsk/utils.js create mode 100644 test/integration/msk/cloudformation.yml create mode 100644 test/integration/msk/index.test.js create mode 100644 test/integration/msk/kafka.server.properties diff --git a/.gitignore b/.gitignore index 3e71f2a2d153..fac841ca046a 100755 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,6 @@ /node_modules npm-debug.log /package-lock.json + +/test/fixtures/**/node_modules +/test/fixtures/**/package-lock.json diff --git a/test/fixtures/functionMsk/core.js b/test/fixtures/functionMsk/core.js new file mode 100644 index 000000000000..2d7db774edde --- /dev/null +++ b/test/fixtures/functionMsk/core.js @@ -0,0 +1,43 @@ +'use strict'; + +// NOTE: the `utils.js` file is bundled into the deployment package +// eslint-disable-next-line +const { log } = require('./utils'); + +const { Kafka } = require('kafkajs'); + +function consumer(event, context, callback) { + const functionName = 'consumer'; + const { records } = event; + const messages = Object.values(records)[0].map(record => + Buffer.from(record.value, 'base64').toString() + ); + log(functionName, JSON.stringify(messages)); + return callback(null, event); +} + +async function producer() { + const kafkaBrokers = process.env.BROKER_URLS.split(','); + const kafkaTopic = process.env.TOPIC_NAME; + + const kafka = new Kafka({ + clientId: 'myapp', + brokers: kafkaBrokers, + ssl: true, + }); + + const kafkaProducer = kafka.producer(); + await kafkaProducer.connect(); + await kafkaProducer.send({ + topic: kafkaTopic, + messages: [{ value: 'Hello from MSK Integration test!' }], + }); + + await kafkaProducer.disconnect(); + + return { + statusCode: 200, + }; +} + +module.exports = { producer, consumer }; diff --git a/test/fixtures/functionMsk/package.json b/test/fixtures/functionMsk/package.json new file mode 100644 index 000000000000..ab1a46cd75c5 --- /dev/null +++ b/test/fixtures/functionMsk/package.json @@ -0,0 +1,14 @@ +{ + "name": "functionMsk", + "version": "1.0.0", + "description": "", + "main": "core.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "", + "license": "MIT", + "dependencies": { + "kafkajs": "^1.13.0" + } +} diff --git a/test/fixtures/functionMsk/serverless.yml b/test/fixtures/functionMsk/serverless.yml new file mode 100644 index 000000000000..85d0254a8eac --- /dev/null +++ b/test/fixtures/functionMsk/serverless.yml @@ -0,0 +1,17 @@ +service: service + +configValidationMode: error + +# VPC and Events configuration is added dynamically during test run +# Because it has to be provisioned separately via CloudFormation stack + +provider: + name: aws + runtime: nodejs12.x + versionFunctions: false + +functions: + producer: + handler: core.producer + consumer: + handler: core.consumer diff --git a/test/fixtures/functionMsk/utils.js b/test/fixtures/functionMsk/utils.js new file mode 100644 index 000000000000..f4a99e00afa1 --- /dev/null +++ b/test/fixtures/functionMsk/utils.js @@ -0,0 +1,22 @@ +'use strict'; + +const logger = console; + +function getMarkers(functionName) { + return { + start: `--- START ${functionName} ---`, + end: `--- END ${functionName} ---`, + }; +} + +function log(functionName, message) { + const markers = getMarkers(functionName); + logger.log(markers.start); + logger.log(message); + logger.log(markers.end); +} + +module.exports = { + getMarkers, + log, +}; diff --git a/test/integration/msk/cloudformation.yml b/test/integration/msk/cloudformation.yml new file mode 100644 index 000000000000..789fea3667f3 --- /dev/null +++ b/test/integration/msk/cloudformation.yml @@ -0,0 +1,151 @@ +AWSTemplateFormatVersion: 2010-09-09 + +Parameters: + ClusterName: + Type: String + Description: Name of MSK Cluster + ClusterConfigurationArn: + Type: String + Description: MSK Cluster Configuration ARN + ClusterConfigurationRevision: + Type: Number + Description: MSK Cluster Configuration Revision number + Default: 1 + +Resources: + VPC: + Type: AWS::EC2::VPC + Properties: + CidrBlock: 172.31.0.0/16 + Tags: + - Key: Name + Value: !Ref AWS::StackName + + PublicSubnet: + Type: AWS::EC2::Subnet + Properties: + VpcId: !Ref VPC + AvailabilityZone: + Fn::Select: + - 0 + - Fn::GetAZs: '' + CidrBlock: 172.31.0.0/24 + MapPublicIpOnLaunch: true + + PrivateSubnetA: + Type: AWS::EC2::Subnet + Properties: + VpcId: !Ref VPC + AvailabilityZone: + Fn::Select: + - 0 + - Fn::GetAZs: '' + CidrBlock: 172.31.3.0/24 + MapPublicIpOnLaunch: false + + PrivateSubnetB: + Type: AWS::EC2::Subnet + Properties: + VpcId: !Ref VPC + AvailabilityZone: + Fn::Select: + - 1 + - Fn::GetAZs: '' + CidrBlock: 172.31.2.0/24 + MapPublicIpOnLaunch: false + + InternetGateway: + Type: AWS::EC2::InternetGateway + + GatewayAttachment: + Type: AWS::EC2::VPCGatewayAttachment + Properties: + VpcId: !Ref VPC + InternetGatewayId: !Ref InternetGateway + + PublicRouteTable: + Type: AWS::EC2::RouteTable + Properties: + VpcId: !Ref VPC + + PublicRoute: + Type: AWS::EC2::Route + DependsOn: GatewayAttachment + Properties: + RouteTableId: !Ref PublicRouteTable + DestinationCidrBlock: 0.0.0.0/0 + GatewayId: !Ref InternetGateway + + PublicSubnetRouteTableAssociation: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PublicSubnet + RouteTableId: !Ref PublicRouteTable + + NatGateway: + Type: AWS::EC2::NatGateway + DependsOn: NatPublicIP + Properties: + AllocationId: !GetAtt NatPublicIP.AllocationId + SubnetId: !Ref PublicSubnet + + NatPublicIP: + Type: AWS::EC2::EIP + DependsOn: VPC + Properties: + Domain: vpc + + PrivateRouteTable: + Type: AWS::EC2::RouteTable + Properties: + VpcId: !Ref VPC + + PrivateRoute: + Type: AWS::EC2::Route + Properties: + RouteTableId: !Ref PrivateRouteTable + DestinationCidrBlock: 0.0.0.0/0 + NatGatewayId: !Ref NatGateway + + PrivateSubnetARouteTableAssociation: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PrivateSubnetA + RouteTableId: !Ref PrivateRouteTable + + PrivateSubnetBRouteTableAssociation: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PrivateSubnetB + RouteTableId: !Ref PrivateRouteTable + + MSKCluster: + Type: 'AWS::MSK::Cluster' + Properties: + ClusterName: !Ref ClusterName + KafkaVersion: 2.2.1 + NumberOfBrokerNodes: 2 + BrokerNodeGroupInfo: + InstanceType: kafka.t3.small + ClientSubnets: + - !Ref PrivateSubnetA + - !Ref PrivateSubnetB + StorageInfo: + EBSStorageInfo: + VolumeSize: 1 + ConfigurationInfo: + Arn: !Ref ClusterConfigurationArn + Revision: !Ref ClusterConfigurationRevision + +Outputs: + PrivateSubnetA: + Description: Private Subnet A ID + Value: !Ref PrivateSubnetA + + SecurityGroup: + Description: Default security for Lambda VPC + Value: !GetAtt VPC.DefaultSecurityGroup + + MSKCluster: + Description: Created MSK Cluster + Value: !Ref MSKCluster diff --git a/test/integration/msk/index.test.js b/test/integration/msk/index.test.js new file mode 100644 index 000000000000..1f29565d2b3a --- /dev/null +++ b/test/integration/msk/index.test.js @@ -0,0 +1,137 @@ +'use strict'; + +const path = require('path'); +const { expect } = require('chai'); +const log = require('log').get('serverless:test'); +const fixtures = require('../../fixtures'); +const { confirmCloudWatchLogs } = require('../../utils/misc'); + +const awsRequest = require('@serverless/test/aws-request'); +const fs = require('fs'); +const crypto = require('crypto'); +const { deployService, removeService } = require('../../utils/integration'); + +describe('AWS - MSK Integration Test', function() { + this.timeout(1000 * 60 * 100); // Involves time-taking deploys + let stackName; + let servicePath; + let clusterConfigurationArn; + const stage = 'dev'; + + const suffix = crypto.randomBytes(8).toString('hex'); + const resourcesStackName = `msk-integration-tests-deps-stack-${suffix}`; + const clusterConfName = `msk-cluster-configuration-${suffix}`; + const topicName = `msk-topic-${suffix}`; + const clusterName = `msk-integration-tests-msk-cluster-${suffix}`; + + before(async () => { + const cfnTemplate = fs.readFileSync(path.join(__dirname, 'cloudformation.yml'), 'utf8'); + const kafkaServerProperties = fs.readFileSync(path.join(__dirname, 'kafka.server.properties')); + + log.notice('Creating MSK Cluster configuration...'); + const clusterConfResponse = await awsRequest('Kafka', 'createConfiguration', { + Name: clusterConfName, + ServerProperties: kafkaServerProperties, + KafkaVersions: ['2.2.1'], + }); + + clusterConfigurationArn = clusterConfResponse.Arn; + const clusterConfigurationRevision = clusterConfResponse.LatestRevision.Revision.toString(); + + log.notice('Deploying CloudFormation stack with required resources...'); + await awsRequest('CloudFormation', 'createStack', { + StackName: resourcesStackName, + TemplateBody: cfnTemplate, + Parameters: [ + { ParameterKey: 'ClusterName', ParameterValue: clusterName }, + { ParameterKey: 'ClusterConfigurationArn', ParameterValue: clusterConfigurationArn }, + { + ParameterKey: 'ClusterConfigurationRevision', + ParameterValue: clusterConfigurationRevision, + }, + ], + }); + + const waitForResult = await awsRequest('CloudFormation', 'waitFor', 'stackCreateComplete', { + StackName: resourcesStackName, + }); + + const outputMap = waitForResult.Stacks[0].Outputs.reduce((map, output) => { + map[output.OutputKey] = output.OutputValue; + return map; + }, {}); + + log.notice('Getting MSK Boostrap Brokers URLs...'); + const getBootstrapBrokersResponse = await awsRequest('Kafka', 'getBootstrapBrokers', { + ClusterArn: outputMap.MSKCluster, + }); + const brokerUrls = getBootstrapBrokersResponse.BootstrapBrokerStringTls; + + const serviceData = await fixtures.setup('functionMsk', { + configExt: { + functions: { + producer: { + vpc: { + subnetIds: [outputMap.PrivateSubnetA], + securityGroupIds: [outputMap.SecurityGroup], + }, + environment: { + TOPIC_NAME: topicName, + BROKER_URLS: brokerUrls, + }, + }, + consumer: { + events: [ + { + msk: { + arn: outputMap.MSKCluster, + topic: topicName, + }, + }, + ], + }, + }, + }, + }); + + ({ servicePath } = serviceData); + + const serviceName = serviceData.serviceConfig.service; + stackName = `${serviceName}-${stage}`; + log.notice(`Deploying "${stackName}" service...`); + await deployService(servicePath); + }); + + after(async () => { + log.notice('Removing service...'); + await removeService(servicePath); + log.notice('Removing CloudFormation stack with required resources...'); + await awsRequest('CloudFormation', 'deleteStack', { StackName: resourcesStackName }); + await awsRequest('CloudFormation', 'waitFor', 'stackDeleteComplete', { + StackName: resourcesStackName, + }); + log.notice('Removing MSK Cluster configuration...'); + return awsRequest('Kafka', 'deleteConfiguration', { + Arn: clusterConfigurationArn, + }); + }); + + it('correctly processes messages from MSK topic', async () => { + const functionName = 'consumer'; + const message = 'Hello from MSK Integration test!'; + + return confirmCloudWatchLogs( + `/aws/lambda/${stackName}-${functionName}`, + async () => + await awsRequest('Lambda', 'invoke', { + FunctionName: `${stackName}-producer`, + InvocationType: 'RequestResponse', + }), + { timeout: 120 * 1000 } + ).then(events => { + const logs = events.reduce((data, event) => data + event.message, ''); + expect(logs).to.include(functionName); + expect(logs).to.include(message); + }); + }); +}); diff --git a/test/integration/msk/kafka.server.properties b/test/integration/msk/kafka.server.properties new file mode 100644 index 000000000000..435f23ad9da6 --- /dev/null +++ b/test/integration/msk/kafka.server.properties @@ -0,0 +1,2 @@ +auto.create.topics.enable=true +default.replication.factor=2