[Messenger] extract worker logic to listener and sent messages for retry

and failure directly to transport instead of redispatching on the bus
This commit is contained in:
Tobias Schultze 2019-10-29 15:55:01 +01:00
parent cf10c02765
commit d7e0f98cd0
32 changed files with 332 additions and 602 deletions

View File

@ -154,6 +154,11 @@ Messenger
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor, * Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
pass a `RoutableMessageBus` instance instead. pass a `RoutableMessageBus` instance instead.
* [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
* [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
* [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
* [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
* [BC BREAK] Removed `UnknownSenderException`.
Mime Mime
---- ----

View File

@ -1810,20 +1810,29 @@ class FrameworkExtension extends Extension
$messageToSendersMapping[$message] = $messageConfiguration['senders']; $messageToSendersMapping[$message] = $messageConfiguration['senders'];
} }
$sendersServiceLocator = ServiceLocatorTagPass::register($container, $senderReferences);
$container->getDefinition('messenger.senders_locator') $container->getDefinition('messenger.senders_locator')
->replaceArgument(0, $messageToSendersMapping) ->replaceArgument(0, $messageToSendersMapping)
->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences)) ->replaceArgument(1, $sendersServiceLocator)
;
$container->getDefinition('messenger.retry.send_failed_message_for_retry_listener')
->replaceArgument(0, $sendersServiceLocator)
; ;
$container->getDefinition('messenger.retry_strategy_locator') $container->getDefinition('messenger.retry_strategy_locator')
->replaceArgument(0, $transportRetryReferences); ->replaceArgument(0, $transportRetryReferences);
if ($config['failure_transport']) { if ($config['failure_transport']) {
if (!isset($senderReferences[$config['failure_transport']])) {
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
}
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
->replaceArgument(1, $config['failure_transport']); ->replaceArgument(0, $senderReferences[$config['failure_transport']]);
$container->getDefinition('console.command.messenger_failed_messages_retry') $container->getDefinition('console.command.messenger_failed_messages_retry')
->replaceArgument(0, $config['failure_transport']) ->replaceArgument(0, $config['failure_transport']);
->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null);
$container->getDefinition('console.command.messenger_failed_messages_show') $container->getDefinition('console.command.messenger_failed_messages_show')
->replaceArgument(0, $config['failure_transport']); ->replaceArgument(0, $config['failure_transport']);
$container->getDefinition('console.command.messenger_failed_messages_remove') $container->getDefinition('console.command.messenger_failed_messages_remove')

View File

@ -86,7 +86,6 @@
<argument type="service" id="messenger.receiver_locator" /> <argument type="service" id="messenger.receiver_locator" />
<argument type="service" id="logger" on-invalid="null" /> <argument type="service" id="logger" on-invalid="null" />
<argument type="collection" /> <!-- Receiver names --> <argument type="collection" /> <!-- Receiver names -->
<argument type="service" id="messenger.retry_strategy_locator" />
<argument type="service" id="event_dispatcher" /> <argument type="service" id="event_dispatcher" />
<call method="setCachePoolForRestartSignal"> <call method="setCachePoolForRestartSignal">
<argument type="service" id="cache.messenger.restart_workers_signal" /> <argument type="service" id="cache.messenger.restart_workers_signal" />
@ -116,10 +115,9 @@
<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand"> <service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
<argument /> <!-- Receiver name --> <argument /> <!-- Receiver name -->
<argument /> <!-- Receiver locator --> <argument /> <!-- Receiver -->
<argument type="service" id="messenger.routable_message_bus" /> <argument type="service" id="messenger.routable_message_bus" />
<argument type="service" id="event_dispatcher" /> <argument type="service" id="event_dispatcher" />
<argument /> <!-- Retry strategy -->
<argument type="service" id="logger" /> <argument type="service" id="logger" />
<tag name="console.command" command="messenger:failed:retry" /> <tag name="console.command" command="messenger:failed:retry" />
@ -127,14 +125,14 @@
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand"> <service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
<argument /> <!-- Receiver name --> <argument /> <!-- Receiver name -->
<argument /> <!-- Receiver locator --> <argument /> <!-- Receiver -->
<tag name="console.command" command="messenger:failed:show" /> <tag name="console.command" command="messenger:failed:show" />
</service> </service>
<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand"> <service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand">
<argument /> <!-- Receiver name --> <argument /> <!-- Receiver name -->
<argument /> <!-- Receiver locator --> <argument /> <!-- Receiver -->
<tag name="console.command" command="messenger:failed:remove" /> <tag name="console.command" command="messenger:failed:remove" />
</service> </service>

View File

@ -10,7 +10,7 @@
<!-- Asynchronous --> <!-- Asynchronous -->
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator"> <service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator">
<argument type="collection" /> <!-- Per message senders map --> <argument type="collection" /> <!-- Per message senders map -->
<argument /> <!-- senders locator --> <argument /> <!-- senders service locator -->
</service> </service>
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware"> <service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
<tag name="monolog.logger" channel="messenger" /> <tag name="monolog.logger" channel="messenger" />
@ -98,12 +98,19 @@
<argument /> <!-- max delay ms --> <argument /> <!-- max delay ms -->
</service> </service>
<service id="messenger.retry.send_failed_message_for_retry_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener">
<tag name="kernel.event_subscriber" />
<tag name="monolog.logger" channel="messenger" />
<argument /> <!-- senders service locator -->
<argument type="service" id="messenger.retry_strategy_locator" />
<argument type="service" id="logger" on-invalid="ignore" />
</service>
<!-- failed handling --> <!-- failed handling -->
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener"> <service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
<tag name="kernel.event_subscriber" /> <tag name="kernel.event_subscriber" />
<tag name="monolog.logger" channel="messenger" /> <tag name="monolog.logger" channel="messenger" />
<argument type="service" id="messenger.routable_message_bus" /> <argument /> <!-- Failure transport -->
<argument /> <!-- Failure transport name -->
<argument type="service" id="logger" on-invalid="ignore" /> <argument type="service" id="logger" on-invalid="ignore" />
</service> </service>

View File

@ -45,7 +45,7 @@
"symfony/http-client": "^4.4|^5.0", "symfony/http-client": "^4.4|^5.0",
"symfony/lock": "^4.4|^5.0", "symfony/lock": "^4.4|^5.0",
"symfony/mailer": "^4.4|^5.0", "symfony/mailer": "^4.4|^5.0",
"symfony/messenger": "^4.3.6|^5.0", "symfony/messenger": "^4.4|^5.0",
"symfony/mime": "^4.4|^5.0", "symfony/mime": "^4.4|^5.0",
"symfony/process": "^3.4|^4.0|^5.0", "symfony/process": "^3.4|^4.0|^5.0",
"symfony/security-csrf": "^3.4|^4.0|^5.0", "symfony/security-csrf": "^3.4|^4.0|^5.0",
@ -77,7 +77,7 @@
"symfony/form": "<4.3", "symfony/form": "<4.3",
"symfony/lock": "<4.4", "symfony/lock": "<4.4",
"symfony/mailer": "<4.4", "symfony/mailer": "<4.4",
"symfony/messenger": "<4.3.6", "symfony/messenger": "<4.4",
"symfony/mime": "<4.4", "symfony/mime": "<4.4",
"symfony/property-info": "<3.4", "symfony/property-info": "<3.4",
"symfony/security-bundle": "<4.4", "symfony/security-bundle": "<4.4",

View File

@ -11,6 +11,12 @@ CHANGELOG
* Made all dispatched worker event classes final. * Made all dispatched worker event classes final.
* Added support for `from_transport` attribute on `messenger.message_handler` tag. * Added support for `from_transport` attribute on `messenger.message_handler` tag.
* Added support for passing `dbindex` as a query parameter to the redis transport DSN. * Added support for passing `dbindex` as a query parameter to the redis transport DSN.
* [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
* [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
* [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
* [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
* [BC BREAK] Removed `UnknownSenderException`.
* The component is not marked as `@experimental` anymore.
4.3.0 4.3.0
----- -----

View File

@ -89,7 +89,7 @@ abstract class AbstractFailedMessagesCommand extends Command
$redeliveryStamps = $envelope->all(RedeliveryStamp::class); $redeliveryStamps = $envelope->all(RedeliveryStamp::class);
$io->writeln(' Message history:'); $io->writeln(' Message history:');
foreach ($redeliveryStamps as $redeliveryStamp) { foreach ($redeliveryStamps as $redeliveryStamp) {
$io->writeln(sprintf(' * Message failed and redelivered to the <info>%s</info> transport at <info>%s</info>', $redeliveryStamp->getSenderClassOrAlias(), $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s'))); $io->writeln(sprintf(' * Message failed at <info>%s</info> and was redelivered', $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')));
} }
$io->newLine(); $io->newLine();

View File

@ -42,7 +42,6 @@ class ConsumeMessagesCommand extends Command
private $receiverLocator; private $receiverLocator;
private $logger; private $logger;
private $receiverNames; private $receiverNames;
private $retryStrategyLocator;
private $eventDispatcher; private $eventDispatcher;
/** @var CacheItemPoolInterface|null */ /** @var CacheItemPoolInterface|null */
private $restartSignalCachePool; private $restartSignalCachePool;
@ -50,7 +49,7 @@ class ConsumeMessagesCommand extends Command
/** /**
* @param RoutableMessageBus $routableBus * @param RoutableMessageBus $routableBus
*/ */
public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null) public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* EventDispatcherInterface */ $eventDispatcher = null)
{ {
if ($routableBus instanceof ContainerInterface) { if ($routableBus instanceof ContainerInterface) {
@trigger_error(sprintf('Passing a "%s" instance as first argument to "%s()" is deprecated since Symfony 4.4, pass a "%s" instance instead.', ContainerInterface::class, __METHOD__, RoutableMessageBus::class), E_USER_DEPRECATED); @trigger_error(sprintf('Passing a "%s" instance as first argument to "%s()" is deprecated since Symfony 4.4, pass a "%s" instance instead.', ContainerInterface::class, __METHOD__, RoutableMessageBus::class), E_USER_DEPRECATED);
@ -59,17 +58,16 @@ class ConsumeMessagesCommand extends Command
throw new \TypeError(sprintf('The first argument must be an instance of "%s".', RoutableMessageBus::class)); throw new \TypeError(sprintf('The first argument must be an instance of "%s".', RoutableMessageBus::class));
} }
if (\is_array($retryStrategyLocator)) { if (null !== $eventDispatcher && !$eventDispatcher instanceof EventDispatcherInterface) {
@trigger_error(sprintf('The 5th argument of the class "%s" should be a retry-strategy locator, an array of bus names as a value is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED); @trigger_error(sprintf('The 5th argument of the class "%s" should be a "%s"', __CLASS__, EventDispatcherInterface::class), E_USER_DEPRECATED);
$retryStrategyLocator = null; $eventDispatcher = null;
} }
$this->routableBus = $routableBus; $this->routableBus = $routableBus;
$this->receiverLocator = $receiverLocator; $this->receiverLocator = $receiverLocator;
$this->logger = $logger; $this->logger = $logger;
$this->receiverNames = $receiverNames; $this->receiverNames = $receiverNames;
$this->retryStrategyLocator = $retryStrategyLocator;
$this->eventDispatcher = $eventDispatcher; $this->eventDispatcher = $eventDispatcher;
parent::__construct(); parent::__construct();
@ -166,7 +164,6 @@ EOF
} }
$receivers = []; $receivers = [];
$retryStrategies = [];
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) { foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
if (!$this->receiverLocator->has($receiverName)) { if (!$this->receiverLocator->has($receiverName)) {
$message = sprintf('The receiver "%s" does not exist.', $receiverName); $message = sprintf('The receiver "%s" does not exist.', $receiverName);
@ -177,17 +174,12 @@ EOF
throw new RuntimeException($message); throw new RuntimeException($message);
} }
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
}
$receivers[$receiverName] = $this->receiverLocator->get($receiverName); $receivers[$receiverName] = $this->receiverLocator->get($receiverName);
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
} }
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus; $bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger); $worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
$stopsWhen = []; $stopsWhen = [];
if ($limit = $input->getOption('limit')) { if ($limit = $input->getOption('limit')) {
$stopsWhen[] = "processed {$limit} messages"; $stopsWhen[] = "processed {$limit} messages";

View File

@ -24,7 +24,6 @@ use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver; use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
@ -39,14 +38,12 @@ class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
private $eventDispatcher; private $eventDispatcher;
private $messageBus; private $messageBus;
private $retryStrategy;
private $logger; private $logger;
public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, RetryStrategyInterface $retryStrategy = null, LoggerInterface $logger = null) public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null)
{ {
$this->eventDispatcher = $eventDispatcher; $this->eventDispatcher = $eventDispatcher;
$this->messageBus = $messageBus; $this->messageBus = $messageBus;
$this->retryStrategy = $retryStrategy;
$this->logger = $logger; $this->logger = $logger;
parent::__construct($receiverName, $receiver); parent::__construct($receiverName, $receiver);
@ -180,7 +177,6 @@ EOF
$worker = new Worker( $worker = new Worker(
[$this->getReceiverName() => $receiver], [$this->getReceiverName() => $receiver],
$this->messageBus, $this->messageBus,
[$this->getReceiverName() => $this->retryStrategy],
$this->eventDispatcher, $this->eventDispatcher,
$this->logger $this->logger
); );

View File

@ -21,12 +21,11 @@ use Symfony\Component\Messenger\Envelope;
final class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent final class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent
{ {
private $throwable; private $throwable;
private $willRetry; private $willRetry = false;
public function __construct(Envelope $envelope, string $receiverName, \Throwable $error, bool $willRetry) public function __construct(Envelope $envelope, string $receiverName, \Throwable $error)
{ {
$this->throwable = $error; $this->throwable = $error;
$this->willRetry = $willRetry;
parent::__construct($envelope, $receiverName); parent::__construct($envelope, $receiverName);
} }
@ -40,4 +39,9 @@ final class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent
{ {
return $this->willRetry; return $this->willRetry;
} }
public function setForRetry(): void
{
$this->willRetry = true;
}
} }

View File

@ -0,0 +1,126 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\EventListener;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
/**
* @author Tobias Schultze <http://tobion.de>
*/
class SendFailedMessageForRetryListener implements EventSubscriberInterface
{
private $sendersLocator;
private $retryStrategyLocator;
private $logger;
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null)
{
$this->sendersLocator = $sendersLocator;
$this->retryStrategyLocator = $retryStrategyLocator;
$this->logger = $logger;
}
public function onMessageFailed(WorkerMessageFailedEvent $event)
{
$retryStrategy = $this->getRetryStrategyForTransport($event->getReceiverName());
$envelope = $event->getEnvelope();
$throwable = $event->getThrowable();
$message = $envelope->getMessage();
$context = [
'message' => $message,
'class' => \get_class($message),
];
$shouldRetry = $retryStrategy && $this->shouldRetry($throwable, $envelope, $retryStrategy);
$retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope);
if ($shouldRetry) {
$event->setForRetry();
++$retryCount;
$delay = $retryStrategy->getWaitingTime($envelope);
if (null !== $this->logger) {
$this->logger->error('Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
}
// add the delay and retry stamp info
$retryEnvelope = $envelope->with(new DelayStamp($delay), new RedeliveryStamp($retryCount));
// re-send the message for retry
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
} else {
if (null !== $this->logger) {
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
}
}
}
public static function getSubscribedEvents()
{
return [
// must have higher priority than SendFailedMessageToFailureTransportListener
WorkerMessageFailedEvent::class => ['onMessageFailed', 100],
];
}
private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
{
// if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
if ($e instanceof HandlerFailedException) {
$shouldNotRetry = true;
foreach ($e->getNestedExceptions() as $nestedException) {
if (!$nestedException instanceof UnrecoverableExceptionInterface) {
$shouldNotRetry = false;
break;
}
}
if ($shouldNotRetry) {
return false;
}
}
if ($e instanceof UnrecoverableExceptionInterface) {
return false;
}
return $retryStrategy->isRetryable($envelope);
}
private function getRetryStrategyForTransport(string $alias): ?RetryStrategyInterface
{
if ($this->retryStrategyLocator->has($alias)) {
return $this->retryStrategyLocator->get($alias);
}
return null;
}
private function getSenderForTransport(string $alias): SenderInterface
{
if ($this->sendersLocator->has($alias)) {
return $this->sendersLocator->get($alias);
}
throw new RuntimeException(sprintf('Could not find sender "%s" based on the same receiver to send the failed message to for retry.', $alias));
}
}

View File

@ -15,12 +15,10 @@ use Symfony\Component\ErrorRenderer\Exception\FlattenException;
use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
/** /**
* Sends a rejected message to a "failure transport". * Sends a rejected message to a "failure transport".
@ -29,14 +27,12 @@ use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
*/ */
class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface
{ {
private $messageBus; private $failureSender;
private $failureSenderAlias;
private $logger; private $logger;
public function __construct(MessageBusInterface $messageBus, string $failureSenderAlias, LoggerInterface $logger = null) public function __construct(SenderInterface $failureSender, LoggerInterface $logger = null)
{ {
$this->messageBus = $messageBus; $this->failureSender = $failureSender;
$this->failureSenderAlias = $failureSenderAlias;
$this->logger = $logger; $this->logger = $logger;
} }
@ -53,27 +49,26 @@ class SendFailedMessageToFailureTransportListener implements EventSubscriberInte
return; return;
} }
// remove the received stamp so it's redelivered
$throwable = $event->getThrowable(); $throwable = $event->getThrowable();
if ($throwable instanceof HandlerFailedException) { if ($throwable instanceof HandlerFailedException) {
$throwable = $throwable->getNestedExceptions()[0]; $throwable = $throwable->getNestedExceptions()[0];
} }
$flattenedException = class_exists(FlattenException::class) ? FlattenException::createFromThrowable($throwable) : null; $flattenedException = class_exists(FlattenException::class) ? FlattenException::createFromThrowable($throwable) : null;
$envelope = $envelope->withoutAll(ReceivedStamp::class) $envelope = $envelope->with(
->withoutAll(TransportMessageIdStamp::class) new SentToFailureTransportStamp($event->getReceiverName()),
->with(new SentToFailureTransportStamp($event->getReceiverName())) new DelayStamp(0),
->with(new DelayStamp(0)) new RedeliveryStamp(0, $throwable->getMessage(), $flattenedException)
->with(new RedeliveryStamp(0, $this->failureSenderAlias, $throwable->getMessage(), $flattenedException)); );
if (null !== $this->logger) { if (null !== $this->logger) {
$this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [ $this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
'class' => \get_class($envelope->getMessage()), 'class' => \get_class($envelope->getMessage()),
'transport' => $this->failureSenderAlias, 'transport' => \get_class($this->failureSender),
]); ]);
} }
$this->messageBus->dispatch($envelope); $this->failureSender->send($envelope);
} }
public static function getSubscribedEvents() public static function getSubscribedEvents()

