[Messenger] Add a command to setup transports

This commit is contained in:
Vincent Touzet 2018-11-21 22:50:37 +01:00
parent 76260e7d44
commit fbb534a838
10 changed files with 242 additions and 5 deletions

View File

@ -19,6 +19,7 @@ CHANGELOG
* Added support for Translator paths, Twig paths in translation commands.
* Added support for PHP files with translations in translation commands.
* Added support for boolean container parameters within routes.
* Added the `messenger:setup-transports` command to setup messenger transports
4.2.0
-----

View File

@ -279,6 +279,7 @@ class FrameworkExtension extends Extension
} else {
$container->removeDefinition('console.command.messenger_consume_messages');
$container->removeDefinition('console.command.messenger_debug');
$container->removeDefinition('console.command.messenger_setup_transports');
}
$this->registerValidationConfiguration($config['validation'], $container, $loader);

View File

@ -89,6 +89,13 @@
<tag name="monolog.logger" channel="messenger" />
</service>
<service id="console.command.messenger_setup_transports" class="Symfony\Component\Messenger\Command\SetupTransportsCommand">
<argument type="service" id="messenger.receiver_locator" />
<argument type="collection" /> <!-- Receiver names -->
<tag name="console.command" command="messenger:setup-transports" />
</service>
<service id="console.command.messenger_debug" class="Symfony\Component\Messenger\Command\DebugCommand">
<argument type="collection" /> <!-- Message to handlers mapping -->
<tag name="console.command" command="debug:messenger" />

View File

@ -51,6 +51,8 @@ CHANGELOG
and queues by default. Previously, this was done when in "debug" mode
only. Pass the `auto_setup` connection option to control this.
* Added a `SetupTransportsCommand` command to setup the transports
4.2.0
-----

View File

@ -0,0 +1,80 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Psr\Container\ContainerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class SetupTransportsCommand extends Command
{
protected static $defaultName = 'messenger:setup-transports';
private $transportLocator;
private $transportNames;
public function __construct(ContainerInterface $transportLocator, array $transportNames = [])
{
$this->transportLocator = $transportLocator;
$this->transportNames = $transportNames;
parent::__construct();
}
protected function configure()
{
$this
->addArgument('transport', InputArgument::OPTIONAL, 'Name of the transport to setup', null)
->setHelp(<<<EOF
The <info>%command.name%</info> command setups the transports:
<info>php %command.full_name%</info>
Or a specific transport only:
<info>php %command.full_name% <transport></info>
EOF
)
;
}
protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output);
$transportNames = $this->transportNames;
// do we want to setup only one transport?
if ($transport = $input->getArgument('transport')) {
if (!$this->transportLocator->has($transport)) {
throw new \RuntimeException(sprintf('The "%s" transport does not exist.', $transport));
}
$transportNames = [$transport];
}
foreach ($transportNames as $id => $transportName) {
$transport = $this->transportLocator->get($transportName);
if ($transport instanceof SetupableTransportInterface) {
$transport->setup();
$io->success(sprintf('The "%s" transport was setup successfully.', $transportName));
} else {
$io->note(sprintf('The "%s" transport does not support setup.', $transportName));
}
}
}
}

View File

@ -236,11 +236,11 @@ class MessengerPass implements CompilerPassInterface
}
}
$receiverNames = [];
foreach ($receiverMapping as $name => $reference) {
$receiverNames[(string) $reference] = $name;
}
if ($container->hasDefinition('console.command.messenger_consume_messages')) {
$receiverNames = [];
foreach ($receiverMapping as $name => $reference) {
$receiverNames[(string) $reference] = $name;
}
$buses = [];
foreach ($busIds as $busId) {
$buses[$busId] = new Reference($busId);
@ -251,6 +251,11 @@ class MessengerPass implements CompilerPassInterface
->replaceArgument(3, array_values($receiverNames));
}
if ($container->hasDefinition('console.command.messenger_setup_transports')) {
$container->getDefinition('console.command.messenger_setup_transports')
->replaceArgument(1, array_values($receiverNames));
}
$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
}

View File

