Skip to content

Commit

Permalink
feat(Config Schema): Schema for AWS stream event (#8201)
Browse files Browse the repository at this point in the history
  • Loading branch information
fredericbarthelet committed Sep 14, 2020
1 parent 6961b62 commit 1fb338b
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 313 deletions.
261 changes: 97 additions & 164 deletions lib/plugins/aws/package/compile/events/stream/index.js
Expand Up @@ -11,40 +11,85 @@ class AwsCompileStreamEvents {
'package:compileEvents': this.compileStreamEvents.bind(this),
};

// TODO: Complete schema, see https://github.com/serverless/serverless/issues/8034
this.serverless.configSchemaHandler.defineFunctionEvent('aws', 'stream', {
anyOf: [{ type: 'string' }, { type: 'object' }],
anyOf: [
{ $ref: '#/definitions/awsArnString' },
{
type: 'object',
properties: {
// arn constraints are listed in oneOf property of this schema
arn: {},
type: { enum: ['dynamodb', 'kinesis'] },
batchSize: { type: 'integer', minimum: 1, maximum: 10000 },
parallelizationFactor: { type: 'integer', minimum: 1, maximum: 10 },
startingPosition: { enum: ['LATEST', 'TRIM_HORIZON'] },
enabled: { type: 'boolean' },
consumer: { oneOf: [{ const: true }, { $ref: '#/definitions/awsArn' }] },
batchWindow: { type: 'integer', minimum: 0, maximum: 300 },
maximumRetryAttempts: { type: 'integer', minimum: -1, maximum: 10000 },
bisectBatchOnFunctionError: { type: 'boolean' },
maximumRecordAgeInSeconds: {
oneOf: [
{ type: 'integer', minimum: -1, maximum: -1 },
{ type: 'integer', minimum: 60, maximum: 604800 },
],
},
destinations: {
type: 'object',
properties: {
onFailure: {
anyOf: [
{ $ref: '#/definitions/awsArnString' },
{
type: 'object',
properties: {
// arn constraints are listed in oneOf property of this schema
arn: {},
type: { enum: ['sns', 'sqs'] },
},
additionalProperties: false,
oneOf: [
{
properties: {
arn: { $ref: '#/definitions/awsCfFunction' },
},
required: ['arn', 'type'],
},
{
properties: {
arn: { $ref: '#/definitions/awsArnString' },
},
required: ['arn'],
},
],
},
],
},
},
additionalProperties: false,
required: ['onFailure'],
},
},
additionalProperties: false,
oneOf: [
{
properties: {
arn: { $ref: '#/definitions/awsCfFunction' },
},
required: ['arn', 'type'],
},
{
properties: {
arn: { $ref: '#/definitions/awsArnString' },
},
required: ['arn'],
},
],
},
],
});
}

isValidStackImport(variable) {
if (Object.keys(variable).length !== 1) {
return false;
}
if (
variable['Fn::ImportValue'] &&
(variable['Fn::ImportValue']['Fn::GetAtt'] || variable['Fn::ImportValue'].Ref)
) {
return false;
}
const intrinsicFunctions = ['Fn::ImportValue', 'Ref', 'Fn::GetAtt', 'Fn::Sub', 'Fn::Join'];
return intrinsicFunctions.some(cfInstructionName => variable[cfInstructionName]);
}

resolveInvalidDestinationPropertyErrorMessage(functionName, property) {
return [
`Missing or invalid ${property} property for on failure destination`,
` in function "${functionName}"`,
'The correct syntax is: ',
'destinations: ',
' onFailure: ',
' arn: resource-arn',
' type: (sns/sqs)',
'OR an object with arn and type',
'Please check the docs for more info.',
].join('\n');
}

