diff --git a/UPGRADE-4.4.md b/UPGRADE-4.4.md
index 6144bc150d..9a86328526 100644
--- a/UPGRADE-4.4.md
+++ b/UPGRADE-4.4.md
@@ -154,6 +154,11 @@ Messenger
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
pass a `RoutableMessageBus` instance instead.
+ * [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
+ * [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
+ * [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
+ * [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
+ * [BC BREAK] Removed `UnknownSenderException`.
Mime
----
diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
index e58b800864..0d395d59c9 100644
--- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
@@ -1810,20 +1810,29 @@ class FrameworkExtension extends Extension
$messageToSendersMapping[$message] = $messageConfiguration['senders'];
}
+ $sendersServiceLocator = ServiceLocatorTagPass::register($container, $senderReferences);
+
$container->getDefinition('messenger.senders_locator')
->replaceArgument(0, $messageToSendersMapping)
- ->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences))
+ ->replaceArgument(1, $sendersServiceLocator)
+ ;
+
+ $container->getDefinition('messenger.retry.send_failed_message_for_retry_listener')
+ ->replaceArgument(0, $sendersServiceLocator)
;
$container->getDefinition('messenger.retry_strategy_locator')
->replaceArgument(0, $transportRetryReferences);
if ($config['failure_transport']) {
+ if (!isset($senderReferences[$config['failure_transport']])) {
+ throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
+ }
+
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
- ->replaceArgument(1, $config['failure_transport']);
+ ->replaceArgument(0, $senderReferences[$config['failure_transport']]);
$container->getDefinition('console.command.messenger_failed_messages_retry')
- ->replaceArgument(0, $config['failure_transport'])
- ->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null);
+ ->replaceArgument(0, $config['failure_transport']);
$container->getDefinition('console.command.messenger_failed_messages_show')
->replaceArgument(0, $config['failure_transport']);
$container->getDefinition('console.command.messenger_failed_messages_remove')
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
index 9a586e0a58..5da94a907a 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
@@ -86,7 +86,6 @@
-
@@ -116,10 +115,9 @@
-
+
-
@@ -127,14 +125,14 @@
-
+
-
+
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
index 2284dd7963..82014fe5fb 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
@@ -10,7 +10,7 @@
-
+
@@ -98,12 +98,19 @@
+
+
+
+
+
+
+
+
-
-
+
diff --git a/src/Symfony/Bundle/FrameworkBundle/composer.json b/src/Symfony/Bundle/FrameworkBundle/composer.json
index ebba20c08f..e3faf21102 100644
--- a/src/Symfony/Bundle/FrameworkBundle/composer.json
+++ b/src/Symfony/Bundle/FrameworkBundle/composer.json
@@ -45,7 +45,7 @@
"symfony/http-client": "^4.4|^5.0",
"symfony/lock": "^4.4|^5.0",
"symfony/mailer": "^4.4|^5.0",
- "symfony/messenger": "^4.3.6|^5.0",
+ "symfony/messenger": "^4.4|^5.0",
"symfony/mime": "^4.4|^5.0",
"symfony/process": "^3.4|^4.0|^5.0",
"symfony/security-csrf": "^3.4|^4.0|^5.0",
@@ -77,7 +77,7 @@
"symfony/form": "<4.3",
"symfony/lock": "<4.4",
"symfony/mailer": "<4.4",
- "symfony/messenger": "<4.3.6",
+ "symfony/messenger": "<4.4",
"symfony/mime": "<4.4",
"symfony/property-info": "<3.4",
"symfony/security-bundle": "<4.4",
diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md
index 30e195f7ee..1e7fef642d 100644
--- a/src/Symfony/Component/Messenger/CHANGELOG.md
+++ b/src/Symfony/Component/Messenger/CHANGELOG.md
@@ -11,6 +11,12 @@ CHANGELOG
* Made all dispatched worker event classes final.
* Added support for `from_transport` attribute on `messenger.message_handler` tag.
* Added support for passing `dbindex` as a query parameter to the redis transport DSN.
+ * [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
+ * [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
+ * [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
+ * [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
+ * [BC BREAK] Removed `UnknownSenderException`.
+ * The component is not marked as `@experimental` anymore.
4.3.0
-----
diff --git a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php
index 2dde1af659..a95eb1fd41 100644
--- a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php
+++ b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php
@@ -89,7 +89,7 @@ abstract class AbstractFailedMessagesCommand extends Command
$redeliveryStamps = $envelope->all(RedeliveryStamp::class);
$io->writeln(' Message history:');
foreach ($redeliveryStamps as $redeliveryStamp) {
- $io->writeln(sprintf(' * Message failed and redelivered to the %s transport at %s', $redeliveryStamp->getSenderClassOrAlias(), $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')));
+ $io->writeln(sprintf(' * Message failed at %s and was redelivered', $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')));
}
$io->newLine();
diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
index 4e99703870..da2a51acd9 100644
--- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
@@ -42,7 +42,6 @@ class ConsumeMessagesCommand extends Command
private $receiverLocator;
private $logger;
private $receiverNames;
- private $retryStrategyLocator;
private $eventDispatcher;
/** @var CacheItemPoolInterface|null */
private $restartSignalCachePool;
@@ -50,7 +49,7 @@ class ConsumeMessagesCommand extends Command
/**
* @param RoutableMessageBus $routableBus
*/
- public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
+ public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* EventDispatcherInterface */ $eventDispatcher = null)
{
if ($routableBus instanceof ContainerInterface) {
@trigger_error(sprintf('Passing a "%s" instance as first argument to "%s()" is deprecated since Symfony 4.4, pass a "%s" instance instead.', ContainerInterface::class, __METHOD__, RoutableMessageBus::class), E_USER_DEPRECATED);
@@ -59,17 +58,16 @@ class ConsumeMessagesCommand extends Command
throw new \TypeError(sprintf('The first argument must be an instance of "%s".', RoutableMessageBus::class));
}
- if (\is_array($retryStrategyLocator)) {
- @trigger_error(sprintf('The 5th argument of the class "%s" should be a retry-strategy locator, an array of bus names as a value is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED);
+ if (null !== $eventDispatcher && !$eventDispatcher instanceof EventDispatcherInterface) {
+ @trigger_error(sprintf('The 5th argument of the class "%s" should be a "%s"', __CLASS__, EventDispatcherInterface::class), E_USER_DEPRECATED);
- $retryStrategyLocator = null;
+ $eventDispatcher = null;
}
$this->routableBus = $routableBus;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
$this->receiverNames = $receiverNames;
- $this->retryStrategyLocator = $retryStrategyLocator;
$this->eventDispatcher = $eventDispatcher;
parent::__construct();
@@ -166,7 +164,6 @@ EOF
}
$receivers = [];
- $retryStrategies = [];
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
if (!$this->receiverLocator->has($receiverName)) {
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
@@ -177,17 +174,12 @@ EOF
throw new RuntimeException($message);
}
- if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
- throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
- }
-
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
- $retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
}
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
- $worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
+ $worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
$stopsWhen[] = "processed {$limit} messages";
diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php
index c8981b59ad..4a0cc44191 100644
--- a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php
+++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php
@@ -24,7 +24,6 @@ use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\MessageBusInterface;
-use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
@@ -39,14 +38,12 @@ class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
private $eventDispatcher;
private $messageBus;
- private $retryStrategy;
private $logger;
- public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, RetryStrategyInterface $retryStrategy = null, LoggerInterface $logger = null)
+ public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null)
{
$this->eventDispatcher = $eventDispatcher;
$this->messageBus = $messageBus;
- $this->retryStrategy = $retryStrategy;
$this->logger = $logger;
parent::__construct($receiverName, $receiver);
@@ -180,7 +177,6 @@ EOF
$worker = new Worker(
[$this->getReceiverName() => $receiver],
$this->messageBus,
- [$this->getReceiverName() => $this->retryStrategy],
$this->eventDispatcher,
$this->logger
);
diff --git a/src/Symfony/Component/Messenger/Event/WorkerMessageFailedEvent.php b/src/Symfony/Component/Messenger/Event/WorkerMessageFailedEvent.php
index 5867979699..7fb858c87c 100644
--- a/src/Symfony/Component/Messenger/Event/WorkerMessageFailedEvent.php
+++ b/src/Symfony/Component/Messenger/Event/WorkerMessageFailedEvent.php
@@ -21,12 +21,11 @@ use Symfony\Component\Messenger\Envelope;
final class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent
{
private $throwable;
- private $willRetry;
+ private $willRetry = false;
- public function __construct(Envelope $envelope, string $receiverName, \Throwable $error, bool $willRetry)
+ public function __construct(Envelope $envelope, string $receiverName, \Throwable $error)
{
$this->throwable = $error;
- $this->willRetry = $willRetry;
parent::__construct($envelope, $receiverName);
}
@@ -40,4 +39,9 @@ final class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent
{
return $this->willRetry;
}
+
+ public function setForRetry(): void
+ {
+ $this->willRetry = true;
+ }
}
diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php
new file mode 100644
index 0000000000..5e654b51d4
--- /dev/null
+++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php
@@ -0,0 +1,126 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\EventListener;
+
+use Psr\Container\ContainerInterface;
+use Psr\Log\LoggerInterface;
+use Symfony\Component\EventDispatcher\EventSubscriberInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
+use Symfony\Component\Messenger\Exception\HandlerFailedException;
+use Symfony\Component\Messenger\Exception\RuntimeException;
+use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
+use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
+use Symfony\Component\Messenger\Stamp\DelayStamp;
+use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
+use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
+
+/**
+ * @author Tobias Schultze
+ */
+class SendFailedMessageForRetryListener implements EventSubscriberInterface
+{
+ private $sendersLocator;
+ private $retryStrategyLocator;
+ private $logger;
+
+ public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null)
+ {
+ $this->sendersLocator = $sendersLocator;
+ $this->retryStrategyLocator = $retryStrategyLocator;
+ $this->logger = $logger;
+ }
+
+ public function onMessageFailed(WorkerMessageFailedEvent $event)
+ {
+ $retryStrategy = $this->getRetryStrategyForTransport($event->getReceiverName());
+ $envelope = $event->getEnvelope();
+ $throwable = $event->getThrowable();
+
+ $message = $envelope->getMessage();
+ $context = [
+ 'message' => $message,
+ 'class' => \get_class($message),
+ ];
+
+ $shouldRetry = $retryStrategy && $this->shouldRetry($throwable, $envelope, $retryStrategy);
+
+ $retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope);
+ if ($shouldRetry) {
+ $event->setForRetry();
+
+ ++$retryCount;
+ $delay = $retryStrategy->getWaitingTime($envelope);
+ if (null !== $this->logger) {
+ $this->logger->error('Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
+ }
+
+ // add the delay and retry stamp info
+ $retryEnvelope = $envelope->with(new DelayStamp($delay), new RedeliveryStamp($retryCount));
+
+ // re-send the message for retry
+ $this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
+ } else {
+ if (null !== $this->logger) {
+ $this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
+ }
+ }
+ }
+
+ public static function getSubscribedEvents()
+ {
+ return [
+ // must have higher priority than SendFailedMessageToFailureTransportListener
+ WorkerMessageFailedEvent::class => ['onMessageFailed', 100],
+ ];
+ }
+
+ private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
+ {
+ // if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
+ if ($e instanceof HandlerFailedException) {
+ $shouldNotRetry = true;
+ foreach ($e->getNestedExceptions() as $nestedException) {
+ if (!$nestedException instanceof UnrecoverableExceptionInterface) {
+ $shouldNotRetry = false;
+ break;
+ }
+ }
+ if ($shouldNotRetry) {
+ return false;
+ }
+ }
+
+ if ($e instanceof UnrecoverableExceptionInterface) {
+ return false;
+ }
+
+ return $retryStrategy->isRetryable($envelope);
+ }
+
+ private function getRetryStrategyForTransport(string $alias): ?RetryStrategyInterface
+ {
+ if ($this->retryStrategyLocator->has($alias)) {
+ return $this->retryStrategyLocator->get($alias);
+ }
+
+ return null;
+ }
+
+ private function getSenderForTransport(string $alias): SenderInterface
+ {
+ if ($this->sendersLocator->has($alias)) {
+ return $this->sendersLocator->get($alias);
+ }
+
+ throw new RuntimeException(sprintf('Could not find sender "%s" based on the same receiver to send the failed message to for retry.', $alias));
+ }
+}
diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php
index 2e431e654a..0aa8fd5ea7 100644
--- a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php
+++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php
@@ -15,12 +15,10 @@ use Symfony\Component\ErrorRenderer\Exception\FlattenException;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
-use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
-use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
-use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
+use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
/**
* Sends a rejected message to a "failure transport".
@@ -29,14 +27,12 @@ use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
*/
class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface
{
- private $messageBus;
- private $failureSenderAlias;
+ private $failureSender;
private $logger;
- public function __construct(MessageBusInterface $messageBus, string $failureSenderAlias, LoggerInterface $logger = null)
+ public function __construct(SenderInterface $failureSender, LoggerInterface $logger = null)
{
- $this->messageBus = $messageBus;
- $this->failureSenderAlias = $failureSenderAlias;
+ $this->failureSender = $failureSender;
$this->logger = $logger;
}
@@ -53,27 +49,26 @@ class SendFailedMessageToFailureTransportListener implements EventSubscriberInte
return;
}
- // remove the received stamp so it's redelivered
$throwable = $event->getThrowable();
if ($throwable instanceof HandlerFailedException) {
$throwable = $throwable->getNestedExceptions()[0];
}
$flattenedException = class_exists(FlattenException::class) ? FlattenException::createFromThrowable($throwable) : null;
- $envelope = $envelope->withoutAll(ReceivedStamp::class)
- ->withoutAll(TransportMessageIdStamp::class)
- ->with(new SentToFailureTransportStamp($event->getReceiverName()))
- ->with(new DelayStamp(0))
- ->with(new RedeliveryStamp(0, $this->failureSenderAlias, $throwable->getMessage(), $flattenedException));
+ $envelope = $envelope->with(
+ new SentToFailureTransportStamp($event->getReceiverName()),
+ new DelayStamp(0),
+ new RedeliveryStamp(0, $throwable->getMessage(), $flattenedException)
+ );
if (null !== $this->logger) {
$this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
'class' => \get_class($envelope->getMessage()),
- 'transport' => $this->failureSenderAlias,
+ 'transport' => \get_class($this->failureSender),
]);
}
- $this->messageBus->dispatch($envelope);
+ $this->failureSender->send($envelope);
}
public static function getSubscribedEvents()
diff --git a/src/Symfony/Component/Messenger/Exception/UnknownSenderException.php b/src/Symfony/Component/Messenger/Exception/UnknownSenderException.php
deleted file mode 100644
index 63555c8ee9..0000000000
--- a/src/Symfony/Component/Messenger/Exception/UnknownSenderException.php
+++ /dev/null
@@ -1,19 +0,0 @@
-
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Symfony\Component\Messenger\Exception;
-
-/**
- * @author Ryan Weaver
- */
-class UnknownSenderException extends InvalidArgumentException
-{
-}
diff --git a/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php
index 548197242d..d036790ddb 100644
--- a/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php
+++ b/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php
@@ -13,7 +13,6 @@ namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
-use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
/**
@@ -34,13 +33,10 @@ class RejectRedeliveredMessageMiddleware implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
- // ignore the dispatched messages for retry
- if (null !== $envelope->last(ReceivedStamp::class)) {
- $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
+ $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
- if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
- throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
- }
+ if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
+ throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
}
return $stack->next()->handle($envelope, $stack);
diff --git a/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php
index ca1adf8d78..6cfaeb2ef7 100644
--- a/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php
+++ b/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php
@@ -17,9 +17,7 @@ use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
-use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
-use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
@@ -57,12 +55,8 @@ class SendMessageMiddleware implements MiddlewareInterface
// it's a received message, do not send it back
$this->logger->info('Received message {class}', $context);
} else {
- /** @var RedeliveryStamp|null $redeliveryStamp */
- $redeliveryStamp = $envelope->last(RedeliveryStamp::class);
-
- // dispatch event unless this is a redelivery
- $shouldDispatchEvent = null === $redeliveryStamp;
- foreach ($this->getSenders($envelope, $redeliveryStamp) as $alias => $sender) {
+ $shouldDispatchEvent = true;
+ foreach ($this->sendersLocator->getSenders($envelope) as $alias => $sender) {
if (null !== $this->eventDispatcher && $shouldDispatchEvent) {
$event = new SendMessageToTransportsEvent($envelope);
$this->eventDispatcher->dispatch($event);
@@ -82,18 +76,4 @@ class SendMessageMiddleware implements MiddlewareInterface
// message should only be sent and not be handled by the next middleware
return $envelope;
}
-
- /**
- * * @return iterable|SenderInterface[]
- */
- private function getSenders(Envelope $envelope, ?RedeliveryStamp $redeliveryStamp): iterable
- {
- if (null !== $redeliveryStamp) {
- return [
- $redeliveryStamp->getSenderClassOrAlias() => $this->sendersLocator->getSenderByAlias($redeliveryStamp->getSenderClassOrAlias()),
- ];
- }
-
- return $this->sendersLocator->getSenders($envelope);
- }
}
diff --git a/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php b/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php
index 6a5042a105..49bbf2f778 100644
--- a/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php
+++ b/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php
@@ -20,18 +20,13 @@ use Symfony\Component\Messenger\Envelope;
final class RedeliveryStamp implements StampInterface
{
private $retryCount;
- private $senderClassOrAlias;
private $redeliveredAt;
private $exceptionMessage;
private $flattenException;
- /**
- * @param string $senderClassOrAlias Alias from SendersLocator or just the class name
- */
- public function __construct(int $retryCount, string $senderClassOrAlias, string $exceptionMessage = null, FlattenException $flattenException = null)
+ public function __construct(int $retryCount, string $exceptionMessage = null, FlattenException $flattenException = null)
{
$this->retryCount = $retryCount;
- $this->senderClassOrAlias = $senderClassOrAlias;
$this->exceptionMessage = $exceptionMessage;
$this->flattenException = $flattenException;
$this->redeliveredAt = new \DateTimeImmutable();
@@ -50,16 +45,6 @@ final class RedeliveryStamp implements StampInterface
return $this->retryCount;
}
- /**
- * The target sender this should be redelivered to.
- *
- * @internal
- */
- public function getSenderClassOrAlias(): string
- {
- return $this->senderClassOrAlias;
- }
-
public function getExceptionMessage(): ?string
{
return $this->exceptionMessage;
diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php
index f632d9890b..1a40e0c532 100644
--- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php
@@ -28,7 +28,7 @@ class FailedMessagesShowCommandTest extends TestCase
public function testBasicRun()
{
$sentToFailureStamp = new SentToFailureTransportStamp('async');
- $redeliveryStamp = new RedeliveryStamp(0, 'failure_receiver', 'Things are bad!');
+ $redeliveryStamp = new RedeliveryStamp(0, 'Things are bad!');
$envelope = new Envelope(new \stdClass(), [
new TransportMessageIdStamp(15),
$sentToFailureStamp,
@@ -62,8 +62,8 @@ EOF
public function testMultipleRedeliveryFails()
{
$sentToFailureStamp = new SentToFailureTransportStamp('async');
- $redeliveryStamp1 = new RedeliveryStamp(0, 'failure_receiver', 'Things are bad!');
- $redeliveryStamp2 = new RedeliveryStamp(0, 'failure_receiver');
+ $redeliveryStamp1 = new RedeliveryStamp(0, 'Things are bad!');
+ $redeliveryStamp2 = new RedeliveryStamp(0);
$envelope = new Envelope(new \stdClass(), [
new TransportMessageIdStamp(15),
$sentToFailureStamp,
diff --git a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
index 1c42c39f82..dd9b70b1f1 100644
--- a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
+++ b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
@@ -281,7 +281,6 @@ class MessengerPassTest extends TestCase
$container->register('console.command.messenger_consume_messages', ConsumeMessagesCommand::class)->setArguments([
null,
new Reference('messenger.receiver_locator'),
- new Reference('messenger.retry_strategy_locator'),
null,
null,
null,
diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php
new file mode 100644
index 0000000000..2157f5df0d
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php
@@ -0,0 +1,66 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\EventListener;
+
+use PHPUnit\Framework\TestCase;
+use Psr\Container\ContainerInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
+use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
+use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
+use Symfony\Component\Messenger\Stamp\DelayStamp;
+use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
+use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
+
+class SendFailedMessageForRetryListenerTest extends TestCase
+{
+ public function testNoRetryStrategyCausesNoRetry()
+ {
+ $senderLocator = $this->createMock(ContainerInterface::class);
+ $senderLocator->expects($this->never())->method('has');
+ $senderLocator->expects($this->never())->method('get');
+ $retryStrategyLocator = $this->createMock(ContainerInterface::class);
+ $retryStrategyLocator->expects($this->once())->method('has')->willReturn(false);
+
+ $listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
+
+ $exception = new \Exception('no!');
+ $envelope = new Envelope(new \stdClass());
+ $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
+
+ $listener->onMessageFailed($event);
+ }
+
+ public function testEnvelopeIsSentToTransportOnRetry()
+ {
+ $exception = new \Exception('no!');
+ $envelope = new Envelope(new \stdClass());
+
+ $sender = $this->createMock(SenderInterface::class);
+ $sender->expects($this->once())->method('send')->with($envelope->with(new DelayStamp(1000), new RedeliveryStamp(1)))->willReturnArgument(0);
+ $senderLocator = $this->createMock(ContainerInterface::class);
+ $senderLocator->expects($this->once())->method('has')->willReturn(true);
+ $senderLocator->expects($this->never())->method('get')->willReturn($sender);
+ $retryStategy = $this->createMock(RetryStrategyInterface::class);
+ $retryStategy->expects($this->once())->method('isRetryable')->willReturn(true);
+ $retryStategy->expects($this->once())->method('getWaitingTime')->willReturn(1000);
+ $retryStrategyLocator = $this->createMock(ContainerInterface::class);
+ $retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
+ $retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);
+
+ $listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
+
+ $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
+
+ $listener->onMessageFailed($event);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php
index 628a4be3ee..1f648b83e1 100644
--- a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php
+++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php
@@ -9,25 +9,23 @@
* file that was distributed with this source code.
*/
-namespace Symfony\Component\Messenger\Tests\Handler;
+namespace Symfony\Component\Messenger\Tests\EventListener;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
-use Symfony\Component\Messenger\MessageBusInterface;
-use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
-use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
+use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
class SendFailedMessageToFailureTransportListenerTest extends TestCase
{
- public function testItDispatchesToTheFailureTransport()
+ public function testItSendsToTheFailureTransport()
{
- $bus = $this->createMock(MessageBusInterface::class);
- $bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) {
+ $sender = $this->createMock(SenderInterface::class);
+ $sender->expects($this->once())->method('send')->with($this->callback(function ($envelope) {
/* @var Envelope $envelope */
$this->assertInstanceOf(Envelope::class, $envelope);
@@ -38,31 +36,24 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase
/** @var RedeliveryStamp $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
- $this->assertSame('failure_sender', $redeliveryStamp->getSenderClassOrAlias());
$this->assertSame('no!', $redeliveryStamp->getExceptionMessage());
$this->assertSame('no!', $redeliveryStamp->getFlattenException()->getMessage());
- $this->assertNull($envelope->last(ReceivedStamp::class));
- $this->assertNull($envelope->last(TransportMessageIdStamp::class));
-
return true;
- }))->willReturn(new Envelope(new \stdClass()));
- $listener = new SendFailedMessageToFailureTransportListener(
- $bus,
- 'failure_sender'
- );
+ }))->willReturnArgument(0);
+ $listener = new SendFailedMessageToFailureTransportListener($sender);
$exception = new \Exception('no!');
$envelope = new Envelope(new \stdClass());
- $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false);
+ $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
$listener->onMessageFailed($event);
}
public function testItGetsNestedHandlerFailedException()
{
- $bus = $this->createMock(MessageBusInterface::class);
- $bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) {
+ $sender = $this->createMock(SenderInterface::class);
+ $sender->expects($this->once())->method('send')->with($this->callback(function ($envelope) {
/** @var Envelope $envelope */
/** @var RedeliveryStamp $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
@@ -71,49 +62,41 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase
$this->assertSame('Exception', $redeliveryStamp->getFlattenException()->getClass());
return true;
- }))->willReturn(new Envelope(new \stdClass()));
+ }))->willReturnArgument(0);
- $listener = new SendFailedMessageToFailureTransportListener(
- $bus,
- 'failure_sender'
- );
+ $listener = new SendFailedMessageToFailureTransportListener($sender);
$envelope = new Envelope(new \stdClass());
$exception = new \Exception('I am inside!');
$exception = new HandlerFailedException($envelope, [$exception]);
- $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false);
+ $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
$listener->onMessageFailed($event);
}
public function testDoNothingOnRetry()
{
- $bus = $this->createMock(MessageBusInterface::class);
- $bus->expects($this->never())->method('dispatch');
- $listener = new SendFailedMessageToFailureTransportListener(
- $bus,
- 'failure_sender'
- );
+ $sender = $this->createMock(SenderInterface::class);
+ $sender->expects($this->never())->method('send');
+ $listener = new SendFailedMessageToFailureTransportListener($sender);
$envelope = new Envelope(new \stdClass());
- $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), true);
+ $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception());
+ $event->setForRetry();
$listener->onMessageFailed($event);
}
public function testDoNotRedeliverToFailed()
{
- $bus = $this->createMock(MessageBusInterface::class);
- $bus->expects($this->never())->method('dispatch');
- $listener = new SendFailedMessageToFailureTransportListener(
- $bus,
- 'failure_sender'
- );
+ $sender = $this->createMock(SenderInterface::class);
+ $sender->expects($this->never())->method('send');
+ $listener = new SendFailedMessageToFailureTransportListener($sender);
$envelope = new Envelope(new \stdClass(), [
new SentToFailureTransportStamp('my_receiver'),
]);
- $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), false);
+ $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception());
$listener->onMessageFailed($event);
}
diff --git a/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php
index abe78be759..cb3aa4f6c1 100644
--- a/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php
+++ b/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php
@@ -13,10 +13,10 @@ namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
-use Symfony\Component\DependencyInjection\Container;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
+use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
@@ -61,11 +61,18 @@ class FailureIntegrationTest extends TestCase
$locator
);
+ $retryStrategyLocator = $this->createMock(ContainerInterface::class);
+ $retryStrategyLocator->expects($this->any())
+ ->method('has')
+ ->willReturn(true);
+ $retryStrategyLocator->expects($this->any())
+ ->method('get')
+ ->willReturn(new MultiplierRetryStrategy(1));
+
// using to so we can lazily get the bus later and avoid circular problem
$transport1HandlerThatFails = new DummyTestHandler(true);
$allTransportHandlerThatWorks = new DummyTestHandler(false);
$transport2HandlerThatWorks = new DummyTestHandler(false);
- $container = new Container();
$handlerLocator = new HandlersLocator([
DummyMessage::class => [
new HandlerDescriptor($transport1HandlerThatFails, [
@@ -88,17 +95,17 @@ class FailureIntegrationTest extends TestCase
new SendMessageMiddleware($senderLocator),
new HandleMessageMiddleware($handlerLocator),
]);
- $container->set('bus', $bus);
- $dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($bus, 'the_failure_transport'));
+ $dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator, $retryStrategyLocator));
+ $dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($failureTransport));
- $runWorker = function (string $transportName, int $maxRetries) use ($transports, $bus, $dispatcher): ?\Throwable {
+ $runWorker = function (string $transportName) use ($transports, $bus, $dispatcher): ?\Throwable {
$throwable = null;
$failedListener = function (WorkerMessageFailedEvent $event) use (&$throwable) {
$throwable = $event->getThrowable();
};
$dispatcher->addListener(WorkerMessageFailedEvent::class, $failedListener);
- $worker = new Worker([$transportName => $transports[$transportName]], $bus, [$transportName => new MultiplierRetryStrategy($maxRetries)], $dispatcher);
+ $worker = new Worker([$transportName => $transports[$transportName]], $bus, $dispatcher);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// handle one envelope, then stop
@@ -126,7 +133,7 @@ class FailureIntegrationTest extends TestCase
/*
* Receive the message from "transport1"
*/
- $throwable = $runWorker('transport1', 1);
+ $throwable = $runWorker('transport1');
// make sure this is failing for the reason we think
$this->assertInstanceOf(HandlerFailedException::class, $throwable);
// handler for transport1 and all transports were called
@@ -140,7 +147,7 @@ class FailureIntegrationTest extends TestCase
/*
* Receive the message for a (final) retry
*/
- $runWorker('transport1', 1);
+ $runWorker('transport1');
// only the "failed" handler is called a 2nd time
$this->assertSame(2, $transport1HandlerThatFails->getTimesCalled());
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
@@ -160,7 +167,7 @@ class FailureIntegrationTest extends TestCase
/*
* Failed message is handled, fails, and sent for a retry
*/
- $throwable = $runWorker('the_failure_transport', 1);
+ $throwable = $runWorker('the_failure_transport');
// make sure this is failing for the reason we think
$this->assertInstanceOf(HandlerFailedException::class, $throwable);
// only the "failed" handler is called a 3rd time
@@ -175,7 +182,7 @@ class FailureIntegrationTest extends TestCase
/*
* Message is retried on failure transport then discarded
*/
- $runWorker('the_failure_transport', 1);
+ $runWorker('the_failure_transport');
// only the "failed" handler is called a 4th time
$this->assertSame(4, $transport1HandlerThatFails->getTimesCalled());
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
@@ -185,7 +192,7 @@ class FailureIntegrationTest extends TestCase
/*
* Execute handlers on transport2
*/
- $runWorker('transport2', 1);
+ $runWorker('transport2');
// transport1 handler is not called again
$this->assertSame(4, $transport1HandlerThatFails->getTimesCalled());
// all transport handler is now called again
@@ -202,10 +209,10 @@ class FailureIntegrationTest extends TestCase
*/
$bus->dispatch($envelope);
// handle the message, but with no retries
- $runWorker('transport1', 0);
+ $runWorker('transport1');
// now make the handler work!
$transport1HandlerThatFails->setShouldThrow(false);
- $runWorker('the_failure_transport', 1);
+ $runWorker('the_failure_transport');
// the failure transport is empty because it worked
$this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived());
}
diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/DummyMessageHandlerFailingFirstTimes.php b/src/Symfony/Component/Messenger/Tests/Fixtures/DummyMessageHandlerFailingFirstTimes.php
deleted file mode 100644
index 2e97445384..0000000000
--- a/src/Symfony/Component/Messenger/Tests/Fixtures/DummyMessageHandlerFailingFirstTimes.php
+++ /dev/null
@@ -1,39 +0,0 @@
-
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Symfony\Component\Messenger\Tests\Fixtures;
-
-class DummyMessageHandlerFailingFirstTimes
-{
- private $remainingFailures;
-
- private $called = 0;
-
- public function __construct(int $throwExceptionOnFirstTries = 0)
- {
- $this->remainingFailures = $throwExceptionOnFirstTries;
- }
-
- public function __invoke(DummyMessage $message)
- {
- if ($this->remainingFailures > 0) {
- --$this->remainingFailures;
- throw new \Exception('Handler should throw Exception.');
- }
-
- ++$this->called;
- }
-
- public function getTimesCalledWithoutThrowing(): int
- {
- return $this->called;
- }
-}
diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php
index 82c3b0ac05..b14d8a60a7 100644
--- a/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php
@@ -16,7 +16,6 @@ use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
-use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
use Symfony\Component\Messenger\Tests\Fixtures\ChildDummyMessage;
@@ -84,31 +83,6 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$this->assertCount(2, $sentStamps);
}
- public function testItSendsToOnlyOneSenderOnRedelivery()
- {
- $envelope = new Envelope(new DummyMessage('Hey'), [new RedeliveryStamp(5, 'bar')]);
- $sender = $this->getMockBuilder(SenderInterface::class)->getMock();
- $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
-
- $sendersLocator = $this->createSendersLocator(
- [DummyMessage::class => ['foo', 'bar']],
- ['foo' => $sender, 'bar' => $sender2]
- );
-
- $middleware = new SendMessageMiddleware($sendersLocator);
-
- $sender->expects($this->never())
- ->method('send')
- ;
- $sender2->expects($this->once())
- ->method('send')
- ->willReturnArgument(0);
-
- $mockStack = $this->getStackMock(false); // false because next should not be called
- $envelope = $middleware->handle($envelope, $mockStack);
- $this->assertCount(1, $envelope->all(SentStamp::class));
- }
-
public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage()
{
$envelope = new Envelope(new ChildDummyMessage('Hey'));
@@ -223,24 +197,6 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$middleware->handle($envelope, $this->getStackMock());
}
- public function testItDoesNotDispatchOnRedeliver()
- {
- $envelope = new Envelope(new DummyMessage('original envelope'));
- $envelope = $envelope->with(new RedeliveryStamp(3, 'foo_sender'));
-
- $dispatcher = $this->createMock(EventDispatcherInterface::class);
- $dispatcher->expects($this->never())->method('dispatch');
-
- $sender = $this->getMockBuilder(SenderInterface::class)->getMock();
- $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
- $sender2->expects($this->once())->method('send')->willReturn(new Envelope(new \stdClass()));
-
- $sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo']], ['foo' => $sender, 'foo_sender' => $sender2]);
- $middleware = new SendMessageMiddleware($sendersLocator, $dispatcher);
-
- $middleware->handle($envelope, $this->getStackMock(false));
- }
-
private function createSendersLocator(array $sendersMap, array $senders): SendersLocator
{
$container = $this->createMock(ContainerInterface::class);
diff --git a/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php b/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php
index e9c6c0bc75..e1572bbbae 100644
--- a/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php
@@ -21,7 +21,7 @@ class MultiplierRetryStrategyTest extends TestCase
public function testIsRetryable()
{
$strategy = new MultiplierRetryStrategy(3);
- $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]);
+ $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0)]);
$this->assertTrue($strategy->isRetryable($envelope));
}
@@ -29,7 +29,7 @@ class MultiplierRetryStrategyTest extends TestCase
public function testIsNotRetryable()
{
$strategy = new MultiplierRetryStrategy(3);
- $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(3, 'sender_alias')]);
+ $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(3)]);
$this->assertFalse($strategy->isRetryable($envelope));
}
@@ -37,7 +37,7 @@ class MultiplierRetryStrategyTest extends TestCase
public function testIsNotRetryableWithZeroMax()
{
$strategy = new MultiplierRetryStrategy(0);
- $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]);
+ $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0)]);
$this->assertFalse($strategy->isRetryable($envelope));
}
@@ -55,7 +55,7 @@ class MultiplierRetryStrategyTest extends TestCase
public function testGetWaitTime(int $delay, int $multiplier, int $maxDelay, int $previousRetries, int $expectedDelay)
{
$strategy = new MultiplierRetryStrategy(10, $delay, $multiplier, $maxDelay);
- $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp($previousRetries, 'sender_alias')]);
+ $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp($previousRetries)]);
$this->assertSame($expectedDelay, $strategy->getWaitingTime($envelope));
}
diff --git a/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php
deleted file mode 100644
index 2447a0c82b..0000000000
--- a/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php
+++ /dev/null
@@ -1,100 +0,0 @@
-
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Symfony\Component\Messenger\Tests;
-
-use PHPUnit\Framework\TestCase;
-use Psr\Container\ContainerInterface;
-use Symfony\Component\Messenger\Envelope;
-use Symfony\Component\Messenger\Handler\HandlerDescriptor;
-use Symfony\Component\Messenger\Handler\HandlersLocator;
-use Symfony\Component\Messenger\MessageBus;
-use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
-use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
-use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
-use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
-use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageHandlerFailingFirstTimes;
-use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
-use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
-use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
-use Symfony\Component\Messenger\Worker;
-
-class RetryIntegrationTest extends TestCase
-{
- public function testRetryMechanism()
- {
- $senderAndReceiver = new DummySenderAndReceiver();
-
- $senderLocator = $this->createMock(ContainerInterface::class);
- $senderLocator->method('has')->with('transportName')->willReturn(true);
- $senderLocator->method('get')->with('transportName')->willReturn($senderAndReceiver);
- $senderLocator = new SendersLocator([DummyMessage::class => ['transportName']], $senderLocator);
-
- $handler = new DummyMessageHandlerFailingFirstTimes(0);
- $throwingHandler = new DummyMessageHandlerFailingFirstTimes(1);
- $handlerLocator = new HandlersLocator([
- DummyMessage::class => [
- new HandlerDescriptor($handler, ['alias' => 'first']),
- new HandlerDescriptor($throwingHandler, ['alias' => 'throwing']),
- ],
- ]);
-
- // dispatch the message, which will get "sent" and then received by DummySenderAndReceiver
- $bus = new MessageBus([new SendMessageMiddleware($senderLocator), new HandleMessageMiddleware($handlerLocator)]);
- $envelope = new Envelope(new DummyMessage('API'));
- $bus->dispatch($envelope);
-
- $worker = new Worker(['transportName' => $senderAndReceiver], $bus, ['transportName' => new MultiplierRetryStrategy()]);
- $worker->run([], function (?Envelope $envelope) use ($worker) {
- if (null === $envelope) {
- $worker->stop();
- }
- });
-
- $this->assertSame(1, $handler->getTimesCalledWithoutThrowing());
- $this->assertSame(1, $throwingHandler->getTimesCalledWithoutThrowing());
- }
-}
-
-class DummySenderAndReceiver implements ReceiverInterface, SenderInterface
-{
- private $messagesWaiting = [];
-
- private $messagesReceived = [];
-
- public function get(): iterable
- {
- $message = array_shift($this->messagesWaiting);
-
- if (null === $message) {
- return [];
- }
-
- $this->messagesReceived[] = $message;
-
- return [$message];
- }
-
- public function ack(Envelope $envelope): void
- {
- }
-
- public function reject(Envelope $envelope): void
- {
- }
-
- public function send(Envelope $envelope): Envelope
- {
- $this->messagesWaiting[] = $envelope;
-
- return $envelope;
- }
-}
diff --git a/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php b/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php
index 2b724a5f69..c55a670bf3 100644
--- a/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php
@@ -19,9 +19,8 @@ class RedeliveryStampTest extends TestCase
{
public function testGetters()
{
- $stamp = new RedeliveryStamp(10, 'sender_alias');
+ $stamp = new RedeliveryStamp(10);
$this->assertSame(10, $stamp->getRetryCount());
- $this->assertSame('sender_alias', $stamp->getSenderClassOrAlias());
$this->assertInstanceOf(\DateTimeInterface::class, $stamp->getRedeliveredAt());
$this->assertNull($stamp->getExceptionMessage());
$this->assertNull($stamp->getFlattenException());
@@ -30,7 +29,7 @@ class RedeliveryStampTest extends TestCase
public function testGettersPopulated()
{
$flattenException = new FlattenException();
- $stamp = new RedeliveryStamp(10, 'sender_alias', 'exception message', $flattenException);
+ $stamp = new RedeliveryStamp(10, 'exception message', $flattenException);
$this->assertSame('exception message', $stamp->getExceptionMessage());
$this->assertSame($flattenException, $stamp->getFlattenException());
}
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php
index 9fddc0646c..6d1c1598f2 100644
--- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php
@@ -95,7 +95,7 @@ class AmqpExtIntegrationTest extends TestCase
$envelope = $envelopes[0];
$newEnvelope = $envelope
->with(new DelayStamp(2000))
- ->with(new RedeliveryStamp(1, 'not_important'));
+ ->with(new RedeliveryStamp(1));
$sender->send($newEnvelope);
$receiver->ack($envelope);
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php
index d284861c3b..b5c87e58da 100644
--- a/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php
@@ -14,7 +14,6 @@ namespace Symfony\Component\Messenger\Tests\Transport\Sender;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope;
-use Symfony\Component\Messenger\Exception\UnknownSenderException;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
@@ -36,35 +35,6 @@ class SendersLocatorTest extends TestCase
$this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage()))));
}
- public function testGetSenderByAlias()
- {
- $sender1 = $this->getMockBuilder(SenderInterface::class)->getMock();
- $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
- $sendersLocator = $this->createContainer([
- 'sender1' => $sender1,
- 'sender2' => $sender2,
- ]);
-
- $locator = new SendersLocator([], $sendersLocator);
-
- $this->assertSame($sender1, $locator->getSenderByAlias('sender1'));
- $this->assertSame($sender2, $locator->getSenderByAlias('sender2'));
- }
-
- public function testGetSenderByAliasThrowsException()
- {
- $this->expectException(UnknownSenderException::class);
- $this->expectExceptionMessage('Unknown sender alias');
-
- $sender1 = $this->getMockBuilder(SenderInterface::class)->getMock();
- $sendersLocator = $this->createContainer([
- 'sender1' => $sender1,
- ]);
-
- $locator = new SendersLocator([], $sendersLocator);
- $locator->getSenderByAlias('sender2');
- }
-
/**
* @group legacy
*/
diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php
index c4d6c782af..6eba4efcae 100644
--- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php
+++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php
@@ -17,13 +17,9 @@ use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
-use Symfony\Component\Messenger\Exception\HandlerFailedException;
-use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\MessageBusInterface;
-use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
-use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
@@ -48,12 +44,12 @@ class WorkerTest extends TestCase
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with(
- $envelope = new Envelope($apiMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
- )->willReturn($envelope);
+ new Envelope($apiMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
+ )->willReturnArgument(0);
$bus->expects($this->at(1))->method('dispatch')->with(
- $envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
- )->willReturn($envelope);
+ new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
+ )->willReturnArgument(0);
$worker = new Worker(['transport' => $receiver], $bus);
$worker->run([], function (?Envelope $envelope) use ($worker) {
@@ -66,95 +62,7 @@ class WorkerTest extends TestCase
$this->assertSame(2, $receiver->getAcknowledgeCount());
}
- public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
- {
- $envelope = new Envelope(new DummyMessage('API'));
- $receiver = new DummyReceiver([[$envelope]]);
- $envelope = $envelope->with(new ReceivedStamp('transport'), new ConsumedByWorkerStamp());
-
- $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
- $bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);
-
- $worker = new Worker(['transport' => $receiver], $bus, []);
- $worker->run([], function (?Envelope $envelope) use ($worker) {
- // stop after the messages finish
- if (null === $envelope) {
- $worker->stop();
- }
- });
- }
-
- public function testDispatchCausesRetry()
- {
- $receiver = new DummyReceiver([
- [new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'transport1')])],
- ]);
-
- $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
- $bus->expects($this->at(0))->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
-
- // 2nd call will be the retry
- $bus->expects($this->at(1))->method('dispatch')->with($this->callback(function (Envelope $envelope) {
- /** @var RedeliveryStamp|null $redeliveryStamp */
- $redeliveryStamp = $envelope->last(RedeliveryStamp::class);
- $this->assertNotNull($redeliveryStamp);
- // retry count now at 1
- $this->assertSame(1, $redeliveryStamp->getRetryCount());
- $this->assertSame('transport1', $redeliveryStamp->getSenderClassOrAlias());
-
- // received stamp is removed
- $this->assertNull($envelope->last(ReceivedStamp::class));
-
- return true;
- }))->willReturnArgument(0);
-
- $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
- $retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true);
-
- $worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
- $worker->run([], function (?Envelope $envelope) use ($worker) {
- // stop after the messages finish
- if (null === $envelope) {
- $worker->stop();
- }
- });
-
- // old message rejected
- $this->assertSame(1, $receiver->getRejectCount());
- }
-
- public function testUnrecoverableMessageHandlingExceptionPreventsRetries()
- {
- $envelope1 = new Envelope(new DummyMessage('Unwrapped Exception'), [new SentStamp('Some\Sender', 'transport1')]);
- $envelope2 = new Envelope(new DummyMessage('Wrapped Exception'), [new SentStamp('Some\Sender', 'transport1')]);
-
- $receiver = new DummyReceiver([
- [$envelope1],
- [$envelope2],
- ]);
-
- $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
- $bus->expects($this->at(0))->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException());
- $bus->expects($this->at(1))->method('dispatch')->willThrowException(
- new HandlerFailedException($envelope2, [new UnrecoverableMessageHandlingException()])
- );
-
- $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
- $retryStrategy->expects($this->never())->method('isRetryable')->willReturn(true);
-
- $worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
- $worker->run([], function (?Envelope $envelope) use ($worker) {
- // stop after the messages finish
- if (null === $envelope) {
- $worker->stop();
- }
- });
-
- // message was rejected
- $this->assertSame(2, $receiver->getRejectCount());
- }
-
- public function testDispatchCausesRejectWhenNoRetry()
+ public function testHandlingErrorCausesReject()
{
$receiver = new DummyReceiver([
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'transport1')])],
@@ -163,10 +71,7 @@ class WorkerTest extends TestCase
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
- $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
- $retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false);
-
- $worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
+ $worker = new Worker(['transport1' => $receiver], $bus);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
@@ -177,28 +82,6 @@ class WorkerTest extends TestCase
$this->assertSame(0, $receiver->getAcknowledgeCount());
}
- public function testDispatchCausesRejectOnUnrecoverableMessage()
- {
- $receiver = new DummyReceiver([
- [new Envelope(new DummyMessage('Hello'))],
- ]);
-
- $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
- $bus->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException('Will never work'));
-
- $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
- $retryStrategy->expects($this->never())->method('isRetryable');
-
- $worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
- $worker->run([], function (?Envelope $envelope) use ($worker) {
- // stop after the messages finish
- if (null === $envelope) {
- $worker->stop();
- }
- });
- $this->assertSame(1, $receiver->getRejectCount());
- }
-
public function testWorkerDoesNotSendNullMessagesToTheBus()
{
$receiver = new DummyReceiver([
@@ -235,7 +118,7 @@ class WorkerTest extends TestCase
[$this->isInstanceOf(WorkerStoppedEvent::class)]
);
- $worker = new Worker([$receiver], $bus, [], $eventDispatcher);
+ $worker = new Worker([$receiver], $bus, $eventDispatcher);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
@@ -263,7 +146,7 @@ class WorkerTest extends TestCase
[$this->isInstanceOf(WorkerStoppedEvent::class)]
);
- $worker = new Worker([$receiver], $bus, [], $eventDispatcher);
+ $worker = new Worker([$receiver], $bus, $eventDispatcher);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
diff --git a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php
index 41fc2f3ade..7f64851b9d 100644
--- a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php
+++ b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php
@@ -15,7 +15,6 @@ use Psr\Container\ContainerInterface;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RuntimeException;
-use Symfony\Component\Messenger\Exception\UnknownSenderException;
use Symfony\Component\Messenger\Handler\HandlersLocator;
/**
@@ -79,13 +78,4 @@ class SendersLocator implements SendersLocatorInterface
}
}
}
-
- public function getSenderByAlias(string $alias): SenderInterface
- {
- if ($this->sendersLocator->has($alias)) {
- return $this->sendersLocator->get($alias);
- }
-
- throw new UnknownSenderException(sprintf('Unknown sender alias "%s".', $alias));
- }
}
diff --git a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php
index e74f82f986..916f780d74 100644
--- a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php
+++ b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php
@@ -12,7 +12,6 @@
namespace Symfony\Component\Messenger\Transport\Sender;
use Symfony\Component\Messenger\Envelope;
-use Symfony\Component\Messenger\Exception\UnknownSenderException;
/**
* Maps a message to a list of senders.
@@ -28,13 +27,4 @@ interface SendersLocatorInterface
* @return iterable|SenderInterface[] Indexed by sender alias if available
*/
public function getSenders(Envelope $envelope): iterable;
-
- /**
- * Returns a specific sender by its alias.
- *
- * @param string $alias The alias given to the sender in getSenders()
- *
- * @throws UnknownSenderException If the sender is not found
- */
- public function getSenderByAlias(string $alias): SenderInterface;
}
diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php
index 1765fbec2a..6fcce7e368 100644
--- a/src/Symfony/Component/Messenger/Worker.php
+++ b/src/Symfony/Component/Messenger/Worker.php
@@ -19,12 +19,8 @@ use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
-use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
-use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
-use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
-use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
@@ -37,20 +33,17 @@ class Worker implements WorkerInterface
{
private $receivers;
private $bus;
- private $retryStrategies;
private $eventDispatcher;
private $logger;
private $shouldStop = false;
/**
- * @param ReceiverInterface[] $receivers Where the key is the transport name
- * @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match)
+ * @param ReceiverInterface[] $receivers Where the key is the transport name
*/
- public function __construct(array $receivers, MessageBusInterface $bus, array $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
+ public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
{
$this->receivers = $receivers;
$this->bus = $bus;
- $this->retryStrategies = $retryStrategies;
$this->eventDispatcher = LegacyEventDispatcherProxy::decorate($eventDispatcher);
$this->logger = $logger;
}
@@ -91,7 +84,7 @@ class Worker implements WorkerInterface
foreach ($envelopes as $envelope) {
$envelopeHandled = true;
- $this->handleMessage($envelope, $receiver, $transportName, $this->retryStrategies[$transportName] ?? null);
+ $this->handleMessage($envelope, $receiver, $transportName);
$onHandled($envelope);
if ($this->shouldStop) {
@@ -117,7 +110,7 @@ class Worker implements WorkerInterface
$this->dispatchEvent(new WorkerStoppedEvent());
}
- private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName, ?RetryStrategyInterface $retryStrategy): void
+ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName): void
{
$event = new WorkerMessageReceivedEvent($envelope, $transportName);
$this->dispatchEvent($event);
@@ -126,12 +119,6 @@ class Worker implements WorkerInterface
return;
}
- $message = $envelope->getMessage();
- $context = [
- 'message' => $message,
- 'class' => \get_class($message),
- ];
-
try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp()));
} catch (\Throwable $throwable) {
@@ -146,30 +133,7 @@ class Worker implements WorkerInterface
$envelope = $throwable->getEnvelope();
}
- $shouldRetry = $retryStrategy && $this->shouldRetry($throwable, $envelope, $retryStrategy);
-
- $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable, $shouldRetry));
-
- $retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope);
- if ($shouldRetry) {
- ++$retryCount;
- $delay = $retryStrategy->getWaitingTime($envelope);
- if (null !== $this->logger) {
- $this->logger->error('Error thrown while handling message {class}. Dispatching for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
- }
-
- // add the delay and retry stamp info + remove ReceivedStamp
- $retryEnvelope = $envelope->with(new DelayStamp($delay))
- ->with(new RedeliveryStamp($retryCount, $transportName))
- ->withoutAll(ReceivedStamp::class);
-
- // re-send the message for retry
- $this->bus->dispatch($retryEnvelope);
- } else {
- if (null !== $this->logger) {
- $this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
- }
- }
+ $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable));
if (!$rejectFirst) {
$receiver->reject($envelope);
@@ -181,6 +145,11 @@ class Worker implements WorkerInterface
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName));
if (null !== $this->logger) {
+ $message = $envelope->getMessage();
+ $context = [
+ 'message' => $message,
+ 'class' => \get_class($message),
+ ];
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
}
@@ -200,27 +169,4 @@ class Worker implements WorkerInterface
$this->eventDispatcher->dispatch($event);
}
-
- private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
- {
- // if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
- if ($e instanceof HandlerFailedException) {
- $shouldNotRetry = true;
- foreach ($e->getNestedExceptions() as $nestedException) {
- if (!$nestedException instanceof UnrecoverableExceptionInterface) {
- $shouldNotRetry = false;
- break;
- }
- }
- if ($shouldNotRetry) {
- return false;
- }
- }
-
- if ($e instanceof UnrecoverableExceptionInterface) {
- return false;
- }
-
- return $retryStrategy->isRetryable($envelope);
- }
}