@ -0,0 +1,92 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Command;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Command\SetupTransportsCommand;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
class SetupTransportsCommandTest extends TestCase
{
public function testReceiverNames()
{
// mock a service locator
/** @var MockObject|ServiceLocator $serviceLocator */
$serviceLocator = $this->createMock(ServiceLocator::class);
// get method must be call twice and will return consecutively a setup-able transport and a non setup-able transport
$serviceLocator->expects($this->exactly(2))
->method('get')
->will($this->onConsecutiveCalls(
$this->createMock(SetupableTransportInterface::class),
$this->createMock(TransportInterface::class)
));
$serviceLocator
->method('has')
->willReturn(true);
$command = new SetupTransportsCommand($serviceLocator, ['amqp', 'other_transport']);
$tester = new CommandTester($command);
$tester->execute([]);
$display = $tester->getDisplay();
$this->assertContains('The "amqp" transport was setup successfully.', $display);
$this->assertContains('The "other_transport" transport does not support setup.', $display);
}
public function testReceiverNameArgument()
{
// mock a service locator
/** @var MockObject|ServiceLocator $serviceLocator */
$serviceLocator = $this->createMock(ServiceLocator::class);
// get method must be call twice and will return consecutively a setup-able transport and a non setup-able transport
$serviceLocator->expects($this->exactly(1))
->method('get')
->will($this->onConsecutiveCalls(
$this->createMock(SetupableTransportInterface::class)
));
$serviceLocator->expects($this->exactly(1))
->method('has')
->willReturn(true);
$command = new SetupTransportsCommand($serviceLocator, ['amqp', 'other_transport']);
$tester = new CommandTester($command);
$tester->execute(['transport' => 'amqp']);
$display = $tester->getDisplay();
$this->assertContains('The "amqp" transport was setup successfully.', $display);
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage The "not_found" transport does not exist.
*/
public function testReceiverNameArgumentNotFound()
{
// mock a service locator
/** @var MockObject|ServiceLocator $serviceLocator */
$serviceLocator = $this->createMock(ServiceLocator::class);
// get method must be call twice and will return consecutively a setup-able transport and a non setup-able transport
$serviceLocator->expects($this->exactly(0))
->method('get');
$serviceLocator->expects($this->exactly(1))
->method('has')
->willReturn(false);
$command = new SetupTransportsCommand($serviceLocator, ['amqp', 'other_transport']);
$tester = new CommandTester($command);
$tester->execute(['transport' => 'not_found']);
}
}

View File

@ -20,6 +20,7 @@ use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
use Symfony\Component\Messenger\Command\DebugCommand;
use Symfony\Component\Messenger\Command\SetupTransportsCommand;
use Symfony\Component\Messenger\DataCollector\MessengerDataCollector;
use Symfony\Component\Messenger\DependencyInjection\MessengerPass;
use Symfony\Component\Messenger\Envelope;
@ -265,6 +266,22 @@ class MessengerPassTest extends TestCase
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3));
}
public function testItSetsTheReceiverNamesOnTheSetupTransportsCommand()
{
$container = $this->getContainerBuilder();
$container->register('console.command.messenger_setup_transports', SetupTransportsCommand::class)->setArguments([
new Reference('messenger.receiver_locator'),
null,
]);
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', ['alias' => 'amqp']);
$container->register(DummyReceiver::class, DummyReceiver::class)->addTag('messenger.receiver', ['alias' => 'dummy']);
(new MessengerPass())->process($container);
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_setup_transports')->getArgument(1));
}
public function testItShouldNotThrowIfGeneratorIsReturnedInsteadOfArray()
{
$container = $this->getContainerBuilder($busId = 'message_bus');

View File

@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
@ -21,7 +22,7 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
*
* @experimental in 4.2
*/
class AmqpTransport implements TransportInterface
class AmqpTransport implements TransportInterface, SetupableTransportInterface
{
private $serializer;
private $connection;
@ -74,6 +75,14 @@ class AmqpTransport implements TransportInterface
return ($this->sender ?? $this->getSender())->send($envelope);
}
/**
* {@inheritdoc}
*/
public function setup(): void
{
$this->connection->setup();
}
private function getReceiver()
{
return $this->receiver = new AmqpReceiver($this->connection, $this->serializer);

View File

@ -0,0 +1,23 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport;
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
interface SetupableTransportInterface
{
/**
* Setup the transport.
*/
public function setup(): void;
}