[Messenger] Add a --bus option to the messenger:consume-messages command

This commit is contained in:
Robin Chalas 2018-08-10 17:40:08 +02:00
parent 4edbd60bbb
commit 539cb62ffe
5 changed files with 57 additions and 19 deletions

View File

@ -70,10 +70,11 @@
</service>
<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
<argument type="service" id="message_bus" />
<argument /> <!-- Message bus locator -->
<argument type="service" id="messenger.receiver_locator" />
<argument type="service" id="logger" on-invalid="null" />
<argument>null</argument> <!-- Default receiver name -->
<argument type="collection" /> <!-- Message buses names -->
<tag name="console.command" command="messenger:consume-messages" />
</service>

View File

@ -20,7 +20,6 @@ use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver;
@ -35,17 +34,19 @@ class ConsumeMessagesCommand extends Command
{
protected static $defaultName = 'messenger:consume-messages';
private $bus;
private $busLocator;
private $receiverLocator;
private $logger;
private $receiverNames;
private $busNames;
public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = array())
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = array(), array $busNames = array())
{
$this->bus = $bus;
$this->busLocator = $busLocator;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
$this->receiverNames = $receiverNames;
$this->busNames = $busNames;
parent::__construct();
}
@ -63,6 +64,7 @@ class ConsumeMessagesCommand extends Command
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', 'message_bus'),
))
->setDescription('Consumes messages')
->setHelp(<<<'EOF'
@ -91,18 +93,34 @@ EOF
*/
protected function interact(InputInterface $input, OutputInterface $output)
{
if (!$this->receiverNames || $this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
return;
$style = new SymfonyStyle($input, $output);
if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
if (null === $receiverName) {
$style->block('Missing receiver argument.', null, 'error', ' ', true);
$input->setArgument('receiver', $style->choice('Select one of the available receivers', $this->receiverNames));
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
$style->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
if ($style->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
$input->setArgument('receiver', $alternatives[0]);
}
}
}
$style = new SymfonyStyle($input, $output);
if (null === $receiverName) {
$style->block('Missing receiver argument.', null, 'error', ' ', true);
$input->setArgument('receiver', $style->choice('Select one of the available receivers', $this->receiverNames));
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
$style->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
if ($style->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
$input->setArgument('receiver', $alternatives[0]);
$busName = $input->getOption('bus');
if ($busName && $this->busNames && !$this->busLocator->has($busName) && $alternatives = $this->findAlternatives($busName, $this->busNames)) {
if ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
$style->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);
if (1 === \count($alternatives)) {
if ($style->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
$input->setOption('bus', $alternatives[0]);
}
} else {
$input->setOption('bus', $style->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
}
}
}
}
@ -116,7 +134,12 @@ EOF
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
}
if (!$this->busLocator->has($busName = $input->getOption('bus'))) {
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $receiverName));
}
$receiver = $this->receiverLocator->get($receiverName);
$bus = $this->busLocator->get($busName);
if ($limit = $input->getOption('limit')) {
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
@ -130,7 +153,7 @@ EOF
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
}
$worker = new Worker($receiver, $this->bus);
$worker = new Worker($receiver, $bus);
$worker->run();
}

View File

@ -184,6 +184,18 @@ class MessengerPass implements CompilerPassInterface
}
$container->getDefinition('console.command.messenger_debug')->replaceArgument(0, $debugCommandMapping);
}
if ($container->hasDefinition('console.command.messenger_consume_messages')) {
$buses = array();
foreach ($busIds as $busId) {
$buses[$busId] = new Reference($busId);
}
$container
->getDefinition('console.command.messenger_consume_messages')
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
->replaceArgument(4, $busIds);
}
}
private function guessHandledClasses(\ReflectionClass $handlerClass, string $serviceId): iterable

View File

@ -20,7 +20,7 @@ class ConsumeMessagesCommandTest extends TestCase
{
public function testConfigurationWithDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(MessageBus::class), $this->createMock(ServiceLocator::class), null, array('amqp'));
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, array('amqp'));
$inputArgument = $command->getDefinition()->getArgument('receiver');
$this->assertFalse($inputArgument->isRequired());
$this->assertSame('amqp', $inputArgument->getDefault());
@ -28,7 +28,7 @@ class ConsumeMessagesCommandTest extends TestCase
public function testConfigurationWithoutDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(MessageBus::class), $this->createMock(ServiceLocator::class), null, array('amqp', 'dummy'));
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, array('amqp', 'dummy'));
$inputArgument = $command->getDefinition()->getArgument('receiver');
$this->assertTrue($inputArgument->isRequired());
$this->assertNull($inputArgument->getDefault());

View File

@ -241,10 +241,11 @@ class MessengerPassTest extends TestCase
{
$container = $this->getContainerBuilder();
$container->register('console.command.messenger_consume_messages', ConsumeMessagesCommand::class)->setArguments(array(
new Reference('message_bus'),
null,
new Reference('messenger.receiver_locator'),
null,
null,
null
));
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', array('alias' => 'amqp'));
@ -253,6 +254,7 @@ class MessengerPassTest extends TestCase
(new MessengerPass())->process($container);
$this->assertSame(array('amqp', 'dummy'), $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3));
$this->assertSame(array('message_bus'), $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
}
public function testItRegistersSenders()