From 55bddcb72137b65fc1378c69829fa0163ffeb376 Mon Sep 17 00:00:00 2001 From: Nicolas PHILIPPE Date: Fri, 20 Mar 2020 17:56:40 +0100 Subject: [PATCH] [Messenger] dispatch event when a message is retried --- .../Resources/config/messenger.php | 1 + src/Symfony/Component/Messenger/CHANGELOG.md | 2 + .../Event/AbstractWorkerMessageEvent.php | 6 +++ .../Event/WorkerMessageRetriedEvent.php | 21 ++++++++++ .../SendFailedMessageForRetryListener.php | 10 ++++- .../SendFailedMessageForRetryListenerTest.php | 6 ++- .../Component/Messenger/Tests/WorkerTest.php | 39 +++++++++++++++++++ src/Symfony/Component/Messenger/Worker.php | 9 ++++- 8 files changed, 90 insertions(+), 4 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Event/WorkerMessageRetriedEvent.php diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php index 3d99912972..120ce6c50b 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php @@ -152,6 +152,7 @@ return static function (ContainerConfigurator $container) { abstract_arg('senders service locator'), service('messenger.retry_strategy_locator'), service('logger')->ignoreOnInvalid(), + service('event_dispatcher'), ]) ->tag('kernel.event_subscriber') ->tag('monolog.logger', ['channel' => 'messenger']) diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 8a0e0b1dec..63b8d4b188 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -7,6 +7,8 @@ CHANGELOG * Added `FlattenExceptionNormalizer` to give more information about the exception on Messenger background processes. The `FlattenExceptionNormalizer` has a higher priority than `ProblemNormalizer` and it is only used when the Messenger serialization context is set. * Added factory methods to `DelayStamp`. * Removed the exception when dispatching a message with a `DispatchAfterCurrentBusStamp` and not in a context of another dispatch call +* Added `WorkerMessageRetriedEvent` +* Added `WorkerMessageReceivedEvent::setEnvelope()` and made event mutable 5.1.0 ----- diff --git a/src/Symfony/Component/Messenger/Event/AbstractWorkerMessageEvent.php b/src/Symfony/Component/Messenger/Event/AbstractWorkerMessageEvent.php index ce444d6b3c..e3ece8118e 100644 --- a/src/Symfony/Component/Messenger/Event/AbstractWorkerMessageEvent.php +++ b/src/Symfony/Component/Messenger/Event/AbstractWorkerMessageEvent.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Event; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\StampInterface; abstract class AbstractWorkerMessageEvent { @@ -36,4 +37,9 @@ abstract class AbstractWorkerMessageEvent { return $this->receiverName; } + + public function addStamps(StampInterface ...$stamps): void + { + $this->envelope = $this->envelope->with(...$stamps); + } } diff --git a/src/Symfony/Component/Messenger/Event/WorkerMessageRetriedEvent.php b/src/Symfony/Component/Messenger/Event/WorkerMessageRetriedEvent.php new file mode 100644 index 0000000000..d348b44fd3 --- /dev/null +++ b/src/Symfony/Component/Messenger/Event/WorkerMessageRetriedEvent.php @@ -0,0 +1,21 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Event; + +/** + * Dispatched after a message has been sent for retry. + * + * The event name is the class name. + */ +final class WorkerMessageRetriedEvent extends AbstractWorkerMessageEvent +{ +} diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php index 9db83b8d56..ff330cd38c 100644 --- a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php +++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php @@ -15,6 +15,7 @@ use Psr\Log\LoggerInterface; use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; +use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent; use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface; use Symfony\Component\Messenger\Exception\RuntimeException; @@ -24,6 +25,7 @@ use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\StampInterface; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; +use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; /** * @author Tobias Schultze @@ -33,13 +35,15 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface private $sendersLocator; private $retryStrategyLocator; private $logger; + private $eventDispatcher; private $historySize; - public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, int $historySize = 10) + public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, EventDispatcherInterface $eventDispatcher = null, int $historySize = 10) { $this->sendersLocator = $sendersLocator; $this->retryStrategyLocator = $retryStrategyLocator; $this->logger = $logger; + $this->eventDispatcher = $eventDispatcher; $this->historySize = $historySize; } @@ -74,6 +78,10 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface // re-send the message for retry $this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope); + + if (null !== $this->eventDispatcher) { + $this->eventDispatcher->dispatch(new WorkerMessageRetriedEvent($retryEnvelope, $event->getReceiverName())); + } } else { if (null !== $this->logger) { $this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]); diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php index 0bac20198e..3e1cc6c2ae 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php @@ -21,6 +21,7 @@ use Symfony\Component\Messenger\Retry\RetryStrategyInterface; use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; +use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; class SendFailedMessageForRetryListenerTest extends TestCase { @@ -107,7 +108,10 @@ class SendFailedMessageForRetryListenerTest extends TestCase $retryStrategyLocator->expects($this->once())->method('has')->willReturn(true); $retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy); - $listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator); + $eventDispatcher = $this->createMock(EventDispatcherInterface::class); + $eventDispatcher->expects($this->once())->method('dispatch'); + + $listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator, null, $eventDispatcher); $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 28349d804c..9ed4c512a1 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -25,6 +25,7 @@ use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\SentStamp; +use Symfony\Component\Messenger\Stamp\StampInterface; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Worker; @@ -243,14 +244,45 @@ class WorkerTest extends TestCase // make sure they were processed in the correct order $this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes); } + + public function testWorkerMessageReceivedEventMutability() + { + $envelope = new Envelope(new DummyMessage('Hello')); + $receiver = new DummyReceiver([[$envelope]]); + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + $bus->method('dispatch')->willReturnArgument(0); + + $eventDispatcher = new EventDispatcher(); + $eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); + + $stamp = new class() implements StampInterface { + }; + $listener = function (WorkerMessageReceivedEvent $event) use ($stamp) { + $event->addStamps($stamp); + }; + + $eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener); + + $worker = new Worker([$receiver], $bus, $eventDispatcher); + $worker->run(); + + $envelope = current($receiver->getAcknowledgedEnvelopes()); + $this->assertCount(1, $envelope->all(\get_class($stamp))); + } } class DummyReceiver implements ReceiverInterface { private $deliveriesOfEnvelopes; + private $acknowledgedEnvelopes; + private $rejectedEnvelopes; private $acknowledgeCount = 0; private $rejectCount = 0; + /** + * @param Envelope[][] $deliveriesOfEnvelopes + */ public function __construct(array $deliveriesOfEnvelopes) { $this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes; @@ -266,11 +298,13 @@ class DummyReceiver implements ReceiverInterface public function ack(Envelope $envelope): void { ++$this->acknowledgeCount; + $this->acknowledgedEnvelopes[] = $envelope; } public function reject(Envelope $envelope): void { ++$this->rejectCount; + $this->rejectedEnvelopes[] = $envelope; } public function getAcknowledgeCount(): int @@ -282,4 +316,9 @@ class DummyReceiver implements ReceiverInterface { return $this->rejectCount; } + + public function getAcknowledgedEnvelopes(): array + { + return $this->acknowledgedEnvelopes; + } } diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index a1cca6b802..6f8d34b1cb 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -104,6 +104,7 @@ class Worker { $event = new WorkerMessageReceivedEvent($envelope, $transportName); $this->dispatchEvent($event); + $envelope = $event->getEnvelope(); if (!$event->shouldHandle()) { return; @@ -123,7 +124,9 @@ class Worker $envelope = $throwable->getEnvelope(); } - $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable)); + $failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $throwable); + $this->dispatchEvent($failedEvent); + $envelope = $failedEvent->getEnvelope(); if (!$rejectFirst) { $receiver->reject($envelope); @@ -132,7 +135,9 @@ class Worker return; } - $this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName)); + $handledEvent = new WorkerMessageHandledEvent($envelope, $transportName); + $this->dispatchEvent($handledEvent); + $envelope = $handledEvent->getEnvelope(); if (null !== $this->logger) { $message = $envelope->getMessage();