[Messenger] return empty envelopes when RetryableException occurs
This commit is contained in:
parent
ccb3a4c2d5
commit
9add32a9ca
@ -11,9 +11,12 @@
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
|
||||
|
||||
use Doctrine\DBAL\Driver\PDOException;
|
||||
use Doctrine\DBAL\Exception\DeadlockException;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
|
||||
@ -68,6 +71,26 @@ class DoctrineReceiverTest extends TestCase
|
||||
$receiver->get();
|
||||
}
|
||||
|
||||
public function testOccursRetryableExceptionFromConnection()
|
||||
{
|
||||
$serializer = $this->createSerializer();
|
||||
$connection = $this->createMock(Connection::class);
|
||||
$driverException = new PDOException(new \PDOException('Deadlock', 40001));
|
||||
$connection->method('get')->willThrowException(new DeadlockException('Deadlock', $driverException));
|
||||
$receiver = new DoctrineReceiver($connection, $serializer);
|
||||
$this->assertSame([], $receiver->get());
|
||||
$this->assertSame([], $receiver->get());
|
||||
try {
|
||||
$receiver->get();
|
||||
} catch (TransportException $exception) {
|
||||
// skip, and retry
|
||||
}
|
||||
$this->assertSame([], $receiver->get());
|
||||
$this->assertSame([], $receiver->get());
|
||||
$this->expectException(TransportException::class);
|
||||
$receiver->get();
|
||||
}
|
||||
|
||||
public function testAll()
|
||||
{
|
||||
$serializer = $this->createSerializer();
|
||||
|
@ -12,6 +12,7 @@
|
||||
namespace Symfony\Component\Messenger\Transport\Doctrine;
|
||||
|
||||
use Doctrine\DBAL\DBALException;
|
||||
use Doctrine\DBAL\Exception\RetryableException;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\LogicException;
|
||||
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||
@ -30,6 +31,8 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
*/
|
||||
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
|
||||
{
|
||||
private const MAX_RETRIES = 3;
|
||||
private $retryingSafetyCounter = 0;
|
||||
private $connection;
|
||||
private $serializer;
|
||||
|
||||
@ -46,6 +49,17 @@ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface,
|
||||
{
|
||||
try {
|
||||
$doctrineEnvelope = $this->connection->get();
|
||||
$this->retryingSafetyCounter = 0; // reset counter
|
||||
} catch (RetryableException $exception) {
|
||||
// Do nothing when RetryableException occurs less than "MAX_RETRIES"
|
||||
// as it will likely be resolved on the next call to get()
|
||||
// Problem with concurrent consumers and database deadlocks
|
||||
if (++$this->retryingSafetyCounter >= self::MAX_RETRIES) {
|
||||
$this->retryingSafetyCounter = 0; // reset counter
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
|
||||
return [];
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
|
@ -20,7 +20,7 @@
|
||||
"psr/log": "~1.0"
|
||||
},
|
||||
"require-dev": {
|
||||
"doctrine/dbal": "^2.5",
|
||||
"doctrine/dbal": "^2.6",
|
||||
"psr/cache": "~1.0",
|
||||
"symfony/console": "~3.4|~4.0",
|
||||
"symfony/debug": "~4.1",
|
||||
|
Reference in New Issue
Block a user