From 1fb338b184ed770bc5d8d162bf5c54336f3d2ddd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Barthelet?= Date: Mon, 14 Sep 2020 15:46:13 +0200 Subject: [PATCH] feat(Config Schema): Schema for AWS `stream` event (#8201) --- .../package/compile/events/stream/index.js | 261 +++++++----------- .../compile/events/stream/index.test.js | 149 ---------- 2 files changed, 97 insertions(+), 313 deletions(-) diff --git a/lib/plugins/aws/package/compile/events/stream/index.js b/lib/plugins/aws/package/compile/events/stream/index.js index a9e67f15f40..463d88be03b 100644 --- a/lib/plugins/aws/package/compile/events/stream/index.js +++ b/lib/plugins/aws/package/compile/events/stream/index.js @@ -11,40 +11,85 @@ class AwsCompileStreamEvents { 'package:compileEvents': this.compileStreamEvents.bind(this), }; - // TODO: Complete schema, see https://github.com/serverless/serverless/issues/8034 this.serverless.configSchemaHandler.defineFunctionEvent('aws', 'stream', { - anyOf: [{ type: 'string' }, { type: 'object' }], + anyOf: [ + { $ref: '#/definitions/awsArnString' }, + { + type: 'object', + properties: { + // arn constraints are listed in oneOf property of this schema + arn: {}, + type: { enum: ['dynamodb', 'kinesis'] }, + batchSize: { type: 'integer', minimum: 1, maximum: 10000 }, + parallelizationFactor: { type: 'integer', minimum: 1, maximum: 10 }, + startingPosition: { enum: ['LATEST', 'TRIM_HORIZON'] }, + enabled: { type: 'boolean' }, + consumer: { oneOf: [{ const: true }, { $ref: '#/definitions/awsArn' }] }, + batchWindow: { type: 'integer', minimum: 0, maximum: 300 }, + maximumRetryAttempts: { type: 'integer', minimum: -1, maximum: 10000 }, + bisectBatchOnFunctionError: { type: 'boolean' }, + maximumRecordAgeInSeconds: { + oneOf: [ + { type: 'integer', minimum: -1, maximum: -1 }, + { type: 'integer', minimum: 60, maximum: 604800 }, + ], + }, + destinations: { + type: 'object', + properties: { + onFailure: { + anyOf: [ + { $ref: '#/definitions/awsArnString' }, + { + type: 'object', + properties: { + // arn constraints are listed in oneOf property of this schema + arn: {}, + type: { enum: ['sns', 'sqs'] }, + }, + additionalProperties: false, + oneOf: [ + { + properties: { + arn: { $ref: '#/definitions/awsCfFunction' }, + }, + required: ['arn', 'type'], + }, + { + properties: { + arn: { $ref: '#/definitions/awsArnString' }, + }, + required: ['arn'], + }, + ], + }, + ], + }, + }, + additionalProperties: false, + required: ['onFailure'], + }, + }, + additionalProperties: false, + oneOf: [ + { + properties: { + arn: { $ref: '#/definitions/awsCfFunction' }, + }, + required: ['arn', 'type'], + }, + { + properties: { + arn: { $ref: '#/definitions/awsArnString' }, + }, + required: ['arn'], + }, + ], + }, + ], }); } - isValidStackImport(variable) { - if (Object.keys(variable).length !== 1) { - return false; - } - if ( - variable['Fn::ImportValue'] && - (variable['Fn::ImportValue']['Fn::GetAtt'] || variable['Fn::ImportValue'].Ref) - ) { - return false; - } - const intrinsicFunctions = ['Fn::ImportValue', 'Ref', 'Fn::GetAtt', 'Fn::Sub', 'Fn::Join']; - return intrinsicFunctions.some(cfInstructionName => variable[cfInstructionName]); - } - - resolveInvalidDestinationPropertyErrorMessage(functionName, property) { - return [ - `Missing or invalid ${property} property for on failure destination`, - ` in function "${functionName}"`, - 'The correct syntax is: ', - 'destinations: ', - ' onFailure: ', - ' arn: resource-arn', - ' type: (sns/sqs)', - 'OR an object with arn and type', - 'Please check the docs for more info.', - ].join('\n'); - } - compileStreamEvents() { this.serverless.service.getAllFunctions().forEach(functionName => { const functionObj = this.serverless.service.getFunction(functionName); @@ -104,47 +149,7 @@ class AwsCompileStreamEvents { let StartingPosition = 'TRIM_HORIZON'; let Enabled = true; - // TODO validate arn syntax if (typeof event.stream === 'object') { - if (!event.stream.arn) { - const errorMessage = [ - `Missing "arn" property for stream event in function "${functionName}"`, - ' The correct syntax is: stream: ', - ' OR an object with an "arn" property.', - ' Please check the docs for more info.', - ].join(''); - throw new this.serverless.classes.Error(errorMessage); - } - if (typeof event.stream.arn !== 'string') { - // for dynamic arns (GetAtt/ImportValue) - if (!event.stream.type) { - const errorMessage = [ - `Missing "type" property for stream event in function "${functionName}"`, - ' If the "arn" property on a stream is a complex type (such as Fn::GetAtt)', - ' then a "type" must be provided for the stream, either "kinesis" or,', - ' "dynamodb". Please check the docs for more info.', - ].join(''); - throw new this.serverless.classes.Error(errorMessage); - } - if ( - Object.keys(event.stream.arn).length !== 1 || - !( - event.stream.arn['Fn::ImportValue'] || - event.stream.arn['Fn::GetAtt'] || - (event.stream.arn.Ref && - this.serverless.service.resources.Parameters[event.stream.arn.Ref]) || - event.stream.arn['Fn::Join'] - ) - ) { - const errorMessage = [ - `Bad dynamic ARN property on stream event in function "${functionName}"`, - ' If you use a dynamic "arn" (such as with Fn::GetAtt, Fn::Join, Ref', - ' or Fn::ImportValue) there must only be one key (either Fn::GetAtt, Fn::Join, Ref', - ' or Fn::ImportValue) in the arn object. Please check the docs for more info.', - ].join(''); - throw new this.serverless.classes.Error(errorMessage); - } - } EventSourceArn = event.stream.arn; BatchSize = event.stream.batchSize || BatchSize; if (event.stream.parallelizationFactor) { @@ -154,16 +159,8 @@ class AwsCompileStreamEvents { if (typeof event.stream.enabled !== 'undefined') { Enabled = event.stream.enabled; } - } else if (typeof event.stream === 'string') { - EventSourceArn = event.stream; } else { - const errorMessage = [ - `Stream event of function "${functionName}" is not an object nor a string`, - ' The correct syntax is: stream: ', - ' OR an object with an "arn" property.', - ' Please check the docs for more info.', - ].join(''); - throw new this.serverless.classes.Error(errorMessage); + EventSourceArn = event.stream; } const streamType = event.stream.type || EventSourceArn.split(':')[2]; @@ -211,21 +208,10 @@ class AwsCompileStreamEvents { // add event source ARNs to PolicyDocument statements if (streamType === 'dynamodb') { dynamodbStreamStatement.Resource.push(EventSourceArn); - } else if (streamType === 'kinesis') { - if (event.stream.consumer) { - kinesisStreamWithConsumerStatement.Resource.push(EventSourceArn); - } else { - kinesisStreamStatement.Resource.push(EventSourceArn); - } + } else if (event.stream.consumer) { + kinesisStreamWithConsumerStatement.Resource.push(EventSourceArn); } else { - const errorMessage = [ - `Stream event of function '${functionName}' had unsupported stream type of`, - ` '${streamType}'. Valid stream event source types include 'dynamodb' and`, - " 'kinesis'. Please check the docs for more info.", - ].join(''); - throw new this.serverless.classes.Properties.Policies[0].PolicyDocument.Error( - errorMessage - ); + kinesisStreamStatement.Resource.push(EventSourceArn); } if (event.stream.batchWindow != null) { @@ -247,81 +233,28 @@ class AwsCompileStreamEvents { } if (event.stream.destinations) { - if (event.stream.destinations.onFailure) { - let OnFailureDestinationArn; + let OnFailureDestinationArn; - if (typeof event.stream.destinations.onFailure === 'object') { - if (!event.stream.destinations.onFailure.arn) { - throw new this.serverless.classes.Error( - this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn') - ); - } - if (typeof event.stream.destinations.onFailure.arn !== 'string') { - if (!event.stream.destinations.onFailure.type) { - const errorMessage = [ - `Missing "type" property for on failure destination in function "${functionName}"`, - ' If the "arn" property on a destination is a complex type (such as Fn::GetAtt)', - ' then a "type" must be provided for the destination, either "sns" or,', - ' "sqs". Please check the docs for more info.', - ].join(''); - throw new this.serverless.classes.Error(errorMessage); - } - if (!this.isValidStackImport(event.stream.destinations.onFailure.arn)) { - throw new this.serverless.classes.Error( - this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn') - ); - } - } - if ( - typeof event.stream.destinations.onFailure.arn === 'string' && - !event.stream.destinations.onFailure.arn.startsWith('arn:') - ) { - throw new this.serverless.classes.Error( - this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn') - ); - } - OnFailureDestinationArn = event.stream.destinations.onFailure.arn; - } else if (typeof event.stream.destinations.onFailure === 'string') { - if (!event.stream.destinations.onFailure.startsWith('arn:')) { - throw new this.serverless.classes.Error( - this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn') - ); - } - OnFailureDestinationArn = event.stream.destinations.onFailure; - } else { - throw new this.serverless.classes.Error( - this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn') - ); - } - - const destinationType = - event.stream.destinations.onFailure.type || OnFailureDestinationArn.split(':')[2]; - // add on failure destination ARNs to PolicyDocument statements - if (destinationType === 'sns') { - onFailureSnsStatement.Resource.push(OnFailureDestinationArn); - } else if (destinationType === 'sqs') { - onFailureSqsStatement.Resource.push(OnFailureDestinationArn); - } else { - const errorMessage = [ - `Stream event of function '${functionName}' had unsupported destination type of`, - ` '${streamType}'. Valid stream event source types include 'sns' and`, - " 'sqs'. Please check the docs for more info.", - ].join(''); - throw new this.serverless.classes.Properties.Policies[0].PolicyDocument.Error( - errorMessage - ); - } + if (typeof event.stream.destinations.onFailure === 'object') { + OnFailureDestinationArn = event.stream.destinations.onFailure.arn; + } else { + OnFailureDestinationArn = event.stream.destinations.onFailure; + } - streamResource.Properties.DestinationConfig = { - OnFailure: { - Destination: OnFailureDestinationArn, - }, - }; + const destinationType = + event.stream.destinations.onFailure.type || OnFailureDestinationArn.split(':')[2]; + // add on failure destination ARNs to PolicyDocument statements + if (destinationType === 'sns') { + onFailureSnsStatement.Resource.push(OnFailureDestinationArn); } else { - throw new this.serverless.classes.Error( - this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'onFailure') - ); + onFailureSqsStatement.Resource.push(OnFailureDestinationArn); } + + streamResource.Properties.DestinationConfig = { + OnFailure: { + Destination: OnFailureDestinationArn, + }, + }; } const newStreamObject = { diff --git a/lib/plugins/aws/package/compile/events/stream/index.test.js b/lib/plugins/aws/package/compile/events/stream/index.test.js index 6dadbc3c21f..0d04c329a61 100644 --- a/lib/plugins/aws/package/compile/events/stream/index.test.js +++ b/lib/plugins/aws/package/compile/events/stream/index.test.js @@ -83,22 +83,6 @@ describe('AwsCompileStreamEvents', () => { expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); }); - it('should throw an error if the "arn" property contains an unsupported stream type', () => { - awsCompileStreamEvents.serverless.service.functions = { - first: { - events: [ - { - stream: { - arn: 'arn:aws:NOT-SUPPORTED:us-east-1:123456789012:stream/myStream', - }, - }, - ], - }, - }; - - expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); - }); - it('should not throw error or merge role statements if default policy is not present', () => { awsCompileStreamEvents.serverless.service.functions = { first: { @@ -995,96 +979,6 @@ describe('AwsCompileStreamEvents', () => { expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); }); - it('fails if keys other than Fn::GetAtt/ImportValue/Join are used for dynamic stream ARN', () => { - awsCompileStreamEvents.serverless.service.functions = { - first: { - events: [ - { - stream: { - type: 'dynamodb', - arn: { - 'Fn::GetAtt': ['SomeDdbTable', 'StreamArn'], - 'batchSize': 1, - }, - }, - }, - ], - }, - }; - - expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); - }); - - it('fails if keys other than Fn::GetAtt/ImportValue/Join are used for dynamic onFailure ARN', () => { - awsCompileStreamEvents.serverless.service.functions = { - first: { - events: [ - { - stream: { - arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1', - destinations: { - onFailure: { - arn: { - 'Fn::GetAtt': ['SomeSNS', 'Arn'], - 'batchSize': 1, - }, - type: 'sns', - }, - }, - }, - }, - ], - }, - }; - - expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); - }); - - it('fails if Fn::ImportValue is misused for onFailure ARN', () => { - awsCompileStreamEvents.serverless.service.functions = { - first: { - events: [ - { - stream: { - arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1', - destinations: { - onFailure: { - arn: { - 'Fn::ImportValue': { - 'Fn::GetAtt': ['SomeSNS', 'Arn'], - }, - }, - type: 'invalidType', - }, - }, - }, - }, - ], - }, - }; - - expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); - }); - - it('fails if onFailure ARN is given as a string that does not start with arn', () => { - awsCompileStreamEvents.serverless.service.functions = { - first: { - events: [ - { - stream: { - arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1', - destinations: { - onFailure: 'invalidARN', - }, - }, - }, - ], - }, - }; - - expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); - }); - it('fails if onFailure ARN is given as a variable type other than string or object', () => { awsCompileStreamEvents.serverless.service.functions = { first: { @@ -1104,27 +998,6 @@ describe('AwsCompileStreamEvents', () => { expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); }); - it('fails if nested onFailure ARN is given as a string that does not start with arn', () => { - awsCompileStreamEvents.serverless.service.functions = { - first: { - events: [ - { - stream: { - arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1', - destinations: { - onFailure: { - arn: 'invalidARN', - }, - }, - }, - }, - ], - }, - }; - - expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); - }); - it('fails if no arn key is given for a dynamic onFailure ARN', () => { awsCompileStreamEvents.serverless.service.functions = { first: { @@ -1170,28 +1043,6 @@ describe('AwsCompileStreamEvents', () => { expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); }); - it('fails if invalid onFailure type is given', () => { - awsCompileStreamEvents.serverless.service.functions = { - first: { - events: [ - { - stream: { - arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1', - destinations: { - onFailure: { - arn: { 'Fn::GetAtt': ['SomeSNS', 'Arn'] }, - type: 'invalidType', - }, - }, - }, - }, - ], - }, - }; - - expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); - }); - it('should add the necessary IAM role statements', () => { awsCompileStreamEvents.serverless.service.functions = { first: {