Skip to content

Commit

Permalink
Fix invalid char in SQS Headers
Browse files Browse the repository at this point in the history
  • Loading branch information
jderusse authored and fabpot committed Jun 3, 2020
1 parent e5b5d9e commit 76a18b0
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
Expand Up @@ -45,7 +45,7 @@ private function execute(string $dsn): void
$connection->setup();
$this->clearSqs($dsn);

$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class, DummyMessage::class => 'special']);
$this->assertSame(1, $connection->getMessageCount());

$wait = 0;
Expand All @@ -54,7 +54,7 @@ private function execute(string $dsn): void
}

$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
$this->assertEquals(['type' => DummyMessage::class, DummyMessage::class => 'special'], $encoded['headers']);
}

private function clearSqs(string $dsn): void
Expand Down
Expand Up @@ -30,6 +30,7 @@
class Connection
{
private const AWS_SQS_FIFO_SUFFIX = '.fifo';
private const MESSAGE_ATTRIBUTE_NAME = 'X-Symfony-Messenger';

private const DEFAULT_OPTIONS = [
'buffer_size' => 9,
Expand Down Expand Up @@ -200,7 +201,12 @@ private function fetchMessage(): bool

foreach ($this->currentResponse->getMessages() as $message) {
$headers = [];
foreach ($message->getMessageAttributes() as $name => $attribute) {
$attributes = $message->getMessageAttributes();
if (isset($attributes[self::MESSAGE_ATTRIBUTE_NAME]) && 'String' === $attributes[self::MESSAGE_ATTRIBUTE_NAME]->getDataType()) {
$headers = json_decode($attributes[self::MESSAGE_ATTRIBUTE_NAME]->getStringValue(), true);
unset($attributes[self::MESSAGE_ATTRIBUTE_NAME]);
}
foreach ($attributes as $name => $attribute) {
if ('String' !== $attribute->getDataType()) {
continue;
}
Expand Down Expand Up @@ -284,13 +290,29 @@ public function send(string $body, array $headers, int $delay = 0, ?string $mess
'MessageAttributes' => [],
];

$specialHeaders = [];
foreach ($headers as $name => $value) {
if ('.' === $name[0] || self::MESSAGE_ATTRIBUTE_NAME === $name || \strlen($name) > 256 || '.' === substr($name, -1) || 'AWS.' === substr($name, 0, \strlen('AWS.')) || 'Amazon.' === substr($name, 0, \strlen('Amazon.')) || preg_match('/([^a-zA-Z0-9_\.-]+|\.\.)/', $name)) {
$specialHeaders[$name] = $value;

continue;
}

$parameters['MessageAttributes'][$name] = new MessageAttributeValue([
'DataType' => 'String',
'StringValue' => $value,
]);
}

if (!empty($specialHeaders)) {
$parameters['MessageAttributes'][self::MESSAGE_ATTRIBUTE_NAME] = new MessageAttributeValue([
'DataType' => 'String',
'StringValue' => json_encode($specialHeaders),
]);
}

dd($parameters);

if (self::isFifoQueue($this->configuration['queue_name'])) {
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers]));
Expand Down

0 comments on commit 76a18b0

Please sign in to comment.