View File

@ -1,19 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Exception;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class UnknownSenderException extends InvalidArgumentException
{
}

View File

@ -13,7 +13,6 @@ namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
/** /**
@ -34,13 +33,10 @@ class RejectRedeliveredMessageMiddleware implements MiddlewareInterface
{ {
public function handle(Envelope $envelope, StackInterface $stack): Envelope public function handle(Envelope $envelope, StackInterface $stack): Envelope
{ {
// ignore the dispatched messages for retry $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if (null !== $envelope->last(ReceivedStamp::class)) {
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) { if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.'); throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
}
} }
return $stack->next()->handle($envelope, $stack); return $stack->next()->handle($envelope, $stack);

View File

@ -17,9 +17,7 @@ use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent; use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface; use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
@ -57,12 +55,8 @@ class SendMessageMiddleware implements MiddlewareInterface
// it's a received message, do not send it back // it's a received message, do not send it back
$this->logger->info('Received message {class}', $context); $this->logger->info('Received message {class}', $context);
} else { } else {
/** @var RedeliveryStamp|null $redeliveryStamp */ $shouldDispatchEvent = true;
$redeliveryStamp = $envelope->last(RedeliveryStamp::class); foreach ($this->sendersLocator->getSenders($envelope) as $alias => $sender) {
// dispatch event unless this is a redelivery
$shouldDispatchEvent = null === $redeliveryStamp;
foreach ($this->getSenders($envelope, $redeliveryStamp) as $alias => $sender) {
if (null !== $this->eventDispatcher && $shouldDispatchEvent) { if (null !== $this->eventDispatcher && $shouldDispatchEvent) {
$event = new SendMessageToTransportsEvent($envelope); $event = new SendMessageToTransportsEvent($envelope);
$this->eventDispatcher->dispatch($event); $this->eventDispatcher->dispatch($event);
@ -82,18 +76,4 @@ class SendMessageMiddleware implements MiddlewareInterface
// message should only be sent and not be handled by the next middleware // message should only be sent and not be handled by the next middleware
return $envelope; return $envelope;
} }
/**
* * @return iterable|SenderInterface[]
*/
private function getSenders(Envelope $envelope, ?RedeliveryStamp $redeliveryStamp): iterable
{
if (null !== $redeliveryStamp) {
return [
$redeliveryStamp->getSenderClassOrAlias() => $this->sendersLocator->getSenderByAlias($redeliveryStamp->getSenderClassOrAlias()),
];
}
return $this->sendersLocator->getSenders($envelope);
}
} }

