Skip to content

Commit

Permalink
Merge pull request #299 from htxiong/feature/maximumConcurrency
Browse files Browse the repository at this point in the history
Add Max Concurrency configuration for AWS sqs.
  • Loading branch information
fredericbarthelet committed Mar 6, 2023
2 parents de59ab3 + e54dc84 commit f408415
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 2 deletions.
15 changes: 15 additions & 0 deletions docs/queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,21 @@ Note you can use [partial batch failures](#partial-batch-failures) to avoid fail

It is possible to set the batch size between 1 and 10.

### Max Concurrency

```yaml
constructs:
my-queue:
# ...
maxConcurrency: 10 # The maximum number of concurrent function instances that the SQS event source can invoke is 10
```

The launch of maximum concurrency for SQS as an event source allows you to control Lambda function concurrency per source. You set the maximum concurrency on the event source mapping, not on the Lambda function.

This event source mapping setting does not change the scaling or batching behavior of Lambda with SQS. You can continue to batch messages with a customized batch size and window. It rather sets a limit on the maximum number of concurrent function invocations per SQS event source. Once Lambda scales and reaches the maximum concurrency configured on the event source, Lambda stops reading more messages from the queue. This feature also provides you with the flexibility to define the maximum concurrency for individual event sources when the Lambda function has multiple event sources.

It is possible to set the `maxConcurrency` between 2 and 10000.

### Maximum Batching Window

```yaml
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
},
"devDependencies": {
"@serverless/test": "^11.0.1",
"@serverless/typescript": "^3.21.0",
"@serverless/typescript": "^3.27.0",
"@types/chai": "^4.2.21",
"@types/inquirer": "^7.3.3",
"@types/jest": "^27.0.1",
Expand All @@ -49,7 +49,7 @@
"lint-staged": "^11.0.0",
"nodemon": "^2.0.10",
"prettier": "^2.3.2",
"serverless": "^3.21.0",
"serverless": "^3.28.0",
"sinon": "^11.1.1",
"stdout-stderr": "^0.1.13",
"ts-jest": "^27.0.3",
Expand Down
7 changes: 7 additions & 0 deletions src/constructs/aws/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ const QUEUE_DEFINITION = {
minimum: 0,
maximum: 300,
},
maxConcurrency: {
type: "number",
minimum: 2,
maximum: 1000,
},
fifo: { type: "boolean" },
delay: { type: "number" },
encryption: { type: "string" },
Expand Down Expand Up @@ -301,6 +306,7 @@ export class Queue extends AwsConstruct {
// The default batch size is 1
const batchSize = this.configuration.batchSize ?? 1;
const maximumBatchingWindow = this.getMaximumBatchingWindow();
const maximumConcurrency = this.configuration.maxConcurrency;

// Override events for the worker
this.configuration.worker.events = [
Expand All @@ -310,6 +316,7 @@ export class Queue extends AwsConstruct {
arn: this.queue.queueArn,
batchSize: batchSize,
maximumBatchingWindow: maximumBatchingWindow,
maximumConcurrency: maximumConcurrency,
functionResponseType: "ReportBatchItemFailures",
},
},
Expand Down
26 changes: 26 additions & 0 deletions test/unit/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,32 @@ describe("queues", () => {
});
});

it("allows changing the max concurrency", async () => {
const { cfTemplate, serverless } = await runServerless({
fixture: "queues",
configExt: merge({}, pluginConfigExt, {
constructs: {
emails: {
maxConcurrency: 10,
},
},
}),
command: "package",
});
const serverlessVersion = serverless.version as string;
if (serverlessVersion.startsWith("3")) {
expect(cfTemplate.Resources.EmailsWorkerEventSourceMappingSQSEmailsQueueF057328A).toMatchObject({
Properties: {
ScalingConfig: {
MaximumConcurrency: 10,
},
},
});
} else {
expect(true).toEqual(true);
}
});

it("allows changing the delivery delay", async () => {
const { cfTemplate, computeLogicalId } = await runServerless({
fixture: "queues",
Expand Down

0 comments on commit f408415

Please sign in to comment.