[Messenger] Inject RoutableMessageBus instead of bus locator

This commit is contained in:
Robin Chalas 2019-05-30 20:39:48 +02:00
parent 0da213711c
commit 91817e4de0
5 changed files with 159 additions and 17 deletions

View File

@ -82,7 +82,7 @@
</service>
<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
<argument /> <!-- Message bus locator -->
<argument type="service" id="messenger.routable_message_bus" />
<argument type="service" id="messenger.receiver_locator" />
<argument type="service" id="logger" on-invalid="null" />
<argument type="collection" /> <!-- Receiver names -->

View File

@ -40,7 +40,7 @@ class ConsumeMessagesCommand extends Command
{
protected static $defaultName = 'messenger:consume';
private $busLocator;
private $routableBus;
private $receiverLocator;
private $logger;
private $receiverNames;
@ -49,15 +49,23 @@ class ConsumeMessagesCommand extends Command
/** @var CacheItemPoolInterface|null */
private $restartSignalCachePool;
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
/**
* @param RoutableMessageBus $routableBus
*/
public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
{
// to be deprecated in 4.4
if ($routableBus instanceof ContainerInterface) {
$routableBus = new RoutableMessageBus($routableBus);
}
if (\is_array($retryStrategyLocator)) {
@trigger_error(sprintf('The 5th argument of the class "%s" should be a retry-strategy locator, an array of bus names as a value is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED);
$retryStrategyLocator = null;
}
$this->busLocator = $busLocator;
$this->routableBus = $routableBus;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
$this->receiverNames = $receiverNames;
@ -177,11 +185,7 @@ EOF
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
}
if (null !== $input->getOption('bus')) {
$bus = $this->busLocator->get($input->getOption('bus'));
} else {
$bus = new RoutableMessageBus($this->busLocator);
}
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
$stopsWhen = [];

View File

@ -260,7 +260,6 @@ class MessengerPass implements CompilerPassInterface
if ($container->hasDefinition('console.command.messenger_consume_messages')) {
$container->getDefinition('console.command.messenger_consume_messages')
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
->replaceArgument(3, array_values($receiverNames));
}

View File

@ -45,11 +45,6 @@ class RoutableMessageBus implements MessageBusInterface
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
}
return $this->getMessageBus($envelope)->dispatch($envelope, $stamps);
}
private function getMessageBus(Envelope $envelope): MessageBusInterface
{
/** @var BusNameStamp|null $busNameStamp */
$busNameStamp = $envelope->last(BusNameStamp::class);
@ -58,11 +53,17 @@ class RoutableMessageBus implements MessageBusInterface
throw new InvalidArgumentException(sprintf('Envelope is missing a BusNameStamp and no fallback message bus is configured on RoutableMessageBus.'));
}
return $this->fallbackBus;
return $this->fallbackBus->dispatch($envelope, $stamps);
}
$busName = $busNameStamp->getBusName();
return $this->getMessageBus($busNameStamp->getBusName())->dispatch($envelope, $stamps);
}
/**
* @internal
*/
public function getMessageBus(string $busName): MessageBusInterface
{
if (!$this->busLocator->has($busName)) {
throw new InvalidArgumentException(sprintf('Bus named "%s" does not exist.', $busName));
}

View File

@ -12,8 +12,16 @@
namespace Symfony\Component\Messenger\Tests\Command;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Application;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
class ConsumeMessagesCommandTest extends TestCase
{
@ -24,4 +32,134 @@ class ConsumeMessagesCommandTest extends TestCase
$this->assertFalse($inputArgument->isRequired());
$this->assertSame(['amqp'], $inputArgument->getDefault());
}
public function testBasicRun()
{
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
$receiver = $this->createMock(ReceiverInterface::class);
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
$receiverLocator = $this->createMock(ContainerInterface::class);
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())->method('dispatch');
$busLocator = $this->createMock(ContainerInterface::class);
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator);
$application = new Application();
$application->add($command);
$tester = new CommandTester($application->get('messenger:consume'));
$tester->execute([
'receivers' => ['dummy-receiver'],
'--limit' => 1,
]);
$this->assertSame(0, $tester->getStatusCode());
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
}
public function testRunWithBusOption()
{
$envelope = new Envelope(new \stdClass());
$receiver = $this->createMock(ReceiverInterface::class);
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
$receiverLocator = $this->createMock(ContainerInterface::class);
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())->method('dispatch');
$busLocator = $this->createMock(ContainerInterface::class);
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator);
$application = new Application();
$application->add($command);
$tester = new CommandTester($application->get('messenger:consume'));
$tester->execute([
'receivers' => ['dummy-receiver'],
'--bus' => 'dummy-bus',
'--limit' => 1,
]);
$this->assertSame(0, $tester->getStatusCode());
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
}
public function testBasicRunWithBusLocator()
{
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
$receiver = $this->createMock(ReceiverInterface::class);
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
$receiverLocator = $this->createMock(ContainerInterface::class);
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())->method('dispatch');
$busLocator = $this->createMock(ContainerInterface::class);
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
$command = new ConsumeMessagesCommand($busLocator, $receiverLocator);
$application = new Application();
$application->add($command);
$tester = new CommandTester($application->get('messenger:consume'));
$tester->execute([
'receivers' => ['dummy-receiver'],
'--limit' => 1,
]);
$this->assertSame(0, $tester->getStatusCode());
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
}
public function testRunWithBusOptionAndBusLocator()
{
$envelope = new Envelope(new \stdClass());
$receiver = $this->createMock(ReceiverInterface::class);
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
$receiverLocator = $this->createMock(ContainerInterface::class);
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())->method('dispatch');
$busLocator = $this->createMock(ContainerInterface::class);
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
$command = new ConsumeMessagesCommand($busLocator, $receiverLocator);
$application = new Application();
$application->add($command);
$tester = new CommandTester($application->get('messenger:consume'));
$tester->execute([
'receivers' => ['dummy-receiver'],
'--bus' => 'dummy-bus',
'--limit' => 1,
]);
$this->assertSame(0, $tester->getStatusCode());
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
}
}