Messenger multiple failed transports

This commit is contained in:
Hugo Monteiro 2019-12-14 08:49:29 +00:00 committed by Robin Chalas
parent 2dcf313a87
commit 5810b6c378
24 changed files with 1498 additions and 71 deletions

View File

@ -1313,6 +1313,10 @@ class Configuration implements ConfigurationInterface
->prototype('variable')
->end()
->end()
->scalarNode('failure_transport')
->defaultNull()
->info('Transport name to send failed messages to (after all retries have failed).')
->end()
->arrayNode('retry_strategy')
->addDefaultsIfNotSet()
->beforeNormalization()

View File

@ -1915,15 +1915,38 @@ class FrameworkExtension extends Extension
$container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']);
}
$failureTransports = [];
if ($config['failure_transport']) {
if (!isset($config['transports'][$config['failure_transport']])) {
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
}
$container->setAlias('messenger.failure_transports.default', 'messenger.transport.'.$config['failure_transport']);
$failureTransports[] = $config['failure_transport'];
}
$failureTransportsByName = [];
foreach ($config['transports'] as $name => $transport) {
if ($transport['failure_transport']) {
$failureTransports[] = $transport['failure_transport'];
$failureTransportsByName[$name] = $transport['failure_transport'];
} elseif ($config['failure_transport']) {
$failureTransportsByName[$name] = $config['failure_transport'];
}
}
$senderAliases = [];
$transportRetryReferences = [];
foreach ($config['transports'] as $name => $transport) {
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
$transportDefinition = (new Definition(TransportInterface::class))
->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
->addTag('messenger.receiver', ['alias' => $name])
->addTag('messenger.receiver', [
'alias' => $name,
'is_failure_transport' => \in_array($name, $failureTransports),
]
)
;
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
$senderAliases[$name] = $transportId;
@ -1954,6 +1977,18 @@ class FrameworkExtension extends Extension
$senderReferences[$serviceId] = new Reference($serviceId);
}
foreach ($config['transports'] as $name => $transport) {
if ($transport['failure_transport']) {
if (!isset($senderReferences[$transport['failure_transport']])) {
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $transport['failure_transport']));
}
}
}
$failureTransportReferencesByTransportName = array_map(function ($failureTransportName) use ($senderReferences) {
return $senderReferences[$failureTransportName];
}, $failureTransportsByName);
$messageToSendersMapping = [];
foreach ($config['routing'] as $message => $messageConfiguration) {
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
@ -1984,19 +2019,17 @@ class FrameworkExtension extends Extension
$container->getDefinition('messenger.retry_strategy_locator')
->replaceArgument(0, $transportRetryReferences);
if ($config['failure_transport']) {
if (!isset($senderReferences[$config['failure_transport']])) {
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
}
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
->replaceArgument(0, $senderReferences[$config['failure_transport']]);
if (\count($failureTransports) > 0) {
$container->getDefinition('console.command.messenger_failed_messages_retry')
->replaceArgument(0, $config['failure_transport']);
$container->getDefinition('console.command.messenger_failed_messages_show')
->replaceArgument(0, $config['failure_transport']);
$container->getDefinition('console.command.messenger_failed_messages_remove')
->replaceArgument(0, $config['failure_transport']);
$failureTransportsByTransportNameServiceLocator = ServiceLocatorTagPass::register($container, $failureTransportReferencesByTransportName);
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
->replaceArgument(0, $failureTransportsByTransportNameServiceLocator);
} else {
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
$container->removeDefinition('console.command.messenger_failed_messages_retry');

View File

@ -165,8 +165,8 @@ return static function (ContainerConfigurator $container) {
->set('console.command.messenger_failed_messages_retry', FailedMessagesRetryCommand::class)
->args([
abstract_arg('Receiver name'),
abstract_arg('Receiver'),
abstract_arg('Default failure receiver name'),
abstract_arg('Receivers'),
service('messenger.routable_message_bus'),
service('event_dispatcher'),
service('logger'),
@ -175,15 +175,15 @@ return static function (ContainerConfigurator $container) {
->set('console.command.messenger_failed_messages_show', FailedMessagesShowCommand::class)
->args([
abstract_arg('Receiver name'),
abstract_arg('Receiver'),
abstract_arg('Default failure receiver name'),
abstract_arg('Receivers'),
])
->tag('console.command')
->set('console.command.messenger_failed_messages_remove', FailedMessagesRemoveCommand::class)
->args([
abstract_arg('Receiver name'),
abstract_arg('Receiver'),
abstract_arg('Default failure receiver name'),
abstract_arg('Receivers'),
])
->tag('console.command')

View File

@ -172,7 +172,7 @@ return static function (ContainerConfigurator $container) {
->set('messenger.failure.send_failed_message_to_failure_transport_listener', SendFailedMessageToFailureTransportListener::class)
->args([
abstract_arg('failure transport'),
abstract_arg('failure transports'),
service('logger')->ignoreOnInvalid(),
])
->tag('kernel.event_subscriber')

View File

@ -491,6 +491,7 @@
<xsd:attribute name="name" type="xsd:string" />
<xsd:attribute name="serializer" type="xsd:string" />
<xsd:attribute name="dsn" type="xsd:string" />
<xsd:attribute name="failure-transport" type="xsd:string" />
</xsd:complexType>
<xsd:complexType name="messenger_retry_strategy">

View File

@ -0,0 +1,19 @@
<?php
$container->loadFromExtension('framework', [
'messenger' => [
'transports' => [
'transport_1' => [
'dsn' => 'null://',
'failure_transport' => 'failure_transport_1'
],
'transport_2' => 'null://',
'transport_3' => [
'dsn' => 'null://',
'failure_transport' => 'failure_transport_3'
],
'failure_transport_1' => 'null://',
'failure_transport_3' => 'null://'
],
],
]);

View File

@ -0,0 +1,21 @@
<?php
$container->loadFromExtension('framework', [
'messenger' => [
'failure_transport' => 'failure_transport_global',
'transports' => [
'transport_1' => [
'dsn' => 'null://',
'failure_transport' => 'failure_transport_1'
],
'transport_2' => 'null://',
'transport_3' => [
'dsn' => 'null://',
'failure_transport' => 'failure_transport_3'
],
'failure_transport_global' => 'null://',
'failure_transport_1' => 'null://',
'failure_transport_3' => 'null://',
],
],
]);

View File

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" />
<framework:transport name="transport_2" dsn="null://" />
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" />
<framework:transport name="failure_transport_1" dsn="null://" />
<framework:transport name="failure_transport_3" dsn="null://" />
</framework:messenger>
</framework:config>
</container>

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger failure-transport="failure_transport_global">
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" />
<framework:transport name="transport_2" dsn="null://" />
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" />
<framework:transport name="failure_transport_global" dsn="null://" />
<framework:transport name="failure_transport_1" dsn="null://" />
<framework:transport name="failure_transport_3" dsn="null://" />
</framework:messenger>
</framework:config>
</container>

View File

@ -0,0 +1,12 @@
framework:
messenger:
transports:
transport_1:
dsn: 'null://'
failure_transport: failure_transport_1
transport_2: 'null://'
transport_3:
dsn: 'null://'
failure_transport: failure_transport_3
failure_transport_1: 'null://'
failure_transport_3: 'null://'

View File

@ -0,0 +1,14 @@
framework:
messenger:
failure_transport: failure_transport_global
transports:
transport_1:
dsn: 'null://'
failure_transport: failure_transport_1
transport_2: 'null://'
transport_3:
dsn: 'null://'
failure_transport: failure_transport_3
failure_transport_global: 'null://'
failure_transport_1: 'null://'
failure_transport_3: 'null://'

View File

@ -32,6 +32,7 @@ use Symfony\Component\Cache\Adapter\RedisAdapter;
use Symfony\Component\Cache\Adapter\RedisTagAwareAdapter;
use Symfony\Component\Cache\DependencyInjection\CachePoolPass;
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
use Symfony\Component\DependencyInjection\ChildDefinition;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\Compiler\ResolveInstanceofConditionalsPass;
@ -710,12 +711,92 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
}
public function testMessengerMultipleFailureTransports()
{
$container = $this->createContainerFromFile('messenger_multiple_failure_transports');
$failureTransport1Definition = $container->getDefinition('messenger.transport.failure_transport_1');
$failureTransport1Tags = $failureTransport1Definition->getTag('messenger.receiver')[0];
$this->assertEquals([
'alias' => 'failure_transport_1',
'is_failure_transport' => true,
], $failureTransport1Tags);
$failureTransport3Definition = $container->getDefinition('messenger.transport.failure_transport_3');
$failureTransport3Tags = $failureTransport3Definition->getTag('messenger.receiver')[0];
$this->assertEquals([
'alias' => 'failure_transport_3',
'is_failure_transport' => true,
], $failureTransport3Tags);
// transport 2 exists but does not appear in the mapping
$this->assertFalse($container->hasDefinition('messenger.transport.failure_transport_2'));
$failureTransportsByTransportNameServiceLocator = $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0);
$failureTransports = $container->getDefinition((string) $failureTransportsByTransportNameServiceLocator)->getArgument(0);
$expectedTransportsByFailureTransports = [
'transport_1' => new Reference('messenger.transport.failure_transport_1'),
'transport_3' => new Reference('messenger.transport.failure_transport_3'),
];
$failureTransportsReferences = array_map(function (ServiceClosureArgument $serviceClosureArgument) {
$values = $serviceClosureArgument->getValues();
return array_shift($values);
}, $failureTransports);
$this->assertEquals($expectedTransportsByFailureTransports, $failureTransportsReferences);
}
public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport()
{
$container = $this->createContainerFromFile('messenger_multiple_failure_transports_global');
$this->assertEquals('messenger.transport.failure_transport_global', (string) $container->getAlias('messenger.failure_transports.default'));
$failureTransport1Definition = $container->getDefinition('messenger.transport.failure_transport_1');
$failureTransport1Tags = $failureTransport1Definition->getTag('messenger.receiver')[0];
$this->assertEquals([
'alias' => 'failure_transport_1',
'is_failure_transport' => true,
], $failureTransport1Tags);
$failureTransport3Definition = $container->getDefinition('messenger.transport.failure_transport_3');
$failureTransport3Tags = $failureTransport3Definition->getTag('messenger.receiver')[0];
$this->assertEquals([
'alias' => 'failure_transport_3',
'is_failure_transport' => true,
], $failureTransport3Tags);
$failureTransportsByTransportNameServiceLocator = $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0);
$failureTransports = $container->getDefinition((string) $failureTransportsByTransportNameServiceLocator)->getArgument(0);
$expectedTransportsByFailureTransports = [
'failure_transport_1' => new Reference('messenger.transport.failure_transport_global'),
'failure_transport_3' => new Reference('messenger.transport.failure_transport_global'),
'failure_transport_global' => new Reference('messenger.transport.failure_transport_global'),
'transport_1' => new Reference('messenger.transport.failure_transport_1'),
'transport_2' => new Reference('messenger.transport.failure_transport_global'),
'transport_3' => new Reference('messenger.transport.failure_transport_3'),
];
$failureTransportsReferences = array_map(function (ServiceClosureArgument $serviceClosureArgument) {
$values = $serviceClosureArgument->getValues();
return array_shift($values);
}, $failureTransports);
$this->assertEquals($expectedTransportsByFailureTransports, $failureTransportsReferences);
}
public function testMessengerTransports()
{
$container = $this->createContainerFromFile('messenger_transports');
$this->assertTrue($container->hasDefinition('messenger.transport.default'));
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.receiver'));
$this->assertEquals([['alias' => 'default']], $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
$this->assertEquals([
['alias' => 'default', 'is_failure_transport' => false], ], $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
$transportArguments = $container->getDefinition('messenger.transport.default')->getArguments();
$this->assertEquals(new Reference('messenger.default_serializer'), $transportArguments[2]);
@ -756,7 +837,22 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertSame(3, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(2));
$this->assertSame(100, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(3));
$this->assertEquals(new Reference('messenger.transport.failed'), $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0));
$failureTransportsByTransportNameServiceLocator = $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0);
$failureTransports = $container->getDefinition((string) $failureTransportsByTransportNameServiceLocator)->getArgument(0);
$expectedTransportsByFailureTransports = [
'beanstalkd' => new Reference('messenger.transport.failed'),
'customised' => new Reference('messenger.transport.failed'),
'default' => new Reference('messenger.transport.failed'),
'failed' => new Reference('messenger.transport.failed'),
'redis' => new Reference('messenger.transport.failed'),
];
$failureTransportsReferences = array_map(function (ServiceClosureArgument $serviceClosureArgument) {
$values = $serviceClosureArgument->getValues();
return array_shift($values);
}, $failureTransports);
$this->assertEquals($expectedTransportsByFailureTransports, $failureTransportsReferences);
}
public function testMessengerRouting()

View File

@ -13,9 +13,12 @@ namespace Symfony\Component\Messenger\Command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\Dumper;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\ErrorHandler\Exception\FlattenException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Stamp\ErrorDetailsStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
@ -35,20 +38,36 @@ use Symfony\Component\VarDumper\Cloner\VarCloner;
*/
abstract class AbstractFailedMessagesCommand extends Command
{
private $receiverName;
private $receiver;
private $globalFailureReceiverName;
protected $failureTransports;
public function __construct(string $receiverName, ReceiverInterface $receiver)
public function __construct(?string $globalFailureReceiverName, $failureTransports)
{
$this->receiverName = $receiverName;
$this->receiver = $receiver;
$this->failureTransports = $failureTransports;
if (!$failureTransports instanceof ServiceLocator) {
trigger_deprecation('symfony/messenger', '5.3', 'Passing a non-scalar value as 2nd argument to "%s()" is deprecated, pass a ServiceLocator instead.', __METHOD__);
if (null === $globalFailureReceiverName) {
throw new InvalidArgumentException(sprintf('The argument "globalFailureReceiver" from method "%s()" must be not null if 2nd argument is not a ServiceLocator.', __METHOD__));
}
$this->failureTransports = new ServiceLocator([$globalFailureReceiverName => function () use ($failureTransports) { return $failureTransports; },]);
}
$this->globalFailureReceiverName = $globalFailureReceiverName;
parent::__construct();
}
protected function getReceiverName(): string
{
return $this->receiverName;
trigger_deprecation('symfony/messenger', '5.3', 'The method "%s()" is deprecated, use getGlobalFailureReceiverName() instead.', __METHOD__);
return $this->globalFailureReceiverName;
}
protected function getGlobalFailureReceiverName(): ?string
{
return $this->globalFailureReceiverName;
}
/**
@ -153,9 +172,18 @@ abstract class AbstractFailedMessagesCommand extends Command
}
}
protected function getReceiver(): ReceiverInterface
protected function getReceiver(/* string $name */): ReceiverInterface
{
return $this->receiver;
if (1 > \func_num_args() && __CLASS__ !== static::class && __CLASS__ !== (new \ReflectionMethod($this, __FUNCTION__))->getDeclaringClass()->getName() && !$this instanceof \PHPUnit\Framework\MockObject\MockObject && !$this instanceof \Prophecy\Prophecy\ProphecySubjectInterface) {
@trigger_error(sprintf('The "%s()" method will have a new "string $name" argument in version 5.3, not defining it is deprecated since Symfony 5.2.', __METHOD__), \E_USER_DEPRECATED);
}
$name = \func_num_args() > 0 ? func_get_arg(0) : $this->globalFailureReceiverName;
if (!$this->failureTransports->has($name)) {
throw new InvalidArgumentException(sprintf('The failure transport with name "%s" was not found. Available transports are: "%s".', $name, implode(', ', array_keys($this->failureTransports->getProvidedServices()))));
}
return $this->failureTransports->get($name);
}
protected function getLastRedeliveryStampWithException(Envelope $envelope): ?RedeliveryStamp
@ -200,4 +228,27 @@ abstract class AbstractFailedMessagesCommand extends Command
return $cloner;
}
protected function printWarningAvailableFailureTransports(SymfonyStyle $io, ?string $failureTransportName): void
{
$failureTransports = array_keys($this->failureTransports->getProvidedServices());
$failureTransportsCount = \count($failureTransports);
if ($failureTransportsCount > 1) {
$io->writeln([
sprintf('> Loading messages from the <comment>global</comment> failure transport <comment>%s</comment>.', $failureTransportName),
'> To use a different failure transport, pass <comment>--transport=</comment>.',
sprintf('> Available failure transports are: <comment>%s</comment>', implode(', ', $failureTransports)),
"\n",
]);
}
}
protected function interactiveChooseFailureTransport(SymfonyStyle $io)
{
$failedTransports = array_keys($this->failureTransports->getProvidedServices());
$question = new ChoiceQuestion('Select failed transport:', $failedTransports, 0);
$question->setMultiselect(false);
return $io->askQuestion($question);
}
}

View File

@ -38,6 +38,7 @@ class FailedMessagesRemoveCommand extends AbstractFailedMessagesCommand
->setDefinition([
new InputArgument('id', InputArgument::REQUIRED | InputArgument::IS_ARRAY, 'Specific message id(s) to remove'),
new InputOption('force', null, InputOption::VALUE_NONE, 'Force the operation without confirmation'),
new InputOption('transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport'),
new InputOption('show-messages', null, InputOption::VALUE_NONE, 'Display messages before removing it (if multiple ids are given)'),
])
->setDescription(self::$defaultDescription)
@ -59,20 +60,25 @@ EOF
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$receiver = $this->getReceiver();
$failureTransportName = $input->getOption('transport');
if (null === $failureTransportName) {
$failureTransportName = $this->getGlobalFailureReceiverName();
}
$receiver = $this->getReceiver($failureTransportName);
$shouldForce = $input->getOption('force');
$ids = (array) $input->getArgument('id');
$shouldDisplayMessages = $input->getOption('show-messages') || 1 === \count($ids);
$this->removeMessages($ids, $receiver, $io, $shouldForce, $shouldDisplayMessages);
$this->removeMessages($failureTransportName, $ids, $receiver, $io, $shouldForce, $shouldDisplayMessages);
return 0;
}
private function removeMessages(array $ids, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce, bool $shouldDisplayMessages): void
private function removeMessages(string $failureTransportName, array $ids, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce, bool $shouldDisplayMessages): void
{
if (!$receiver instanceof ListableReceiverInterface) {
throw new RuntimeException(sprintf('The "%s" receiver does not support removing specific messages.', $this->getReceiverName()));
throw new RuntimeException(sprintf('The "%s" receiver does not support removing specific messages.', $failureTransportName));
}
foreach ($ids as $id) {

View File

@ -34,6 +34,8 @@ use Symfony\Component\Messenger\Worker;
*/
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
{
private const DEFAULT_TRANSPORT_OPTION = 'choose';
protected static $defaultName = 'messenger:failed:retry';
protected static $defaultDescription = 'Retry one or more messages from the failure transport';
@ -41,13 +43,13 @@ class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
private $messageBus;
private $logger;
public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null)
public function __construct(?string $globalReceiverName, $failureTransports, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null)
{
$this->eventDispatcher = $eventDispatcher;
$this->messageBus = $messageBus;
$this->logger = $logger;
parent::__construct($receiverName, $receiver);
parent::__construct($globalReceiverName, $failureTransports);
}
/**
@ -59,6 +61,7 @@ class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
->setDefinition([
new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'),
new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'),
new InputOption('transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport', self::DEFAULT_TRANSPORT_OPTION),
])
->setDescription(self::$defaultDescription)
->setHelp(<<<'EOF'
@ -96,10 +99,19 @@ EOF
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}
$receiver = $this->getReceiver();
$failureTransportName = $input->getOption('transport');
if (self::DEFAULT_TRANSPORT_OPTION === $failureTransportName) {
$this->printWarningAvailableFailureTransports($io, $this->getGlobalFailureReceiverName());
}
if ('' === $failureTransportName || null === $failureTransportName) {
$failureTransportName = $this->interactiveChooseFailureTransport($io);
}
$failureTransportName = self::DEFAULT_TRANSPORT_OPTION === $failureTransportName ? $this->getGlobalFailureReceiverName() : $failureTransportName;
$receiver = $this->getReceiver($failureTransportName);
$this->printPendingMessagesMessage($receiver, $io);
$io->writeln(sprintf('To retry all the messages, run <comment>messenger:consume %s</comment>', $this->getReceiverName()));
$io->writeln(sprintf('To retry all the messages, run <comment>messenger:consume %s</comment>', $failureTransportName));
$shouldForce = $input->getOption('force');
$ids = $input->getArgument('id');
@ -108,20 +120,20 @@ EOF
throw new RuntimeException('Message id must be passed when in non-interactive mode.');
}
$this->runInteractive($io, $shouldForce);
$this->runInteractive($failureTransportName, $io, $shouldForce);
return 0;
}
$this->retrySpecificIds($ids, $io, $shouldForce);
$this->retrySpecificIds($failureTransportName, $ids, $io, $shouldForce);
$io->success('All done!');
return 0;
}
private function runInteractive(SymfonyStyle $io, bool $shouldForce)
private function runInteractive(string $failureTransportName, SymfonyStyle $io, bool $shouldForce)
{
$receiver = $this->getReceiver();
$receiver = $this->failureTransports->get($failureTransportName);
$count = 0;
if ($receiver instanceof ListableReceiverInterface) {
// for listable receivers, find the messages one-by-one
@ -136,7 +148,7 @@ EOF
$id = $this->getMessageId($envelope);
if (null === $id) {
throw new LogicException(sprintf('The "%s" receiver is able to list messages by id but the envelope is missing the TransportMessageIdStamp stamp.', $this->getReceiverName()));
throw new LogicException(sprintf('The "%s" receiver is able to list messages by id but the envelope is missing the TransportMessageIdStamp stamp.', $failureTransportName));
}
$ids[] = $id;
}
@ -146,11 +158,11 @@ EOF
break;
}
$this->retrySpecificIds($ids, $io, $shouldForce);
$this->retrySpecificIds($failureTransportName, $ids, $io, $shouldForce);
}
} else {
// get() and ask messages one-by-one
$count = $this->runWorker($this->getReceiver(), $io, $shouldForce);
$count = $this->runWorker($failureTransportName, $receiver, $io, $shouldForce);
}
// avoid success message if nothing was processed
@ -159,7 +171,7 @@ EOF
}
}
private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int
private function runWorker(string $failureTransportName, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int
{
$count = 0;
$listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce, &$count) {
@ -180,7 +192,7 @@ EOF
$this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
$worker = new Worker(
[$this->getReceiverName() => $receiver],
[$failureTransportName => $receiver],
$this->messageBus,
$this->eventDispatcher,
$this->logger
@ -195,12 +207,12 @@ EOF
return $count;
}
private function retrySpecificIds(array $ids, SymfonyStyle $io, bool $shouldForce)
private function retrySpecificIds(string $failureTransportName, array $ids, SymfonyStyle $io, bool $shouldForce)
{
$receiver = $this->getReceiver();
$receiver = $this->getReceiver($failureTransportName);
if (!$receiver instanceof ListableReceiverInterface) {
throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.', $this->getReceiverName()));
throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.', $failureTransportName));
}
foreach ($ids as $id) {
@ -210,7 +222,7 @@ EOF
}
$singleReceiver = new SingleMessageReceiver($receiver, $envelope);
$this->runWorker($singleReceiver, $io, $shouldForce);
$this->runWorker($failureTransportName, $singleReceiver, $io, $shouldForce);
}
}
}

View File

@ -27,6 +27,8 @@ use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
*/
class FailedMessagesShowCommand extends AbstractFailedMessagesCommand
{
private const DEFAULT_TRANSPORT_OPTION = 'choose';
protected static $defaultName = 'messenger:failed:show';
protected static $defaultDescription = 'Show one or more messages from the failure transport';
@ -39,6 +41,7 @@ class FailedMessagesShowCommand extends AbstractFailedMessagesCommand
->setDefinition([
new InputArgument('id', InputArgument::OPTIONAL, 'Specific message id to show'),
new InputOption('max', null, InputOption::VALUE_REQUIRED, 'Maximum number of messages to list', 50),
new InputOption('transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport', self::DEFAULT_TRANSPORT_OPTION),
])
->setDescription(self::$defaultDescription)
->setHelp(<<<'EOF'
@ -61,26 +64,36 @@ EOF
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$receiver = $this->getReceiver();
$failureTransportName = $input->getOption('transport');
if (self::DEFAULT_TRANSPORT_OPTION === $failureTransportName) {
$this->printWarningAvailableFailureTransports($io, $this->getGlobalFailureReceiverName());
}
if ('' === $failureTransportName || null === $failureTransportName) {
$failureTransportName = $this->interactiveChooseFailureTransport($io);
}
$failureTransportName = self::DEFAULT_TRANSPORT_OPTION === $failureTransportName ? $this->getGlobalFailureReceiverName() : $failureTransportName;
$receiver = $this->getReceiver($failureTransportName);
$this->printPendingMessagesMessage($receiver, $io);
if (!$receiver instanceof ListableReceiverInterface) {
throw new RuntimeException(sprintf('The "%s" receiver does not support listing or showing specific messages.', $this->getReceiverName()));
throw new RuntimeException(sprintf('The "%s" receiver does not support listing or showing specific messages.', $failureTransportName));
}
if (null === $id = $input->getArgument('id')) {
$this->listMessages($io, $input->getOption('max'));
$this->listMessages($failureTransportName, $io, $input->getOption('max'));
} else {
$this->showMessage($id, $io);
$this->showMessage($failureTransportName, $id, $io);
}
return 0;
}
private function listMessages(SymfonyStyle $io, int $max)
private function listMessages(?string $failedTransportName, SymfonyStyle $io, int $max)
{
/** @var ListableReceiverInterface $receiver */
$receiver = $this->getReceiver();
$receiver = $this->getReceiver($failedTransportName);
$envelopes = $receiver->all($max);
$rows = [];
@ -119,13 +132,13 @@ EOF
$io->comment(sprintf('Showing first %d messages.', $max));
}
$io->comment('Run <comment>messenger:failed:show {id} -vv</comment> to see message details.');
$io->comment(sprintf('Run <comment>messenger:failed:show {id} --transport=%s -vv</comment> to see message details.', $failedTransportName));
}
private function showMessage(string $id, SymfonyStyle $io)
private function showMessage(?string $failedTransportName, string $id, SymfonyStyle $io)
{
/** @var ListableReceiverInterface $receiver */
$receiver = $this->getReceiver();
$receiver = $this->getReceiver($failedTransportName);
$envelope = $receiver->find($id);
if (null === $envelope) {
throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
@ -135,8 +148,8 @@ EOF
$io->writeln([
'',
sprintf(' Run <comment>messenger:failed:retry %s</comment> to retry this message.', $id),
sprintf(' Run <comment>messenger:failed:remove %s</comment> to delete it.', $id),
sprintf(' Run <comment>messenger:failed:retry %s --transport=%s</comment> to retry this message.', $id, $failedTransportName),
sprintf(' Run <comment>messenger:failed:remove %s --transport=%s</comment> to delete it.', $id, $failedTransportName),
]);
}
}

View File

@ -125,7 +125,7 @@ class MessengerPass implements CompilerPassInterface
if (!\in_array($options['bus'], $busIds)) {
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method));
throw new RuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageLocation, $message, $options['bus']));
throw new RuntimeException(sprintf('Invalid configuration "%s" for message "%s": bus "%s" does not exist.', $messageLocation, $message, $options['bus']));
}
$buses = [$options['bus']];
@ -134,7 +134,7 @@ class MessengerPass implements CompilerPassInterface
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method));
throw new RuntimeException(sprintf('Invalid handler service "%s": class or interface "%s" %s not found.', $serviceId, $message, $messageLocation));
throw new RuntimeException(sprintf('Invalid handler service "%s": class or interface "%s" "%s" not found.', $serviceId, $message, $messageLocation));
}
if (!$r->hasMethod($method)) {
@ -255,6 +255,14 @@ class MessengerPass implements CompilerPassInterface
private function registerReceivers(ContainerBuilder $container, array $busIds)
{
$receiverMapping = [];
$failureTransportsMap = [];
if ($container->hasDefinition('console.command.messenger_failed_messages_retry')) {
$commandDefinition = $container->getDefinition('console.command.messenger_failed_messages_retry');
$globalReceiverName = $commandDefinition->getArgument(0);
if (null !== $globalReceiverName) {
$failureTransportsMap[$commandDefinition->getArgument(0)] = new Reference('messenger.failure_transports.default');
}
}
foreach ($container->findTaggedServiceIds($this->receiverTag) as $id => $tags) {
$receiverClass = $this->getServiceClass($container, $id);
@ -267,6 +275,9 @@ class MessengerPass implements CompilerPassInterface
foreach ($tags as $tag) {
if (isset($tag['alias'])) {
$receiverMapping[$tag['alias']] = $receiverMapping[$id];
if ($tag['is_failure_transport'] ?? false) {
$failureTransportsMap[$tag['alias']] = $receiverMapping[$id];
}
}
}
}
@ -303,6 +314,8 @@ class MessengerPass implements CompilerPassInterface
$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
$failureTransportsLocator = ServiceLocatorTagPass::register($container, $failureTransportsMap);
$failedCommandIds = [
'console.command.messenger_failed_messages_retry',
'console.command.messenger_failed_messages_show',
@ -311,7 +324,7 @@ class MessengerPass implements CompilerPassInterface
foreach ($failedCommandIds as $failedCommandId) {
if ($container->hasDefinition($failedCommandId)) {
$definition = $container->getDefinition($failedCommandId);
$definition->replaceArgument(1, $receiverMapping[$definition->getArgument(0)]);
$definition->replaceArgument(1, $failureTransportsLocator);
}
}
}

View File

@ -10,6 +10,7 @@
namespace Symfony\Component\Messenger\EventListener;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
@ -25,12 +26,19 @@ use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
*/
class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface
{
private $failureSender;
private $failureSenders;
private $logger;
public function __construct(SenderInterface $failureSender, LoggerInterface $logger = null)
/**
* @param ContainerInterface $failureSenders
*/
public function __construct($failureSenders, LoggerInterface $logger = null)
{
$this->failureSender = $failureSender;
if (!$failureSenders instanceof ContainerInterface) {
trigger_deprecation('symfony/messenger', '5.3', 'Passing a SenderInterface value as 1st argument to "%s()" is deprecated, pass a ServiceLocator instead.', __METHOD__);
}
$this->failureSenders = $failureSenders;
$this->logger = $logger;
}
@ -40,6 +48,15 @@ class SendFailedMessageToFailureTransportListener implements EventSubscriberInte
return;
}
if (!$this->hasFailureTransports($event)) {
return;
}
$failureSender = $this->getFailureSender($event->getReceiverName());
if (null === $failureSender) {
return;
}
$envelope = $event->getEnvelope();
// avoid re-sending to the failed sender
@ -56,11 +73,11 @@ class SendFailedMessageToFailureTransportListener implements EventSubscriberInte
if (null !== $this->logger) {
$this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
'class' => \get_class($envelope->getMessage()),
'transport' => \get_class($this->failureSender),
'transport' => \get_class($failureSender),
]);
}
$this->failureSender->send($envelope);
$failureSender->send($envelope);
}
public static function getSubscribedEvents()
@ -69,4 +86,18 @@ class SendFailedMessageToFailureTransportListener implements EventSubscriberInte
WorkerMessageFailedEvent::class => ['onMessageFailed', -100],
];
}
private function getFailureSender(string $receiverName): SenderInterface
{
if ($this->failureSenders instanceof SenderInterface) {
return $this->failureSenders;
}
return $this->failureSenders->get($receiverName);
}
private function hasFailureTransports(WorkerMessageFailedEvent $event): bool
{
return ($this->failureSenders instanceof ContainerInterface && $this->failureSenders->has($event->getReceiverName())) || $this->failureSenders instanceof SenderInterface;
}
}

