feature #36557 [Messenger] Add support for RecoverableException (jderusse)
This PR was squashed before being merged into the 5.1-dev branch.
Discussion
----------
[Messenger] Add support for RecoverableException
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| Deprecations? | no
| Tickets | N/A
| License | MIT
| Doc PR | N/A
The messenger supports the `UnrecoverableException` preventing the messenger retry mechanism
when the Handler will never be able to process the Message.
This PR adds the opposite behavior to always retry the message.
UseCase:
- High concurency Consumers use non-blocking lock
- 503/429 errors from 3rd party API
Commits
-------
e7c31675f7
[Messenger] Add support for RecoverableException
This commit is contained in:
commit
0d4bba82b8
|
@ -8,7 +8,8 @@ CHANGELOG
|
|||
* Moved Doctrine transport to package `symfony/doctrine-messenger`. All classes in `Symfony\Component\Messenger\Transport\Doctrine` have been moved to `Symfony\Component\Messenger\Bridge\Doctrine\Transport`
|
||||
* Moved RedisExt transport to package `symfony/redis-messenger`. All classes in `Symfony\Component\Messenger\Transport\RedisExt` have been moved to `Symfony\Component\Messenger\Bridge\Redis\Transport`
|
||||
* Added support for passing a `\Throwable` argument to `RetryStrategyInterface` methods. This allows to define strategies based on the reason of the handling failure.
|
||||
* Added `StopWorkerOnFailureLimitListener` to stop the worker after a specified amount of failed messages is reached.
|
||||
* Added `StopWorkerOnFailureLimitListener` to stop the worker after a specified amount of failed messages is reached.
|
||||
* Added `RecoverableExceptionInterface` interface to force retry.
|
||||
|
||||
5.0.0
|
||||
-----
|
||||
|
|
|
@ -16,6 +16,7 @@ use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
|||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||
use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
|
||||
use Symfony\Component\Messenger\Exception\RuntimeException;
|
||||
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
|
||||
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||
|
@ -87,10 +88,19 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
|
|||
|
||||
private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
|
||||
{
|
||||
if ($e instanceof RecoverableExceptionInterface) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// if one or more nested Exceptions is an instance of RecoverableExceptionInterface we should retry
|
||||
// if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
|
||||
if ($e instanceof HandlerFailedException) {
|
||||
$shouldNotRetry = true;
|
||||
foreach ($e->getNestedExceptions() as $nestedException) {
|
||||
if ($nestedException instanceof RecoverableExceptionInterface) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!$nestedException instanceof UnrecoverableExceptionInterface) {
|
||||
$shouldNotRetry = false;
|
||||
break;
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Exception;
|
||||
|
||||
/**
|
||||
* Marker interface for exceptions to indicate that handling a message should have worked.
|
||||
*
|
||||
* If something goes wrong while handling a message that's received from a transport
|
||||
* and the message should must be retried, a handler can throw such an exception.
|
||||
*
|
||||
* @author Jérémy Derussé <jeremy@derusse.com>
|
||||
*/
|
||||
interface RecoverableExceptionInterface extends \Throwable
|
||||
{
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Exception;
|
||||
|
||||
/**
|
||||
* A concrete implementation of RecoverableExceptionInterface that can be used directly.
|
||||
*
|
||||
* @author Frederic Bouchery <frederic@bouchery.fr>
|
||||
*/
|
||||
class RecoverableMessageHandlingException extends RuntimeException implements RecoverableExceptionInterface
|
||||
{
|
||||
}
|
|
@ -16,6 +16,7 @@ use Psr\Container\ContainerInterface;
|
|||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
|
||||
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
|
||||
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||
|
@ -40,6 +41,42 @@ class SendFailedMessageForRetryListenerTest extends TestCase
|
|||
$listener->onMessageFailed($event);
|
||||
}
|
||||
|
||||
public function testRecoverableStrategyCausesRetry()
|
||||
{
|
||||
$sender = $this->createMock(SenderInterface::class);
|
||||
$sender->expects($this->once())->method('send')->willReturnCallback(function (Envelope $envelope) {
|
||||
/** @var DelayStamp $delayStamp */
|
||||
$delayStamp = $envelope->last(DelayStamp::class);
|
||||
/** @var RedeliveryStamp $redeliveryStamp */
|
||||
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
|
||||
|
||||
$this->assertInstanceOf(DelayStamp::class, $delayStamp);
|
||||
$this->assertSame(1000, $delayStamp->getDelay());
|
||||
|
||||
$this->assertInstanceOf(RedeliveryStamp::class, $redeliveryStamp);
|
||||
$this->assertSame(1, $redeliveryStamp->getRetryCount());
|
||||
|
||||
return $envelope;
|
||||
});
|
||||
$senderLocator = $this->createMock(ContainerInterface::class);
|
||||
$senderLocator->expects($this->once())->method('has')->willReturn(true);
|
||||
$senderLocator->expects($this->once())->method('get')->willReturn($sender);
|
||||
$retryStategy = $this->createMock(RetryStrategyInterface::class);
|
||||
$retryStategy->expects($this->never())->method('isRetryable');
|
||||
$retryStategy->expects($this->once())->method('getWaitingTime')->willReturn(1000);
|
||||
$retryStrategyLocator = $this->createMock(ContainerInterface::class);
|
||||
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
|
||||
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);
|
||||
|
||||
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
|
||||
|
||||
$exception = new RecoverableMessageHandlingException('retry');
|
||||
$envelope = new Envelope(new \stdClass());
|
||||
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
|
||||
|
||||
$listener->onMessageFailed($event);
|
||||
}
|
||||
|
||||
public function testEnvelopeIsSentToTransportOnRetry()
|
||||
{
|
||||
$exception = new \Exception('no!');
|
||||
|
|
Reference in New Issue