View File

@ -20,18 +20,13 @@ use Symfony\Component\Messenger\Envelope;
final class RedeliveryStamp implements StampInterface final class RedeliveryStamp implements StampInterface
{ {
private $retryCount; private $retryCount;
private $senderClassOrAlias;
private $redeliveredAt; private $redeliveredAt;
private $exceptionMessage; private $exceptionMessage;
private $flattenException; private $flattenException;
/** public function __construct(int $retryCount, string $exceptionMessage = null, FlattenException $flattenException = null)
* @param string $senderClassOrAlias Alias from SendersLocator or just the class name
*/
public function __construct(int $retryCount, string $senderClassOrAlias, string $exceptionMessage = null, FlattenException $flattenException = null)
{ {
$this->retryCount = $retryCount; $this->retryCount = $retryCount;
$this->senderClassOrAlias = $senderClassOrAlias;
$this->exceptionMessage = $exceptionMessage; $this->exceptionMessage = $exceptionMessage;
$this->flattenException = $flattenException; $this->flattenException = $flattenException;
$this->redeliveredAt = new \DateTimeImmutable(); $this->redeliveredAt = new \DateTimeImmutable();
@ -50,16 +45,6 @@ final class RedeliveryStamp implements StampInterface
return $this->retryCount; return $this->retryCount;
} }
/**
* The target sender this should be redelivered to.
*
* @internal
*/
public function getSenderClassOrAlias(): string
{
return $this->senderClassOrAlias;
}
public function getExceptionMessage(): ?string public function getExceptionMessage(): ?string
{ {
return $this->exceptionMessage; return $this->exceptionMessage;

View File

@ -28,7 +28,7 @@ class FailedMessagesShowCommandTest extends TestCase
public function testBasicRun() public function testBasicRun()
{ {
$sentToFailureStamp = new SentToFailureTransportStamp('async'); $sentToFailureStamp = new SentToFailureTransportStamp('async');
$redeliveryStamp = new RedeliveryStamp(0, 'failure_receiver', 'Things are bad!'); $redeliveryStamp = new RedeliveryStamp(0, 'Things are bad!');
$envelope = new Envelope(new \stdClass(), [ $envelope = new Envelope(new \stdClass(), [
new TransportMessageIdStamp(15), new TransportMessageIdStamp(15),
$sentToFailureStamp, $sentToFailureStamp,
@ -62,8 +62,8 @@ EOF
public function testMultipleRedeliveryFails() public function testMultipleRedeliveryFails()
{ {
$sentToFailureStamp = new SentToFailureTransportStamp('async'); $sentToFailureStamp = new SentToFailureTransportStamp('async');
$redeliveryStamp1 = new RedeliveryStamp(0, 'failure_receiver', 'Things are bad!'); $redeliveryStamp1 = new RedeliveryStamp(0, 'Things are bad!');
$redeliveryStamp2 = new RedeliveryStamp(0, 'failure_receiver'); $redeliveryStamp2 = new RedeliveryStamp(0);
$envelope = new Envelope(new \stdClass(), [ $envelope = new Envelope(new \stdClass(), [
new TransportMessageIdStamp(15), new TransportMessageIdStamp(15),
$sentToFailureStamp, $sentToFailureStamp,

View File

@ -281,7 +281,6 @@ class MessengerPassTest extends TestCase
$container->register('console.command.messenger_consume_messages', ConsumeMessagesCommand::class)->setArguments([ $container->register('console.command.messenger_consume_messages', ConsumeMessagesCommand::class)->setArguments([
null, null,
new Reference('messenger.receiver_locator'), new Reference('messenger.receiver_locator'),
new Reference('messenger.retry_strategy_locator'),
null, null,
null, null,
null, null,

View File

@ -0,0 +1,66 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\EventListener;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
class SendFailedMessageForRetryListenerTest extends TestCase
{
public function testNoRetryStrategyCausesNoRetry()
{
$senderLocator = $this->createMock(ContainerInterface::class);
$senderLocator->expects($this->never())->method('has');
$senderLocator->expects($this->never())->method('get');
$retryStrategyLocator = $this->createMock(ContainerInterface::class);
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(false);
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
$exception = new \Exception('no!');
$envelope = new Envelope(new \stdClass());
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
$listener->onMessageFailed($event);
}
public function testEnvelopeIsSentToTransportOnRetry()
{
$exception = new \Exception('no!');
$envelope = new Envelope(new \stdClass());
$sender = $this->createMock(SenderInterface::class);
$sender->expects($this->once())->method('send')->with($envelope->with(new DelayStamp(1000), new RedeliveryStamp(1)))->willReturnArgument(0);
$senderLocator = $this->createMock(ContainerInterface::class);
$senderLocator->expects($this->once())->method('has')->willReturn(true);
$senderLocator->expects($this->never())->method('get')->willReturn($sender);
$retryStategy = $this->createMock(RetryStrategyInterface::class);
$retryStategy->expects($this->once())->method('isRetryable')->willReturn(true);
$retryStategy->expects($this->once())->method('getWaitingTime')->willReturn(1000);
$retryStrategyLocator = $this->createMock(ContainerInterface::class);
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
$listener->onMessageFailed($event);
}
}

View File

@ -9,25 +9,23 @@
* file that was distributed with this source code. * file that was distributed with this source code.
*/ */
namespace Symfony\Component\Messenger\Tests\Handler; namespace Symfony\Component\Messenger\Tests\EventListener;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
class SendFailedMessageToFailureTransportListenerTest extends TestCase class SendFailedMessageToFailureTransportListenerTest extends TestCase
{ {
public function testItDispatchesToTheFailureTransport() public function testItSendsToTheFailureTransport()
{ {
$bus = $this->createMock(MessageBusInterface::class); $sender = $this->createMock(SenderInterface::class);
$bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) { $sender->expects($this->once())->method('send')->with($this->callback(function ($envelope) {
/* @var Envelope $envelope */ /* @var Envelope $envelope */
$this->assertInstanceOf(Envelope::class, $envelope); $this->assertInstanceOf(Envelope::class, $envelope);
@ -38,31 +36,24 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase
/** @var RedeliveryStamp $redeliveryStamp */ /** @var RedeliveryStamp $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class); $redeliveryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertSame('failure_sender', $redeliveryStamp->getSenderClassOrAlias());
$this->assertSame('no!', $redeliveryStamp->getExceptionMessage()); $this->assertSame('no!', $redeliveryStamp->getExceptionMessage());
$this->assertSame('no!', $redeliveryStamp->getFlattenException()->getMessage()); $this->assertSame('no!', $redeliveryStamp->getFlattenException()->getMessage());
$this->assertNull($envelope->last(ReceivedStamp::class));
$this->assertNull($envelope->last(TransportMessageIdStamp::class));
return true; return true;
}))->willReturn(new Envelope(new \stdClass())); }))->willReturnArgument(0);
$listener = new SendFailedMessageToFailureTransportListener( $listener = new SendFailedMessageToFailureTransportListener($sender);
$bus,
'failure_sender'
);
$exception = new \Exception('no!'); $exception = new \Exception('no!');
$envelope = new Envelope(new \stdClass()); $envelope = new Envelope(new \stdClass());
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false); $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
$listener->onMessageFailed($event); $listener->onMessageFailed($event);
} }
public function testItGetsNestedHandlerFailedException() public function testItGetsNestedHandlerFailedException()
{ {
$bus = $this->createMock(MessageBusInterface::class); $sender = $this->createMock(SenderInterface::class);
$bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) { $sender->expects($this->once())->method('send')->with($this->callback(function ($envelope) {
/** @var Envelope $envelope */ /** @var Envelope $envelope */
/** @var RedeliveryStamp $redeliveryStamp */ /** @var RedeliveryStamp $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class); $redeliveryStamp = $envelope->last(RedeliveryStamp::class);
@ -71,49 +62,41 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase
$this->assertSame('Exception', $redeliveryStamp->getFlattenException()->getClass()); $this->assertSame('Exception', $redeliveryStamp->getFlattenException()->getClass());
return true; return true;
}))->willReturn(new Envelope(new \stdClass())); }))->willReturnArgument(0);
$listener = new SendFailedMessageToFailureTransportListener( $listener = new SendFailedMessageToFailureTransportListener($sender);
$bus,
'failure_sender'
);
$envelope = new Envelope(new \stdClass()); $envelope = new Envelope(new \stdClass());
$exception = new \Exception('I am inside!'); $exception = new \Exception('I am inside!');
$exception = new HandlerFailedException($envelope, [$exception]); $exception = new HandlerFailedException($envelope, [$exception]);
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false); $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
$listener->onMessageFailed($event); $listener->onMessageFailed($event);
} }
public function testDoNothingOnRetry() public function testDoNothingOnRetry()
{ {
$bus = $this->createMock(MessageBusInterface::class); $sender = $this->createMock(SenderInterface::class);
$bus->expects($this->never())->method('dispatch'); $sender->expects($this->never())->method('send');
$listener = new SendFailedMessageToFailureTransportListener( $listener = new SendFailedMessageToFailureTransportListener($sender);
$bus,
'failure_sender'
);
$envelope = new Envelope(new \stdClass()); $envelope = new Envelope(new \stdClass());
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), true); $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception());
$event->setForRetry();
$listener->onMessageFailed($event); $listener->onMessageFailed($event);
} }
public function testDoNotRedeliverToFailed() public function testDoNotRedeliverToFailed()
{ {
$bus = $this->createMock(MessageBusInterface::class); $sender = $this->createMock(SenderInterface::class);
$bus->expects($this->never())->method('dispatch'); $sender->expects($this->never())->method('send');
$listener = new SendFailedMessageToFailureTransportListener( $listener = new SendFailedMessageToFailureTransportListener($sender);
$bus,
'failure_sender'
);
$envelope = new Envelope(new \stdClass(), [ $envelope = new Envelope(new \stdClass(), [
new SentToFailureTransportStamp('my_receiver'), new SentToFailureTransportStamp('my_receiver'),
]); ]);
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), false); $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception());
$listener->onMessageFailed($event); $listener->onMessageFailed($event);
} }

View File

@ -13,10 +13,10 @@ namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface; use Psr\Container\ContainerInterface;
use Symfony\Component\DependencyInjection\Container;
use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Handler\HandlerDescriptor; use Symfony\Component\Messenger\Handler\HandlerDescriptor;
@ -61,11 +61,18 @@ class FailureIntegrationTest extends TestCase
$locator $locator
); );
$retryStrategyLocator = $this->createMock(ContainerInterface::class);
$retryStrategyLocator->expects($this->any())
->method('has')
->willReturn(true);
$retryStrategyLocator->expects($this->any())
->method('get')
->willReturn(new MultiplierRetryStrategy(1));
// using to so we can lazily get the bus later and avoid circular problem // using to so we can lazily get the bus later and avoid circular problem
$transport1HandlerThatFails = new DummyTestHandler(true); $transport1HandlerThatFails = new DummyTestHandler(true);
$allTransportHandlerThatWorks = new DummyTestHandler(false); $allTransportHandlerThatWorks = new DummyTestHandler(false);
$transport2HandlerThatWorks = new DummyTestHandler(false); $transport2HandlerThatWorks = new DummyTestHandler(false);
$container = new Container();
$handlerLocator = new HandlersLocator([ $handlerLocator = new HandlersLocator([
DummyMessage::class => [ DummyMessage::class => [
new HandlerDescriptor($transport1HandlerThatFails, [ new HandlerDescriptor($transport1HandlerThatFails, [
@ -88,17 +95,17 @@ class FailureIntegrationTest extends TestCase
new SendMessageMiddleware($senderLocator), new SendMessageMiddleware($senderLocator),
new HandleMessageMiddleware($handlerLocator), new HandleMessageMiddleware($handlerLocator),
]); ]);
$container->set('bus', $bus); $dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator, $retryStrategyLocator));
$dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($bus, 'the_failure_transport')); $dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($failureTransport));
$runWorker = function (string $transportName, int $maxRetries) use ($transports, $bus, $dispatcher): ?\Throwable { $runWorker = function (string $transportName) use ($transports, $bus, $dispatcher): ?\Throwable {
$throwable = null; $throwable = null;
$failedListener = function (WorkerMessageFailedEvent $event) use (&$throwable) { $failedListener = function (WorkerMessageFailedEvent $event) use (&$throwable) {
$throwable = $event->getThrowable(); $throwable = $event->getThrowable();
}; };
$dispatcher->addListener(WorkerMessageFailedEvent::class, $failedListener); $dispatcher->addListener(WorkerMessageFailedEvent::class, $failedListener);
$worker = new Worker([$transportName => $transports[$transportName]], $bus, [$transportName => new MultiplierRetryStrategy($maxRetries)], $dispatcher); $worker = new Worker([$transportName => $transports[$transportName]], $bus, $dispatcher);
$worker->run([], function (?Envelope $envelope) use ($worker) { $worker->run([], function (?Envelope $envelope) use ($worker) {
// handle one envelope, then stop // handle one envelope, then stop
@ -126,7 +133,7 @@ class FailureIntegrationTest extends TestCase
/* /*
* Receive the message from "transport1" * Receive the message from "transport1"
*/ */
$throwable = $runWorker('transport1', 1); $throwable = $runWorker('transport1');
// make sure this is failing for the reason we think // make sure this is failing for the reason we think
$this->assertInstanceOf(HandlerFailedException::class, $throwable); $this->assertInstanceOf(HandlerFailedException::class, $throwable);
// handler for transport1 and all transports were called // handler for transport1 and all transports were called
@ -140,7 +147,7 @@ class FailureIntegrationTest extends TestCase
/* /*
* Receive the message for a (final) retry * Receive the message for a (final) retry
*/ */
$runWorker('transport1', 1); $runWorker('transport1');
// only the "failed" handler is called a 2nd time // only the "failed" handler is called a 2nd time
$this->assertSame(2, $transport1HandlerThatFails->getTimesCalled()); $this->assertSame(2, $transport1HandlerThatFails->getTimesCalled());
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled()); $this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
@ -160,7 +167,7 @@ class FailureIntegrationTest extends TestCase
/* /*
* Failed message is handled, fails, and sent for a retry * Failed message is handled, fails, and sent for a retry
*/ */
$throwable = $runWorker('the_failure_transport', 1); $throwable = $runWorker('the_failure_transport');
// make sure this is failing for the reason we think // make sure this is failing for the reason we think
$this->assertInstanceOf(HandlerFailedException::class, $throwable); $this->assertInstanceOf(HandlerFailedException::class, $throwable);
// only the "failed" handler is called a 3rd time // only the "failed" handler is called a 3rd time
@ -175,7 +182,7 @@ class FailureIntegrationTest extends TestCase
/* /*
* Message is retried on failure transport then discarded * Message is retried on failure transport then discarded
*/ */
$runWorker('the_failure_transport', 1); $runWorker('the_failure_transport');
// only the "failed" handler is called a 4th time // only the "failed" handler is called a 4th time
$this->assertSame(4, $transport1HandlerThatFails->getTimesCalled()); $this->assertSame(4, $transport1HandlerThatFails->getTimesCalled());
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled()); $this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
@ -185,7 +192,7 @@ class FailureIntegrationTest extends TestCase
/* /*
* Execute handlers on transport2 * Execute handlers on transport2
*/ */
$runWorker('transport2', 1); $runWorker('transport2');
// transport1 handler is not called again // transport1 handler is not called again
$this->assertSame(4, $transport1HandlerThatFails->getTimesCalled()); $this->assertSame(4, $transport1HandlerThatFails->getTimesCalled());
// all transport handler is now called again // all transport handler is now called again
@ -202,10 +209,10 @@ class FailureIntegrationTest extends TestCase
*/ */
$bus->dispatch($envelope); $bus->dispatch($envelope);
// handle the message, but with no retries // handle the message, but with no retries
$runWorker('transport1', 0); $runWorker('transport1');
// now make the handler work! // now make the handler work!
$transport1HandlerThatFails->setShouldThrow(false); $transport1HandlerThatFails->setShouldThrow(false);
$runWorker('the_failure_transport', 1); $runWorker('the_failure_transport');
// the failure transport is empty because it worked // the failure transport is empty because it worked
$this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived()); $this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived());
} }

View File

@ -1,39 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Fixtures;
class DummyMessageHandlerFailingFirstTimes
{
private $remainingFailures;
private $called = 0;
public function __construct(int $throwExceptionOnFirstTries = 0)
{
$this->remainingFailures = $throwExceptionOnFirstTries;
}
public function __invoke(DummyMessage $message)
{
if ($this->remainingFailures > 0) {
--$this->remainingFailures;
throw new \Exception('Handler should throw Exception.');
}
++$this->called;
}
public function getTimesCalledWithoutThrowing(): int
{
return $this->called;
}
}

View File

@ -16,7 +16,6 @@ use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent; use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase; use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
use Symfony\Component\Messenger\Tests\Fixtures\ChildDummyMessage; use Symfony\Component\Messenger\Tests\Fixtures\ChildDummyMessage;
@ -84,31 +83,6 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$this->assertCount(2, $sentStamps); $this->assertCount(2, $sentStamps);
} }
public function testItSendsToOnlyOneSenderOnRedelivery()
{
$envelope = new Envelope(new DummyMessage('Hey'), [new RedeliveryStamp(5, 'bar')]);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
$sendersLocator = $this->createSendersLocator(
[DummyMessage::class => ['foo', 'bar']],
['foo' => $sender, 'bar' => $sender2]
);
$middleware = new SendMessageMiddleware($sendersLocator);
$sender->expects($this->never())
->method('send')
;
$sender2->expects($this->once())
->method('send')
->willReturnArgument(0);
$mockStack = $this->getStackMock(false); // false because next should not be called
$envelope = $middleware->handle($envelope, $mockStack);
$this->assertCount(1, $envelope->all(SentStamp::class));
}
public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage() public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage()
{ {
$envelope = new Envelope(new ChildDummyMessage('Hey')); $envelope = new Envelope(new ChildDummyMessage('Hey'));
@ -223,24 +197,6 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$middleware->handle($envelope, $this->getStackMock()); $middleware->handle($envelope, $this->getStackMock());
} }
public function testItDoesNotDispatchOnRedeliver()
{
$envelope = new Envelope(new DummyMessage('original envelope'));
$envelope = $envelope->with(new RedeliveryStamp(3, 'foo_sender'));
$dispatcher = $this->createMock(EventDispatcherInterface::class);
$dispatcher->expects($this->never())->method('dispatch');
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2->expects($this->once())->method('send')->willReturn(new Envelope(new \stdClass()));
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo']], ['foo' => $sender, 'foo_sender' => $sender2]);
$middleware = new SendMessageMiddleware($sendersLocator, $dispatcher);
$middleware->handle($envelope, $this->getStackMock(false));
}
private function createSendersLocator(array $sendersMap, array $senders): SendersLocator private function createSendersLocator(array $sendersMap, array $senders): SendersLocator
{ {
$container = $this->createMock(ContainerInterface::class); $container = $this->createMock(ContainerInterface::class);

View File

@ -21,7 +21,7 @@ class MultiplierRetryStrategyTest extends TestCase
public function testIsRetryable() public function testIsRetryable()
{ {
$strategy = new MultiplierRetryStrategy(3); $strategy = new MultiplierRetryStrategy(3);
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]); $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0)]);
$this->assertTrue($strategy->isRetryable($envelope)); $this->assertTrue($strategy->isRetryable($envelope));
} }
@ -29,7 +29,7 @@ class MultiplierRetryStrategyTest extends TestCase
public function testIsNotRetryable() public function testIsNotRetryable()
{ {
$strategy = new MultiplierRetryStrategy(3); $strategy = new MultiplierRetryStrategy(3);
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(3, 'sender_alias')]); $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(3)]);
$this->assertFalse($strategy->isRetryable($envelope)); $this->assertFalse($strategy->isRetryable($envelope));
} }
@ -37,7 +37,7 @@ class MultiplierRetryStrategyTest extends TestCase
public function testIsNotRetryableWithZeroMax() public function testIsNotRetryableWithZeroMax()
{ {
$strategy = new MultiplierRetryStrategy(0); $strategy = new MultiplierRetryStrategy(0);
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]); $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0)]);
$this->assertFalse($strategy->isRetryable($envelope)); $this->assertFalse($strategy->isRetryable($envelope));
} }
@ -55,7 +55,7 @@ class MultiplierRetryStrategyTest extends TestCase
public function testGetWaitTime(int $delay, int $multiplier, int $maxDelay, int $previousRetries, int $expectedDelay) public function testGetWaitTime(int $delay, int $multiplier, int $maxDelay, int $previousRetries, int $expectedDelay)
{ {
$strategy = new MultiplierRetryStrategy(10, $delay, $multiplier, $maxDelay); $strategy = new MultiplierRetryStrategy(10, $delay, $multiplier, $maxDelay);
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp($previousRetries, 'sender_alias')]); $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp($previousRetries)]);
$this->assertSame($expectedDelay, $strategy->getWaitingTime($envelope)); $this->assertSame($expectedDelay, $strategy->getWaitingTime($envelope));
} }

View File

@ -1,100 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageHandlerFailingFirstTimes;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
use Symfony\Component\Messenger\Worker;
class RetryIntegrationTest extends TestCase
{
public function testRetryMechanism()
{
$senderAndReceiver = new DummySenderAndReceiver();
$senderLocator = $this->createMock(ContainerInterface::class);
$senderLocator->method('has')->with('transportName')->willReturn(true);
$senderLocator->method('get')->with('transportName')->willReturn($senderAndReceiver);
$senderLocator = new SendersLocator([DummyMessage::class => ['transportName']], $senderLocator);
$handler = new DummyMessageHandlerFailingFirstTimes(0);
$throwingHandler = new DummyMessageHandlerFailingFirstTimes(1);
$handlerLocator = new HandlersLocator([
DummyMessage::class => [
new HandlerDescriptor($handler, ['alias' => 'first']),
new HandlerDescriptor($throwingHandler, ['alias' => 'throwing']),
],
]);
// dispatch the message, which will get "sent" and then received by DummySenderAndReceiver
$bus = new MessageBus([new SendMessageMiddleware($senderLocator), new HandleMessageMiddleware($handlerLocator)]);
$envelope = new Envelope(new DummyMessage('API'));
$bus->dispatch($envelope);
$worker = new Worker(['transportName' => $senderAndReceiver], $bus, ['transportName' => new MultiplierRetryStrategy()]);
$worker->run([], function (?Envelope $envelope) use ($worker) {
if (null === $envelope) {
$worker->stop();
}
});
$this->assertSame(1, $handler->getTimesCalledWithoutThrowing());
$this->assertSame(1, $throwingHandler->getTimesCalledWithoutThrowing());
}
}
class DummySenderAndReceiver implements ReceiverInterface, SenderInterface
{
private $messagesWaiting = [];
private $messagesReceived = [];
public function get(): iterable
{
$message = array_shift($this->messagesWaiting);
if (null === $message) {
return [];
}
$this->messagesReceived[] = $message;
return [$message];
}
public function ack(Envelope $envelope): void
{
}
public function reject(Envelope $envelope): void
{
}
public function send(Envelope $envelope): Envelope
{
$this->messagesWaiting[] = $envelope;
return $envelope;
}
}

View File

@ -19,9 +19,8 @@ class RedeliveryStampTest extends TestCase
{ {
public function testGetters() public function testGetters()
{ {
$stamp = new RedeliveryStamp(10, 'sender_alias'); $stamp = new RedeliveryStamp(10);
$this->assertSame(10, $stamp->getRetryCount()); $this->assertSame(10, $stamp->getRetryCount());
$this->assertSame('sender_alias', $stamp->getSenderClassOrAlias());
$this->assertInstanceOf(\DateTimeInterface::class, $stamp->getRedeliveredAt()); $this->assertInstanceOf(\DateTimeInterface::class, $stamp->getRedeliveredAt());
$this->assertNull($stamp->getExceptionMessage()); $this->assertNull($stamp->getExceptionMessage());
$this->assertNull($stamp->getFlattenException()); $this->assertNull($stamp->getFlattenException());
@ -30,7 +29,7 @@ class RedeliveryStampTest extends TestCase
public function testGettersPopulated() public function testGettersPopulated()
{ {
$flattenException = new FlattenException(); $flattenException = new FlattenException();
$stamp = new RedeliveryStamp(10, 'sender_alias', 'exception message', $flattenException); $stamp = new RedeliveryStamp(10, 'exception message', $flattenException);
$this->assertSame('exception message', $stamp->getExceptionMessage()); $this->assertSame('exception message', $stamp->getExceptionMessage());
$this->assertSame($flattenException, $stamp->getFlattenException()); $this->assertSame($flattenException, $stamp->getFlattenException());
} }

View File

@ -95,7 +95,7 @@ class AmqpExtIntegrationTest extends TestCase
$envelope = $envelopes[0]; $envelope = $envelopes[0];
$newEnvelope = $envelope $newEnvelope = $envelope
->with(new DelayStamp(2000)) ->with(new DelayStamp(2000))
->with(new RedeliveryStamp(1, 'not_important')); ->with(new RedeliveryStamp(1));
$sender->send($newEnvelope); $sender->send($newEnvelope);
$receiver->ack($envelope); $receiver->ack($envelope);

View File

@ -14,7 +14,6 @@ namespace Symfony\Component\Messenger\Tests\Transport\Sender;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface; use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\UnknownSenderException;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage; use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
@ -36,35 +35,6 @@ class SendersLocatorTest extends TestCase
$this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage())))); $this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage()))));
} }
public function testGetSenderByAlias()
{
$sender1 = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
$sendersLocator = $this->createContainer([
'sender1' => $sender1,
'sender2' => $sender2,
]);
$locator = new SendersLocator([], $sendersLocator);
$this->assertSame($sender1, $locator->getSenderByAlias('sender1'));
$this->assertSame($sender2, $locator->getSenderByAlias('sender2'));
}
public function testGetSenderByAliasThrowsException()
{
$this->expectException(UnknownSenderException::class);
$this->expectExceptionMessage('Unknown sender alias');
$sender1 = $this->getMockBuilder(SenderInterface::class)->getMock();
$sendersLocator = $this->createContainer([
'sender1' => $sender1,
]);
$locator = new SendersLocator([], $sendersLocator);
$locator->getSenderByAlias('sender2');
}
/** /**
* @group legacy * @group legacy
*/ */

