[Messenger] dispatch event when a message is retried
This commit is contained in:
parent
127724d519
commit
55bddcb721
@ -152,6 +152,7 @@ return static function (ContainerConfigurator $container) {
|
|||||||
abstract_arg('senders service locator'),
|
abstract_arg('senders service locator'),
|
||||||
service('messenger.retry_strategy_locator'),
|
service('messenger.retry_strategy_locator'),
|
||||||
service('logger')->ignoreOnInvalid(),
|
service('logger')->ignoreOnInvalid(),
|
||||||
|
service('event_dispatcher'),
|
||||||
])
|
])
|
||||||
->tag('kernel.event_subscriber')
|
->tag('kernel.event_subscriber')
|
||||||
->tag('monolog.logger', ['channel' => 'messenger'])
|
->tag('monolog.logger', ['channel' => 'messenger'])
|
||||||
|
@ -7,6 +7,8 @@ CHANGELOG
|
|||||||
* Added `FlattenExceptionNormalizer` to give more information about the exception on Messenger background processes. The `FlattenExceptionNormalizer` has a higher priority than `ProblemNormalizer` and it is only used when the Messenger serialization context is set.
|
* Added `FlattenExceptionNormalizer` to give more information about the exception on Messenger background processes. The `FlattenExceptionNormalizer` has a higher priority than `ProblemNormalizer` and it is only used when the Messenger serialization context is set.
|
||||||
* Added factory methods to `DelayStamp`.
|
* Added factory methods to `DelayStamp`.
|
||||||
* Removed the exception when dispatching a message with a `DispatchAfterCurrentBusStamp` and not in a context of another dispatch call
|
* Removed the exception when dispatching a message with a `DispatchAfterCurrentBusStamp` and not in a context of another dispatch call
|
||||||
|
* Added `WorkerMessageRetriedEvent`
|
||||||
|
* Added `WorkerMessageReceivedEvent::setEnvelope()` and made event mutable
|
||||||
|
|
||||||
5.1.0
|
5.1.0
|
||||||
-----
|
-----
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
namespace Symfony\Component\Messenger\Event;
|
namespace Symfony\Component\Messenger\Event;
|
||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Stamp\StampInterface;
|
||||||
|
|
||||||
abstract class AbstractWorkerMessageEvent
|
abstract class AbstractWorkerMessageEvent
|
||||||
{
|
{
|
||||||
@ -36,4 +37,9 @@ abstract class AbstractWorkerMessageEvent
|
|||||||
{
|
{
|
||||||
return $this->receiverName;
|
return $this->receiverName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function addStamps(StampInterface ...$stamps): void
|
||||||
|
{
|
||||||
|
$this->envelope = $this->envelope->with(...$stamps);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,21 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Event;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dispatched after a message has been sent for retry.
|
||||||
|
*
|
||||||
|
* The event name is the class name.
|
||||||
|
*/
|
||||||
|
final class WorkerMessageRetriedEvent extends AbstractWorkerMessageEvent
|
||||||
|
{
|
||||||
|
}
|
@ -15,6 +15,7 @@ use Psr\Log\LoggerInterface;
|
|||||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||||
|
use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent;
|
||||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||||
use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
|
use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
|
||||||
use Symfony\Component\Messenger\Exception\RuntimeException;
|
use Symfony\Component\Messenger\Exception\RuntimeException;
|
||||||
@ -24,6 +25,7 @@ use Symfony\Component\Messenger\Stamp\DelayStamp;
|
|||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\StampInterface;
|
use Symfony\Component\Messenger\Stamp\StampInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||||
|
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Tobias Schultze <http://tobion.de>
|
* @author Tobias Schultze <http://tobion.de>
|
||||||
@ -33,13 +35,15 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
|
|||||||
private $sendersLocator;
|
private $sendersLocator;
|
||||||
private $retryStrategyLocator;
|
private $retryStrategyLocator;
|
||||||
private $logger;
|
private $logger;
|
||||||
|
private $eventDispatcher;
|
||||||
private $historySize;
|
private $historySize;
|
||||||
|
|
||||||
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, int $historySize = 10)
|
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, EventDispatcherInterface $eventDispatcher = null, int $historySize = 10)
|
||||||
{
|
{
|
||||||
$this->sendersLocator = $sendersLocator;
|
$this->sendersLocator = $sendersLocator;
|
||||||
$this->retryStrategyLocator = $retryStrategyLocator;
|
$this->retryStrategyLocator = $retryStrategyLocator;
|
||||||
$this->logger = $logger;
|
$this->logger = $logger;
|
||||||
|
$this->eventDispatcher = $eventDispatcher;
|
||||||
$this->historySize = $historySize;
|
$this->historySize = $historySize;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -74,6 +78,10 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
|
|||||||
|
|
||||||
// re-send the message for retry
|
// re-send the message for retry
|
||||||
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
|
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
|
||||||
|
|
||||||
|
if (null !== $this->eventDispatcher) {
|
||||||
|
$this->eventDispatcher->dispatch(new WorkerMessageRetriedEvent($retryEnvelope, $event->getReceiverName()));
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if (null !== $this->logger) {
|
if (null !== $this->logger) {
|
||||||
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
|
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
|
||||||
|
@ -21,6 +21,7 @@ use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
|||||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||||
|
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
||||||
|
|
||||||
class SendFailedMessageForRetryListenerTest extends TestCase
|
class SendFailedMessageForRetryListenerTest extends TestCase
|
||||||
{
|
{
|
||||||
@ -107,7 +108,10 @@ class SendFailedMessageForRetryListenerTest extends TestCase
|
|||||||
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
|
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
|
||||||
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);
|
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);
|
||||||
|
|
||||||
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
|
$eventDispatcher = $this->createMock(EventDispatcherInterface::class);
|
||||||
|
$eventDispatcher->expects($this->once())->method('dispatch');
|
||||||
|
|
||||||
|
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator, null, $eventDispatcher);
|
||||||
|
|
||||||
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
|
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
|
||||||
|
|
||||||
|
@ -25,6 +25,7 @@ use Symfony\Component\Messenger\MessageBusInterface;
|
|||||||
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||||
|
use Symfony\Component\Messenger\Stamp\StampInterface;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||||
use Symfony\Component\Messenger\Worker;
|
use Symfony\Component\Messenger\Worker;
|
||||||
@ -243,14 +244,45 @@ class WorkerTest extends TestCase
|
|||||||
// make sure they were processed in the correct order
|
// make sure they were processed in the correct order
|
||||||
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
|
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testWorkerMessageReceivedEventMutability()
|
||||||
|
{
|
||||||
|
$envelope = new Envelope(new DummyMessage('Hello'));
|
||||||
|
$receiver = new DummyReceiver([[$envelope]]);
|
||||||
|
|
||||||
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
|
$bus->method('dispatch')->willReturnArgument(0);
|
||||||
|
|
||||||
|
$eventDispatcher = new EventDispatcher();
|
||||||
|
$eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
|
||||||
|
|
||||||
|
$stamp = new class() implements StampInterface {
|
||||||
|
};
|
||||||
|
$listener = function (WorkerMessageReceivedEvent $event) use ($stamp) {
|
||||||
|
$event->addStamps($stamp);
|
||||||
|
};
|
||||||
|
|
||||||
|
$eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
|
||||||
|
|
||||||
|
$worker = new Worker([$receiver], $bus, $eventDispatcher);
|
||||||
|
$worker->run();
|
||||||
|
|
||||||
|
$envelope = current($receiver->getAcknowledgedEnvelopes());
|
||||||
|
$this->assertCount(1, $envelope->all(\get_class($stamp)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class DummyReceiver implements ReceiverInterface
|
class DummyReceiver implements ReceiverInterface
|
||||||
{
|
{
|
||||||
private $deliveriesOfEnvelopes;
|
private $deliveriesOfEnvelopes;
|
||||||
|
private $acknowledgedEnvelopes;
|
||||||
|
private $rejectedEnvelopes;
|
||||||
private $acknowledgeCount = 0;
|
private $acknowledgeCount = 0;
|
||||||
private $rejectCount = 0;
|
private $rejectCount = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Envelope[][] $deliveriesOfEnvelopes
|
||||||
|
*/
|
||||||
public function __construct(array $deliveriesOfEnvelopes)
|
public function __construct(array $deliveriesOfEnvelopes)
|
||||||
{
|
{
|
||||||
$this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes;
|
$this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes;
|
||||||
@ -266,11 +298,13 @@ class DummyReceiver implements ReceiverInterface
|
|||||||
public function ack(Envelope $envelope): void
|
public function ack(Envelope $envelope): void
|
||||||
{
|
{
|
||||||
++$this->acknowledgeCount;
|
++$this->acknowledgeCount;
|
||||||
|
$this->acknowledgedEnvelopes[] = $envelope;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function reject(Envelope $envelope): void
|
public function reject(Envelope $envelope): void
|
||||||
{
|
{
|
||||||
++$this->rejectCount;
|
++$this->rejectCount;
|
||||||
|
$this->rejectedEnvelopes[] = $envelope;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function getAcknowledgeCount(): int
|
public function getAcknowledgeCount(): int
|
||||||
@ -282,4 +316,9 @@ class DummyReceiver implements ReceiverInterface
|
|||||||
{
|
{
|
||||||
return $this->rejectCount;
|
return $this->rejectCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function getAcknowledgedEnvelopes(): array
|
||||||
|
{
|
||||||
|
return $this->acknowledgedEnvelopes;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -104,6 +104,7 @@ class Worker
|
|||||||
{
|
{
|
||||||
$event = new WorkerMessageReceivedEvent($envelope, $transportName);
|
$event = new WorkerMessageReceivedEvent($envelope, $transportName);
|
||||||
$this->dispatchEvent($event);
|
$this->dispatchEvent($event);
|
||||||
|
$envelope = $event->getEnvelope();
|
||||||
|
|
||||||
if (!$event->shouldHandle()) {
|
if (!$event->shouldHandle()) {
|
||||||
return;
|
return;
|
||||||
@ -123,7 +124,9 @@ class Worker
|
|||||||
$envelope = $throwable->getEnvelope();
|
$envelope = $throwable->getEnvelope();
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable));
|
$failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $throwable);
|
||||||
|
$this->dispatchEvent($failedEvent);
|
||||||
|
$envelope = $failedEvent->getEnvelope();
|
||||||
|
|
||||||
if (!$rejectFirst) {
|
if (!$rejectFirst) {
|
||||||
$receiver->reject($envelope);
|
$receiver->reject($envelope);
|
||||||
@ -132,7 +135,9 @@ class Worker
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName));
|
$handledEvent = new WorkerMessageHandledEvent($envelope, $transportName);
|
||||||
|
$this->dispatchEvent($handledEvent);
|
||||||
|
$envelope = $handledEvent->getEnvelope();
|
||||||
|
|
||||||
if (null !== $this->logger) {
|
if (null !== $this->logger) {
|
||||||
$message = $envelope->getMessage();
|
$message = $envelope->getMessage();
|
||||||
|
Reference in New Issue
Block a user