From 80b5df270f4ad968a1a34c17b9910e122cfc8038 Mon Sep 17 00:00:00 2001 From: Ryan Weaver Date: Wed, 8 May 2019 10:31:22 -0400 Subject: [PATCH] [Messenger] On failure retry, make message appear received from original sender --- .../FrameworkExtension.php | 1 + .../Resources/config/messenger.xml | 2 + .../FrameworkExtensionTest.php | 2 + .../Command/AbstractFailedMessagesCommand.php | 22 +- .../Command/FailedMessagesRetryCommand.php | 2 +- .../Command/FailedMessagesShowCommand.php | 10 +- ...ailedMessageToFailureTransportListener.php | 14 +- .../FailedMessageProcessingMiddleware.php | 38 +++ .../Messenger/Stamp/RedeliveryStamp.php | 27 +- .../Stamp/SentToFailureTransportStamp.php | 25 +- .../Command/FailedMessagesShowCommandTest.php | 10 +- ...dMessageToFailureTransportListenerTest.php | 17 +- .../Tests/FailureIntegrationTest.php | 281 ++++++++++++++++++ .../Messenger/Tests/RetryIntegrationTest.php | 4 +- .../Tests/Stamp/RedeliveryStampTest.php | 12 + .../Stamp/SentToFailureTransportStampTest.php | 13 +- 16 files changed, 410 insertions(+), 70 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Middleware/FailedMessageProcessingMiddleware.php create mode 100644 src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index beac069806..b5843ad957 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1659,6 +1659,7 @@ class FrameworkExtension extends Extension 'before' => [ ['id' => 'add_bus_name_stamp_middleware'], ['id' => 'dispatch_after_current_bus'], + ['id' => 'failed_message_processing_middleware'], ], 'after' => [ ['id' => 'send_message'], diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index 14ffd4d382..0cbd2d8423 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -48,6 +48,8 @@ + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index f1dade56c6..43e4a534c2 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -753,6 +753,7 @@ abstract class FrameworkExtensionTest extends TestCase $this->assertEquals([ ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']], ['id' => 'dispatch_after_current_bus'], + ['id' => 'failed_message_processing_middleware'], ['id' => 'send_message'], ['id' => 'handle_message'], ], $container->getParameter('messenger.bus.commands.middleware')); @@ -761,6 +762,7 @@ abstract class FrameworkExtensionTest extends TestCase $this->assertEquals([ ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']], ['id' => 'dispatch_after_current_bus'], + ['id' => 'failed_message_processing_middleware'], ['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]], ['id' => 'send_message'], ['id' => 'handle_message'], diff --git a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php index 854daba0e6..194d42c109 100644 --- a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php @@ -15,6 +15,7 @@ use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Helper\Dumper; use Symfony\Component\Console\Style\SymfonyStyle; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; @@ -59,8 +60,10 @@ abstract class AbstractFailedMessagesCommand extends Command { $io->title('Failed Message Details'); - /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */ + /** @var SentToFailureTransportStamp|null $sentToFailureTransportStamp */ $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class); + /** @var RedeliveryStamp|null $lastRedeliveryStamp */ + $lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class); $rows = [ ['Class', \get_class($envelope->getMessage())], @@ -70,25 +73,34 @@ abstract class AbstractFailedMessagesCommand extends Command $rows[] = ['Message Id', $id]; } + $flattenException = null === $lastRedeliveryStamp ? null : $lastRedeliveryStamp->getFlattenException(); if (null === $sentToFailureTransportStamp) { $io->warning('Message does not appear to have been sent to this transport after failing'); } else { $rows = array_merge($rows, [ - ['Failed at', $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s')], - ['Error', $sentToFailureTransportStamp->getExceptionMessage()], - ['Error Class', $sentToFailureTransportStamp->getFlattenException() ? $sentToFailureTransportStamp->getFlattenException()->getClass() : '(unknown)'], + ['Failed at', null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')], + ['Error', null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getExceptionMessage()], + ['Error Class', null === $flattenException ? '(unknown)' : $flattenException->getClass()], ['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()], ]); } $io->table([], $rows); + /** @var RedeliveryStamp[] $redeliveryStamps */ + $redeliveryStamps = $envelope->all(RedeliveryStamp::class); + $io->writeln(' Message history:'); + foreach ($redeliveryStamps as $redeliveryStamp) { + $io->writeln(sprintf(' * Message failed and redelivered to the %s transport at %s', $redeliveryStamp->getSenderClassOrAlias(), $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s'))); + } + $io->newLine(); + if ($io->isVeryVerbose()) { $io->title('Message:'); $dump = new Dumper($io); $io->writeln($dump($envelope->getMessage())); $io->title('Exception:'); - $io->writeln($sentToFailureTransportStamp->getFlattenException()->getTraceAsString()); + $io->writeln(null === $flattenException ? '(no data)' : $flattenException->getTraceAsString()); } else { $io->writeln(' Re-run command with -vv to see more message & error details.'); } diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php index a2ebc290c1..33686e6549 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php @@ -154,7 +154,7 @@ EOF } // avoid success message if nothing was processed - if (1 < $count) { + if (1 <= $count) { $io->success('All failed messages have been handled or removed!'); } } diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php index da49a4b7fe..0444d79f44 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php @@ -18,7 +18,7 @@ use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\ConsoleOutputInterface; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; -use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; /** @@ -83,14 +83,14 @@ EOF $rows = []; foreach ($envelopes as $envelope) { - /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */ - $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class); + /** @var RedeliveryStamp|null $lastRedeliveryStamp */ + $lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class); $rows[] = [ $this->getMessageId($envelope), \get_class($envelope->getMessage()), - null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s'), - null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getExceptionMessage(), + null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s'), + null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getExceptionMessage(), ]; } diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php index d62a8ed799..431d9ca97f 100644 --- a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php +++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php @@ -16,9 +16,9 @@ use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; -use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; @@ -51,11 +51,8 @@ class SendFailedMessageToFailureTransportListener implements EventSubscriberInte $envelope = $event->getEnvelope(); // avoid re-sending to the failed sender - foreach ($envelope->all(SentStamp::class) as $sentStamp) { - /** @var SentStamp $sentStamp */ - if ($sentStamp->getSenderAlias() === $this->failureSenderAlias) { - return; - } + if (null !== $envelope->last(SentToFailureTransportStamp::class)) { + return; } // remove the received stamp so it's redelivered @@ -67,8 +64,9 @@ class SendFailedMessageToFailureTransportListener implements EventSubscriberInte $flattenedException = \class_exists(FlattenException::class) ? FlattenException::createFromThrowable($throwable) : null; $envelope = $envelope->withoutAll(ReceivedStamp::class) ->withoutAll(TransportMessageIdStamp::class) - ->with(new SentToFailureTransportStamp($throwable->getMessage(), $event->getReceiverName(), $flattenedException)) - ->with(new RedeliveryStamp(0, $this->failureSenderAlias)); + ->with(new SentToFailureTransportStamp($event->getReceiverName())) + ->with(new DelayStamp(0)) + ->with(new RedeliveryStamp(0, $this->failureSenderAlias, $throwable->getMessage(), $flattenedException)); if (null !== $this->logger) { $this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [ diff --git a/src/Symfony/Component/Messenger/Middleware/FailedMessageProcessingMiddleware.php b/src/Symfony/Component/Messenger/Middleware/FailedMessageProcessingMiddleware.php new file mode 100644 index 0000000000..5d3f63fda6 --- /dev/null +++ b/src/Symfony/Component/Messenger/Middleware/FailedMessageProcessingMiddleware.php @@ -0,0 +1,38 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Middleware; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; + +/** + * @author Ryan Weaver + * + * @experimental in 4.3 + */ +class FailedMessageProcessingMiddleware implements MiddlewareInterface +{ + public function handle(Envelope $envelope, StackInterface $stack): Envelope + { + // look for "received" messages decorated with the SentToFailureTransportStamp + /** @var SentToFailureTransportStamp|null $sentToFailureStamp */ + $sentToFailureStamp = $envelope->last(SentToFailureTransportStamp::class); + if (null !== $sentToFailureStamp && null !== $envelope->last(ReceivedStamp::class)) { + // mark the message as "received" from the original transport + // this guarantees the same behavior as when originally received + $envelope = $envelope->with(new ReceivedStamp($sentToFailureStamp->getOriginalReceiverName())); + } + + return $stack->next()->handle($envelope, $stack); + } +} diff --git a/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php b/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php index a0dd9b89d8..265ee1a628 100644 --- a/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php +++ b/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php @@ -11,6 +11,8 @@ namespace Symfony\Component\Messenger\Stamp; +use Symfony\Component\Debug\Exception\FlattenException; + /** * Stamp applied when a messages needs to be redelivered. * @@ -20,14 +22,20 @@ class RedeliveryStamp implements StampInterface { private $retryCount; private $senderClassOrAlias; + private $redeliveredAt; + private $exceptionMessage; + private $flattenException; /** * @param string $senderClassOrAlias Alias from SendersLocator or just the class name */ - public function __construct(int $retryCount, string $senderClassOrAlias) + public function __construct(int $retryCount, string $senderClassOrAlias, string $exceptionMessage = null, FlattenException $flattenException = null) { $this->retryCount = $retryCount; $this->senderClassOrAlias = $senderClassOrAlias; + $this->exceptionMessage = $exceptionMessage; + $this->flattenException = $flattenException; + $this->redeliveredAt = new \DateTimeImmutable(); } public function getRetryCount(): int @@ -36,7 +44,7 @@ class RedeliveryStamp implements StampInterface } /** - * Needed for this class to serialize through Symfony's serializer. + * The target sender this should be redelivered to. * * @internal */ @@ -44,4 +52,19 @@ class RedeliveryStamp implements StampInterface { return $this->senderClassOrAlias; } + + public function getExceptionMessage(): ?string + { + return $this->exceptionMessage; + } + + public function getFlattenException(): ?FlattenException + { + return $this->flattenException; + } + + public function getRedeliveredAt(): \DateTimeInterface + { + return $this->redeliveredAt; + } } diff --git a/src/Symfony/Component/Messenger/Stamp/SentToFailureTransportStamp.php b/src/Symfony/Component/Messenger/Stamp/SentToFailureTransportStamp.php index 222aa6a18d..c11574f6e7 100644 --- a/src/Symfony/Component/Messenger/Stamp/SentToFailureTransportStamp.php +++ b/src/Symfony/Component/Messenger/Stamp/SentToFailureTransportStamp.php @@ -11,8 +11,6 @@ namespace Symfony\Component\Messenger\Stamp; -use Symfony\Component\Debug\Exception\FlattenException; - /** * Stamp applied when a message is sent to the failure transport. * @@ -22,36 +20,15 @@ use Symfony\Component\Debug\Exception\FlattenException; */ class SentToFailureTransportStamp implements StampInterface { - private $exceptionMessage; private $originalReceiverName; - private $flattenException; - private $sentAt; - public function __construct(string $exceptionMessage, string $originalReceiverName, FlattenException $flattenException = null) + public function __construct(string $originalReceiverName) { - $this->exceptionMessage = $exceptionMessage; $this->originalReceiverName = $originalReceiverName; - $this->flattenException = $flattenException; - $this->sentAt = new \DateTimeImmutable(); - } - - public function getExceptionMessage(): string - { - return $this->exceptionMessage; } public function getOriginalReceiverName(): string { return $this->originalReceiverName; } - - public function getFlattenException(): ?FlattenException - { - return $this->flattenException; - } - - public function getSentAt(): \DateTimeInterface - { - return $this->sentAt; - } } diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php index ad56462dbd..48c34fcaea 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php @@ -15,6 +15,7 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Console\Tester\CommandTester; use Symfony\Component\Messenger\Command\FailedMessagesShowCommand; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; @@ -26,10 +27,12 @@ class FailedMessagesShowCommandTest extends TestCase { public function testBasicRun() { - $sentToFailureStamp = new SentToFailureTransportStamp('Things are bad!', 'async'); + $sentToFailureStamp = new SentToFailureTransportStamp('async'); + $redeliveryStamp = new RedeliveryStamp(0, 'failure_receiver', 'Things are bad!'); $envelope = new Envelope(new \stdClass(), [ new TransportMessageIdStamp(15), $sentToFailureStamp, + $redeliveryStamp, ]); $receiver = $this->createMock(ListableReceiverInterface::class); $receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope); @@ -48,10 +51,11 @@ class FailedMessagesShowCommandTest extends TestCase Message Id 15 Failed at %s Error Things are bad! - Error Class (unknown) + Error Class (unknown) + Transport async EOF , - $sentToFailureStamp->getSentAt()->format('Y-m-d H:i:s')), + $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')), $tester->getDisplay(true)); } } diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php index 9f23e462c0..628a4be3ee 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php @@ -19,7 +19,6 @@ use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; -use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; @@ -35,13 +34,13 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */ $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class); $this->assertNotNull($sentToFailureTransportStamp); - $this->assertSame('no!', $sentToFailureTransportStamp->getExceptionMessage()); $this->assertSame('my_receiver', $sentToFailureTransportStamp->getOriginalReceiverName()); - $this->assertSame('no!', $sentToFailureTransportStamp->getFlattenException()->getMessage()); /** @var RedeliveryStamp $redeliveryStamp */ $redeliveryStamp = $envelope->last(RedeliveryStamp::class); $this->assertSame('failure_sender', $redeliveryStamp->getSenderClassOrAlias()); + $this->assertSame('no!', $redeliveryStamp->getExceptionMessage()); + $this->assertSame('no!', $redeliveryStamp->getFlattenException()->getMessage()); $this->assertNull($envelope->last(ReceivedStamp::class)); $this->assertNull($envelope->last(TransportMessageIdStamp::class)); @@ -65,11 +64,11 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase $bus = $this->createMock(MessageBusInterface::class); $bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) { /** @var Envelope $envelope */ - /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */ - $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class); - $this->assertNotNull($sentToFailureTransportStamp); - $this->assertSame('I am inside!', $sentToFailureTransportStamp->getExceptionMessage()); - $this->assertSame('Exception', $sentToFailureTransportStamp->getFlattenException()->getClass()); + /** @var RedeliveryStamp $redeliveryStamp */ + $redeliveryStamp = $envelope->last(RedeliveryStamp::class); + $this->assertNotNull($redeliveryStamp); + $this->assertSame('I am inside!', $redeliveryStamp->getExceptionMessage()); + $this->assertSame('Exception', $redeliveryStamp->getFlattenException()->getClass()); return true; }))->willReturn(new Envelope(new \stdClass())); @@ -112,7 +111,7 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase ); $envelope = new Envelope(new \stdClass(), [ - new SentStamp('MySender', 'failure_sender'), + new SentToFailureTransportStamp('my_receiver'), ]); $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), false); diff --git a/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php new file mode 100644 index 0000000000..abe78be759 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php @@ -0,0 +1,281 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests; + +use PHPUnit\Framework\TestCase; +use Psr\Container\ContainerInterface; +use Symfony\Component\DependencyInjection\Container; +use Symfony\Component\EventDispatcher\EventDispatcher; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; +use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; +use Symfony\Component\Messenger\Exception\HandlerFailedException; +use Symfony\Component\Messenger\Handler\HandlerDescriptor; +use Symfony\Component\Messenger\Handler\HandlersLocator; +use Symfony\Component\Messenger\MessageBus; +use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware; +use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; +use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; +use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; +use Symfony\Component\Messenger\Transport\Sender\SenderInterface; +use Symfony\Component\Messenger\Transport\Sender\SendersLocator; +use Symfony\Component\Messenger\Worker; + +class FailureIntegrationTest extends TestCase +{ + public function testRequeMechanism() + { + $transport1 = new DummyFailureTestSenderAndReceiver(); + $transport2 = new DummyFailureTestSenderAndReceiver(); + $failureTransport = new DummyFailureTestSenderAndReceiver(); + $transports = [ + 'transport1' => $transport1, + 'transport2' => $transport2, + 'the_failure_transport' => $failureTransport, + ]; + + $locator = $this->createMock(ContainerInterface::class); + $locator->expects($this->any()) + ->method('has') + ->willReturn(true); + $locator->expects($this->any()) + ->method('get') + ->willReturnCallback(function ($transportName) use ($transports) { + return $transports[$transportName]; + }); + $senderLocator = new SendersLocator( + [DummyMessage::class => ['transport1', 'transport2']], + $locator + ); + + // using to so we can lazily get the bus later and avoid circular problem + $transport1HandlerThatFails = new DummyTestHandler(true); + $allTransportHandlerThatWorks = new DummyTestHandler(false); + $transport2HandlerThatWorks = new DummyTestHandler(false); + $container = new Container(); + $handlerLocator = new HandlersLocator([ + DummyMessage::class => [ + new HandlerDescriptor($transport1HandlerThatFails, [ + 'from_transport' => 'transport1', + 'alias' => 'handler_that_fails', + ]), + new HandlerDescriptor($allTransportHandlerThatWorks, [ + 'alias' => 'handler_that_works1', + ]), + new HandlerDescriptor($transport2HandlerThatWorks, [ + 'from_transport' => 'transport2', + 'alias' => 'handler_that_works2', + ]), + ], + ]); + + $dispatcher = new EventDispatcher(); + $bus = new MessageBus([ + new FailedMessageProcessingMiddleware(), + new SendMessageMiddleware($senderLocator), + new HandleMessageMiddleware($handlerLocator), + ]); + $container->set('bus', $bus); + $dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($bus, 'the_failure_transport')); + + $runWorker = function (string $transportName, int $maxRetries) use ($transports, $bus, $dispatcher): ?\Throwable { + $throwable = null; + $failedListener = function (WorkerMessageFailedEvent $event) use (&$throwable) { + $throwable = $event->getThrowable(); + }; + $dispatcher->addListener(WorkerMessageFailedEvent::class, $failedListener); + + $worker = new Worker([$transportName => $transports[$transportName]], $bus, [$transportName => new MultiplierRetryStrategy($maxRetries)], $dispatcher); + + $worker->run([], function (?Envelope $envelope) use ($worker) { + // handle one envelope, then stop + if (null !== $envelope) { + $worker->stop(); + } + }); + + $dispatcher->removeListener(WorkerMessageFailedEvent::class, $failedListener); + + return $throwable; + }; + + // send the message + $envelope = new Envelope(new DummyMessage('API')); + $bus->dispatch($envelope); + + // message has been sent + $this->assertCount(1, $transport1->getMessagesWaitingToBeReceived()); + $this->assertCount(1, $transport2->getMessagesWaitingToBeReceived()); + $this->assertCount(0, $failureTransport->getMessagesWaitingToBeReceived()); + + // receive the message - one handler will fail and the message + // will be sent back to transport1 to be retried + /* + * Receive the message from "transport1" + */ + $throwable = $runWorker('transport1', 1); + // make sure this is failing for the reason we think + $this->assertInstanceOf(HandlerFailedException::class, $throwable); + // handler for transport1 and all transports were called + $this->assertSame(1, $transport1HandlerThatFails->getTimesCalled()); + $this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled()); + $this->assertSame(0, $transport2HandlerThatWorks->getTimesCalled()); + // one handler failed and the message is retried (resent to transport1) + $this->assertCount(1, $transport1->getMessagesWaitingToBeReceived()); + $this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived()); + + /* + * Receive the message for a (final) retry + */ + $runWorker('transport1', 1); + // only the "failed" handler is called a 2nd time + $this->assertSame(2, $transport1HandlerThatFails->getTimesCalled()); + $this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled()); + // handling fails again, message is sent to failure transport + $this->assertCount(0, $transport1->getMessagesWaitingToBeReceived()); + $this->assertCount(1, $failureTransport->getMessagesWaitingToBeReceived()); + /** @var Envelope $failedEnvelope */ + $failedEnvelope = $failureTransport->getMessagesWaitingToBeReceived()[0]; + /** @var SentToFailureTransportStamp $sentToFailureStamp */ + $sentToFailureStamp = $failedEnvelope->last(SentToFailureTransportStamp::class); + $this->assertNotNull($sentToFailureStamp); + /** @var RedeliveryStamp $redeliveryStamp */ + $redeliveryStamp = $failedEnvelope->last(RedeliveryStamp::class); + $this->assertNotNull($redeliveryStamp); + $this->assertSame('Failure from call 2', $redeliveryStamp->getExceptionMessage()); + + /* + * Failed message is handled, fails, and sent for a retry + */ + $throwable = $runWorker('the_failure_transport', 1); + // make sure this is failing for the reason we think + $this->assertInstanceOf(HandlerFailedException::class, $throwable); + // only the "failed" handler is called a 3rd time + $this->assertSame(3, $transport1HandlerThatFails->getTimesCalled()); + $this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled()); + // handling fails again, message is retried + $this->assertCount(1, $failureTransport->getMessagesWaitingToBeReceived()); + // transport2 still only holds the original message + // a new message was never mistakenly delivered to it + $this->assertCount(1, $transport2->getMessagesWaitingToBeReceived()); + + /* + * Message is retried on failure transport then discarded + */ + $runWorker('the_failure_transport', 1); + // only the "failed" handler is called a 4th time + $this->assertSame(4, $transport1HandlerThatFails->getTimesCalled()); + $this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled()); + // handling fails again, message is discarded + $this->assertCount(0, $failureTransport->getMessagesWaitingToBeReceived()); + + /* + * Execute handlers on transport2 + */ + $runWorker('transport2', 1); + // transport1 handler is not called again + $this->assertSame(4, $transport1HandlerThatFails->getTimesCalled()); + // all transport handler is now called again + $this->assertSame(2, $allTransportHandlerThatWorks->getTimesCalled()); + // transport1 handler called for the first time + $this->assertSame(1, $transport2HandlerThatWorks->getTimesCalled()); + // all transport should be empty + $this->assertEmpty($transport1->getMessagesWaitingToBeReceived()); + $this->assertEmpty($transport2->getMessagesWaitingToBeReceived()); + $this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived()); + + /* + * Dispatch the original message again + */ + $bus->dispatch($envelope); + // handle the message, but with no retries + $runWorker('transport1', 0); + // now make the handler work! + $transport1HandlerThatFails->setShouldThrow(false); + $runWorker('the_failure_transport', 1); + // the failure transport is empty because it worked + $this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived()); + } +} + +class DummyFailureTestSenderAndReceiver implements ReceiverInterface, SenderInterface +{ + private $messagesWaiting = []; + + public function get(): iterable + { + $message = array_shift($this->messagesWaiting); + + if (null === $message) { + return []; + } + + return [$message]; + } + + public function ack(Envelope $envelope): void + { + } + + public function reject(Envelope $envelope): void + { + } + + public function send(Envelope $envelope): Envelope + { + $this->messagesWaiting[] = $envelope; + + return $envelope; + } + + /** + * @return Envelope[] + */ + public function getMessagesWaitingToBeReceived(): array + { + return $this->messagesWaiting; + } +} + +class DummyTestHandler +{ + private $timesCalled = 0; + private $shouldThrow; + + public function __construct(bool $shouldThrow) + { + $this->shouldThrow = $shouldThrow; + } + + public function __invoke() + { + ++$this->timesCalled; + + if ($this->shouldThrow) { + throw new \Exception('Failure from call '.$this->timesCalled); + } + } + + public function getTimesCalled(): int + { + return $this->timesCalled; + } + + public function setShouldThrow(bool $shouldThrow) + { + $this->shouldThrow = $shouldThrow; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php index 71b9efcf50..64ff20bae3 100644 --- a/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php @@ -38,8 +38,8 @@ class RetryIntegrationTest extends TestCase $senderLocator->method('get')->with('sender_alias')->willReturn($senderAndReceiver); $senderLocator = new SendersLocator([DummyMessage::class => ['sender_alias']], $senderLocator); - $handler = new DummyMessageHandlerFailingFirstTimes(0, 'A'); - $throwingHandler = new DummyMessageHandlerFailingFirstTimes(1, 'B'); + $handler = new DummyMessageHandlerFailingFirstTimes(0); + $throwingHandler = new DummyMessageHandlerFailingFirstTimes(1); $handlerLocator = new HandlersLocator([ DummyMessage::class => [ new HandlerDescriptor($handler, ['alias' => 'first']), diff --git a/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php b/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php index 6617abc8e5..ebff684b3d 100644 --- a/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php +++ b/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Tests\Stamp; use PHPUnit\Framework\TestCase; +use Symfony\Component\Debug\Exception\FlattenException; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; class RedeliveryStampTest extends TestCase @@ -21,5 +22,16 @@ class RedeliveryStampTest extends TestCase $stamp = new RedeliveryStamp(10, 'sender_alias'); $this->assertSame(10, $stamp->getRetryCount()); $this->assertSame('sender_alias', $stamp->getSenderClassOrAlias()); + $this->assertInstanceOf(\DateTimeInterface::class, $stamp->getRedeliveredAt()); + $this->assertNull($stamp->getExceptionMessage()); + $this->assertNull($stamp->getFlattenException()); + } + + public function testGettersPopulated() + { + $flattenException = new FlattenException(); + $stamp = new RedeliveryStamp(10, 'sender_alias', 'exception message', $flattenException); + $this->assertSame('exception message', $stamp->getExceptionMessage()); + $this->assertSame($flattenException, $stamp->getFlattenException()); } } diff --git a/src/Symfony/Component/Messenger/Tests/Stamp/SentToFailureTransportStampTest.php b/src/Symfony/Component/Messenger/Tests/Stamp/SentToFailureTransportStampTest.php index 3b9579a646..733555a605 100644 --- a/src/Symfony/Component/Messenger/Tests/Stamp/SentToFailureTransportStampTest.php +++ b/src/Symfony/Component/Messenger/Tests/Stamp/SentToFailureTransportStampTest.php @@ -12,22 +12,13 @@ namespace Symfony\Component\Messenger\Tests\Stamp; use PHPUnit\Framework\TestCase; -use Symfony\Component\Debug\Exception\FlattenException; use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; class SentToFailureTransportStampTest extends TestCase { - public function testGetters() + public function testGetOriginalReceiverName() { - $flattenException = new FlattenException(); - $stamp = new SentToFailureTransportStamp( - 'exception message', - 'original_receiver', - $flattenException - ); - $this->assertSame('exception message', $stamp->getExceptionMessage()); + $stamp = new SentToFailureTransportStamp('original_receiver'); $this->assertSame('original_receiver', $stamp->getOriginalReceiverName()); - $this->assertSame($flattenException, $stamp->getFlattenException()); - $this->assertInstanceOf(\DateTimeInterface::class, $stamp->getSentAt()); } }