[Messenger] Adding failure transport support

This commit is contained in:
Ryan Weaver 2019-04-07 07:00:26 -04:00 committed by Fabien Potencier
parent 2dacfcade1
commit 36487e5f44
49 changed files with 1839 additions and 150 deletions

View File

@ -1200,6 +1200,10 @@ class Configuration implements ConfigurationInterface
->end()
->end()
->end()
->scalarNode('failure_transport')
->defaultNull()
->info('Transport name to send failed messages to (after all retries have failed).')
->end()
->scalarNode('default_bus')->defaultNull()->end()
->arrayNode('buses')
->defaultValue(['messenger.bus.default' => ['default_middleware' => true, 'middleware' => []]])

View File

@ -40,9 +40,9 @@ use Symfony\Component\Config\ResourceCheckerInterface;
use Symfony\Component\Console\Application;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\DependencyInjection\Alias;
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
use Symfony\Component\DependencyInjection\ChildDefinition;
use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\DependencyInjection\Definition;
@ -284,6 +284,9 @@ class FrameworkExtension extends Extension
$container->removeDefinition('console.command.messenger_debug');
$container->removeDefinition('console.command.messenger_stop_workers');
$container->removeDefinition('console.command.messenger_setup_transports');
$container->removeDefinition('console.command.messenger_failed_messages_retry');
$container->removeDefinition('console.command.messenger_failed_messages_show');
$container->removeDefinition('console.command.messenger_failed_messages_remove');
}
$propertyInfoEnabled = $this->isConfigEnabled($container, $config['property_info']);
@ -1743,22 +1746,48 @@ class FrameworkExtension extends Extension
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
}
$senders = [];
// make sure senderAliases contains all senders
foreach ($messageConfiguration['senders'] as $sender) {
$senders[$sender] = new Reference($senderAliases[$sender] ?? $sender);
if (!isset($senderAliases[$sender])) {
$senderAliases[$sender] = $sender;
}
}
$messageToSendersMapping[$message] = new IteratorArgument($senders);
$messageToSendersMapping[$message] = $messageConfiguration['senders'];
$messagesToSendAndHandle[$message] = $messageConfiguration['send_and_handle'];
}
$senderReferences = [];
foreach ($senderAliases as $alias => $serviceId) {
$senderReferences[$alias] = new Reference($serviceId);
}
$container->getDefinition('messenger.senders_locator')
->replaceArgument(0, $messageToSendersMapping)
->replaceArgument(1, $messagesToSendAndHandle)
->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences))
->replaceArgument(2, $messagesToSendAndHandle)
;
$container->getDefinition('messenger.retry_strategy_locator')
->replaceArgument(0, $transportRetryReferences);
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
->replaceArgument(1, $config['failure_transport']);
if ($config['failure_transport']) {
$container->getDefinition('console.command.messenger_failed_messages_retry')
->replaceArgument(0, $config['failure_transport'])
->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null);
$container->getDefinition('console.command.messenger_failed_messages_show')
->replaceArgument(0, $config['failure_transport']);
$container->getDefinition('console.command.messenger_failed_messages_remove')
->replaceArgument(0, $config['failure_transport']);
} else {
$container->removeDefinition('console.command.messenger_failed_messages_retry');
$container->removeDefinition('console.command.messenger_failed_messages_show');
$container->removeDefinition('console.command.messenger_failed_messages_remove');
}
}
private function registerCacheConfiguration(array $config, ContainerBuilder $container)

View File

@ -114,6 +114,31 @@
<tag name="console.command" command="messenger:stop-workers" />
</service>
<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
<argument /> <!-- Receiver name -->
<argument /> <!-- Receiver locator -->
<argument type="service" id="messenger.routable_message_bus" />
<argument type="service" id="event_dispatcher" />
<argument /> <!-- Retry strategy -->
<argument type="service" id="logger" />
<tag name="console.command" command="messenger:failed:retry" />
</service>
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
<argument /> <!-- Receiver name -->
<argument /> <!-- Receiver locator -->
<tag name="console.command" command="messenger:failed:show" />
</service>
<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand">
<argument /> <!-- Receiver name -->
<argument /> <!-- Receiver locator -->
<tag name="console.command" command="messenger:failed:remove" />
</service>
<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
<argument type="service" id="router" />
<argument type="service" id="debug.file_link_formatter" on-invalid="null" />

View File

@ -9,7 +9,8 @@
<!-- Asynchronous -->
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator">
<argument type="collection" /> <!-- Per message sender iterators -->
<argument type="collection" /> <!-- Per message senders map -->
<argument /> <!-- senders locator -->
<argument type="collection" /> <!-- Messages to send and handle -->
</service>
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
@ -91,5 +92,19 @@
<argument /> <!-- multiplier -->
<argument /> <!-- max delay ms -->
</service>
<!-- failed handling -->
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
<tag name="kernel.event_subscriber" />
<tag name="monolog.logger" channel="messenger" />
<argument type="service" id="messenger.routable_message_bus" />
<argument /> <!-- Failure transport name -->
<argument type="service" id="logger" on-invalid="ignore" />
</service>
<!-- routable message bus -->
<service id="messenger.routable_message_bus" class="Symfony\Component\Messenger\RoutableMessageBus">
<argument /> <!-- Message bus locator -->
</service>
</services>
</container>

View File

@ -328,6 +328,7 @@ class ConfigurationTest extends TestCase
'enabled' => !class_exists(FullStack::class) && interface_exists(MessageBusInterface::class),
'routing' => [],
'transports' => [],
'failure_transport' => null,
'serializer' => [
'default_serializer' => 'messenger.transport.native_php_serializer',
'symfony_serializer' => [

View File

@ -721,12 +721,16 @@ abstract class FrameworkExtensionTest extends TestCase
'*' => false,
];
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(1));
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(2));
$sendersMapping = $senderLocatorDefinition->getArgument(0);
$this->assertEquals([
'amqp' => new Reference('messenger.transport.amqp'),
'audit' => new Reference('audit'),
], $sendersMapping[DummyMessage::class]->getValues());
'amqp',
'audit',
], $sendersMapping[DummyMessage::class]);
$sendersLocator = $container->getDefinition((string) $senderLocatorDefinition->getArgument(1));
$this->assertSame(['amqp', 'audit'], array_keys($sendersLocator->getArgument(0)));
$this->assertEquals(new Reference('messenger.transport.amqp'), $sendersLocator->getArgument(0)['amqp']->getValues()[0]);
$this->assertEquals(new Reference('audit'), $sendersLocator->getArgument(0)['audit']->getValues()[0]);
}
public function testMessengerTransportConfiguration()

View File

@ -4,6 +4,13 @@ CHANGELOG
4.3.0
-----
* [BC BREAK] `SendersLocatorInterface` has an additional method:
`getSenderByAlias()`.
* A new `ListableReceiverInterface` was added, which a receiver
can implement (when applicable) to enable listing and fetching
individual messages by id (used in the new "Failed Messages" commands).
* Both `SenderInterface::send()` and `ReceiverInterface::get()`
should now (when applicable) add a `TransportMessageIdStamp`.
* Added `WorkerStoppedEvent` dispatched when a worker is stopped.
* Added optional `MessageCountAwareInterface` that receivers can implement
to give information about how many messages are waiting to be processed.

View File

