Skip to content

Commit

Permalink
bug #37038 Fix invalid char in SQS Headers (jderusse)
Browse files Browse the repository at this point in the history
This PR was squashed before being merged into the 5.1 branch.

Discussion
----------

Fix invalid char in SQS Headers

| Q             | A
| ------------- | ---
| Branch?       | 5.1
| Bug fix?      | yes
| New feature?  | no
| Deprecations? | yes
| Tickets       | #36525 (comment)
| License       | MIT
| Doc PR        | /

From [Amazon documnetation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-attributes.html) header's name have constraints:
- only `a-zA-Z0-9_\.-` + not start/end with a `.`
- 256 char

This PR serialize ALL headers in a single SQS Attribute.

Commits
-------

76a18b0 Fix invalid char in SQS Headers
  • Loading branch information
fabpot committed Jun 3, 2020
2 parents d341254 + 76a18b0 commit 773b4ef
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
Expand Up @@ -45,7 +45,7 @@ private function execute(string $dsn): void
$connection->setup();
$this->clearSqs($dsn);

$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class, DummyMessage::class => 'special']);
$this->assertSame(1, $connection->getMessageCount());

$wait = 0;
Expand All @@ -54,7 +54,7 @@ private function execute(string $dsn): void
}

$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
$this->assertEquals(['type' => DummyMessage::class, DummyMessage::class => 'special'], $encoded['headers']);
}

private function clearSqs(string $dsn): void
Expand Down
Expand Up @@ -30,6 +30,7 @@
class Connection
{
private const AWS_SQS_FIFO_SUFFIX = '.fifo';
private const MESSAGE_ATTRIBUTE_NAME = 'X-Symfony-Messenger';

private const DEFAULT_OPTIONS = [
'buffer_size' => 9,
Expand Down Expand Up @@ -200,7 +201,12 @@ private function fetchMessage(): bool

foreach ($this->currentResponse->getMessages() as $message) {
$headers = [];
foreach ($message->getMessageAttributes() as $name => $attribute) {
$attributes = $message->getMessageAttributes();
if (isset($attributes[self::MESSAGE_ATTRIBUTE_NAME]) && 'String' === $attributes[self::MESSAGE_ATTRIBUTE_NAME]->getDataType()) {
$headers = json_decode($attributes[self::MESSAGE_ATTRIBUTE_NAME]->getStringValue(), true);
unset($attributes[self::MESSAGE_ATTRIBUTE_NAME]);
}
foreach ($attributes as $name => $attribute) {
if ('String' !== $attribute->getDataType()) {
continue;
}
Expand Down Expand Up @@ -284,13 +290,29 @@ public function send(string $body, array $headers, int $delay = 0, ?string $mess
'MessageAttributes' => [],
];

$specialHeaders = [];
foreach ($headers as $name => $value) {
if ('.' === $name[0] || self::MESSAGE_ATTRIBUTE_NAME === $name || \strlen($name) > 256 || '.' === substr($name, -1) || 'AWS.' === substr($name, 0, \strlen('AWS.')) || 'Amazon.' === substr($name, 0, \strlen('Amazon.')) || preg_match('/([^a-zA-Z0-9_\.-]+|\.\.)/', $name)) {
$specialHeaders[$name] = $value;

continue;
}

$parameters['MessageAttributes'][$name] = new MessageAttributeValue([
'DataType' => 'String',
'StringValue' => $value,
]);
}

if (!empty($specialHeaders)) {
$parameters['MessageAttributes'][self::MESSAGE_ATTRIBUTE_NAME] = new MessageAttributeValue([
'DataType' => 'String',
'StringValue' => json_encode($specialHeaders),
]);
}

dd($parameters);

if (self::isFifoQueue($this->configuration['queue_name'])) {
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers]));
Expand Down

0 comments on commit 773b4ef

Please sign in to comment.