[Messenger] Adding failure transport support
This commit is contained in:
parent
2dacfcade1
commit
36487e5f44
@ -1200,6 +1200,10 @@ class Configuration implements ConfigurationInterface
|
||||
->end()
|
||||
->end()
|
||||
->end()
|
||||
->scalarNode('failure_transport')
|
||||
->defaultNull()
|
||||
->info('Transport name to send failed messages to (after all retries have failed).')
|
||||
->end()
|
||||
->scalarNode('default_bus')->defaultNull()->end()
|
||||
->arrayNode('buses')
|
||||
->defaultValue(['messenger.bus.default' => ['default_middleware' => true, 'middleware' => []]])
|
||||
|
@ -40,9 +40,9 @@ use Symfony\Component\Config\ResourceCheckerInterface;
|
||||
use Symfony\Component\Console\Application;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\DependencyInjection\Alias;
|
||||
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
|
||||
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
|
||||
use Symfony\Component\DependencyInjection\ChildDefinition;
|
||||
use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass;
|
||||
use Symfony\Component\DependencyInjection\ContainerBuilder;
|
||||
use Symfony\Component\DependencyInjection\ContainerInterface;
|
||||
use Symfony\Component\DependencyInjection\Definition;
|
||||
@ -284,6 +284,9 @@ class FrameworkExtension extends Extension
|
||||
$container->removeDefinition('console.command.messenger_debug');
|
||||
$container->removeDefinition('console.command.messenger_stop_workers');
|
||||
$container->removeDefinition('console.command.messenger_setup_transports');
|
||||
$container->removeDefinition('console.command.messenger_failed_messages_retry');
|
||||
$container->removeDefinition('console.command.messenger_failed_messages_show');
|
||||
$container->removeDefinition('console.command.messenger_failed_messages_remove');
|
||||
}
|
||||
|
||||
$propertyInfoEnabled = $this->isConfigEnabled($container, $config['property_info']);
|
||||
@ -1743,22 +1746,48 @@ class FrameworkExtension extends Extension
|
||||
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
|
||||
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
|
||||
}
|
||||
$senders = [];
|
||||
|
||||
// make sure senderAliases contains all senders
|
||||
foreach ($messageConfiguration['senders'] as $sender) {
|
||||
$senders[$sender] = new Reference($senderAliases[$sender] ?? $sender);
|
||||
if (!isset($senderAliases[$sender])) {
|
||||
$senderAliases[$sender] = $sender;
|
||||
}
|
||||
}
|
||||
|
||||
$messageToSendersMapping[$message] = new IteratorArgument($senders);
|
||||
$messageToSendersMapping[$message] = $messageConfiguration['senders'];
|
||||
$messagesToSendAndHandle[$message] = $messageConfiguration['send_and_handle'];
|
||||
}
|
||||
|
||||
$senderReferences = [];
|
||||
foreach ($senderAliases as $alias => $serviceId) {
|
||||
$senderReferences[$alias] = new Reference($serviceId);
|
||||
}
|
||||
|
||||
$container->getDefinition('messenger.senders_locator')
|
||||
->replaceArgument(0, $messageToSendersMapping)
|
||||
->replaceArgument(1, $messagesToSendAndHandle)
|
||||
->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences))
|
||||
->replaceArgument(2, $messagesToSendAndHandle)
|
||||
;
|
||||
|
||||
$container->getDefinition('messenger.retry_strategy_locator')
|
||||
->replaceArgument(0, $transportRetryReferences);
|
||||
|
||||
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
|
||||
->replaceArgument(1, $config['failure_transport']);
|
||||
|
||||
if ($config['failure_transport']) {
|
||||
$container->getDefinition('console.command.messenger_failed_messages_retry')
|
||||
->replaceArgument(0, $config['failure_transport'])
|
||||
->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null);
|
||||
$container->getDefinition('console.command.messenger_failed_messages_show')
|
||||
->replaceArgument(0, $config['failure_transport']);
|
||||
$container->getDefinition('console.command.messenger_failed_messages_remove')
|
||||
->replaceArgument(0, $config['failure_transport']);
|
||||
} else {
|
||||
$container->removeDefinition('console.command.messenger_failed_messages_retry');
|
||||
$container->removeDefinition('console.command.messenger_failed_messages_show');
|
||||
$container->removeDefinition('console.command.messenger_failed_messages_remove');
|
||||
}
|
||||
}
|
||||
|
||||
private function registerCacheConfiguration(array $config, ContainerBuilder $container)
|
||||
|
@ -114,6 +114,31 @@
|
||||
<tag name="console.command" command="messenger:stop-workers" />
|
||||
</service>
|
||||
|
||||
<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
|
||||
<argument /> <!-- Receiver name -->
|
||||
<argument /> <!-- Receiver locator -->
|
||||
<argument type="service" id="messenger.routable_message_bus" />
|
||||
<argument type="service" id="event_dispatcher" />
|
||||
<argument /> <!-- Retry strategy -->
|
||||
<argument type="service" id="logger" />
|
||||
|
||||
<tag name="console.command" command="messenger:failed:retry" />
|
||||
</service>
|
||||
|
||||
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
|
||||
<argument /> <!-- Receiver name -->
|
||||
<argument /> <!-- Receiver locator -->
|
||||
|
||||
<tag name="console.command" command="messenger:failed:show" />
|
||||
</service>
|
||||
|
||||
<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand">
|
||||
<argument /> <!-- Receiver name -->
|
||||
<argument /> <!-- Receiver locator -->
|
||||
|
||||
<tag name="console.command" command="messenger:failed:remove" />
|
||||
</service>
|
||||
|
||||
<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
|
||||
<argument type="service" id="router" />
|
||||
<argument type="service" id="debug.file_link_formatter" on-invalid="null" />
|
||||
|
@ -9,7 +9,8 @@
|
||||
|
||||
<!-- Asynchronous -->
|
||||
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator">
|
||||
<argument type="collection" /> <!-- Per message sender iterators -->
|
||||
<argument type="collection" /> <!-- Per message senders map -->
|
||||
<argument /> <!-- senders locator -->
|
||||
<argument type="collection" /> <!-- Messages to send and handle -->
|
||||
</service>
|
||||
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
|
||||
@ -91,5 +92,19 @@
|
||||
<argument /> <!-- multiplier -->
|
||||
<argument /> <!-- max delay ms -->
|
||||
</service>
|
||||
|
||||
<!-- failed handling -->
|
||||
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
|
||||
<tag name="kernel.event_subscriber" />
|
||||
<tag name="monolog.logger" channel="messenger" />
|
||||
<argument type="service" id="messenger.routable_message_bus" />
|
||||
<argument /> <!-- Failure transport name -->
|
||||
<argument type="service" id="logger" on-invalid="ignore" />
|
||||
</service>
|
||||
|
||||
<!-- routable message bus -->
|
||||
<service id="messenger.routable_message_bus" class="Symfony\Component\Messenger\RoutableMessageBus">
|
||||
<argument /> <!-- Message bus locator -->
|
||||
</service>
|
||||
</services>
|
||||
</container>
|
||||
|
@ -328,6 +328,7 @@ class ConfigurationTest extends TestCase
|
||||
'enabled' => !class_exists(FullStack::class) && interface_exists(MessageBusInterface::class),
|
||||
'routing' => [],
|
||||
'transports' => [],
|
||||
'failure_transport' => null,
|
||||
'serializer' => [
|
||||
'default_serializer' => 'messenger.transport.native_php_serializer',
|
||||
'symfony_serializer' => [
|
||||
|
@ -721,12 +721,16 @@ abstract class FrameworkExtensionTest extends TestCase
|
||||
'*' => false,
|
||||
];
|
||||
|
||||
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(1));
|
||||
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(2));
|
||||
$sendersMapping = $senderLocatorDefinition->getArgument(0);
|
||||
$this->assertEquals([
|
||||
'amqp' => new Reference('messenger.transport.amqp'),
|
||||
'audit' => new Reference('audit'),
|
||||
], $sendersMapping[DummyMessage::class]->getValues());
|
||||
'amqp',
|
||||
'audit',
|
||||
], $sendersMapping[DummyMessage::class]);
|
||||
$sendersLocator = $container->getDefinition((string) $senderLocatorDefinition->getArgument(1));
|
||||
$this->assertSame(['amqp', 'audit'], array_keys($sendersLocator->getArgument(0)));
|
||||
$this->assertEquals(new Reference('messenger.transport.amqp'), $sendersLocator->getArgument(0)['amqp']->getValues()[0]);
|
||||
$this->assertEquals(new Reference('audit'), $sendersLocator->getArgument(0)['audit']->getValues()[0]);
|
||||
}
|
||||
|
||||
public function testMessengerTransportConfiguration()
|
||||
|
@ -4,6 +4,13 @@ CHANGELOG
|
||||
4.3.0
|
||||
-----
|
||||
|
||||
* [BC BREAK] `SendersLocatorInterface` has an additional method:
|
||||
`getSenderByAlias()`.
|
||||
* A new `ListableReceiverInterface` was added, which a receiver
|
||||
can implement (when applicable) to enable listing and fetching
|
||||
individual messages by id (used in the new "Failed Messages" commands).
|
||||
* Both `SenderInterface::send()` and `ReceiverInterface::get()`
|
||||
should now (when applicable) add a `TransportMessageIdStamp`.
|
||||
* Added `WorkerStoppedEvent` dispatched when a worker is stopped.
|
||||
* Added optional `MessageCountAwareInterface` that receivers can implement
|
||||
to give information about how many messages are waiting to be processed.
|
||||
|
@ -0,0 +1,112 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Command;
|
||||
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Helper\Dumper;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
|
||||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||
|
||||
/**
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*
|
||||
* @internal
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
abstract class AbstractFailedMessagesCommand extends Command
|
||||
{
|
||||
private $receiverName;
|
||||
private $receiver;
|
||||
|
||||
public function __construct(string $receiverName, ReceiverInterface $receiver)
|
||||
{
|
||||
$this->receiverName = $receiverName;
|
||||
$this->receiver = $receiver;
|
||||
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
protected function getReceiverName()
|
||||
{
|
||||
return $this->receiverName;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return mixed|null
|
||||
*/
|
||||
protected function getMessageId(Envelope $envelope)
|
||||
{
|
||||
/** @var TransportMessageIdStamp $stamp */
|
||||
$stamp = $envelope->last(TransportMessageIdStamp::class);
|
||||
|
||||
return null !== $stamp ? $stamp->getId() : null;
|
||||
}
|
||||
|
||||
protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
|
||||
{
|
||||
$io->title('Failed Message Details');
|
||||
|
||||
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
|
||||
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
|
||||
|
||||
$rows = [
|
||||
['Class', \get_class($envelope->getMessage())],
|
||||
];
|
||||
|
||||
if (null !== $id = $this->getMessageId($envelope)) {
|
||||
$rows[] = ['Message Id', $id];
|
||||
}
|
||||
|
||||
if (null === $sentToFailureTransportStamp) {
|
||||
$io->warning('Message does not appear to have been sent to this transport after failing');
|
||||
} else {
|
||||
$rows = array_merge($rows, [
|
||||
['Failed at', $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s')],
|
||||
['Error', $sentToFailureTransportStamp->getExceptionMessage()],
|
||||
['Error Class', $sentToFailureTransportStamp->getFlattenException() ? $sentToFailureTransportStamp->getFlattenException()->getClass() : '(unknown)'],
|
||||
['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()],
|
||||
]);
|
||||
}
|
||||
|
||||
$io->table([], $rows);
|
||||
|
||||
if ($io->isVeryVerbose()) {
|
||||
$io->title('Message:');
|
||||
$dump = new Dumper($io);
|
||||
$io->writeln($dump($envelope->getMessage()));
|
||||
$io->title('Exception:');
|
||||
$io->writeln($sentToFailureTransportStamp->getFlattenException()->getTraceAsString());
|
||||
} else {
|
||||
$io->writeln(' Re-run command with <info>-vv</info> to see more message & error details.');
|
||||
}
|
||||
}
|
||||
|
||||
protected function printPendingMessagesMessage(ReceiverInterface $receiver, SymfonyStyle $io)
|
||||
{
|
||||
if ($receiver instanceof MessageCountAwareInterface) {
|
||||
if (1 === $receiver->getMessageCount()) {
|
||||
$io->writeln('There is <comment>1</comment> message pending in the failure transport.');
|
||||
} else {
|
||||
$io->writeln(sprintf('There are <comment>%d</comment> messages pending in the failure transport.', $receiver->getMessageCount()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected function getReceiver(): ReceiverInterface
|
||||
{
|
||||
return $this->receiver;
|
||||
}
|
||||
}
|
@ -0,0 +1,88 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Command;
|
||||
|
||||
use Symfony\Component\Console\Exception\RuntimeException;
|
||||
use Symfony\Component\Console\Input\InputArgument;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\ConsoleOutputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||
|
||||
/**
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
class FailedMessagesRemoveCommand extends AbstractFailedMessagesCommand
|
||||
{
|
||||
protected static $defaultName = 'messenger:failed:remove';
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function configure(): void
|
||||
{
|
||||
$this
|
||||
->setDefinition([
|
||||
new InputArgument('id', InputArgument::REQUIRED, 'Specific message id to remove'),
|
||||
new InputOption('force', null, InputOption::VALUE_NONE, 'Force the operation without confirmation'),
|
||||
])
|
||||
->setDescription('Remove a message from the failure transport.')
|
||||
->setHelp(<<<'EOF'
|
||||
The <info>%command.name%</info> removes a message that is pending in the failure transport.
|
||||
|
||||
<info>php %command.full_name% {id}</info>
|
||||
|
||||
The specific id can be found via the messenger:failed:show command.
|
||||
EOF
|
||||
)
|
||||
;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function execute(InputInterface $input, OutputInterface $output)
|
||||
{
|
||||
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
||||
|
||||
$receiver = $this->getReceiver();
|
||||
|
||||
$shouldForce = $input->getOption('force');
|
||||
$this->removeSingleMessage($input->getArgument('id'), $receiver, $io, $shouldForce);
|
||||
}
|
||||
|
||||
private function removeSingleMessage($id, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce)
|
||||
{
|
||||
if (!$receiver instanceof ListableReceiverInterface) {
|
||||
throw new RuntimeException(sprintf('The "%s" receiver does not support removing specific messages.', $this->getReceiverName()));
|
||||
}
|
||||
|
||||
$envelope = $receiver->find($id);
|
||||
if (null === $envelope) {
|
||||
throw new RuntimeException(sprintf('The message with id "%s" was not found.', $id));
|
||||
}
|
||||
$this->displaySingleMessage($envelope, $io);
|
||||
|
||||
if ($shouldForce || $io->confirm('Do you want to permanently remove this message?', false)) {
|
||||
$receiver->reject($envelope);
|
||||
|
||||
$io->success('Message removed.');
|
||||
} else {
|
||||
$io->note('Message not removed.');
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,221 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Command;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Console\Exception\RuntimeException;
|
||||
use Symfony\Component\Console\Input\InputArgument;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\ConsoleOutputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
||||
use Symfony\Component\Messenger\Exception\LogicException;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
|
||||
use Symfony\Component\Messenger\Worker;
|
||||
|
||||
/**
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
|
||||
{
|
||||
protected static $defaultName = 'messenger:failed:retry';
|
||||
|
||||
private $eventDispatcher;
|
||||
private $messageBus;
|
||||
private $retryStrategy;
|
||||
private $logger;
|
||||
|
||||
public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, RetryStrategyInterface $retryStrategy = null, LoggerInterface $logger = null)
|
||||
{
|
||||
$this->eventDispatcher = $eventDispatcher;
|
||||
$this->messageBus = $messageBus;
|
||||
$this->retryStrategy = $retryStrategy;
|
||||
$this->logger = $logger;
|
||||
|
||||
parent::__construct($receiverName, $receiver);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function configure(): void
|
||||
{
|
||||
$this
|
||||
->setDefinition([
|
||||
new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'),
|
||||
new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'),
|
||||
])
|
||||
->setDescription('Retries one or more messages from the failure transport.')
|
||||
->setHelp(<<<'EOF'
|
||||
The <info>%command.name%</info> retries message in the failure transport.
|
||||
|
||||
<info>php %command.full_name%</info>
|
||||
|
||||
The command will interactively ask if each message should be retried
|
||||
or discarded.
|
||||
|
||||
Some transports support retrying a specific message id, which comes
|
||||
from the <info>messenger:failed:show</info> command.
|
||||
|
||||
<info>php %command.full_name% {id}</info>
|
||||
|
||||
Or pass multiple ids at once to process multiple messages:
|
||||
|
||||
<info>php %command.full_name% {id1} {id2} {id3}</info>
|
||||
|
||||
EOF
|
||||
)
|
||||
;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function execute(InputInterface $input, OutputInterface $output)
|
||||
{
|
||||
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
||||
$io->comment('Quit this command with CONTROL-C.');
|
||||
if (!$output->isVeryVerbose()) {
|
||||
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
|
||||
}
|
||||
|
||||
$receiver = $this->getReceiver();
|
||||
$this->printPendingMessagesMessage($receiver, $io);
|
||||
|
||||
$io->writeln(sprintf('To retry all the messages, run <comment>messenger:consume %s</comment>', $this->getReceiverName()));
|
||||
|
||||
$shouldForce = $input->getOption('force');
|
||||
$ids = $input->getArgument('id');
|
||||
if (0 === \count($ids)) {
|
||||
if (!$input->isInteractive()) {
|
||||
throw new RuntimeException('Message id must be passed when in non-interactive mode.');
|
||||
}
|
||||
|
||||
$this->runInteractive($io, $shouldForce);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
$this->retrySpecificIds($ids, $io, $shouldForce);
|
||||
$io->success('All done!');
|
||||
}
|
||||
|
||||
private function runInteractive(SymfonyStyle $io, bool $shouldForce)
|
||||
{
|
||||
$receiver = $this->getReceiver();
|
||||
$count = 0;
|
||||
if ($receiver instanceof ListableReceiverInterface) {
|
||||
// for listable receivers, find the messages one-by-one
|
||||
// this avoids using get(), which for some less-robust
|
||||
// transports (like Doctrine), will cause the message
|
||||
// to be temporarily "acked", even if the user aborts
|
||||
// handling the message
|
||||
while (true) {
|
||||
$ids = [];
|
||||
foreach ($receiver->all(1) as $envelope) {
|
||||
++$count;
|
||||
|
||||
$id = $this->getMessageId($envelope);
|
||||
if (null === $id) {
|
||||
throw new LogicException(sprintf('The "%s" receiver is able to list messages by id but the envelope is missing the TransportMessageIdStamp stamp.', $this->getReceiverName()));
|
||||
}
|
||||
$ids[] = $id;
|
||||
}
|
||||
|
||||
// break the loop if all messages are consumed
|
||||
if (0 === \count($ids)) {
|
||||
break;
|
||||
}
|
||||
|
||||
$this->retrySpecificIds($ids, $io, $shouldForce);
|
||||
}
|
||||
} else {
|
||||
// get() and ask messages one-by-one
|
||||
$count = $this->runWorker($this->getReceiver(), $io, $shouldForce);
|
||||
}
|
||||
|
||||
// avoid success message if nothing was processed
|
||||
if (1 < $count) {
|
||||
$io->success('All failed messages have been handled or removed!');
|
||||
}
|
||||
}
|
||||
|
||||
private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int
|
||||
{
|
||||
$listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce) {
|
||||
$envelope = $messageReceivedEvent->getEnvelope();
|
||||
|
||||
$this->displaySingleMessage($envelope, $io);
|
||||
|
||||
$shouldHandle = $shouldForce || $io->confirm('Do you want to retry (yes) or delete this message (no)?');
|
||||
|
||||
if ($shouldHandle) {
|
||||
return;
|
||||
}
|
||||
|
||||
$messageReceivedEvent->shouldHandle(false);
|
||||
$receiver->reject($envelope);
|
||||
};
|
||||
$this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
|
||||
|
||||
$worker = new Worker(
|
||||
[$this->getReceiverName() => $receiver],
|
||||
$this->messageBus,
|
||||
[$this->getReceiverName() => $this->retryStrategy],
|
||||
$this->eventDispatcher,
|
||||
$this->logger
|
||||
);
|
||||
|
||||
$count = 0;
|
||||
try {
|
||||
$worker->run([], function (?Envelope $envelope) use ($worker, $io, &$count) {
|
||||
++$count;
|
||||
if (null === $envelope) {
|
||||
$worker->stop();
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
$this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener);
|
||||
}
|
||||
|
||||
return $count;
|
||||
}
|
||||
|
||||
private function retrySpecificIds(array $ids, SymfonyStyle $io, bool $shouldForce)
|
||||
{
|
||||
$receiver = $this->getReceiver();
|
||||
|
||||
if (!$receiver instanceof ListableReceiverInterface) {
|
||||
throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.', $this->getReceiverName()));
|
||||
}
|
||||
|
||||
foreach ($ids as $id) {
|
||||
$envelope = $receiver->find($id);
|
||||
if (null === $envelope) {
|
||||
throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
|
||||
}
|
||||
|
||||
$singleReceiver = new SingleMessageReceiver($receiver, $envelope);
|
||||
$this->runWorker($singleReceiver, $io, $shouldForce);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,129 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Command;
|
||||
|
||||
use Symfony\Component\Console\Exception\RuntimeException;
|
||||
use Symfony\Component\Console\Input\InputArgument;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\ConsoleOutputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
|
||||
|
||||
/**
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
class FailedMessagesShowCommand extends AbstractFailedMessagesCommand
|
||||
{
|
||||
protected static $defaultName = 'messenger:failed:show';
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function configure(): void
|
||||
{
|
||||
$this
|
||||
->setDefinition([
|
||||
new InputArgument('id', InputArgument::OPTIONAL, 'Specific message id to show'),
|
||||
new InputOption('max', null, InputOption::VALUE_REQUIRED, 'Maximum number of messages to list', 50),
|
||||
])
|
||||
->setDescription('Shows one or more messages from the failure transport.')
|
||||
->setHelp(<<<'EOF'
|
||||
The <info>%command.name%</info> shows message that are pending in the failure transport.
|
||||
|
||||
<info>php %command.full_name%</info>
|
||||
|
||||
Or look at a specific message by its id:
|
||||
|
||||
<info>php %command.full_name% {id}</info>
|
||||
EOF
|
||||
)
|
||||
;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function execute(InputInterface $input, OutputInterface $output)
|
||||
{
|
||||
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
||||
|
||||
$receiver = $this->getReceiver();
|
||||
$this->printPendingMessagesMessage($receiver, $io);
|
||||
|
||||
if (!$receiver instanceof ListableReceiverInterface) {
|
||||
throw new RuntimeException(sprintf('The "%s" receiver does not support listing or showing specific messages.', $this->getReceiverName()));
|
||||
}
|
||||
|
||||
if (null === $id = $input->getArgument('id')) {
|
||||
$this->listMessages($io, $input->getOption('max'));
|
||||
} else {
|
||||
$this->showMessage($id, $io);
|
||||
}
|
||||
}
|
||||
|
||||
private function listMessages(SymfonyStyle $io, int $max)
|
||||
{
|
||||
/** @var ListableReceiverInterface $receiver */
|
||||
$receiver = $this->getReceiver();
|
||||
$envelopes = $receiver->all($max);
|
||||
|
||||
$rows = [];
|
||||
foreach ($envelopes as $envelope) {
|
||||
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
|
||||
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
|
||||
|
||||
$rows[] = [
|
||||
$this->getMessageId($envelope),
|
||||
\get_class($envelope->getMessage()),
|
||||
null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s'),
|
||||
null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getExceptionMessage(),
|
||||
];
|
||||
}
|
||||
|
||||
if (0 === \count($rows)) {
|
||||
$io->success('No failed messages were found.');
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
$io->table(['Id', 'Class', 'Failed at', 'Error'], $rows);
|
||||
|
||||
if (\count($rows) === $max) {
|
||||
$io->comment(sprintf('Showing first %d messages.', $max));
|
||||
}
|
||||
|
||||
$io->comment('Run <comment>messenger:failed:show {id} -vv</comment> to see message details.');
|
||||
}
|
||||
|
||||
private function showMessage($id, SymfonyStyle $io)
|
||||
{
|
||||
/** @var ListableReceiverInterface $receiver */
|
||||
$receiver = $this->getReceiver();
|
||||
$envelope = $receiver->find($id);
|
||||
if (null === $envelope) {
|
||||
throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
|
||||
}
|
||||
|
||||
$this->displaySingleMessage($envelope, $io);
|
||||
|
||||
$io->writeln([
|
||||
'',
|
||||
sprintf(' Run <comment>messenger:failed:retry %s</comment> to retry this message.', $id),
|
||||
sprintf(' Run <comment>messenger:failed:remove %s</comment> to delete it.', $id),
|
||||
]);
|
||||
}
|
||||
}
|
@ -247,12 +247,18 @@ class MessengerPass implements CompilerPassInterface
|
||||
foreach ($receiverMapping as $name => $reference) {
|
||||
$receiverNames[(string) $reference] = $name;
|
||||
}
|
||||
if ($container->hasDefinition('console.command.messenger_consume_messages')) {
|
||||
$buses = [];
|
||||
foreach ($busIds as $busId) {
|
||||
$buses[$busId] = new Reference($busId);
|
||||
}
|
||||
|
||||
$buses = [];
|
||||
foreach ($busIds as $busId) {
|
||||
$buses[$busId] = new Reference($busId);
|
||||
}
|
||||
|
||||
if ($container->hasDefinition('messenger.routable_message_bus')) {
|
||||
$container->getDefinition('messenger.routable_message_bus')
|
||||
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses));
|
||||
}
|
||||
|
||||
if ($container->hasDefinition('console.command.messenger_consume_messages')) {
|
||||
$container->getDefinition('console.command.messenger_consume_messages')
|
||||
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
|
||||
->replaceArgument(3, array_values($receiverNames));
|
||||
@ -264,6 +270,18 @@ class MessengerPass implements CompilerPassInterface
|
||||
}
|
||||
|
||||
$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
|
||||
|
||||
$failedCommandIds = [
|
||||
'console.command.messenger_failed_messages_retry',
|
||||
'console.command.messenger_failed_messages_show',
|
||||
'console.command.messenger_failed_messages_remove',
|
||||
];
|
||||
foreach ($failedCommandIds as $failedCommandId) {
|
||||
if ($container->hasDefinition($failedCommandId)) {
|
||||
$definition = $container->getDefinition($failedCommandId);
|
||||
$definition->replaceArgument(1, $receiverMapping[$definition->getArgument(0)]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private function registerBusToCollector(ContainerBuilder $container, string $busId)
|
||||
|
@ -20,4 +20,14 @@ namespace Symfony\Component\Messenger\Event;
|
||||
*/
|
||||
class WorkerMessageReceivedEvent extends AbstractWorkerMessageEvent
|
||||
{
|
||||
private $shouldHandle = true;
|
||||
|
||||
public function shouldHandle(bool $shouldHandle = null): bool
|
||||
{
|
||||
if (null !== $shouldHandle) {
|
||||
$this->shouldHandle = $shouldHandle;
|
||||
}
|
||||
|
||||
return $this->shouldHandle;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,89 @@
|
||||
<?php
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\EventListener;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Debug\Exception\FlattenException;
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
|
||||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
||||
|
||||
/**
|
||||
* Sends a rejected message to a "failure transport".
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface
|
||||
{
|
||||
private $messageBus;
|
||||
private $failureSenderAlias;
|
||||
private $logger;
|
||||
|
||||
public function __construct(MessageBusInterface $messageBus, string $failureSenderAlias, LoggerInterface $logger = null)
|
||||
{
|
||||
$this->messageBus = $messageBus;
|
||||
$this->failureSenderAlias = $failureSenderAlias;
|
||||
$this->logger = $logger;
|
||||
}
|
||||
|
||||
public function onMessageFailed(WorkerMessageFailedEvent $event)
|
||||
{
|
||||
if ($event->willRetry()) {
|
||||
return;
|
||||
}
|
||||
|
||||
$envelope = $event->getEnvelope();
|
||||
|
||||
// avoid re-sending to the failed sender
|
||||
foreach ($envelope->all(SentStamp::class) as $sentStamp) {
|
||||
/** @var SentStamp $sentStamp */
|
||||
if ($sentStamp->getSenderAlias() === $this->failureSenderAlias) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// remove the received stamp so it's redelivered
|
||||
$throwable = $event->getThrowable();
|
||||
if ($throwable instanceof HandlerFailedException) {
|
||||
$throwable = $throwable->getNestedExceptions()[0];
|
||||
}
|
||||
|
||||
$flattenedException = \class_exists(FlattenException::class) ? FlattenException::createFromThrowable($throwable) : null;
|
||||
$envelope = $envelope->withoutAll(ReceivedStamp::class)
|
||||
->withoutAll(TransportMessageIdStamp::class)
|
||||
->with(new SentToFailureTransportStamp($throwable->getMessage(), $event->getReceiverName(), $flattenedException))
|
||||
->with(new RedeliveryStamp(0, $this->failureSenderAlias));
|
||||
|
||||
if (null !== $this->logger) {
|
||||
$this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
|
||||
'class' => \get_class($envelope->getMessage()),
|
||||
'transport' => $this->failureSenderAlias,
|
||||
]);
|
||||
}
|
||||
|
||||
$this->messageBus->dispatch($envelope);
|
||||
}
|
||||
|
||||
public static function getSubscribedEvents()
|
||||
{
|
||||
return [
|
||||
WorkerMessageFailedEvent::class => ['onMessageFailed', -100],
|
||||
];
|
||||
}
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Exception;
|
||||
|
||||
/**
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
class UnknownSenderException extends \InvalidArgumentException implements ExceptionInterface
|
||||
{
|
||||
}
|
@ -32,7 +32,9 @@ class AddBusNameStampMiddleware implements MiddlewareInterface
|
||||
|
||||
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||
{
|
||||
$envelope = $envelope->with(new BusNameStamp($this->busName));
|
||||
if (null === $envelope->last(BusNameStamp::class)) {
|
||||
$envelope = $envelope->with(new BusNameStamp($this->busName));
|
||||
}
|
||||
|
||||
return $stack->next()->handle($envelope, $stack);
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
|
||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
|
||||
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
||||
|
||||
@ -65,12 +66,7 @@ class SendMessageMiddleware implements MiddlewareInterface
|
||||
|
||||
// dispatch event unless this is a redelivery
|
||||
$shouldDispatchEvent = null === $redeliveryStamp;
|
||||
foreach ($this->sendersLocator->getSenders($envelope, $handle) as $alias => $sender) {
|
||||
// on redelivery, only deliver to the given sender
|
||||
if (null !== $redeliveryStamp && !$redeliveryStamp->shouldRedeliverToSender(\get_class($sender), $alias)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
foreach ($this->getSenders($envelope, $handle, $redeliveryStamp) as $alias => $sender) {
|
||||
if (null !== $this->eventDispatcher && $shouldDispatchEvent) {
|
||||
$event = new SendMessageToTransportsEvent($envelope);
|
||||
$this->eventDispatcher->dispatch($event);
|
||||
@ -107,4 +103,18 @@ class SendMessageMiddleware implements MiddlewareInterface
|
||||
// message should only be sent and not be handled by the next middleware
|
||||
return $envelope;
|
||||
}
|
||||
|
||||
/**
|
||||
* * @return iterable|SenderInterface[]
|
||||
*/
|
||||
private function getSenders(Envelope $envelope, &$handle, ?RedeliveryStamp $redeliveryStamp): iterable
|
||||
{
|
||||
if (null !== $redeliveryStamp) {
|
||||
return [
|
||||
$redeliveryStamp->getSenderClassOrAlias() => $this->sendersLocator->getSenderByAlias($redeliveryStamp->getSenderClassOrAlias()),
|
||||
];
|
||||
}
|
||||
|
||||
return $this->sendersLocator->getSenders($envelope, $handle);
|
||||
}
|
||||
}
|
||||
|
@ -39,12 +39,12 @@ class MultiplierRetryStrategy implements RetryStrategyInterface
|
||||
private $maxDelayMilliseconds;
|
||||
|
||||
/**
|
||||
* @param int $maxRetries The maximum number of time to retry (0 means indefinitely)
|
||||
* @param int $maxRetries The maximum number of time to retry (null means indefinitely)
|
||||
* @param int $delayMilliseconds Amount of time to delay (or the initial value when multiplier is used)
|
||||
* @param float $multiplier Multiplier to apply to the delay each time a retry occurs
|
||||
* @param int $maxDelayMilliseconds Maximum delay to allow (0 means no maximum)
|
||||
*/
|
||||
public function __construct(int $maxRetries = 3, int $delayMilliseconds = 1000, float $multiplier = 1, int $maxDelayMilliseconds = 0)
|
||||
public function __construct(?int $maxRetries = 3, int $delayMilliseconds = 1000, float $multiplier = 1, int $maxDelayMilliseconds = 0)
|
||||
{
|
||||
$this->maxRetries = $maxRetries;
|
||||
|
||||
@ -66,7 +66,7 @@ class MultiplierRetryStrategy implements RetryStrategyInterface
|
||||
|
||||
public function isRetryable(Envelope $message): bool
|
||||
{
|
||||
if (0 === $this->maxRetries) {
|
||||
if (null === $this->maxRetries) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -44,17 +44,4 @@ class RedeliveryStamp implements StampInterface
|
||||
{
|
||||
return $this->senderClassOrAlias;
|
||||
}
|
||||
|
||||
public function shouldRedeliverToSender(string $senderClass, ?string $senderAlias): bool
|
||||
{
|
||||
if (null !== $senderAlias && $senderAlias === $this->senderClassOrAlias) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if ($senderClass === $this->senderClassOrAlias) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,57 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Stamp;
|
||||
|
||||
use Symfony\Component\Debug\Exception\FlattenException;
|
||||
|
||||
/**
|
||||
* Stamp applied when a message is sent to the failure transport.
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
class SentToFailureTransportStamp implements StampInterface
|
||||
{
|
||||
private $exceptionMessage;
|
||||
private $originalReceiverName;
|
||||
private $flattenException;
|
||||
private $sentAt;
|
||||
|
||||
public function __construct(string $exceptionMessage, string $originalReceiverName, FlattenException $flattenException = null)
|
||||
{
|
||||
$this->exceptionMessage = $exceptionMessage;
|
||||
$this->originalReceiverName = $originalReceiverName;
|
||||
$this->flattenException = $flattenException;
|
||||
$this->sentAt = new \DateTime();
|
||||
}
|
||||
|
||||
public function getExceptionMessage(): string
|
||||
{
|
||||
return $this->exceptionMessage;
|
||||
}
|
||||
|
||||
public function getOriginalReceiverName(): string
|
||||
{
|
||||
return $this->originalReceiverName;
|
||||
}
|
||||
|
||||
public function getFlattenException(): ?FlattenException
|
||||
{
|
||||
return $this->flattenException;
|
||||
}
|
||||
|
||||
public function getSentAt(): \DateTime
|
||||
{
|
||||
return $this->sentAt;
|
||||
}
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Stamp;
|
||||
|
||||
/**
|
||||
* Added by a sender or receiver to indicate the id of this message in that transport.
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
class TransportMessageIdStamp implements StampInterface
|
||||
{
|
||||
private $id;
|
||||
|
||||
/**
|
||||
* @param mixed $id some "identifier" of the message in a transport
|
||||
*/
|
||||
public function __construct($id)
|
||||
{
|
||||
$this->id = $id;
|
||||
}
|
||||
|
||||
public function getId()
|
||||
{
|
||||
return $this->id;
|
||||
}
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Command;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Console\Tester\CommandTester;
|
||||
use Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
|
||||
|
||||
class FailedMessagesRemoveCommandTest extends TestCase
|
||||
{
|
||||
public function testBasicRun()
|
||||
{
|
||||
$receiver = $this->createMock(ListableReceiverInterface::class);
|
||||
$receiver->expects($this->once())->method('find')->with(20)->willReturn(new Envelope(new \stdClass()));
|
||||
|
||||
$command = new FailedMessagesRemoveCommand(
|
||||
'failure_receiver',
|
||||
$receiver
|
||||
);
|
||||
|
||||
$tester = new CommandTester($command);
|
||||
$tester->execute(['id' => 20, '--force' => true]);
|
||||
|
||||
$this->assertContains('Message removed.', $tester->getDisplay());
|
||||
}
|
||||
}
|
@ -0,0 +1,49 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Command;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Console\Tester\CommandTester;
|
||||
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||
use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
|
||||
use Symfony\Component\Messenger\Worker;
|
||||
|
||||
class FailedMessagesRetryCommandTest extends TestCase
|
||||
{
|
||||
public function testBasicRun()
|
||||
{
|
||||
$receiver = $this->createMock(ListableReceiverInterface::class);
|
||||
$receiver->expects($this->exactly(2))->method('find')->withConsecutive([10], [12])->willReturn(new Envelope(new \stdClass()));
|
||||
// message will eventually be ack'ed in Worker
|
||||
$receiver->expects($this->exactly(2))->method('ack');
|
||||
|
||||
$dispatcher = $this->createMock(EventDispatcherInterface::class);
|
||||
$bus = $this->createMock(MessageBusInterface::class);
|
||||
// the bus should be called in the worker
|
||||
$bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass()));
|
||||
|
||||
$command = new FailedMessagesRetryCommand(
|
||||
'failure_receiver',
|
||||
$receiver,
|
||||
$bus,
|
||||
$dispatcher
|
||||
);
|
||||
|
||||
$tester = new CommandTester($command);
|
||||
$tester->execute(['id' => [10, 12]]);
|
||||
|
||||
$this->assertContains('[OK]', $tester->getDisplay());
|
||||
}
|
||||
}
|
@ -0,0 +1,57 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Command;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Console\Tester\CommandTester;
|
||||
use Symfony\Component\Messenger\Command\FailedMessagesShowCommand;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
|
||||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
|
||||
|
||||
/**
|
||||
* @group time-sensitive
|
||||
*/
|
||||
class FailedMessagesShowCommandTest extends TestCase
|
||||
{
|
||||
public function testBasicRun()
|
||||
{
|
||||
$sentToFailureStamp = new SentToFailureTransportStamp('Things are bad!', 'async');
|
||||
$envelope = new Envelope(new \stdClass(), [
|
||||
new TransportMessageIdStamp(15),
|
||||
$sentToFailureStamp,
|
||||
]);
|
||||
$receiver = $this->createMock(ListableReceiverInterface::class);
|
||||
$receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope);
|
||||
|
||||
$command = new FailedMessagesShowCommand(
|
||||
'failure_receiver',
|
||||
$receiver
|
||||
);
|
||||
|
||||
$tester = new CommandTester($command);
|
||||
$tester->execute(['id' => 15]);
|
||||
|
||||
$this->assertContains(sprintf(<<<EOF
|
||||
------------- ---------------------
|
||||
Class stdClass
|
||||
Message Id 15
|
||||
Failed at %s
|
||||
Error Things are bad!
|
||||
Error Class (unknown)
|
||||
EOF
|
||||
,
|
||||
$sentToFailureStamp->getSentAt()->format('Y-m-d H:i:s')),
|
||||
$tester->getDisplay(true));
|
||||
}
|
||||
}
|
@ -0,0 +1,121 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Handler;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
|
||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
|
||||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
||||
|
||||
class SendFailedMessageToFailureTransportListenerTest extends TestCase
|
||||
{
|
||||
public function testItDispatchesToTheFailureTransport()
|
||||
{
|
||||
$bus = $this->createMock(MessageBusInterface::class);
|
||||
$bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) {
|
||||
/* @var Envelope $envelope */
|
||||
$this->assertInstanceOf(Envelope::class, $envelope);
|
||||
|
||||
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
|
||||
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
|
||||
$this->assertNotNull($sentToFailureTransportStamp);
|
||||
$this->assertSame('no!', $sentToFailureTransportStamp->getExceptionMessage());
|
||||
$this->assertSame('my_receiver', $sentToFailureTransportStamp->getOriginalReceiverName());
|
||||
$this->assertSame('no!', $sentToFailureTransportStamp->getFlattenException()->getMessage());
|
||||
|
||||
/** @var RedeliveryStamp $redeliveryStamp */
|
||||
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
|
||||
$this->assertSame('failure_sender', $redeliveryStamp->getSenderClassOrAlias());
|
||||
|
||||
$this->assertNull($envelope->last(ReceivedStamp::class));
|
||||
$this->assertNull($envelope->last(TransportMessageIdStamp::class));
|
||||
|
||||
return true;
|
||||
}))->willReturn(new Envelope(new \stdClass()));
|
||||
$listener = new SendFailedMessageToFailureTransportListener(
|
||||
$bus,
|
||||
'failure_sender'
|
||||
);
|
||||
|
||||
$exception = new \Exception('no!');
|
||||
$envelope = new Envelope(new \stdClass());
|
||||
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false);
|
||||
|
||||
$listener->onMessageFailed($event);
|
||||
}
|
||||
|
||||
public function testItGetsNestedHandlerFailedException()
|
||||
{
|
||||
$bus = $this->createMock(MessageBusInterface::class);
|
||||
$bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) {
|
||||
/** @var Envelope $envelope */
|
||||
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
|
||||
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
|
||||
$this->assertNotNull($sentToFailureTransportStamp);
|
||||
$this->assertSame('I am inside!', $sentToFailureTransportStamp->getExceptionMessage());
|
||||
$this->assertSame('Exception', $sentToFailureTransportStamp->getFlattenException()->getClass());
|
||||
|
||||
return true;
|
||||
}))->willReturn(new Envelope(new \stdClass()));
|
||||
|
||||
$listener = new SendFailedMessageToFailureTransportListener(
|
||||
$bus,
|
||||
'failure_sender'
|
||||
);
|
||||
|
||||
$envelope = new Envelope(new \stdClass());
|
||||
$exception = new \Exception('I am inside!');
|
||||
$exception = new HandlerFailedException($envelope, [$exception]);
|
||||
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false);
|
||||
|
||||
$listener->onMessageFailed($event);
|
||||
}
|
||||
|
||||
public function testDoNothingOnRetry()
|
||||
{
|
||||
$bus = $this->createMock(MessageBusInterface::class);
|
||||
$bus->expects($this->never())->method('dispatch');
|
||||
$listener = new SendFailedMessageToFailureTransportListener(
|
||||
$bus,
|
||||
'failure_sender'
|
||||
);
|
||||
|
||||
$envelope = new Envelope(new \stdClass());
|
||||
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), true);
|
||||
|
||||
$listener->onMessageFailed($event);
|
||||
}
|
||||
|
||||
public function testDoNotRedeliverToFailed()
|
||||
{
|
||||
$bus = $this->createMock(MessageBusInterface::class);
|
||||
$bus->expects($this->never())->method('dispatch');
|
||||
$listener = new SendFailedMessageToFailureTransportListener(
|
||||
$bus,
|
||||
'failure_sender'
|
||||
);
|
||||
|
||||
$envelope = new Envelope(new \stdClass(), [
|
||||
new SentStamp('MySender', 'failure_sender'),
|
||||
]);
|
||||
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), false);
|
||||
|
||||
$listener->onMessageFailed($event);
|
||||
}
|
||||
}
|
@ -29,5 +29,9 @@ class AddBusNameStampMiddlewareTest extends MiddlewareTestCase
|
||||
$busNameStamp = $finalEnvelope->last(BusNameStamp::class);
|
||||
$this->assertNotNull($busNameStamp);
|
||||
$this->assertSame('the_bus_name', $busNameStamp->getBusName());
|
||||
|
||||
// the stamp should not be added over and over again
|
||||
$finalEnvelope = $middleware->handle($finalEnvelope, $this->getStackMock());
|
||||
$this->assertCount(1, $finalEnvelope->all(BusNameStamp::class));
|
||||
}
|
||||
}
|
||||
|
@ -11,6 +11,7 @@
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Middleware;
|
||||
|
||||
use Psr\Container\ContainerInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
||||
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
|
||||
@ -34,15 +35,16 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
$envelope = new Envelope($message);
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
|
||||
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['my_sender']], ['my_sender' => $sender]);
|
||||
$middleware = new SendMessageMiddleware($sendersLocator);
|
||||
|
||||
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->will($this->returnArgument(0));
|
||||
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'my_sender')))->will($this->returnArgument(0));
|
||||
|
||||
$envelope = $middleware->handle($envelope, $this->getStackMock(false));
|
||||
|
||||
/* @var SentStamp $stamp */
|
||||
$this->assertInstanceOf(SentStamp::class, $stamp = $envelope->last(SentStamp::class), 'it adds a sent stamp');
|
||||
$this->assertNull($stamp->getSenderAlias());
|
||||
$this->assertSame('my_sender', $stamp->getSenderAlias());
|
||||
$this->assertStringMatchesFormat('Mock_SenderInterface_%s', $stamp->getSenderClass());
|
||||
}
|
||||
|
||||
@ -52,9 +54,8 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator([
|
||||
DummyMessage::class => ['foo' => $sender, 'bar' => $sender2],
|
||||
]));
|
||||
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo', 'bar']], ['foo' => $sender, 'bar' => $sender2]);
|
||||
$middleware = new SendMessageMiddleware($sendersLocator);
|
||||
|
||||
$sender->expects($this->once())
|
||||
->method('send')
|
||||
@ -92,12 +93,16 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator([
|
||||
DummyMessage::class => ['foo' => $sender, 'bar' => $sender2],
|
||||
], [
|
||||
// normally, this class sends and handles (but not on retry)
|
||||
DummyMessage::class => true,
|
||||
]));
|
||||
$sendersLocator = $this->createSendersLocator(
|
||||
[DummyMessage::class => ['foo', 'bar']],
|
||||
['foo' => $sender, 'bar' => $sender2],
|
||||
[
|
||||
// normally, this class sends and handles (but not on retry)
|
||||
DummyMessage::class => true,
|
||||
]
|
||||
);
|
||||
|
||||
$middleware = new SendMessageMiddleware($sendersLocator);
|
||||
|
||||
$sender->expects($this->never())
|
||||
->method('send')
|
||||
@ -116,9 +121,10 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
$envelope = new Envelope(new ChildDummyMessage('Hey'));
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
|
||||
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo_sender']], ['foo_sender' => $sender]);
|
||||
$middleware = new SendMessageMiddleware($sendersLocator);
|
||||
|
||||
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
|
||||
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope);
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock(false));
|
||||
}
|
||||
@ -129,11 +135,12 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
$envelope = new Envelope($message);
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [
|
||||
$sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [
|
||||
DummyMessage::class => true,
|
||||
]));
|
||||
]);
|
||||
$middleware = new SendMessageMiddleware($sendersLocator);
|
||||
|
||||
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
|
||||
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope);
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock());
|
||||
}
|
||||
@ -144,11 +151,12 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
$envelope = new Envelope($message);
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [
|
||||
$sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [
|
||||
DummyMessage::class => true,
|
||||
]));
|
||||
]);
|
||||
$middleware = new SendMessageMiddleware($sendersLocator);
|
||||
|
||||
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
|
||||
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope);
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock());
|
||||
}
|
||||
@ -159,11 +167,12 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
$envelope = new Envelope($message);
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [
|
||||
$sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [
|
||||
DummyMessageInterface::class => true,
|
||||
]));
|
||||
]);
|
||||
$middleware = new SendMessageMiddleware($sendersLocator);
|
||||
|
||||
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
|
||||
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope);
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock());
|
||||
}
|
||||
@ -174,11 +183,12 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
$envelope = new Envelope($message);
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [
|
||||
$sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [
|
||||
'*' => true,
|
||||
]));
|
||||
]);
|
||||
$middleware = new SendMessageMiddleware($sendersLocator);
|
||||
|
||||
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
|
||||
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope);
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock());
|
||||
}
|
||||
@ -188,7 +198,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
$message = new DummyMessage('Hey');
|
||||
$envelope = new Envelope($message);
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator([]));
|
||||
$middleware = new SendMessageMiddleware($this->createSendersLocator([], []));
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock());
|
||||
}
|
||||
@ -199,7 +209,8 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]]));
|
||||
$sendersLocator = $this->createSendersLocator(['*' => ['foo']], ['foo' => $sender]);
|
||||
$middleware = new SendMessageMiddleware($sendersLocator);
|
||||
|
||||
$sender->expects($this->never())->method('send');
|
||||
|
||||
@ -220,7 +231,8 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
$sender1 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender1, $sender2]]), $dispatcher);
|
||||
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo', 'bar']], ['foo' => $sender1, 'bar' => $sender2]);
|
||||
$middleware = new SendMessageMiddleware($sendersLocator, $dispatcher);
|
||||
|
||||
$sender1->expects($this->once())->method('send')->willReturn($envelope);
|
||||
$sender2->expects($this->once())->method('send')->willReturn($envelope);
|
||||
@ -235,7 +247,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
$dispatcher = $this->createMock(EventDispatcherInterface::class);
|
||||
$dispatcher->expects($this->never())->method('dispatch');
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator([]), $dispatcher);
|
||||
$middleware = new SendMessageMiddleware($this->createSendersLocator([], []), $dispatcher);
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock());
|
||||
}
|
||||
@ -249,8 +261,11 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
$dispatcher->expects($this->never())->method('dispatch');
|
||||
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$sender2->expects($this->once())->method('send')->willReturn(new Envelope(new \stdClass()));
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]), $dispatcher);
|
||||
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo']], ['foo' => $sender, 'foo_sender' => $sender2]);
|
||||
$middleware = new SendMessageMiddleware($sendersLocator, $dispatcher);
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock(false));
|
||||
}
|
||||
@ -263,9 +278,27 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$sender->expects($this->once())->method('send')->willReturn($envelope);
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
|
||||
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo']], ['foo' => $sender]);
|
||||
$middleware = new SendMessageMiddleware($sendersLocator);
|
||||
|
||||
// next handler *should* be called
|
||||
$middleware->handle($envelope, $this->getStackMock(true));
|
||||
}
|
||||
|
||||
private function createSendersLocator(array $sendersMap, array $senders, array $sendAndHandle = [])
|
||||
{
|
||||
$container = $this->createMock(ContainerInterface::class);
|
||||
$container->expects($this->any())
|
||||
->method('has')
|
||||
->willReturnCallback(function ($id) use ($senders) {
|
||||
return isset($senders[$id]);
|
||||
});
|
||||
$container->expects($this->any())
|
||||
->method('get')
|
||||
->willReturnCallback(function ($id) use ($senders) {
|
||||
return $senders[$id];
|
||||
});
|
||||
|
||||
return new SendersLocator($sendersMap, $container, $sendAndHandle);
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,16 @@ class MultiplierRetryStrategyTest extends TestCase
|
||||
$this->assertTrue($strategy->isRetryable($envelope));
|
||||
}
|
||||
|
||||
public function testIsRetryableWithNullMax()
|
||||
{
|
||||
$strategy = new MultiplierRetryStrategy(null);
|
||||
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]);
|
||||
$this->assertTrue($strategy->isRetryable($envelope));
|
||||
|
||||
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(1, 'sender_alias')]);
|
||||
$this->assertTrue($strategy->isRetryable($envelope));
|
||||
}
|
||||
|
||||
public function testIsNotRetryable()
|
||||
{
|
||||
$strategy = new MultiplierRetryStrategy(3);
|
||||
@ -34,6 +44,13 @@ class MultiplierRetryStrategyTest extends TestCase
|
||||
$this->assertFalse($strategy->isRetryable($envelope));
|
||||
}
|
||||
|
||||
public function testIsNotRetryableWithZeroMax()
|
||||
{
|
||||
$strategy = new MultiplierRetryStrategy(0);
|
||||
$envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]);
|
||||
$this->assertFalse($strategy->isRetryable($envelope));
|
||||
}
|
||||
|
||||
public function testIsRetryableWithNoStamp()
|
||||
{
|
||||
$strategy = new MultiplierRetryStrategy(3);
|
||||
|
@ -12,6 +12,7 @@
|
||||
namespace Symfony\Component\Messenger\Tests;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Psr\Container\ContainerInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
|
||||
use Symfony\Component\Messenger\Handler\HandlersLocator;
|
||||
@ -19,10 +20,10 @@ use Symfony\Component\Messenger\MessageBus;
|
||||
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
|
||||
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
|
||||
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
|
||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageHandlerFailingFirstTimes;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
|
||||
use Symfony\Component\Messenger\Worker;
|
||||
|
||||
@ -30,20 +31,15 @@ class RetryIntegrationTest extends TestCase
|
||||
{
|
||||
public function testRetryMechanism()
|
||||
{
|
||||
$apiMessage = new DummyMessage('API');
|
||||
$senderAndReceiver = new DummySenderAndReceiver();
|
||||
|
||||
$receiver = $this->createMock(ReceiverInterface::class);
|
||||
$receiver->method('get')
|
||||
->willReturn([
|
||||
new Envelope($apiMessage, [
|
||||
new SentStamp('Some\Sender', 'sender_alias'),
|
||||
]),
|
||||
]);
|
||||
$senderLocator = $this->createMock(ContainerInterface::class);
|
||||
$senderLocator->method('has')->with('sender_alias')->willReturn(true);
|
||||
$senderLocator->method('get')->with('sender_alias')->willReturn($senderAndReceiver);
|
||||
$senderLocator = new SendersLocator([DummyMessage::class => ['sender_alias']], $senderLocator);
|
||||
|
||||
$senderLocator = new SendersLocator([], ['*' => true]);
|
||||
|
||||
$handler = new DummyMessageHandlerFailingFirstTimes();
|
||||
$throwingHandler = new DummyMessageHandlerFailingFirstTimes(1);
|
||||
$handler = new DummyMessageHandlerFailingFirstTimes(0, 'A');
|
||||
$throwingHandler = new DummyMessageHandlerFailingFirstTimes(1, 'B');
|
||||
$handlerLocator = new HandlersLocator([
|
||||
DummyMessage::class => [
|
||||
new HandlerDescriptor($handler, ['alias' => 'first']),
|
||||
@ -51,14 +47,54 @@ class RetryIntegrationTest extends TestCase
|
||||
],
|
||||
]);
|
||||
|
||||
// dispatch the message, which will get "sent" and then received by DummySenderAndReceiver
|
||||
$bus = new MessageBus([new SendMessageMiddleware($senderLocator), new HandleMessageMiddleware($handlerLocator)]);
|
||||
$envelope = new Envelope(new DummyMessage('API'));
|
||||
$bus->dispatch($envelope);
|
||||
|
||||
$worker = new Worker(['receiverName' => $receiver], $bus, ['receiverName' => new MultiplierRetryStrategy()]);
|
||||
$worker->run([], function () use ($worker) {
|
||||
$worker->stop();
|
||||
$worker = new Worker(['receiverName' => $senderAndReceiver], $bus, ['receiverName' => new MultiplierRetryStrategy()]);
|
||||
$worker->run([], function (?Envelope $envelope) use ($worker) {
|
||||
if (null === $envelope) {
|
||||
$worker->stop();
|
||||
}
|
||||
});
|
||||
|
||||
$this->assertSame(1, $handler->getTimesCalledWithoutThrowing());
|
||||
$this->assertSame(1, $throwingHandler->getTimesCalledWithoutThrowing());
|
||||
}
|
||||
}
|
||||
|
||||
class DummySenderAndReceiver implements ReceiverInterface, SenderInterface
|
||||
{
|
||||
private $messagesWaiting = [];
|
||||
|
||||
private $messagesReceived = [];
|
||||
|
||||
public function get(): iterable
|
||||
{
|
||||
$message = array_shift($this->messagesWaiting);
|
||||
|
||||
if (null === $message) {
|
||||
return [];
|
||||
}
|
||||
|
||||
$this->messagesReceived[] = $message;
|
||||
|
||||
return [$message];
|
||||
}
|
||||
|
||||
public function ack(Envelope $envelope): void
|
||||
{
|
||||
}
|
||||
|
||||
public function reject(Envelope $envelope): void
|
||||
{
|
||||
}
|
||||
|
||||
public function send(Envelope $envelope): Envelope
|
||||
{
|
||||
$this->messagesWaiting[] = $envelope;
|
||||
|
||||
return $envelope;
|
||||
}
|
||||
}
|
||||
|
@ -16,22 +16,6 @@ use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||
|
||||
class RedeliveryStampTest extends TestCase
|
||||
{
|
||||
public function testShouldRedeliverToSenderWithAlias()
|
||||
{
|
||||
$stamp = new RedeliveryStamp(5, 'foo_alias');
|
||||
|
||||
$this->assertFalse($stamp->shouldRedeliverToSender('Foo\Bar\Sender', 'bar_alias'));
|
||||
$this->assertTrue($stamp->shouldRedeliverToSender('Foo\Bar\Sender', 'foo_alias'));
|
||||
}
|
||||
|
||||
public function testShouldRedeliverToSenderWithoutAlias()
|
||||
{
|
||||
$stampToRedeliverToSender1 = new RedeliveryStamp(5, 'App\Sender1');
|
||||
|
||||
$this->assertTrue($stampToRedeliverToSender1->shouldRedeliverToSender('App\Sender1', null));
|
||||
$this->assertFalse($stampToRedeliverToSender1->shouldRedeliverToSender('App\Sender2', null));
|
||||
}
|
||||
|
||||
public function testGetters()
|
||||
{
|
||||
$stamp = new RedeliveryStamp(10, 'sender_alias');
|
||||
|
@ -0,0 +1,33 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Stamp;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Debug\Exception\FlattenException;
|
||||
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
|
||||
|
||||
class SentToFailureTransportStampTest extends TestCase
|
||||
{
|
||||
public function testGetters()
|
||||
{
|
||||
$flattenException = new FlattenException();
|
||||
$stamp = new SentToFailureTransportStamp(
|
||||
'exception message',
|
||||
'original_receiver',
|
||||
$flattenException
|
||||
);
|
||||
$this->assertSame('exception message', $stamp->getExceptionMessage());
|
||||
$this->assertSame('original_receiver', $stamp->getOriginalReceiverName());
|
||||
$this->assertSame($flattenException, $stamp->getFlattenException());
|
||||
$this->assertInstanceOf(\DateTime::class, $stamp->getSentAt());
|
||||
}
|
||||
}
|
@ -12,9 +12,12 @@
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
|
||||
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceivedStamp;
|
||||
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||
@ -28,14 +31,26 @@ class DoctrineReceiverTest extends TestCase
|
||||
{
|
||||
$serializer = $this->createSerializer();
|
||||
|
||||
$doctrineEnvelop = $this->createDoctrineEnvelope();
|
||||
$doctrineEnvelope = $this->createDoctrineEnvelope();
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
$connection->method('get')->willReturn($doctrineEnvelop);
|
||||
$connection->method('get')->willReturn($doctrineEnvelope);
|
||||
|
||||
$receiver = new DoctrineReceiver($connection, $serializer);
|
||||
$actualEnvelopes = iterator_to_array($receiver->get());
|
||||
$this->assertCount(1, $actualEnvelopes);
|
||||
/** @var Envelope $actualEnvelope */
|
||||
$actualEnvelope = $actualEnvelopes[0];
|
||||
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
|
||||
|
||||
/** @var DoctrineReceivedStamp $doctrineReceivedStamp */
|
||||
$doctrineReceivedStamp = $actualEnvelope->last(DoctrineReceivedStamp::class);
|
||||
$this->assertNotNull($doctrineReceivedStamp);
|
||||
$this->assertSame('1', $doctrineReceivedStamp->getId());
|
||||
|
||||
/** @var TransportMessageIdStamp $transportMessageIdStamp */
|
||||
$transportMessageIdStamp = $actualEnvelope->last(TransportMessageIdStamp::class);
|
||||
$this->assertNotNull($transportMessageIdStamp);
|
||||
$this->assertSame(1, $transportMessageIdStamp->getId());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -55,6 +70,34 @@ class DoctrineReceiverTest extends TestCase
|
||||
iterator_to_array($receiver->get());
|
||||
}
|
||||
|
||||
public function testAll()
|
||||
{
|
||||
$serializer = $this->createSerializer();
|
||||
|
||||
$doctrineEnvelope1 = $this->createDoctrineEnvelope();
|
||||
$doctrineEnvelope2 = $this->createDoctrineEnvelope();
|
||||
$connection = $this->createMock(Connection::class);
|
||||
$connection->method('findAll')->with(50)->willReturn([$doctrineEnvelope1, $doctrineEnvelope2]);
|
||||
|
||||
$receiver = new DoctrineReceiver($connection, $serializer);
|
||||
$actualEnvelopes = \iterator_to_array($receiver->all(50));
|
||||
$this->assertCount(2, $actualEnvelopes);
|
||||
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
|
||||
}
|
||||
|
||||
public function testFind()
|
||||
{
|
||||
$serializer = $this->createSerializer();
|
||||
|
||||
$doctrineEnvelope = $this->createDoctrineEnvelope();
|
||||
$connection = $this->createMock(Connection::class);
|
||||
$connection->method('find')->with(10)->willReturn($doctrineEnvelope);
|
||||
|
||||
$receiver = new DoctrineReceiver($connection, $serializer);
|
||||
$actualEnvelope = $receiver->find(10);
|
||||
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());
|
||||
}
|
||||
|
||||
private function createDoctrineEnvelope()
|
||||
{
|
||||
return [
|
||||
|
@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
|
||||
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineSender;
|
||||
@ -29,13 +30,18 @@ class DoctrineSenderTest extends TestCase
|
||||
$connection = $this->getMockBuilder(Connection::class)
|
||||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers']);
|
||||
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'])->willReturn(15);
|
||||
|
||||
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
|
||||
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
|
||||
|
||||
$sender = new DoctrineSender($connection, $serializer);
|
||||
$sender->send($envelope);
|
||||
$actualEnvelope = $sender->send($envelope);
|
||||
|
||||
/** @var TransportMessageIdStamp $transportMessageIdStamp */
|
||||
$transportMessageIdStamp = $actualEnvelope->last(TransportMessageIdStamp::class);
|
||||
$this->assertNotNull($transportMessageIdStamp);
|
||||
$this->assertSame('15', $transportMessageIdStamp->getId());
|
||||
}
|
||||
|
||||
public function testSendWithDelay()
|
||||
|
@ -0,0 +1,50 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\Sender;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
|
||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
|
||||
|
||||
class SingleMessageReceiverTest extends TestCase
|
||||
{
|
||||
public function testItReceivesOnlyOneMessage()
|
||||
{
|
||||
$innerReceiver = $this->createMock(ReceiverInterface::class);
|
||||
$envelope = new Envelope(new \stdClass());
|
||||
|
||||
$receiver = new SingleMessageReceiver($innerReceiver, $envelope);
|
||||
$received = \iterator_to_array($receiver->get());
|
||||
$this->assertCount(1, $received);
|
||||
$this->assertSame($received[0], $envelope);
|
||||
|
||||
$this->assertEmpty(\iterator_to_array($receiver->get()));
|
||||
}
|
||||
|
||||
public function testCallsAreForwarded()
|
||||
{
|
||||
$envelope = new Envelope(new \stdClass());
|
||||
|
||||
$innerReceiver = $this->createMock(ReceiverInterface::class);
|
||||
$innerReceiver->expects($this->once())->method('ack')->with($envelope);
|
||||
$innerReceiver->expects($this->once())->method('reject')->with($envelope);
|
||||
|
||||
$receiver = new SingleMessageReceiver($innerReceiver, $envelope);
|
||||
$receiver->ack($envelope);
|
||||
$receiver->reject($envelope);
|
||||
}
|
||||
}
|
@ -12,7 +12,9 @@
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\Sender;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Psr\Container\ContainerInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\UnknownSenderException;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
|
||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||
@ -21,23 +23,87 @@ use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
|
||||
class SendersLocatorTest extends TestCase
|
||||
{
|
||||
public function testItReturnsTheSenderBasedOnTheMessageClass()
|
||||
{
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$sendersLocator = $this->createContainer([
|
||||
'my_sender' => $sender,
|
||||
]);
|
||||
$locator = new SendersLocator([
|
||||
DummyMessage::class => ['my_sender'],
|
||||
], $sendersLocator);
|
||||
|
||||
$this->assertSame(['my_sender' => $sender], iterator_to_array($locator->getSenders(new Envelope(new DummyMessage('a')))));
|
||||
$this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage()))));
|
||||
}
|
||||
|
||||
public function testGetSenderByAlias()
|
||||
{
|
||||
$sender1 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$sendersLocator = $this->createContainer([
|
||||
'sender1' => $sender1,
|
||||
'sender2' => $sender2,
|
||||
]);
|
||||
|
||||
$locator = new SendersLocator([], $sendersLocator);
|
||||
|
||||
$this->assertSame($sender1, $locator->getSenderByAlias('sender1'));
|
||||
$this->assertSame($sender2, $locator->getSenderByAlias('sender2'));
|
||||
}
|
||||
|
||||
public function testGetSenderByAliasThrowsException()
|
||||
{
|
||||
$this->expectException(UnknownSenderException::class);
|
||||
$this->expectExceptionMessage('Unknown sender alias');
|
||||
|
||||
$sender1 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$sendersLocator = $this->createContainer([
|
||||
'sender1' => $sender1,
|
||||
]);
|
||||
|
||||
$locator = new SendersLocator([], $sendersLocator);
|
||||
$locator->getSenderByAlias('sender2');
|
||||
}
|
||||
|
||||
/**
|
||||
* @group legacy
|
||||
*/
|
||||
public function testItReturnsTheSenderBasedOnTheMessageClassLegacy()
|
||||
{
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$locator = new SendersLocator([
|
||||
DummyMessage::class => [$sender],
|
||||
]);
|
||||
|
||||
$this->assertSame([$sender], iterator_to_array($locator->getSenders(new Envelope(new DummyMessage('a')))));
|
||||
$this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage()))));
|
||||
}
|
||||
|
||||
public function testItYieldsProvidedSenderAliasAsKey()
|
||||
/**
|
||||
* @group legacy
|
||||
*/
|
||||
public function testItYieldsProvidedSenderAliasAsKeyLegacy()
|
||||
{
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$locator = new SendersLocator([
|
||||
DummyMessage::class => ['dummy' => $sender],
|
||||
]);
|
||||
|
||||
$this->assertSame(['dummy' => $sender], iterator_to_array($locator->getSenders(new Envelope(new DummyMessage('a')))));
|
||||
}
|
||||
|
||||
private function createContainer(array $senders)
|
||||
{
|
||||
$container = $this->createMock(ContainerInterface::class);
|
||||
$container->expects($this->any())
|
||||
->method('has')
|
||||
->willReturnCallback(function($id) use ($senders) {
|
||||
return isset($senders[$id]);
|
||||
});
|
||||
$container->expects($this->any())
|
||||
->method('get')
|
||||
->willReturnCallback(function($id) use ($senders) {
|
||||
return $senders[$id];
|
||||
});
|
||||
|
||||
return $container;
|
||||
}
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ class WorkerTest extends TestCase
|
||||
$this->assertNotNull($redeliveryStamp);
|
||||
// retry count now at 1
|
||||
$this->assertSame(1, $redeliveryStamp->getRetryCount());
|
||||
$this->assertTrue($redeliveryStamp->shouldRedeliverToSender('Some\Sender', 'sender_alias'));
|
||||
$this->assertSame('sender_alias', $redeliveryStamp->getSenderClassOrAlias());
|
||||
|
||||
// received stamp is removed
|
||||
$this->assertNull($envelope->last(ReceivedStamp::class));
|
||||
|
@ -110,7 +110,11 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
*/
|
||||
public function getMessageCount(): int
|
||||
{
|
||||
return $this->connection->countMessagesInQueues();
|
||||
try {
|
||||
return $this->connection->countMessagesInQueues();
|
||||
} catch (\AMQPException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
}
|
||||
|
||||
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, string $queueName): void
|
||||
|
@ -102,9 +102,10 @@ class Connection
|
||||
/**
|
||||
* @param int $delay The delay in milliseconds
|
||||
*
|
||||
* @return string The inserted id
|
||||
* @throws \Doctrine\DBAL\DBALException
|
||||
*/
|
||||
public function send(string $body, array $headers, int $delay = 0): void
|
||||
public function send(string $body, array $headers, int $delay = 0): string
|
||||
{
|
||||
$now = new \DateTime();
|
||||
$availableAt = (clone $now)->modify(sprintf('+%d seconds', $delay / 1000));
|
||||
@ -126,6 +127,8 @@ class Connection
|
||||
':created_at' => self::formatDateTime($now),
|
||||
':available_at' => self::formatDateTime($availableAt),
|
||||
]);
|
||||
|
||||
return $this->driverConnection->lastInsertId();
|
||||
}
|
||||
|
||||
public function get(): ?array
|
||||
@ -205,14 +208,42 @@ class Connection
|
||||
return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchColumn();
|
||||
}
|
||||
|
||||
public function findAll(int $limit = null): array
|
||||
{
|
||||
if ($this->configuration['auto_setup']) {
|
||||
$this->setup();
|
||||
}
|
||||
|
||||
$queryBuilder = $this->createAvailableMessagesQueryBuilder();
|
||||
if (null !== $limit) {
|
||||
$queryBuilder->setMaxResults($limit);
|
||||
}
|
||||
|
||||
return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchAll();
|
||||
}
|
||||
|
||||
public function find($id): ?array
|
||||
{
|
||||
if ($this->configuration['auto_setup']) {
|
||||
$this->setup();
|
||||
}
|
||||
|
||||
$queryBuilder = $this->createQueryBuilder()
|
||||
->where('m.id = :id');
|
||||
|
||||
$data = $this->executeQuery($queryBuilder->getSQL(), [
|
||||
'id' => $id,
|
||||
])->fetch();
|
||||
|
||||
return false === $data ? null : $data;
|
||||
}
|
||||
|
||||
private function createAvailableMessagesQueryBuilder(): QueryBuilder
|
||||
{
|
||||
$now = new \DateTime();
|
||||
$redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));
|
||||
|
||||
return $this->driverConnection->createQueryBuilder()
|
||||
->select('m.*')
|
||||
->from($this->configuration['table_name'], 'm')
|
||||
return $this->createQueryBuilder()
|
||||
->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit')
|
||||
->andWhere('m.available_at <= :now')
|
||||
->andWhere('m.queue_name = :queue_name')
|
||||
@ -223,6 +254,13 @@ class Connection
|
||||
]);
|
||||
}
|
||||
|
||||
private function createQueryBuilder(): QueryBuilder
|
||||
{
|
||||
return $this->driverConnection->createQueryBuilder()
|
||||
->select('m.*')
|
||||
->from($this->configuration['table_name'], 'm');
|
||||
}
|
||||
|
||||
private function executeQuery(string $sql, array $parameters = [])
|
||||
{
|
||||
$stmt = null;
|
||||
|
@ -16,6 +16,8 @@ use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\LogicException;
|
||||
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
||||
@ -26,7 +28,7 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
|
||||
{
|
||||
private $connection;
|
||||
private $serializer;
|
||||
@ -52,18 +54,7 @@ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
return [];
|
||||
}
|
||||
|
||||
try {
|
||||
$envelope = $this->serializer->decode([
|
||||
'body' => $doctrineEnvelope['body'],
|
||||
'headers' => $doctrineEnvelope['headers'],
|
||||
]);
|
||||
} catch (MessageDecodingFailedException $exception) {
|
||||
$this->connection->reject($doctrineEnvelope['id']);
|
||||
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
yield $envelope->with(new DoctrineReceivedStamp($doctrineEnvelope['id']));
|
||||
yield $this->createEnvelopeFromData($doctrineEnvelope);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -71,7 +62,11 @@ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
*/
|
||||
public function ack(Envelope $envelope): void
|
||||
{
|
||||
$this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId());
|
||||
try {
|
||||
$this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId());
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -79,7 +74,11 @@ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
*/
|
||||
public function reject(Envelope $envelope): void
|
||||
{
|
||||
$this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
|
||||
try {
|
||||
$this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -87,7 +86,45 @@ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
*/
|
||||
public function getMessageCount(): int
|
||||
{
|
||||
return $this->connection->getMessageCount();
|
||||
try {
|
||||
return $this->connection->getMessageCount();
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function all(int $limit = null): iterable
|
||||
{
|
||||
try {
|
||||
$doctrineEnvelopes = $this->connection->findAll($limit);
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
|
||||
foreach ($doctrineEnvelopes as $doctrineEnvelope) {
|
||||
yield $this->createEnvelopeFromData($doctrineEnvelope);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function find($id): ?Envelope
|
||||
{
|
||||
try {
|
||||
$doctrineEnvelope = $this->connection->find($id);
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
|
||||
if (null === $doctrineEnvelope) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return $this->createEnvelopeFromData($doctrineEnvelope);
|
||||
}
|
||||
|
||||
private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceivedStamp
|
||||
@ -101,4 +138,23 @@ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
|
||||
return $doctrineReceivedStamp;
|
||||
}
|
||||
|
||||
private function createEnvelopeFromData(array $data): Envelope
|
||||
{
|
||||
try {
|
||||
$envelope = $this->serializer->decode([
|
||||
'body' => $data['body'],
|
||||
'headers' => $data['headers'],
|
||||
]);
|
||||
} catch (MessageDecodingFailedException $exception) {
|
||||
$this->connection->reject($data['id']);
|
||||
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
return $envelope->with(
|
||||
new DoctrineReceivedStamp($data['id']),
|
||||
new TransportMessageIdStamp($data['id'])
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ use Doctrine\DBAL\DBALException;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
@ -47,11 +48,11 @@ class DoctrineSender implements SenderInterface
|
||||
$delay = null !== $delayStamp ? $delayStamp->getDelay() : 0;
|
||||
|
||||
try {
|
||||
$this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
|
||||
$id = $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
|
||||
} catch (DBALException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
}
|
||||
|
||||
return $envelope;
|
||||
return $envelope->with(new TransportMessageIdStamp($id));
|
||||
}
|
||||
}
|
||||
|
@ -12,6 +12,8 @@
|
||||
namespace Symfony\Component\Messenger\Transport\Doctrine;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
@ -21,7 +23,7 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
class DoctrineTransport implements TransportInterface, SetupableTransportInterface
|
||||
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface
|
||||
{
|
||||
private $connection;
|
||||
private $serializer;
|
||||
@ -58,6 +60,31 @@ class DoctrineTransport implements TransportInterface, SetupableTransportInterfa
|
||||
($this->receiver ?? $this->getReceiver())->reject($envelope);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getMessageCount(): int
|
||||
{
|
||||
return ($this->receiver ?? $this->getReceiver())->getMessageCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function all(int $limit = null): iterable
|
||||
{
|
||||
return ($this->receiver ?? $this->getReceiver())->all($limit);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function find($id): ?Envelope
|
||||
{
|
||||
return ($this->receiver ?? $this->getReceiver())->find($id);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
|
@ -0,0 +1,33 @@
|
||||
<?php
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\Receiver;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
|
||||
/**
|
||||
* Used when a receiver has the ability to list messages and find specific messages.
|
||||
* A receiver that implements this should add the TransportMessageIdStamp
|
||||
* to the Envelopes that it returns.
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
interface ListableReceiverInterface extends ReceiverInterface
|
||||
{
|
||||
/**
|
||||
* Returns all the messages (up to the limit) in this receiver.
|
||||
*
|
||||
* Messages should be given the same stamps as when using ReceiverInterface::get().
|
||||
*
|
||||
* @return Envelope[]|iterable
|
||||
*/
|
||||
public function all(int $limit = null): iterable;
|
||||
|
||||
/**
|
||||
* Returns the Envelope by id or none.
|
||||
*
|
||||
* Message should be given the same stamps as when using ReceiverInterface::get().
|
||||
*/
|
||||
public function find($id): ?Envelope;
|
||||
}
|
@ -35,6 +35,8 @@ interface ReceiverInterface
|
||||
* call to get() takes a long time, blocking other receivers from
|
||||
* being called.
|
||||
*
|
||||
* If applicable, the Envelope should contain a TransportMessageIdStamp.
|
||||
*
|
||||
* If a received message cannot be decoded, the message should not
|
||||
* be retried again (e.g. if there's a queue, it should be removed)
|
||||
* and a MessageDecodingFailedException should be thrown.
|
||||
|
@ -0,0 +1,47 @@
|
||||
<?php
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\Receiver;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
|
||||
/**
|
||||
* Receiver that decorates another, but receives only 1 specific message.
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*
|
||||
* @internal
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
class SingleMessageReceiver implements ReceiverInterface
|
||||
{
|
||||
private $receiver;
|
||||
private $envelope;
|
||||
private $hasReceived = false;
|
||||
|
||||
public function __construct(ReceiverInterface $receiver, Envelope $envelope)
|
||||
{
|
||||
$this->receiver = $receiver;
|
||||
$this->envelope = $envelope;
|
||||
}
|
||||
|
||||
public function get(): iterable
|
||||
{
|
||||
if ($this->hasReceived) {
|
||||
return [];
|
||||
}
|
||||
|
||||
$this->hasReceived = true;
|
||||
|
||||
yield $this->envelope;
|
||||
}
|
||||
|
||||
public function ack(Envelope $envelope): void
|
||||
{
|
||||
$this->receiver->ack($envelope);
|
||||
}
|
||||
|
||||
public function reject(Envelope $envelope): void
|
||||
{
|
||||
$this->receiver->reject($envelope);
|
||||
}
|
||||
}
|
@ -25,6 +25,8 @@ interface SenderInterface
|
||||
*
|
||||
* The sender can read different stamps for transport configuration,
|
||||
* like delivery delay.
|
||||
*
|
||||
* If applicable, the returned Envelope should contain a TransportMessageIdStamp.
|
||||
*/
|
||||
public function send(Envelope $envelope): Envelope;
|
||||
}
|
||||
|
@ -11,7 +11,11 @@
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\Sender;
|
||||
|
||||
use Psr\Container\ContainerInterface;
|
||||
use Symfony\Component\DependencyInjection\ServiceLocator;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\RuntimeException;
|
||||
use Symfony\Component\Messenger\Exception\UnknownSenderException;
|
||||
use Symfony\Component\Messenger\Handler\HandlersLocator;
|
||||
|
||||
/**
|
||||
@ -23,17 +27,30 @@ use Symfony\Component\Messenger\Handler\HandlersLocator;
|
||||
*/
|
||||
class SendersLocator implements SendersLocatorInterface
|
||||
{
|
||||
private $senders;
|
||||
private $sendersMap;
|
||||
private $sendersLocator;
|
||||
private $useLegacyLookup = false;
|
||||
private $sendAndHandle;
|
||||
|
||||
/**
|
||||
* @param SenderInterface[][] $senders
|
||||
* @param bool[] $sendAndHandle
|
||||
* @param string[][] $sendersMap An array, keyed by "type", set to an array of sender aliases
|
||||
* @param ContainerInterface $sendersLocator Locator of senders, keyed by sender alias
|
||||
* @param bool[] $sendAndHandle
|
||||
*/
|
||||
public function __construct(array $senders, array $sendAndHandle = [])
|
||||
public function __construct(array $sendersMap, /*ContainerInterface*/ $sendersLocator = null, array $sendAndHandle = [])
|
||||
{
|
||||
$this->senders = $senders;
|
||||
$this->sendAndHandle = $sendAndHandle;
|
||||
$this->sendersMap = $sendersMap;
|
||||
|
||||
if (is_array($sendersLocator) || null === $sendersLocator) {
|
||||
@trigger_error(sprintf('"%s::__construct()" requires a "%s" as 2nd argument. Not doing so is deprecated since Symfony 4.3 and will be required in 5.0.', __CLASS__, ContainerInterface::class), E_USER_DEPRECATED);
|
||||
// "%s" requires a "%s" as 2nd argument. Not doing so is deprecated since Symfony 4.3 and will be required in 5.0.'
|
||||
$this->sendersLocator = new ServiceLocator([]);
|
||||
$this->sendAndHandle = $sendersLocator;
|
||||
$this->useLegacyLookup = true;
|
||||
} else {
|
||||
$this->sendersLocator = $sendersLocator;
|
||||
$this->sendAndHandle = $sendAndHandle;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -46,14 +63,43 @@ class SendersLocator implements SendersLocatorInterface
|
||||
$seen = [];
|
||||
|
||||
foreach (HandlersLocator::listTypes($envelope) as $type) {
|
||||
foreach ($this->senders[$type] ?? [] as $alias => $sender) {
|
||||
if (!\in_array($sender, $seen, true)) {
|
||||
yield $alias => $seen[] = $sender;
|
||||
// the old way of looking up senders
|
||||
if ($this->useLegacyLookup) {
|
||||
foreach ($this->sendersMap[$type] ?? [] as $alias => $sender) {
|
||||
if (!\in_array($sender, $seen, true)) {
|
||||
yield $alias => $seen[] = $sender;
|
||||
}
|
||||
}
|
||||
|
||||
$handle = $handle ?: $this->sendAndHandle[$type] ?? false;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
foreach ($this->sendersMap[$type] ?? [] as $senderAlias) {
|
||||
if (!\in_array($senderAlias, $seen, true)) {
|
||||
if (!$this->sendersLocator->has($senderAlias)) {
|
||||
throw new RuntimeException(sprintf('Invalid senders configuration: sender "%s" is not in the senders locator.', $senderAlias));
|
||||
}
|
||||
|
||||
$seen[] = $senderAlias;
|
||||
$sender = $this->sendersLocator->get($senderAlias);
|
||||
yield $senderAlias => $sender;
|
||||
}
|
||||
}
|
||||
|
||||
$handle = $handle ?: $this->sendAndHandle[$type] ?? false;
|
||||
}
|
||||
|
||||
$handle = $handle || null === $sender;
|
||||
}
|
||||
|
||||
public function getSenderByAlias(string $alias): SenderInterface
|
||||
{
|
||||
if ($this->sendersLocator->has($alias)) {
|
||||
return $this->sendersLocator->get($alias);
|
||||
}
|
||||
|
||||
throw new UnknownSenderException(sprintf('Unknown sender alias "%s"', $alias));
|
||||
}
|
||||
}
|
||||
|
@ -12,6 +12,7 @@
|
||||
namespace Symfony\Component\Messenger\Transport\Sender;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\UnknownSenderException;
|
||||
|
||||
/**
|
||||
* Maps a message to a list of senders.
|
||||
@ -32,4 +33,13 @@ interface SendersLocatorInterface
|
||||
* @return iterable|SenderInterface[] Indexed by sender alias if available
|
||||
*/
|
||||
public function getSenders(Envelope $envelope, ?bool &$handle = false): iterable;
|
||||
|
||||
/**
|
||||
* Returns a specific sender by its alias.
|
||||
*
|
||||
* @param string $alias The alias given to the sender in getSenders()
|
||||
*
|
||||
* @throws UnknownSenderException If the sender is not found
|
||||
*/
|
||||
public function getSenderByAlias(string $alias): SenderInterface;
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ class Worker implements WorkerInterface
|
||||
* @param ReceiverInterface[] $receivers Where the key is the transport name
|
||||
* @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match)
|
||||
*/
|
||||
public function __construct(array $receivers, MessageBusInterface $bus, $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
|
||||
public function __construct(array $receivers, MessageBusInterface $bus, array $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
|
||||
{
|
||||
$this->receivers = $receivers;
|
||||
$this->bus = $bus;
|
||||
@ -114,9 +114,14 @@ class Worker implements WorkerInterface
|
||||
$this->dispatchEvent(new WorkerStoppedEvent());
|
||||
}
|
||||
|
||||
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName, ?RetryStrategyInterface $retryStrategy)
|
||||
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName, ?RetryStrategyInterface $retryStrategy): void
|
||||
{
|
||||
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $transportName));
|
||||
$event = new WorkerMessageReceivedEvent($envelope, $transportName);
|
||||
$this->dispatchEvent($event);
|
||||
|
||||
if (!$event->shouldHandle()) {
|
||||
return;
|
||||
}
|
||||
|
||||
$message = $envelope->getMessage();
|
||||
$context = [
|
||||
@ -148,7 +153,7 @@ class Worker implements WorkerInterface
|
||||
|
||||
// add the delay and retry stamp info + remove ReceivedStamp
|
||||
$retryEnvelope = $envelope->with(new DelayStamp($retryStrategy->getWaitingTime($envelope)))
|
||||
->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope)))
|
||||
->with(new RedeliveryStamp($retryCount, $this->getSenderClassOrAlias($envelope)))
|
||||
->withoutAll(ReceivedStamp::class);
|
||||
|
||||
// re-send the message
|
||||
@ -199,6 +204,15 @@ class Worker implements WorkerInterface
|
||||
return false;
|
||||
}
|
||||
|
||||
$sentStamp = $envelope->last(SentStamp::class);
|
||||
if (null === $sentStamp) {
|
||||
if (null !== $this->logger) {
|
||||
$this->logger->warning('Message will not be retried because the SentStamp is missing and so the target sender cannot be determined.');
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
return $retryStrategy->isRetryable($envelope);
|
||||
}
|
||||
|
||||
@ -210,11 +224,16 @@ class Worker implements WorkerInterface
|
||||
return $retryMessageStamp ? $retryMessageStamp->getRetryCount() : 0;
|
||||
}
|
||||
|
||||
private function getSenderAlias(Envelope $envelope): ?string
|
||||
private function getSenderClassOrAlias(Envelope $envelope): string
|
||||
{
|
||||
/** @var SentStamp|null $sentStamp */
|
||||
$sentStamp = $envelope->last(SentStamp::class);
|
||||
|
||||
return $sentStamp ? $sentStamp->getSenderAlias() : null;
|
||||
if (null === $sentStamp) {
|
||||
// should not happen, because of the check in shouldRetry()
|
||||
throw new LogicException('Could not find SentStamp.');
|
||||
}
|
||||
|
||||
return $sentStamp->getSenderAlias() ?: $sentStamp->getSenderClass();
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,7 @@
|
||||
"doctrine/dbal": "^2.5",
|
||||
"psr/cache": "~1.0",
|
||||
"symfony/console": "~3.4|~4.0",
|
||||
"symfony/debug": "~4.1",
|
||||
"symfony/dependency-injection": "~3.4.19|^4.1.8",
|
||||
"symfony/doctrine-bridge": "~3.4|~4.0",
|
||||
"symfony/event-dispatcher": "~4.3",
|
||||
@ -35,7 +36,8 @@
|
||||
"symfony/var-dumper": "~3.4|~4.0"
|
||||
},
|
||||
"conflict": {
|
||||
"symfony/event-dispatcher": "<4.3"
|
||||
"symfony/event-dispatcher": "<4.3",
|
||||
"symfony/debug": "<4.1"
|
||||
},
|
||||
"suggest": {
|
||||
"enqueue/messenger-adapter": "For using the php-enqueue library as a transport."
|
||||
|
Reference in New Issue
Block a user