@ -0,0 +1,112 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\Dumper;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @internal
* @experimental in 4.3
*/
abstract class AbstractFailedMessagesCommand extends Command
{
private $receiverName;
private $receiver;
public function __construct(string $receiverName, ReceiverInterface $receiver)
{
$this->receiverName = $receiverName;
$this->receiver = $receiver;
parent::__construct();
}
protected function getReceiverName()
{
return $this->receiverName;
}
/**
* @return mixed|null
*/
protected function getMessageId(Envelope $envelope)
{
/** @var TransportMessageIdStamp $stamp */
$stamp = $envelope->last(TransportMessageIdStamp::class);
return null !== $stamp ? $stamp->getId() : null;
}
protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
{
$io->title('Failed Message Details');
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
$rows = [
['Class', \get_class($envelope->getMessage())],
];
if (null !== $id = $this->getMessageId($envelope)) {
$rows[] = ['Message Id', $id];
}
if (null === $sentToFailureTransportStamp) {
$io->warning('Message does not appear to have been sent to this transport after failing');
} else {
$rows = array_merge($rows, [
['Failed at', $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s')],
['Error', $sentToFailureTransportStamp->getExceptionMessage()],
['Error Class', $sentToFailureTransportStamp->getFlattenException() ? $sentToFailureTransportStamp->getFlattenException()->getClass() : '(unknown)'],
['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()],
]);
}
$io->table([], $rows);
if ($io->isVeryVerbose()) {
$io->title('Message:');
$dump = new Dumper($io);
$io->writeln($dump($envelope->getMessage()));
$io->title('Exception:');
$io->writeln($sentToFailureTransportStamp->getFlattenException()->getTraceAsString());
} else {
$io->writeln(' Re-run command with <info>-vv</info> to see more message & error details.');
}
}
protected function printPendingMessagesMessage(ReceiverInterface $receiver, SymfonyStyle $io)
{
if ($receiver instanceof MessageCountAwareInterface) {
if (1 === $receiver->getMessageCount()) {
$io->writeln('There is <comment>1</comment> message pending in the failure transport.');
} else {
$io->writeln(sprintf('There are <comment>%d</comment> messages pending in the failure transport.', $receiver->getMessageCount()));
}
}
}
protected function getReceiver(): ReceiverInterface
{
return $this->receiver;
}
}

View File

@ -0,0 +1,88 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Symfony\Component\Console\Exception\RuntimeException;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class FailedMessagesRemoveCommand extends AbstractFailedMessagesCommand
{
protected static $defaultName = 'messenger:failed:remove';
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
->setDefinition([
new InputArgument('id', InputArgument::REQUIRED, 'Specific message id to remove'),
new InputOption('force', null, InputOption::VALUE_NONE, 'Force the operation without confirmation'),
])
->setDescription('Remove a message from the failure transport.')
->setHelp(<<<'EOF'
The <info>%command.name%</info> removes a message that is pending in the failure transport.
<info>php %command.full_name% {id}</info>
The specific id can be found via the messenger:failed:show command.
EOF
)
;
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$receiver = $this->getReceiver();
$shouldForce = $input->getOption('force');
$this->removeSingleMessage($input->getArgument('id'), $receiver, $io, $shouldForce);
}
private function removeSingleMessage($id, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce)
{
if (!$receiver instanceof ListableReceiverInterface) {
throw new RuntimeException(sprintf('The "%s" receiver does not support removing specific messages.', $this->getReceiverName()));
}
$envelope = $receiver->find($id);
if (null === $envelope) {
throw new RuntimeException(sprintf('The message with id "%s" was not found.', $id));
}
$this->displaySingleMessage($envelope, $io);
if ($shouldForce || $io->confirm('Do you want to permanently remove this message?', false)) {
$receiver->reject($envelope);
$io->success('Message removed.');
} else {
$io->note('Message not removed.');
}
}
}

View File

@ -0,0 +1,221 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Exception\RuntimeException;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
use Symfony\Component\Messenger\Worker;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
{
protected static $defaultName = 'messenger:failed:retry';
private $eventDispatcher;
private $messageBus;
private $retryStrategy;
private $logger;
public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, RetryStrategyInterface $retryStrategy = null, LoggerInterface $logger = null)
{
$this->eventDispatcher = $eventDispatcher;
$this->messageBus = $messageBus;
$this->retryStrategy = $retryStrategy;
$this->logger = $logger;
parent::__construct($receiverName, $receiver);
}
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
->setDefinition([
new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'),
new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'),
])
->setDescription('Retries one or more messages from the failure transport.')
->setHelp(<<<'EOF'
The <info>%command.name%</info> retries message in the failure transport.
<info>php %command.full_name%</info>
The command will interactively ask if each message should be retried
or discarded.
Some transports support retrying a specific message id, which comes
from the <info>messenger:failed:show</info> command.
<info>php %command.full_name% {id}</info>
Or pass multiple ids at once to process multiple messages:
<info>php %command.full_name% {id1} {id2} {id3}</info>
EOF
)
;
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->comment('Quit this command with CONTROL-C.');
if (!$output->isVeryVerbose()) {
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}
$receiver = $this->getReceiver();
$this->printPendingMessagesMessage($receiver, $io);
$io->writeln(sprintf('To retry all the messages, run <comment>messenger:consume %s</comment>', $this->getReceiverName()));
$shouldForce = $input->getOption('force');
$ids = $input->getArgument('id');
if (0 === \count($ids)) {
if (!$input->isInteractive()) {
throw new RuntimeException('Message id must be passed when in non-interactive mode.');
}
$this->runInteractive($io, $shouldForce);
return;
}
$this->retrySpecificIds($ids, $io, $shouldForce);
$io->success('All done!');
}
private function runInteractive(SymfonyStyle $io, bool $shouldForce)
{
$receiver = $this->getReceiver();
$count = 0;
if ($receiver instanceof ListableReceiverInterface) {
// for listable receivers, find the messages one-by-one
// this avoids using get(), which for some less-robust
// transports (like Doctrine), will cause the message
// to be temporarily "acked", even if the user aborts
// handling the message
while (true) {
$ids = [];
foreach ($receiver->all(1) as $envelope) {
++$count;
$id = $this->getMessageId($envelope);
if (null === $id) {
throw new LogicException(sprintf('The "%s" receiver is able to list messages by id but the envelope is missing the TransportMessageIdStamp stamp.', $this->getReceiverName()));
}
$ids[] = $id;
}
// break the loop if all messages are consumed
if (0 === \count($ids)) {
break;
}
$this->retrySpecificIds($ids, $io, $shouldForce);
}
} else {
// get() and ask messages one-by-one
$count = $this->runWorker($this->getReceiver(), $io, $shouldForce);
}
// avoid success message if nothing was processed
if (1 < $count) {
$io->success('All failed messages have been handled or removed!');
}
}
private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int
{
$listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce) {
$envelope = $messageReceivedEvent->getEnvelope();
$this->displaySingleMessage($envelope, $io);
$shouldHandle = $shouldForce || $io->confirm('Do you want to retry (yes) or delete this message (no)?');
if ($shouldHandle) {
return;
}
$messageReceivedEvent->shouldHandle(false);
$receiver->reject($envelope);
};
$this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
$worker = new Worker(
[$this->getReceiverName() => $receiver],
$this->messageBus,
[$this->getReceiverName() => $this->retryStrategy],
$this->eventDispatcher,
$this->logger
);
$count = 0;
try {
$worker->run([], function (?Envelope $envelope) use ($worker, $io, &$count) {
++$count;
if (null === $envelope) {
$worker->stop();
}
});
} finally {
$this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener);
}
return $count;
}
private function retrySpecificIds(array $ids, SymfonyStyle $io, bool $shouldForce)
{
$receiver = $this->getReceiver();
if (!$receiver instanceof ListableReceiverInterface) {
throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.', $this->getReceiverName()));
}
foreach ($ids as $id) {
$envelope = $receiver->find($id);
if (null === $envelope) {
throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
}
$singleReceiver = new SingleMessageReceiver($receiver, $envelope);
$this->runWorker($singleReceiver, $io, $shouldForce);
}
}
}

View File

