diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
index beac069806..b5843ad957 100644
--- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
@@ -1659,6 +1659,7 @@ class FrameworkExtension extends Extension
'before' => [
['id' => 'add_bus_name_stamp_middleware'],
['id' => 'dispatch_after_current_bus'],
+ ['id' => 'failed_message_processing_middleware'],
],
'after' => [
['id' => 'send_message'],
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
index 85a1a43261..be938cba31 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
@@ -48,6 +48,8 @@
+
+
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
index f1dade56c6..43e4a534c2 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
@@ -753,6 +753,7 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
['id' => 'dispatch_after_current_bus'],
+ ['id' => 'failed_message_processing_middleware'],
['id' => 'send_message'],
['id' => 'handle_message'],
], $container->getParameter('messenger.bus.commands.middleware'));
@@ -761,6 +762,7 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
['id' => 'dispatch_after_current_bus'],
+ ['id' => 'failed_message_processing_middleware'],
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
['id' => 'send_message'],
['id' => 'handle_message'],
diff --git a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php
index 854daba0e6..194d42c109 100644
--- a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php
+++ b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php
@@ -15,6 +15,7 @@ use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\Dumper;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
@@ -59,8 +60,10 @@ abstract class AbstractFailedMessagesCommand extends Command
{
$io->title('Failed Message Details');
- /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
+ /** @var SentToFailureTransportStamp|null $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
+ /** @var RedeliveryStamp|null $lastRedeliveryStamp */
+ $lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class);
$rows = [
['Class', \get_class($envelope->getMessage())],
@@ -70,25 +73,34 @@ abstract class AbstractFailedMessagesCommand extends Command
$rows[] = ['Message Id', $id];
}
+ $flattenException = null === $lastRedeliveryStamp ? null : $lastRedeliveryStamp->getFlattenException();
if (null === $sentToFailureTransportStamp) {
$io->warning('Message does not appear to have been sent to this transport after failing');
} else {
$rows = array_merge($rows, [
- ['Failed at', $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s')],
- ['Error', $sentToFailureTransportStamp->getExceptionMessage()],
- ['Error Class', $sentToFailureTransportStamp->getFlattenException() ? $sentToFailureTransportStamp->getFlattenException()->getClass() : '(unknown)'],
+ ['Failed at', null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')],
+ ['Error', null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getExceptionMessage()],
+ ['Error Class', null === $flattenException ? '(unknown)' : $flattenException->getClass()],
['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()],
]);
}
$io->table([], $rows);
+ /** @var RedeliveryStamp[] $redeliveryStamps */
+ $redeliveryStamps = $envelope->all(RedeliveryStamp::class);
+ $io->writeln(' Message history:');
+ foreach ($redeliveryStamps as $redeliveryStamp) {
+ $io->writeln(sprintf(' * Message failed and redelivered to the %s transport at %s', $redeliveryStamp->getSenderClassOrAlias(), $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')));
+ }
+ $io->newLine();
+
if ($io->isVeryVerbose()) {
$io->title('Message:');
$dump = new Dumper($io);
$io->writeln($dump($envelope->getMessage()));
$io->title('Exception:');
- $io->writeln($sentToFailureTransportStamp->getFlattenException()->getTraceAsString());
+ $io->writeln(null === $flattenException ? '(no data)' : $flattenException->getTraceAsString());
} else {
$io->writeln(' Re-run command with -vv to see more message & error details.');
}
diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php
index a2ebc290c1..33686e6549 100644
--- a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php
+++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php
@@ -154,7 +154,7 @@ EOF
}
// avoid success message if nothing was processed
- if (1 < $count) {
+ if (1 <= $count) {
$io->success('All failed messages have been handled or removed!');
}
}
diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php
index da49a4b7fe..0444d79f44 100644
--- a/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php
+++ b/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php
@@ -18,7 +18,7 @@ use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
-use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
+use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
/**
@@ -83,14 +83,14 @@ EOF
$rows = [];
foreach ($envelopes as $envelope) {
- /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
- $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
+ /** @var RedeliveryStamp|null $lastRedeliveryStamp */
+ $lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class);
$rows[] = [
$this->getMessageId($envelope),
\get_class($envelope->getMessage()),
- null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s'),
- null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getExceptionMessage(),
+ null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s'),
+ null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getExceptionMessage(),
];
}
diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php
index d62a8ed799..431d9ca97f 100644
--- a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php
+++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php
@@ -16,9 +16,9 @@ use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\MessageBusInterface;
+use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
-use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
@@ -51,11 +51,8 @@ class SendFailedMessageToFailureTransportListener implements EventSubscriberInte
$envelope = $event->getEnvelope();
// avoid re-sending to the failed sender
- foreach ($envelope->all(SentStamp::class) as $sentStamp) {
- /** @var SentStamp $sentStamp */
- if ($sentStamp->getSenderAlias() === $this->failureSenderAlias) {
- return;
- }
+ if (null !== $envelope->last(SentToFailureTransportStamp::class)) {
+ return;
}
// remove the received stamp so it's redelivered
@@ -67,8 +64,9 @@ class SendFailedMessageToFailureTransportListener implements EventSubscriberInte
$flattenedException = \class_exists(FlattenException::class) ? FlattenException::createFromThrowable($throwable) : null;
$envelope = $envelope->withoutAll(ReceivedStamp::class)
->withoutAll(TransportMessageIdStamp::class)
- ->with(new SentToFailureTransportStamp($throwable->getMessage(), $event->getReceiverName(), $flattenedException))
- ->with(new RedeliveryStamp(0, $this->failureSenderAlias));
+ ->with(new SentToFailureTransportStamp($event->getReceiverName()))
+ ->with(new DelayStamp(0))
+ ->with(new RedeliveryStamp(0, $this->failureSenderAlias, $throwable->getMessage(), $flattenedException));
if (null !== $this->logger) {
$this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
diff --git a/src/Symfony/Component/Messenger/Middleware/FailedMessageProcessingMiddleware.php b/src/Symfony/Component/Messenger/Middleware/FailedMessageProcessingMiddleware.php
new file mode 100644
index 0000000000..5d3f63fda6
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Middleware/FailedMessageProcessingMiddleware.php
@@ -0,0 +1,38 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Middleware;
+
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Stamp\ReceivedStamp;
+use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
+
+/**
+ * @author Ryan Weaver
+ *
+ * @experimental in 4.3
+ */
+class FailedMessageProcessingMiddleware implements MiddlewareInterface
+{
+ public function handle(Envelope $envelope, StackInterface $stack): Envelope
+ {
+ // look for "received" messages decorated with the SentToFailureTransportStamp
+ /** @var SentToFailureTransportStamp|null $sentToFailureStamp */
+ $sentToFailureStamp = $envelope->last(SentToFailureTransportStamp::class);
+ if (null !== $sentToFailureStamp && null !== $envelope->last(ReceivedStamp::class)) {
+ // mark the message as "received" from the original transport
+ // this guarantees the same behavior as when originally received
+ $envelope = $envelope->with(new ReceivedStamp($sentToFailureStamp->getOriginalReceiverName()));
+ }
+
+ return $stack->next()->handle($envelope, $stack);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php b/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php
index a0dd9b89d8..265ee1a628 100644
--- a/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php
+++ b/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php
@@ -11,6 +11,8 @@
namespace Symfony\Component\Messenger\Stamp;
+use Symfony\Component\Debug\Exception\FlattenException;
+
/**
* Stamp applied when a messages needs to be redelivered.
*
@@ -20,14 +22,20 @@ class RedeliveryStamp implements StampInterface
{
private $retryCount;
private $senderClassOrAlias;
+ private $redeliveredAt;
+ private $exceptionMessage;
+ private $flattenException;
/**
* @param string $senderClassOrAlias Alias from SendersLocator or just the class name
*/
- public function __construct(int $retryCount, string $senderClassOrAlias)
+ public function __construct(int $retryCount, string $senderClassOrAlias, string $exceptionMessage = null, FlattenException $flattenException = null)
{
$this->retryCount = $retryCount;
$this->senderClassOrAlias = $senderClassOrAlias;
+ $this->exceptionMessage = $exceptionMessage;
+ $this->flattenException = $flattenException;
+ $this->redeliveredAt = new \DateTimeImmutable();
}
public function getRetryCount(): int
@@ -36,7 +44,7 @@ class RedeliveryStamp implements StampInterface
}
/**
- * Needed for this class to serialize through Symfony's serializer.
+ * The target sender this should be redelivered to.
*
* @internal
*/
@@ -44,4 +52,19 @@ class RedeliveryStamp implements StampInterface
{
return $this->senderClassOrAlias;
}
+
+ public function getExceptionMessage(): ?string
+ {
+ return $this->exceptionMessage;
+ }
+
+ public function getFlattenException(): ?FlattenException
+ {
+ return $this->flattenException;
+ }
+
+ public function getRedeliveredAt(): \DateTimeInterface
+ {
+ return $this->redeliveredAt;
+ }
}
diff --git a/src/Symfony/Component/Messenger/Stamp/SentToFailureTransportStamp.php b/src/Symfony/Component/Messenger/Stamp/SentToFailureTransportStamp.php
index 222aa6a18d..c11574f6e7 100644
--- a/src/Symfony/Component/Messenger/Stamp/SentToFailureTransportStamp.php
+++ b/src/Symfony/Component/Messenger/Stamp/SentToFailureTransportStamp.php
@@ -11,8 +11,6 @@
namespace Symfony\Component\Messenger\Stamp;
-use Symfony\Component\Debug\Exception\FlattenException;
-
/**
* Stamp applied when a message is sent to the failure transport.
*
@@ -22,36 +20,15 @@ use Symfony\Component\Debug\Exception\FlattenException;
*/
class SentToFailureTransportStamp implements StampInterface
{
- private $exceptionMessage;
private $originalReceiverName;
- private $flattenException;
- private $sentAt;
- public function __construct(string $exceptionMessage, string $originalReceiverName, FlattenException $flattenException = null)
+ public function __construct(string $originalReceiverName)
{
- $this->exceptionMessage = $exceptionMessage;
$this->originalReceiverName = $originalReceiverName;
- $this->flattenException = $flattenException;
- $this->sentAt = new \DateTimeImmutable();
- }
-
- public function getExceptionMessage(): string
- {
- return $this->exceptionMessage;
}
public function getOriginalReceiverName(): string
{
return $this->originalReceiverName;
}
-
- public function getFlattenException(): ?FlattenException
- {
- return $this->flattenException;
- }
-
- public function getSentAt(): \DateTimeInterface
- {
- return $this->sentAt;
- }
}
diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php
index ad56462dbd..48c34fcaea 100644
--- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php
@@ -15,6 +15,7 @@ use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\Messenger\Command\FailedMessagesShowCommand;
use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
@@ -26,10 +27,12 @@ class FailedMessagesShowCommandTest extends TestCase
{
public function testBasicRun()
{
- $sentToFailureStamp = new SentToFailureTransportStamp('Things are bad!', 'async');
+ $sentToFailureStamp = new SentToFailureTransportStamp('async');
+ $redeliveryStamp = new RedeliveryStamp(0, 'failure_receiver', 'Things are bad!');
$envelope = new Envelope(new \stdClass(), [
new TransportMessageIdStamp(15),
$sentToFailureStamp,
+ $redeliveryStamp,
]);
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope);
@@ -48,10 +51,11 @@ class FailedMessagesShowCommandTest extends TestCase
Message Id 15
Failed at %s
Error Things are bad!
- Error Class (unknown)
+ Error Class (unknown)
+ Transport async
EOF
,
- $sentToFailureStamp->getSentAt()->format('Y-m-d H:i:s')),
+ $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')),
$tester->getDisplay(true));
}
}
diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php
index 9f23e462c0..628a4be3ee 100644
--- a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php
+++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php
@@ -19,7 +19,6 @@ use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
-use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
@@ -35,13 +34,13 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
$this->assertNotNull($sentToFailureTransportStamp);
- $this->assertSame('no!', $sentToFailureTransportStamp->getExceptionMessage());
$this->assertSame('my_receiver', $sentToFailureTransportStamp->getOriginalReceiverName());
- $this->assertSame('no!', $sentToFailureTransportStamp->getFlattenException()->getMessage());
/** @var RedeliveryStamp $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertSame('failure_sender', $redeliveryStamp->getSenderClassOrAlias());
+ $this->assertSame('no!', $redeliveryStamp->getExceptionMessage());
+ $this->assertSame('no!', $redeliveryStamp->getFlattenException()->getMessage());
$this->assertNull($envelope->last(ReceivedStamp::class));
$this->assertNull($envelope->last(TransportMessageIdStamp::class));
@@ -65,11 +64,11 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) {
/** @var Envelope $envelope */
- /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
- $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
- $this->assertNotNull($sentToFailureTransportStamp);
- $this->assertSame('I am inside!', $sentToFailureTransportStamp->getExceptionMessage());
- $this->assertSame('Exception', $sentToFailureTransportStamp->getFlattenException()->getClass());
+ /** @var RedeliveryStamp $redeliveryStamp */
+ $redeliveryStamp = $envelope->last(RedeliveryStamp::class);
+ $this->assertNotNull($redeliveryStamp);
+ $this->assertSame('I am inside!', $redeliveryStamp->getExceptionMessage());
+ $this->assertSame('Exception', $redeliveryStamp->getFlattenException()->getClass());
return true;
}))->willReturn(new Envelope(new \stdClass()));
@@ -112,7 +111,7 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase
);
$envelope = new Envelope(new \stdClass(), [
- new SentStamp('MySender', 'failure_sender'),
+ new SentToFailureTransportStamp('my_receiver'),
]);
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), false);
diff --git a/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php
new file mode 100644
index 0000000000..abe78be759
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php
@@ -0,0 +1,281 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests;
+
+use PHPUnit\Framework\TestCase;
+use Psr\Container\ContainerInterface;
+use Symfony\Component\DependencyInjection\Container;
+use Symfony\Component\EventDispatcher\EventDispatcher;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
+use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
+use Symfony\Component\Messenger\Exception\HandlerFailedException;
+use Symfony\Component\Messenger\Handler\HandlerDescriptor;
+use Symfony\Component\Messenger\Handler\HandlersLocator;
+use Symfony\Component\Messenger\MessageBus;
+use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware;
+use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
+use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
+use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
+use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
+use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
+use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
+use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
+use Symfony\Component\Messenger\Worker;
+
+class FailureIntegrationTest extends TestCase
+{
+ public function testRequeMechanism()
+ {
+ $transport1 = new DummyFailureTestSenderAndReceiver();
+ $transport2 = new DummyFailureTestSenderAndReceiver();
+ $failureTransport = new DummyFailureTestSenderAndReceiver();
+ $transports = [
+ 'transport1' => $transport1,
+ 'transport2' => $transport2,
+ 'the_failure_transport' => $failureTransport,
+ ];
+
+ $locator = $this->createMock(ContainerInterface::class);
+ $locator->expects($this->any())
+ ->method('has')
+ ->willReturn(true);
+ $locator->expects($this->any())
+ ->method('get')
+ ->willReturnCallback(function ($transportName) use ($transports) {
+ return $transports[$transportName];
+ });
+ $senderLocator = new SendersLocator(
+ [DummyMessage::class => ['transport1', 'transport2']],
+ $locator
+ );
+
+ // using to so we can lazily get the bus later and avoid circular problem
+ $transport1HandlerThatFails = new DummyTestHandler(true);
+ $allTransportHandlerThatWorks = new DummyTestHandler(false);
+ $transport2HandlerThatWorks = new DummyTestHandler(false);
+ $container = new Container();
+ $handlerLocator = new HandlersLocator([
+ DummyMessage::class => [
+ new HandlerDescriptor($transport1HandlerThatFails, [
+ 'from_transport' => 'transport1',
+ 'alias' => 'handler_that_fails',
+ ]),
+ new HandlerDescriptor($allTransportHandlerThatWorks, [
+ 'alias' => 'handler_that_works1',
+ ]),
+ new HandlerDescriptor($transport2HandlerThatWorks, [
+ 'from_transport' => 'transport2',
+ 'alias' => 'handler_that_works2',
+ ]),
+ ],
+ ]);
+
+ $dispatcher = new EventDispatcher();
+ $bus = new MessageBus([
+ new FailedMessageProcessingMiddleware(),
+ new SendMessageMiddleware($senderLocator),
+ new HandleMessageMiddleware($handlerLocator),
+ ]);
+ $container->set('bus', $bus);
+ $dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($bus, 'the_failure_transport'));
+
+ $runWorker = function (string $transportName, int $maxRetries) use ($transports, $bus, $dispatcher): ?\Throwable {
+ $throwable = null;
+ $failedListener = function (WorkerMessageFailedEvent $event) use (&$throwable) {
+ $throwable = $event->getThrowable();
+ };
+ $dispatcher->addListener(WorkerMessageFailedEvent::class, $failedListener);
+
+ $worker = new Worker([$transportName => $transports[$transportName]], $bus, [$transportName => new MultiplierRetryStrategy($maxRetries)], $dispatcher);
+
+ $worker->run([], function (?Envelope $envelope) use ($worker) {
+ // handle one envelope, then stop
+ if (null !== $envelope) {
+ $worker->stop();
+ }
+ });
+
+ $dispatcher->removeListener(WorkerMessageFailedEvent::class, $failedListener);
+
+ return $throwable;
+ };
+
+ // send the message
+ $envelope = new Envelope(new DummyMessage('API'));
+ $bus->dispatch($envelope);
+
+ // message has been sent
+ $this->assertCount(1, $transport1->getMessagesWaitingToBeReceived());
+ $this->assertCount(1, $transport2->getMessagesWaitingToBeReceived());
+ $this->assertCount(0, $failureTransport->getMessagesWaitingToBeReceived());
+
+ // receive the message - one handler will fail and the message
+ // will be sent back to transport1 to be retried
+ /*
+ * Receive the message from "transport1"
+ */
+ $throwable = $runWorker('transport1', 1);
+ // make sure this is failing for the reason we think
+ $this->assertInstanceOf(HandlerFailedException::class, $throwable);
+ // handler for transport1 and all transports were called
+ $this->assertSame(1, $transport1HandlerThatFails->getTimesCalled());
+ $this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
+ $this->assertSame(0, $transport2HandlerThatWorks->getTimesCalled());
+ // one handler failed and the message is retried (resent to transport1)
+ $this->assertCount(1, $transport1->getMessagesWaitingToBeReceived());
+ $this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived());
+
+ /*
+ * Receive the message for a (final) retry
+ */
+ $runWorker('transport1', 1);
+ // only the "failed" handler is called a 2nd time
+ $this->assertSame(2, $transport1HandlerThatFails->getTimesCalled());
+ $this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
+ // handling fails again, message is sent to failure transport
+ $this->assertCount(0, $transport1->getMessagesWaitingToBeReceived());
+ $this->assertCount(1, $failureTransport->getMessagesWaitingToBeReceived());
+ /** @var Envelope $failedEnvelope */
+ $failedEnvelope = $failureTransport->getMessagesWaitingToBeReceived()[0];
+ /** @var SentToFailureTransportStamp $sentToFailureStamp */
+ $sentToFailureStamp = $failedEnvelope->last(SentToFailureTransportStamp::class);
+ $this->assertNotNull($sentToFailureStamp);
+ /** @var RedeliveryStamp $redeliveryStamp */
+ $redeliveryStamp = $failedEnvelope->last(RedeliveryStamp::class);
+ $this->assertNotNull($redeliveryStamp);
+ $this->assertSame('Failure from call 2', $redeliveryStamp->getExceptionMessage());
+
+ /*
+ * Failed message is handled, fails, and sent for a retry
+ */
+ $throwable = $runWorker('the_failure_transport', 1);
+ // make sure this is failing for the reason we think
+ $this->assertInstanceOf(HandlerFailedException::class, $throwable);
+ // only the "failed" handler is called a 3rd time
+ $this->assertSame(3, $transport1HandlerThatFails->getTimesCalled());
+ $this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
+ // handling fails again, message is retried
+ $this->assertCount(1, $failureTransport->getMessagesWaitingToBeReceived());
+ // transport2 still only holds the original message
+ // a new message was never mistakenly delivered to it
+ $this->assertCount(1, $transport2->getMessagesWaitingToBeReceived());
+
+ /*
+ * Message is retried on failure transport then discarded
+ */
+ $runWorker('the_failure_transport', 1);
+ // only the "failed" handler is called a 4th time
+ $this->assertSame(4, $transport1HandlerThatFails->getTimesCalled());
+ $this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
+ // handling fails again, message is discarded
+ $this->assertCount(0, $failureTransport->getMessagesWaitingToBeReceived());
+
+ /*
+ * Execute handlers on transport2
+ */
+ $runWorker('transport2', 1);
+ // transport1 handler is not called again
+ $this->assertSame(4, $transport1HandlerThatFails->getTimesCalled());
+ // all transport handler is now called again
+ $this->assertSame(2, $allTransportHandlerThatWorks->getTimesCalled());
+ // transport1 handler called for the first time
+ $this->assertSame(1, $transport2HandlerThatWorks->getTimesCalled());
+ // all transport should be empty
+ $this->assertEmpty($transport1->getMessagesWaitingToBeReceived());
+ $this->assertEmpty($transport2->getMessagesWaitingToBeReceived());
+ $this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived());
+
+ /*
+ * Dispatch the original message again
+ */
+ $bus->dispatch($envelope);
+ // handle the message, but with no retries
+ $runWorker('transport1', 0);
+ // now make the handler work!
+ $transport1HandlerThatFails->setShouldThrow(false);
+ $runWorker('the_failure_transport', 1);
+ // the failure transport is empty because it worked
+ $this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived());
+ }
+}
+
+class DummyFailureTestSenderAndReceiver implements ReceiverInterface, SenderInterface
+{
+ private $messagesWaiting = [];
+
+ public function get(): iterable
+ {
+ $message = array_shift($this->messagesWaiting);
+
+ if (null === $message) {
+ return [];
+ }
+
+ return [$message];
+ }
+
+ public function ack(Envelope $envelope): void
+ {
+ }
+
+ public function reject(Envelope $envelope): void
+ {
+ }
+
+ public function send(Envelope $envelope): Envelope
+ {
+ $this->messagesWaiting[] = $envelope;
+
+ return $envelope;
+ }
+
+ /**
+ * @return Envelope[]
+ */
+ public function getMessagesWaitingToBeReceived(): array
+ {
+ return $this->messagesWaiting;
+ }
+}
+
+class DummyTestHandler
+{
+ private $timesCalled = 0;
+ private $shouldThrow;
+
+ public function __construct(bool $shouldThrow)
+ {
+ $this->shouldThrow = $shouldThrow;
+ }
+
+ public function __invoke()
+ {
+ ++$this->timesCalled;
+
+ if ($this->shouldThrow) {
+ throw new \Exception('Failure from call '.$this->timesCalled);
+ }
+ }
+
+ public function getTimesCalled(): int
+ {
+ return $this->timesCalled;
+ }
+
+ public function setShouldThrow(bool $shouldThrow)
+ {
+ $this->shouldThrow = $shouldThrow;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php
index 71b9efcf50..64ff20bae3 100644
--- a/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php
+++ b/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php
@@ -38,8 +38,8 @@ class RetryIntegrationTest extends TestCase
$senderLocator->method('get')->with('sender_alias')->willReturn($senderAndReceiver);
$senderLocator = new SendersLocator([DummyMessage::class => ['sender_alias']], $senderLocator);
- $handler = new DummyMessageHandlerFailingFirstTimes(0, 'A');
- $throwingHandler = new DummyMessageHandlerFailingFirstTimes(1, 'B');
+ $handler = new DummyMessageHandlerFailingFirstTimes(0);
+ $throwingHandler = new DummyMessageHandlerFailingFirstTimes(1);
$handlerLocator = new HandlersLocator([
DummyMessage::class => [
new HandlerDescriptor($handler, ['alias' => 'first']),
diff --git a/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php b/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php
index 6617abc8e5..ebff684b3d 100644
--- a/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php
@@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Stamp;
use PHPUnit\Framework\TestCase;
+use Symfony\Component\Debug\Exception\FlattenException;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
class RedeliveryStampTest extends TestCase
@@ -21,5 +22,16 @@ class RedeliveryStampTest extends TestCase
$stamp = new RedeliveryStamp(10, 'sender_alias');
$this->assertSame(10, $stamp->getRetryCount());
$this->assertSame('sender_alias', $stamp->getSenderClassOrAlias());
+ $this->assertInstanceOf(\DateTimeInterface::class, $stamp->getRedeliveredAt());
+ $this->assertNull($stamp->getExceptionMessage());
+ $this->assertNull($stamp->getFlattenException());
+ }
+
+ public function testGettersPopulated()
+ {
+ $flattenException = new FlattenException();
+ $stamp = new RedeliveryStamp(10, 'sender_alias', 'exception message', $flattenException);
+ $this->assertSame('exception message', $stamp->getExceptionMessage());
+ $this->assertSame($flattenException, $stamp->getFlattenException());
}
}
diff --git a/src/Symfony/Component/Messenger/Tests/Stamp/SentToFailureTransportStampTest.php b/src/Symfony/Component/Messenger/Tests/Stamp/SentToFailureTransportStampTest.php
index 3b9579a646..733555a605 100644
--- a/src/Symfony/Component/Messenger/Tests/Stamp/SentToFailureTransportStampTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Stamp/SentToFailureTransportStampTest.php
@@ -12,22 +12,13 @@
namespace Symfony\Component\Messenger\Tests\Stamp;
use PHPUnit\Framework\TestCase;
-use Symfony\Component\Debug\Exception\FlattenException;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
class SentToFailureTransportStampTest extends TestCase
{
- public function testGetters()
+ public function testGetOriginalReceiverName()
{
- $flattenException = new FlattenException();
- $stamp = new SentToFailureTransportStamp(
- 'exception message',
- 'original_receiver',
- $flattenException
- );
- $this->assertSame('exception message', $stamp->getExceptionMessage());
+ $stamp = new SentToFailureTransportStamp('original_receiver');
$this->assertSame('original_receiver', $stamp->getOriginalReceiverName());
- $this->assertSame($flattenException, $stamp->getFlattenException());
- $this->assertInstanceOf(\DateTimeInterface::class, $stamp->getSentAt());
}
}