[Messenger] extract worker logic to listener and sent messages for retry
and failure directly to transport instead of redispatching on the bus
This commit is contained in:
parent
cf10c02765
commit
d7e0f98cd0
@ -154,6 +154,11 @@ Messenger
|
|||||||
|
|
||||||
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
|
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
|
||||||
pass a `RoutableMessageBus` instance instead.
|
pass a `RoutableMessageBus` instance instead.
|
||||||
|
* [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
|
||||||
|
* [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
|
||||||
|
* [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
|
||||||
|
* [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
|
||||||
|
* [BC BREAK] Removed `UnknownSenderException`.
|
||||||
|
|
||||||
Mime
|
Mime
|
||||||
----
|
----
|
||||||
|
@ -1810,20 +1810,29 @@ class FrameworkExtension extends Extension
|
|||||||
$messageToSendersMapping[$message] = $messageConfiguration['senders'];
|
$messageToSendersMapping[$message] = $messageConfiguration['senders'];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$sendersServiceLocator = ServiceLocatorTagPass::register($container, $senderReferences);
|
||||||
|
|
||||||
$container->getDefinition('messenger.senders_locator')
|
$container->getDefinition('messenger.senders_locator')
|
||||||
->replaceArgument(0, $messageToSendersMapping)
|
->replaceArgument(0, $messageToSendersMapping)
|
||||||
->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences))
|
->replaceArgument(1, $sendersServiceLocator)
|
||||||
|
;
|
||||||
|
|
||||||
|
$container->getDefinition('messenger.retry.send_failed_message_for_retry_listener')
|
||||||
|
->replaceArgument(0, $sendersServiceLocator)
|
||||||
;
|
;
|
||||||
|
|
||||||
$container->getDefinition('messenger.retry_strategy_locator')
|
$container->getDefinition('messenger.retry_strategy_locator')
|
||||||
->replaceArgument(0, $transportRetryReferences);
|
->replaceArgument(0, $transportRetryReferences);
|
||||||
|
|
||||||
if ($config['failure_transport']) {
|
if ($config['failure_transport']) {
|
||||||
|
if (!isset($senderReferences[$config['failure_transport']])) {
|
||||||
|
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
|
||||||
|
}
|
||||||
|
|
||||||
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
|
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
|
||||||
->replaceArgument(1, $config['failure_transport']);
|
->replaceArgument(0, $senderReferences[$config['failure_transport']]);
|
||||||
$container->getDefinition('console.command.messenger_failed_messages_retry')
|
$container->getDefinition('console.command.messenger_failed_messages_retry')
|
||||||
->replaceArgument(0, $config['failure_transport'])
|
->replaceArgument(0, $config['failure_transport']);
|
||||||
->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null);
|
|
||||||
$container->getDefinition('console.command.messenger_failed_messages_show')
|
$container->getDefinition('console.command.messenger_failed_messages_show')
|
||||||
->replaceArgument(0, $config['failure_transport']);
|
->replaceArgument(0, $config['failure_transport']);
|
||||||
$container->getDefinition('console.command.messenger_failed_messages_remove')
|
$container->getDefinition('console.command.messenger_failed_messages_remove')
|
||||||
|
@ -86,7 +86,6 @@
|
|||||||
<argument type="service" id="messenger.receiver_locator" />
|
<argument type="service" id="messenger.receiver_locator" />
|
||||||
<argument type="service" id="logger" on-invalid="null" />
|
<argument type="service" id="logger" on-invalid="null" />
|
||||||
<argument type="collection" /> <!-- Receiver names -->
|
<argument type="collection" /> <!-- Receiver names -->
|
||||||
<argument type="service" id="messenger.retry_strategy_locator" />
|
|
||||||
<argument type="service" id="event_dispatcher" />
|
<argument type="service" id="event_dispatcher" />
|
||||||
<call method="setCachePoolForRestartSignal">
|
<call method="setCachePoolForRestartSignal">
|
||||||
<argument type="service" id="cache.messenger.restart_workers_signal" />
|
<argument type="service" id="cache.messenger.restart_workers_signal" />
|
||||||
@ -116,10 +115,9 @@
|
|||||||
|
|
||||||
<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
|
<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
|
||||||
<argument /> <!-- Receiver name -->
|
<argument /> <!-- Receiver name -->
|
||||||
<argument /> <!-- Receiver locator -->
|
<argument /> <!-- Receiver -->
|
||||||
<argument type="service" id="messenger.routable_message_bus" />
|
<argument type="service" id="messenger.routable_message_bus" />
|
||||||
<argument type="service" id="event_dispatcher" />
|
<argument type="service" id="event_dispatcher" />
|
||||||
<argument /> <!-- Retry strategy -->
|
|
||||||
<argument type="service" id="logger" />
|
<argument type="service" id="logger" />
|
||||||
|
|
||||||
<tag name="console.command" command="messenger:failed:retry" />
|
<tag name="console.command" command="messenger:failed:retry" />
|
||||||
@ -127,14 +125,14 @@
|
|||||||
|
|
||||||
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
|
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
|
||||||
<argument /> <!-- Receiver name -->
|
<argument /> <!-- Receiver name -->
|
||||||
<argument /> <!-- Receiver locator -->
|
<argument /> <!-- Receiver -->
|
||||||
|
|
||||||
<tag name="console.command" command="messenger:failed:show" />
|
<tag name="console.command" command="messenger:failed:show" />
|
||||||
</service>
|
</service>
|
||||||
|
|
||||||
<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand">
|
<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand">
|
||||||
<argument /> <!-- Receiver name -->
|
<argument /> <!-- Receiver name -->
|
||||||
<argument /> <!-- Receiver locator -->
|
<argument /> <!-- Receiver -->
|
||||||
|
|
||||||
<tag name="console.command" command="messenger:failed:remove" />
|
<tag name="console.command" command="messenger:failed:remove" />
|
||||||
</service>
|
</service>
|
||||||
|
@ -10,7 +10,7 @@
|
|||||||
<!-- Asynchronous -->
|
<!-- Asynchronous -->
|
||||||
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator">
|
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator">
|
||||||
<argument type="collection" /> <!-- Per message senders map -->
|
<argument type="collection" /> <!-- Per message senders map -->
|
||||||
<argument /> <!-- senders locator -->
|
<argument /> <!-- senders service locator -->
|
||||||
</service>
|
</service>
|
||||||
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
|
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
|
||||||
<tag name="monolog.logger" channel="messenger" />
|
<tag name="monolog.logger" channel="messenger" />
|
||||||
@ -98,12 +98,19 @@
|
|||||||
<argument /> <!-- max delay ms -->
|
<argument /> <!-- max delay ms -->
|
||||||
</service>
|
</service>
|
||||||
|
|
||||||
|
<service id="messenger.retry.send_failed_message_for_retry_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener">
|
||||||
|
<tag name="kernel.event_subscriber" />
|
||||||
|
<tag name="monolog.logger" channel="messenger" />
|
||||||
|
<argument /> <!-- senders service locator -->
|
||||||
|
<argument type="service" id="messenger.retry_strategy_locator" />
|
||||||
|
<argument type="service" id="logger" on-invalid="ignore" />
|
||||||
|
</service>
|
||||||
|
|
||||||
<!-- failed handling -->
|
<!-- failed handling -->
|
||||||
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
|
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
|
||||||
<tag name="kernel.event_subscriber" />
|
<tag name="kernel.event_subscriber" />
|
||||||
<tag name="monolog.logger" channel="messenger" />
|
<tag name="monolog.logger" channel="messenger" />
|
||||||
<argument type="service" id="messenger.routable_message_bus" />
|
<argument /> <!-- Failure transport -->
|
||||||
<argument /> <!-- Failure transport name -->
|
|
||||||
<argument type="service" id="logger" on-invalid="ignore" />
|
<argument type="service" id="logger" on-invalid="ignore" />
|
||||||
</service>
|
</service>
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@
|
|||||||
"symfony/http-client": "^4.4|^5.0",
|
"symfony/http-client": "^4.4|^5.0",
|
||||||
"symfony/lock": "^4.4|^5.0",
|
"symfony/lock": "^4.4|^5.0",
|
||||||
"symfony/mailer": "^4.4|^5.0",
|
"symfony/mailer": "^4.4|^5.0",
|
||||||
"symfony/messenger": "^4.3.6|^5.0",
|
"symfony/messenger": "^4.4|^5.0",
|
||||||
"symfony/mime": "^4.4|^5.0",
|
"symfony/mime": "^4.4|^5.0",
|
||||||
"symfony/process": "^3.4|^4.0|^5.0",
|
"symfony/process": "^3.4|^4.0|^5.0",
|
||||||
"symfony/security-csrf": "^3.4|^4.0|^5.0",
|
"symfony/security-csrf": "^3.4|^4.0|^5.0",
|
||||||
@ -77,7 +77,7 @@
|
|||||||
"symfony/form": "<4.3",
|
"symfony/form": "<4.3",
|
||||||
"symfony/lock": "<4.4",
|
"symfony/lock": "<4.4",
|
||||||
"symfony/mailer": "<4.4",
|
"symfony/mailer": "<4.4",
|
||||||
"symfony/messenger": "<4.3.6",
|
"symfony/messenger": "<4.4",
|
||||||
"symfony/mime": "<4.4",
|
"symfony/mime": "<4.4",
|
||||||
"symfony/property-info": "<3.4",
|
"symfony/property-info": "<3.4",
|
||||||
"symfony/security-bundle": "<4.4",
|
"symfony/security-bundle": "<4.4",
|
||||||
|
@ -11,6 +11,12 @@ CHANGELOG
|
|||||||
* Made all dispatched worker event classes final.
|
* Made all dispatched worker event classes final.
|
||||||
* Added support for `from_transport` attribute on `messenger.message_handler` tag.
|
* Added support for `from_transport` attribute on `messenger.message_handler` tag.
|
||||||
* Added support for passing `dbindex` as a query parameter to the redis transport DSN.
|
* Added support for passing `dbindex` as a query parameter to the redis transport DSN.
|
||||||
|
* [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
|
||||||
|
* [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
|
||||||
|
* [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
|
||||||
|
* [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
|
||||||
|
* [BC BREAK] Removed `UnknownSenderException`.
|
||||||
|
* The component is not marked as `@experimental` anymore.
|
||||||
|
|
||||||
4.3.0
|
4.3.0
|
||||||
-----
|
-----
|
||||||
|
@ -89,7 +89,7 @@ abstract class AbstractFailedMessagesCommand extends Command
|
|||||||
$redeliveryStamps = $envelope->all(RedeliveryStamp::class);
|
$redeliveryStamps = $envelope->all(RedeliveryStamp::class);
|
||||||
$io->writeln(' Message history:');
|
$io->writeln(' Message history:');
|
||||||
foreach ($redeliveryStamps as $redeliveryStamp) {
|
foreach ($redeliveryStamps as $redeliveryStamp) {
|
||||||
$io->writeln(sprintf(' * Message failed and redelivered to the <info>%s</info> transport at <info>%s</info>', $redeliveryStamp->getSenderClassOrAlias(), $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')));
|
$io->writeln(sprintf(' * Message failed at <info>%s</info> and was redelivered', $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')));
|
||||||
}
|
}
|
||||||
$io->newLine();
|
$io->newLine();
|
||||||
|
|
||||||
|
@ -42,7 +42,6 @@ class ConsumeMessagesCommand extends Command
|
|||||||
private $receiverLocator;
|
private $receiverLocator;
|
||||||
private $logger;
|
private $logger;
|
||||||
private $receiverNames;
|
private $receiverNames;
|
||||||
private $retryStrategyLocator;
|
|
||||||
private $eventDispatcher;
|
private $eventDispatcher;
|
||||||
/** @var CacheItemPoolInterface|null */
|
/** @var CacheItemPoolInterface|null */
|
||||||
private $restartSignalCachePool;
|
private $restartSignalCachePool;
|
||||||
@ -50,7 +49,7 @@ class ConsumeMessagesCommand extends Command
|
|||||||
/**
|
/**
|
||||||
* @param RoutableMessageBus $routableBus
|
* @param RoutableMessageBus $routableBus
|
||||||
*/
|
*/
|
||||||
public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
|
public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* EventDispatcherInterface */ $eventDispatcher = null)
|
||||||
{
|
{
|
||||||
if ($routableBus instanceof ContainerInterface) {
|
if ($routableBus instanceof ContainerInterface) {
|
||||||
@trigger_error(sprintf('Passing a "%s" instance as first argument to "%s()" is deprecated since Symfony 4.4, pass a "%s" instance instead.', ContainerInterface::class, __METHOD__, RoutableMessageBus::class), E_USER_DEPRECATED);
|
@trigger_error(sprintf('Passing a "%s" instance as first argument to "%s()" is deprecated since Symfony 4.4, pass a "%s" instance instead.', ContainerInterface::class, __METHOD__, RoutableMessageBus::class), E_USER_DEPRECATED);
|
||||||
@ -59,17 +58,16 @@ class ConsumeMessagesCommand extends Command
|
|||||||
throw new \TypeError(sprintf('The first argument must be an instance of "%s".', RoutableMessageBus::class));
|
throw new \TypeError(sprintf('The first argument must be an instance of "%s".', RoutableMessageBus::class));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (\is_array($retryStrategyLocator)) {
|
if (null !== $eventDispatcher && !$eventDispatcher instanceof EventDispatcherInterface) {
|
||||||
@trigger_error(sprintf('The 5th argument of the class "%s" should be a retry-strategy locator, an array of bus names as a value is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED);
|
@trigger_error(sprintf('The 5th argument of the class "%s" should be a "%s"', __CLASS__, EventDispatcherInterface::class), E_USER_DEPRECATED);
|
||||||
|
|
||||||
$retryStrategyLocator = null;
|
$eventDispatcher = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->routableBus = $routableBus;
|
$this->routableBus = $routableBus;
|
||||||
$this->receiverLocator = $receiverLocator;
|
$this->receiverLocator = $receiverLocator;
|
||||||
$this->logger = $logger;
|
$this->logger = $logger;
|
||||||
$this->receiverNames = $receiverNames;
|
$this->receiverNames = $receiverNames;
|
||||||
$this->retryStrategyLocator = $retryStrategyLocator;
|
|
||||||
$this->eventDispatcher = $eventDispatcher;
|
$this->eventDispatcher = $eventDispatcher;
|
||||||
|
|
||||||
parent::__construct();
|
parent::__construct();
|
||||||
@ -166,7 +164,6 @@ EOF
|
|||||||
}
|
}
|
||||||
|
|
||||||
$receivers = [];
|
$receivers = [];
|
||||||
$retryStrategies = [];
|
|
||||||
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
|
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
|
||||||
if (!$this->receiverLocator->has($receiverName)) {
|
if (!$this->receiverLocator->has($receiverName)) {
|
||||||
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
|
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
|
||||||
@ -177,17 +174,12 @@ EOF
|
|||||||
throw new RuntimeException($message);
|
throw new RuntimeException($message);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
|
|
||||||
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
|
|
||||||
}
|
|
||||||
|
|
||||||
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
|
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
|
||||||
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
|
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
|
||||||
|
|
||||||
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
|
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
|
||||||
$stopsWhen = [];
|
$stopsWhen = [];
|
||||||
if ($limit = $input->getOption('limit')) {
|
if ($limit = $input->getOption('limit')) {
|
||||||
$stopsWhen[] = "processed {$limit} messages";
|
$stopsWhen[] = "processed {$limit} messages";
|
||||||
|
@ -24,7 +24,6 @@ use Symfony\Component\Messenger\Envelope;
|
|||||||
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
||||||
use Symfony\Component\Messenger\Exception\LogicException;
|
use Symfony\Component\Messenger\Exception\LogicException;
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
|
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
|
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
|
||||||
@ -39,14 +38,12 @@ class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
|
|||||||
|
|
||||||
private $eventDispatcher;
|
private $eventDispatcher;
|
||||||
private $messageBus;
|
private $messageBus;
|
||||||
private $retryStrategy;
|
|
||||||
private $logger;
|
private $logger;
|
||||||
|
|
||||||
public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, RetryStrategyInterface $retryStrategy = null, LoggerInterface $logger = null)
|
public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null)
|
||||||
{
|
{
|
||||||
$this->eventDispatcher = $eventDispatcher;
|
$this->eventDispatcher = $eventDispatcher;
|
||||||
$this->messageBus = $messageBus;
|
$this->messageBus = $messageBus;
|
||||||
$this->retryStrategy = $retryStrategy;
|
|
||||||
$this->logger = $logger;
|
$this->logger = $logger;
|
||||||
|
|
||||||
parent::__construct($receiverName, $receiver);
|
parent::__construct($receiverName, $receiver);
|
||||||
@ -180,7 +177,6 @@ EOF
|
|||||||
$worker = new Worker(
|
$worker = new Worker(
|
||||||
[$this->getReceiverName() => $receiver],
|
[$this->getReceiverName() => $receiver],
|
||||||
$this->messageBus,
|
$this->messageBus,
|
||||||
[$this->getReceiverName() => $this->retryStrategy],
|
|
||||||
$this->eventDispatcher,
|
$this->eventDispatcher,
|
||||||
$this->logger
|
$this->logger
|
||||||
);
|
);
|
||||||
|
@ -21,12 +21,11 @@ use Symfony\Component\Messenger\Envelope;
|
|||||||
final class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent
|
final class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent
|
||||||
{
|
{
|
||||||
private $throwable;
|
private $throwable;
|
||||||
private $willRetry;
|
private $willRetry = false;
|
||||||
|
|
||||||
public function __construct(Envelope $envelope, string $receiverName, \Throwable $error, bool $willRetry)
|
public function __construct(Envelope $envelope, string $receiverName, \Throwable $error)
|
||||||
{
|
{
|
||||||
$this->throwable = $error;
|
$this->throwable = $error;
|
||||||
$this->willRetry = $willRetry;
|
|
||||||
|
|
||||||
parent::__construct($envelope, $receiverName);
|
parent::__construct($envelope, $receiverName);
|
||||||
}
|
}
|
||||||
@ -40,4 +39,9 @@ final class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent
|
|||||||
{
|
{
|
||||||
return $this->willRetry;
|
return $this->willRetry;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function setForRetry(): void
|
||||||
|
{
|
||||||
|
$this->willRetry = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,126 @@
|
|||||||
|
<?php
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\EventListener;
|
||||||
|
|
||||||
|
use Psr\Container\ContainerInterface;
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||||
|
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||||
|
use Symfony\Component\Messenger\Exception\RuntimeException;
|
||||||
|
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
|
||||||
|
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||||
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||||
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
|
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Tobias Schultze <http://tobion.de>
|
||||||
|
*/
|
||||||
|
class SendFailedMessageForRetryListener implements EventSubscriberInterface
|
||||||
|
{
|
||||||
|
private $sendersLocator;
|
||||||
|
private $retryStrategyLocator;
|
||||||
|
private $logger;
|
||||||
|
|
||||||
|
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null)
|
||||||
|
{
|
||||||
|
$this->sendersLocator = $sendersLocator;
|
||||||
|
$this->retryStrategyLocator = $retryStrategyLocator;
|
||||||
|
$this->logger = $logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function onMessageFailed(WorkerMessageFailedEvent $event)
|
||||||
|
{
|
||||||
|
$retryStrategy = $this->getRetryStrategyForTransport($event->getReceiverName());
|
||||||
|
$envelope = $event->getEnvelope();
|
||||||
|
$throwable = $event->getThrowable();
|
||||||
|
|
||||||
|
$message = $envelope->getMessage();
|
||||||
|
$context = [
|
||||||
|
'message' => $message,
|
||||||
|
'class' => \get_class($message),
|
||||||
|
];
|
||||||
|
|
||||||
|
$shouldRetry = $retryStrategy && $this->shouldRetry($throwable, $envelope, $retryStrategy);
|
||||||
|
|
||||||
|
$retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope);
|
||||||
|
if ($shouldRetry) {
|
||||||
|
$event->setForRetry();
|
||||||
|
|
||||||
|
++$retryCount;
|
||||||
|
$delay = $retryStrategy->getWaitingTime($envelope);
|
||||||
|
if (null !== $this->logger) {
|
||||||
|
$this->logger->error('Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// add the delay and retry stamp info
|
||||||
|
$retryEnvelope = $envelope->with(new DelayStamp($delay), new RedeliveryStamp($retryCount));
|
||||||
|
|
||||||
|
// re-send the message for retry
|
||||||
|
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
|
||||||
|
} else {
|
||||||
|
if (null !== $this->logger) {
|
||||||
|
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static function getSubscribedEvents()
|
||||||
|
{
|
||||||
|
return [
|
||||||
|
// must have higher priority than SendFailedMessageToFailureTransportListener
|
||||||
|
WorkerMessageFailedEvent::class => ['onMessageFailed', 100],
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
|
||||||
|
{
|
||||||
|
// if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
|
||||||
|
if ($e instanceof HandlerFailedException) {
|
||||||
|
$shouldNotRetry = true;
|
||||||
|
foreach ($e->getNestedExceptions() as $nestedException) {
|
||||||
|
if (!$nestedException instanceof UnrecoverableExceptionInterface) {
|
||||||
|
$shouldNotRetry = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ($shouldNotRetry) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($e instanceof UnrecoverableExceptionInterface) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return $retryStrategy->isRetryable($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
private function getRetryStrategyForTransport(string $alias): ?RetryStrategyInterface
|
||||||
|
{
|
||||||
|
if ($this->retryStrategyLocator->has($alias)) {
|
||||||
|
return $this->retryStrategyLocator->get($alias);
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private function getSenderForTransport(string $alias): SenderInterface
|
||||||
|
{
|
||||||
|
if ($this->sendersLocator->has($alias)) {
|
||||||
|
return $this->sendersLocator->get($alias);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new RuntimeException(sprintf('Could not find sender "%s" based on the same receiver to send the failed message to for retry.', $alias));
|
||||||
|
}
|
||||||
|
}
|
@ -15,12 +15,10 @@ use Symfony\Component\ErrorRenderer\Exception\FlattenException;
|
|||||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||||
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
|
||||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
|
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends a rejected message to a "failure transport".
|
* Sends a rejected message to a "failure transport".
|
||||||
@ -29,14 +27,12 @@ use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
|||||||
*/
|
*/
|
||||||
class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface
|
class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface
|
||||||
{
|
{
|
||||||
private $messageBus;
|
private $failureSender;
|
||||||
private $failureSenderAlias;
|
|
||||||
private $logger;
|
private $logger;
|
||||||
|
|
||||||
public function __construct(MessageBusInterface $messageBus, string $failureSenderAlias, LoggerInterface $logger = null)
|
public function __construct(SenderInterface $failureSender, LoggerInterface $logger = null)
|
||||||
{
|
{
|
||||||
$this->messageBus = $messageBus;
|
$this->failureSender = $failureSender;
|
||||||
$this->failureSenderAlias = $failureSenderAlias;
|
|
||||||
$this->logger = $logger;
|
$this->logger = $logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -53,27 +49,26 @@ class SendFailedMessageToFailureTransportListener implements EventSubscriberInte
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove the received stamp so it's redelivered
|
|
||||||
$throwable = $event->getThrowable();
|
$throwable = $event->getThrowable();
|
||||||
if ($throwable instanceof HandlerFailedException) {
|
if ($throwable instanceof HandlerFailedException) {
|
||||||
$throwable = $throwable->getNestedExceptions()[0];
|
$throwable = $throwable->getNestedExceptions()[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
$flattenedException = class_exists(FlattenException::class) ? FlattenException::createFromThrowable($throwable) : null;
|
$flattenedException = class_exists(FlattenException::class) ? FlattenException::createFromThrowable($throwable) : null;
|
||||||
$envelope = $envelope->withoutAll(ReceivedStamp::class)
|
$envelope = $envelope->with(
|
||||||
->withoutAll(TransportMessageIdStamp::class)
|
new SentToFailureTransportStamp($event->getReceiverName()),
|
||||||
->with(new SentToFailureTransportStamp($event->getReceiverName()))
|
new DelayStamp(0),
|
||||||
->with(new DelayStamp(0))
|
new RedeliveryStamp(0, $throwable->getMessage(), $flattenedException)
|
||||||
->with(new RedeliveryStamp(0, $this->failureSenderAlias, $throwable->getMessage(), $flattenedException));
|
);
|
||||||
|
|
||||||
if (null !== $this->logger) {
|
if (null !== $this->logger) {
|
||||||
$this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
|
$this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
|
||||||
'class' => \get_class($envelope->getMessage()),
|
'class' => \get_class($envelope->getMessage()),
|
||||||
'transport' => $this->failureSenderAlias,
|
'transport' => \get_class($this->failureSender),
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->messageBus->dispatch($envelope);
|
$this->failureSender->send($envelope);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static function getSubscribedEvents()
|
public static function getSubscribedEvents()
|
||||||
|
@ -1,19 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Exception;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
|
||||||
*/
|
|
||||||
class UnknownSenderException extends InvalidArgumentException
|
|
||||||
{
|
|
||||||
}
|
|
@ -13,7 +13,6 @@ namespace Symfony\Component\Messenger\Middleware;
|
|||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
|
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -34,14 +33,11 @@ class RejectRedeliveredMessageMiddleware implements MiddlewareInterface
|
|||||||
{
|
{
|
||||||
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||||
{
|
{
|
||||||
// ignore the dispatched messages for retry
|
|
||||||
if (null !== $envelope->last(ReceivedStamp::class)) {
|
|
||||||
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
|
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
|
||||||
|
|
||||||
if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
|
if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
|
||||||
throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
|
throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return $stack->next()->handle($envelope, $stack);
|
return $stack->next()->handle($envelope, $stack);
|
||||||
}
|
}
|
||||||
|
@ -17,9 +17,7 @@ use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
|
|||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
|
||||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
|
||||||
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
|
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
|
||||||
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
||||||
|
|
||||||
@ -57,12 +55,8 @@ class SendMessageMiddleware implements MiddlewareInterface
|
|||||||
// it's a received message, do not send it back
|
// it's a received message, do not send it back
|
||||||
$this->logger->info('Received message {class}', $context);
|
$this->logger->info('Received message {class}', $context);
|
||||||
} else {
|
} else {
|
||||||
/** @var RedeliveryStamp|null $redeliveryStamp */
|
$shouldDispatchEvent = true;
|
||||||
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
|
foreach ($this->sendersLocator->getSenders($envelope) as $alias => $sender) {
|
||||||
|
|
||||||
// dispatch event unless this is a redelivery
|
|
||||||
$shouldDispatchEvent = null === $redeliveryStamp;
|
|
||||||
foreach ($this->getSenders($envelope, $redeliveryStamp) as $alias => $sender) {
|
|
||||||
if (null !== $this->eventDispatcher && $shouldDispatchEvent) {
|
if (null !== $this->eventDispatcher && $shouldDispatchEvent) {
|
||||||
$event = new SendMessageToTransportsEvent($envelope);
|
$event = new SendMessageToTransportsEvent($envelope);
|
||||||
$this->eventDispatcher->dispatch($event);
|
$this->eventDispatcher->dispatch($event);
|
||||||
@ -82,18 +76,4 @@ class SendMessageMiddleware implements MiddlewareInterface
|
|||||||
// message should only be sent and not be handled by the next middleware
|
// message should only be sent and not be handled by the next middleware
|
||||||
return $envelope;
|
return $envelope;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* * @return iterable|SenderInterface[]
|
|
||||||
*/
|
|
||||||
private function getSenders(Envelope $envelope, ?RedeliveryStamp $redeliveryStamp): iterable
|
|
||||||
{
|
|
||||||
if (null !== $redeliveryStamp) {
|
|
||||||
return [
|
|
||||||
$redeliveryStamp->getSenderClassOrAlias() => $this->sendersLocator->getSenderByAlias($redeliveryStamp->getSenderClassOrAlias()),
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
return $this->sendersLocator->getSenders($envelope);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -20,18 +20,13 @@ use Symfony\Component\Messenger\Envelope;
|
|||||||
final class RedeliveryStamp implements StampInterface
|
final class RedeliveryStamp implements StampInterface
|
||||||
{
|
{
|
||||||
private $retryCount;
|
private $retryCount;
|
||||||
private $senderClassOrAlias;
|
|
||||||
private $redeliveredAt;
|
private $redeliveredAt;
|
||||||
private $exceptionMessage;
|
private $exceptionMessage;
|
||||||
private $flattenException;
|
private $flattenException;
|
||||||
|
|
||||||
/**
|
public function __construct(int $retryCount, string $exceptionMessage = null, FlattenException $flattenException = null)
|
||||||
* @param string $senderClassOrAlias Alias from SendersLocator or just the class name
|
|
||||||
*/
|
|
||||||
public function __construct(int $retryCount, string $senderClassOrAlias, string $exceptionMessage = null, FlattenException $flattenException = null)
|
|
||||||
{
|
{
|
||||||
$this->retryCount = $retryCount;
|
$this->retryCount = $retryCount;
|
||||||
$this->senderClassOrAlias = $senderClassOrAlias;
|
|
||||||
$this->exceptionMessage = $exceptionMessage;
|
$this->exceptionMessage = $exceptionMessage;
|
||||||
$this->flattenException = $flattenException;
|
$this->flattenException = $flattenException;
|
||||||
$this->redeliveredAt = new \DateTimeImmutable();
|
$this->redeliveredAt = new \DateTimeImmutable();
|
||||||
@ -50,16 +45,6 @@ final class RedeliveryStamp implements StampInterface
|
|||||||
return $this->retryCount;
|
return $this->retryCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* The target sender this should be redelivered to.
|
|
||||||
*
|
|
||||||
* @internal
|
|
||||||
*/
|
|
||||||
public function getSenderClassOrAlias(): string
|
|
||||||
{
|
|
||||||
return $this->senderClassOrAlias;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function getExceptionMessage(): ?string
|
public function getExceptionMessage(): ?string
|
||||||
{
|
{
|
||||||
return $this->exceptionMessage;
|
return $this->exceptionMessage;
|
||||||
|
@ -28,7 +28,7 @@ class FailedMessagesShowCommandTest extends TestCase
|
|||||||
public function testBasicRun()
|
public function testBasicRun()
|
||||||
{
|
{
|
||||||
$sentToFailureStamp = new SentToFailureTransportStamp('async');
|
$sentToFailureStamp = new SentToFailureTransportStamp('async');
|
||||||
$redeliveryStamp = new RedeliveryStamp(0, 'failure_receiver', 'Things are bad!');
|
$redeliveryStamp = new RedeliveryStamp(0, 'Things are bad!');
|
||||||
$envelope = new Envelope(new \stdClass(), [
|
$envelope = new Envelope(new \stdClass(), [
|
||||||
new TransportMessageIdStamp(15),
|
new TransportMessageIdStamp(15),
|
||||||
$sentToFailureStamp,
|
$sentToFailureStamp,
|
||||||
@ -62,8 +62,8 @@ EOF
|
|||||||
public function testMultipleRedeliveryFails()
|
public function testMultipleRedeliveryFails()
|
||||||
{
|
{
|
||||||
$sentToFailureStamp = new SentToFailureTransportStamp('async');
|
$sentToFailureStamp = new SentToFailureTransportStamp('async');
|
||||||
$redeliveryStamp1 = new RedeliveryStamp(0, 'failure_receiver', 'Things are bad!');
|
$redeliveryStamp1 = new RedeliveryStamp(0, 'Things are bad!');
|
||||||
$redeliveryStamp2 = new RedeliveryStamp(0, 'failure_receiver');
|
$redeliveryStamp2 = new RedeliveryStamp(0);
|
||||||
$envelope = new Envelope(new \stdClass(), [
|
$envelope = new Envelope(new \stdClass(), [
|
||||||
new TransportMessageIdStamp(15),
|
new TransportMessageIdStamp(15),
|
||||||
$sentToFailureStamp,
|
$sentToFailureStamp,
|
||||||
|
@ -281,7 +281,6 @@ class MessengerPassTest extends TestCase
|
|||||||
$container->register('console.command.messenger_consume_messages', ConsumeMessagesCommand::class)->setArguments([
|
$container->register('console.command.messenger_consume_messages', ConsumeMessagesCommand::class)->setArguments([
|
||||||
null,
|
null,
|
||||||
new Reference('messenger.receiver_locator'),
|
new Reference('messenger.receiver_locator'),
|
||||||
new Reference('messenger.retry_strategy_locator'),
|
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
@ -0,0 +1,66 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\EventListener;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Psr\Container\ContainerInterface;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||||
|
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
|
||||||
|
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||||
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||||
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
|
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||||
|
|
||||||
|
class SendFailedMessageForRetryListenerTest extends TestCase
|
||||||
|
{
|
||||||
|
public function testNoRetryStrategyCausesNoRetry()
|
||||||
|
{
|
||||||
|
$senderLocator = $this->createMock(ContainerInterface::class);
|
||||||
|
$senderLocator->expects($this->never())->method('has');
|
||||||
|
$senderLocator->expects($this->never())->method('get');
|
||||||
|
$retryStrategyLocator = $this->createMock(ContainerInterface::class);
|
||||||
|
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(false);
|
||||||
|
|
||||||
|
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
|
||||||
|
|
||||||
|
$exception = new \Exception('no!');
|
||||||
|
$envelope = new Envelope(new \stdClass());
|
||||||
|
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
|
||||||
|
|
||||||
|
$listener->onMessageFailed($event);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testEnvelopeIsSentToTransportOnRetry()
|
||||||
|
{
|
||||||
|
$exception = new \Exception('no!');
|
||||||
|
$envelope = new Envelope(new \stdClass());
|
||||||
|
|
||||||
|
$sender = $this->createMock(SenderInterface::class);
|
||||||
|
$sender->expects($this->once())->method('send')->with($envelope->with(new DelayStamp(1000), new RedeliveryStamp(1)))->willReturnArgument(0);
|
||||||
|
$senderLocator = $this->createMock(ContainerInterface::class);
|
||||||
|
$senderLocator->expects($this->once())->method('has')->willReturn(true);
|
||||||
|
$senderLocator->expects($this->never())->method('get')->willReturn($sender);
|
||||||
|
$retryStategy = $this->createMock(RetryStrategyInterface::class);
|
||||||
|
$retryStategy->expects($this->once())->method('isRetryable')->willReturn(true);
|
||||||
|
$retryStategy->expects($this->once())->method('getWaitingTime')->willReturn(1000);
|
||||||
|
$retryStrategyLocator = $this->createMock(ContainerInterface::class);
|
||||||
|
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
|
||||||
|
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);
|
||||||
|
|
||||||
|
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
|
||||||
|
|
||||||
|
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
|
||||||
|
|
||||||
|
$listener->onMessageFailed($event);
|
||||||
|
}
|
||||||
|
}
|
@ -9,25 +9,23 @@
|
|||||||
* file that was distributed with this source code.
|
* file that was distributed with this source code.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Tests\Handler;
|
namespace Symfony\Component\Messenger\Tests\EventListener;
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||||
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
|
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
|
||||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
|
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||||
|
|
||||||
class SendFailedMessageToFailureTransportListenerTest extends TestCase
|
class SendFailedMessageToFailureTransportListenerTest extends TestCase
|
||||||
{
|
{
|
||||||
public function testItDispatchesToTheFailureTransport()
|
public function testItSendsToTheFailureTransport()
|
||||||
{
|
{
|
||||||
$bus = $this->createMock(MessageBusInterface::class);
|
$sender = $this->createMock(SenderInterface::class);
|
||||||
$bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) {
|
$sender->expects($this->once())->method('send')->with($this->callback(function ($envelope) {
|
||||||
/* @var Envelope $envelope */
|
/* @var Envelope $envelope */
|
||||||
$this->assertInstanceOf(Envelope::class, $envelope);
|
$this->assertInstanceOf(Envelope::class, $envelope);
|
||||||
|
|
||||||
@ -38,31 +36,24 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase
|
|||||||
|
|
||||||
/** @var RedeliveryStamp $redeliveryStamp */
|
/** @var RedeliveryStamp $redeliveryStamp */
|
||||||
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
|
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
|
||||||
$this->assertSame('failure_sender', $redeliveryStamp->getSenderClassOrAlias());
|
|
||||||
$this->assertSame('no!', $redeliveryStamp->getExceptionMessage());
|
$this->assertSame('no!', $redeliveryStamp->getExceptionMessage());
|
||||||
$this->assertSame('no!', $redeliveryStamp->getFlattenException()->getMessage());
|
$this->assertSame('no!', $redeliveryStamp->getFlattenException()->getMessage());
|
||||||
|
|
||||||
$this->assertNull($envelope->last(ReceivedStamp::class));
|
|
||||||
$this->assertNull($envelope->last(TransportMessageIdStamp::class));
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}))->willReturn(new Envelope(new \stdClass()));
|
}))->willReturnArgument(0);
|
||||||
$listener = new SendFailedMessageToFailureTransportListener(
|
$listener = new SendFailedMessageToFailureTransportListener($sender);
|
||||||
$bus,
|
|
||||||
'failure_sender'
|
|
||||||
);
|
|
||||||
|
|
||||||
$exception = new \Exception('no!');
|
$exception = new \Exception('no!');
|
||||||
$envelope = new Envelope(new \stdClass());
|
$envelope = new Envelope(new \stdClass());
|
||||||
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false);
|
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
|
||||||
|
|
||||||
$listener->onMessageFailed($event);
|
$listener->onMessageFailed($event);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testItGetsNestedHandlerFailedException()
|
public function testItGetsNestedHandlerFailedException()
|
||||||
{
|
{
|
||||||
$bus = $this->createMock(MessageBusInterface::class);
|
$sender = $this->createMock(SenderInterface::class);
|
||||||
$bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) {
|
$sender->expects($this->once())->method('send')->with($this->callback(function ($envelope) {
|
||||||
/** @var Envelope $envelope */
|
/** @var Envelope $envelope */
|
||||||
/** @var RedeliveryStamp $redeliveryStamp */
|
/** @var RedeliveryStamp $redeliveryStamp */
|
||||||
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
|
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
|
||||||
@ -71,49 +62,41 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase
|
|||||||
$this->assertSame('Exception', $redeliveryStamp->getFlattenException()->getClass());
|
$this->assertSame('Exception', $redeliveryStamp->getFlattenException()->getClass());
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}))->willReturn(new Envelope(new \stdClass()));
|
}))->willReturnArgument(0);
|
||||||
|
|
||||||
$listener = new SendFailedMessageToFailureTransportListener(
|
$listener = new SendFailedMessageToFailureTransportListener($sender);
|
||||||
$bus,
|
|
||||||
'failure_sender'
|
|
||||||
);
|
|
||||||
|
|
||||||
$envelope = new Envelope(new \stdClass());
|
$envelope = new Envelope(new \stdClass());
|
||||||
$exception = new \Exception('I am inside!');
|
$exception = new \Exception('I am inside!');
|
||||||
$exception = new HandlerFailedException($envelope, [$exception]);
|
$exception = new HandlerFailedException($envelope, [$exception]);
|
||||||
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false);
|
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
|
||||||
|
|
||||||
$listener->onMessageFailed($event);
|
$listener->onMessageFailed($event);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testDoNothingOnRetry()
|
public function testDoNothingOnRetry()
|
||||||
{
|
{
|
||||||
$bus = $this->createMock(MessageBusInterface::class);
|
$sender = $this->createMock(SenderInterface::class);
|
||||||
$bus->expects($this->never())->method('dispatch');
|
$sender->expects($this->never())->method('send');
|
||||||
$listener = new SendFailedMessageToFailureTransportListener(
|
$listener = new SendFailedMessageToFailureTransportListener($sender);
|
||||||
$bus,
|
|
||||||
'failure_sender'
|
|
||||||
);
|
|
||||||
|
|
||||||
$envelope = new Envelope(new \stdClass());
|
$envelope = new Envelope(new \stdClass());
|
||||||
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), true);
|
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception());
|
||||||
|
$event->setForRetry();
|
||||||
|
|
||||||
$listener->onMessageFailed($event);
|
$listener->onMessageFailed($event);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testDoNotRedeliverToFailed()
|
public function testDoNotRedeliverToFailed()
|
||||||
{
|
{
|
||||||
$bus = $this->createMock(MessageBusInterface::class);
|
$sender = $this->createMock(SenderInterface::class);
|
||||||
$bus->expects($this->never())->method('dispatch');
|
$sender->expects($this->never())->method('send');
|
||||||
$listener = new SendFailedMessageToFailureTransportListener(
|
$listener = new SendFailedMessageToFailureTransportListener($sender);
|
||||||
$bus,
|
|
||||||
'failure_sender'
|
|
||||||
);
|
|
||||||
|
|
||||||
$envelope = new Envelope(new \stdClass(), [
|
$envelope = new Envelope(new \stdClass(), [
|
||||||
new SentToFailureTransportStamp('my_receiver'),
|
new SentToFailureTransportStamp('my_receiver'),
|
||||||
]);
|
]);
|
||||||
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), false);
|
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception());
|
||||||
|
|
||||||
$listener->onMessageFailed($event);
|
$listener->onMessageFailed($event);
|
||||||
}
|
}
|
||||||
|
@ -13,10 +13,10 @@ namespace Symfony\Component\Messenger\Tests;
|
|||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Psr\Container\ContainerInterface;
|
use Psr\Container\ContainerInterface;
|
||||||
use Symfony\Component\DependencyInjection\Container;
|
|
||||||
use Symfony\Component\EventDispatcher\EventDispatcher;
|
use Symfony\Component\EventDispatcher\EventDispatcher;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||||
|
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
|
||||||
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
|
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
|
||||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||||
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
|
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
|
||||||
@ -61,11 +61,18 @@ class FailureIntegrationTest extends TestCase
|
|||||||
$locator
|
$locator
|
||||||
);
|
);
|
||||||
|
|
||||||
|
$retryStrategyLocator = $this->createMock(ContainerInterface::class);
|
||||||
|
$retryStrategyLocator->expects($this->any())
|
||||||
|
->method('has')
|
||||||
|
->willReturn(true);
|
||||||
|
$retryStrategyLocator->expects($this->any())
|
||||||
|
->method('get')
|
||||||
|
->willReturn(new MultiplierRetryStrategy(1));
|
||||||
|
|
||||||
// using to so we can lazily get the bus later and avoid circular problem
|
// using to so we can lazily get the bus later and avoid circular problem
|
||||||
$transport1HandlerThatFails = new DummyTestHandler(true);
|
$transport1HandlerThatFails = new DummyTestHandler(true);
|
||||||
$allTransportHandlerThatWorks = new DummyTestHandler(false);
|
$allTransportHandlerThatWorks = new DummyTestHandler(false);
|
||||||
$transport2HandlerThatWorks = new DummyTestHandler(false);
|
$transport2HandlerThatWorks = new DummyTestHandler(false);
|
||||||
$container = new Container();
|
|
||||||
$handlerLocator = new HandlersLocator([
|
$handlerLocator = new HandlersLocator([
|
||||||
DummyMessage::class => [
|
DummyMessage::class => [
|
||||||
new HandlerDescriptor($transport1HandlerThatFails, [
|
new HandlerDescriptor($transport1HandlerThatFails, [
|
||||||
@ -88,17 +95,17 @@ class FailureIntegrationTest extends TestCase
|
|||||||
new SendMessageMiddleware($senderLocator),
|
new SendMessageMiddleware($senderLocator),
|
||||||
new HandleMessageMiddleware($handlerLocator),
|
new HandleMessageMiddleware($handlerLocator),
|
||||||
]);
|
]);
|
||||||
$container->set('bus', $bus);
|
$dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator, $retryStrategyLocator));
|
||||||
$dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($bus, 'the_failure_transport'));
|
$dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($failureTransport));
|
||||||
|
|
||||||
$runWorker = function (string $transportName, int $maxRetries) use ($transports, $bus, $dispatcher): ?\Throwable {
|
$runWorker = function (string $transportName) use ($transports, $bus, $dispatcher): ?\Throwable {
|
||||||
$throwable = null;
|
$throwable = null;
|
||||||
$failedListener = function (WorkerMessageFailedEvent $event) use (&$throwable) {
|
$failedListener = function (WorkerMessageFailedEvent $event) use (&$throwable) {
|
||||||
$throwable = $event->getThrowable();
|
$throwable = $event->getThrowable();
|
||||||
};
|
};
|
||||||
$dispatcher->addListener(WorkerMessageFailedEvent::class, $failedListener);
|
$dispatcher->addListener(WorkerMessageFailedEvent::class, $failedListener);
|
||||||
|
|
||||||
$worker = new Worker([$transportName => $transports[$transportName]], $bus, [$transportName => new MultiplierRetryStrategy($maxRetries)], $dispatcher);
|
$worker = new Worker([$transportName => $transports[$transportName]], $bus, $dispatcher);
|
||||||
|
|
||||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
// handle one envelope, then stop
|
// handle one envelope, then stop
|
||||||
@ -126,7 +133,7 @@ class FailureIntegrationTest extends TestCase
|
|||||||
/*
|
/*
|
||||||
* Receive the message from "transport1"
|
* Receive the message from "transport1"
|
||||||
*/
|
*/
|
||||||
$throwable = $runWorker('transport1', 1);
|
$throwable = $runWorker('transport1');
|
||||||
// make sure this is failing for the reason we think
|
// make sure this is failing for the reason we think
|
||||||
$this->assertInstanceOf(HandlerFailedException::class, $throwable);
|
$this->assertInstanceOf(HandlerFailedException::class, $throwable);
|
||||||
// handler for transport1 and all transports were called
|
// handler for transport1 and all transports were called
|
||||||
@ -140,7 +147,7 @@ class FailureIntegrationTest extends TestCase
|
|||||||
/*
|
/*
|
||||||
* Receive the message for a (final) retry
|
* Receive the message for a (final) retry
|
||||||
*/
|
*/
|
||||||
$runWorker('transport1', 1);
|
$runWorker('transport1');
|
||||||
// only the "failed" handler is called a 2nd time
|
// only the "failed" handler is called a 2nd time
|
||||||
$this->assertSame(2, $transport1HandlerThatFails->getTimesCalled());
|
$this->assertSame(2, $transport1HandlerThatFails->getTimesCalled());
|
||||||
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
|
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
|
||||||
@ -160,7 +167,7 @@ class FailureIntegrationTest extends TestCase
|
|||||||
/*
|
/*
|
||||||
* Failed message is handled, fails, and sent for a retry
|
* Failed message is handled, fails, and sent for a retry
|
||||||
*/
|
*/
|
||||||
$throwable = $runWorker('the_failure_transport', 1);
|
$throwable = $runWorker('the_failure_transport');
|
||||||
// make sure this is failing for the reason we think
|
// make sure this is failing for the reason we think
|
||||||
$this->assertInstanceOf(HandlerFailedException::class, $throwable);
|
$this->assertInstanceOf(HandlerFailedException::class, $throwable);
|
||||||
// only the "failed" handler is called a 3rd time
|
// only the "failed" handler is called a 3rd time
|
||||||
@ -175,7 +182,7 @@ class FailureIntegrationTest extends TestCase
|
|||||||
/*
|
/*
|
||||||
* Message is retried on failure transport then discarded
|
* Message is retried on failure transport then discarded
|
||||||
*/
|
*/
|
||||||
$runWorker('the_failure_transport', 1);
|
$runWorker('the_failure_transport');
|
||||||
// only the "failed" handler is called a 4th time
|
// only the "failed" handler is called a 4th time
|
||||||
$this->assertSame(4, $transport1HandlerThatFails->getTimesCalled());
|
$this->assertSame(4, $transport1HandlerThatFails->getTimesCalled());
|
||||||
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
|
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
|
||||||
@ -185,7 +192,7 @@ class FailureIntegrationTest extends TestCase
|
|||||||
/*
|
/*
|
||||||
* Execute handlers on transport2
|
* Execute handlers on transport2
|
||||||
*/
|
*/
|
||||||
$runWorker('transport2', 1);
|
$runWorker('transport2');
|
||||||
// transport1 handler is not called again
|
// transport1 handler is not called again
|
||||||
$this->assertSame(4, $transport1HandlerThatFails->getTimesCalled());
|
$this->assertSame(4, $transport1HandlerThatFails->getTimesCalled());
|
||||||
// all transport handler is now called again
|
// all transport handler is now called again
|
||||||
@ -202,10 +209,10 @@ class FailureIntegrationTest extends TestCase
|
|||||||
*/
|
*/
|
||||||
$bus->dispatch($envelope);
|
$bus->dispatch($envelope);
|
||||||
// handle the message, but with no retries
|
// handle the message, but with no retries
|
||||||
$runWorker('transport1', 0);
|
$runWorker('transport1');
|
||||||
// now make the handler work!
|
// now make the handler work!
|
||||||
$transport1HandlerThatFails->setShouldThrow(false);
|
$transport1HandlerThatFails->setShouldThrow(false);
|
||||||
$runWorker('the_failure_transport', 1);
|
$runWorker('the_failure_transport');
|
||||||
// the failure transport is empty because it worked
|
// the failure transport is empty because it worked
|
||||||
$this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived());
|
$this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived());
|
||||||
}
|
}
|
||||||
|
@ -1,39 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Tests\Fixtures;
|
|
||||||
|
|
||||||
class DummyMessageHandlerFailingFirstTimes
|
|
||||||
{
|
|
||||||
private $remainingFailures;
|
|
||||||
|
|
||||||
private $called = 0;
|
|
||||||
|
|
||||||
public function __construct(int $throwExceptionOnFirstTries = 0)
|
|
||||||
{
|
|
||||||
$this->remainingFailures = $throwExceptionOnFirstTries;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function __invoke(DummyMessage $message)
|
|
||||||
{
|
|
||||||
if ($this->remainingFailures > 0) {
|
|
||||||
--$this->remainingFailures;
|
|
||||||
throw new \Exception('Handler should throw Exception.');
|
|
||||||
}
|
|
||||||
|
|
||||||
++$this->called;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function getTimesCalledWithoutThrowing(): int
|
|
||||||
{
|
|
||||||
return $this->called;
|
|
||||||
}
|
|
||||||
}
|
|
@ -16,7 +16,6 @@ use Symfony\Component\Messenger\Envelope;
|
|||||||
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
||||||
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
|
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
|
||||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||||
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
|
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\ChildDummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\ChildDummyMessage;
|
||||||
@ -84,31 +83,6 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
|||||||
$this->assertCount(2, $sentStamps);
|
$this->assertCount(2, $sentStamps);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testItSendsToOnlyOneSenderOnRedelivery()
|
|
||||||
{
|
|
||||||
$envelope = new Envelope(new DummyMessage('Hey'), [new RedeliveryStamp(5, 'bar')]);
|
|
||||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
|
||||||
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
|
||||||
|
|
||||||
$sendersLocator = $this->createSendersLocator(
|
|
||||||
[DummyMessage::class => ['foo', 'bar']],
|
|
||||||
['foo' => $sender, 'bar' => $sender2]
|
|
||||||
);
|
|
||||||
|
|
||||||
$middleware = new SendMessageMiddleware($sendersLocator);
|
|
||||||
|
|
||||||
$sender->expects($this->never())
|
|
||||||
->method('send')
|
|
||||||
;
|
|
||||||
$sender2->expects($this->once())
|
|
||||||
->method('send')
|
|
||||||
->willReturnArgument(0);
|
|
||||||
|
|
||||||
$mockStack = $this->getStackMock(false); // false because next should not be called
|
|
||||||
$envelope = $middleware->handle($envelope, $mockStack);
|
|
||||||
$this->assertCount(1, $envelope->all(SentStamp::class));
|
|
||||||
}
|
|
||||||
|
|
||||||
public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage()
|
public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage()
|
||||||
{
|
{
|
||||||
$envelope = new Envelope(new ChildDummyMessage('Hey'));
|
$envelope = new Envelope(new ChildDummyMessage('Hey'));
|
||||||
@ -223,24 +197,6 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
|||||||
$middleware->handle($envelope, $this->getStackMock());
|
$middleware->handle($envelope, $this->getStackMock());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testItDoesNotDispatchOnRedeliver()
|
|
||||||
{
|
|
||||||
$envelope = new Envelope(new DummyMessage('original envelope'));
|
|
||||||
$envelope = $envelope->with(new RedeliveryStamp(3, 'foo_sender'));
|
|
||||||
|
|
||||||
$dispatcher = $this->createMock(EventDispatcherInterface::class);
|
|
||||||
$dispatcher->expects($this->never())->method('dispatch');
|
|
||||||
|
|
||||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
|
||||||
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
|
||||||
$sender2->expects($this->once())->method('send')->willReturn(new Envelope(new \stdClass()));
|
|
||||||
|
|
||||||
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo']], ['foo' => $sender, 'foo_sender' => $sender2]);
|
|
||||||
$middleware = new SendMessageMiddleware($sendersLocator, $dispatcher);
|
|
||||||
|
|
||||||
$middleware->handle($envelope, $this->getStackMock(false));
|
|
||||||
}
|
|
||||||
|
|
||||||
private function createSendersLocator(array $sendersMap, array $senders): SendersLocator
|
private function createSendersLocator(array $sendersMap, array $senders): SendersLocator
|
||||||
{
|
{
|
||||||
$container = $this->createMock(ContainerInterface::class);
|
$container = $this->createMock(ContainerInterface::class);
|
||||||
|
@ -21,7 +21,7 @@ class MultiplierRetryStrategyTest extends TestCase
|
|||||||
public function testIsRetryable()
|
public function testIsRetryable()
|
||||||
{
|
{
|
||||||
$strategy = new MultiplierRetryStrategy(3);
|
$strategy = new MultiplierRetryStrategy(3);
|
||||||
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]);
|
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0)]);
|
||||||
|
|
||||||
$this->assertTrue($strategy->isRetryable($envelope));
|
$this->assertTrue($strategy->isRetryable($envelope));
|
||||||
}
|
}
|
||||||
@ -29,7 +29,7 @@ class MultiplierRetryStrategyTest extends TestCase
|
|||||||
public function testIsNotRetryable()
|
public function testIsNotRetryable()
|
||||||
{
|
{
|
||||||
$strategy = new MultiplierRetryStrategy(3);
|
$strategy = new MultiplierRetryStrategy(3);
|
||||||
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(3, 'sender_alias')]);
|
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(3)]);
|
||||||
|
|
||||||
$this->assertFalse($strategy->isRetryable($envelope));
|
$this->assertFalse($strategy->isRetryable($envelope));
|
||||||
}
|
}
|
||||||
@ -37,7 +37,7 @@ class MultiplierRetryStrategyTest extends TestCase
|
|||||||
public function testIsNotRetryableWithZeroMax()
|
public function testIsNotRetryableWithZeroMax()
|
||||||
{
|
{
|
||||||
$strategy = new MultiplierRetryStrategy(0);
|
$strategy = new MultiplierRetryStrategy(0);
|
||||||
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]);
|
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0)]);
|
||||||
$this->assertFalse($strategy->isRetryable($envelope));
|
$this->assertFalse($strategy->isRetryable($envelope));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,7 +55,7 @@ class MultiplierRetryStrategyTest extends TestCase
|
|||||||
public function testGetWaitTime(int $delay, int $multiplier, int $maxDelay, int $previousRetries, int $expectedDelay)
|
public function testGetWaitTime(int $delay, int $multiplier, int $maxDelay, int $previousRetries, int $expectedDelay)
|
||||||
{
|
{
|
||||||
$strategy = new MultiplierRetryStrategy(10, $delay, $multiplier, $maxDelay);
|
$strategy = new MultiplierRetryStrategy(10, $delay, $multiplier, $maxDelay);
|
||||||
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp($previousRetries, 'sender_alias')]);
|
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp($previousRetries)]);
|
||||||
|
|
||||||
$this->assertSame($expectedDelay, $strategy->getWaitingTime($envelope));
|
$this->assertSame($expectedDelay, $strategy->getWaitingTime($envelope));
|
||||||
}
|
}
|
||||||
|
@ -1,100 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Tests;
|
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
|
||||||
use Psr\Container\ContainerInterface;
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
|
||||||
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
|
|
||||||
use Symfony\Component\Messenger\Handler\HandlersLocator;
|
|
||||||
use Symfony\Component\Messenger\MessageBus;
|
|
||||||
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
|
|
||||||
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
|
|
||||||
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
|
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageHandlerFailingFirstTimes;
|
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
|
||||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
|
||||||
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
|
|
||||||
use Symfony\Component\Messenger\Worker;
|
|
||||||
|
|
||||||
class RetryIntegrationTest extends TestCase
|
|
||||||
{
|
|
||||||
public function testRetryMechanism()
|
|
||||||
{
|
|
||||||
$senderAndReceiver = new DummySenderAndReceiver();
|
|
||||||
|
|
||||||
$senderLocator = $this->createMock(ContainerInterface::class);
|
|
||||||
$senderLocator->method('has')->with('transportName')->willReturn(true);
|
|
||||||
$senderLocator->method('get')->with('transportName')->willReturn($senderAndReceiver);
|
|
||||||
$senderLocator = new SendersLocator([DummyMessage::class => ['transportName']], $senderLocator);
|
|
||||||
|
|
||||||
$handler = new DummyMessageHandlerFailingFirstTimes(0);
|
|
||||||
$throwingHandler = new DummyMessageHandlerFailingFirstTimes(1);
|
|
||||||
$handlerLocator = new HandlersLocator([
|
|
||||||
DummyMessage::class => [
|
|
||||||
new HandlerDescriptor($handler, ['alias' => 'first']),
|
|
||||||
new HandlerDescriptor($throwingHandler, ['alias' => 'throwing']),
|
|
||||||
],
|
|
||||||
]);
|
|
||||||
|
|
||||||
// dispatch the message, which will get "sent" and then received by DummySenderAndReceiver
|
|
||||||
$bus = new MessageBus([new SendMessageMiddleware($senderLocator), new HandleMessageMiddleware($handlerLocator)]);
|
|
||||||
$envelope = new Envelope(new DummyMessage('API'));
|
|
||||||
$bus->dispatch($envelope);
|
|
||||||
|
|
||||||
$worker = new Worker(['transportName' => $senderAndReceiver], $bus, ['transportName' => new MultiplierRetryStrategy()]);
|
|
||||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
|
||||||
if (null === $envelope) {
|
|
||||||
$worker->stop();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
$this->assertSame(1, $handler->getTimesCalledWithoutThrowing());
|
|
||||||
$this->assertSame(1, $throwingHandler->getTimesCalledWithoutThrowing());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class DummySenderAndReceiver implements ReceiverInterface, SenderInterface
|
|
||||||
{
|
|
||||||
private $messagesWaiting = [];
|
|
||||||
|
|
||||||
private $messagesReceived = [];
|
|
||||||
|
|
||||||
public function get(): iterable
|
|
||||||
{
|
|
||||||
$message = array_shift($this->messagesWaiting);
|
|
||||||
|
|
||||||
if (null === $message) {
|
|
||||||
return [];
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->messagesReceived[] = $message;
|
|
||||||
|
|
||||||
return [$message];
|
|
||||||
}
|
|
||||||
|
|
||||||
public function ack(Envelope $envelope): void
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
public function reject(Envelope $envelope): void
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
public function send(Envelope $envelope): Envelope
|
|
||||||
{
|
|
||||||
$this->messagesWaiting[] = $envelope;
|
|
||||||
|
|
||||||
return $envelope;
|
|
||||||
}
|
|
||||||
}
|
|
@ -19,9 +19,8 @@ class RedeliveryStampTest extends TestCase
|
|||||||
{
|
{
|
||||||
public function testGetters()
|
public function testGetters()
|
||||||
{
|
{
|
||||||
$stamp = new RedeliveryStamp(10, 'sender_alias');
|
$stamp = new RedeliveryStamp(10);
|
||||||
$this->assertSame(10, $stamp->getRetryCount());
|
$this->assertSame(10, $stamp->getRetryCount());
|
||||||
$this->assertSame('sender_alias', $stamp->getSenderClassOrAlias());
|
|
||||||
$this->assertInstanceOf(\DateTimeInterface::class, $stamp->getRedeliveredAt());
|
$this->assertInstanceOf(\DateTimeInterface::class, $stamp->getRedeliveredAt());
|
||||||
$this->assertNull($stamp->getExceptionMessage());
|
$this->assertNull($stamp->getExceptionMessage());
|
||||||
$this->assertNull($stamp->getFlattenException());
|
$this->assertNull($stamp->getFlattenException());
|
||||||
@ -30,7 +29,7 @@ class RedeliveryStampTest extends TestCase
|
|||||||
public function testGettersPopulated()
|
public function testGettersPopulated()
|
||||||
{
|
{
|
||||||
$flattenException = new FlattenException();
|
$flattenException = new FlattenException();
|
||||||
$stamp = new RedeliveryStamp(10, 'sender_alias', 'exception message', $flattenException);
|
$stamp = new RedeliveryStamp(10, 'exception message', $flattenException);
|
||||||
$this->assertSame('exception message', $stamp->getExceptionMessage());
|
$this->assertSame('exception message', $stamp->getExceptionMessage());
|
||||||
$this->assertSame($flattenException, $stamp->getFlattenException());
|
$this->assertSame($flattenException, $stamp->getFlattenException());
|
||||||
}
|
}
|
||||||
|
@ -95,7 +95,7 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
$envelope = $envelopes[0];
|
$envelope = $envelopes[0];
|
||||||
$newEnvelope = $envelope
|
$newEnvelope = $envelope
|
||||||
->with(new DelayStamp(2000))
|
->with(new DelayStamp(2000))
|
||||||
->with(new RedeliveryStamp(1, 'not_important'));
|
->with(new RedeliveryStamp(1));
|
||||||
$sender->send($newEnvelope);
|
$sender->send($newEnvelope);
|
||||||
$receiver->ack($envelope);
|
$receiver->ack($envelope);
|
||||||
|
|
||||||
|
@ -14,7 +14,6 @@ namespace Symfony\Component\Messenger\Tests\Transport\Sender;
|
|||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Psr\Container\ContainerInterface;
|
use Psr\Container\ContainerInterface;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Exception\UnknownSenderException;
|
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
|
||||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||||
@ -36,35 +35,6 @@ class SendersLocatorTest extends TestCase
|
|||||||
$this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage()))));
|
$this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage()))));
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testGetSenderByAlias()
|
|
||||||
{
|
|
||||||
$sender1 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
|
||||||
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
|
||||||
$sendersLocator = $this->createContainer([
|
|
||||||
'sender1' => $sender1,
|
|
||||||
'sender2' => $sender2,
|
|
||||||
]);
|
|
||||||
|
|
||||||
$locator = new SendersLocator([], $sendersLocator);
|
|
||||||
|
|
||||||
$this->assertSame($sender1, $locator->getSenderByAlias('sender1'));
|
|
||||||
$this->assertSame($sender2, $locator->getSenderByAlias('sender2'));
|
|
||||||
}
|
|
||||||
|
|
||||||
public function testGetSenderByAliasThrowsException()
|
|
||||||
{
|
|
||||||
$this->expectException(UnknownSenderException::class);
|
|
||||||
$this->expectExceptionMessage('Unknown sender alias');
|
|
||||||
|
|
||||||
$sender1 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
|
||||||
$sendersLocator = $this->createContainer([
|
|
||||||
'sender1' => $sender1,
|
|
||||||
]);
|
|
||||||
|
|
||||||
$locator = new SendersLocator([], $sendersLocator);
|
|
||||||
$locator->getSenderByAlias('sender2');
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @group legacy
|
* @group legacy
|
||||||
*/
|
*/
|
||||||
|
@ -17,13 +17,9 @@ use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
|||||||
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
|
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
|
||||||
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
||||||
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
|
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
|
||||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
|
||||||
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
|
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
|
||||||
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
|
||||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||||
@ -48,12 +44,12 @@ class WorkerTest extends TestCase
|
|||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
|
|
||||||
$bus->expects($this->at(0))->method('dispatch')->with(
|
$bus->expects($this->at(0))->method('dispatch')->with(
|
||||||
$envelope = new Envelope($apiMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
|
new Envelope($apiMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
|
||||||
)->willReturn($envelope);
|
)->willReturnArgument(0);
|
||||||
|
|
||||||
$bus->expects($this->at(1))->method('dispatch')->with(
|
$bus->expects($this->at(1))->method('dispatch')->with(
|
||||||
$envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
|
new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
|
||||||
)->willReturn($envelope);
|
)->willReturnArgument(0);
|
||||||
|
|
||||||
$worker = new Worker(['transport' => $receiver], $bus);
|
$worker = new Worker(['transport' => $receiver], $bus);
|
||||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
@ -66,95 +62,7 @@ class WorkerTest extends TestCase
|
|||||||
$this->assertSame(2, $receiver->getAcknowledgeCount());
|
$this->assertSame(2, $receiver->getAcknowledgeCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
|
public function testHandlingErrorCausesReject()
|
||||||
{
|
|
||||||
$envelope = new Envelope(new DummyMessage('API'));
|
|
||||||
$receiver = new DummyReceiver([[$envelope]]);
|
|
||||||
$envelope = $envelope->with(new ReceivedStamp('transport'), new ConsumedByWorkerStamp());
|
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
|
||||||
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);
|
|
||||||
|
|
||||||
$worker = new Worker(['transport' => $receiver], $bus, []);
|
|
||||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
|
||||||
// stop after the messages finish
|
|
||||||
if (null === $envelope) {
|
|
||||||
$worker->stop();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public function testDispatchCausesRetry()
|
|
||||||
{
|
|
||||||
$receiver = new DummyReceiver([
|
|
||||||
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'transport1')])],
|
|
||||||
]);
|
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
|
||||||
$bus->expects($this->at(0))->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
|
|
||||||
|
|
||||||
// 2nd call will be the retry
|
|
||||||
$bus->expects($this->at(1))->method('dispatch')->with($this->callback(function (Envelope $envelope) {
|
|
||||||
/** @var RedeliveryStamp|null $redeliveryStamp */
|
|
||||||
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
|
|
||||||
$this->assertNotNull($redeliveryStamp);
|
|
||||||
// retry count now at 1
|
|
||||||
$this->assertSame(1, $redeliveryStamp->getRetryCount());
|
|
||||||
$this->assertSame('transport1', $redeliveryStamp->getSenderClassOrAlias());
|
|
||||||
|
|
||||||
// received stamp is removed
|
|
||||||
$this->assertNull($envelope->last(ReceivedStamp::class));
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}))->willReturnArgument(0);
|
|
||||||
|
|
||||||
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
|
||||||
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true);
|
|
||||||
|
|
||||||
$worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
|
|
||||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
|
||||||
// stop after the messages finish
|
|
||||||
if (null === $envelope) {
|
|
||||||
$worker->stop();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// old message rejected
|
|
||||||
$this->assertSame(1, $receiver->getRejectCount());
|
|
||||||
}
|
|
||||||
|
|
||||||
public function testUnrecoverableMessageHandlingExceptionPreventsRetries()
|
|
||||||
{
|
|
||||||
$envelope1 = new Envelope(new DummyMessage('Unwrapped Exception'), [new SentStamp('Some\Sender', 'transport1')]);
|
|
||||||
$envelope2 = new Envelope(new DummyMessage('Wrapped Exception'), [new SentStamp('Some\Sender', 'transport1')]);
|
|
||||||
|
|
||||||
$receiver = new DummyReceiver([
|
|
||||||
[$envelope1],
|
|
||||||
[$envelope2],
|
|
||||||
]);
|
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
|
||||||
$bus->expects($this->at(0))->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException());
|
|
||||||
$bus->expects($this->at(1))->method('dispatch')->willThrowException(
|
|
||||||
new HandlerFailedException($envelope2, [new UnrecoverableMessageHandlingException()])
|
|
||||||
);
|
|
||||||
|
|
||||||
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
|
||||||
$retryStrategy->expects($this->never())->method('isRetryable')->willReturn(true);
|
|
||||||
|
|
||||||
$worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
|
|
||||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
|
||||||
// stop after the messages finish
|
|
||||||
if (null === $envelope) {
|
|
||||||
$worker->stop();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// message was rejected
|
|
||||||
$this->assertSame(2, $receiver->getRejectCount());
|
|
||||||
}
|
|
||||||
|
|
||||||
public function testDispatchCausesRejectWhenNoRetry()
|
|
||||||
{
|
{
|
||||||
$receiver = new DummyReceiver([
|
$receiver = new DummyReceiver([
|
||||||
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'transport1')])],
|
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'transport1')])],
|
||||||
@ -163,10 +71,7 @@ class WorkerTest extends TestCase
|
|||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
$bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
|
$bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
|
||||||
|
|
||||||
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
$worker = new Worker(['transport1' => $receiver], $bus);
|
||||||
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false);
|
|
||||||
|
|
||||||
$worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
|
|
||||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
// stop after the messages finish
|
// stop after the messages finish
|
||||||
if (null === $envelope) {
|
if (null === $envelope) {
|
||||||
@ -177,28 +82,6 @@ class WorkerTest extends TestCase
|
|||||||
$this->assertSame(0, $receiver->getAcknowledgeCount());
|
$this->assertSame(0, $receiver->getAcknowledgeCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testDispatchCausesRejectOnUnrecoverableMessage()
|
|
||||||
{
|
|
||||||
$receiver = new DummyReceiver([
|
|
||||||
[new Envelope(new DummyMessage('Hello'))],
|
|
||||||
]);
|
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
|
||||||
$bus->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException('Will never work'));
|
|
||||||
|
|
||||||
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
|
||||||
$retryStrategy->expects($this->never())->method('isRetryable');
|
|
||||||
|
|
||||||
$worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
|
|
||||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
|
||||||
// stop after the messages finish
|
|
||||||
if (null === $envelope) {
|
|
||||||
$worker->stop();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
$this->assertSame(1, $receiver->getRejectCount());
|
|
||||||
}
|
|
||||||
|
|
||||||
public function testWorkerDoesNotSendNullMessagesToTheBus()
|
public function testWorkerDoesNotSendNullMessagesToTheBus()
|
||||||
{
|
{
|
||||||
$receiver = new DummyReceiver([
|
$receiver = new DummyReceiver([
|
||||||
@ -235,7 +118,7 @@ class WorkerTest extends TestCase
|
|||||||
[$this->isInstanceOf(WorkerStoppedEvent::class)]
|
[$this->isInstanceOf(WorkerStoppedEvent::class)]
|
||||||
);
|
);
|
||||||
|
|
||||||
$worker = new Worker([$receiver], $bus, [], $eventDispatcher);
|
$worker = new Worker([$receiver], $bus, $eventDispatcher);
|
||||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
// stop after the messages finish
|
// stop after the messages finish
|
||||||
if (null === $envelope) {
|
if (null === $envelope) {
|
||||||
@ -263,7 +146,7 @@ class WorkerTest extends TestCase
|
|||||||
[$this->isInstanceOf(WorkerStoppedEvent::class)]
|
[$this->isInstanceOf(WorkerStoppedEvent::class)]
|
||||||
);
|
);
|
||||||
|
|
||||||
$worker = new Worker([$receiver], $bus, [], $eventDispatcher);
|
$worker = new Worker([$receiver], $bus, $eventDispatcher);
|
||||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
// stop after the messages finish
|
// stop after the messages finish
|
||||||
if (null === $envelope) {
|
if (null === $envelope) {
|
||||||
|
@ -15,7 +15,6 @@ use Psr\Container\ContainerInterface;
|
|||||||
use Symfony\Component\DependencyInjection\ServiceLocator;
|
use Symfony\Component\DependencyInjection\ServiceLocator;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Exception\RuntimeException;
|
use Symfony\Component\Messenger\Exception\RuntimeException;
|
||||||
use Symfony\Component\Messenger\Exception\UnknownSenderException;
|
|
||||||
use Symfony\Component\Messenger\Handler\HandlersLocator;
|
use Symfony\Component\Messenger\Handler\HandlersLocator;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -79,13 +78,4 @@ class SendersLocator implements SendersLocatorInterface
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public function getSenderByAlias(string $alias): SenderInterface
|
|
||||||
{
|
|
||||||
if ($this->sendersLocator->has($alias)) {
|
|
||||||
return $this->sendersLocator->get($alias);
|
|
||||||
}
|
|
||||||
|
|
||||||
throw new UnknownSenderException(sprintf('Unknown sender alias "%s".', $alias));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -12,7 +12,6 @@
|
|||||||
namespace Symfony\Component\Messenger\Transport\Sender;
|
namespace Symfony\Component\Messenger\Transport\Sender;
|
||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Exception\UnknownSenderException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Maps a message to a list of senders.
|
* Maps a message to a list of senders.
|
||||||
@ -28,13 +27,4 @@ interface SendersLocatorInterface
|
|||||||
* @return iterable|SenderInterface[] Indexed by sender alias if available
|
* @return iterable|SenderInterface[] Indexed by sender alias if available
|
||||||
*/
|
*/
|
||||||
public function getSenders(Envelope $envelope): iterable;
|
public function getSenders(Envelope $envelope): iterable;
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a specific sender by its alias.
|
|
||||||
*
|
|
||||||
* @param string $alias The alias given to the sender in getSenders()
|
|
||||||
*
|
|
||||||
* @throws UnknownSenderException If the sender is not found
|
|
||||||
*/
|
|
||||||
public function getSenderByAlias(string $alias): SenderInterface;
|
|
||||||
}
|
}
|
||||||
|
@ -19,12 +19,8 @@ use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
|||||||
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
|
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
|
||||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||||
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
|
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
|
||||||
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
|
|
||||||
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
|
||||||
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||||
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
||||||
|
|
||||||
@ -37,20 +33,17 @@ class Worker implements WorkerInterface
|
|||||||
{
|
{
|
||||||
private $receivers;
|
private $receivers;
|
||||||
private $bus;
|
private $bus;
|
||||||
private $retryStrategies;
|
|
||||||
private $eventDispatcher;
|
private $eventDispatcher;
|
||||||
private $logger;
|
private $logger;
|
||||||
private $shouldStop = false;
|
private $shouldStop = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param ReceiverInterface[] $receivers Where the key is the transport name
|
* @param ReceiverInterface[] $receivers Where the key is the transport name
|
||||||
* @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match)
|
|
||||||
*/
|
*/
|
||||||
public function __construct(array $receivers, MessageBusInterface $bus, array $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
|
public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
|
||||||
{
|
{
|
||||||
$this->receivers = $receivers;
|
$this->receivers = $receivers;
|
||||||
$this->bus = $bus;
|
$this->bus = $bus;
|
||||||
$this->retryStrategies = $retryStrategies;
|
|
||||||
$this->eventDispatcher = LegacyEventDispatcherProxy::decorate($eventDispatcher);
|
$this->eventDispatcher = LegacyEventDispatcherProxy::decorate($eventDispatcher);
|
||||||
$this->logger = $logger;
|
$this->logger = $logger;
|
||||||
}
|
}
|
||||||
@ -91,7 +84,7 @@ class Worker implements WorkerInterface
|
|||||||
foreach ($envelopes as $envelope) {
|
foreach ($envelopes as $envelope) {
|
||||||
$envelopeHandled = true;
|
$envelopeHandled = true;
|
||||||
|
|
||||||
$this->handleMessage($envelope, $receiver, $transportName, $this->retryStrategies[$transportName] ?? null);
|
$this->handleMessage($envelope, $receiver, $transportName);
|
||||||
$onHandled($envelope);
|
$onHandled($envelope);
|
||||||
|
|
||||||
if ($this->shouldStop) {
|
if ($this->shouldStop) {
|
||||||
@ -117,7 +110,7 @@ class Worker implements WorkerInterface
|
|||||||
$this->dispatchEvent(new WorkerStoppedEvent());
|
$this->dispatchEvent(new WorkerStoppedEvent());
|
||||||
}
|
}
|
||||||
|
|
||||||
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName, ?RetryStrategyInterface $retryStrategy): void
|
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName): void
|
||||||
{
|
{
|
||||||
$event = new WorkerMessageReceivedEvent($envelope, $transportName);
|
$event = new WorkerMessageReceivedEvent($envelope, $transportName);
|
||||||
$this->dispatchEvent($event);
|
$this->dispatchEvent($event);
|
||||||
@ -126,12 +119,6 @@ class Worker implements WorkerInterface
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$message = $envelope->getMessage();
|
|
||||||
$context = [
|
|
||||||
'message' => $message,
|
|
||||||
'class' => \get_class($message),
|
|
||||||
];
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp()));
|
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp()));
|
||||||
} catch (\Throwable $throwable) {
|
} catch (\Throwable $throwable) {
|
||||||
@ -146,30 +133,7 @@ class Worker implements WorkerInterface
|
|||||||
$envelope = $throwable->getEnvelope();
|
$envelope = $throwable->getEnvelope();
|
||||||
}
|
}
|
||||||
|
|
||||||
$shouldRetry = $retryStrategy && $this->shouldRetry($throwable, $envelope, $retryStrategy);
|
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable));
|
||||||
|
|
||||||
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable, $shouldRetry));
|
|
||||||
|
|
||||||
$retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope);
|
|
||||||
if ($shouldRetry) {
|
|
||||||
++$retryCount;
|
|
||||||
$delay = $retryStrategy->getWaitingTime($envelope);
|
|
||||||
if (null !== $this->logger) {
|
|
||||||
$this->logger->error('Error thrown while handling message {class}. Dispatching for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// add the delay and retry stamp info + remove ReceivedStamp
|
|
||||||
$retryEnvelope = $envelope->with(new DelayStamp($delay))
|
|
||||||
->with(new RedeliveryStamp($retryCount, $transportName))
|
|
||||||
->withoutAll(ReceivedStamp::class);
|
|
||||||
|
|
||||||
// re-send the message for retry
|
|
||||||
$this->bus->dispatch($retryEnvelope);
|
|
||||||
} else {
|
|
||||||
if (null !== $this->logger) {
|
|
||||||
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!$rejectFirst) {
|
if (!$rejectFirst) {
|
||||||
$receiver->reject($envelope);
|
$receiver->reject($envelope);
|
||||||
@ -181,6 +145,11 @@ class Worker implements WorkerInterface
|
|||||||
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName));
|
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName));
|
||||||
|
|
||||||
if (null !== $this->logger) {
|
if (null !== $this->logger) {
|
||||||
|
$message = $envelope->getMessage();
|
||||||
|
$context = [
|
||||||
|
'message' => $message,
|
||||||
|
'class' => \get_class($message),
|
||||||
|
];
|
||||||
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
|
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -200,27 +169,4 @@ class Worker implements WorkerInterface
|
|||||||
|
|
||||||
$this->eventDispatcher->dispatch($event);
|
$this->eventDispatcher->dispatch($event);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
|
|
||||||
{
|
|
||||||
// if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
|
|
||||||
if ($e instanceof HandlerFailedException) {
|
|
||||||
$shouldNotRetry = true;
|
|
||||||
foreach ($e->getNestedExceptions() as $nestedException) {
|
|
||||||
if (!$nestedException instanceof UnrecoverableExceptionInterface) {
|
|
||||||
$shouldNotRetry = false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if ($shouldNotRetry) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($e instanceof UnrecoverableExceptionInterface) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return $retryStrategy->isRetryable($envelope);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user