[Messenger] make retry logic work without SentStamp
This commit is contained in:
parent
a9bcdcc936
commit
0034dee641
@ -34,9 +34,9 @@ class RetryIntegrationTest extends TestCase
|
|||||||
$senderAndReceiver = new DummySenderAndReceiver();
|
$senderAndReceiver = new DummySenderAndReceiver();
|
||||||
|
|
||||||
$senderLocator = $this->createMock(ContainerInterface::class);
|
$senderLocator = $this->createMock(ContainerInterface::class);
|
||||||
$senderLocator->method('has')->with('sender_alias')->willReturn(true);
|
$senderLocator->method('has')->with('transportName')->willReturn(true);
|
||||||
$senderLocator->method('get')->with('sender_alias')->willReturn($senderAndReceiver);
|
$senderLocator->method('get')->with('transportName')->willReturn($senderAndReceiver);
|
||||||
$senderLocator = new SendersLocator([DummyMessage::class => ['sender_alias']], $senderLocator);
|
$senderLocator = new SendersLocator([DummyMessage::class => ['transportName']], $senderLocator);
|
||||||
|
|
||||||
$handler = new DummyMessageHandlerFailingFirstTimes(0);
|
$handler = new DummyMessageHandlerFailingFirstTimes(0);
|
||||||
$throwingHandler = new DummyMessageHandlerFailingFirstTimes(1);
|
$throwingHandler = new DummyMessageHandlerFailingFirstTimes(1);
|
||||||
@ -52,7 +52,7 @@ class RetryIntegrationTest extends TestCase
|
|||||||
$envelope = new Envelope(new DummyMessage('API'));
|
$envelope = new Envelope(new DummyMessage('API'));
|
||||||
$bus->dispatch($envelope);
|
$bus->dispatch($envelope);
|
||||||
|
|
||||||
$worker = new Worker(['receiverName' => $senderAndReceiver], $bus, ['receiverName' => new MultiplierRetryStrategy()]);
|
$worker = new Worker(['transportName' => $senderAndReceiver], $bus, ['transportName' => new MultiplierRetryStrategy()]);
|
||||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
if (null === $envelope) {
|
if (null === $envelope) {
|
||||||
$worker->stop();
|
$worker->stop();
|
||||||
|
@ -84,7 +84,7 @@ class WorkerTest extends TestCase
|
|||||||
public function testDispatchCausesRetry()
|
public function testDispatchCausesRetry()
|
||||||
{
|
{
|
||||||
$receiver = new DummyReceiver([
|
$receiver = new DummyReceiver([
|
||||||
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'sender_alias')])],
|
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'transport1')])],
|
||||||
]);
|
]);
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
@ -97,7 +97,7 @@ class WorkerTest extends TestCase
|
|||||||
$this->assertNotNull($redeliveryStamp);
|
$this->assertNotNull($redeliveryStamp);
|
||||||
// retry count now at 1
|
// retry count now at 1
|
||||||
$this->assertSame(1, $redeliveryStamp->getRetryCount());
|
$this->assertSame(1, $redeliveryStamp->getRetryCount());
|
||||||
$this->assertSame('sender_alias', $redeliveryStamp->getSenderClassOrAlias());
|
$this->assertSame('transport1', $redeliveryStamp->getSenderClassOrAlias());
|
||||||
|
|
||||||
// received stamp is removed
|
// received stamp is removed
|
||||||
$this->assertNull($envelope->last(ReceivedStamp::class));
|
$this->assertNull($envelope->last(ReceivedStamp::class));
|
||||||
@ -108,7 +108,7 @@ class WorkerTest extends TestCase
|
|||||||
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
||||||
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true);
|
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true);
|
||||||
|
|
||||||
$worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]);
|
$worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
|
||||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
// stop after the messages finish
|
// stop after the messages finish
|
||||||
if (null === $envelope) {
|
if (null === $envelope) {
|
||||||
@ -123,7 +123,7 @@ class WorkerTest extends TestCase
|
|||||||
public function testDispatchCausesRejectWhenNoRetry()
|
public function testDispatchCausesRejectWhenNoRetry()
|
||||||
{
|
{
|
||||||
$receiver = new DummyReceiver([
|
$receiver = new DummyReceiver([
|
||||||
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'sender_alias')])],
|
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'transport1')])],
|
||||||
]);
|
]);
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
@ -132,7 +132,7 @@ class WorkerTest extends TestCase
|
|||||||
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
||||||
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false);
|
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false);
|
||||||
|
|
||||||
$worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]);
|
$worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
|
||||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
// stop after the messages finish
|
// stop after the messages finish
|
||||||
if (null === $envelope) {
|
if (null === $envelope) {
|
||||||
@ -155,7 +155,7 @@ class WorkerTest extends TestCase
|
|||||||
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
||||||
$retryStrategy->expects($this->never())->method('isRetryable');
|
$retryStrategy->expects($this->never())->method('isRetryable');
|
||||||
|
|
||||||
$worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]);
|
$worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
|
||||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||||
// stop after the messages finish
|
// stop after the messages finish
|
||||||
if (null === $envelope) {
|
if (null === $envelope) {
|
||||||
|
@ -17,13 +17,11 @@ use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
|
|||||||
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
||||||
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
|
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
|
||||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||||
use Symfony\Component\Messenger\Exception\LogicException;
|
|
||||||
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
|
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
|
||||||
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||||
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
||||||
|
|
||||||
@ -150,7 +148,7 @@ class Worker implements WorkerInterface
|
|||||||
|
|
||||||
// add the delay and retry stamp info + remove ReceivedStamp
|
// add the delay and retry stamp info + remove ReceivedStamp
|
||||||
$retryEnvelope = $envelope->with(new DelayStamp($delay))
|
$retryEnvelope = $envelope->with(new DelayStamp($delay))
|
||||||
->with(new RedeliveryStamp($retryCount, $this->getSenderClassOrAlias($envelope)))
|
->with(new RedeliveryStamp($retryCount, $transportName))
|
||||||
->withoutAll(ReceivedStamp::class);
|
->withoutAll(ReceivedStamp::class);
|
||||||
|
|
||||||
// re-send the message
|
// re-send the message
|
||||||
@ -197,28 +195,6 @@ class Worker implements WorkerInterface
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
$sentStamp = $envelope->last(SentStamp::class);
|
|
||||||
if (null === $sentStamp) {
|
|
||||||
if (null !== $this->logger) {
|
|
||||||
$this->logger->warning('Message will not be retried because the SentStamp is missing and so the target sender cannot be determined.');
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return $retryStrategy->isRetryable($envelope);
|
return $retryStrategy->isRetryable($envelope);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getSenderClassOrAlias(Envelope $envelope): string
|
|
||||||
{
|
|
||||||
/** @var SentStamp|null $sentStamp */
|
|
||||||
$sentStamp = $envelope->last(SentStamp::class);
|
|
||||||
|
|
||||||
if (null === $sentStamp) {
|
|
||||||
// should not happen, because of the check in shouldRetry()
|
|
||||||
throw new LogicException('Could not find SentStamp.');
|
|
||||||
}
|
|
||||||
|
|
||||||
return $sentStamp->getSenderAlias() ?: $sentStamp->getSenderClass();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user