diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php index c284c2536b..9c8b359f09 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php @@ -11,9 +11,12 @@ namespace Symfony\Component\Messenger\Tests\Transport\Doctrine; +use Doctrine\DBAL\Driver\PDOException; +use Doctrine\DBAL\Exception\DeadlockException; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; +use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\Doctrine\Connection; @@ -68,6 +71,26 @@ class DoctrineReceiverTest extends TestCase $receiver->get(); } + public function testOccursRetryableExceptionFromConnection() + { + $serializer = $this->createSerializer(); + $connection = $this->createMock(Connection::class); + $driverException = new PDOException(new \PDOException('Deadlock', 40001)); + $connection->method('get')->willThrowException(new DeadlockException('Deadlock', $driverException)); + $receiver = new DoctrineReceiver($connection, $serializer); + $this->assertSame([], $receiver->get()); + $this->assertSame([], $receiver->get()); + try { + $receiver->get(); + } catch (TransportException $exception) { + // skip, and retry + } + $this->assertSame([], $receiver->get()); + $this->assertSame([], $receiver->get()); + $this->expectException(TransportException::class); + $receiver->get(); + } + public function testAll() { $serializer = $this->createSerializer(); diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php index a6a41e8c79..765a3fcffe 100644 --- a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Transport\Doctrine; use Doctrine\DBAL\DBALException; +use Doctrine\DBAL\Exception\RetryableException; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; @@ -30,6 +31,8 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; */ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface { + private const MAX_RETRIES = 3; + private $retryingSafetyCounter = 0; private $connection; private $serializer; @@ -46,6 +49,17 @@ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, { try { $doctrineEnvelope = $this->connection->get(); + $this->retryingSafetyCounter = 0; // reset counter + } catch (RetryableException $exception) { + // Do nothing when RetryableException occurs less than "MAX_RETRIES" + // as it will likely be resolved on the next call to get() + // Problem with concurrent consumers and database deadlocks + if (++$this->retryingSafetyCounter >= self::MAX_RETRIES) { + $this->retryingSafetyCounter = 0; // reset counter + throw new TransportException($exception->getMessage(), 0, $exception); + } + + return []; } catch (DBALException $exception) { throw new TransportException($exception->getMessage(), 0, $exception); } diff --git a/src/Symfony/Component/Messenger/composer.json b/src/Symfony/Component/Messenger/composer.json index 798940ef5d..728c9fb63b 100644 --- a/src/Symfony/Component/Messenger/composer.json +++ b/src/Symfony/Component/Messenger/composer.json @@ -20,7 +20,7 @@ "psr/log": "~1.0" }, "require-dev": { - "doctrine/dbal": "^2.5", + "doctrine/dbal": "^2.6", "psr/cache": "~1.0", "symfony/console": "~3.4|~4.0", "symfony/debug": "~4.1",