Skip to content

Commit

Permalink
minor #54606 [Messenger] Messenger: avoid DELETE statement flood when…
Browse files Browse the repository at this point in the history
… using Doctrine+MySQL backend (giosh94mhz)

This PR was submitted for the 5.4 branch but it was merged into the 7.1 branch instead.

Discussion
----------

[Messenger] Messenger: avoid DELETE statement flood when using Doctrine+MySQL backend

| Q             | A
| ------------- | ---
| Branch       | 7.1
| Bug fix?      | no
| New feature?  | no
| Deprecations? | no
| Issues        |
| License       | MIT

### The issue

I found an issue with the  Doctrine bridge of Messenger when using MySQL. In version 4.4 (commit 12271a4 ) the MySQL removal of acknowledged/rejected was implemented by updating the queued message ID with a fake date (i.e. 9999-12-31) and then removing the messages later.

The problem is that this DELETE statement is done every time a connection is checked for job, which usually happens every seconds (in my case, even multiple time per seconds since I use multiple consumers and queues).

Since I use MySQL binary logging and replica, a quick count of recent logged queries highlighted the issue (it's a quick `cat binlog | sort | uniq -c | sort -n`):

```
     30 UPDATE my_job SET delivered_at = 'NNNN-NN-NN NN:NN:NN' WHERE id = 'NNNNNNN'
   3621 DELETE FROM my_job WHERE delivered_at = 'NNNN-NN-NN NN:NN:NN'
```

As you can see, for only 30 messages handles, there were 3600 (1 hour) DELETE statements.

### The fix

I've added a private variable to track if an update has been made, as so if a DELETE  is required. This effectively stops the flood.

I kept the default to `false` to reduce this value to minimum, and after verifying that all invalid records are not queried thanks to `createAvailableMessagesQueryBuilder`.

### Unit tests

There's no regression on actual tests, but I haven't found a specific test to update for this. I'm not sure how this could be implemented, so if needed I waiting for hints!

Commits
-------

662c8e4 Messenger: avoid DELETE statement flood when using Doctrine+MySQL backend
  • Loading branch information
fabpot committed May 2, 2024
2 parents e6c875e + 662c8e4 commit e2664e8
Showing 1 changed file with 14 additions and 3 deletions.
Expand Up @@ -52,6 +52,7 @@ class Connection implements ResetInterface
protected ?float $queueEmptiedAt = null;

private bool $autoSetup;
private bool $doMysqlCleanup = false;

/**
* Constructor.
Expand All @@ -75,6 +76,7 @@ public function __construct(
public function reset(): void
{
$this->queueEmptiedAt = null;
$this->doMysqlCleanup = false;
}

public function getConfiguration(): array
Expand Down Expand Up @@ -152,9 +154,10 @@ public function send(string $body, array $headers, int $delay = 0): string

public function get(): ?array
{
if ($this->driverConnection->getDatabasePlatform() instanceof AbstractMySQLPlatform) {
if ($this->doMysqlCleanup && $this->driverConnection->getDatabasePlatform() instanceof AbstractMySQLPlatform) {
try {
$this->driverConnection->delete($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59']);
$this->doMysqlCleanup = false;
} catch (DriverException $e) {
// Ignore the exception
} catch (TableNotFoundException $e) {
Expand Down Expand Up @@ -252,7 +255,11 @@ public function ack(string $id): bool
{
try {
if ($this->driverConnection->getDatabasePlatform() instanceof AbstractMySQLPlatform) {
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0;
if ($updated = $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0) {
$this->doMysqlCleanup = true;
}

return $updated;
}

return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
Expand All @@ -265,7 +272,11 @@ public function reject(string $id): bool
{
try {
if ($this->driverConnection->getDatabasePlatform() instanceof AbstractMySQLPlatform) {
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0;
if ($updated = $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0) {
$this->doMysqlCleanup = true;
}

return $updated;
}

return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
Expand Down

0 comments on commit e2664e8

Please sign in to comment.