[Messenger] dispatch event when a message is retried

This commit is contained in:
Nicolas PHILIPPE 2020-03-20 17:56:40 +01:00 committed by Fabien Potencier
parent 127724d519
commit 55bddcb721
8 changed files with 90 additions and 4 deletions

View File

@ -152,6 +152,7 @@ return static function (ContainerConfigurator $container) {
abstract_arg('senders service locator'), abstract_arg('senders service locator'),
service('messenger.retry_strategy_locator'), service('messenger.retry_strategy_locator'),
service('logger')->ignoreOnInvalid(), service('logger')->ignoreOnInvalid(),
service('event_dispatcher'),
]) ])
->tag('kernel.event_subscriber') ->tag('kernel.event_subscriber')
->tag('monolog.logger', ['channel' => 'messenger']) ->tag('monolog.logger', ['channel' => 'messenger'])

View File

@ -7,6 +7,8 @@ CHANGELOG
* Added `FlattenExceptionNormalizer` to give more information about the exception on Messenger background processes. The `FlattenExceptionNormalizer` has a higher priority than `ProblemNormalizer` and it is only used when the Messenger serialization context is set. * Added `FlattenExceptionNormalizer` to give more information about the exception on Messenger background processes. The `FlattenExceptionNormalizer` has a higher priority than `ProblemNormalizer` and it is only used when the Messenger serialization context is set.
* Added factory methods to `DelayStamp`. * Added factory methods to `DelayStamp`.
* Removed the exception when dispatching a message with a `DispatchAfterCurrentBusStamp` and not in a context of another dispatch call * Removed the exception when dispatching a message with a `DispatchAfterCurrentBusStamp` and not in a context of another dispatch call
* Added `WorkerMessageRetriedEvent`
* Added `WorkerMessageReceivedEvent::setEnvelope()` and made event mutable
5.1.0 5.1.0
----- -----

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Event; namespace Symfony\Component\Messenger\Event;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\StampInterface;
abstract class AbstractWorkerMessageEvent abstract class AbstractWorkerMessageEvent
{ {
@ -36,4 +37,9 @@ abstract class AbstractWorkerMessageEvent
{ {
return $this->receiverName; return $this->receiverName;
} }
public function addStamps(StampInterface ...$stamps): void
{
$this->envelope = $this->envelope->with(...$stamps);
}
} }

View File

@ -0,0 +1,21 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Event;
/**
* Dispatched after a message has been sent for retry.
*
* The event name is the class name.
*/
final class WorkerMessageRetriedEvent extends AbstractWorkerMessageEvent
{
}

View File

