[Messenger] Add support for RecoverableException
This commit is contained in:
parent
39aab260d4
commit
e7c31675f7
@ -9,6 +9,7 @@ CHANGELOG
|
|||||||
* Moved RedisExt transport to package `symfony/redis-messenger`. All classes in `Symfony\Component\Messenger\Transport\RedisExt` have been moved to `Symfony\Component\Messenger\Bridge\Redis\Transport`
|
* Moved RedisExt transport to package `symfony/redis-messenger`. All classes in `Symfony\Component\Messenger\Transport\RedisExt` have been moved to `Symfony\Component\Messenger\Bridge\Redis\Transport`
|
||||||
* Added support for passing a `\Throwable` argument to `RetryStrategyInterface` methods. This allows to define strategies based on the reason of the handling failure.
|
* Added support for passing a `\Throwable` argument to `RetryStrategyInterface` methods. This allows to define strategies based on the reason of the handling failure.
|
||||||
* Added `StopWorkerOnFailureLimitListener` to stop the worker after a specified amount of failed messages is reached.
|
* Added `StopWorkerOnFailureLimitListener` to stop the worker after a specified amount of failed messages is reached.
|
||||||
|
* Added `RecoverableExceptionInterface` interface to force retry.
|
||||||
|
|
||||||
5.0.0
|
5.0.0
|
||||||
-----
|
-----
|
||||||
|
@ -16,6 +16,7 @@ use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
|||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||||
|
use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
|
||||||
use Symfony\Component\Messenger\Exception\RuntimeException;
|
use Symfony\Component\Messenger\Exception\RuntimeException;
|
||||||
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
|
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
|
||||||
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||||
@ -87,10 +88,19 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
|
|||||||
|
|
||||||
private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
|
private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
|
||||||
{
|
{
|
||||||
|
if ($e instanceof RecoverableExceptionInterface) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// if one or more nested Exceptions is an instance of RecoverableExceptionInterface we should retry
|
||||||
// if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
|
// if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
|
||||||
if ($e instanceof HandlerFailedException) {
|
if ($e instanceof HandlerFailedException) {
|
||||||
$shouldNotRetry = true;
|
$shouldNotRetry = true;
|
||||||
foreach ($e->getNestedExceptions() as $nestedException) {
|
foreach ($e->getNestedExceptions() as $nestedException) {
|
||||||
|
if ($nestedException instanceof RecoverableExceptionInterface) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
if (!$nestedException instanceof UnrecoverableExceptionInterface) {
|
if (!$nestedException instanceof UnrecoverableExceptionInterface) {
|
||||||
$shouldNotRetry = false;
|
$shouldNotRetry = false;
|
||||||
break;
|
break;
|
||||||
|
@ -0,0 +1,24 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marker interface for exceptions to indicate that handling a message should have worked.
|
||||||
|
*
|
||||||
|
* If something goes wrong while handling a message that's received from a transport
|
||||||
|
* and the message should must be retried, a handler can throw such an exception.
|
||||||
|
*
|
||||||
|
* @author Jérémy Derussé <jeremy@derusse.com>
|
||||||
|
*/
|
||||||
|
interface RecoverableExceptionInterface extends \Throwable
|
||||||
|
{
|
||||||
|
}
|
@ -0,0 +1,21 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A concrete implementation of RecoverableExceptionInterface that can be used directly.
|
||||||
|
*
|
||||||
|
* @author Frederic Bouchery <frederic@bouchery.fr>
|
||||||
|
*/
|
||||||
|
class RecoverableMessageHandlingException extends RuntimeException implements RecoverableExceptionInterface
|
||||||
|
{
|
||||||
|
}
|
@ -16,6 +16,7 @@ use Psr\Container\ContainerInterface;
|
|||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||||
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
|
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
|
||||||
|
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
|
||||||
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
@ -40,6 +41,42 @@ class SendFailedMessageForRetryListenerTest extends TestCase
|
|||||||
$listener->onMessageFailed($event);
|
$listener->onMessageFailed($event);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testRecoverableStrategyCausesRetry()
|
||||||
|
{
|
||||||
|
$sender = $this->createMock(SenderInterface::class);
|
||||||
|
$sender->expects($this->once())->method('send')->willReturnCallback(function (Envelope $envelope) {
|
||||||
|
/** @var DelayStamp $delayStamp */
|
||||||
|
$delayStamp = $envelope->last(DelayStamp::class);
|
||||||
|
/** @var RedeliveryStamp $redeliveryStamp */
|
||||||
|
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
|
||||||
|
|
||||||
|
$this->assertInstanceOf(DelayStamp::class, $delayStamp);
|
||||||
|
$this->assertSame(1000, $delayStamp->getDelay());
|
||||||
|
|
||||||
|
$this->assertInstanceOf(RedeliveryStamp::class, $redeliveryStamp);
|
||||||
|
$this->assertSame(1, $redeliveryStamp->getRetryCount());
|
||||||
|
|
||||||
|
return $envelope;
|
||||||
|
});
|
||||||
|
$senderLocator = $this->createMock(ContainerInterface::class);
|
||||||
|
$senderLocator->expects($this->once())->method('has')->willReturn(true);
|
||||||
|
$senderLocator->expects($this->once())->method('get')->willReturn($sender);
|
||||||
|
$retryStategy = $this->createMock(RetryStrategyInterface::class);
|
||||||
|
$retryStategy->expects($this->never())->method('isRetryable');
|
||||||
|
$retryStategy->expects($this->once())->method('getWaitingTime')->willReturn(1000);
|
||||||
|
$retryStrategyLocator = $this->createMock(ContainerInterface::class);
|
||||||
|
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
|
||||||
|
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);
|
||||||
|
|
||||||
|
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
|
||||||
|
|
||||||
|
$exception = new RecoverableMessageHandlingException('retry');
|
||||||
|
$envelope = new Envelope(new \stdClass());
|
||||||
|
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
|
||||||
|
|
||||||
|
$listener->onMessageFailed($event);
|
||||||
|
}
|
||||||
|
|
||||||
public function testEnvelopeIsSentToTransportOnRetry()
|
public function testEnvelopeIsSentToTransportOnRetry()
|
||||||
{
|
{
|
||||||
$exception = new \Exception('no!');
|
$exception = new \Exception('no!');
|
||||||
|
Reference in New Issue
Block a user