Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream schema #8201

Merged
merged 5 commits into from Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
261 changes: 97 additions & 164 deletions lib/plugins/aws/package/compile/events/stream/index.js
Expand Up @@ -11,40 +11,85 @@ class AwsCompileStreamEvents {
'package:compileEvents': this.compileStreamEvents.bind(this),
};

// TODO: Complete schema, see https://github.com/serverless/serverless/issues/8034
this.serverless.configSchemaHandler.defineFunctionEvent('aws', 'stream', {
anyOf: [{ type: 'string' }, { type: 'object' }],
anyOf: [
{ $ref: '#/definitions/awsArnString' },
{
type: 'object',
properties: {
// arn constraints are listed in oneOf property of this schema
arn: {},
type: { enum: ['dynamodb', 'kinesis'] },
batchSize: { type: 'integer', minimum: 1, maximum: 10000 },
parallelizationFactor: { type: 'integer', minimum: 1, maximum: 10 },
startingPosition: { enum: ['LATEST', 'TRIM_HORIZON'] },
enabled: { type: 'boolean' },
consumer: { oneOf: [{ const: true }, { $ref: '#/definitions/awsArn' }] },
batchWindow: { type: 'integer', minimum: 0, maximum: 300 },
maximumRetryAttempts: { type: 'integer', minimum: -1, maximum: 10000 },
bisectBatchOnFunctionError: { type: 'boolean' },
maximumRecordAgeInSeconds: {
oneOf: [
{ type: 'integer', minimum: -1, maximum: -1 },
{ type: 'integer', minimum: 60, maximum: 604800 },
],
},
destinations: {
type: 'object',
properties: {
onFailure: {
anyOf: [
{ $ref: '#/definitions/awsArnString' },
{
type: 'object',
properties: {
// arn constraints are listed in oneOf property of this schema
arn: {},
type: { enum: ['sns', 'sqs'] },
},
additionalProperties: false,
oneOf: [
{
properties: {
arn: { $ref: '#/definitions/awsCfFunction' },
},
required: ['arn', 'type'],
},
{
properties: {
arn: { $ref: '#/definitions/awsArnString' },
},
required: ['arn'],
},
],
},
],
},
},
additionalProperties: false,
required: ['onFailure'],
},
},
additionalProperties: false,
oneOf: [
{
properties: {
arn: { $ref: '#/definitions/awsCfFunction' },
},
required: ['arn', 'type'],
},
{
properties: {
arn: { $ref: '#/definitions/awsArnString' },
},
required: ['arn'],
},
],
},
],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going that route is problematic for error reporting.

As e.g. if one makes error in batchSize, we will get error for both alternatives, and error normalizer, not being able to decide which alternative was chosen to be followed by user, will expose the error as generic functions[].events[].sns: unsupported configuration format which doesn't give a hint of what the real error is.

I think this can be cleanly solved solved by using additionalProperties as a container to define two options for arn an type, and keep others defined as it was in previous version

It also doesn't feel good, that we repeat definitions for most of properties, seems error-prone.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @medikoo , not sur what you mean by using additionalProperties. Could you provide an exemple ?

In the meantime, I found a solution detailed in json-schema/json-schema#158 showing how to implement validation based on property value (the correct way of JSON schema is schema dependency based on property existence with dependencies keyword).

The resulting error message when specifying a CF intrinsic function without type is Configuration error at 'functions.toto.events[0].stream.destinations.onFailure.arn': should be string which seems correct. WDYT ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @medikoo , not sur what you mean by using additionalProperties. Could you provide an exemple ?

Sorry, that was an invalid suggestion, via additionalProperties you can enforce schema for other properties, but obviously not one by one, but for all generally.

which seems correct. WDYT ?

Looks great to me!

});
}

