[Messenger] On failure retry, make message appear received from original sender

This commit is contained in:
Ryan Weaver 2019-05-08 10:31:22 -04:00 committed by Fabien Potencier
parent ca6266d902
commit 80b5df270f
16 changed files with 410 additions and 70 deletions

View File

@ -1659,6 +1659,7 @@ class FrameworkExtension extends Extension
'before' => [
['id' => 'add_bus_name_stamp_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
],
'after' => [
['id' => 'send_message'],

View File

@ -48,6 +48,8 @@
<argument type="service" id="validator" />
</service>
<service id="messenger.middleware.failed_message_processing_middleware" class="Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware" />
<service id="messenger.middleware.traceable" class="Symfony\Component\Messenger\Middleware\TraceableMiddleware" abstract="true">
<argument type="service" id="debug.stopwatch" />
</service>

View File

@ -753,6 +753,7 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
['id' => 'send_message'],
['id' => 'handle_message'],
], $container->getParameter('messenger.bus.commands.middleware'));
@ -761,6 +762,7 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
['id' => 'send_message'],
['id' => 'handle_message'],

View File

@ -15,6 +15,7 @@ use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\Dumper;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
@ -59,8 +60,10 @@ abstract class AbstractFailedMessagesCommand extends Command
{
$io->title('Failed Message Details');
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
/** @var SentToFailureTransportStamp|null $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
/** @var RedeliveryStamp|null $lastRedeliveryStamp */
$lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class);
$rows = [
['Class', \get_class($envelope->getMessage())],
@ -70,25 +73,34 @@ abstract class AbstractFailedMessagesCommand extends Command
$rows[] = ['Message Id', $id];
}
$flattenException = null === $lastRedeliveryStamp ? null : $lastRedeliveryStamp->getFlattenException();
if (null === $sentToFailureTransportStamp) {
$io->warning('Message does not appear to have been sent to this transport after failing');
} else {
$rows = array_merge($rows, [
['Failed at', $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s')],
['Error', $sentToFailureTransportStamp->getExceptionMessage()],
['Error Class', $sentToFailureTransportStamp->getFlattenException() ? $sentToFailureTransportStamp->getFlattenException()->getClass() : '(unknown)'],
['Failed at', null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')],
['Error', null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getExceptionMessage()],
['Error Class', null === $flattenException ? '(unknown)' : $flattenException->getClass()],
['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()],
]);
}
$io->table([], $rows);
/** @var RedeliveryStamp[] $redeliveryStamps */
$redeliveryStamps = $envelope->all(RedeliveryStamp::class);
$io->writeln(' Message history:');
foreach ($redeliveryStamps as $redeliveryStamp) {
$io->writeln(sprintf(' * Message failed and redelivered to the <info>%s</info> transport at <info>%s</info>', $redeliveryStamp->getSenderClassOrAlias(), $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')));
}
$io->newLine();
if ($io->isVeryVerbose()) {
$io->title('Message:');
$dump = new Dumper($io);
$io->writeln($dump($envelope->getMessage()));
$io->title('Exception:');
$io->writeln($sentToFailureTransportStamp->getFlattenException()->getTraceAsString());
$io->writeln(null === $flattenException ? '(no data)' : $flattenException->getTraceAsString());
} else {
$io->writeln(' Re-run command with <info>-vv</info> to see more message & error details.');
}

View File

@ -154,7 +154,7 @@ EOF
}
// avoid success message if nothing was processed
if (1 < $count) {
if (1 <= $count) {
$io->success('All failed messages have been handled or removed!');
}
}

View File

@ -18,7 +18,7 @@ use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
/**
@ -83,14 +83,14 @@ EOF
$rows = [];
foreach ($envelopes as $envelope) {
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
/** @var RedeliveryStamp|null $lastRedeliveryStamp */
$lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class);
$rows[] = [
$this->getMessageId($envelope),
\get_class($envelope->getMessage()),
null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s'),
null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getExceptionMessage(),
null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s'),
null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getExceptionMessage(),
];
}

View File

@ -16,9 +16,9 @@ use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
@ -51,11 +51,8 @@ class SendFailedMessageToFailureTransportListener implements EventSubscriberInte
$envelope = $event->getEnvelope();
// avoid re-sending to the failed sender
foreach ($envelope->all(SentStamp::class) as $sentStamp) {
/** @var SentStamp $sentStamp */
if ($sentStamp->getSenderAlias() === $this->failureSenderAlias) {
return;
}
if (null !== $envelope->last(SentToFailureTransportStamp::class)) {
return;
}
// remove the received stamp so it's redelivered
@ -67,8 +64,9 @@ class SendFailedMessageToFailureTransportListener implements EventSubscriberInte
$flattenedException = \class_exists(FlattenException::class) ? FlattenException::createFromThrowable($throwable) : null;
$envelope = $envelope->withoutAll(ReceivedStamp::class)
->withoutAll(TransportMessageIdStamp::class)
->with(new SentToFailureTransportStamp($throwable->getMessage(), $event->getReceiverName(), $flattenedException))
->with(new RedeliveryStamp(0, $this->failureSenderAlias));
->with(new SentToFailureTransportStamp($event->getReceiverName()))
->with(new DelayStamp(0))
->with(new RedeliveryStamp(0, $this->failureSenderAlias, $throwable->getMessage(), $flattenedException));
if (null !== $this->logger) {
$this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [

View File

@ -0,0 +1,38 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class FailedMessageProcessingMiddleware implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
// look for "received" messages decorated with the SentToFailureTransportStamp
/** @var SentToFailureTransportStamp|null $sentToFailureStamp */
$sentToFailureStamp = $envelope->last(SentToFailureTransportStamp::class);
if (null !== $sentToFailureStamp && null !== $envelope->last(ReceivedStamp::class)) {
// mark the message as "received" from the original transport
// this guarantees the same behavior as when originally received
$envelope = $envelope->with(new ReceivedStamp($sentToFailureStamp->getOriginalReceiverName()));
}
return $stack->next()->handle($envelope, $stack);
}
}

View File

@ -11,6 +11,8 @@
namespace Symfony\Component\Messenger\Stamp;
use Symfony\Component\Debug\Exception\FlattenException;
/**
* Stamp applied when a messages needs to be redelivered.
*
@ -20,14 +22,20 @@ class RedeliveryStamp implements StampInterface
{
private $retryCount;
private $senderClassOrAlias;
private $redeliveredAt;
private $exceptionMessage;
private $flattenException;
/**
* @param string $senderClassOrAlias Alias from SendersLocator or just the class name
*/
public function __construct(int $retryCount, string $senderClassOrAlias)
public function __construct(int $retryCount, string $senderClassOrAlias, string $exceptionMessage = null, FlattenException $flattenException = null)
{
$this->retryCount = $retryCount;
$this->senderClassOrAlias = $senderClassOrAlias;
$this->exceptionMessage = $exceptionMessage;
$this->flattenException = $flattenException;
$this->redeliveredAt = new \DateTimeImmutable();
}
public function getRetryCount(): int
@ -36,7 +44,7 @@ class RedeliveryStamp implements StampInterface
}
/**
* Needed for this class to serialize through Symfony's serializer.
* The target sender this should be redelivered to.
*
* @internal
*/
@ -44,4 +52,19 @@ class RedeliveryStamp implements StampInterface
{
return $this->senderClassOrAlias;
}
public function getExceptionMessage(): ?string
{
return $this->exceptionMessage;
}
public function getFlattenException(): ?FlattenException
{
return $this->flattenException;
}
public function getRedeliveredAt(): \DateTimeInterface
{
return $this->redeliveredAt;
}
}

View File

@ -11,8 +11,6 @@
namespace Symfony\Component\Messenger\Stamp;
use Symfony\Component\Debug\Exception\FlattenException;
/**
* Stamp applied when a message is sent to the failure transport.
*
@ -22,36 +20,15 @@ use Symfony\Component\Debug\Exception\FlattenException;
*/
class SentToFailureTransportStamp implements StampInterface
{
private $exceptionMessage;
private $originalReceiverName;
private $flattenException;
private $sentAt;
public function __construct(string $exceptionMessage, string $originalReceiverName, FlattenException $flattenException = null)
public function __construct(string $originalReceiverName)
{
$this->exceptionMessage = $exceptionMessage;
$this->originalReceiverName = $originalReceiverName;
$this->flattenException = $flattenException;
$this->sentAt = new \DateTimeImmutable();
}
public function getExceptionMessage(): string
{
return $this->exceptionMessage;
}
public function getOriginalReceiverName(): string
{
return $this->originalReceiverName;
}
public function getFlattenException(): ?FlattenException
{
return $this->flattenException;
}
public function getSentAt(): \DateTimeInterface
{
return $this->sentAt;
}
}

View File

@ -15,6 +15,7 @@ use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\Messenger\Command\FailedMessagesShowCommand;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
@ -26,10 +27,12 @@ class FailedMessagesShowCommandTest extends TestCase
{
public function testBasicRun()
{
$sentToFailureStamp = new SentToFailureTransportStamp('Things are bad!', 'async');
$sentToFailureStamp = new SentToFailureTransportStamp('async');
$redeliveryStamp = new RedeliveryStamp(0, 'failure_receiver', 'Things are bad!');
$envelope = new Envelope(new \stdClass(), [
new TransportMessageIdStamp(15),
$sentToFailureStamp,
$redeliveryStamp,
]);
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope);
@ -48,10 +51,11 @@ class FailedMessagesShowCommandTest extends TestCase
Message Id 15
Failed at %s
Error Things are bad!
Error Class (unknown)
Error Class (unknown)
Transport async
EOF
,
$sentToFailureStamp->getSentAt()->format('Y-m-d H:i:s')),
$redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')),
$tester->getDisplay(true));
}
}