@ -0,0 +1,129 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Symfony\Component\Console\Exception\RuntimeException;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class FailedMessagesShowCommand extends AbstractFailedMessagesCommand
{
protected static $defaultName = 'messenger:failed:show';
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
->setDefinition([
new InputArgument('id', InputArgument::OPTIONAL, 'Specific message id to show'),
new InputOption('max', null, InputOption::VALUE_REQUIRED, 'Maximum number of messages to list', 50),
])
->setDescription('Shows one or more messages from the failure transport.')
->setHelp(<<<'EOF'
The <info>%command.name%</info> shows message that are pending in the failure transport.
<info>php %command.full_name%</info>
Or look at a specific message by its id:
<info>php %command.full_name% {id}</info>
EOF
)
;
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$receiver = $this->getReceiver();
$this->printPendingMessagesMessage($receiver, $io);
if (!$receiver instanceof ListableReceiverInterface) {
throw new RuntimeException(sprintf('The "%s" receiver does not support listing or showing specific messages.', $this->getReceiverName()));
}
if (null === $id = $input->getArgument('id')) {
$this->listMessages($io, $input->getOption('max'));
} else {
$this->showMessage($id, $io);
}
}
private function listMessages(SymfonyStyle $io, int $max)
{
/** @var ListableReceiverInterface $receiver */
$receiver = $this->getReceiver();
$envelopes = $receiver->all($max);
$rows = [];
foreach ($envelopes as $envelope) {
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
$rows[] = [
$this->getMessageId($envelope),
\get_class($envelope->getMessage()),
null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s'),
null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getExceptionMessage(),
];
}
if (0 === \count($rows)) {
$io->success('No failed messages were found.');
return;
}
$io->table(['Id', 'Class', 'Failed at', 'Error'], $rows);
if (\count($rows) === $max) {
$io->comment(sprintf('Showing first %d messages.', $max));
}
$io->comment('Run <comment>messenger:failed:show {id} -vv</comment> to see message details.');
}
private function showMessage($id, SymfonyStyle $io)
{
/** @var ListableReceiverInterface $receiver */
$receiver = $this->getReceiver();
$envelope = $receiver->find($id);
if (null === $envelope) {
throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
}
$this->displaySingleMessage($envelope, $io);
$io->writeln([
'',
sprintf(' Run <comment>messenger:failed:retry %s</comment> to retry this message.', $id),
sprintf(' Run <comment>messenger:failed:remove %s</comment> to delete it.', $id),
]);
}
}

View File

@ -247,12 +247,18 @@ class MessengerPass implements CompilerPassInterface
foreach ($receiverMapping as $name => $reference) {
$receiverNames[(string) $reference] = $name;
}
if ($container->hasDefinition('console.command.messenger_consume_messages')) {
$buses = [];
foreach ($busIds as $busId) {
$buses[$busId] = new Reference($busId);
}
$buses = [];
foreach ($busIds as $busId) {
$buses[$busId] = new Reference($busId);
}
if ($container->hasDefinition('messenger.routable_message_bus')) {
$container->getDefinition('messenger.routable_message_bus')
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses));
}
if ($container->hasDefinition('console.command.messenger_consume_messages')) {
$container->getDefinition('console.command.messenger_consume_messages')
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
->replaceArgument(3, array_values($receiverNames));
@ -264,6 +270,18 @@ class MessengerPass implements CompilerPassInterface
}
$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
$failedCommandIds = [
'console.command.messenger_failed_messages_retry',
'console.command.messenger_failed_messages_show',
'console.command.messenger_failed_messages_remove',
];
foreach ($failedCommandIds as $failedCommandId) {
if ($container->hasDefinition($failedCommandId)) {
$definition = $container->getDefinition($failedCommandId);
$definition->replaceArgument(1, $receiverMapping[$definition->getArgument(0)]);
}
}
}
private function registerBusToCollector(ContainerBuilder $container, string $busId)

View File

@ -20,4 +20,14 @@ namespace Symfony\Component\Messenger\Event;
*/
class WorkerMessageReceivedEvent extends AbstractWorkerMessageEvent
{
private $shouldHandle = true;
public function shouldHandle(bool $shouldHandle = null): bool
{
if (null !== $shouldHandle) {
$this->shouldHandle = $shouldHandle;
}
return $this->shouldHandle;
}
}

View File

@ -0,0 +1,89 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\EventListener;
use Psr\Log\LoggerInterface;
use Symfony\Component\Debug\Exception\FlattenException;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
/**
* Sends a rejected message to a "failure transport".
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface
{
private $messageBus;
private $failureSenderAlias;
private $logger;
public function __construct(MessageBusInterface $messageBus, string $failureSenderAlias, LoggerInterface $logger = null)
{
$this->messageBus = $messageBus;
$this->failureSenderAlias = $failureSenderAlias;
$this->logger = $logger;
}
public function onMessageFailed(WorkerMessageFailedEvent $event)
{
if ($event->willRetry()) {
return;
}
$envelope = $event->getEnvelope();
// avoid re-sending to the failed sender
foreach ($envelope->all(SentStamp::class) as $sentStamp) {
/** @var SentStamp $sentStamp */
if ($sentStamp->getSenderAlias() === $this->failureSenderAlias) {
return;
}
}
// remove the received stamp so it's redelivered
$throwable = $event->getThrowable();
if ($throwable instanceof HandlerFailedException) {
$throwable = $throwable->getNestedExceptions()[0];
}
$flattenedException = \class_exists(FlattenException::class) ? FlattenException::createFromThrowable($throwable) : null;
$envelope = $envelope->withoutAll(ReceivedStamp::class)
->withoutAll(TransportMessageIdStamp::class)
->with(new SentToFailureTransportStamp($throwable->getMessage(), $event->getReceiverName(), $flattenedException))
->with(new RedeliveryStamp(0, $this->failureSenderAlias));
if (null !== $this->logger) {
$this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
'class' => \get_class($envelope->getMessage()),
'transport' => $this->failureSenderAlias,
]);
}
$this->messageBus->dispatch($envelope);
}
public static function getSubscribedEvents()
{
return [
WorkerMessageFailedEvent::class => ['onMessageFailed', -100],
];
}
}

View File

@ -0,0 +1,21 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Exception;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class UnknownSenderException extends \InvalidArgumentException implements ExceptionInterface
{
}

View File

@ -32,7 +32,9 @@ class AddBusNameStampMiddleware implements MiddlewareInterface
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$envelope = $envelope->with(new BusNameStamp($this->busName));
if (null === $envelope->last(BusNameStamp::class)) {
$envelope = $envelope->with(new BusNameStamp($this->busName));
}
return $stack->next()->handle($envelope, $stack);
}

View File

