A bit of history: we had a failure in our application, and a lot (~50K) of messages went to the dead letter queue. At the moment, the retry command processes messages synchronously, so recovery was very slow. We have a lot of workers in our infrastructure, but the retry command does not take advantage of them.
I propose adding a --dispatch option that sends all messages back to their transports instead of processing them synchronously.
Moreover, I also propose adding a --class option to filter messages by type.
WDYT?
If this is accepted, I would be glad to contribute to Symfony.
BTW, to fix this issue, we created this command as a workaround:
command.php
namespace AppBundle\Command\Tool\Messenger;

use Doctrine\DBAL\ArrayParameterType;
use Doctrine\DBAL\Connection;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

#[AsCommand(
    name: 'redirectionio:messenger:retry-failed-messages',
    description: 'Redispatch all log messages.'
)]
class RetryAllMessagesCommand extends Command
{
    public function __construct(
        private readonly Connection $connection,
        private readonly SerializerInterface $serializer,
        private readonly MessageBusInterface $bus,
    ) {
        parent::__construct();
    }

    protected function configure()
    {
        $this
            ->addArgument('class', InputArgument::REQUIRED, 'The class of the message to retry (FQCN)')
        ;
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $class = $input->getArgument('class');
        if (!class_exists($class)) {
            $output->writeln(sprintf('<error>Class "%s" does not exist</error>', $class));

            return Command::FAILURE;
        }

        $nativeConnection = $this->connection->getNativeConnection();
        if ($nativeConnection instanceof \PDO) {
            $nativeConnection->setAttribute(\PDO::MYSQL_ATTR_USE_BUFFERED_QUERY, false);
        }

        $result = $this->connection->createQueryBuilder()
            ->select('m.*')
            ->from('messenger_messages', 'm')
            ->where('m.queue_name = "failed"')
            ->orderBy('m.created_at', 'ASC')
            ->executeQuery()
        ;

        $ids = [];
        while ($message = $result->fetchAssociative()) {
            try {
                $envelope = $this->serializer->decode([
                    'body' => $message['body'],
                    'headers' => $message['headers'],
                ]);
            } catch (\Throwable $e) {
                // some class might not exist anymore, we skip
                $output->writeln(sprintf('<error>Skipping message "%s"</error>, error: "%s"', $message['id'], $e->getMessage()));

                continue;
            }

            if (!is_a($envelope->getMessage(), $input->getArgument('class'))) {
                continue;
            }

            $this->bus->dispatch($envelope->getMessage());
            $ids[] = $message['id'];
            $output->writeln(sprintf('Retrying message %s', $message['id']));
        }

        if ($nativeConnection instanceof \PDO) {
            $nativeConnection->setAttribute(\PDO::MYSQL_ATTR_USE_BUFFERED_QUERY, true);
        }

        if (\count($ids) > 0) {
            $this->connection->createQueryBuilder()
                ->delete('messenger_messages')
                ->where('id IN (:ids)')
                ->setParameter('ids', $ids, ArrayParameterType::INTEGER)
                ->executeStatement()
            ;
        }

        return Command::SUCCESS;
    }
}
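
To make the class filter easier to reason about, here is a minimal standalone sketch of the selection step the command performs with is_a(). It does not use Symfony or Doctrine. The message classes (ReportMessage, GenerateReport, SendEmail) and the helper selectMessagesToRetry() are hypothetical names invented for this illustration, and the array of already-decoded messages stands in for the rows read from the failed queue. It is only meant to show that the filter also matches subclasses and interface implementations, which is probably what a --class option would need to decide on.

```php
<?php

declare(strict_types=1);

// Hypothetical message types, for illustration only.
interface ReportMessage {}

final class GenerateReport implements ReportMessage
{
    public function __construct(public readonly int $reportId) {}
}

final class SendEmail
{
    public function __construct(public readonly string $to) {}
}

/**
 * Hypothetical helper: picks the failed messages matching $class.
 *
 * @param array<int, object> $failed decoded messages keyed by row id
 * @param string             $class  class or interface name to retry
 *
 * @return array{dispatch: list<object>, ids: list<int>}
 */
function selectMessagesToRetry(array $failed, string $class): array
{
    $dispatch = [];
    $ids = [];

    foreach ($failed as $id => $message) {
        // is_a() is true for the class itself, its subclasses,
        // and classes implementing the given interface.
        if (!is_a($message, $class)) {
            continue;
        }

        $dispatch[] = $message; // would be handed to the message bus
        $ids[] = $id;           // would be deleted from the failed queue
    }

    return ['dispatch' => $dispatch, 'ids' => $ids];
}

// Assumed sample data: three failed messages keyed by their row id.
$failed = [
    10 => new GenerateReport(1),
    11 => new SendEmail('user@example.com'),
    12 => new GenerateReport(2),
];

$selection = selectMessagesToRetry($failed, ReportMessage::class);

echo implode(', ', $selection['ids']), PHP_EOL; // prints "10, 12"
```

Filtering on an interface retries every implementation at once, while passing a concrete class such as SendEmail::class narrows the retry to that single type.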