Skip to content

Commit

Permalink
Fix invalid char in SQS Headers
Browse files Browse the repository at this point in the history
  • Loading branch information
jderusse committed Jun 1, 2020
1 parent e5b5d9e commit b1a5394
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
Expand Up @@ -45,7 +45,7 @@ private function execute(string $dsn): void
$connection->setup();
$this->clearSqs($dsn);

$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class, DummyMessage::class => 'special']);
$this->assertSame(1, $connection->getMessageCount());

$wait = 0;
Expand All @@ -54,7 +54,7 @@ private function execute(string $dsn): void
}

$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
$this->assertEquals(['type' => DummyMessage::class, DummyMessage::class => 'special'], $encoded['headers']);
}

private function clearSqs(string $dsn): void
Expand Down
Expand Up @@ -30,6 +30,7 @@
class Connection
{
private const AWS_SQS_FIFO_SUFFIX = '.fifo';
private const MESSAGE_ATTRIBUTE_NAME = 'X-Symfony-Messenger';

private const DEFAULT_OPTIONS = [
'buffer_size' => 9,
Expand Down Expand Up @@ -200,7 +201,12 @@ private function fetchMessage(): bool

foreach ($this->currentResponse->getMessages() as $message) {
$headers = [];
foreach ($message->getMessageAttributes() as $name => $attribute) {
$attributes = $message->getMessageAttributes();
if (isset($attributes[self::MESSAGE_ATTRIBUTE_NAME]) && 'String' === $attributes[self::MESSAGE_ATTRIBUTE_NAME]->getDataType()) {
$headers = \json_decode($attributes[self::MESSAGE_ATTRIBUTE_NAME]->getStringValue(), true);
unset($attributes[self::MESSAGE_ATTRIBUTE_NAME]);
}
foreach ($attributes as $name => $attribute) {
if ('String' !== $attribute->getDataType()) {
continue;
}
Expand Down Expand Up @@ -284,12 +290,10 @@ public function send(string $body, array $headers, int $delay = 0, ?string $mess
'MessageAttributes' => [],
];

foreach ($headers as $name => $value) {
$parameters['MessageAttributes'][$name] = new MessageAttributeValue([
'DataType' => 'String',
'StringValue' => $value,
]);
}
$parameters['MessageAttributes'][self::MESSAGE_ATTRIBUTE_NAME] = new MessageAttributeValue([
'DataType' => 'String',
'StringValue' => \json_encode($headers),
]);

if (self::isFifoQueue($this->configuration['queue_name'])) {
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
Expand Down

0 comments on commit b1a5394

Please sign in to comment.