@ -19,6 +19,7 @@ use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
@ -65,12 +66,7 @@ class SendMessageMiddleware implements MiddlewareInterface
// dispatch event unless this is a redelivery
$shouldDispatchEvent = null === $redeliveryStamp;
foreach ($this->sendersLocator->getSenders($envelope, $handle) as $alias => $sender) {
// on redelivery, only deliver to the given sender
if (null !== $redeliveryStamp && !$redeliveryStamp->shouldRedeliverToSender(\get_class($sender), $alias)) {
continue;
}
foreach ($this->getSenders($envelope, $handle, $redeliveryStamp) as $alias => $sender) {
if (null !== $this->eventDispatcher && $shouldDispatchEvent) {
$event = new SendMessageToTransportsEvent($envelope);
$this->eventDispatcher->dispatch($event);
@ -107,4 +103,18 @@ class SendMessageMiddleware implements MiddlewareInterface
// message should only be sent and not be handled by the next middleware
return $envelope;
}
/**
* * @return iterable|SenderInterface[]
*/
private function getSenders(Envelope $envelope, &$handle, ?RedeliveryStamp $redeliveryStamp): iterable
{
if (null !== $redeliveryStamp) {
return [
$redeliveryStamp->getSenderClassOrAlias() => $this->sendersLocator->getSenderByAlias($redeliveryStamp->getSenderClassOrAlias()),
];
}
return $this->sendersLocator->getSenders($envelope, $handle);
}
}

View File

@ -39,12 +39,12 @@ class MultiplierRetryStrategy implements RetryStrategyInterface
private $maxDelayMilliseconds;
/**
* @param int $maxRetries The maximum number of time to retry (0 means indefinitely)
* @param int $maxRetries The maximum number of time to retry (null means indefinitely)
* @param int $delayMilliseconds Amount of time to delay (or the initial value when multiplier is used)
* @param float $multiplier Multiplier to apply to the delay each time a retry occurs
* @param int $maxDelayMilliseconds Maximum delay to allow (0 means no maximum)
*/
public function __construct(int $maxRetries = 3, int $delayMilliseconds = 1000, float $multiplier = 1, int $maxDelayMilliseconds = 0)
public function __construct(?int $maxRetries = 3, int $delayMilliseconds = 1000, float $multiplier = 1, int $maxDelayMilliseconds = 0)
{
$this->maxRetries = $maxRetries;
@ -66,7 +66,7 @@ class MultiplierRetryStrategy implements RetryStrategyInterface
public function isRetryable(Envelope $message): bool
{
if (0 === $this->maxRetries) {
if (null === $this->maxRetries) {
return true;
}

View File

@ -44,17 +44,4 @@ class RedeliveryStamp implements StampInterface
{
return $this->senderClassOrAlias;
}
public function shouldRedeliverToSender(string $senderClass, ?string $senderAlias): bool
{
if (null !== $senderAlias && $senderAlias === $this->senderClassOrAlias) {
return true;
}
if ($senderClass === $this->senderClassOrAlias) {
return true;
}
return false;
}
}

View File

@ -0,0 +1,57 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Stamp;
use Symfony\Component\Debug\Exception\FlattenException;
/**
* Stamp applied when a message is sent to the failure transport.
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class SentToFailureTransportStamp implements StampInterface
{
private $exceptionMessage;
private $originalReceiverName;
private $flattenException;
private $sentAt;
public function __construct(string $exceptionMessage, string $originalReceiverName, FlattenException $flattenException = null)
{
$this->exceptionMessage = $exceptionMessage;
$this->originalReceiverName = $originalReceiverName;
$this->flattenException = $flattenException;
$this->sentAt = new \DateTime();
}
public function getExceptionMessage(): string
{
return $this->exceptionMessage;
}
public function getOriginalReceiverName(): string
{
return $this->originalReceiverName;
}
public function getFlattenException(): ?FlattenException
{
return $this->flattenException;
}
public function getSentAt(): \DateTime
{
return $this->sentAt;
}
}

View File

@ -0,0 +1,37 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Stamp;
/**
* Added by a sender or receiver to indicate the id of this message in that transport.
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class TransportMessageIdStamp implements StampInterface
{
private $id;
/**
* @param mixed $id some "identifier" of the message in a transport
*/
public function __construct($id)
{
$this->id = $id;
}
public function getId()
{
return $this->id;
}
}

View File

@ -0,0 +1,37 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Command;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
class FailedMessagesRemoveCommandTest extends TestCase
{
public function testBasicRun()
{
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('find')->with(20)->willReturn(new Envelope(new \stdClass()));
$command = new FailedMessagesRemoveCommand(
'failure_receiver',
$receiver
);
$tester = new CommandTester($command);
$tester->execute(['id' => 20, '--force' => true]);
$this->assertContains('Message removed.', $tester->getDisplay());
}
}

View File

@ -0,0 +1,49 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Command;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Worker;
class FailedMessagesRetryCommandTest extends TestCase
{
public function testBasicRun()
{
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->exactly(2))->method('find')->withConsecutive([10], [12])->willReturn(new Envelope(new \stdClass()));
// message will eventually be ack'ed in Worker
$receiver->expects($this->exactly(2))->method('ack');
$dispatcher = $this->createMock(EventDispatcherInterface::class);
$bus = $this->createMock(MessageBusInterface::class);
// the bus should be called in the worker
$bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass()));
$command = new FailedMessagesRetryCommand(
'failure_receiver',
$receiver,
$bus,
$dispatcher
);
$tester = new CommandTester($command);
$tester->execute(['id' => [10, 12]]);
$this->assertContains('[OK]', $tester->getDisplay());
}
}

View File

@ -0,0 +1,57 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Command;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\Messenger\Command\FailedMessagesShowCommand;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
/**
* @group time-sensitive
*/
class FailedMessagesShowCommandTest extends TestCase
{
public function testBasicRun()
{
$sentToFailureStamp = new SentToFailureTransportStamp('Things are bad!', 'async');
$envelope = new Envelope(new \stdClass(), [
new TransportMessageIdStamp(15),
$sentToFailureStamp,
]);
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope);
$command = new FailedMessagesShowCommand(
'failure_receiver',
$receiver
);
$tester = new CommandTester($command);
$tester->execute(['id' => 15]);
$this->assertContains(sprintf(<<<EOF
------------- ---------------------
Class stdClass
Message Id 15
Failed at %s
Error Things are bad!
Error Class (unknown)
EOF
,
$sentToFailureStamp->getSentAt()->format('Y-m-d H:i:s')),
$tester->getDisplay(true));
}
}

View File

@ -0,0 +1,121 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Handler;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
class SendFailedMessageToFailureTransportListenerTest extends TestCase
{
public function testItDispatchesToTheFailureTransport()
{
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) {
/* @var Envelope $envelope */
$this->assertInstanceOf(Envelope::class, $envelope);
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
$this->assertNotNull($sentToFailureTransportStamp);
$this->assertSame('no!', $sentToFailureTransportStamp->getExceptionMessage());
$this->assertSame('my_receiver', $sentToFailureTransportStamp->getOriginalReceiverName());
$this->assertSame('no!', $sentToFailureTransportStamp->getFlattenException()->getMessage());
/** @var RedeliveryStamp $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertSame('failure_sender', $redeliveryStamp->getSenderClassOrAlias());
$this->assertNull($envelope->last(ReceivedStamp::class));
$this->assertNull($envelope->last(TransportMessageIdStamp::class));
return true;
}))->willReturn(new Envelope(new \stdClass()));
$listener = new SendFailedMessageToFailureTransportListener(
$bus,
'failure_sender'
);
$exception = new \Exception('no!');
$envelope = new Envelope(new \stdClass());
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false);
$listener->onMessageFailed($event);
}
public function testItGetsNestedHandlerFailedException()
{
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) {
/** @var Envelope $envelope */
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
$this->assertNotNull($sentToFailureTransportStamp);
$this->assertSame('I am inside!', $sentToFailureTransportStamp->getExceptionMessage());
$this->assertSame('Exception', $sentToFailureTransportStamp->getFlattenException()->getClass());
return true;
}))->willReturn(new Envelope(new \stdClass()));
$listener = new SendFailedMessageToFailureTransportListener(
$bus,
'failure_sender'
);
$envelope = new Envelope(new \stdClass());
$exception = new \Exception('I am inside!');
$exception = new HandlerFailedException($envelope, [$exception]);
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false);
$listener->onMessageFailed($event);
}
public function testDoNothingOnRetry()
{
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->never())->method('dispatch');
$listener = new SendFailedMessageToFailureTransportListener(
$bus,
'failure_sender'
);
$envelope = new Envelope(new \stdClass());
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), true);
$listener->onMessageFailed($event);
}
public function testDoNotRedeliverToFailed()
{
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->never())->method('dispatch');
$listener = new SendFailedMessageToFailureTransportListener(
$bus,
'failure_sender'
);
$envelope = new Envelope(new \stdClass(), [
new SentStamp('MySender', 'failure_sender'),
]);
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), false);
$listener->onMessageFailed($event);
}
}

View File