View File

@ -13,12 +13,17 @@ namespace Symfony\Component\Messenger\Tests\Command;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
class FailedMessagesRemoveCommandTest extends TestCase
{
/**
* @group legacy
*/
public function testRemoveSingleMessage()
{
$receiver = $this->createMock(ListableReceiverInterface::class);
@ -36,6 +41,30 @@ class FailedMessagesRemoveCommandTest extends TestCase
$this->assertStringContainsString('Message with id 20 removed.', $tester->getDisplay());
}
public function testRemoveSingleMessageWithServiceLocator()
{
$globalFailureReceiverName = 'failure_receiver';
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('find')->with(20)->willReturn(new Envelope(new \stdClass()));
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->expects($this->once())->method('has')->with($globalFailureReceiverName)->willReturn(true);
$serviceLocator->expects($this->any())->method('get')->with($globalFailureReceiverName)->willReturn($receiver);
$command = new FailedMessagesRemoveCommand(
$globalFailureReceiverName,
$serviceLocator
);
$tester = new CommandTester($command);
$tester->execute(['id' => 20, '--force' => true]);
$this->assertStringContainsString('Failed Message Details', $tester->getDisplay());
$this->assertStringContainsString('Message with id 20 removed.', $tester->getDisplay());
}
/**
* @group legacy
*/
public function testRemoveUniqueMessage()
{
$receiver = $this->createMock(ListableReceiverInterface::class);
@ -53,6 +82,92 @@ class FailedMessagesRemoveCommandTest extends TestCase
$this->assertStringContainsString('Message with id 20 removed.', $tester->getDisplay());
}
public function testRemoveUniqueMessageWithServiceLocator()
{
$globalFailureReceiverName = 'failure_receiver';
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('find')->with(20)->willReturn(new Envelope(new \stdClass()));
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->expects($this->once())->method('has')->with($globalFailureReceiverName)->willReturn(true);
$serviceLocator->expects($this->any())->method('get')->with($globalFailureReceiverName)->willReturn($receiver);
$command = new FailedMessagesRemoveCommand(
$globalFailureReceiverName,
$serviceLocator
);
$tester = new CommandTester($command);
$tester->execute(['id' => [20], '--force' => true]);
$this->assertStringContainsString('Failed Message Details', $tester->getDisplay());
$this->assertStringContainsString('Message with id 20 removed.', $tester->getDisplay());
}
public function testRemoveUniqueMessageWithServiceLocatorFromSpecificFailureTransport()
{
$failureReveiverName = 'specific_failure_receiver';
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('find')->with(20)->willReturn(new Envelope(new \stdClass()));
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->expects($this->once())->method('has')->with($failureReveiverName)->willReturn(true);
$serviceLocator->expects($this->any())->method('get')->with($failureReveiverName)->willReturn($receiver);
$command = new FailedMessagesRemoveCommand(
$failureReveiverName,
$serviceLocator
);
$tester = new CommandTester($command);
$tester->execute(['id' => [20], '--transport' => $failureReveiverName, '--force' => true]);
$this->assertStringContainsString('Failed Message Details', $tester->getDisplay());
$this->assertStringContainsString('Message with id 20 removed.', $tester->getDisplay());
}
/**
* @group legacy
*/
public function testThrowExceptionIfFailureTransportNotDefined()
{
$failureReceiverName = 'failure_receiver';
$receiver = $this->createMock(ListableReceiverInterface::class);
$this->expectException(InvalidArgumentException::class);
$command = new FailedMessagesRemoveCommand(
null,
$receiver
);
$tester = new CommandTester($command);
$tester->execute(['id' => [20], '--transport' => $failureReceiverName, '--force' => true]);
$this->assertStringContainsString('Failed Message Details', $tester->getDisplay());
$this->assertStringContainsString('Message with id 20 removed.', $tester->getDisplay());
}
public function testThrowExceptionIfFailureTransportNotDefinedWithServiceLocator()
{
$failureReceiverName = 'failure_receiver';
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->expects($this->once())->method('has')->with($failureReceiverName)->willReturn(false);
$this->expectException(InvalidArgumentException::class);
$command = new FailedMessagesRemoveCommand(
$failureReceiverName,
$serviceLocator
);
$tester = new CommandTester($command);
$tester->execute(['id' => [20], '--transport' => $failureReceiverName, '--force' => true]);
$this->assertStringContainsString('Failed Message Details', $tester->getDisplay());
$this->assertStringContainsString('Message with id 20 removed.', $tester->getDisplay());
}
/**
* @group legacy
*/
public function testRemoveMultipleMessages()
{
$receiver = $this->createMock(ListableReceiverInterface::class);
@ -76,6 +191,37 @@ class FailedMessagesRemoveCommandTest extends TestCase
$this->assertStringContainsString('Message with id 40 removed.', $tester->getDisplay());
}
public function testRemoveMultipleMessagesWithServiceLocator()
{
$globalFailureReceiverName = 'failure_receiver';
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->exactly(3))->method('find')->withConsecutive([20], [30], [40])->willReturnOnConsecutiveCalls(
new Envelope(new \stdClass()),
null,
new Envelope(new \stdClass())
);
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->expects($this->once())->method('has')->with($globalFailureReceiverName)->willReturn(true);
$serviceLocator->expects($this->any())->method('get')->with($globalFailureReceiverName)->willReturn($receiver);
$command = new FailedMessagesRemoveCommand(
$globalFailureReceiverName,
$serviceLocator
);
$tester = new CommandTester($command);
$tester->execute(['id' => [20, 30, 40], '--force' => true]);
$this->assertStringNotContainsString('Failed Message Details', $tester->getDisplay());
$this->assertStringContainsString('Message with id 20 removed.', $tester->getDisplay());
$this->assertStringContainsString('The message with id "30" was not found.', $tester->getDisplay());
$this->assertStringContainsString('Message with id 40 removed.', $tester->getDisplay());
}
/**
* @group legacy
*/
public function testRemoveMultipleMessagesAndDisplayMessages()
{
$receiver = $this->createMock(ListableReceiverInterface::class);
@ -96,4 +242,30 @@ class FailedMessagesRemoveCommandTest extends TestCase
$this->assertStringContainsString('Message with id 20 removed.', $tester->getDisplay());
$this->assertStringContainsString('Message with id 30 removed.', $tester->getDisplay());
}
public function testRemoveMultipleMessagesAndDisplayMessagesWithServiceLocator()
{
$globalFailureReceiverName = 'failure_receiver';
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->exactly(2))->method('find')->withConsecutive([20], [30])->willReturnOnConsecutiveCalls(
new Envelope(new \stdClass()),
new Envelope(new \stdClass())
);
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->expects($this->once())->method('has')->with($globalFailureReceiverName)->willReturn(true);
$serviceLocator->expects($this->any())->method('get')->with($globalFailureReceiverName)->willReturn($receiver);
$command = new FailedMessagesRemoveCommand(
$globalFailureReceiverName,
$serviceLocator
);
$tester = new CommandTester($command);
$tester->execute(['id' => [20, 30], '--force' => true, '--show-messages' => true]);
$this->assertStringContainsString('Failed Message Details', $tester->getDisplay());
$this->assertStringContainsString('Message with id 20 removed.', $tester->getDisplay());
$this->assertStringContainsString('Message with id 30 removed.', $tester->getDisplay());
}
}

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Command;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
use Symfony\Component\Messenger\Envelope;
@ -21,6 +22,9 @@ use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
class FailedMessagesRetryCommandTest extends TestCase
{
/**
* @group legacy
*/
public function testBasicRun()
{
$receiver = $this->createMock(ListableReceiverInterface::class);
@ -45,4 +49,99 @@ class FailedMessagesRetryCommandTest extends TestCase
$this->assertStringContainsString('[OK]', $tester->getDisplay());
}
public function testBasicRunWithServiceLocator()
{
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->exactly(2))->method('find')->withConsecutive([10], [12])->willReturn(new Envelope(new \stdClass()));
// message will eventually be ack'ed in Worker
$receiver->expects($this->exactly(2))->method('ack');
$dispatcher = new EventDispatcher();
$bus = $this->createMock(MessageBusInterface::class);
// the bus should be called in the worker
$bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass()));
$failureTransportName = 'failure_receiver';
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->method('has')->with($failureTransportName)->willReturn(true);
$serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);
$command = new FailedMessagesRetryCommand(
$failureTransportName,
$serviceLocator,
$bus,
$dispatcher
);
$tester = new CommandTester($command);
$tester->execute(['id' => [10, 12], '--force' => true]);
$this->assertStringContainsString('[OK]', $tester->getDisplay());
$this->assertStringNotContainsString('Available failure transports are:', $tester->getDisplay());
}
public function testBasicRunWithServiceLocatorMultipleFailedTransportsDefined()
{
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->method('all')->willReturn([]);
$dispatcher = new EventDispatcher();
$bus = $this->createMock(MessageBusInterface::class);
$failureTransportName = 'failure_receiver';
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->method('has')->with($failureTransportName)->willReturn(true);
$serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);
$serviceLocator->method('getProvidedServices')->willReturn([
'failure_receiver' => [],
'failure_receiver_2' => [],
'failure_receiver_3' => [],
]);
$command = new FailedMessagesRetryCommand(
$failureTransportName,
$serviceLocator,
$bus,
$dispatcher
);
$tester = new CommandTester($command);
$tester->setInputs([0]);
$tester->execute(['--force' => true]);
$expectedLadingMessage = <<<EOF
> Available failure transports are: failure_receiver, failure_receiver_2, failure_receiver_3
EOF;
$this->assertStringContainsString($expectedLadingMessage, $tester->getDisplay());
}
public function testBasicRunWithServiceLocatorWithSpecificFailureTransport()
{
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->exactly(2))->method('find')->withConsecutive([10], [12])->willReturn(new Envelope(new \stdClass()));
// message will eventually be ack'ed in Worker
$receiver->expects($this->exactly(2))->method('ack');
$dispatcher = new EventDispatcher();
$bus = $this->createMock(MessageBusInterface::class);
// the bus should be called in the worker
$bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass()));
$failureTransportName = 'failure_receiver';
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->method('has')->with($failureTransportName)->willReturn(true);
$serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);
$command = new FailedMessagesRetryCommand(
$failureTransportName,
$serviceLocator,
$bus,
$dispatcher
);
$tester = new CommandTester($command);
$tester->execute(['id' => [10, 12], '--transport' => $failureTransportName, '--force' => true]);
$this->assertStringContainsString('[OK]', $tester->getDisplay());
}
}

