[Messenger] return empty envelopes when RetryableException occurs
This commit is contained in:
parent
ccb3a4c2d5
commit
9add32a9ca
@ -11,9 +11,12 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
|
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
|
||||||
|
|
||||||
|
use Doctrine\DBAL\Driver\PDOException;
|
||||||
|
use Doctrine\DBAL\Exception\DeadlockException;
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||||
|
use Symfony\Component\Messenger\Exception\TransportException;
|
||||||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
|
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
|
||||||
@ -68,6 +71,26 @@ class DoctrineReceiverTest extends TestCase
|
|||||||
$receiver->get();
|
$receiver->get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testOccursRetryableExceptionFromConnection()
|
||||||
|
{
|
||||||
|
$serializer = $this->createSerializer();
|
||||||
|
$connection = $this->createMock(Connection::class);
|
||||||
|
$driverException = new PDOException(new \PDOException('Deadlock', 40001));
|
||||||
|
$connection->method('get')->willThrowException(new DeadlockException('Deadlock', $driverException));
|
||||||
|
$receiver = new DoctrineReceiver($connection, $serializer);
|
||||||
|
$this->assertSame([], $receiver->get());
|
||||||
|
$this->assertSame([], $receiver->get());
|
||||||
|
try {
|
||||||
|
$receiver->get();
|
||||||
|
} catch (TransportException $exception) {
|
||||||
|
// skip, and retry
|
||||||
|
}
|
||||||
|
$this->assertSame([], $receiver->get());
|
||||||
|
$this->assertSame([], $receiver->get());
|
||||||
|
$this->expectException(TransportException::class);
|
||||||
|
$receiver->get();
|
||||||
|
}
|
||||||
|
|
||||||
public function testAll()
|
public function testAll()
|
||||||
{
|
{
|
||||||
$serializer = $this->createSerializer();
|
$serializer = $this->createSerializer();
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
namespace Symfony\Component\Messenger\Transport\Doctrine;
|
namespace Symfony\Component\Messenger\Transport\Doctrine;
|
||||||
|
|
||||||
use Doctrine\DBAL\DBALException;
|
use Doctrine\DBAL\DBALException;
|
||||||
|
use Doctrine\DBAL\Exception\RetryableException;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Exception\LogicException;
|
use Symfony\Component\Messenger\Exception\LogicException;
|
||||||
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||||
@ -30,6 +31,8 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
|||||||
*/
|
*/
|
||||||
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
|
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
|
||||||
{
|
{
|
||||||
|
private const MAX_RETRIES = 3;
|
||||||
|
private $retryingSafetyCounter = 0;
|
||||||
private $connection;
|
private $connection;
|
||||||
private $serializer;
|
private $serializer;
|
||||||
|
|
||||||
@ -46,6 +49,17 @@ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface,
|
|||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
$doctrineEnvelope = $this->connection->get();
|
$doctrineEnvelope = $this->connection->get();
|
||||||
|
$this->retryingSafetyCounter = 0; // reset counter
|
||||||
|
} catch (RetryableException $exception) {
|
||||||
|
// Do nothing when RetryableException occurs less than "MAX_RETRIES"
|
||||||
|
// as it will likely be resolved on the next call to get()
|
||||||
|
// Problem with concurrent consumers and database deadlocks
|
||||||
|
if (++$this->retryingSafetyCounter >= self::MAX_RETRIES) {
|
||||||
|
$this->retryingSafetyCounter = 0; // reset counter
|
||||||
|
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
return [];
|
||||||
} catch (DBALException $exception) {
|
} catch (DBALException $exception) {
|
||||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,7 @@
|
|||||||
"psr/log": "~1.0"
|
"psr/log": "~1.0"
|
||||||
},
|
},
|
||||||
"require-dev": {
|
"require-dev": {
|
||||||
"doctrine/dbal": "^2.5",
|
"doctrine/dbal": "^2.6",
|
||||||
"psr/cache": "~1.0",
|
"psr/cache": "~1.0",
|
||||||
"symfony/console": "~3.4|~4.0",
|
"symfony/console": "~3.4|~4.0",
|
||||||
"symfony/debug": "~4.1",
|
"symfony/debug": "~4.1",
|
||||||
|
Reference in New Issue
Block a user