@ -29,5 +29,9 @@ class AddBusNameStampMiddlewareTest extends MiddlewareTestCase
$busNameStamp = $finalEnvelope->last(BusNameStamp::class);
$this->assertNotNull($busNameStamp);
$this->assertSame('the_bus_name', $busNameStamp->getBusName());
// the stamp should not be added over and over again
$finalEnvelope = $middleware->handle($finalEnvelope, $this->getStackMock());
$this->assertCount(1, $finalEnvelope->all(BusNameStamp::class));
}
}

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Tests\Middleware;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
@ -34,15 +35,16 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$envelope = new Envelope($message);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['my_sender']], ['my_sender' => $sender]);
$middleware = new SendMessageMiddleware($sendersLocator);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->will($this->returnArgument(0));
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'my_sender')))->will($this->returnArgument(0));
$envelope = $middleware->handle($envelope, $this->getStackMock(false));
/* @var SentStamp $stamp */
$this->assertInstanceOf(SentStamp::class, $stamp = $envelope->last(SentStamp::class), 'it adds a sent stamp');
$this->assertNull($stamp->getSenderAlias());
$this->assertSame('my_sender', $stamp->getSenderAlias());
$this->assertStringMatchesFormat('Mock_SenderInterface_%s', $stamp->getSenderClass());
}
@ -52,9 +54,8 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
$middleware = new SendMessageMiddleware(new SendersLocator([
DummyMessage::class => ['foo' => $sender, 'bar' => $sender2],
]));
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo', 'bar']], ['foo' => $sender, 'bar' => $sender2]);
$middleware = new SendMessageMiddleware($sendersLocator);
$sender->expects($this->once())
->method('send')
@ -92,12 +93,16 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
$middleware = new SendMessageMiddleware(new SendersLocator([
DummyMessage::class => ['foo' => $sender, 'bar' => $sender2],
], [
// normally, this class sends and handles (but not on retry)
DummyMessage::class => true,
]));
$sendersLocator = $this->createSendersLocator(
[DummyMessage::class => ['foo', 'bar']],
['foo' => $sender, 'bar' => $sender2],
[
// normally, this class sends and handles (but not on retry)
DummyMessage::class => true,
]
);
$middleware = new SendMessageMiddleware($sendersLocator);
$sender->expects($this->never())
->method('send')
@ -116,9 +121,10 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$envelope = new Envelope(new ChildDummyMessage('Hey'));
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo_sender']], ['foo_sender' => $sender]);
$middleware = new SendMessageMiddleware($sendersLocator);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope);
$middleware->handle($envelope, $this->getStackMock(false));
}
@ -129,11 +135,12 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$envelope = new Envelope($message);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [
$sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [
DummyMessage::class => true,
]));
]);
$middleware = new SendMessageMiddleware($sendersLocator);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope);
$middleware->handle($envelope, $this->getStackMock());
}
@ -144,11 +151,12 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$envelope = new Envelope($message);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [
$sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [
DummyMessage::class => true,
]));
]);
$middleware = new SendMessageMiddleware($sendersLocator);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope);
$middleware->handle($envelope, $this->getStackMock());
}
@ -159,11 +167,12 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$envelope = new Envelope($message);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [
$sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [
DummyMessageInterface::class => true,
]));
]);
$middleware = new SendMessageMiddleware($sendersLocator);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope);
$middleware->handle($envelope, $this->getStackMock());
}
@ -174,11 +183,12 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$envelope = new Envelope($message);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [
$sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [
'*' => true,
]));
]);
$middleware = new SendMessageMiddleware($sendersLocator);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope);
$middleware->handle($envelope, $this->getStackMock());
}
@ -188,7 +198,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$message = new DummyMessage('Hey');
$envelope = new Envelope($message);
$middleware = new SendMessageMiddleware(new SendersLocator([]));
$middleware = new SendMessageMiddleware($this->createSendersLocator([], []));
$middleware->handle($envelope, $this->getStackMock());
}
@ -199,7 +209,8 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]]));
$sendersLocator = $this->createSendersLocator(['*' => ['foo']], ['foo' => $sender]);
$middleware = new SendMessageMiddleware($sendersLocator);
$sender->expects($this->never())->method('send');
@ -220,7 +231,8 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$sender1 = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender1, $sender2]]), $dispatcher);
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo', 'bar']], ['foo' => $sender1, 'bar' => $sender2]);
$middleware = new SendMessageMiddleware($sendersLocator, $dispatcher);
$sender1->expects($this->once())->method('send')->willReturn($envelope);
$sender2->expects($this->once())->method('send')->willReturn($envelope);
@ -235,7 +247,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$dispatcher = $this->createMock(EventDispatcherInterface::class);
$dispatcher->expects($this->never())->method('dispatch');
$middleware = new SendMessageMiddleware(new SendersLocator([]), $dispatcher);
$middleware = new SendMessageMiddleware($this->createSendersLocator([], []), $dispatcher);
$middleware->handle($envelope, $this->getStackMock());
}
@ -249,8 +261,11 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$dispatcher->expects($this->never())->method('dispatch');
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2->expects($this->once())->method('send')->willReturn(new Envelope(new \stdClass()));
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]), $dispatcher);
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo']], ['foo' => $sender, 'foo_sender' => $sender2]);
$middleware = new SendMessageMiddleware($sendersLocator, $dispatcher);
$middleware->handle($envelope, $this->getStackMock(false));
}
@ -263,9 +278,27 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender->expects($this->once())->method('send')->willReturn($envelope);
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo']], ['foo' => $sender]);
$middleware = new SendMessageMiddleware($sendersLocator);
// next handler *should* be called
$middleware->handle($envelope, $this->getStackMock(true));
}
private function createSendersLocator(array $sendersMap, array $senders, array $sendAndHandle = [])
{
$container = $this->createMock(ContainerInterface::class);
$container->expects($this->any())
->method('has')
->willReturnCallback(function ($id) use ($senders) {
return isset($senders[$id]);
});
$container->expects($this->any())
->method('get')
->willReturnCallback(function ($id) use ($senders) {
return $senders[$id];
});
return new SendersLocator($sendersMap, $container, $sendAndHandle);
}
}

View File

