From 76a18b0b479af08f19894bf58fad213ebd81b98f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20Deruss=C3=A9?= Date: Mon, 1 Jun 2020 10:33:14 +0200 Subject: [PATCH] Fix invalid char in SQS Headers --- .../Transport/AmazonSqsIntegrationTest.php | 4 ++-- .../Bridge/AmazonSqs/Transport/Connection.php | 24 ++++++++++++++++++- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php index 3c47be561475..c840822abea4 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php @@ -45,7 +45,7 @@ private function execute(string $dsn): void $connection->setup(); $this->clearSqs($dsn); - $connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); + $connection->send('{"message": "Hi"}', ['type' => DummyMessage::class, DummyMessage::class => 'special']); $this->assertSame(1, $connection->getMessageCount()); $wait = 0; @@ -54,7 +54,7 @@ private function execute(string $dsn): void } $this->assertEquals('{"message": "Hi"}', $encoded['body']); - $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); + $this->assertEquals(['type' => DummyMessage::class, DummyMessage::class => 'special'], $encoded['headers']); } private function clearSqs(string $dsn): void diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php index 5d5bfb36c65c..a492cde374a8 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php @@ -30,6 +30,7 @@ class Connection { private const AWS_SQS_FIFO_SUFFIX = '.fifo'; + private const MESSAGE_ATTRIBUTE_NAME = 'X-Symfony-Messenger'; private const DEFAULT_OPTIONS = [ 'buffer_size' => 9, @@ -200,7 +201,12 @@ private function fetchMessage(): bool foreach ($this->currentResponse->getMessages() as $message) { $headers = []; - foreach ($message->getMessageAttributes() as $name => $attribute) { + $attributes = $message->getMessageAttributes(); + if (isset($attributes[self::MESSAGE_ATTRIBUTE_NAME]) && 'String' === $attributes[self::MESSAGE_ATTRIBUTE_NAME]->getDataType()) { + $headers = json_decode($attributes[self::MESSAGE_ATTRIBUTE_NAME]->getStringValue(), true); + unset($attributes[self::MESSAGE_ATTRIBUTE_NAME]); + } + foreach ($attributes as $name => $attribute) { if ('String' !== $attribute->getDataType()) { continue; } @@ -284,13 +290,29 @@ public function send(string $body, array $headers, int $delay = 0, ?string $mess 'MessageAttributes' => [], ]; + $specialHeaders = []; foreach ($headers as $name => $value) { + if ('.' === $name[0] || self::MESSAGE_ATTRIBUTE_NAME === $name || \strlen($name) > 256 || '.' === substr($name, -1) || 'AWS.' === substr($name, 0, \strlen('AWS.')) || 'Amazon.' === substr($name, 0, \strlen('Amazon.')) || preg_match('/([^a-zA-Z0-9_\.-]+|\.\.)/', $name)) { + $specialHeaders[$name] = $value; + + continue; + } + $parameters['MessageAttributes'][$name] = new MessageAttributeValue([ 'DataType' => 'String', 'StringValue' => $value, ]); } + if (!empty($specialHeaders)) { + $parameters['MessageAttributes'][self::MESSAGE_ATTRIBUTE_NAME] = new MessageAttributeValue([ + 'DataType' => 'String', + 'StringValue' => json_encode($specialHeaders), + ]); + } + + dd($parameters); + if (self::isFifoQueue($this->configuration['queue_name'])) { $parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__; $parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers]));