[Messenger] Add support for RecoverableException

This commit is contained in:
Jérémy Derussé 2020-04-24 01:17:44 +02:00 committed by Fabien Potencier
parent 39aab260d4
commit e7c31675f7
5 changed files with 94 additions and 1 deletions

View File

@ -8,7 +8,8 @@ CHANGELOG
* Moved Doctrine transport to package `symfony/doctrine-messenger`. All classes in `Symfony\Component\Messenger\Transport\Doctrine` have been moved to `Symfony\Component\Messenger\Bridge\Doctrine\Transport`
* Moved RedisExt transport to package `symfony/redis-messenger`. All classes in `Symfony\Component\Messenger\Transport\RedisExt` have been moved to `Symfony\Component\Messenger\Bridge\Redis\Transport`
* Added support for passing a `\Throwable` argument to `RetryStrategyInterface` methods. This allows to define strategies based on the reason of the handling failure.
* Added `StopWorkerOnFailureLimitListener` to stop the worker after a specified amount of failed messages is reached.
* Added `StopWorkerOnFailureLimitListener` to stop the worker after a specified amount of failed messages is reached.
* Added `RecoverableExceptionInterface` interface to force retry.
5.0.0
-----

View File

@ -16,6 +16,7 @@ use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
@ -87,10 +88,19 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
{
if ($e instanceof RecoverableExceptionInterface) {
return true;
}
// if one or more nested Exceptions is an instance of RecoverableExceptionInterface we should retry
// if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
if ($e instanceof HandlerFailedException) {
$shouldNotRetry = true;
foreach ($e->getNestedExceptions() as $nestedException) {
if ($nestedException instanceof RecoverableExceptionInterface) {
return true;
}
if (!$nestedException instanceof UnrecoverableExceptionInterface) {
$shouldNotRetry = false;
break;

View File

@ -0,0 +1,24 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Exception;
/**
* Marker interface for exceptions to indicate that handling a message should have worked.
*
* If something goes wrong while handling a message that's received from a transport
* and the message should must be retried, a handler can throw such an exception.
*
* @author Jérémy Derussé <jeremy@derusse.com>
*/
interface RecoverableExceptionInterface extends \Throwable
{
}

View File

@ -0,0 +1,21 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Exception;
/**
* A concrete implementation of RecoverableExceptionInterface that can be used directly.
*
* @author Frederic Bouchery <frederic@bouchery.fr>
*/
class RecoverableMessageHandlingException extends RuntimeException implements RecoverableExceptionInterface
{
}

View File

@ -16,6 +16,7 @@ use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
@ -40,6 +41,42 @@ class SendFailedMessageForRetryListenerTest extends TestCase
$listener->onMessageFailed($event);
}
public function testRecoverableStrategyCausesRetry()
{
$sender = $this->createMock(SenderInterface::class);
$sender->expects($this->once())->method('send')->willReturnCallback(function (Envelope $envelope) {
/** @var DelayStamp $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
/** @var RedeliveryStamp $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertInstanceOf(DelayStamp::class, $delayStamp);
$this->assertSame(1000, $delayStamp->getDelay());
$this->assertInstanceOf(RedeliveryStamp::class, $redeliveryStamp);
$this->assertSame(1, $redeliveryStamp->getRetryCount());
return $envelope;
});
$senderLocator = $this->createMock(ContainerInterface::class);
$senderLocator->expects($this->once())->method('has')->willReturn(true);
$senderLocator->expects($this->once())->method('get')->willReturn($sender);
$retryStategy = $this->createMock(RetryStrategyInterface::class);
$retryStategy->expects($this->never())->method('isRetryable');
$retryStategy->expects($this->once())->method('getWaitingTime')->willReturn(1000);
$retryStrategyLocator = $this->createMock(ContainerInterface::class);
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
$exception = new RecoverableMessageHandlingException('retry');
$envelope = new Envelope(new \stdClass());
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
$listener->onMessageFailed($event);
}
public function testEnvelopeIsSentToTransportOnRetry()
{
$exception = new \Exception('no!');