
Commit df7b9b4

Authored Nov 26, 2021
feat(ecs-service-extensions): Auto scaling for Queue Extension (#17430)
---- This PR adds a target tracking auto scaling policy for the SQS queues provided to and created by the `QueueExtension` (in the `useService()` hook). The auto scaling is based on the `backlogPerTask` custom metric, which is emitted by an AWS Lambda function. The PR also contains this Lambda function and its tests. *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
1 parent efaaaf5 · commit df7b9b4

18 files changed, +1456 -254 lines
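
The extension wires this policy up in CDK TypeScript; purely to illustrate the mechanism described in the commit message above, a roughly equivalent manual setup with the Application Auto Scaling API might look like the following sketch (all names, capacities, and the target value here are hypothetical):

```python
import boto3

autoscaling = boto3.client('application-autoscaling')

# Hypothetical identifiers; the extension derives the real ones from the ECS service it manages.
resource_id = 'service/my-cluster/my-service'
metric_namespace = 'MyService'
queue_name = 'events-queue'

# Register the ECS service's desired count as a scalable target.
autoscaling.register_scalable_target(
    ServiceNamespace='ecs',
    ResourceId=resource_id,
    ScalableDimension='ecs:service:DesiredCount',
    MinCapacity=1,
    MaxCapacity=10,
)

# Track the custom BacklogPerTask metric emitted by the Lambda function.
# The target value is acceptableLatency / messageProcessingTime (see the README change below).
autoscaling.put_scaling_policy(
    PolicyName='BacklogPerTaskTracking',
    ServiceNamespace='ecs',
    ResourceId=resource_id,
    ScalableDimension='ecs:service:DesiredCount',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 2400,
        'CustomizedMetricSpecification': {
            'MetricName': 'BacklogPerTask',
            'Namespace': metric_namespace,
            'Dimensions': [{'Name': 'QueueName', 'Value': queue_name}],
            'Statistic': 'Average',
        },
    },
)
```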
 

packages/@aws-cdk-containers/ecs-service-extensions/README.md

+33 -2

````diff
@@ -392,11 +392,42 @@ For setting up a topic-specific queue subscription, you can provide a custom que
 
 ```ts
 nameDescription.add(new QueueExtension({
-  queue: myEventsQueue,
+  eventsQueue: myEventsQueue,
   subscriptions: [new TopicSubscription({
     topic: new sns.Topic(stack, 'my-topic'),
     // `myTopicQueue` will subscribe to the `my-topic` instead of `eventsQueue`
-    queue: myTopicQueue,
+    topicSubscriptionQueue: {
+      queue: myTopicQueue,
+    },
+  })],
+}));
+```
+
+### Configuring auto scaling based on SQS Queues
+
+You can scale your service up or down to maintain an acceptable queue latency by tracking the backlog per task. The extension configures a target tracking scaling policy whose target value (the acceptable backlog per task) is calculated by dividing the `acceptableLatency` by the `messageProcessingTime`. For example, if the maximum acceptable latency for a message to be processed after its arrival in the SQS queue is 10 minutes and the average processing time for a task is 250 milliseconds per message, then `acceptableBacklogPerTask = 10 * 60 / 0.25 = 2400`. Each queue can therefore hold up to 2400 messages before the service starts to scale up; a target tracking policy with target value `2400` is attached to the scaling target for your service. For more information, see https://docs.aws.amazon.com/autoscaling/ec2/userguide/as-using-sqs-queue.html.
+
+You can configure auto scaling based on an SQS queue for your service as follows:
+
+```ts
+nameDescription.add(new QueueExtension({
+  eventsQueue: myEventsQueue,
+  // Specify `scaleOnLatency` to configure auto scaling based on the SQS queues
+  scaleOnLatency: {
+    acceptableLatency: cdk.Duration.minutes(10),
+    messageProcessingTime: cdk.Duration.millis(250),
+  },
+  subscriptions: [new TopicSubscription({
+    topic: new sns.Topic(stack, 'my-topic'),
+    // `myTopicQueue` will subscribe to the `my-topic` instead of `eventsQueue`
+    topicSubscriptionQueue: {
+      queue: myTopicQueue,
+      // Optionally provide `scaleOnLatency` to configure separate auto scaling for `myTopicQueue`
+      scaleOnLatency: {
+        acceptableLatency: cdk.Duration.minutes(10),
+        messageProcessingTime: cdk.Duration.millis(250),
+      }
+    },
   })],
 }));
 ```
````
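
As a quick check of the target-value arithmetic in the new README section, the following plain Python sketch (not part of the changed files) reproduces the worked example:

```python
# acceptableBacklogPerTask = acceptableLatency / messageProcessingTime
acceptable_latency_seconds = 10 * 60      # 10 minutes
processing_seconds_per_message = 0.250    # 250 milliseconds per message

acceptable_backlog_per_task = acceptable_latency_seconds / processing_seconds_per_message
print(acceptable_backlog_per_task)        # 2400.0 messages per task before the service scales out
```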

packages/@aws-cdk-containers/ecs-service-extensions/lib/extensions/index.ts

+1 -1

```diff
@@ -6,5 +6,5 @@ export * from './cloudwatch-agent';
 export * from './scale-on-cpu-utilization';
 export * from './xray';
 export * from './assign-public-ip';
-export * from './queue';
+export * from './queue/queue';
 export * from './injecter';
```

packages/@aws-cdk-containers/ecs-service-extensions/lib/extensions/queue.ts

-212
This file was deleted.

New file (+1 line):

```ts
export * from './queue';
```

New file (+15 lines):

```python
import os
import boto3
from queue_backlog_calculator import QueueHandler

def queue_handler(event, context):
    """
    Handler for the lambda trigger
    """

    ecs = boto3.client('ecs')
    sqs = boto3.client('sqs')

    queue_handler = QueueHandler(ecs_client=ecs, sqs_client=sqs, environ=os.environ)

    return queue_handler.emit()
```

New file (+71 lines):

```python
from math import ceil
import time
import json

class QueueHandler:
    def __init__(self, ecs_client, sqs_client, environ):
        self.ecs = ecs_client
        self.sqs = sqs_client
        self.cluster_name = environ['CLUSTER_NAME']
        self.service_name = environ['SERVICE_NAME']
        self.namespace = environ['NAMESPACE']
        self.queue_names = environ['QUEUE_NAMES'].split(',')

    def emit(self):
        try:
            running_count = self.get_running_task_count()
            backlogs = [self.get_queue_backlog(queue_name, running_count) for queue_name in self.queue_names]
            self.timestamp = int(time.time() * 1000)
            for backlog in backlogs:
                self.emit_backlog_per_task_metric(backlog['queueName'], backlog['backlogPerTask'])
        except Exception as e:
            raise Exception('Exception: {}'.format(e))

    """
    Write the backlogPerTask metric to the stdout according to the Cloudwatch embedded metric format.
    """
    def emit_backlog_per_task_metric(self, queue_name, backlog_per_task):
        print(json.dumps({
            "_aws": {
                "Timestamp": self.timestamp,
                "CloudWatchMetrics": [{
                    "Namespace": self.namespace,
                    "Dimensions": [["QueueName"]],
                    "Metrics": [{"Name": "BacklogPerTask", "Unit": "Count"}],
                }],
            },
            "QueueName": queue_name,
            "BacklogPerTask": backlog_per_task,
        }))

    """
    Get the number of tasks in the 'RUNNING' state for the service 'service_name'.
    """
    def get_running_task_count(self):
        service_desc = self.ecs.describe_services(
            cluster=self.cluster_name,
            services=[self.service_name],
        )
        if len(service_desc['services']) == 0:
            raise Exception('There are no services with name {} in cluster: {}'.format(self.service_name, self.cluster_name))
        return service_desc['services'][0].get('runningCount', 0)

    """
    This method calculates and returns the backlogPerTask metric for the given queue.
    """
    def get_queue_backlog(self, queue_name, count):
        queue_url = self.sqs.get_queue_url(QueueName=queue_name)
        running_count = 1 if count == 0 else count

        def get_backlog_per_task():
            queue_attributes = self.sqs.get_queue_attributes(
                QueueUrl=queue_url['QueueUrl'],
                AttributeNames=['ApproximateNumberOfMessages'],
            )
            num_of_msgs = int(queue_attributes['Attributes'].get('ApproximateNumberOfMessages', 0))
            return ceil(num_of_msgs / running_count)

        return {
            'queueName': queue_name,
            'backlogPerTask': get_backlog_per_task(),
        }
```
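
The PR ships its own tests for this handler; the following standalone sketch (hypothetical names and counts, hand-rolled stubs instead of real boto3 clients) only illustrates how `QueueHandler` computes and prints the metric:

```python
from queue_backlog_calculator import QueueHandler  # assumes the module above is on the path

class StubEcs:
    def describe_services(self, cluster, services):
        # Pretend the service currently has 3 running tasks.
        return {'services': [{'runningCount': 3}]}

class StubSqs:
    def get_queue_url(self, QueueName):
        return {'QueueUrl': 'https://sqs.example.com/' + QueueName}

    def get_queue_attributes(self, QueueUrl, AttributeNames):
        # Pretend the queue currently holds 120 visible messages.
        return {'Attributes': {'ApproximateNumberOfMessages': '120'}}

env = {
    'CLUSTER_NAME': 'my-cluster',
    'SERVICE_NAME': 'my-service',
    'NAMESPACE': 'MyService',
    'QUEUE_NAMES': 'events-queue',
}

handler = QueueHandler(ecs_client=StubEcs(), sqs_client=StubSqs(), environ=env)
handler.emit()  # prints one EMF JSON line with BacklogPerTask = ceil(120 / 3) = 40
```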