View File

@ -17,13 +17,9 @@ use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
@ -48,12 +44,12 @@ class WorkerTest extends TestCase
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with( $bus->expects($this->at(0))->method('dispatch')->with(
$envelope = new Envelope($apiMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()]) new Envelope($apiMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
)->willReturn($envelope); )->willReturnArgument(0);
$bus->expects($this->at(1))->method('dispatch')->with( $bus->expects($this->at(1))->method('dispatch')->with(
$envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()]) new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
)->willReturn($envelope); )->willReturnArgument(0);
$worker = new Worker(['transport' => $receiver], $bus); $worker = new Worker(['transport' => $receiver], $bus);
$worker->run([], function (?Envelope $envelope) use ($worker) { $worker->run([], function (?Envelope $envelope) use ($worker) {
@ -66,95 +62,7 @@ class WorkerTest extends TestCase
$this->assertSame(2, $receiver->getAcknowledgeCount()); $this->assertSame(2, $receiver->getAcknowledgeCount());
} }
public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage() public function testHandlingErrorCausesReject()
{
$envelope = new Envelope(new DummyMessage('API'));
$receiver = new DummyReceiver([[$envelope]]);
$envelope = $envelope->with(new ReceivedStamp('transport'), new ConsumedByWorkerStamp());
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);
$worker = new Worker(['transport' => $receiver], $bus, []);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
}
public function testDispatchCausesRetry()
{
$receiver = new DummyReceiver([
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'transport1')])],
]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
// 2nd call will be the retry
$bus->expects($this->at(1))->method('dispatch')->with($this->callback(function (Envelope $envelope) {
/** @var RedeliveryStamp|null $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertNotNull($redeliveryStamp);
// retry count now at 1
$this->assertSame(1, $redeliveryStamp->getRetryCount());
$this->assertSame('transport1', $redeliveryStamp->getSenderClassOrAlias());
// received stamp is removed
$this->assertNull($envelope->last(ReceivedStamp::class));
return true;
}))->willReturnArgument(0);
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true);
$worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
// old message rejected
$this->assertSame(1, $receiver->getRejectCount());
}
public function testUnrecoverableMessageHandlingExceptionPreventsRetries()
{
$envelope1 = new Envelope(new DummyMessage('Unwrapped Exception'), [new SentStamp('Some\Sender', 'transport1')]);
$envelope2 = new Envelope(new DummyMessage('Wrapped Exception'), [new SentStamp('Some\Sender', 'transport1')]);
$receiver = new DummyReceiver([
[$envelope1],
[$envelope2],
]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException());
$bus->expects($this->at(1))->method('dispatch')->willThrowException(
new HandlerFailedException($envelope2, [new UnrecoverableMessageHandlingException()])
);
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$retryStrategy->expects($this->never())->method('isRetryable')->willReturn(true);
$worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
// message was rejected
$this->assertSame(2, $receiver->getRejectCount());
}
public function testDispatchCausesRejectWhenNoRetry()
{ {
$receiver = new DummyReceiver([ $receiver = new DummyReceiver([
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'transport1')])], [new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'transport1')])],
@ -163,10 +71,7 @@ class WorkerTest extends TestCase
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not')); $bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); $worker = new Worker(['transport1' => $receiver], $bus);
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false);
$worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
$worker->run([], function (?Envelope $envelope) use ($worker) { $worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish // stop after the messages finish
if (null === $envelope) { if (null === $envelope) {
@ -177,28 +82,6 @@ class WorkerTest extends TestCase
$this->assertSame(0, $receiver->getAcknowledgeCount()); $this->assertSame(0, $receiver->getAcknowledgeCount());
} }
public function testDispatchCausesRejectOnUnrecoverableMessage()
{
$receiver = new DummyReceiver([
[new Envelope(new DummyMessage('Hello'))],
]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException('Will never work'));
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$retryStrategy->expects($this->never())->method('isRetryable');
$worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
$worker->stop();
}
});
$this->assertSame(1, $receiver->getRejectCount());
}
public function testWorkerDoesNotSendNullMessagesToTheBus() public function testWorkerDoesNotSendNullMessagesToTheBus()
{ {
$receiver = new DummyReceiver([ $receiver = new DummyReceiver([
@ -235,7 +118,7 @@ class WorkerTest extends TestCase
[$this->isInstanceOf(WorkerStoppedEvent::class)] [$this->isInstanceOf(WorkerStoppedEvent::class)]
); );
$worker = new Worker([$receiver], $bus, [], $eventDispatcher); $worker = new Worker([$receiver], $bus, $eventDispatcher);
$worker->run([], function (?Envelope $envelope) use ($worker) { $worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish // stop after the messages finish
if (null === $envelope) { if (null === $envelope) {
@ -263,7 +146,7 @@ class WorkerTest extends TestCase
[$this->isInstanceOf(WorkerStoppedEvent::class)] [$this->isInstanceOf(WorkerStoppedEvent::class)]
); );
$worker = new Worker([$receiver], $bus, [], $eventDispatcher); $worker = new Worker([$receiver], $bus, $eventDispatcher);
$worker->run([], function (?Envelope $envelope) use ($worker) { $worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish // stop after the messages finish
if (null === $envelope) { if (null === $envelope) {

View File

@ -15,7 +15,6 @@ use Psr\Container\ContainerInterface;
use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RuntimeException; use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\Exception\UnknownSenderException;
use Symfony\Component\Messenger\Handler\HandlersLocator; use Symfony\Component\Messenger\Handler\HandlersLocator;
/** /**
@ -79,13 +78,4 @@ class SendersLocator implements SendersLocatorInterface
} }
} }
} }
public function getSenderByAlias(string $alias): SenderInterface
{
if ($this->sendersLocator->has($alias)) {
return $this->sendersLocator->get($alias);
}
throw new UnknownSenderException(sprintf('Unknown sender alias "%s".', $alias));
}
} }

View File

@ -12,7 +12,6 @@
namespace Symfony\Component\Messenger\Transport\Sender; namespace Symfony\Component\Messenger\Transport\Sender;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\UnknownSenderException;
/** /**
* Maps a message to a list of senders. * Maps a message to a list of senders.
@ -28,13 +27,4 @@ interface SendersLocatorInterface
* @return iterable|SenderInterface[] Indexed by sender alias if available * @return iterable|SenderInterface[] Indexed by sender alias if available
*/ */
public function getSenders(Envelope $envelope): iterable; public function getSenders(Envelope $envelope): iterable;
/**
* Returns a specific sender by its alias.
*
* @param string $alias The alias given to the sender in getSenders()
*
* @throws UnknownSenderException If the sender is not found
*/
public function getSenderByAlias(string $alias): SenderInterface;
} }

View File

@ -19,12 +19,8 @@ use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
@ -37,20 +33,17 @@ class Worker implements WorkerInterface
{ {
private $receivers; private $receivers;
private $bus; private $bus;
private $retryStrategies;
private $eventDispatcher; private $eventDispatcher;
private $logger; private $logger;
private $shouldStop = false; private $shouldStop = false;
/** /**
* @param ReceiverInterface[] $receivers Where the key is the transport name * @param ReceiverInterface[] $receivers Where the key is the transport name
* @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match)
*/ */
public function __construct(array $receivers, MessageBusInterface $bus, array $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null) public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
{ {
$this->receivers = $receivers; $this->receivers = $receivers;
$this->bus = $bus; $this->bus = $bus;
$this->retryStrategies = $retryStrategies;
$this->eventDispatcher = LegacyEventDispatcherProxy::decorate($eventDispatcher); $this->eventDispatcher = LegacyEventDispatcherProxy::decorate($eventDispatcher);
$this->logger = $logger; $this->logger = $logger;
} }
@ -91,7 +84,7 @@ class Worker implements WorkerInterface
foreach ($envelopes as $envelope) { foreach ($envelopes as $envelope) {
$envelopeHandled = true; $envelopeHandled = true;
$this->handleMessage($envelope, $receiver, $transportName, $this->retryStrategies[$transportName] ?? null); $this->handleMessage($envelope, $receiver, $transportName);
$onHandled($envelope); $onHandled($envelope);
if ($this->shouldStop) { if ($this->shouldStop) {
@ -117,7 +110,7 @@ class Worker implements WorkerInterface
$this->dispatchEvent(new WorkerStoppedEvent()); $this->dispatchEvent(new WorkerStoppedEvent());
} }
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName, ?RetryStrategyInterface $retryStrategy): void private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName): void
{ {
$event = new WorkerMessageReceivedEvent($envelope, $transportName); $event = new WorkerMessageReceivedEvent($envelope, $transportName);
$this->dispatchEvent($event); $this->dispatchEvent($event);
@ -126,12 +119,6 @@ class Worker implements WorkerInterface
return; return;
} }
$message = $envelope->getMessage();
$context = [
'message' => $message,
'class' => \get_class($message),
];
try { try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp())); $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp()));
} catch (\Throwable $throwable) { } catch (\Throwable $throwable) {
@ -146,30 +133,7 @@ class Worker implements WorkerInterface
$envelope = $throwable->getEnvelope(); $envelope = $throwable->getEnvelope();
} }
$shouldRetry = $retryStrategy && $this->shouldRetry($throwable, $envelope, $retryStrategy); $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable));
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable, $shouldRetry));
$retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope);
if ($shouldRetry) {
++$retryCount;
$delay = $retryStrategy->getWaitingTime($envelope);
if (null !== $this->logger) {
$this->logger->error('Error thrown while handling message {class}. Dispatching for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
}
// add the delay and retry stamp info + remove ReceivedStamp
$retryEnvelope = $envelope->with(new DelayStamp($delay))
->with(new RedeliveryStamp($retryCount, $transportName))
->withoutAll(ReceivedStamp::class);
// re-send the message for retry
$this->bus->dispatch($retryEnvelope);
} else {
if (null !== $this->logger) {
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
}
}
if (!$rejectFirst) { if (!$rejectFirst) {
$receiver->reject($envelope); $receiver->reject($envelope);
@ -181,6 +145,11 @@ class Worker implements WorkerInterface
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName)); $this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName));
if (null !== $this->logger) { if (null !== $this->logger) {
$message = $envelope->getMessage();
$context = [
'message' => $message,
'class' => \get_class($message),
];
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context); $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
} }
@ -200,27 +169,4 @@ class Worker implements WorkerInterface
$this->eventDispatcher->dispatch($event); $this->eventDispatcher->dispatch($event);
} }
private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
{
// if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
if ($e instanceof HandlerFailedException) {
$shouldNotRetry = true;
foreach ($e->getNestedExceptions() as $nestedException) {
if (!$nestedException instanceof UnrecoverableExceptionInterface) {
$shouldNotRetry = false;
break;
}
}
if ($shouldNotRetry) {
return false;
}
}
if ($e instanceof UnrecoverableExceptionInterface) {
return false;
}
return $retryStrategy->isRetryable($envelope);
}
} }