View File

@ -19,7 +19,6 @@ use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
@ -35,13 +34,13 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
$this->assertNotNull($sentToFailureTransportStamp);
$this->assertSame('no!', $sentToFailureTransportStamp->getExceptionMessage());
$this->assertSame('my_receiver', $sentToFailureTransportStamp->getOriginalReceiverName());
$this->assertSame('no!', $sentToFailureTransportStamp->getFlattenException()->getMessage());
/** @var RedeliveryStamp $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertSame('failure_sender', $redeliveryStamp->getSenderClassOrAlias());
$this->assertSame('no!', $redeliveryStamp->getExceptionMessage());
$this->assertSame('no!', $redeliveryStamp->getFlattenException()->getMessage());
$this->assertNull($envelope->last(ReceivedStamp::class));
$this->assertNull($envelope->last(TransportMessageIdStamp::class));
@ -65,11 +64,11 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) {
/** @var Envelope $envelope */
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
$this->assertNotNull($sentToFailureTransportStamp);
$this->assertSame('I am inside!', $sentToFailureTransportStamp->getExceptionMessage());
$this->assertSame('Exception', $sentToFailureTransportStamp->getFlattenException()->getClass());
/** @var RedeliveryStamp $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertNotNull($redeliveryStamp);
$this->assertSame('I am inside!', $redeliveryStamp->getExceptionMessage());
$this->assertSame('Exception', $redeliveryStamp->getFlattenException()->getClass());
return true;
}))->willReturn(new Envelope(new \stdClass()));
@ -112,7 +111,7 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase
);
$envelope = new Envelope(new \stdClass(), [
new SentStamp('MySender', 'failure_sender'),
new SentToFailureTransportStamp('my_receiver'),
]);
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), false);

View File

@ -0,0 +1,281 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Symfony\Component\DependencyInjection\Container;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
use Symfony\Component\Messenger\Worker;
class FailureIntegrationTest extends TestCase
{
public function testRequeMechanism()
{
$transport1 = new DummyFailureTestSenderAndReceiver();
$transport2 = new DummyFailureTestSenderAndReceiver();
$failureTransport = new DummyFailureTestSenderAndReceiver();
$transports = [
'transport1' => $transport1,
'transport2' => $transport2,
'the_failure_transport' => $failureTransport,
];
$locator = $this->createMock(ContainerInterface::class);
$locator->expects($this->any())
->method('has')
->willReturn(true);
$locator->expects($this->any())
->method('get')
->willReturnCallback(function ($transportName) use ($transports) {
return $transports[$transportName];
});
$senderLocator = new SendersLocator(
[DummyMessage::class => ['transport1', 'transport2']],
$locator
);
// using to so we can lazily get the bus later and avoid circular problem
$transport1HandlerThatFails = new DummyTestHandler(true);
$allTransportHandlerThatWorks = new DummyTestHandler(false);
$transport2HandlerThatWorks = new DummyTestHandler(false);
$container = new Container();
$handlerLocator = new HandlersLocator([
DummyMessage::class => [
new HandlerDescriptor($transport1HandlerThatFails, [
'from_transport' => 'transport1',
'alias' => 'handler_that_fails',
]),
new HandlerDescriptor($allTransportHandlerThatWorks, [
'alias' => 'handler_that_works1',
]),
new HandlerDescriptor($transport2HandlerThatWorks, [
'from_transport' => 'transport2',
'alias' => 'handler_that_works2',
]),
],
]);
$dispatcher = new EventDispatcher();
$bus = new MessageBus([
new FailedMessageProcessingMiddleware(),
new SendMessageMiddleware($senderLocator),
new HandleMessageMiddleware($handlerLocator),
]);
$container->set('bus', $bus);
$dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($bus, 'the_failure_transport'));
$runWorker = function (string $transportName, int $maxRetries) use ($transports, $bus, $dispatcher): ?\Throwable {
$throwable = null;
$failedListener = function (WorkerMessageFailedEvent $event) use (&$throwable) {
$throwable = $event->getThrowable();
};
$dispatcher->addListener(WorkerMessageFailedEvent::class, $failedListener);
$worker = new Worker([$transportName => $transports[$transportName]], $bus, [$transportName => new MultiplierRetryStrategy($maxRetries)], $dispatcher);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// handle one envelope, then stop
if (null !== $envelope) {
$worker->stop();
}
});
$dispatcher->removeListener(WorkerMessageFailedEvent::class, $failedListener);
return $throwable;
};
// send the message
$envelope = new Envelope(new DummyMessage('API'));
$bus->dispatch($envelope);
// message has been sent
$this->assertCount(1, $transport1->getMessagesWaitingToBeReceived());
$this->assertCount(1, $transport2->getMessagesWaitingToBeReceived());
$this->assertCount(0, $failureTransport->getMessagesWaitingToBeReceived());
// receive the message - one handler will fail and the message
// will be sent back to transport1 to be retried
/*
* Receive the message from "transport1"
*/
$throwable = $runWorker('transport1', 1);
// make sure this is failing for the reason we think
$this->assertInstanceOf(HandlerFailedException::class, $throwable);
// handler for transport1 and all transports were called
$this->assertSame(1, $transport1HandlerThatFails->getTimesCalled());
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
$this->assertSame(0, $transport2HandlerThatWorks->getTimesCalled());
// one handler failed and the message is retried (resent to transport1)
$this->assertCount(1, $transport1->getMessagesWaitingToBeReceived());
$this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived());
/*
* Receive the message for a (final) retry
*/
$runWorker('transport1', 1);
// only the "failed" handler is called a 2nd time
$this->assertSame(2, $transport1HandlerThatFails->getTimesCalled());
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
// handling fails again, message is sent to failure transport
$this->assertCount(0, $transport1->getMessagesWaitingToBeReceived());
$this->assertCount(1, $failureTransport->getMessagesWaitingToBeReceived());
/** @var Envelope $failedEnvelope */
$failedEnvelope = $failureTransport->getMessagesWaitingToBeReceived()[0];
/** @var SentToFailureTransportStamp $sentToFailureStamp */
$sentToFailureStamp = $failedEnvelope->last(SentToFailureTransportStamp::class);
$this->assertNotNull($sentToFailureStamp);
/** @var RedeliveryStamp $redeliveryStamp */
$redeliveryStamp = $failedEnvelope->last(RedeliveryStamp::class);
$this->assertNotNull($redeliveryStamp);
$this->assertSame('Failure from call 2', $redeliveryStamp->getExceptionMessage());
/*
* Failed message is handled, fails, and sent for a retry
*/
$throwable = $runWorker('the_failure_transport', 1);
// make sure this is failing for the reason we think
$this->assertInstanceOf(HandlerFailedException::class, $throwable);
// only the "failed" handler is called a 3rd time
$this->assertSame(3, $transport1HandlerThatFails->getTimesCalled());
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
// handling fails again, message is retried
$this->assertCount(1, $failureTransport->getMessagesWaitingToBeReceived());
// transport2 still only holds the original message
// a new message was never mistakenly delivered to it
$this->assertCount(1, $transport2->getMessagesWaitingToBeReceived());
/*
* Message is retried on failure transport then discarded
*/
$runWorker('the_failure_transport', 1);
// only the "failed" handler is called a 4th time
$this->assertSame(4, $transport1HandlerThatFails->getTimesCalled());
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
// handling fails again, message is discarded
$this->assertCount(0, $failureTransport->getMessagesWaitingToBeReceived());
/*
* Execute handlers on transport2
*/
$runWorker('transport2', 1);
// transport1 handler is not called again
$this->assertSame(4, $transport1HandlerThatFails->getTimesCalled());
// all transport handler is now called again
$this->assertSame(2, $allTransportHandlerThatWorks->getTimesCalled());
// transport1 handler called for the first time
$this->assertSame(1, $transport2HandlerThatWorks->getTimesCalled());
// all transport should be empty
$this->assertEmpty($transport1->getMessagesWaitingToBeReceived());
$this->assertEmpty($transport2->getMessagesWaitingToBeReceived());
$this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived());
/*
* Dispatch the original message again
*/
$bus->dispatch($envelope);
// handle the message, but with no retries
$runWorker('transport1', 0);
// now make the handler work!
$transport1HandlerThatFails->setShouldThrow(false);
$runWorker('the_failure_transport', 1);
// the failure transport is empty because it worked
$this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived());
}
}
class DummyFailureTestSenderAndReceiver implements ReceiverInterface, SenderInterface
{
private $messagesWaiting = [];
public function get(): iterable
{
$message = array_shift($this->messagesWaiting);
if (null === $message) {
return [];
}
return [$message];
}
public function ack(Envelope $envelope): void
{
}
public function reject(Envelope $envelope): void
{
}
public function send(Envelope $envelope): Envelope
{
$this->messagesWaiting[] = $envelope;
return $envelope;
}
/**
* @return Envelope[]
*/
public function getMessagesWaitingToBeReceived(): array
{
return $this->messagesWaiting;
}
}
class DummyTestHandler
{
private $timesCalled = 0;
private $shouldThrow;
public function __construct(bool $shouldThrow)
{
$this->shouldThrow = $shouldThrow;
}
public function __invoke()
{
++$this->timesCalled;
if ($this->shouldThrow) {
throw new \Exception('Failure from call '.$this->timesCalled);
}
}
public function getTimesCalled(): int
{
return $this->timesCalled;
}
public function setShouldThrow(bool $shouldThrow)
{
$this->shouldThrow = $shouldThrow;
}
}