compileStreamEvents() {
this.serverless.service.getAllFunctions().forEach(functionName => {
const functionObj = this.serverless.service.getFunction(functionName);
Expand Down Expand Up @@ -104,47 +149,7 @@ class AwsCompileStreamEvents {
let StartingPosition = 'TRIM_HORIZON';
let Enabled = true;

// TODO validate arn syntax
if (typeof event.stream === 'object') {
if (!event.stream.arn) {
const errorMessage = [
`Missing "arn" property for stream event in function "${functionName}"`,
' The correct syntax is: stream: <StreamArn>',
' OR an object with an "arn" property.',
' Please check the docs for more info.',
].join('');
throw new this.serverless.classes.Error(errorMessage);
}
if (typeof event.stream.arn !== 'string') {
// for dynamic arns (GetAtt/ImportValue)
if (!event.stream.type) {
const errorMessage = [
`Missing "type" property for stream event in function "${functionName}"`,
' If the "arn" property on a stream is a complex type (such as Fn::GetAtt)',
' then a "type" must be provided for the stream, either "kinesis" or,',
' "dynamodb". Please check the docs for more info.',
].join('');
throw new this.serverless.classes.Error(errorMessage);
}
if (
Object.keys(event.stream.arn).length !== 1 ||
!(
event.stream.arn['Fn::ImportValue'] ||
event.stream.arn['Fn::GetAtt'] ||
(event.stream.arn.Ref &&
this.serverless.service.resources.Parameters[event.stream.arn.Ref]) ||
event.stream.arn['Fn::Join']
)
) {
const errorMessage = [
`Bad dynamic ARN property on stream event in function "${functionName}"`,
' If you use a dynamic "arn" (such as with Fn::GetAtt, Fn::Join, Ref',
' or Fn::ImportValue) there must only be one key (either Fn::GetAtt, Fn::Join, Ref',
' or Fn::ImportValue) in the arn object. Please check the docs for more info.',
].join('');
throw new this.serverless.classes.Error(errorMessage);
}
}
EventSourceArn = event.stream.arn;
BatchSize = event.stream.batchSize || BatchSize;
if (event.stream.parallelizationFactor) {
Expand All @@ -154,16 +159,8 @@ class AwsCompileStreamEvents {
if (typeof event.stream.enabled !== 'undefined') {
Enabled = event.stream.enabled;
}
} else if (typeof event.stream === 'string') {
EventSourceArn = event.stream;
} else {
const errorMessage = [
`Stream event of function "${functionName}" is not an object nor a string`,
' The correct syntax is: stream: <StreamArn>',
' OR an object with an "arn" property.',
' Please check the docs for more info.',
].join('');
throw new this.serverless.classes.Error(errorMessage);
EventSourceArn = event.stream;
}

const streamType = event.stream.type || EventSourceArn.split(':')[2];
Expand Down Expand Up @@ -211,21 +208,10 @@ class AwsCompileStreamEvents {
// add event source ARNs to PolicyDocument statements
if (streamType === 'dynamodb') {
dynamodbStreamStatement.Resource.push(EventSourceArn);
} else if (streamType === 'kinesis') {
if (event.stream.consumer) {
kinesisStreamWithConsumerStatement.Resource.push(EventSourceArn);
} else {
kinesisStreamStatement.Resource.push(EventSourceArn);
}
} else if (event.stream.consumer) {
kinesisStreamWithConsumerStatement.Resource.push(EventSourceArn);
} else {
const errorMessage = [
`Stream event of function '${functionName}' had unsupported stream type of`,
` '${streamType}'. Valid stream event source types include 'dynamodb' and`,
" 'kinesis'. Please check the docs for more info.",
].join('');
throw new this.serverless.classes.Properties.Policies[0].PolicyDocument.Error(
errorMessage
);
kinesisStreamStatement.Resource.push(EventSourceArn);
}

if (event.stream.batchWindow != null) {
Expand All @@ -247,81 +233,28 @@ class AwsCompileStreamEvents {
}

if (event.stream.destinations) {
if (event.stream.destinations.onFailure) {
let OnFailureDestinationArn;
let OnFailureDestinationArn;

if (typeof event.stream.destinations.onFailure === 'object') {
if (!event.stream.destinations.onFailure.arn) {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn')
);
}
if (typeof event.stream.destinations.onFailure.arn !== 'string') {
if (!event.stream.destinations.onFailure.type) {
const errorMessage = [
`Missing "type" property for on failure destination in function "${functionName}"`,
' If the "arn" property on a destination is a complex type (such as Fn::GetAtt)',
' then a "type" must be provided for the destination, either "sns" or,',
' "sqs". Please check the docs for more info.',
].join('');
throw new this.serverless.classes.Error(errorMessage);
}
if (!this.isValidStackImport(event.stream.destinations.onFailure.arn)) {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn')
);
}
}
if (
typeof event.stream.destinations.onFailure.arn === 'string' &&
!event.stream.destinations.onFailure.arn.startsWith('arn:')
) {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn')
);
}
OnFailureDestinationArn = event.stream.destinations.onFailure.arn;
} else if (typeof event.stream.destinations.onFailure === 'string') {
if (!event.stream.destinations.onFailure.startsWith('arn:')) {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn')
);
}
OnFailureDestinationArn = event.stream.destinations.onFailure;
} else {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn')
);
}

const destinationType =
event.stream.destinations.onFailure.type || OnFailureDestinationArn.split(':')[2];
// add on failure destination ARNs to PolicyDocument statements
if (destinationType === 'sns') {
onFailureSnsStatement.Resource.push(OnFailureDestinationArn);
} else if (destinationType === 'sqs') {
onFailureSqsStatement.Resource.push(OnFailureDestinationArn);
} else {
const errorMessage = [
`Stream event of function '${functionName}' had unsupported destination type of`,
` '${streamType}'. Valid stream event source types include 'sns' and`,
" 'sqs'. Please check the docs for more info.",
].join('');
throw new this.serverless.classes.Properties.Policies[0].PolicyDocument.Error(
errorMessage
);
}
if (typeof event.stream.destinations.onFailure === 'object') {
OnFailureDestinationArn = event.stream.destinations.onFailure.arn;
} else {
OnFailureDestinationArn = event.stream.destinations.onFailure;
}

streamResource.Properties.DestinationConfig = {
OnFailure: {
Destination: OnFailureDestinationArn,
},
};
const destinationType =
event.stream.destinations.onFailure.type || OnFailureDestinationArn.split(':')[2];
// add on failure destination ARNs to PolicyDocument statements
if (destinationType === 'sns') {
onFailureSnsStatement.Resource.push(OnFailureDestinationArn);
} else {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'onFailure')
);
onFailureSqsStatement.Resource.push(OnFailureDestinationArn);
}

streamResource.Properties.DestinationConfig = {
OnFailure: {
Destination: OnFailureDestinationArn,
},
};
}

const newStreamObject = {
Expand Down

0 comments on commit 1fb338b

Please sign in to comment.