View File

@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Tests\Command;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Command\FailedMessagesShowCommand;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\ErrorDetailsStamp;
@ -28,6 +29,9 @@ use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
*/
class FailedMessagesShowCommandTest extends TestCase
{
/**
* @group legacy
*/
public function testBasicRun()
{
$sentToFailureStamp = new SentToFailureTransportStamp('async');
@ -51,7 +55,7 @@ class FailedMessagesShowCommandTest extends TestCase
$tester->execute(['id' => 15]);
$this->assertStringContainsString(sprintf(<<<EOF
------------- ---------------------
------------- ---------------------
Class stdClass
Message Id 15
Failed at %s
@ -65,6 +69,51 @@ EOF
$tester->getDisplay(true));
}
public function testBasicRunWithServiceLocator()
{
$sentToFailureStamp = new SentToFailureTransportStamp('async');
$redeliveryStamp = new RedeliveryStamp(0);
$errorStamp = ErrorDetailsStamp::create(new \Exception('Things are bad!', 123));
$envelope = new Envelope(new \stdClass(), [
new TransportMessageIdStamp(15),
$sentToFailureStamp,
$redeliveryStamp,
$errorStamp,
]);
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope);
$failureTransportName = 'failure_receiver';
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->method('has')->with($failureTransportName)->willReturn(true);
$serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);
$command = new FailedMessagesShowCommand(
$failureTransportName,
$serviceLocator
);
$tester = new CommandTester($command);
$tester->execute(['id' => 15]);
$this->assertStringContainsString(sprintf(<<<EOF
------------- ---------------------
Class stdClass
Message Id 15
Failed at %s
Error Things are bad!
Error Code 123
Error Class Exception
Transport async
EOF
,
$redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')),
$tester->getDisplay(true));
}
/**
* @group legacy
*/
public function testMultipleRedeliveryFails()
{
$sentToFailureStamp = new SentToFailureTransportStamp('async');
@ -101,6 +150,48 @@ EOF
$tester->getDisplay(true));
}
public function testMultipleRedeliveryFailsWithServiceLocator()
{
$sentToFailureStamp = new SentToFailureTransportStamp('async');
$redeliveryStamp1 = new RedeliveryStamp(0);
$errorStamp = ErrorDetailsStamp::create(new \Exception('Things are bad!', 123));
$redeliveryStamp2 = new RedeliveryStamp(0);
$envelope = new Envelope(new \stdClass(), [
new TransportMessageIdStamp(15),
$sentToFailureStamp,
$redeliveryStamp1,
$errorStamp,
$redeliveryStamp2,
]);
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope);
$failureTransportName = 'failure_receiver';
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->method('has')->with($failureTransportName)->willReturn(true);
$serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);
$command = new FailedMessagesShowCommand(
$failureTransportName,
$serviceLocator
);
$tester = new CommandTester($command);
$tester->execute(['id' => 15]);
$this->assertStringContainsString(sprintf(<<<EOF
------------- ---------------------
Class stdClass
Message Id 15
Failed at %s
Error Things are bad!
Error Code 123
Error Class Exception
Transport async
EOF
,
$redeliveryStamp2->getRedeliveredAt()->format('Y-m-d H:i:s')),
$tester->getDisplay(true));
}
/**
* @group legacy
*/
@ -136,6 +227,9 @@ EOF
$tester->getDisplay(true));
}
/**
* @group legacy
*/
public function testReceiverShouldBeListable()
{
$receiver = $this->createMock(ReceiverInterface::class);
@ -150,6 +244,28 @@ EOF
$tester->execute(['id' => 15]);
}
public function testReceiverShouldBeListableWithServiceLocator()
{
$receiver = $this->createMock(ReceiverInterface::class);
$failureTransportName = 'failure_receiver';
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->method('has')->with($failureTransportName)->willReturn(true);
$serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);
$command = new FailedMessagesShowCommand(
$failureTransportName,
$serviceLocator
);
$this->expectExceptionMessage('The "failure_receiver" receiver does not support listing or showing specific messages.');
$tester = new CommandTester($command);
$tester->execute(['id' => 15]);
}
/**
* @group legacy
*/
public function testListMessages()
{
$sentToFailureStamp = new SentToFailureTransportStamp('async');
@ -179,6 +295,56 @@ EOF
$tester->getDisplay(true));
}
public function testListMessagesWithServiceLocator()
{
$sentToFailureStamp = new SentToFailureTransportStamp('async');
$redeliveryStamp = new RedeliveryStamp(0);
$errorStamp = ErrorDetailsStamp::create(new \RuntimeException('Things are bad!'));
$envelope = new Envelope(new \stdClass(), [
new TransportMessageIdStamp(15),
$sentToFailureStamp,
$redeliveryStamp,
$errorStamp,
]);
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('all')->with()->willReturn([$envelope]);
$failureTransportName = 'failure_receiver';
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->method('has')->with($failureTransportName)->willReturn(true);
$serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);
$serviceLocator->method('getProvidedServices')->willReturn([
$failureTransportName => [],
'failure_receiver_2' => [],
'failure_receiver_3' => [],
]);
$command = new FailedMessagesShowCommand(
$failureTransportName,
$serviceLocator
);
$tester = new CommandTester($command);
$tester->setInputs([0]);
$tester->execute([]);
$this->assertStringContainsString(sprintf(<<<EOF
15 stdClass %s Things are bad!
EOF
,
$redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')),
$tester->getDisplay(true));
$expectedLoadingMessage = <<<EOF
> Available failure transports are: failure_receiver, failure_receiver_2, failure_receiver_3
EOF;
$this->assertStringContainsString($expectedLoadingMessage, $tester->getDisplay());
$this->assertStringContainsString('Run messenger:failed:show {id} --transport=failure_receiver -vv to see message details.', $tester->getDisplay());
}
/**
* @group legacy
*/
public function testListMessagesReturnsNoMessagesFound()
{
$receiver = $this->createMock(ListableReceiverInterface::class);
@ -194,6 +360,28 @@ EOF
$this->assertStringContainsString('[OK] No failed messages were found.', $tester->getDisplay(true));
}
public function testListMessagesReturnsNoMessagesFoundWithServiceLocator()
{
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('all')->with()->willReturn([]);
$failureTransportName = 'failure_receiver';
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->method('has')->with($failureTransportName)->willReturn(true);
$serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);
$command = new FailedMessagesShowCommand(
$failureTransportName,
$serviceLocator
);
$tester = new CommandTester($command);
$tester->execute([]);
$this->assertStringContainsString('[OK] No failed messages were found.', $tester->getDisplay(true));
}
/**
* @group legacy
*/
public function testListMessagesReturnsPaginatedMessages()
{
$sentToFailureStamp = new SentToFailureTransportStamp('async');
@ -216,6 +404,36 @@ EOF
$this->assertStringContainsString('Showing first 1 messages.', $tester->getDisplay(true));
}
public function testListMessagesReturnsPaginatedMessagesWithServiceLocator()
{
$sentToFailureStamp = new SentToFailureTransportStamp('async');
$envelope = new Envelope(new \stdClass(), [
new TransportMessageIdStamp(15),
$sentToFailureStamp,
new RedeliveryStamp(0),
ErrorDetailsStamp::create(new \RuntimeException('Things are bad!')),
]);
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('all')->with()->willReturn([$envelope]);
$failureTransportName = 'failure_receiver';
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->method('has')->with($failureTransportName)->willReturn(true);
$serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);
$command = new FailedMessagesShowCommand(
$failureTransportName,
$serviceLocator
);
$tester = new CommandTester($command);
$tester->execute(['--max' => 1]);
$this->assertStringContainsString('Showing first 1 messages.', $tester->getDisplay(true));
}
/**
* @group legacy
*/
public function testInvalidMessagesThrowsException()
{
$sentToFailureStamp = new SentToFailureTransportStamp('async');
@ -236,6 +454,29 @@ EOF
$tester->execute(['id' => 15]);
}
public function testInvalidMessagesThrowsExceptionWithServiceLocator()
{
$receiver = $this->createMock(ListableReceiverInterface::class);
$failureTransportName = 'failure_receiver';
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->method('has')->with($failureTransportName)->willReturn(true);
$serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);
$command = new FailedMessagesShowCommand(
$failureTransportName,
$serviceLocator
);
$this->expectExceptionMessage('The message "15" was not found.');
$tester = new CommandTester($command);
$tester->execute(['id' => 15]);
}
/**
* @group legacy
*/
public function testVeryVerboseOutputForSingleMessageContainsExceptionWithTrace()
{
$exception = new \RuntimeException('Things are bad!');
@ -275,4 +516,83 @@ EOF
__FILE__, $exceptionLine, $exceptionLine),
$tester->getDisplay(true));
}
public function testVeryVerboseOutputForSingleMessageContainsExceptionWithTraceWithServiceLocator()
{
$exception = new \RuntimeException('Things are bad!');
$exceptionLine = __LINE__ - 1;
$envelope = new Envelope(new \stdClass(), [
new TransportMessageIdStamp(15),
new SentToFailureTransportStamp('async'),
new RedeliveryStamp(0),
ErrorDetailsStamp::create($exception),
]);
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('find')->with(42)->willReturn($envelope);
$failureTransportName = 'failure_receiver';
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->method('has')->with($failureTransportName)->willReturn(true);
$serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);
$command = new FailedMessagesShowCommand($failureTransportName, $serviceLocator);
$tester = new CommandTester($command);
$tester->execute(['id' => 42], ['verbosity' => OutputInterface::VERBOSITY_VERY_VERBOSE]);
$this->assertStringMatchesFormat(sprintf(<<<'EOF'
%%A
Exception:
==========
RuntimeException {
message: "Things are bad!"
code: 0
file: "%s"
line: %d
trace: {
%%s%%eTests%%eCommand%%eFailedMessagesShowCommandTest.php:%d {
Symfony\Component\Messenger\Tests\Command\FailedMessagesShowCommandTest->testVeryVerboseOutputForSingleMessageContainsExceptionWithTraceWithServiceLocator()
{
$exception = new \RuntimeException('Things are bad!');
$exceptionLine = __LINE__ - 1;
}
%%A
EOF
,
__FILE__, $exceptionLine, $exceptionLine),
$tester->getDisplay(true));
}
public function testListMessagesWithServiceLocatorFromSpecificTransport()
{
$sentToFailureStamp = new SentToFailureTransportStamp('async');
$redeliveryStamp = new RedeliveryStamp(0);
$errorStamp = ErrorDetailsStamp::create(new \RuntimeException('Things are bad!'));
$envelope = new Envelope(new \stdClass(), [
new TransportMessageIdStamp(15),
$sentToFailureStamp,
$redeliveryStamp,
$errorStamp,
]);
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('all')->with()->willReturn([$envelope]);
$failureTransportName = 'failure_receiver_another';
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->method('has')->with($failureTransportName)->willReturn(true);
$serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);
$command = new FailedMessagesShowCommand(
'global_but_not_used',
$serviceLocator
);
$tester = new CommandTester($command);
$tester->execute(['--transport' => $failureTransportName]);
$this->assertStringContainsString(sprintf(<<<EOF
15 stdClass %s Things are bad!
EOF
,
$redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')),
$tester->getDisplay(true));
}
}

View File

@ -22,6 +22,8 @@ use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceiver;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
use Symfony\Component\Messenger\Command\DebugCommand;
use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
use Symfony\Component\Messenger\Command\FailedMessagesShowCommand;
use Symfony\Component\Messenger\Command\SetupTransportsCommand;
use Symfony\Component\Messenger\DataCollector\MessengerDataCollector;
use Symfony\Component\Messenger\DependencyInjection\MessengerPass;
@ -402,7 +404,7 @@ class MessengerPassTest extends TestCase
public function testItThrowsAnExceptionOnUnknownBus()
{
$this->expectException(RuntimeException::class);
$this->expectExceptionMessage('Invalid configuration returned by method "Symfony\Component\Messenger\Tests\DependencyInjection\HandlerOnUndefinedBus::getHandledMessages()" for message "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage": bus "some_undefined_bus" does not exist.');
$this->expectExceptionMessage('Invalid configuration "returned by method "Symfony\Component\Messenger\Tests\DependencyInjection\HandlerOnUndefinedBus::getHandledMessages()"" for message "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage": bus "some_undefined_bus" does not exist.');
$container = $this->getContainerBuilder();
$container
->register(HandlerOnUndefinedBus::class, HandlerOnUndefinedBus::class)
@ -415,7 +417,7 @@ class MessengerPassTest extends TestCase
public function testUndefinedMessageClassForHandler()
{
$this->expectException(RuntimeException::class);
$this->expectExceptionMessage('Invalid handler service "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessageHandler": class or interface "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessage" used as argument type in method "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessageHandler::__invoke()" not found.');
$this->expectExceptionMessage('Invalid handler service "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessageHandler": class or interface "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessage" "used as argument type in method "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessageHandler::__invoke()"" not found.');
$container = $this->getContainerBuilder();
$container
->register(UndefinedMessageHandler::class, UndefinedMessageHandler::class)
@ -428,7 +430,7 @@ class MessengerPassTest extends TestCase
public function testUndefinedMessageClassForHandlerImplementingMessageHandlerInterface()
{
$this->expectException(RuntimeException::class);
$this->expectExceptionMessage('Invalid handler service "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessageHandlerViaHandlerInterface": class or interface "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessage" used as argument type in method "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessageHandlerViaHandlerInterface::__invoke()" not found.');
$this->expectExceptionMessage('Invalid handler service "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessageHandlerViaHandlerInterface": class or interface "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessage" "used as argument type in method "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessageHandlerViaHandlerInterface::__invoke()"" not found.');
$container = $this->getContainerBuilder();
$container
->register(UndefinedMessageHandlerViaHandlerInterface::class, UndefinedMessageHandlerViaHandlerInterface::class)
@ -441,7 +443,7 @@ class MessengerPassTest extends TestCase
public function testUndefinedMessageClassForHandlerImplementingMessageSubscriberInterface()
{
$this->expectException(RuntimeException::class);
$this->expectExceptionMessage('Invalid handler service "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessageHandlerViaSubscriberInterface": class or interface "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessage" returned by method "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessageHandlerViaSubscriberInterface::getHandledMessages()" not found.');
$this->expectExceptionMessage('Invalid handler service "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessageHandlerViaSubscriberInterface": class or interface "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessage" "returned by method "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessageHandlerViaSubscriberInterface::getHandledMessages()"" not found.');
$container = $this->getContainerBuilder();
$container
->register(UndefinedMessageHandlerViaSubscriberInterface::class, UndefinedMessageHandlerViaSubscriberInterface::class)
@ -693,6 +695,34 @@ class MessengerPassTest extends TestCase
$this->assertEquals($options[$index] ?? [], $definitionArguments[1]);
}
}
public function testFailedCommandsRegisteredWithServiceLocatorArgumentReplaced()
{
$globalReceiverName = 'global_failure_transport';
$container = $this->getContainerBuilder($messageBusId = 'message_bus');
$container->register('console.command.messenger_failed_messages_retry', FailedMessagesRetryCommand::class)
->setArgument(0, $globalReceiverName)
->setArgument(1, null)
->setArgument(2, new Reference($messageBusId));
$container->register('console.command.messenger_failed_messages_show', FailedMessagesShowCommand::class)
->setArgument(0, $globalReceiverName)
->setArgument(1, null);
$container->register('console.command.messenger_failed_messages_remove', FailedMessagesRetryCommand::class)
->setArgument(0, $globalReceiverName)
->setArgument(1, null);
(new MessengerPass())->process($container);
$retryDefinition = $container->getDefinition('console.command.messenger_failed_messages_retry');
$this->assertNotNull($retryDefinition->getArgument(1));
$showDefinition = $container->getDefinition('console.command.messenger_failed_messages_show');
$this->assertNotNull($showDefinition->getArgument(1));
$removeDefinition = $container->getDefinition('console.command.messenger_failed_messages_remove');
$this->assertNotNull($removeDefinition->getArgument(1));
}
}
class DummyHandler

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\EventListener;
use PHPUnit\Framework\TestCase;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
@ -20,6 +21,9 @@ use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
class SendFailedMessageToFailureTransportListenerTest extends TestCase
{
/**
* @group legacy
*/
public function testItSendsToTheFailureTransport()
{
$sender = $this->createMock(SenderInterface::class);
@ -43,11 +47,56 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase
$listener->onMessageFailed($event);
}
public function testItSendsToTheFailureTransportWithSenderLocator()
{
$receiverName = 'my_receiver';
$sender = $this->createMock(SenderInterface::class);
$sender->expects($this->once())->method('send')->with($this->callback(function ($envelope) use ($receiverName) {
/* @var Envelope $envelope */
$this->assertInstanceOf(Envelope::class, $envelope);
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
$this->assertNotNull($sentToFailureTransportStamp);
$this->assertSame($receiverName, $sentToFailureTransportStamp->getOriginalReceiverName());
return true;
}))->willReturnArgument(0);
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->expects($this->once())->method('has')->willReturn(true);
$serviceLocator->expects($this->once())->method('get')->with($receiverName)->willReturn($sender);
$listener = new SendFailedMessageToFailureTransportListener($serviceLocator);
$exception = new \Exception('no!');
$envelope = new Envelope(new \stdClass());
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
$listener->onMessageFailed($event);
}
/**
* @group legacy
*/
public function testDoNothingOnRetry()
{
$sender = $this->createMock(SenderInterface::class);
$sender->expects($this->never())->method('send');
$listener = new SendFailedMessageToFailureTransportListener($sender);
$envelope = new Envelope(new \stdClass());
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception());
$event->setForRetry();
$listener->onMessageFailed($event);
}
public function testDoNothingOnRetryWithServiceLocator()
{
$sender = $this->createMock(SenderInterface::class);
$sender->expects($this->never())->method('send');
$serviceLocator = $this->createMock(ServiceLocator::class);
$listener = new SendFailedMessageToFailureTransportListener($serviceLocator);
$envelope = new Envelope(new \stdClass());
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception());
@ -56,6 +105,9 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase
$listener->onMessageFailed($event);
}
/**
* @group legacy
*/
public function testDoNotRedeliverToFailed()
{
$sender = $this->createMock(SenderInterface::class);
@ -69,4 +121,65 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase
$listener->onMessageFailed($event);
}
public function testDoNotRedeliverToFailedWithServiceLocator()
{
$receiverName = 'my_receiver';
$sender = $this->createMock(SenderInterface::class);
$sender->expects($this->never())->method('send');
$serviceLocator = $this->createMock(ServiceLocator::class);
$listener = new SendFailedMessageToFailureTransportListener($serviceLocator);
$envelope = new Envelope(new \stdClass(), [
new SentToFailureTransportStamp($receiverName),
]);
$event = new WorkerMessageFailedEvent($envelope, $receiverName, new \Exception());
$listener->onMessageFailed($event);
}
public function testDoNothingIfFailureTransportIsNotDefined()
{
$sender = $this->createMock(SenderInterface::class);
$sender->expects($this->never())->method('send');
$serviceLocator = $this->createMock(ServiceLocator::class);
$listener = new SendFailedMessageToFailureTransportListener($serviceLocator, null);
$exception = new \Exception('no!');
$envelope = new Envelope(new \stdClass());
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
$listener->onMessageFailed($event);
}
public function testItSendsToTheFailureTransportWithMultipleFailedTransports()
{
$receiverName = 'my_receiver';
$sender = $this->createMock(SenderInterface::class);
$sender->expects($this->once())->method('send')->with($this->callback(function ($envelope) use ($receiverName) {
/* @var Envelope $envelope */
$this->assertInstanceOf(Envelope::class, $envelope);
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
$this->assertNotNull($sentToFailureTransportStamp);
$this->assertSame($receiverName, $sentToFailureTransportStamp->getOriginalReceiverName());
return true;
}))->willReturnArgument(0);
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->method('has')->with($receiverName)->willReturn(true);
$serviceLocator->method('get')->with($receiverName)->willReturn($sender);
$listener = new SendFailedMessageToFailureTransportListener($serviceLocator);
$exception = new \Exception('no!');
$envelope = new Envelope(new \stdClass());
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
$listener->onMessageFailed($event);
}
}