@ -15,6 +15,7 @@ use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface; use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
use Symfony\Component\Messenger\Exception\RuntimeException; use Symfony\Component\Messenger\Exception\RuntimeException;
@ -24,6 +25,7 @@ use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\StampInterface; use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
/** /**
* @author Tobias Schultze <http://tobion.de> * @author Tobias Schultze <http://tobion.de>
@ -33,13 +35,15 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
private $sendersLocator; private $sendersLocator;
private $retryStrategyLocator; private $retryStrategyLocator;
private $logger; private $logger;
private $eventDispatcher;
private $historySize; private $historySize;
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, int $historySize = 10) public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, EventDispatcherInterface $eventDispatcher = null, int $historySize = 10)
{ {
$this->sendersLocator = $sendersLocator; $this->sendersLocator = $sendersLocator;
$this->retryStrategyLocator = $retryStrategyLocator; $this->retryStrategyLocator = $retryStrategyLocator;
$this->logger = $logger; $this->logger = $logger;
$this->eventDispatcher = $eventDispatcher;
$this->historySize = $historySize; $this->historySize = $historySize;
} }
@ -74,6 +78,10 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
// re-send the message for retry // re-send the message for retry
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope); $this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
if (null !== $this->eventDispatcher) {
$this->eventDispatcher->dispatch(new WorkerMessageRetriedEvent($retryEnvelope, $event->getReceiverName()));
}
} else { } else {
if (null !== $this->logger) { if (null !== $this->logger) {
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]); $this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);

View File

@ -21,6 +21,7 @@ use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
class SendFailedMessageForRetryListenerTest extends TestCase class SendFailedMessageForRetryListenerTest extends TestCase
{ {
@ -107,7 +108,10 @@ class SendFailedMessageForRetryListenerTest extends TestCase
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true); $retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy); $retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator); $eventDispatcher = $this->createMock(EventDispatcherInterface::class);
$eventDispatcher->expects($this->once())->method('dispatch');
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator, null, $eventDispatcher);
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);

View File

@ -25,6 +25,7 @@ use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Worker; use Symfony\Component\Messenger\Worker;
@ -243,14 +244,45 @@ class WorkerTest extends TestCase
// make sure they were processed in the correct order // make sure they were processed in the correct order
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes); $this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
} }
public function testWorkerMessageReceivedEventMutability()
{
$envelope = new Envelope(new DummyMessage('Hello'));
$receiver = new DummyReceiver([[$envelope]]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willReturnArgument(0);
$eventDispatcher = new EventDispatcher();
$eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
$stamp = new class() implements StampInterface {
};
$listener = function (WorkerMessageReceivedEvent $event) use ($stamp) {
$event->addStamps($stamp);
};
$eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
$worker = new Worker([$receiver], $bus, $eventDispatcher);
$worker->run();
$envelope = current($receiver->getAcknowledgedEnvelopes());
$this->assertCount(1, $envelope->all(\get_class($stamp)));
}
} }
class DummyReceiver implements ReceiverInterface class DummyReceiver implements ReceiverInterface
{ {
private $deliveriesOfEnvelopes; private $deliveriesOfEnvelopes;
private $acknowledgedEnvelopes;
private $rejectedEnvelopes;
private $acknowledgeCount = 0; private $acknowledgeCount = 0;
private $rejectCount = 0; private $rejectCount = 0;
/**
* @param Envelope[][] $deliveriesOfEnvelopes
*/
public function __construct(array $deliveriesOfEnvelopes) public function __construct(array $deliveriesOfEnvelopes)
{ {
$this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes; $this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes;
@ -266,11 +298,13 @@ class DummyReceiver implements ReceiverInterface
public function ack(Envelope $envelope): void public function ack(Envelope $envelope): void
{ {
++$this->acknowledgeCount; ++$this->acknowledgeCount;
$this->acknowledgedEnvelopes[] = $envelope;
} }
public function reject(Envelope $envelope): void public function reject(Envelope $envelope): void
{ {
++$this->rejectCount; ++$this->rejectCount;
$this->rejectedEnvelopes[] = $envelope;
} }
public function getAcknowledgeCount(): int public function getAcknowledgeCount(): int
@ -282,4 +316,9 @@ class DummyReceiver implements ReceiverInterface
{ {
return $this->rejectCount; return $this->rejectCount;
} }
public function getAcknowledgedEnvelopes(): array
{
return $this->acknowledgedEnvelopes;
}
} }

View File

@ -104,6 +104,7 @@ class Worker
{ {
$event = new WorkerMessageReceivedEvent($envelope, $transportName); $event = new WorkerMessageReceivedEvent($envelope, $transportName);
$this->dispatchEvent($event); $this->dispatchEvent($event);
$envelope = $event->getEnvelope();
if (!$event->shouldHandle()) { if (!$event->shouldHandle()) {
return; return;
@ -123,7 +124,9 @@ class Worker
$envelope = $throwable->getEnvelope(); $envelope = $throwable->getEnvelope();
} }
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable)); $failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $throwable);
$this->dispatchEvent($failedEvent);
$envelope = $failedEvent->getEnvelope();
if (!$rejectFirst) { if (!$rejectFirst) {
$receiver->reject($envelope); $receiver->reject($envelope);
@ -132,7 +135,9 @@ class Worker
return; return;
} }
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName)); $handledEvent = new WorkerMessageHandledEvent($envelope, $transportName);
$this->dispatchEvent($handledEvent);
$envelope = $handledEvent->getEnvelope();
if (null !== $this->logger) { if (null !== $this->logger) {
$message = $envelope->getMessage(); $message = $envelope->getMessage();