diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml index 16732e2af4..5b02ad1939 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml @@ -70,10 +70,11 @@ - + null + diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index 06b48a50c5..5c8df18146 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -20,7 +20,6 @@ use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; -use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver; use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver; use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver; @@ -35,17 +34,19 @@ class ConsumeMessagesCommand extends Command { protected static $defaultName = 'messenger:consume-messages'; - private $bus; + private $busLocator; private $receiverLocator; private $logger; private $receiverNames; + private $busNames; - public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = array()) + public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = array(), array $busNames = array()) { - $this->bus = $bus; + $this->busLocator = $busLocator; $this->receiverLocator = $receiverLocator; $this->logger = $logger; $this->receiverNames = $receiverNames; + $this->busNames = $busNames; parent::__construct(); } @@ -63,6 +64,7 @@ class ConsumeMessagesCommand extends Command new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'), new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'), new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'), + new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', 'message_bus'), )) ->setDescription('Consumes messages') ->setHelp(<<<'EOF' @@ -91,18 +93,34 @@ EOF */ protected function interact(InputInterface $input, OutputInterface $output) { - if (!$this->receiverNames || $this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) { - return; + $style = new SymfonyStyle($input, $output); + + if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) { + if (null === $receiverName) { + $style->block('Missing receiver argument.', null, 'error', ' ', true); + $input->setArgument('receiver', $style->choice('Select one of the available receivers', $this->receiverNames)); + } elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) { + $style->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true); + if ($style->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) { + $input->setArgument('receiver', $alternatives[0]); + } + } } - $style = new SymfonyStyle($input, $output); - if (null === $receiverName) { - $style->block('Missing receiver argument.', null, 'error', ' ', true); - $input->setArgument('receiver', $style->choice('Select one of the available receivers', $this->receiverNames)); - } elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) { - $style->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true); - if ($style->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) { - $input->setArgument('receiver', $alternatives[0]); + + $busName = $input->getOption('bus'); + + if ($busName && $this->busNames && !$this->busLocator->has($busName) && $alternatives = $this->findAlternatives($busName, $this->busNames)) { + if ($alternatives = $this->findAlternatives($busName, $this->busNames)) { + $style->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true); + + if (1 === \count($alternatives)) { + if ($style->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) { + $input->setOption('bus', $alternatives[0]); + } + } else { + $input->setOption('bus', $style->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0])); + } } } } @@ -116,7 +134,12 @@ EOF throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName)); } + if (!$this->busLocator->has($busName = $input->getOption('bus'))) { + throw new RuntimeException(sprintf('Bus "%s" does not exist.', $receiverName)); + } + $receiver = $this->receiverLocator->get($receiverName); + $bus = $this->busLocator->get($busName); if ($limit = $input->getOption('limit')) { $receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger); @@ -130,7 +153,7 @@ EOF $receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger); } - $worker = new Worker($receiver, $this->bus); + $worker = new Worker($receiver, $bus); $worker->run(); } diff --git a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php index 2de3289a51..4c59b95279 100644 --- a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php +++ b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php @@ -184,6 +184,18 @@ class MessengerPass implements CompilerPassInterface } $container->getDefinition('console.command.messenger_debug')->replaceArgument(0, $debugCommandMapping); } + + if ($container->hasDefinition('console.command.messenger_consume_messages')) { + $buses = array(); + foreach ($busIds as $busId) { + $buses[$busId] = new Reference($busId); + } + $container + ->getDefinition('console.command.messenger_consume_messages') + ->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses)) + ->replaceArgument(4, $busIds); + } + } private function guessHandledClasses(\ReflectionClass $handlerClass, string $serviceId): iterable diff --git a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php index de489b545f..e327893c6f 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php @@ -20,7 +20,7 @@ class ConsumeMessagesCommandTest extends TestCase { public function testConfigurationWithDefaultReceiver() { - $command = new ConsumeMessagesCommand($this->createMock(MessageBus::class), $this->createMock(ServiceLocator::class), null, array('amqp')); + $command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, array('amqp')); $inputArgument = $command->getDefinition()->getArgument('receiver'); $this->assertFalse($inputArgument->isRequired()); $this->assertSame('amqp', $inputArgument->getDefault()); @@ -28,7 +28,7 @@ class ConsumeMessagesCommandTest extends TestCase public function testConfigurationWithoutDefaultReceiver() { - $command = new ConsumeMessagesCommand($this->createMock(MessageBus::class), $this->createMock(ServiceLocator::class), null, array('amqp', 'dummy')); + $command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, array('amqp', 'dummy')); $inputArgument = $command->getDefinition()->getArgument('receiver'); $this->assertTrue($inputArgument->isRequired()); $this->assertNull($inputArgument->getDefault()); diff --git a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php index f3fc76baa3..73481b55af 100644 --- a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php +++ b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php @@ -241,10 +241,11 @@ class MessengerPassTest extends TestCase { $container = $this->getContainerBuilder(); $container->register('console.command.messenger_consume_messages', ConsumeMessagesCommand::class)->setArguments(array( - new Reference('message_bus'), + null, new Reference('messenger.receiver_locator'), null, null, + null )); $container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', array('alias' => 'amqp')); @@ -253,6 +254,7 @@ class MessengerPassTest extends TestCase (new MessengerPass())->process($container); $this->assertSame(array('amqp', 'dummy'), $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3)); + $this->assertSame(array('message_bus'), $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4)); } public function testItRegistersSenders()