bug #32979 [Messenger] return empty envelopes when RetryableException occurs (surikman)

This PR was squashed before being merged into the 4.3 branch (closes #32979).

Discussion
----------

[Messenger] return empty envelopes when RetryableException occurs

| Q             | A
| ------------- | ---
| Branch?       |  3.4 or 4.3 for bug fixes <!-- see below -->
| Bug fix?      | yes
| New feature?  | no <!-- please update src/**/CHANGELOG.md files -->
| BC breaks?    | no     <!-- see https://symfony.com/bc -->
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass?   | yes    <!-- please add some, will be required by reviewers -->
| License       | MIT
| ~~Doc PR~~ | ~~symfony/symfony-docs#12109~~

<!--
Replace this notice by a short README for your feature/bugfix. This will help people
understand your PR and can be used as a start for the documentation.

Additionally (see https://symfony.com/roadmap):
 - Bug fixes must be submitted against the lowest maintained branch where they apply
   (lowest branches are regularly merged to upper ones so they get the fixes too).
 - Features and deprecations must be submitted against branch 4.4.
 - Legacy code removals go to the master branch.
-->

Problem occurs when you are using more than 1 worker with Doctrine Transport.

`Symfony\Component\Messenger\Transport\Doctrine\Connection::get` does a query `SELECT ... FOR UPDATE` and this locking query could lock table and workers stops. But using locks can result in dead locks or lock timeouts. Doctrine renders these SQL errors as RetryableExceptions. These exceptions are often normal if you are in a high concurrency environment. They can happen very often and your application should handle them properly.

Commits
-------

9add32a9ca [Messenger] return empty envelopes when RetryableException occurs
This commit is contained in:
Tobias Schultze 2019-09-27 13:21:51 +02:00
commit abf11d8ba8
3 changed files with 38 additions and 1 deletions

View File

@ -11,9 +11,12 @@
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
use Doctrine\DBAL\Driver\PDOException;
use Doctrine\DBAL\Exception\DeadlockException;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
@ -68,6 +71,26 @@ class DoctrineReceiverTest extends TestCase
$receiver->get();
}
public function testOccursRetryableExceptionFromConnection()
{
$serializer = $this->createSerializer();
$connection = $this->createMock(Connection::class);
$driverException = new PDOException(new \PDOException('Deadlock', 40001));
$connection->method('get')->willThrowException(new DeadlockException('Deadlock', $driverException));
$receiver = new DoctrineReceiver($connection, $serializer);
$this->assertSame([], $receiver->get());
$this->assertSame([], $receiver->get());
try {
$receiver->get();
} catch (TransportException $exception) {
// skip, and retry
}
$this->assertSame([], $receiver->get());
$this->assertSame([], $receiver->get());
$this->expectException(TransportException::class);
$receiver->get();
}
public function testAll()
{
$serializer = $this->createSerializer();

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\Doctrine;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Exception\RetryableException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
@ -30,6 +31,8 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
*/
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
{
private const MAX_RETRIES = 3;
private $retryingSafetyCounter = 0;
private $connection;
private $serializer;
@ -46,6 +49,17 @@ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface,
{
try {
$doctrineEnvelope = $this->connection->get();
$this->retryingSafetyCounter = 0; // reset counter
} catch (RetryableException $exception) {
// Do nothing when RetryableException occurs less than "MAX_RETRIES"
// as it will likely be resolved on the next call to get()
// Problem with concurrent consumers and database deadlocks
if (++$this->retryingSafetyCounter >= self::MAX_RETRIES) {
$this->retryingSafetyCounter = 0; // reset counter
throw new TransportException($exception->getMessage(), 0, $exception);
}
return [];
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}

View File

@ -20,7 +20,7 @@
"psr/log": "~1.0"
},
"require-dev": {
"doctrine/dbal": "^2.5",
"doctrine/dbal": "^2.6",
"psr/cache": "~1.0",
"symfony/console": "~3.4|~4.0",
"symfony/debug": "~4.1",