isValidStackImport(variable) {
if (Object.keys(variable).length !== 1) {
return false;
}
if (
variable['Fn::ImportValue'] &&
(variable['Fn::ImportValue']['Fn::GetAtt'] || variable['Fn::ImportValue'].Ref)
) {
return false;
}
const intrinsicFunctions = ['Fn::ImportValue', 'Ref', 'Fn::GetAtt', 'Fn::Sub', 'Fn::Join'];
return intrinsicFunctions.some(cfInstructionName => variable[cfInstructionName]);
}

resolveInvalidDestinationPropertyErrorMessage(functionName, property) {
return [
`Missing or invalid ${property} property for on failure destination`,
` in function "${functionName}"`,
'The correct syntax is: ',
'destinations: ',
' onFailure: ',
' arn: resource-arn',
' type: (sns/sqs)',
'OR an object with arn and type',
'Please check the docs for more info.',
].join('\n');
}

compileStreamEvents() {
this.serverless.service.getAllFunctions().forEach(functionName => {
const functionObj = this.serverless.service.getFunction(functionName);
Expand Down Expand Up @@ -104,47 +149,7 @@ class AwsCompileStreamEvents {
let StartingPosition = 'TRIM_HORIZON';
let Enabled = true;

// TODO validate arn syntax
if (typeof event.stream === 'object') {
if (!event.stream.arn) {
const errorMessage = [
`Missing "arn" property for stream event in function "${functionName}"`,
' The correct syntax is: stream: <StreamArn>',
' OR an object with an "arn" property.',
' Please check the docs for more info.',
].join('');
throw new this.serverless.classes.Error(errorMessage);
}
if (typeof event.stream.arn !== 'string') {
// for dynamic arns (GetAtt/ImportValue)
if (!event.stream.type) {
const errorMessage = [
`Missing "type" property for stream event in function "${functionName}"`,
' If the "arn" property on a stream is a complex type (such as Fn::GetAtt)',
' then a "type" must be provided for the stream, either "kinesis" or,',
' "dynamodb". Please check the docs for more info.',
].join('');
throw new this.serverless.classes.Error(errorMessage);
}
if (
Object.keys(event.stream.arn).length !== 1 ||
!(
event.stream.arn['Fn::ImportValue'] ||
event.stream.arn['Fn::GetAtt'] ||
(event.stream.arn.Ref &&
this.serverless.service.resources.Parameters[event.stream.arn.Ref]) ||
event.stream.arn['Fn::Join']
)
) {
const errorMessage = [
`Bad dynamic ARN property on stream event in function "${functionName}"`,
' If you use a dynamic "arn" (such as with Fn::GetAtt, Fn::Join, Ref',
' or Fn::ImportValue) there must only be one key (either Fn::GetAtt, Fn::Join, Ref',
' or Fn::ImportValue) in the arn object. Please check the docs for more info.',
].join('');
throw new this.serverless.classes.Error(errorMessage);
}
}
EventSourceArn = event.stream.arn;
BatchSize = event.stream.batchSize || BatchSize;
if (event.stream.parallelizationFactor) {
Expand All @@ -154,16 +159,8 @@ class AwsCompileStreamEvents {
if (typeof event.stream.enabled !== 'undefined') {
Enabled = event.stream.enabled;
}
} else if (typeof event.stream === 'string') {
EventSourceArn = event.stream;
} else {
const errorMessage = [
`Stream event of function "${functionName}" is not an object nor a string`,
' The correct syntax is: stream: <StreamArn>',
' OR an object with an "arn" property.',
' Please check the docs for more info.',
].join('');
throw new this.serverless.classes.Error(errorMessage);
EventSourceArn = event.stream;
}

const streamType = event.stream.type || EventSourceArn.split(':')[2];
Expand Down Expand Up @@ -211,21 +208,10 @@ class AwsCompileStreamEvents {
// add event source ARNs to PolicyDocument statements
if (streamType === 'dynamodb') {
dynamodbStreamStatement.Resource.push(EventSourceArn);
} else if (streamType === 'kinesis') {
if (event.stream.consumer) {
kinesisStreamWithConsumerStatement.Resource.push(EventSourceArn);
} else {
kinesisStreamStatement.Resource.push(EventSourceArn);
}
} else if (event.stream.consumer) {
kinesisStreamWithConsumerStatement.Resource.push(EventSourceArn);
} else {
const errorMessage = [
`Stream event of function '${functionName}' had unsupported stream type of`,
` '${streamType}'. Valid stream event source types include 'dynamodb' and`,
" 'kinesis'. Please check the docs for more info.",
].join('');
throw new this.serverless.classes.Properties.Policies[0].PolicyDocument.Error(
errorMessage
);
kinesisStreamStatement.Resource.push(EventSourceArn);
}

if (event.stream.batchWindow != null) {
Expand All @@ -247,81 +233,28 @@ class AwsCompileStreamEvents {
}

if (event.stream.destinations) {
if (event.stream.destinations.onFailure) {
let OnFailureDestinationArn;
let OnFailureDestinationArn;

if (typeof event.stream.destinations.onFailure === 'object') {
if (!event.stream.destinations.onFailure.arn) {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn')
);
}
if (typeof event.stream.destinations.onFailure.arn !== 'string') {
if (!event.stream.destinations.onFailure.type) {
const errorMessage = [
`Missing "type" property for on failure destination in function "${functionName}"`,
' If the "arn" property on a destination is a complex type (such as Fn::GetAtt)',
' then a "type" must be provided for the destination, either "sns" or,',
' "sqs". Please check the docs for more info.',
].join('');
throw new this.serverless.classes.Error(errorMessage);
}
if (!this.isValidStackImport(event.stream.destinations.onFailure.arn)) {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn')
);
}
}
if (
typeof event.stream.destinations.onFailure.arn === 'string' &&
!event.stream.destinations.onFailure.arn.startsWith('arn:')
) {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn')
);
}
OnFailureDestinationArn = event.stream.destinations.onFailure.arn;
} else if (typeof event.stream.destinations.onFailure === 'string') {
if (!event.stream.destinations.onFailure.startsWith('arn:')) {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn')
);
}
OnFailureDestinationArn = event.stream.destinations.onFailure;
} else {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn')
);
}

const destinationType =
event.stream.destinations.onFailure.type || OnFailureDestinationArn.split(':')[2];
// add on failure destination ARNs to PolicyDocument statements
if (destinationType === 'sns') {
onFailureSnsStatement.Resource.push(OnFailureDestinationArn);
} else if (destinationType === 'sqs') {
onFailureSqsStatement.Resource.push(OnFailureDestinationArn);
} else {
const errorMessage = [
`Stream event of function '${functionName}' had unsupported destination type of`,
` '${streamType}'. Valid stream event source types include 'sns' and`,
" 'sqs'. Please check the docs for more info.",
].join('');
throw new this.serverless.classes.Properties.Policies[0].PolicyDocument.Error(
errorMessage
);
}
if (typeof event.stream.destinations.onFailure === 'object') {
OnFailureDestinationArn = event.stream.destinations.onFailure.arn;
} else {
OnFailureDestinationArn = event.stream.destinations.onFailure;
}

streamResource.Properties.DestinationConfig = {
OnFailure: {
Destination: OnFailureDestinationArn,
},
};
const destinationType =
event.stream.destinations.onFailure.type || OnFailureDestinationArn.split(':')[2];
// add on failure destination ARNs to PolicyDocument statements
if (destinationType === 'sns') {
onFailureSnsStatement.Resource.push(OnFailureDestinationArn);
} else {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'onFailure')
);
onFailureSqsStatement.Resource.push(OnFailureDestinationArn);
}

streamResource.Properties.DestinationConfig = {
OnFailure: {
Destination: OnFailureDestinationArn,
},
};
}

const newStreamObject = {
Expand Down