From d7e0f98cd0f2ecc267f4b6c1a4434f110596f1aa Mon Sep 17 00:00:00 2001 From: Tobias Schultze Date: Tue, 29 Oct 2019 15:55:01 +0100 Subject: [PATCH] [Messenger] extract worker logic to listener and sent messages for retry and failure directly to transport instead of redispatching on the bus --- UPGRADE-4.4.md | 5 + .../FrameworkExtension.php | 17 ++- .../Resources/config/console.xml | 8 +- .../Resources/config/messenger.xml | 13 +- .../Bundle/FrameworkBundle/composer.json | 4 +- src/Symfony/Component/Messenger/CHANGELOG.md | 6 + .../Command/AbstractFailedMessagesCommand.php | 2 +- .../Command/ConsumeMessagesCommand.php | 18 +-- .../Command/FailedMessagesRetryCommand.php | 6 +- .../Event/WorkerMessageFailedEvent.php | 10 +- .../SendFailedMessageForRetryListener.php | 126 +++++++++++++++++ ...ailedMessageToFailureTransportListener.php | 27 ++-- .../Exception/UnknownSenderException.php | 19 --- .../RejectRedeliveredMessageMiddleware.php | 10 +- .../Middleware/SendMessageMiddleware.php | 24 +--- .../Messenger/Stamp/RedeliveryStamp.php | 17 +-- .../Command/FailedMessagesShowCommandTest.php | 6 +- .../DependencyInjection/MessengerPassTest.php | 1 - .../SendFailedMessageForRetryListenerTest.php | 66 +++++++++ ...dMessageToFailureTransportListenerTest.php | 61 +++----- .../Tests/FailureIntegrationTest.php | 33 +++-- .../DummyMessageHandlerFailingFirstTimes.php | 39 ----- .../Middleware/SendMessageMiddlewareTest.php | 44 ------ .../Retry/MultiplierRetryStrategyTest.php | 8 +- .../Messenger/Tests/RetryIntegrationTest.php | 100 ------------- .../Tests/Stamp/RedeliveryStampTest.php | 5 +- .../AmqpExt/AmqpExtIntegrationTest.php | 2 +- .../Transport/Sender/SendersLocatorTest.php | 30 ---- .../Component/Messenger/Tests/WorkerTest.php | 133 ++---------------- .../Transport/Sender/SendersLocator.php | 10 -- .../Sender/SendersLocatorInterface.php | 10 -- src/Symfony/Component/Messenger/Worker.php | 74 ++-------- 32 files changed, 332 insertions(+), 602 deletions(-) create mode 100644 src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php delete mode 100644 src/Symfony/Component/Messenger/Exception/UnknownSenderException.php create mode 100644 src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php delete mode 100644 src/Symfony/Component/Messenger/Tests/Fixtures/DummyMessageHandlerFailingFirstTimes.php delete mode 100644 src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php diff --git a/UPGRADE-4.4.md b/UPGRADE-4.4.md index 6144bc150d..9a86328526 100644 --- a/UPGRADE-4.4.md +++ b/UPGRADE-4.4.md @@ -154,6 +154,11 @@ Messenger * Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor, pass a `RoutableMessageBus` instance instead. + * [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3. + * [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`. + * [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`. + * [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`. + * [BC BREAK] Removed `UnknownSenderException`. Mime ---- diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index e58b800864..0d395d59c9 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1810,20 +1810,29 @@ class FrameworkExtension extends Extension $messageToSendersMapping[$message] = $messageConfiguration['senders']; } + $sendersServiceLocator = ServiceLocatorTagPass::register($container, $senderReferences); + $container->getDefinition('messenger.senders_locator') ->replaceArgument(0, $messageToSendersMapping) - ->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences)) + ->replaceArgument(1, $sendersServiceLocator) + ; + + $container->getDefinition('messenger.retry.send_failed_message_for_retry_listener') + ->replaceArgument(0, $sendersServiceLocator) ; $container->getDefinition('messenger.retry_strategy_locator') ->replaceArgument(0, $transportRetryReferences); if ($config['failure_transport']) { + if (!isset($senderReferences[$config['failure_transport']])) { + throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport'])); + } + $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') - ->replaceArgument(1, $config['failure_transport']); + ->replaceArgument(0, $senderReferences[$config['failure_transport']]); $container->getDefinition('console.command.messenger_failed_messages_retry') - ->replaceArgument(0, $config['failure_transport']) - ->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null); + ->replaceArgument(0, $config['failure_transport']); $container->getDefinition('console.command.messenger_failed_messages_show') ->replaceArgument(0, $config['failure_transport']); $container->getDefinition('console.command.messenger_failed_messages_remove') diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml index 9a586e0a58..5da94a907a 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml @@ -86,7 +86,6 @@ - @@ -116,10 +115,9 @@ - + - @@ -127,14 +125,14 @@ - + - + diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index 2284dd7963..82014fe5fb 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -10,7 +10,7 @@ - + @@ -98,12 +98,19 @@ + + + + + + + + - - + diff --git a/src/Symfony/Bundle/FrameworkBundle/composer.json b/src/Symfony/Bundle/FrameworkBundle/composer.json index ebba20c08f..e3faf21102 100644 --- a/src/Symfony/Bundle/FrameworkBundle/composer.json +++ b/src/Symfony/Bundle/FrameworkBundle/composer.json @@ -45,7 +45,7 @@ "symfony/http-client": "^4.4|^5.0", "symfony/lock": "^4.4|^5.0", "symfony/mailer": "^4.4|^5.0", - "symfony/messenger": "^4.3.6|^5.0", + "symfony/messenger": "^4.4|^5.0", "symfony/mime": "^4.4|^5.0", "symfony/process": "^3.4|^4.0|^5.0", "symfony/security-csrf": "^3.4|^4.0|^5.0", @@ -77,7 +77,7 @@ "symfony/form": "<4.3", "symfony/lock": "<4.4", "symfony/mailer": "<4.4", - "symfony/messenger": "<4.3.6", + "symfony/messenger": "<4.4", "symfony/mime": "<4.4", "symfony/property-info": "<3.4", "symfony/security-bundle": "<4.4", diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 30e195f7ee..1e7fef642d 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -11,6 +11,12 @@ CHANGELOG * Made all dispatched worker event classes final. * Added support for `from_transport` attribute on `messenger.message_handler` tag. * Added support for passing `dbindex` as a query parameter to the redis transport DSN. + * [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3. + * [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`. + * [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`. + * [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`. + * [BC BREAK] Removed `UnknownSenderException`. + * The component is not marked as `@experimental` anymore. 4.3.0 ----- diff --git a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php index 2dde1af659..a95eb1fd41 100644 --- a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php @@ -89,7 +89,7 @@ abstract class AbstractFailedMessagesCommand extends Command $redeliveryStamps = $envelope->all(RedeliveryStamp::class); $io->writeln(' Message history:'); foreach ($redeliveryStamps as $redeliveryStamp) { - $io->writeln(sprintf(' * Message failed and redelivered to the %s transport at %s', $redeliveryStamp->getSenderClassOrAlias(), $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s'))); + $io->writeln(sprintf(' * Message failed at %s and was redelivered', $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s'))); } $io->newLine(); diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index 4e99703870..da2a51acd9 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -42,7 +42,6 @@ class ConsumeMessagesCommand extends Command private $receiverLocator; private $logger; private $receiverNames; - private $retryStrategyLocator; private $eventDispatcher; /** @var CacheItemPoolInterface|null */ private $restartSignalCachePool; @@ -50,7 +49,7 @@ class ConsumeMessagesCommand extends Command /** * @param RoutableMessageBus $routableBus */ - public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null) + public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* EventDispatcherInterface */ $eventDispatcher = null) { if ($routableBus instanceof ContainerInterface) { @trigger_error(sprintf('Passing a "%s" instance as first argument to "%s()" is deprecated since Symfony 4.4, pass a "%s" instance instead.', ContainerInterface::class, __METHOD__, RoutableMessageBus::class), E_USER_DEPRECATED); @@ -59,17 +58,16 @@ class ConsumeMessagesCommand extends Command throw new \TypeError(sprintf('The first argument must be an instance of "%s".', RoutableMessageBus::class)); } - if (\is_array($retryStrategyLocator)) { - @trigger_error(sprintf('The 5th argument of the class "%s" should be a retry-strategy locator, an array of bus names as a value is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED); + if (null !== $eventDispatcher && !$eventDispatcher instanceof EventDispatcherInterface) { + @trigger_error(sprintf('The 5th argument of the class "%s" should be a "%s"', __CLASS__, EventDispatcherInterface::class), E_USER_DEPRECATED); - $retryStrategyLocator = null; + $eventDispatcher = null; } $this->routableBus = $routableBus; $this->receiverLocator = $receiverLocator; $this->logger = $logger; $this->receiverNames = $receiverNames; - $this->retryStrategyLocator = $retryStrategyLocator; $this->eventDispatcher = $eventDispatcher; parent::__construct(); @@ -166,7 +164,6 @@ EOF } $receivers = []; - $retryStrategies = []; foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) { if (!$this->receiverLocator->has($receiverName)) { $message = sprintf('The receiver "%s" does not exist.', $receiverName); @@ -177,17 +174,12 @@ EOF throw new RuntimeException($message); } - if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) { - throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName)); - } - $receivers[$receiverName] = $this->receiverLocator->get($receiverName); - $retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null; } $bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus; - $worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger); + $worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger); $stopsWhen = []; if ($limit = $input->getOption('limit')) { $stopsWhen[] = "processed {$limit} messages"; diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php index c8981b59ad..4a0cc44191 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php @@ -24,7 +24,6 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\MessageBusInterface; -use Symfony\Component\Messenger\Retry\RetryStrategyInterface; use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver; @@ -39,14 +38,12 @@ class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand private $eventDispatcher; private $messageBus; - private $retryStrategy; private $logger; - public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, RetryStrategyInterface $retryStrategy = null, LoggerInterface $logger = null) + public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null) { $this->eventDispatcher = $eventDispatcher; $this->messageBus = $messageBus; - $this->retryStrategy = $retryStrategy; $this->logger = $logger; parent::__construct($receiverName, $receiver); @@ -180,7 +177,6 @@ EOF $worker = new Worker( [$this->getReceiverName() => $receiver], $this->messageBus, - [$this->getReceiverName() => $this->retryStrategy], $this->eventDispatcher, $this->logger ); diff --git a/src/Symfony/Component/Messenger/Event/WorkerMessageFailedEvent.php b/src/Symfony/Component/Messenger/Event/WorkerMessageFailedEvent.php index 5867979699..7fb858c87c 100644 --- a/src/Symfony/Component/Messenger/Event/WorkerMessageFailedEvent.php +++ b/src/Symfony/Component/Messenger/Event/WorkerMessageFailedEvent.php @@ -21,12 +21,11 @@ use Symfony\Component\Messenger\Envelope; final class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent { private $throwable; - private $willRetry; + private $willRetry = false; - public function __construct(Envelope $envelope, string $receiverName, \Throwable $error, bool $willRetry) + public function __construct(Envelope $envelope, string $receiverName, \Throwable $error) { $this->throwable = $error; - $this->willRetry = $willRetry; parent::__construct($envelope, $receiverName); } @@ -40,4 +39,9 @@ final class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent { return $this->willRetry; } + + public function setForRetry(): void + { + $this->willRetry = true; + } } diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php new file mode 100644 index 0000000000..5e654b51d4 --- /dev/null +++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php @@ -0,0 +1,126 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\EventListener; + +use Psr\Container\ContainerInterface; +use Psr\Log\LoggerInterface; +use Symfony\Component\EventDispatcher\EventSubscriberInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; +use Symfony\Component\Messenger\Exception\HandlerFailedException; +use Symfony\Component\Messenger\Exception\RuntimeException; +use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface; +use Symfony\Component\Messenger\Retry\RetryStrategyInterface; +use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use Symfony\Component\Messenger\Transport\Sender\SenderInterface; + +/** + * @author Tobias Schultze + */ +class SendFailedMessageForRetryListener implements EventSubscriberInterface +{ + private $sendersLocator; + private $retryStrategyLocator; + private $logger; + + public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null) + { + $this->sendersLocator = $sendersLocator; + $this->retryStrategyLocator = $retryStrategyLocator; + $this->logger = $logger; + } + + public function onMessageFailed(WorkerMessageFailedEvent $event) + { + $retryStrategy = $this->getRetryStrategyForTransport($event->getReceiverName()); + $envelope = $event->getEnvelope(); + $throwable = $event->getThrowable(); + + $message = $envelope->getMessage(); + $context = [ + 'message' => $message, + 'class' => \get_class($message), + ]; + + $shouldRetry = $retryStrategy && $this->shouldRetry($throwable, $envelope, $retryStrategy); + + $retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope); + if ($shouldRetry) { + $event->setForRetry(); + + ++$retryCount; + $delay = $retryStrategy->getWaitingTime($envelope); + if (null !== $this->logger) { + $this->logger->error('Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]); + } + + // add the delay and retry stamp info + $retryEnvelope = $envelope->with(new DelayStamp($delay), new RedeliveryStamp($retryCount)); + + // re-send the message for retry + $this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope); + } else { + if (null !== $this->logger) { + $this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]); + } + } + } + + public static function getSubscribedEvents() + { + return [ + // must have higher priority than SendFailedMessageToFailureTransportListener + WorkerMessageFailedEvent::class => ['onMessageFailed', 100], + ]; + } + + private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool + { + // if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry + if ($e instanceof HandlerFailedException) { + $shouldNotRetry = true; + foreach ($e->getNestedExceptions() as $nestedException) { + if (!$nestedException instanceof UnrecoverableExceptionInterface) { + $shouldNotRetry = false; + break; + } + } + if ($shouldNotRetry) { + return false; + } + } + + if ($e instanceof UnrecoverableExceptionInterface) { + return false; + } + + return $retryStrategy->isRetryable($envelope); + } + + private function getRetryStrategyForTransport(string $alias): ?RetryStrategyInterface + { + if ($this->retryStrategyLocator->has($alias)) { + return $this->retryStrategyLocator->get($alias); + } + + return null; + } + + private function getSenderForTransport(string $alias): SenderInterface + { + if ($this->sendersLocator->has($alias)) { + return $this->sendersLocator->get($alias); + } + + throw new RuntimeException(sprintf('Could not find sender "%s" based on the same receiver to send the failed message to for retry.', $alias)); + } +} diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php index 2e431e654a..0aa8fd5ea7 100644 --- a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php +++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php @@ -15,12 +15,10 @@ use Symfony\Component\ErrorRenderer\Exception\FlattenException; use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Exception\HandlerFailedException; -use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Stamp\DelayStamp; -use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; -use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; +use Symfony\Component\Messenger\Transport\Sender\SenderInterface; /** * Sends a rejected message to a "failure transport". @@ -29,14 +27,12 @@ use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; */ class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface { - private $messageBus; - private $failureSenderAlias; + private $failureSender; private $logger; - public function __construct(MessageBusInterface $messageBus, string $failureSenderAlias, LoggerInterface $logger = null) + public function __construct(SenderInterface $failureSender, LoggerInterface $logger = null) { - $this->messageBus = $messageBus; - $this->failureSenderAlias = $failureSenderAlias; + $this->failureSender = $failureSender; $this->logger = $logger; } @@ -53,27 +49,26 @@ class SendFailedMessageToFailureTransportListener implements EventSubscriberInte return; } - // remove the received stamp so it's redelivered $throwable = $event->getThrowable(); if ($throwable instanceof HandlerFailedException) { $throwable = $throwable->getNestedExceptions()[0]; } $flattenedException = class_exists(FlattenException::class) ? FlattenException::createFromThrowable($throwable) : null; - $envelope = $envelope->withoutAll(ReceivedStamp::class) - ->withoutAll(TransportMessageIdStamp::class) - ->with(new SentToFailureTransportStamp($event->getReceiverName())) - ->with(new DelayStamp(0)) - ->with(new RedeliveryStamp(0, $this->failureSenderAlias, $throwable->getMessage(), $flattenedException)); + $envelope = $envelope->with( + new SentToFailureTransportStamp($event->getReceiverName()), + new DelayStamp(0), + new RedeliveryStamp(0, $throwable->getMessage(), $flattenedException) + ); if (null !== $this->logger) { $this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [ 'class' => \get_class($envelope->getMessage()), - 'transport' => $this->failureSenderAlias, + 'transport' => \get_class($this->failureSender), ]); } - $this->messageBus->dispatch($envelope); + $this->failureSender->send($envelope); } public static function getSubscribedEvents() diff --git a/src/Symfony/Component/Messenger/Exception/UnknownSenderException.php b/src/Symfony/Component/Messenger/Exception/UnknownSenderException.php deleted file mode 100644 index 63555c8ee9..0000000000 --- a/src/Symfony/Component/Messenger/Exception/UnknownSenderException.php +++ /dev/null @@ -1,19 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Exception; - -/** - * @author Ryan Weaver - */ -class UnknownSenderException extends InvalidArgumentException -{ -} diff --git a/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php index 548197242d..d036790ddb 100644 --- a/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php +++ b/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php @@ -13,7 +13,6 @@ namespace Symfony\Component\Messenger\Middleware; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; -use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp; /** @@ -34,13 +33,10 @@ class RejectRedeliveredMessageMiddleware implements MiddlewareInterface { public function handle(Envelope $envelope, StackInterface $stack): Envelope { - // ignore the dispatched messages for retry - if (null !== $envelope->last(ReceivedStamp::class)) { - $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class); + $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class); - if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) { - throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.'); - } + if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) { + throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.'); } return $stack->next()->handle($envelope, $stack); diff --git a/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php index ca1adf8d78..6cfaeb2ef7 100644 --- a/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php +++ b/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php @@ -17,9 +17,7 @@ use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent; use Symfony\Component\Messenger\Stamp\ReceivedStamp; -use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentStamp; -use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; @@ -57,12 +55,8 @@ class SendMessageMiddleware implements MiddlewareInterface // it's a received message, do not send it back $this->logger->info('Received message {class}', $context); } else { - /** @var RedeliveryStamp|null $redeliveryStamp */ - $redeliveryStamp = $envelope->last(RedeliveryStamp::class); - - // dispatch event unless this is a redelivery - $shouldDispatchEvent = null === $redeliveryStamp; - foreach ($this->getSenders($envelope, $redeliveryStamp) as $alias => $sender) { + $shouldDispatchEvent = true; + foreach ($this->sendersLocator->getSenders($envelope) as $alias => $sender) { if (null !== $this->eventDispatcher && $shouldDispatchEvent) { $event = new SendMessageToTransportsEvent($envelope); $this->eventDispatcher->dispatch($event); @@ -82,18 +76,4 @@ class SendMessageMiddleware implements MiddlewareInterface // message should only be sent and not be handled by the next middleware return $envelope; } - - /** - * * @return iterable|SenderInterface[] - */ - private function getSenders(Envelope $envelope, ?RedeliveryStamp $redeliveryStamp): iterable - { - if (null !== $redeliveryStamp) { - return [ - $redeliveryStamp->getSenderClassOrAlias() => $this->sendersLocator->getSenderByAlias($redeliveryStamp->getSenderClassOrAlias()), - ]; - } - - return $this->sendersLocator->getSenders($envelope); - } } diff --git a/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php b/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php index 6a5042a105..49bbf2f778 100644 --- a/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php +++ b/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php @@ -20,18 +20,13 @@ use Symfony\Component\Messenger\Envelope; final class RedeliveryStamp implements StampInterface { private $retryCount; - private $senderClassOrAlias; private $redeliveredAt; private $exceptionMessage; private $flattenException; - /** - * @param string $senderClassOrAlias Alias from SendersLocator or just the class name - */ - public function __construct(int $retryCount, string $senderClassOrAlias, string $exceptionMessage = null, FlattenException $flattenException = null) + public function __construct(int $retryCount, string $exceptionMessage = null, FlattenException $flattenException = null) { $this->retryCount = $retryCount; - $this->senderClassOrAlias = $senderClassOrAlias; $this->exceptionMessage = $exceptionMessage; $this->flattenException = $flattenException; $this->redeliveredAt = new \DateTimeImmutable(); @@ -50,16 +45,6 @@ final class RedeliveryStamp implements StampInterface return $this->retryCount; } - /** - * The target sender this should be redelivered to. - * - * @internal - */ - public function getSenderClassOrAlias(): string - { - return $this->senderClassOrAlias; - } - public function getExceptionMessage(): ?string { return $this->exceptionMessage; diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php index f632d9890b..1a40e0c532 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php @@ -28,7 +28,7 @@ class FailedMessagesShowCommandTest extends TestCase public function testBasicRun() { $sentToFailureStamp = new SentToFailureTransportStamp('async'); - $redeliveryStamp = new RedeliveryStamp(0, 'failure_receiver', 'Things are bad!'); + $redeliveryStamp = new RedeliveryStamp(0, 'Things are bad!'); $envelope = new Envelope(new \stdClass(), [ new TransportMessageIdStamp(15), $sentToFailureStamp, @@ -62,8 +62,8 @@ EOF public function testMultipleRedeliveryFails() { $sentToFailureStamp = new SentToFailureTransportStamp('async'); - $redeliveryStamp1 = new RedeliveryStamp(0, 'failure_receiver', 'Things are bad!'); - $redeliveryStamp2 = new RedeliveryStamp(0, 'failure_receiver'); + $redeliveryStamp1 = new RedeliveryStamp(0, 'Things are bad!'); + $redeliveryStamp2 = new RedeliveryStamp(0); $envelope = new Envelope(new \stdClass(), [ new TransportMessageIdStamp(15), $sentToFailureStamp, diff --git a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php index 1c42c39f82..dd9b70b1f1 100644 --- a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php +++ b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php @@ -281,7 +281,6 @@ class MessengerPassTest extends TestCase $container->register('console.command.messenger_consume_messages', ConsumeMessagesCommand::class)->setArguments([ null, new Reference('messenger.receiver_locator'), - new Reference('messenger.retry_strategy_locator'), null, null, null, diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php new file mode 100644 index 0000000000..2157f5df0d --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php @@ -0,0 +1,66 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\EventListener; + +use PHPUnit\Framework\TestCase; +use Psr\Container\ContainerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; +use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener; +use Symfony\Component\Messenger\Retry\RetryStrategyInterface; +use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use Symfony\Component\Messenger\Transport\Sender\SenderInterface; + +class SendFailedMessageForRetryListenerTest extends TestCase +{ + public function testNoRetryStrategyCausesNoRetry() + { + $senderLocator = $this->createMock(ContainerInterface::class); + $senderLocator->expects($this->never())->method('has'); + $senderLocator->expects($this->never())->method('get'); + $retryStrategyLocator = $this->createMock(ContainerInterface::class); + $retryStrategyLocator->expects($this->once())->method('has')->willReturn(false); + + $listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator); + + $exception = new \Exception('no!'); + $envelope = new Envelope(new \stdClass()); + $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); + + $listener->onMessageFailed($event); + } + + public function testEnvelopeIsSentToTransportOnRetry() + { + $exception = new \Exception('no!'); + $envelope = new Envelope(new \stdClass()); + + $sender = $this->createMock(SenderInterface::class); + $sender->expects($this->once())->method('send')->with($envelope->with(new DelayStamp(1000), new RedeliveryStamp(1)))->willReturnArgument(0); + $senderLocator = $this->createMock(ContainerInterface::class); + $senderLocator->expects($this->once())->method('has')->willReturn(true); + $senderLocator->expects($this->never())->method('get')->willReturn($sender); + $retryStategy = $this->createMock(RetryStrategyInterface::class); + $retryStategy->expects($this->once())->method('isRetryable')->willReturn(true); + $retryStategy->expects($this->once())->method('getWaitingTime')->willReturn(1000); + $retryStrategyLocator = $this->createMock(ContainerInterface::class); + $retryStrategyLocator->expects($this->once())->method('has')->willReturn(true); + $retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy); + + $listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator); + + $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); + + $listener->onMessageFailed($event); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php index 628a4be3ee..1f648b83e1 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php @@ -9,25 +9,23 @@ * file that was distributed with this source code. */ -namespace Symfony\Component\Messenger\Tests\Handler; +namespace Symfony\Component\Messenger\Tests\EventListener; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; use Symfony\Component\Messenger\Exception\HandlerFailedException; -use Symfony\Component\Messenger\MessageBusInterface; -use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; -use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; +use Symfony\Component\Messenger\Transport\Sender\SenderInterface; class SendFailedMessageToFailureTransportListenerTest extends TestCase { - public function testItDispatchesToTheFailureTransport() + public function testItSendsToTheFailureTransport() { - $bus = $this->createMock(MessageBusInterface::class); - $bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) { + $sender = $this->createMock(SenderInterface::class); + $sender->expects($this->once())->method('send')->with($this->callback(function ($envelope) { /* @var Envelope $envelope */ $this->assertInstanceOf(Envelope::class, $envelope); @@ -38,31 +36,24 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase /** @var RedeliveryStamp $redeliveryStamp */ $redeliveryStamp = $envelope->last(RedeliveryStamp::class); - $this->assertSame('failure_sender', $redeliveryStamp->getSenderClassOrAlias()); $this->assertSame('no!', $redeliveryStamp->getExceptionMessage()); $this->assertSame('no!', $redeliveryStamp->getFlattenException()->getMessage()); - $this->assertNull($envelope->last(ReceivedStamp::class)); - $this->assertNull($envelope->last(TransportMessageIdStamp::class)); - return true; - }))->willReturn(new Envelope(new \stdClass())); - $listener = new SendFailedMessageToFailureTransportListener( - $bus, - 'failure_sender' - ); + }))->willReturnArgument(0); + $listener = new SendFailedMessageToFailureTransportListener($sender); $exception = new \Exception('no!'); $envelope = new Envelope(new \stdClass()); - $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false); + $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); $listener->onMessageFailed($event); } public function testItGetsNestedHandlerFailedException() { - $bus = $this->createMock(MessageBusInterface::class); - $bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) { + $sender = $this->createMock(SenderInterface::class); + $sender->expects($this->once())->method('send')->with($this->callback(function ($envelope) { /** @var Envelope $envelope */ /** @var RedeliveryStamp $redeliveryStamp */ $redeliveryStamp = $envelope->last(RedeliveryStamp::class); @@ -71,49 +62,41 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase $this->assertSame('Exception', $redeliveryStamp->getFlattenException()->getClass()); return true; - }))->willReturn(new Envelope(new \stdClass())); + }))->willReturnArgument(0); - $listener = new SendFailedMessageToFailureTransportListener( - $bus, - 'failure_sender' - ); + $listener = new SendFailedMessageToFailureTransportListener($sender); $envelope = new Envelope(new \stdClass()); $exception = new \Exception('I am inside!'); $exception = new HandlerFailedException($envelope, [$exception]); - $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false); + $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); $listener->onMessageFailed($event); } public function testDoNothingOnRetry() { - $bus = $this->createMock(MessageBusInterface::class); - $bus->expects($this->never())->method('dispatch'); - $listener = new SendFailedMessageToFailureTransportListener( - $bus, - 'failure_sender' - ); + $sender = $this->createMock(SenderInterface::class); + $sender->expects($this->never())->method('send'); + $listener = new SendFailedMessageToFailureTransportListener($sender); $envelope = new Envelope(new \stdClass()); - $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), true); + $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception()); + $event->setForRetry(); $listener->onMessageFailed($event); } public function testDoNotRedeliverToFailed() { - $bus = $this->createMock(MessageBusInterface::class); - $bus->expects($this->never())->method('dispatch'); - $listener = new SendFailedMessageToFailureTransportListener( - $bus, - 'failure_sender' - ); + $sender = $this->createMock(SenderInterface::class); + $sender->expects($this->never())->method('send'); + $listener = new SendFailedMessageToFailureTransportListener($sender); $envelope = new Envelope(new \stdClass(), [ new SentToFailureTransportStamp('my_receiver'), ]); - $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), false); + $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception()); $listener->onMessageFailed($event); } diff --git a/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php index abe78be759..cb3aa4f6c1 100644 --- a/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php @@ -13,10 +13,10 @@ namespace Symfony\Component\Messenger\Tests; use PHPUnit\Framework\TestCase; use Psr\Container\ContainerInterface; -use Symfony\Component\DependencyInjection\Container; use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; +use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Handler\HandlerDescriptor; @@ -61,11 +61,18 @@ class FailureIntegrationTest extends TestCase $locator ); + $retryStrategyLocator = $this->createMock(ContainerInterface::class); + $retryStrategyLocator->expects($this->any()) + ->method('has') + ->willReturn(true); + $retryStrategyLocator->expects($this->any()) + ->method('get') + ->willReturn(new MultiplierRetryStrategy(1)); + // using to so we can lazily get the bus later and avoid circular problem $transport1HandlerThatFails = new DummyTestHandler(true); $allTransportHandlerThatWorks = new DummyTestHandler(false); $transport2HandlerThatWorks = new DummyTestHandler(false); - $container = new Container(); $handlerLocator = new HandlersLocator([ DummyMessage::class => [ new HandlerDescriptor($transport1HandlerThatFails, [ @@ -88,17 +95,17 @@ class FailureIntegrationTest extends TestCase new SendMessageMiddleware($senderLocator), new HandleMessageMiddleware($handlerLocator), ]); - $container->set('bus', $bus); - $dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($bus, 'the_failure_transport')); + $dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator, $retryStrategyLocator)); + $dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($failureTransport)); - $runWorker = function (string $transportName, int $maxRetries) use ($transports, $bus, $dispatcher): ?\Throwable { + $runWorker = function (string $transportName) use ($transports, $bus, $dispatcher): ?\Throwable { $throwable = null; $failedListener = function (WorkerMessageFailedEvent $event) use (&$throwable) { $throwable = $event->getThrowable(); }; $dispatcher->addListener(WorkerMessageFailedEvent::class, $failedListener); - $worker = new Worker([$transportName => $transports[$transportName]], $bus, [$transportName => new MultiplierRetryStrategy($maxRetries)], $dispatcher); + $worker = new Worker([$transportName => $transports[$transportName]], $bus, $dispatcher); $worker->run([], function (?Envelope $envelope) use ($worker) { // handle one envelope, then stop @@ -126,7 +133,7 @@ class FailureIntegrationTest extends TestCase /* * Receive the message from "transport1" */ - $throwable = $runWorker('transport1', 1); + $throwable = $runWorker('transport1'); // make sure this is failing for the reason we think $this->assertInstanceOf(HandlerFailedException::class, $throwable); // handler for transport1 and all transports were called @@ -140,7 +147,7 @@ class FailureIntegrationTest extends TestCase /* * Receive the message for a (final) retry */ - $runWorker('transport1', 1); + $runWorker('transport1'); // only the "failed" handler is called a 2nd time $this->assertSame(2, $transport1HandlerThatFails->getTimesCalled()); $this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled()); @@ -160,7 +167,7 @@ class FailureIntegrationTest extends TestCase /* * Failed message is handled, fails, and sent for a retry */ - $throwable = $runWorker('the_failure_transport', 1); + $throwable = $runWorker('the_failure_transport'); // make sure this is failing for the reason we think $this->assertInstanceOf(HandlerFailedException::class, $throwable); // only the "failed" handler is called a 3rd time @@ -175,7 +182,7 @@ class FailureIntegrationTest extends TestCase /* * Message is retried on failure transport then discarded */ - $runWorker('the_failure_transport', 1); + $runWorker('the_failure_transport'); // only the "failed" handler is called a 4th time $this->assertSame(4, $transport1HandlerThatFails->getTimesCalled()); $this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled()); @@ -185,7 +192,7 @@ class FailureIntegrationTest extends TestCase /* * Execute handlers on transport2 */ - $runWorker('transport2', 1); + $runWorker('transport2'); // transport1 handler is not called again $this->assertSame(4, $transport1HandlerThatFails->getTimesCalled()); // all transport handler is now called again @@ -202,10 +209,10 @@ class FailureIntegrationTest extends TestCase */ $bus->dispatch($envelope); // handle the message, but with no retries - $runWorker('transport1', 0); + $runWorker('transport1'); // now make the handler work! $transport1HandlerThatFails->setShouldThrow(false); - $runWorker('the_failure_transport', 1); + $runWorker('the_failure_transport'); // the failure transport is empty because it worked $this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived()); } diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/DummyMessageHandlerFailingFirstTimes.php b/src/Symfony/Component/Messenger/Tests/Fixtures/DummyMessageHandlerFailingFirstTimes.php deleted file mode 100644 index 2e97445384..0000000000 --- a/src/Symfony/Component/Messenger/Tests/Fixtures/DummyMessageHandlerFailingFirstTimes.php +++ /dev/null @@ -1,39 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Tests\Fixtures; - -class DummyMessageHandlerFailingFirstTimes -{ - private $remainingFailures; - - private $called = 0; - - public function __construct(int $throwExceptionOnFirstTries = 0) - { - $this->remainingFailures = $throwExceptionOnFirstTries; - } - - public function __invoke(DummyMessage $message) - { - if ($this->remainingFailures > 0) { - --$this->remainingFailures; - throw new \Exception('Handler should throw Exception.'); - } - - ++$this->called; - } - - public function getTimesCalledWithoutThrowing(): int - { - return $this->called; - } -} diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php index 82c3b0ac05..b14d8a60a7 100644 --- a/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php +++ b/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php @@ -16,7 +16,6 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent; use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; use Symfony\Component\Messenger\Stamp\ReceivedStamp; -use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase; use Symfony\Component\Messenger\Tests\Fixtures\ChildDummyMessage; @@ -84,31 +83,6 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase $this->assertCount(2, $sentStamps); } - public function testItSendsToOnlyOneSenderOnRedelivery() - { - $envelope = new Envelope(new DummyMessage('Hey'), [new RedeliveryStamp(5, 'bar')]); - $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); - $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock(); - - $sendersLocator = $this->createSendersLocator( - [DummyMessage::class => ['foo', 'bar']], - ['foo' => $sender, 'bar' => $sender2] - ); - - $middleware = new SendMessageMiddleware($sendersLocator); - - $sender->expects($this->never()) - ->method('send') - ; - $sender2->expects($this->once()) - ->method('send') - ->willReturnArgument(0); - - $mockStack = $this->getStackMock(false); // false because next should not be called - $envelope = $middleware->handle($envelope, $mockStack); - $this->assertCount(1, $envelope->all(SentStamp::class)); - } - public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage() { $envelope = new Envelope(new ChildDummyMessage('Hey')); @@ -223,24 +197,6 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase $middleware->handle($envelope, $this->getStackMock()); } - public function testItDoesNotDispatchOnRedeliver() - { - $envelope = new Envelope(new DummyMessage('original envelope')); - $envelope = $envelope->with(new RedeliveryStamp(3, 'foo_sender')); - - $dispatcher = $this->createMock(EventDispatcherInterface::class); - $dispatcher->expects($this->never())->method('dispatch'); - - $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); - $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock(); - $sender2->expects($this->once())->method('send')->willReturn(new Envelope(new \stdClass())); - - $sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo']], ['foo' => $sender, 'foo_sender' => $sender2]); - $middleware = new SendMessageMiddleware($sendersLocator, $dispatcher); - - $middleware->handle($envelope, $this->getStackMock(false)); - } - private function createSendersLocator(array $sendersMap, array $senders): SendersLocator { $container = $this->createMock(ContainerInterface::class); diff --git a/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php b/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php index e9c6c0bc75..e1572bbbae 100644 --- a/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php +++ b/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php @@ -21,7 +21,7 @@ class MultiplierRetryStrategyTest extends TestCase public function testIsRetryable() { $strategy = new MultiplierRetryStrategy(3); - $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]); + $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0)]); $this->assertTrue($strategy->isRetryable($envelope)); } @@ -29,7 +29,7 @@ class MultiplierRetryStrategyTest extends TestCase public function testIsNotRetryable() { $strategy = new MultiplierRetryStrategy(3); - $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(3, 'sender_alias')]); + $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(3)]); $this->assertFalse($strategy->isRetryable($envelope)); } @@ -37,7 +37,7 @@ class MultiplierRetryStrategyTest extends TestCase public function testIsNotRetryableWithZeroMax() { $strategy = new MultiplierRetryStrategy(0); - $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]); + $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0)]); $this->assertFalse($strategy->isRetryable($envelope)); } @@ -55,7 +55,7 @@ class MultiplierRetryStrategyTest extends TestCase public function testGetWaitTime(int $delay, int $multiplier, int $maxDelay, int $previousRetries, int $expectedDelay) { $strategy = new MultiplierRetryStrategy(10, $delay, $multiplier, $maxDelay); - $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp($previousRetries, 'sender_alias')]); + $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp($previousRetries)]); $this->assertSame($expectedDelay, $strategy->getWaitingTime($envelope)); } diff --git a/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php deleted file mode 100644 index 2447a0c82b..0000000000 --- a/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php +++ /dev/null @@ -1,100 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Tests; - -use PHPUnit\Framework\TestCase; -use Psr\Container\ContainerInterface; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Handler\HandlerDescriptor; -use Symfony\Component\Messenger\Handler\HandlersLocator; -use Symfony\Component\Messenger\MessageBus; -use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; -use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; -use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy; -use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; -use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageHandlerFailingFirstTimes; -use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; -use Symfony\Component\Messenger\Transport\Sender\SenderInterface; -use Symfony\Component\Messenger\Transport\Sender\SendersLocator; -use Symfony\Component\Messenger\Worker; - -class RetryIntegrationTest extends TestCase -{ - public function testRetryMechanism() - { - $senderAndReceiver = new DummySenderAndReceiver(); - - $senderLocator = $this->createMock(ContainerInterface::class); - $senderLocator->method('has')->with('transportName')->willReturn(true); - $senderLocator->method('get')->with('transportName')->willReturn($senderAndReceiver); - $senderLocator = new SendersLocator([DummyMessage::class => ['transportName']], $senderLocator); - - $handler = new DummyMessageHandlerFailingFirstTimes(0); - $throwingHandler = new DummyMessageHandlerFailingFirstTimes(1); - $handlerLocator = new HandlersLocator([ - DummyMessage::class => [ - new HandlerDescriptor($handler, ['alias' => 'first']), - new HandlerDescriptor($throwingHandler, ['alias' => 'throwing']), - ], - ]); - - // dispatch the message, which will get "sent" and then received by DummySenderAndReceiver - $bus = new MessageBus([new SendMessageMiddleware($senderLocator), new HandleMessageMiddleware($handlerLocator)]); - $envelope = new Envelope(new DummyMessage('API')); - $bus->dispatch($envelope); - - $worker = new Worker(['transportName' => $senderAndReceiver], $bus, ['transportName' => new MultiplierRetryStrategy()]); - $worker->run([], function (?Envelope $envelope) use ($worker) { - if (null === $envelope) { - $worker->stop(); - } - }); - - $this->assertSame(1, $handler->getTimesCalledWithoutThrowing()); - $this->assertSame(1, $throwingHandler->getTimesCalledWithoutThrowing()); - } -} - -class DummySenderAndReceiver implements ReceiverInterface, SenderInterface -{ - private $messagesWaiting = []; - - private $messagesReceived = []; - - public function get(): iterable - { - $message = array_shift($this->messagesWaiting); - - if (null === $message) { - return []; - } - - $this->messagesReceived[] = $message; - - return [$message]; - } - - public function ack(Envelope $envelope): void - { - } - - public function reject(Envelope $envelope): void - { - } - - public function send(Envelope $envelope): Envelope - { - $this->messagesWaiting[] = $envelope; - - return $envelope; - } -} diff --git a/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php b/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php index 2b724a5f69..c55a670bf3 100644 --- a/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php +++ b/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php @@ -19,9 +19,8 @@ class RedeliveryStampTest extends TestCase { public function testGetters() { - $stamp = new RedeliveryStamp(10, 'sender_alias'); + $stamp = new RedeliveryStamp(10); $this->assertSame(10, $stamp->getRetryCount()); - $this->assertSame('sender_alias', $stamp->getSenderClassOrAlias()); $this->assertInstanceOf(\DateTimeInterface::class, $stamp->getRedeliveredAt()); $this->assertNull($stamp->getExceptionMessage()); $this->assertNull($stamp->getFlattenException()); @@ -30,7 +29,7 @@ class RedeliveryStampTest extends TestCase public function testGettersPopulated() { $flattenException = new FlattenException(); - $stamp = new RedeliveryStamp(10, 'sender_alias', 'exception message', $flattenException); + $stamp = new RedeliveryStamp(10, 'exception message', $flattenException); $this->assertSame('exception message', $stamp->getExceptionMessage()); $this->assertSame($flattenException, $stamp->getFlattenException()); } diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php index 9fddc0646c..6d1c1598f2 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php @@ -95,7 +95,7 @@ class AmqpExtIntegrationTest extends TestCase $envelope = $envelopes[0]; $newEnvelope = $envelope ->with(new DelayStamp(2000)) - ->with(new RedeliveryStamp(1, 'not_important')); + ->with(new RedeliveryStamp(1)); $sender->send($newEnvelope); $receiver->ack($envelope); diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php index d284861c3b..b5c87e58da 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php @@ -14,7 +14,6 @@ namespace Symfony\Component\Messenger\Tests\Transport\Sender; use PHPUnit\Framework\TestCase; use Psr\Container\ContainerInterface; use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Exception\UnknownSenderException; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; @@ -36,35 +35,6 @@ class SendersLocatorTest extends TestCase $this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage())))); } - public function testGetSenderByAlias() - { - $sender1 = $this->getMockBuilder(SenderInterface::class)->getMock(); - $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock(); - $sendersLocator = $this->createContainer([ - 'sender1' => $sender1, - 'sender2' => $sender2, - ]); - - $locator = new SendersLocator([], $sendersLocator); - - $this->assertSame($sender1, $locator->getSenderByAlias('sender1')); - $this->assertSame($sender2, $locator->getSenderByAlias('sender2')); - } - - public function testGetSenderByAliasThrowsException() - { - $this->expectException(UnknownSenderException::class); - $this->expectExceptionMessage('Unknown sender alias'); - - $sender1 = $this->getMockBuilder(SenderInterface::class)->getMock(); - $sendersLocator = $this->createContainer([ - 'sender1' => $sender1, - ]); - - $locator = new SendersLocator([], $sendersLocator); - $locator->getSenderByAlias('sender2'); - } - /** * @group legacy */ diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index c4d6c782af..6eba4efcae 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -17,13 +17,9 @@ use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent; -use Symfony\Component\Messenger\Exception\HandlerFailedException; -use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; use Symfony\Component\Messenger\MessageBusInterface; -use Symfony\Component\Messenger\Retry\RetryStrategyInterface; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; -use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; @@ -48,12 +44,12 @@ class WorkerTest extends TestCase $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->expects($this->at(0))->method('dispatch')->with( - $envelope = new Envelope($apiMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()]) - )->willReturn($envelope); + new Envelope($apiMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()]) + )->willReturnArgument(0); $bus->expects($this->at(1))->method('dispatch')->with( - $envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()]) - )->willReturn($envelope); + new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()]) + )->willReturnArgument(0); $worker = new Worker(['transport' => $receiver], $bus); $worker->run([], function (?Envelope $envelope) use ($worker) { @@ -66,95 +62,7 @@ class WorkerTest extends TestCase $this->assertSame(2, $receiver->getAcknowledgeCount()); } - public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage() - { - $envelope = new Envelope(new DummyMessage('API')); - $receiver = new DummyReceiver([[$envelope]]); - $envelope = $envelope->with(new ReceivedStamp('transport'), new ConsumedByWorkerStamp()); - - $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); - $bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope); - - $worker = new Worker(['transport' => $receiver], $bus, []); - $worker->run([], function (?Envelope $envelope) use ($worker) { - // stop after the messages finish - if (null === $envelope) { - $worker->stop(); - } - }); - } - - public function testDispatchCausesRetry() - { - $receiver = new DummyReceiver([ - [new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'transport1')])], - ]); - - $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); - $bus->expects($this->at(0))->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not')); - - // 2nd call will be the retry - $bus->expects($this->at(1))->method('dispatch')->with($this->callback(function (Envelope $envelope) { - /** @var RedeliveryStamp|null $redeliveryStamp */ - $redeliveryStamp = $envelope->last(RedeliveryStamp::class); - $this->assertNotNull($redeliveryStamp); - // retry count now at 1 - $this->assertSame(1, $redeliveryStamp->getRetryCount()); - $this->assertSame('transport1', $redeliveryStamp->getSenderClassOrAlias()); - - // received stamp is removed - $this->assertNull($envelope->last(ReceivedStamp::class)); - - return true; - }))->willReturnArgument(0); - - $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); - $retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true); - - $worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]); - $worker->run([], function (?Envelope $envelope) use ($worker) { - // stop after the messages finish - if (null === $envelope) { - $worker->stop(); - } - }); - - // old message rejected - $this->assertSame(1, $receiver->getRejectCount()); - } - - public function testUnrecoverableMessageHandlingExceptionPreventsRetries() - { - $envelope1 = new Envelope(new DummyMessage('Unwrapped Exception'), [new SentStamp('Some\Sender', 'transport1')]); - $envelope2 = new Envelope(new DummyMessage('Wrapped Exception'), [new SentStamp('Some\Sender', 'transport1')]); - - $receiver = new DummyReceiver([ - [$envelope1], - [$envelope2], - ]); - - $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); - $bus->expects($this->at(0))->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException()); - $bus->expects($this->at(1))->method('dispatch')->willThrowException( - new HandlerFailedException($envelope2, [new UnrecoverableMessageHandlingException()]) - ); - - $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); - $retryStrategy->expects($this->never())->method('isRetryable')->willReturn(true); - - $worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]); - $worker->run([], function (?Envelope $envelope) use ($worker) { - // stop after the messages finish - if (null === $envelope) { - $worker->stop(); - } - }); - - // message was rejected - $this->assertSame(2, $receiver->getRejectCount()); - } - - public function testDispatchCausesRejectWhenNoRetry() + public function testHandlingErrorCausesReject() { $receiver = new DummyReceiver([ [new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'transport1')])], @@ -163,10 +71,7 @@ class WorkerTest extends TestCase $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not')); - $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); - $retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false); - - $worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]); + $worker = new Worker(['transport1' => $receiver], $bus); $worker->run([], function (?Envelope $envelope) use ($worker) { // stop after the messages finish if (null === $envelope) { @@ -177,28 +82,6 @@ class WorkerTest extends TestCase $this->assertSame(0, $receiver->getAcknowledgeCount()); } - public function testDispatchCausesRejectOnUnrecoverableMessage() - { - $receiver = new DummyReceiver([ - [new Envelope(new DummyMessage('Hello'))], - ]); - - $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); - $bus->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException('Will never work')); - - $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); - $retryStrategy->expects($this->never())->method('isRetryable'); - - $worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]); - $worker->run([], function (?Envelope $envelope) use ($worker) { - // stop after the messages finish - if (null === $envelope) { - $worker->stop(); - } - }); - $this->assertSame(1, $receiver->getRejectCount()); - } - public function testWorkerDoesNotSendNullMessagesToTheBus() { $receiver = new DummyReceiver([ @@ -235,7 +118,7 @@ class WorkerTest extends TestCase [$this->isInstanceOf(WorkerStoppedEvent::class)] ); - $worker = new Worker([$receiver], $bus, [], $eventDispatcher); + $worker = new Worker([$receiver], $bus, $eventDispatcher); $worker->run([], function (?Envelope $envelope) use ($worker) { // stop after the messages finish if (null === $envelope) { @@ -263,7 +146,7 @@ class WorkerTest extends TestCase [$this->isInstanceOf(WorkerStoppedEvent::class)] ); - $worker = new Worker([$receiver], $bus, [], $eventDispatcher); + $worker = new Worker([$receiver], $bus, $eventDispatcher); $worker->run([], function (?Envelope $envelope) use ($worker) { // stop after the messages finish if (null === $envelope) { diff --git a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php index 41fc2f3ade..7f64851b9d 100644 --- a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php +++ b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php @@ -15,7 +15,6 @@ use Psr\Container\ContainerInterface; use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\RuntimeException; -use Symfony\Component\Messenger\Exception\UnknownSenderException; use Symfony\Component\Messenger\Handler\HandlersLocator; /** @@ -79,13 +78,4 @@ class SendersLocator implements SendersLocatorInterface } } } - - public function getSenderByAlias(string $alias): SenderInterface - { - if ($this->sendersLocator->has($alias)) { - return $this->sendersLocator->get($alias); - } - - throw new UnknownSenderException(sprintf('Unknown sender alias "%s".', $alias)); - } } diff --git a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php index e74f82f986..916f780d74 100644 --- a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php +++ b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php @@ -12,7 +12,6 @@ namespace Symfony\Component\Messenger\Transport\Sender; use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Exception\UnknownSenderException; /** * Maps a message to a list of senders. @@ -28,13 +27,4 @@ interface SendersLocatorInterface * @return iterable|SenderInterface[] Indexed by sender alias if available */ public function getSenders(Envelope $envelope): iterable; - - /** - * Returns a specific sender by its alias. - * - * @param string $alias The alias given to the sender in getSenders() - * - * @throws UnknownSenderException If the sender is not found - */ - public function getSenderByAlias(string $alias): SenderInterface; } diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 1765fbec2a..6fcce7e368 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -19,12 +19,8 @@ use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent; use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; -use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface; -use Symfony\Component\Messenger\Retry\RetryStrategyInterface; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; -use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; -use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; @@ -37,20 +33,17 @@ class Worker implements WorkerInterface { private $receivers; private $bus; - private $retryStrategies; private $eventDispatcher; private $logger; private $shouldStop = false; /** - * @param ReceiverInterface[] $receivers Where the key is the transport name - * @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match) + * @param ReceiverInterface[] $receivers Where the key is the transport name */ - public function __construct(array $receivers, MessageBusInterface $bus, array $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null) + public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null) { $this->receivers = $receivers; $this->bus = $bus; - $this->retryStrategies = $retryStrategies; $this->eventDispatcher = LegacyEventDispatcherProxy::decorate($eventDispatcher); $this->logger = $logger; } @@ -91,7 +84,7 @@ class Worker implements WorkerInterface foreach ($envelopes as $envelope) { $envelopeHandled = true; - $this->handleMessage($envelope, $receiver, $transportName, $this->retryStrategies[$transportName] ?? null); + $this->handleMessage($envelope, $receiver, $transportName); $onHandled($envelope); if ($this->shouldStop) { @@ -117,7 +110,7 @@ class Worker implements WorkerInterface $this->dispatchEvent(new WorkerStoppedEvent()); } - private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName, ?RetryStrategyInterface $retryStrategy): void + private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName): void { $event = new WorkerMessageReceivedEvent($envelope, $transportName); $this->dispatchEvent($event); @@ -126,12 +119,6 @@ class Worker implements WorkerInterface return; } - $message = $envelope->getMessage(); - $context = [ - 'message' => $message, - 'class' => \get_class($message), - ]; - try { $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp())); } catch (\Throwable $throwable) { @@ -146,30 +133,7 @@ class Worker implements WorkerInterface $envelope = $throwable->getEnvelope(); } - $shouldRetry = $retryStrategy && $this->shouldRetry($throwable, $envelope, $retryStrategy); - - $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable, $shouldRetry)); - - $retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope); - if ($shouldRetry) { - ++$retryCount; - $delay = $retryStrategy->getWaitingTime($envelope); - if (null !== $this->logger) { - $this->logger->error('Error thrown while handling message {class}. Dispatching for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]); - } - - // add the delay and retry stamp info + remove ReceivedStamp - $retryEnvelope = $envelope->with(new DelayStamp($delay)) - ->with(new RedeliveryStamp($retryCount, $transportName)) - ->withoutAll(ReceivedStamp::class); - - // re-send the message for retry - $this->bus->dispatch($retryEnvelope); - } else { - if (null !== $this->logger) { - $this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]); - } - } + $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable)); if (!$rejectFirst) { $receiver->reject($envelope); @@ -181,6 +145,11 @@ class Worker implements WorkerInterface $this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName)); if (null !== $this->logger) { + $message = $envelope->getMessage(); + $context = [ + 'message' => $message, + 'class' => \get_class($message), + ]; $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context); } @@ -200,27 +169,4 @@ class Worker implements WorkerInterface $this->eventDispatcher->dispatch($event); } - - private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool - { - // if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry - if ($e instanceof HandlerFailedException) { - $shouldNotRetry = true; - foreach ($e->getNestedExceptions() as $nestedException) { - if (!$nestedException instanceof UnrecoverableExceptionInterface) { - $shouldNotRetry = false; - break; - } - } - if ($shouldNotRetry) { - return false; - } - } - - if ($e instanceof UnrecoverableExceptionInterface) { - return false; - } - - return $retryStrategy->isRetryable($envelope); - } }