Skip to content

Commit

Permalink
chore(aws): apply code review suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
pgrzesik committed Sep 2, 2020
1 parent e02889a commit e52c2c4
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 202 deletions.
2 changes: 1 addition & 1 deletion docs/providers/aws/events/msk.md
Expand Up @@ -8,7 +8,7 @@ layout: Doc

<!-- DOCS-SITE-LINK:START automatically generated -->

### [Read this on the main serverless docs site](https://www.serverless.com/framework/docs/providers/aws/events/cognito-user-pool)
### [Read this on the main serverless docs site](https://www.serverless.com/framework/docs/providers/aws/events/msk)

<!-- DOCS-SITE-LINK:END -->

Expand Down
13 changes: 8 additions & 5 deletions lib/plugins/aws/lib/naming.js
Expand Up @@ -408,11 +408,14 @@ module.exports = {

// MSK
getMSKEventLogicalId(functionName, clusterName, topicName) {
return `${this.getNormalizedFunctionName(
functionName
)}EventSourceMappingMSK${this.normalizeNameToAlphaNumericOnly(
clusterName
)}${this.normalizeNameToAlphaNumericOnly(topicName)}`;
const normalizedFunctionName = this.getNormalizedFunctionName(functionName);
// Both clusterName and topicName are trimmed to 79 chars to avoid going over 255 character limit
const normalizedClusterName = this.normalizeNameToAlphaNumericOnly(clusterName).substring(
0,
79
);
const normalizedTopicName = this.normalizeNameToAlphaNumericOnly(topicName).substring(0, 79);
return `${normalizedFunctionName}EventSourceMappingMSK${normalizedClusterName}${normalizedTopicName}`;
},

// ALB
Expand Down
@@ -0,0 +1,13 @@
'use strict';

const getMskClusterNameToken = eventSourceArn => {
if (eventSourceArn['Fn::ImportValue']) {
return eventSourceArn['Fn::ImportValue'];
} else if (eventSourceArn.Ref) {
return eventSourceArn.Ref;
}

return eventSourceArn.split('/')[1];
};

module.exports = getMskClusterNameToken;
@@ -0,0 +1,27 @@
'use strict';

const chai = require('chai');
const getMskClusterNameToken = require('./getMskClusterNameToken');

const { expect } = chai;

describe('getMskClusterNameToken', () => {
it('with ARN', () => {
const eventSourceArn =
'arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a';
const result = getMskClusterNameToken(eventSourceArn);
expect(result).to.equal('ClusterName');
});

it('with Fn::ImportValue', () => {
const eventSourceArn = { 'Fn::ImportValue': 'importvalue' };
const result = getMskClusterNameToken(eventSourceArn);
expect(result).to.equal('importvalue');
});

it('with Ref', () => {
const eventSourceArn = { Ref: 'ReferencedResource' };
const result = getMskClusterNameToken(eventSourceArn);
expect(result).to.equal('ReferencedResource');
});
});
94 changes: 24 additions & 70 deletions lib/plugins/aws/package/compile/events/msk/index.js
@@ -1,5 +1,7 @@
'use strict';

const getMskClusterNameToken = require('./getMskClusterNameToken');

class AwsCompileMSKEvents {
constructor(serverless) {
this.serverless = serverless;
Expand All @@ -14,11 +16,7 @@ class AwsCompileMSKEvents {
properties: {
arn: {
oneOf: [
{
type: 'string',
pattern:
'^arn:aws[a-zA-Z-]*:kafka:[a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-[1-9]{1}:[0-9]{12}:cluster',
},
{ $ref: '#/definitions/awsArnString' },
{ $ref: '#/definitions/awsCfImport' },
{ $ref: '#/definitions/awsCfRef' },
],
Expand All @@ -44,51 +42,6 @@ class AwsCompileMSKEvents {
});
}

resolveDependsOn(funcRole) {
let dependsOn = 'IamRoleLambdaExecution';

if (funcRole) {
if (
// check whether the custom role is an ARN
typeof funcRole === 'string' &&
funcRole.indexOf(':') !== -1
) {
dependsOn = [];
} else if (
// otherwise, check if we have an in-service reference to a role ARN
typeof funcRole === 'object' &&
'Fn::GetAtt' in funcRole &&
Array.isArray(funcRole['Fn::GetAtt']) &&
funcRole['Fn::GetAtt'].length === 2 &&
typeof funcRole['Fn::GetAtt'][0] === 'string' &&
typeof funcRole['Fn::GetAtt'][1] === 'string' &&
funcRole['Fn::GetAtt'][1] === 'Arn'
) {
dependsOn = funcRole['Fn::GetAtt'][0];
} else if (
// otherwise, check if we have an import or parameters ref
typeof funcRole === 'object' &&
('Fn::ImportValue' in funcRole || 'Ref' in funcRole)
) {
dependsOn = [];
} else if (typeof funcRole === 'string') {
dependsOn = funcRole;
}
}

return dependsOn;
}

getMSKClusterName(eventSourceArn) {
if (eventSourceArn['Fn::ImportValue']) {
return eventSourceArn['Fn::ImportValue'];
} else if (eventSourceArn.Ref) {
return eventSourceArn.Ref;
}

return eventSourceArn.split('/')[1];
}

compileMSKEvents() {
this.serverless.service.getAllFunctions().forEach(functionName => {
const functionObj = this.serverless.service.getFunction(functionName);
Expand Down Expand Up @@ -116,46 +69,47 @@ class AwsCompileMSKEvents {

functionObj.events.forEach(event => {
if (event.msk) {
const EventSourceArn = event.msk.arn;
const Topic = event.msk.topic;
const BatchSize = event.msk.batchSize || 100;
const Enabled = event.msk.enabled != null ? event.msk.enabled : true;
const StartingPosition = event.msk.startingPosition || 'TRIM_HORIZON';
const eventSourceArn = event.msk.arn;
const topic = event.msk.topic;
const batchSize = event.msk.batchSize;
const enabled = event.msk.enabled;
const startingPosition = event.msk.startingPosition || 'TRIM_HORIZON';

const mskClusterName = this.getMSKClusterName(EventSourceArn);
const mskClusterNameToken = getMskClusterNameToken(eventSourceArn);
const mskEventLogicalId = this.provider.naming.getMSKEventLogicalId(
functionName,
mskClusterName,
Topic
mskClusterNameToken,
topic
);

const funcRole = functionObj.role || this.serverless.service.provider.role;
const dependsOn = this.resolveDependsOn(funcRole);
const dependsOn = this.provider.resolveFunctionIamRoleResourceName(functionObj) || [];

const lambdaLogicalId = this.provider.naming.getLambdaLogicalId(functionName);

const mskResource = {
Type: 'AWS::Lambda::EventSourceMapping',
DependsOn: dependsOn,
Properties: {
BatchSize,
EventSourceArn,
EventSourceArn: eventSourceArn,
FunctionName: {
'Fn::GetAtt': [lambdaLogicalId, 'Arn'],
},
StartingPosition,
Enabled,
Topics: [Topic],
StartingPosition: startingPosition,
Topics: [topic],
},
};

mskStatement.Resource.push(EventSourceArn);
if (batchSize) {
mskResource.Properties.BatchSize = batchSize;
}

const newMSKObject = {
[mskEventLogicalId]: mskResource,
};
if (enabled != null) {
mskResource.Properties.Enabled = enabled;
}

mskStatement.Resource.push(eventSourceArn);

Object.assign(cfTemplate.Resources, newMSKObject);
cfTemplate.Resources[mskEventLogicalId] = mskResource;
}
});

Expand Down

0 comments on commit e52c2c4

Please sign in to comment.