View File

@ -38,8 +38,8 @@ class RetryIntegrationTest extends TestCase
$senderLocator->method('get')->with('sender_alias')->willReturn($senderAndReceiver);
$senderLocator = new SendersLocator([DummyMessage::class => ['sender_alias']], $senderLocator);
$handler = new DummyMessageHandlerFailingFirstTimes(0, 'A');
$throwingHandler = new DummyMessageHandlerFailingFirstTimes(1, 'B');
$handler = new DummyMessageHandlerFailingFirstTimes(0);
$throwingHandler = new DummyMessageHandlerFailingFirstTimes(1);
$handlerLocator = new HandlersLocator([
DummyMessage::class => [
new HandlerDescriptor($handler, ['alias' => 'first']),

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Stamp;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Debug\Exception\FlattenException;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
class RedeliveryStampTest extends TestCase
@ -21,5 +22,16 @@ class RedeliveryStampTest extends TestCase
$stamp = new RedeliveryStamp(10, 'sender_alias');
$this->assertSame(10, $stamp->getRetryCount());
$this->assertSame('sender_alias', $stamp->getSenderClassOrAlias());
$this->assertInstanceOf(\DateTimeInterface::class, $stamp->getRedeliveredAt());
$this->assertNull($stamp->getExceptionMessage());
$this->assertNull($stamp->getFlattenException());
}
public function testGettersPopulated()
{
$flattenException = new FlattenException();
$stamp = new RedeliveryStamp(10, 'sender_alias', 'exception message', $flattenException);
$this->assertSame('exception message', $stamp->getExceptionMessage());
$this->assertSame($flattenException, $stamp->getFlattenException());
}
}

View File

@ -12,22 +12,13 @@
namespace Symfony\Component\Messenger\Tests\Stamp;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Debug\Exception\FlattenException;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
class SentToFailureTransportStampTest extends TestCase
{
public function testGetters()
public function testGetOriginalReceiverName()
{
$flattenException = new FlattenException();
$stamp = new SentToFailureTransportStamp(
'exception message',
'original_receiver',
$flattenException
);
$this->assertSame('exception message', $stamp->getExceptionMessage());
$stamp = new SentToFailureTransportStamp('original_receiver');
$this->assertSame('original_receiver', $stamp->getOriginalReceiverName());
$this->assertSame($flattenException, $stamp->getFlattenException());
$this->assertInstanceOf(\DateTimeInterface::class, $stamp->getSentAt());
}
}