@ -26,6 +26,16 @@ class MultiplierRetryStrategyTest extends TestCase
$this->assertTrue($strategy->isRetryable($envelope));
}
public function testIsRetryableWithNullMax()
{
$strategy = new MultiplierRetryStrategy(null);
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]);
$this->assertTrue($strategy->isRetryable($envelope));
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(1, 'sender_alias')]);
$this->assertTrue($strategy->isRetryable($envelope));
}
public function testIsNotRetryable()
{
$strategy = new MultiplierRetryStrategy(3);
@ -34,6 +44,13 @@ class MultiplierRetryStrategyTest extends TestCase
$this->assertFalse($strategy->isRetryable($envelope));
}
public function testIsNotRetryableWithZeroMax()
{
$strategy = new MultiplierRetryStrategy(0);
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]);
$this->assertFalse($strategy->isRetryable($envelope));
}
public function testIsRetryableWithNoStamp()
{
$strategy = new MultiplierRetryStrategy(3);

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
@ -19,10 +20,10 @@ use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageHandlerFailingFirstTimes;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
use Symfony\Component\Messenger\Worker;
@ -30,20 +31,15 @@ class RetryIntegrationTest extends TestCase
{
public function testRetryMechanism()
{
$apiMessage = new DummyMessage('API');
$senderAndReceiver = new DummySenderAndReceiver();
$receiver = $this->createMock(ReceiverInterface::class);
$receiver->method('get')
->willReturn([
new Envelope($apiMessage, [
new SentStamp('Some\Sender', 'sender_alias'),
]),
]);
$senderLocator = $this->createMock(ContainerInterface::class);
$senderLocator->method('has')->with('sender_alias')->willReturn(true);
$senderLocator->method('get')->with('sender_alias')->willReturn($senderAndReceiver);
$senderLocator = new SendersLocator([DummyMessage::class => ['sender_alias']], $senderLocator);
$senderLocator = new SendersLocator([], ['*' => true]);
$handler = new DummyMessageHandlerFailingFirstTimes();
$throwingHandler = new DummyMessageHandlerFailingFirstTimes(1);
$handler = new DummyMessageHandlerFailingFirstTimes(0, 'A');
$throwingHandler = new DummyMessageHandlerFailingFirstTimes(1, 'B');
$handlerLocator = new HandlersLocator([
DummyMessage::class => [
new HandlerDescriptor($handler, ['alias' => 'first']),
@ -51,14 +47,54 @@ class RetryIntegrationTest extends TestCase
],
]);
// dispatch the message, which will get "sent" and then received by DummySenderAndReceiver
$bus = new MessageBus([new SendMessageMiddleware($senderLocator), new HandleMessageMiddleware($handlerLocator)]);
$envelope = new Envelope(new DummyMessage('API'));
$bus->dispatch($envelope);
$worker = new Worker(['receiverName' => $receiver], $bus, ['receiverName' => new MultiplierRetryStrategy()]);
$worker->run([], function () use ($worker) {
$worker->stop();
$worker = new Worker(['receiverName' => $senderAndReceiver], $bus, ['receiverName' => new MultiplierRetryStrategy()]);
$worker->run([], function (?Envelope $envelope) use ($worker) {
if (null === $envelope) {
$worker->stop();
}
});
$this->assertSame(1, $handler->getTimesCalledWithoutThrowing());
$this->assertSame(1, $throwingHandler->getTimesCalledWithoutThrowing());
}
}
class DummySenderAndReceiver implements ReceiverInterface, SenderInterface
{
private $messagesWaiting = [];
private $messagesReceived = [];
public function get(): iterable
{
$message = array_shift($this->messagesWaiting);
if (null === $message) {
return [];
}
$this->messagesReceived[] = $message;
return [$message];
}
public function ack(Envelope $envelope): void
{
}
public function reject(Envelope $envelope): void
{
}
public function send(Envelope $envelope): Envelope
{
$this->messagesWaiting[] = $envelope;
return $envelope;
}
}

View File

@ -16,22 +16,6 @@ use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
class RedeliveryStampTest extends TestCase
{
public function testShouldRedeliverToSenderWithAlias()
{
$stamp = new RedeliveryStamp(5, 'foo_alias');
$this->assertFalse($stamp->shouldRedeliverToSender('Foo\Bar\Sender', 'bar_alias'));
$this->assertTrue($stamp->shouldRedeliverToSender('Foo\Bar\Sender', 'foo_alias'));
}
public function testShouldRedeliverToSenderWithoutAlias()
{
$stampToRedeliverToSender1 = new RedeliveryStamp(5, 'App\Sender1');
$this->assertTrue($stampToRedeliverToSender1->shouldRedeliverToSender('App\Sender1', null));
$this->assertFalse($stampToRedeliverToSender1->shouldRedeliverToSender('App\Sender2', null));
}
public function testGetters()
{
$stamp = new RedeliveryStamp(10, 'sender_alias');

View File

@ -0,0 +1,33 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Stamp;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Debug\Exception\FlattenException;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
class SentToFailureTransportStampTest extends TestCase
{
public function testGetters()
{
$flattenException = new FlattenException();
$stamp = new SentToFailureTransportStamp(
'exception message',
'original_receiver',
$flattenException
);
$this->assertSame('exception message', $stamp->getExceptionMessage());
$this->assertSame('original_receiver', $stamp->getOriginalReceiverName());
$this->assertSame($flattenException, $stamp->getFlattenException());
$this->assertInstanceOf(\DateTime::class, $stamp->getSentAt());
}
}

View File

@ -12,9 +12,12 @@
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceivedStamp;
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
@ -28,14 +31,26 @@ class DoctrineReceiverTest extends TestCase
{
$serializer = $this->createSerializer();
$doctrineEnvelop = $this->createDoctrineEnvelope();
$doctrineEnvelope = $this->createDoctrineEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($doctrineEnvelop);
$connection->method('get')->willReturn($doctrineEnvelope);
$receiver = new DoctrineReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get());
$this->assertCount(1, $actualEnvelopes);
/** @var Envelope $actualEnvelope */
$actualEnvelope = $actualEnvelopes[0];
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
/** @var DoctrineReceivedStamp $doctrineReceivedStamp */
$doctrineReceivedStamp = $actualEnvelope->last(DoctrineReceivedStamp::class);
$this->assertNotNull($doctrineReceivedStamp);
$this->assertSame('1', $doctrineReceivedStamp->getId());
/** @var TransportMessageIdStamp $transportMessageIdStamp */
$transportMessageIdStamp = $actualEnvelope->last(TransportMessageIdStamp::class);
$this->assertNotNull($transportMessageIdStamp);
$this->assertSame(1, $transportMessageIdStamp->getId());
}
/**
@ -55,6 +70,34 @@ class DoctrineReceiverTest extends TestCase
iterator_to_array($receiver->get());
}
public function testAll()
{
$serializer = $this->createSerializer();
$doctrineEnvelope1 = $this->createDoctrineEnvelope();
$doctrineEnvelope2 = $this->createDoctrineEnvelope();
$connection = $this->createMock(Connection::class);
$connection->method('findAll')->with(50)->willReturn([$doctrineEnvelope1, $doctrineEnvelope2]);
$receiver = new DoctrineReceiver($connection, $serializer);
$actualEnvelopes = \iterator_to_array($receiver->all(50));
$this->assertCount(2, $actualEnvelopes);
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}
public function testFind()
{
$serializer = $this->createSerializer();
$doctrineEnvelope = $this->createDoctrineEnvelope();
$connection = $this->createMock(Connection::class);
$connection->method('find')->with(10)->willReturn($doctrineEnvelope);
$receiver = new DoctrineReceiver($connection, $serializer);
$actualEnvelope = $receiver->find(10);
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());
}
private function createDoctrineEnvelope()
{
return [

View File

@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineSender;
@ -29,13 +30,18 @@ class DoctrineSenderTest extends TestCase
$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->getMock();
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers']);
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'])->willReturn(15);
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$sender = new DoctrineSender($connection, $serializer);
$sender->send($envelope);
$actualEnvelope = $sender->send($envelope);
/** @var TransportMessageIdStamp $transportMessageIdStamp */
$transportMessageIdStamp = $actualEnvelope->last(TransportMessageIdStamp::class);
$this->assertNotNull($transportMessageIdStamp);
$this->assertSame('15', $transportMessageIdStamp->getId());
}
public function testSendWithDelay()

View File

@ -0,0 +1,50 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\Sender;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
class SingleMessageReceiverTest extends TestCase
{
public function testItReceivesOnlyOneMessage()
{
$innerReceiver = $this->createMock(ReceiverInterface::class);
$envelope = new Envelope(new \stdClass());
$receiver = new SingleMessageReceiver($innerReceiver, $envelope);
$received = \iterator_to_array($receiver->get());
$this->assertCount(1, $received);
$this->assertSame($received[0], $envelope);
$this->assertEmpty(\iterator_to_array($receiver->get()));
}
public function testCallsAreForwarded()
{
$envelope = new Envelope(new \stdClass());
$innerReceiver = $this->createMock(ReceiverInterface::class);
$innerReceiver->expects($this->once())->method('ack')->with($envelope);
$innerReceiver->expects($this->once())->method('reject')->with($envelope);
$receiver = new SingleMessageReceiver($innerReceiver, $envelope);
$receiver->ack($envelope);
$receiver->reject($envelope);
}
}

View File

@ -12,7 +12,9 @@
namespace Symfony\Component\Messenger\Tests\Transport\Sender;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\UnknownSenderException;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
@ -21,23 +23,87 @@ use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
class SendersLocatorTest extends TestCase
{
public function testItReturnsTheSenderBasedOnTheMessageClass()
{
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sendersLocator = $this->createContainer([
'my_sender' => $sender,
]);
$locator = new SendersLocator([
DummyMessage::class => ['my_sender'],
], $sendersLocator);
$this->assertSame(['my_sender' => $sender], iterator_to_array($locator->getSenders(new Envelope(new DummyMessage('a')))));
$this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage()))));
}
public function testGetSenderByAlias()
{
$sender1 = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
$sendersLocator = $this->createContainer([
'sender1' => $sender1,
'sender2' => $sender2,
]);
$locator = new SendersLocator([], $sendersLocator);
$this->assertSame($sender1, $locator->getSenderByAlias('sender1'));
$this->assertSame($sender2, $locator->getSenderByAlias('sender2'));
}
public function testGetSenderByAliasThrowsException()
{
$this->expectException(UnknownSenderException::class);
$this->expectExceptionMessage('Unknown sender alias');
$sender1 = $this->getMockBuilder(SenderInterface::class)->getMock();
$sendersLocator = $this->createContainer([
'sender1' => $sender1,
]);
$locator = new SendersLocator([], $sendersLocator);
$locator->getSenderByAlias('sender2');
}
/**
* @group legacy
*/
public function testItReturnsTheSenderBasedOnTheMessageClassLegacy()
{
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$locator = new SendersLocator([
DummyMessage::class => [$sender],
]);
$this->assertSame([$sender], iterator_to_array($locator->getSenders(new Envelope(new DummyMessage('a')))));
$this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage()))));
}
public function testItYieldsProvidedSenderAliasAsKey()
/**
* @group legacy
*/
public function testItYieldsProvidedSenderAliasAsKeyLegacy()
{
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$locator = new SendersLocator([
DummyMessage::class => ['dummy' => $sender],
]);
$this->assertSame(['dummy' => $sender], iterator_to_array($locator->getSenders(new Envelope(new DummyMessage('a')))));
}
private function createContainer(array $senders)
{
$container = $this->createMock(ContainerInterface::class);
$container->expects($this->any())
->method('has')
->willReturnCallback(function($id) use ($senders) {
return isset($senders[$id]);
});
$container->expects($this->any())
->method('get')
->willReturnCallback(function($id) use ($senders) {
return $senders[$id];
});
return $container;
}
}

