[Messenger] dispatch event when a message is retried

This commit is contained in:
Nicolas PHILIPPE 2020-03-20 17:56:40 +01:00 committed by Fabien Potencier
parent 127724d519
commit 55bddcb721
8 changed files with 90 additions and 4 deletions

View File

@ -152,6 +152,7 @@ return static function (ContainerConfigurator $container) {
abstract_arg('senders service locator'),
service('messenger.retry_strategy_locator'),
service('logger')->ignoreOnInvalid(),
service('event_dispatcher'),
])
->tag('kernel.event_subscriber')
->tag('monolog.logger', ['channel' => 'messenger'])

View File

@ -7,6 +7,8 @@ CHANGELOG
* Added `FlattenExceptionNormalizer` to give more information about the exception on Messenger background processes. The `FlattenExceptionNormalizer` has a higher priority than `ProblemNormalizer` and it is only used when the Messenger serialization context is set.
* Added factory methods to `DelayStamp`.
* Removed the exception when dispatching a message with a `DispatchAfterCurrentBusStamp` and not in a context of another dispatch call
* Added `WorkerMessageRetriedEvent`
* Added `WorkerMessageReceivedEvent::setEnvelope()` and made event mutable
5.1.0
-----

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Event;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\StampInterface;
abstract class AbstractWorkerMessageEvent
{
@ -36,4 +37,9 @@ abstract class AbstractWorkerMessageEvent
{
return $this->receiverName;
}
public function addStamps(StampInterface ...$stamps): void
{
$this->envelope = $this->envelope->with(...$stamps);
}
}

View File

@ -0,0 +1,21 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Event;
/**
* Dispatched after a message has been sent for retry.
*
* The event name is the class name.
*/
final class WorkerMessageRetriedEvent extends AbstractWorkerMessageEvent
{
}

View File

@ -15,6 +15,7 @@ use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
use Symfony\Component\Messenger\Exception\RuntimeException;
@ -24,6 +25,7 @@ use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
/**
* @author Tobias Schultze <http://tobion.de>
@ -33,13 +35,15 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
private $sendersLocator;
private $retryStrategyLocator;
private $logger;
private $eventDispatcher;
private $historySize;
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, int $historySize = 10)
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, EventDispatcherInterface $eventDispatcher = null, int $historySize = 10)
{
$this->sendersLocator = $sendersLocator;
$this->retryStrategyLocator = $retryStrategyLocator;
$this->logger = $logger;
$this->eventDispatcher = $eventDispatcher;
$this->historySize = $historySize;
}
@ -74,6 +78,10 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
// re-send the message for retry
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
if (null !== $this->eventDispatcher) {
$this->eventDispatcher->dispatch(new WorkerMessageRetriedEvent($retryEnvelope, $event->getReceiverName()));
}
} else {
if (null !== $this->logger) {
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);

View File

@ -21,6 +21,7 @@ use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
class SendFailedMessageForRetryListenerTest extends TestCase
{
@ -107,7 +108,10 @@ class SendFailedMessageForRetryListenerTest extends TestCase
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
$eventDispatcher = $this->createMock(EventDispatcherInterface::class);
$eventDispatcher->expects($this->once())->method('dispatch');
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator, null, $eventDispatcher);
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);

View File

@ -25,6 +25,7 @@ use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
@ -243,14 +244,45 @@ class WorkerTest extends TestCase
// make sure they were processed in the correct order
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
}
public function testWorkerMessageReceivedEventMutability()
{
$envelope = new Envelope(new DummyMessage('Hello'));
$receiver = new DummyReceiver([[$envelope]]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willReturnArgument(0);
$eventDispatcher = new EventDispatcher();
$eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
$stamp = new class() implements StampInterface {
};
$listener = function (WorkerMessageReceivedEvent $event) use ($stamp) {
$event->addStamps($stamp);
};
$eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
$worker = new Worker([$receiver], $bus, $eventDispatcher);
$worker->run();
$envelope = current($receiver->getAcknowledgedEnvelopes());
$this->assertCount(1, $envelope->all(\get_class($stamp)));
}
}
class DummyReceiver implements ReceiverInterface
{
private $deliveriesOfEnvelopes;
private $acknowledgedEnvelopes;
private $rejectedEnvelopes;
private $acknowledgeCount = 0;
private $rejectCount = 0;
/**
* @param Envelope[][] $deliveriesOfEnvelopes
*/
public function __construct(array $deliveriesOfEnvelopes)
{
$this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes;
@ -266,11 +298,13 @@ class DummyReceiver implements ReceiverInterface
public function ack(Envelope $envelope): void
{
++$this->acknowledgeCount;
$this->acknowledgedEnvelopes[] = $envelope;
}
public function reject(Envelope $envelope): void
{
++$this->rejectCount;
$this->rejectedEnvelopes[] = $envelope;
}
public function getAcknowledgeCount(): int
@ -282,4 +316,9 @@ class DummyReceiver implements ReceiverInterface
{
return $this->rejectCount;
}
public function getAcknowledgedEnvelopes(): array
{
return $this->acknowledgedEnvelopes;
}
}

View File

@ -104,6 +104,7 @@ class Worker
{
$event = new WorkerMessageReceivedEvent($envelope, $transportName);
$this->dispatchEvent($event);
$envelope = $event->getEnvelope();
if (!$event->shouldHandle()) {
return;
@ -123,7 +124,9 @@ class Worker
$envelope = $throwable->getEnvelope();
}
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable));
$failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $throwable);
$this->dispatchEvent($failedEvent);
$envelope = $failedEvent->getEnvelope();
if (!$rejectFirst) {
$receiver->reject($envelope);
@ -132,7 +135,9 @@ class Worker
return;
}
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName));
$handledEvent = new WorkerMessageHandledEvent($envelope, $transportName);
$this->dispatchEvent($handledEvent);
$envelope = $handledEvent->getEnvelope();
if (null !== $this->logger) {
$message = $envelope->getMessage();