View File

@ -13,6 +13,8 @@ namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Psr\Log\NullLogger;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
@ -38,6 +40,9 @@ use Symfony\Component\Messenger\Worker;
class FailureIntegrationTest extends TestCase
{
/**
* @group legacy
*/
public function testRequeueMechanism()
{
$transport1 = new DummyFailureTestSenderAndReceiver();
@ -216,6 +221,333 @@ class FailureIntegrationTest extends TestCase
// the failure transport is empty because it worked
$this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived());
}
public function testRequeueMechanismWithServiceLocator()
{
$transport1 = new DummyFailureTestSenderAndReceiver();
$transport2 = new DummyFailureTestSenderAndReceiver();
$failureTransport = new DummyFailureTestSenderAndReceiver();
$sendersLocatorFailureTransport = new ServiceLocator([
'transport1' => function () use ($failureTransport) {
return $failureTransport;
},
'transport2' => function () use ($failureTransport) {
return $failureTransport;
},
]);
$transports = [
'transport1' => $transport1,
'transport2' => $transport2,
'the_failure_transport' => $failureTransport,
];
$locator = $this->createMock(ContainerInterface::class);
$locator->expects($this->any())
->method('has')
->willReturn(true);
$locator->expects($this->any())
->method('get')
->willReturnCallback(function ($transportName) use ($transports) {
return $transports[$transportName];
});
$senderLocator = new SendersLocator(
[DummyMessage::class => ['transport1', 'transport2']],
$locator
);
$retryStrategyLocator = $this->createMock(ContainerInterface::class);
$retryStrategyLocator->expects($this->any())
->method('has')
->willReturn(true);
$retryStrategyLocator->expects($this->any())
->method('get')
->willReturn(new MultiplierRetryStrategy(1));
// using to so we can lazily get the bus later and avoid circular problem
$transport1HandlerThatFails = new DummyTestHandler(true);
$allTransportHandlerThatWorks = new DummyTestHandler(false);
$transport2HandlerThatWorks = new DummyTestHandler(false);
$handlerLocator = new HandlersLocator([
DummyMessage::class => [
new HandlerDescriptor($transport1HandlerThatFails, [
'from_transport' => 'transport1',
'alias' => 'handler_that_fails',
]),
new HandlerDescriptor($allTransportHandlerThatWorks, [
'alias' => 'handler_that_works1',
]),
new HandlerDescriptor($transport2HandlerThatWorks, [
'from_transport' => 'transport2',
'alias' => 'handler_that_works2',
]),
],
]);
$dispatcher = new EventDispatcher();
$bus = new MessageBus([
new FailedMessageProcessingMiddleware(),
new SendMessageMiddleware($senderLocator),
new HandleMessageMiddleware($handlerLocator),
]);
$dispatcher->addSubscriber(new AddErrorDetailsStampListener());
$dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator, $retryStrategyLocator));
$dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($sendersLocatorFailureTransport));
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
$runWorker = function (string $transportName) use ($transports, $bus, $dispatcher): ?\Throwable {
$throwable = null;
$failedListener = function (WorkerMessageFailedEvent $event) use (&$throwable) {
$throwable = $event->getThrowable();
};
$dispatcher->addListener(WorkerMessageFailedEvent::class, $failedListener);
$worker = new Worker([$transportName => $transports[$transportName]], $bus, $dispatcher);
$worker->run();
$dispatcher->removeListener(WorkerMessageFailedEvent::class, $failedListener);
return $throwable;
};
// send the message
$envelope = new Envelope(new DummyMessage('API'));
$bus->dispatch($envelope);
// message has been sent
$this->assertCount(1, $transport1->getMessagesWaitingToBeReceived());
$this->assertCount(1, $transport2->getMessagesWaitingToBeReceived());
$this->assertCount(0, $failureTransport->getMessagesWaitingToBeReceived());
// receive the message - one handler will fail and the message
// will be sent back to transport1 to be retried
/*
* Receive the message from "transport1"
*/
$throwable = $runWorker('transport1');
// make sure this is failing for the reason we think
$this->assertInstanceOf(HandlerFailedException::class, $throwable);
// handler for transport1 and all transports were called
$this->assertSame(1, $transport1HandlerThatFails->getTimesCalled());
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
$this->assertSame(0, $transport2HandlerThatWorks->getTimesCalled());
// one handler failed and the message is retried (resent to transport1)
$this->assertCount(1, $transport1->getMessagesWaitingToBeReceived());
$this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived());
/*
* Receive the message for a (final) retry
*/
$runWorker('transport1');
// only the "failed" handler is called a 2nd time
$this->assertSame(2, $transport1HandlerThatFails->getTimesCalled());
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
// handling fails again, message is sent to failure transport
$this->assertCount(0, $transport1->getMessagesWaitingToBeReceived());
$this->assertCount(1, $failureTransport->getMessagesWaitingToBeReceived());
/** @var Envelope $failedEnvelope */
$failedEnvelope = $failureTransport->getMessagesWaitingToBeReceived()[0];
/** @var SentToFailureTransportStamp $sentToFailureStamp */
$sentToFailureStamp = $failedEnvelope->last(SentToFailureTransportStamp::class);
$this->assertNotNull($sentToFailureStamp);
/** @var ErrorDetailsStamp $errorDetailsStamp */
$errorDetailsStamp = $failedEnvelope->last(ErrorDetailsStamp::class);
$this->assertNotNull($errorDetailsStamp);
$this->assertSame('Failure from call 2', $errorDetailsStamp->getExceptionMessage());
/*
* Failed message is handled, fails, and sent for a retry
*/
$throwable = $runWorker('the_failure_transport');
// make sure this is failing for the reason we think
$this->assertInstanceOf(HandlerFailedException::class, $throwable);
// only the "failed" handler is called a 3rd time
$this->assertSame(3, $transport1HandlerThatFails->getTimesCalled());
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
// handling fails again, message is retried
$this->assertCount(1, $failureTransport->getMessagesWaitingToBeReceived());
// transport2 still only holds the original message
// a new message was never mistakenly delivered to it
$this->assertCount(1, $transport2->getMessagesWaitingToBeReceived());
/*
* Message is retried on failure transport then discarded
*/
$runWorker('the_failure_transport');
// only the "failed" handler is called a 4th time
$this->assertSame(4, $transport1HandlerThatFails->getTimesCalled());
$this->assertSame(1, $allTransportHandlerThatWorks->getTimesCalled());
// handling fails again, message is discarded
$this->assertCount(0, $failureTransport->getMessagesWaitingToBeReceived());
/*
* Execute handlers on transport2
*/
$runWorker('transport2');
// transport1 handler is not called again
$this->assertSame(4, $transport1HandlerThatFails->getTimesCalled());
// all transport handler is now called again
$this->assertSame(2, $allTransportHandlerThatWorks->getTimesCalled());
// transport1 handler called for the first time
$this->assertSame(1, $transport2HandlerThatWorks->getTimesCalled());
// all transport should be empty
$this->assertEmpty($transport1->getMessagesWaitingToBeReceived());
$this->assertEmpty($transport2->getMessagesWaitingToBeReceived());
$this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived());
/*
* Dispatch the original message again
*/
$bus->dispatch($envelope);
// handle the failing message so it goes into the failure transport
$runWorker('transport1');
$runWorker('transport1');
// now make the handler work!
$transport1HandlerThatFails->setShouldThrow(false);
$runWorker('the_failure_transport');
// the failure transport is empty because it worked
$this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived());
}
public function testMultipleFailedTransportsWithoutGlobalFailureTransport()
{
$transport1 = new DummyFailureTestSenderAndReceiver();
$transport2 = new DummyFailureTestSenderAndReceiver();
$failureTransport1 = new DummyFailureTestSenderAndReceiver();
$failureTransport2 = new DummyFailureTestSenderAndReceiver();
$sendersLocatorFailureTransport = new ServiceLocator([
'transport1' => function () use ($failureTransport1) {
return $failureTransport1;
},
'transport2' => function () use ($failureTransport2) {
return $failureTransport2;
},
]);
$transports = [
'transport1' => $transport1,
'transport2' => $transport2,
'the_failure_transport1' => $failureTransport1,
'the_failure_transport2' => $failureTransport2,
];
$locator = $this->createMock(ContainerInterface::class);
$locator->expects($this->any())
->method('has')
->willReturn(true);
$locator->expects($this->any())
->method('get')
->willReturnCallback(function ($transportName) use ($transports) {
return $transports[$transportName];
});
$senderLocator = new SendersLocator(
[DummyMessage::class => ['transport1', 'transport2']],
$locator
);
// retry strategy with zero retries so it goes to the failed transport after failure
$retryStrategyLocator = $this->createMock(ContainerInterface::class);
$retryStrategyLocator->expects($this->any())
->method('has')
->willReturn(true);
$retryStrategyLocator->expects($this->any())
->method('get')
->willReturn(new MultiplierRetryStrategy(0));
// using to so we can lazily get the bus later and avoid circular problem
$transport1HandlerThatFails = new DummyTestHandler(true);
$transport2HandlerThatFails = new DummyTestHandler(true);
$handlerLocator = new HandlersLocator([
DummyMessage::class => [
new HandlerDescriptor($transport1HandlerThatFails, [
'from_transport' => 'transport1',
]),
new HandlerDescriptor($transport2HandlerThatFails, [
'from_transport' => 'transport2',
]),
],
]);
$dispatcher = new EventDispatcher();
$bus = new MessageBus([
new FailedMessageProcessingMiddleware(),
new SendMessageMiddleware($senderLocator),
new HandleMessageMiddleware($handlerLocator),
]);
$dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator, $retryStrategyLocator));
$dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener(
$sendersLocatorFailureTransport,
new NullLogger()
));
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
$runWorker = function (string $transportName) use ($transports, $bus, $dispatcher): ?\Throwable {
$throwable = null;
$failedListener = function (WorkerMessageFailedEvent $event) use (&$throwable) {
$throwable = $event->getThrowable();
};
$dispatcher->addListener(WorkerMessageFailedEvent::class, $failedListener);
$worker = new Worker([$transportName => $transports[$transportName]], $bus, $dispatcher);
$worker->run();
$dispatcher->removeListener(WorkerMessageFailedEvent::class, $failedListener);
return $throwable;
};
// send the message
$envelope = new Envelope(new DummyMessage('API'));
$bus->dispatch($envelope);
// message has been sent
$this->assertCount(1, $transport1->getMessagesWaitingToBeReceived());
$this->assertCount(1, $transport2->getMessagesWaitingToBeReceived());
$this->assertCount(0, $failureTransport1->getMessagesWaitingToBeReceived());
$this->assertCount(0, $failureTransport2->getMessagesWaitingToBeReceived());
// Receive the message from "transport1"
$throwable = $runWorker('transport1');
$this->assertInstanceOf(HandlerFailedException::class, $throwable);
// handler for transport1 is called
$this->assertSame(1, $transport1HandlerThatFails->getTimesCalled());
$this->assertSame(0, $transport2HandlerThatFails->getTimesCalled());
// one handler failed and the message is sent to the failed transport of transport1
$this->assertCount(1, $failureTransport1->getMessagesWaitingToBeReceived());
$this->assertCount(0, $failureTransport2->getMessagesWaitingToBeReceived());
// consume the failure message failed on "transport1"
$runWorker('the_failure_transport1');
// "transport1" handler is called again from the "the_failed_transport1" and it fails
$this->assertSame(2, $transport1HandlerThatFails->getTimesCalled());
$this->assertSame(0, $transport2HandlerThatFails->getTimesCalled());
$this->assertCount(0, $failureTransport1->getMessagesWaitingToBeReceived());
$this->assertCount(0, $failureTransport2->getMessagesWaitingToBeReceived());
// Receive the message from "transport2"
$throwable = $runWorker('transport2');
$this->assertInstanceOf(HandlerFailedException::class, $throwable);
$this->assertSame(2, $transport1HandlerThatFails->getTimesCalled());
// handler for "transport2" is called
$this->assertSame(1, $transport2HandlerThatFails->getTimesCalled());
$this->assertCount(0, $failureTransport1->getMessagesWaitingToBeReceived());
// the failure transport "the_failure_transport2" has 1 new message failed from "transport2"
$this->assertCount(1, $failureTransport2->getMessagesWaitingToBeReceived());
// Consume the failure message failed on "transport2"
$runWorker('the_failure_transport2');
$this->assertSame(2, $transport1HandlerThatFails->getTimesCalled());
// "transport2" handler is called again from the "the_failed_transport2" and it fails
$this->assertSame(2, $transport2HandlerThatFails->getTimesCalled());
$this->assertCount(0, $failureTransport1->getMessagesWaitingToBeReceived());
// After the message fails again, the message is discarded from the "the_failure_transport2"
$this->assertCount(0, $failureTransport2->getMessagesWaitingToBeReceived());
}
}
class DummyFailureTestSenderAndReceiver implements ReceiverInterface, SenderInterface