Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger] enhance the retry command #54186

Open
lyrixx opened this issue Mar 7, 2024 · 0 comments
Open

[Messenger] enhance the retry command #54186

lyrixx opened this issue Mar 7, 2024 · 0 comments
Labels
Messenger RFC RFC = Request For Comments (proposals about features that you want to be discussed)

Comments

@lyrixx
Copy link
Member

lyrixx commented Mar 7, 2024

Description

A bit of history : We had a failure in our application and a lot (~50K) of messages went to the dead letter queue. ATM, when running the retry command, it processes messages synchronously. So it was super slow to recover. However, we have a lot of workers in our infra. But the retry command did not leverage this power.

I propose to add a --dispatch option, to send all messages to their transports instead of processing them synchronously.

More over, I also propose to add an option --class to allow filter messages of certain type.

WDYT?

If this is accepted, I would be glad to contribute to Symfony


BTW, to fix this issue, we create this command as a workarround

command.php

namespace AppBundle\Command\Tool\Messenger;

use Doctrine\DBAL\ArrayParameterType;
use Doctrine\DBAL\Connection;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

#[AsCommand(
    name: 'redirectionio:messenger:retry-failed-messages',
    description: 'Redispatch all log messages.'
)]
class RetryAllMessagesCommand extends Command
{
    public function __construct(
        private readonly Connection $connection,
        private readonly SerializerInterface $serializer,
        private readonly MessageBusInterface $bus,
    ) {
        parent::__construct();
    }

    protected function configure()
    {
        $this
            ->addArgument('class', InputArgument::REQUIRED, 'The class of the message to retry (FQCN)')
        ;
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $class = $input->getArgument('class');
        if (!class_exists($class)) {
            $output->writeln(sprintf('<error>Class "%s" does not exist</error>', $class));

            return Command::FAILURE;
        }

        $nativeConnection = $this->connection->getNativeConnection();

        if ($nativeConnection instanceof \PDO) {
            $nativeConnection->setAttribute(\PDO::MYSQL_ATTR_USE_BUFFERED_QUERY, false);
        }

        $result = $this->connection->createQueryBuilder()
            ->select('m.*')
            ->from('messenger_messages', 'm')
            ->where('m.queue_name = "failed"')
            ->orderBy('m.created_at', 'ASC')
            ->executeQuery()
        ;

        $ids = [];

        while ($message = $result->fetchAssociative()) {
            try {
                $envelope = $this->serializer->decode([
                    'body' => $message['body'],
                    'headers' => $message['headers'],
                ]);
            } catch (\Throwable $e) {
                // some class might not exist anymore, we skip
                $output->writeln(sprintf('<error>Skipping message "%s"</error>, error: "%s"', $message['id'], $e->getMessage()));

                continue;
            }

            if (!is_a($envelope->getMessage(), $input->getArgument('class'))) {
                continue;
            }

            $this->bus->dispatch($envelope->getMessage());
            $ids[] = $message['id'];

            $output->writeln(sprintf('Retrying message %s', $message['id']));
        }

        if ($nativeConnection instanceof \PDO) {
            $nativeConnection->setAttribute(\PDO::MYSQL_ATTR_USE_BUFFERED_QUERY, true);
        }

        if (\count($ids) > 0) {
            $this->connection->createQueryBuilder()
                ->delete('messenger_messages')
                ->where('id IN (:ids)')
                ->setParameter('ids', $ids, ArrayParameterType::INTEGER)
                ->executeStatement()
            ;
        }

        return Command::SUCCESS;
    }
}

@lyrixx lyrixx added the RFC RFC = Request For Comments (proposals about features that you want to be discussed) label Mar 7, 2024
@lyrixx lyrixx changed the title [Messenger] enhance de retry command [Messenger] enhance the retry command Mar 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Messenger RFC RFC = Request For Comments (proposals about features that you want to be discussed)
Projects
None yet
Development

No branches or pull requests

2 participants