View File

@ -97,7 +97,7 @@ class WorkerTest extends TestCase
$this->assertNotNull($redeliveryStamp);
// retry count now at 1
$this->assertSame(1, $redeliveryStamp->getRetryCount());
$this->assertTrue($redeliveryStamp->shouldRedeliverToSender('Some\Sender', 'sender_alias'));
$this->assertSame('sender_alias', $redeliveryStamp->getSenderClassOrAlias());
// received stamp is removed
$this->assertNull($envelope->last(ReceivedStamp::class));

View File

@ -110,7 +110,11 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
*/
public function getMessageCount(): int
{
return $this->connection->countMessagesInQueues();
try {
return $this->connection->countMessagesInQueues();
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, string $queueName): void

View File

@ -102,9 +102,10 @@ class Connection
/**
* @param int $delay The delay in milliseconds
*
* @return string The inserted id
* @throws \Doctrine\DBAL\DBALException
*/
public function send(string $body, array $headers, int $delay = 0): void
public function send(string $body, array $headers, int $delay = 0): string
{
$now = new \DateTime();
$availableAt = (clone $now)->modify(sprintf('+%d seconds', $delay / 1000));
@ -126,6 +127,8 @@ class Connection
':created_at' => self::formatDateTime($now),
':available_at' => self::formatDateTime($availableAt),
]);
return $this->driverConnection->lastInsertId();
}
public function get(): ?array
@ -205,14 +208,42 @@ class Connection
return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchColumn();
}
public function findAll(int $limit = null): array
{
if ($this->configuration['auto_setup']) {
$this->setup();
}
$queryBuilder = $this->createAvailableMessagesQueryBuilder();
if (null !== $limit) {
$queryBuilder->setMaxResults($limit);
}
return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchAll();
}
public function find($id): ?array
{
if ($this->configuration['auto_setup']) {
$this->setup();
}
$queryBuilder = $this->createQueryBuilder()
->where('m.id = :id');
$data = $this->executeQuery($queryBuilder->getSQL(), [
'id' => $id,
])->fetch();
return false === $data ? null : $data;
}
private function createAvailableMessagesQueryBuilder(): QueryBuilder
{
$now = new \DateTime();
$redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));
return $this->driverConnection->createQueryBuilder()
->select('m.*')
->from($this->configuration['table_name'], 'm')
return $this->createQueryBuilder()
->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit')
->andWhere('m.available_at <= :now')
->andWhere('m.queue_name = :queue_name')
@ -223,6 +254,13 @@ class Connection
]);
}
private function createQueryBuilder(): QueryBuilder
{
return $this->driverConnection->createQueryBuilder()
->select('m.*')
->from($this->configuration['table_name'], 'm');
}
private function executeQuery(string $sql, array $parameters = [])
{
$stmt = null;

View File

@ -16,6 +16,8 @@ use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@ -26,7 +28,7 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
*
* @experimental in 4.3
*/
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
{
private $connection;
private $serializer;
@ -52,18 +54,7 @@ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
return [];
}
try {
$envelope = $this->serializer->decode([
'body' => $doctrineEnvelope['body'],
'headers' => $doctrineEnvelope['headers'],
]);
} catch (MessageDecodingFailedException $exception) {
$this->connection->reject($doctrineEnvelope['id']);
throw $exception;
}
yield $envelope->with(new DoctrineReceivedStamp($doctrineEnvelope['id']));
yield $this->createEnvelopeFromData($doctrineEnvelope);
}
/**
@ -71,7 +62,11 @@ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
*/
public function ack(Envelope $envelope): void
{
$this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId());
try {
$this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId());
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
/**
@ -79,7 +74,11 @@ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
*/
public function reject(Envelope $envelope): void
{
$this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
try {
$this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
/**
@ -87,7 +86,45 @@ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
*/
public function getMessageCount(): int
{
return $this->connection->getMessageCount();
try {
return $this->connection->getMessageCount();
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
/**
* {@inheritdoc}
*/
public function all(int $limit = null): iterable
{
try {
$doctrineEnvelopes = $this->connection->findAll($limit);
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
foreach ($doctrineEnvelopes as $doctrineEnvelope) {
yield $this->createEnvelopeFromData($doctrineEnvelope);
}
}
/**
* {@inheritdoc}
*/
public function find($id): ?Envelope
{
try {
$doctrineEnvelope = $this->connection->find($id);
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
if (null === $doctrineEnvelope) {
return null;
}
return $this->createEnvelopeFromData($doctrineEnvelope);
}
private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceivedStamp
@ -101,4 +138,23 @@ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
return $doctrineReceivedStamp;
}
private function createEnvelopeFromData(array $data): Envelope
{
try {
$envelope = $this->serializer->decode([
'body' => $data['body'],
'headers' => $data['headers'],
]);
} catch (MessageDecodingFailedException $exception) {
$this->connection->reject($data['id']);
throw $exception;
}
return $envelope->with(
new DoctrineReceivedStamp($data['id']),
new TransportMessageIdStamp($data['id'])
);
}
}

View File

@ -15,6 +15,7 @@ use Doctrine\DBAL\DBALException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@ -47,11 +48,11 @@ class DoctrineSender implements SenderInterface
$delay = null !== $delayStamp ? $delayStamp->getDelay() : 0;
try {
$this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
$id = $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
return $envelope;
return $envelope->with(new TransportMessageIdStamp($id));
}
}

View File

@ -12,6 +12,8 @@
namespace Symfony\Component\Messenger\Transport\Doctrine;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
@ -21,7 +23,7 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
*
* @experimental in 4.3
*/
class DoctrineTransport implements TransportInterface, SetupableTransportInterface
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface
{
private $connection;
private $serializer;
@ -58,6 +60,31 @@ class DoctrineTransport implements TransportInterface, SetupableTransportInterfa
($this->receiver ?? $this->getReceiver())->reject($envelope);
}
/**
* {@inheritdoc}
*/
public function getMessageCount(): int
{
return ($this->receiver ?? $this->getReceiver())->getMessageCount();
}
/**
* {@inheritdoc}
*/
public function all(int $limit = null): iterable
{
return ($this->receiver ?? $this->getReceiver())->all($limit);
}
/**
* {@inheritdoc}
*/
public function find($id): ?Envelope
{
return ($this->receiver ?? $this->getReceiver())->find($id);
}
/**
* {@inheritdoc}
*/

View File

@ -0,0 +1,33 @@
<?php
namespace Symfony\Component\Messenger\Transport\Receiver;
use Symfony\Component\Messenger\Envelope;
/**
* Used when a receiver has the ability to list messages and find specific messages.
* A receiver that implements this should add the TransportMessageIdStamp
* to the Envelopes that it returns.
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
interface ListableReceiverInterface extends ReceiverInterface
{
/**
* Returns all the messages (up to the limit) in this receiver.
*
* Messages should be given the same stamps as when using ReceiverInterface::get().
*
* @return Envelope[]|iterable
*/
public function all(int $limit = null): iterable;
/**
* Returns the Envelope by id or none.
*
* Message should be given the same stamps as when using ReceiverInterface::get().
*/
public function find($id): ?Envelope;
}

View File

@ -35,6 +35,8 @@ interface ReceiverInterface
* call to get() takes a long time, blocking other receivers from
* being called.
*
* If applicable, the Envelope should contain a TransportMessageIdStamp.
*
* If a received message cannot be decoded, the message should not
* be retried again (e.g. if there's a queue, it should be removed)
* and a MessageDecodingFailedException should be thrown.

View File

@ -0,0 +1,47 @@
<?php
namespace Symfony\Component\Messenger\Transport\Receiver;
use Symfony\Component\Messenger\Envelope;
/**
* Receiver that decorates another, but receives only 1 specific message.
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @internal
* @experimental in 4.3
*/
class SingleMessageReceiver implements ReceiverInterface
{
private $receiver;
private $envelope;
private $hasReceived = false;
public function __construct(ReceiverInterface $receiver, Envelope $envelope)
{
$this->receiver = $receiver;
$this->envelope = $envelope;
}
public function get(): iterable
{
if ($this->hasReceived) {
return [];
}
$this->hasReceived = true;
yield $this->envelope;
}
public function ack(Envelope $envelope): void
{
$this->receiver->ack($envelope);
}
public function reject(Envelope $envelope): void
{
$this->receiver->reject($envelope);
}
}

View File

@ -25,6 +25,8 @@ interface SenderInterface
*
* The sender can read different stamps for transport configuration,
* like delivery delay.
*
* If applicable, the returned Envelope should contain a TransportMessageIdStamp.
*/
public function send(Envelope $envelope): Envelope;
}

View File

@ -11,7 +11,11 @@
namespace Symfony\Component\Messenger\Transport\Sender;
use Psr\Container\ContainerInterface;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\Exception\UnknownSenderException;
use Symfony\Component\Messenger\Handler\HandlersLocator;
/**
@ -23,17 +27,30 @@ use Symfony\Component\Messenger\Handler\HandlersLocator;
*/
class SendersLocator implements SendersLocatorInterface
{
private $senders;
private $sendersMap;
private $sendersLocator;
private $useLegacyLookup = false;
private $sendAndHandle;
/**
* @param SenderInterface[][] $senders
* @param bool[] $sendAndHandle
* @param string[][] $sendersMap An array, keyed by "type", set to an array of sender aliases
* @param ContainerInterface $sendersLocator Locator of senders, keyed by sender alias
* @param bool[] $sendAndHandle
*/
public function __construct(array $senders, array $sendAndHandle = [])
public function __construct(array $sendersMap, /*ContainerInterface*/ $sendersLocator = null, array $sendAndHandle = [])
{
$this->senders = $senders;
$this->sendAndHandle = $sendAndHandle;
$this->sendersMap = $sendersMap;
if (is_array($sendersLocator) || null === $sendersLocator) {
@trigger_error(sprintf('"%s::__construct()" requires a "%s" as 2nd argument. Not doing so is deprecated since Symfony 4.3 and will be required in 5.0.', __CLASS__, ContainerInterface::class), E_USER_DEPRECATED);
// "%s" requires a "%s" as 2nd argument. Not doing so is deprecated since Symfony 4.3 and will be required in 5.0.'
$this->sendersLocator = new ServiceLocator([]);
$this->sendAndHandle = $sendersLocator;
$this->useLegacyLookup = true;
} else {
$this->sendersLocator = $sendersLocator;
$this->sendAndHandle = $sendAndHandle;
}
}
/**
@ -46,14 +63,43 @@ class SendersLocator implements SendersLocatorInterface
$seen = [];
foreach (HandlersLocator::listTypes($envelope) as $type) {
foreach ($this->senders[$type] ?? [] as $alias => $sender) {
if (!\in_array($sender, $seen, true)) {
yield $alias => $seen[] = $sender;
// the old way of looking up senders
if ($this->useLegacyLookup) {
foreach ($this->sendersMap[$type] ?? [] as $alias => $sender) {
if (!\in_array($sender, $seen, true)) {
yield $alias => $seen[] = $sender;
}
}
$handle = $handle ?: $this->sendAndHandle[$type] ?? false;
continue;
}
foreach ($this->sendersMap[$type] ?? [] as $senderAlias) {
if (!\in_array($senderAlias, $seen, true)) {
if (!$this->sendersLocator->has($senderAlias)) {
throw new RuntimeException(sprintf('Invalid senders configuration: sender "%s" is not in the senders locator.', $senderAlias));
}
$seen[] = $senderAlias;
$sender = $this->sendersLocator->get($senderAlias);
yield $senderAlias => $sender;
}
}
$handle = $handle ?: $this->sendAndHandle[$type] ?? false;
}
$handle = $handle || null === $sender;
}
public function getSenderByAlias(string $alias): SenderInterface
{
if ($this->sendersLocator->has($alias)) {
return $this->sendersLocator->get($alias);
}
throw new UnknownSenderException(sprintf('Unknown sender alias "%s"', $alias));
}
}

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\Sender;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\UnknownSenderException;
/**
* Maps a message to a list of senders.
@ -32,4 +33,13 @@ interface SendersLocatorInterface
* @return iterable|SenderInterface[] Indexed by sender alias if available
*/
public function getSenders(Envelope $envelope, ?bool &$handle = false): iterable;
/**
* Returns a specific sender by its alias.
*
* @param string $alias The alias given to the sender in getSenders()
*
* @throws UnknownSenderException If the sender is not found
*/
public function getSenderByAlias(string $alias): SenderInterface;
}

View File

@ -47,7 +47,7 @@ class Worker implements WorkerInterface
* @param ReceiverInterface[] $receivers Where the key is the transport name
* @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match)
*/
public function __construct(array $receivers, MessageBusInterface $bus, $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
public function __construct(array $receivers, MessageBusInterface $bus, array $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
{
$this->receivers = $receivers;
$this->bus = $bus;
@ -114,9 +114,14 @@ class Worker implements WorkerInterface
$this->dispatchEvent(new WorkerStoppedEvent());
}
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName, ?RetryStrategyInterface $retryStrategy)
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName, ?RetryStrategyInterface $retryStrategy): void
{
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $transportName));
$event = new WorkerMessageReceivedEvent($envelope, $transportName);
$this->dispatchEvent($event);
if (!$event->shouldHandle()) {
return;
}
$message = $envelope->getMessage();
$context = [
@ -148,7 +153,7 @@ class Worker implements WorkerInterface
// add the delay and retry stamp info + remove ReceivedStamp
$retryEnvelope = $envelope->with(new DelayStamp($retryStrategy->getWaitingTime($envelope)))
->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope)))
->with(new RedeliveryStamp($retryCount, $this->getSenderClassOrAlias($envelope)))
->withoutAll(ReceivedStamp::class);
// re-send the message
@ -199,6 +204,15 @@ class Worker implements WorkerInterface
return false;
}
$sentStamp = $envelope->last(SentStamp::class);
if (null === $sentStamp) {
if (null !== $this->logger) {
$this->logger->warning('Message will not be retried because the SentStamp is missing and so the target sender cannot be determined.');
}
return false;
}
return $retryStrategy->isRetryable($envelope);
}
@ -210,11 +224,16 @@ class Worker implements WorkerInterface
return $retryMessageStamp ? $retryMessageStamp->getRetryCount() : 0;
}
private function getSenderAlias(Envelope $envelope): ?string
private function getSenderClassOrAlias(Envelope $envelope): string
{
/** @var SentStamp|null $sentStamp */
$sentStamp = $envelope->last(SentStamp::class);
return $sentStamp ? $sentStamp->getSenderAlias() : null;
if (null === $sentStamp) {
// should not happen, because of the check in shouldRetry()
throw new LogicException('Could not find SentStamp.');
}
return $sentStamp->getSenderAlias() ?: $sentStamp->getSenderClass();
}
}

View File

@ -23,6 +23,7 @@
"doctrine/dbal": "^2.5",
"psr/cache": "~1.0",
"symfony/console": "~3.4|~4.0",
"symfony/debug": "~4.1",
"symfony/dependency-injection": "~3.4.19|^4.1.8",
"symfony/doctrine-bridge": "~3.4|~4.0",
"symfony/event-dispatcher": "~4.3",
@ -35,7 +36,8 @@
"symfony/var-dumper": "~3.4|~4.0"
},
"conflict": {
"symfony/event-dispatcher": "<4.3"
"symfony/event-dispatcher": "<4.3",
"symfony/debug": "<4.1"
},
"suggest": {
"enqueue/messenger-adapter": "For using the php-enqueue library as a transport."