[Messenger] Do not stack retry stamp
This commit is contained in:
parent
fb4c3f920f
commit
ad6f8532c6
@ -21,6 +21,7 @@ use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
|
||||
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||
use Symfony\Component\Messenger\Stamp\StampInterface;
|
||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||
|
||||
/**
|
||||
@ -31,12 +32,14 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
|
||||
private $sendersLocator;
|
||||
private $retryStrategyLocator;
|
||||
private $logger;
|
||||
private $historySize;
|
||||
|
||||
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null)
|
||||
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, int $historySize = 10)
|
||||
{
|
||||
$this->sendersLocator = $sendersLocator;
|
||||
$this->retryStrategyLocator = $retryStrategyLocator;
|
||||
$this->logger = $logger;
|
||||
$this->historySize = $historySize;
|
||||
}
|
||||
|
||||
public function onMessageFailed(WorkerMessageFailedEvent $event)
|
||||
@ -64,7 +67,7 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
|
||||
}
|
||||
|
||||
// add the delay and retry stamp info
|
||||
$retryEnvelope = $envelope->with(new DelayStamp($delay), new RedeliveryStamp($retryCount));
|
||||
$retryEnvelope = $this->withLimitedHistory($envelope, new DelayStamp($delay), new RedeliveryStamp($retryCount));
|
||||
|
||||
// re-send the message for retry
|
||||
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
|
||||
@ -75,6 +78,30 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds stamps to the envelope by keeping only the First + Last N stamps
|
||||
*/
|
||||
private function withLimitedHistory(Envelope $envelope, StampInterface ...$stamps): Envelope
|
||||
{
|
||||
foreach ($stamps as $stamp) {
|
||||
$history = $envelope->all(get_class($stamp));
|
||||
if (\count($history) < $this->historySize) {
|
||||
$envelope = $envelope->with($stamp);
|
||||
continue;
|
||||
}
|
||||
|
||||
$history = \array_merge(
|
||||
[$history[0]],
|
||||
\array_slice($history, -$this->historySize + 2),
|
||||
[$stamp]
|
||||
);
|
||||
|
||||
$envelope = $envelope->withoutAll(get_class($stamp))->with(...$history);
|
||||
}
|
||||
|
||||
return $envelope;
|
||||
}
|
||||
|
||||
public static function getSubscribedEvents()
|
||||
{
|
||||
return [
|
||||
|
@ -76,4 +76,40 @@ class SendFailedMessageForRetryListenerTest extends TestCase
|
||||
|
||||
$listener->onMessageFailed($event);
|
||||
}
|
||||
|
||||
public function testEnvelopeKeepOnlyTheLast10Stamps()
|
||||
{
|
||||
$exception = new \Exception('no!');
|
||||
$stamps = \array_merge(
|
||||
\array_fill(0, 15, new DelayStamp(1)),
|
||||
\array_fill(0, 3, new RedeliveryStamp(1))
|
||||
);
|
||||
$envelope = new Envelope(new \stdClass(), $stamps);
|
||||
|
||||
$sender = $this->createMock(SenderInterface::class);
|
||||
$sender->expects($this->once())->method('send')->willReturnCallback(function (Envelope $envelope) {
|
||||
$delayStamps = $envelope->all(DelayStamp::class);
|
||||
$redeliveryStamps = $envelope->all(RedeliveryStamp::class);
|
||||
|
||||
$this->assertCount(10, $delayStamps);
|
||||
$this->assertCount(4, $redeliveryStamps);
|
||||
|
||||
return $envelope;
|
||||
});
|
||||
$senderLocator = $this->createMock(ContainerInterface::class);
|
||||
$senderLocator->expects($this->once())->method('has')->willReturn(true);
|
||||
$senderLocator->expects($this->once())->method('get')->willReturn($sender);
|
||||
$retryStategy = $this->createMock(RetryStrategyInterface::class);
|
||||
$retryStategy->expects($this->once())->method('isRetryable')->willReturn(true);
|
||||
$retryStategy->expects($this->once())->method('getWaitingTime')->willReturn(1000);
|
||||
$retryStrategyLocator = $this->createMock(ContainerInterface::class);
|
||||
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
|
||||
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);
|
||||
|
||||
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
|
||||
|
||||
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
|
||||
|
||||
$listener->onMessageFailed($event);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user