diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
index 0e12127272..fa505bbb0c 100644
--- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
@@ -1616,8 +1616,14 @@ class FrameworkExtension extends Extension
}
$defaultMiddleware = [
- 'before' => [['id' => 'dispatch_after_current_bus']],
- 'after' => [['id' => 'send_message'], ['id' => 'handle_message']],
+ 'before' => [
+ ['id' => 'add_bus_name_stamp_middleware'],
+ ['id' => 'dispatch_after_current_bus'],
+ ],
+ 'after' => [
+ ['id' => 'send_message'],
+ ['id' => 'handle_message'],
+ ],
];
foreach ($config['buses'] as $busId => $bus) {
$middleware = $bus['middleware'];
@@ -1628,6 +1634,10 @@ class FrameworkExtension extends Extension
} else {
unset($defaultMiddleware['after'][1]['arguments']);
}
+
+ // argument to add_bus_name_stamp_middleware
+ $defaultMiddleware['before'][0]['arguments'] = [$busId];
+
$middleware = array_merge($defaultMiddleware['before'], $middleware, $defaultMiddleware['after']);
}
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
index eeb36961f7..35d9f813c4 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
@@ -81,7 +81,6 @@
-
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
index 3d471c9338..b4e6cd69ef 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
@@ -39,6 +39,8 @@
+
+
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
index dd626d62c4..ddd9d64286 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
@@ -727,6 +727,7 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertTrue($container->has('messenger.bus.commands'));
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
$this->assertEquals([
+ ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
['id' => 'dispatch_after_current_bus'],
['id' => 'send_message'],
['id' => 'handle_message'],
@@ -734,6 +735,7 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertTrue($container->has('messenger.bus.events'));
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
$this->assertEquals([
+ ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
['id' => 'dispatch_after_current_bus'],
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
['id' => 'send_message'],
diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md
index 40f8c1dc04..519966d3e9 100644
--- a/src/Symfony/Component/Messenger/CHANGELOG.md
+++ b/src/Symfony/Component/Messenger/CHANGELOG.md
@@ -3,7 +3,12 @@ CHANGELOG
4.3.0
-----
-
+
+ * New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
+ and `BusNameStamp` were added, which allow you to add a bus identifier
+ to the `Envelope` then find the correct bus when receiving from
+ the transport. See `ConsumeMessagesCommand`.
+ * An optional `ConsumeMessagesCommand` constructor argument was removed.
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
`ack()` and `reject()`.
* [BC BREAK] Error handling was moved from the receivers into
diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
index a973a2337d..ca1973afd4 100644
--- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
@@ -21,6 +21,7 @@ use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
+use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
@@ -39,17 +40,15 @@ class ConsumeMessagesCommand extends Command
private $receiverLocator;
private $logger;
private $receiverNames;
- private $busNames;
private $retryStrategyLocator;
private $eventDispatcher;
- public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
+ public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
{
$this->busLocator = $busLocator;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
$this->receiverNames = $receiverNames;
- $this->busNames = $busNames;
$this->retryStrategyLocator = $retryStrategyLocator;
$this->eventDispatcher = $eventDispatcher;
@@ -62,7 +61,6 @@ class ConsumeMessagesCommand extends Command
protected function configure(): void
{
$defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null;
- $defaultBusName = 1 === \count($this->busNames) ? current($this->busNames) : null;
$this
->setDefinition([
@@ -70,7 +68,7 @@ class ConsumeMessagesCommand extends Command
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
- new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', $defaultBusName),
+ new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
])
->setDescription('Consumes messages')
->setHelp(<<<'EOF'
@@ -89,6 +87,12 @@ Use the --memory-limit option to stop the worker if it exceeds a given memory us
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
php %command.full_name% --time-limit=3600
+
+Use the --bus option to specify the message bus to dispatch received messages
+to instead of trying to determine it automatically. This is required if the
+messages didn't originate from Messenger:
+
+ php %command.full_name% --bus=event_bus
EOF
)
;
@@ -112,24 +116,6 @@ EOF
}
}
}
-
- $busName = $input->getOption('bus');
- if ($this->busNames && !$this->busLocator->has($busName)) {
- if (null === $busName) {
- $io->block('Missing bus argument.', null, 'error', ' ', true);
- $input->setOption('bus', $io->choice('Select one of the available buses', $this->busNames));
- } elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
- $io->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);
-
- if (1 === \count($alternatives)) {
- if ($io->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
- $input->setOption('bus', $alternatives[0]);
- }
- } else {
- $input->setOption('bus', $io->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
- }
- }
- }
}
/**
@@ -147,18 +133,19 @@ EOF
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
}
- if (!$this->busLocator->has($busName = $input->getOption('bus'))) {
- throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
- }
-
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
}
$receiver = $this->receiverLocator->get($receiverName);
- $bus = $this->busLocator->get($busName);
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
+ if (null !== $input->getOption('bus')) {
+ $bus = $this->busLocator->get($input->getOption('bus'));
+ } else {
+ $bus = new RoutableMessageBus($this->busLocator);
+ }
+
$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
$stopsWhen[] = "processed {$limit} messages";
@@ -176,7 +163,7 @@ EOF
}
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
- $io->success(sprintf('Consuming messages from transport "%s" on bus "%s".', $receiverName, $busName));
+ $io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
if ($stopsWhen) {
$last = array_pop($stopsWhen);
diff --git a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php
index 8c4773462e..92ddfc2d4a 100644
--- a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php
+++ b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php
@@ -248,8 +248,7 @@ class MessengerPass implements CompilerPassInterface
$container->getDefinition('console.command.messenger_consume_messages')
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
- ->replaceArgument(3, array_values($receiverNames))
- ->replaceArgument(4, $busIds);
+ ->replaceArgument(3, array_values($receiverNames));
}
$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
diff --git a/src/Symfony/Component/Messenger/Middleware/AddBusNameStampMiddleware.php b/src/Symfony/Component/Messenger/Middleware/AddBusNameStampMiddleware.php
new file mode 100644
index 0000000000..94392e3c98
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Middleware/AddBusNameStampMiddleware.php
@@ -0,0 +1,39 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Middleware;
+
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Stamp\BusNameStamp;
+
+/**
+ * Adds the BusNameStamp to the bus.
+ *
+ * @experimental in 4.3
+ *
+ * @author Ryan Weaver
+ */
+class AddBusNameStampMiddleware implements MiddlewareInterface
+{
+ private $busName;
+
+ public function __construct(string $busName)
+ {
+ $this->busName = $busName;
+ }
+
+ public function handle(Envelope $envelope, StackInterface $stack): Envelope
+ {
+ $envelope = $envelope->with(new BusNameStamp($this->busName));
+
+ return $stack->next()->handle($envelope, $stack);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/RoutableMessageBus.php b/src/Symfony/Component/Messenger/RoutableMessageBus.php
new file mode 100644
index 0000000000..3b1aba9751
--- /dev/null
+++ b/src/Symfony/Component/Messenger/RoutableMessageBus.php
@@ -0,0 +1,58 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger;
+
+use Psr\Container\ContainerInterface;
+use Symfony\Component\Messenger\Exception\InvalidArgumentException;
+use Symfony\Component\Messenger\Stamp\BusNameStamp;
+
+/**
+ * Bus of buses that is routable using a BusNameStamp.
+ *
+ * This is useful when passed to Worker: messages received
+ * from the transport can be sent to the correct bus.
+ *
+ * @experimental in 4.3
+ *
+ * @author Ryan Weaver
+ */
+class RoutableMessageBus implements MessageBusInterface
+{
+ private $busLocator;
+
+ /**
+ * @param ContainerInterface $busLocator A locator full of MessageBusInterface objects
+ */
+ public function __construct(ContainerInterface $busLocator)
+ {
+ $this->busLocator = $busLocator;
+ }
+
+ public function dispatch($envelope): Envelope
+ {
+ if (!$envelope instanceof Envelope) {
+ throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
+ }
+
+ /** @var BusNameStamp $busNameStamp */
+ $busNameStamp = $envelope->last(BusNameStamp::class);
+ if (null === $busNameStamp) {
+ throw new InvalidArgumentException('Envelope does not contain a BusNameStamp.');
+ }
+
+ if (!$this->busLocator->has($busNameStamp->getBusName())) {
+ throw new InvalidArgumentException(sprintf('Invalid bus name "%s" on BusNameStamp.', $busNameStamp->getBusName()));
+ }
+
+ return $this->busLocator->get($busNameStamp->getBusName())->dispatch($envelope);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Stamp/BusNameStamp.php b/src/Symfony/Component/Messenger/Stamp/BusNameStamp.php
new file mode 100644
index 0000000000..eee3f9e846
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Stamp/BusNameStamp.php
@@ -0,0 +1,34 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Stamp;
+
+/**
+ * Stamp used to identify which bus it was passed to.
+ *
+ * @experimental in 4.3
+ *
+ * @author Ryan Weaver
+ */
+class BusNameStamp implements StampInterface
+{
+ private $busName;
+
+ public function __construct(string $busName)
+ {
+ $this->busName = $busName;
+ }
+
+ public function getBusName(): string
+ {
+ return $this->busName;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
index 24820df0c6..c6ef2c0cec 100644
--- a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
+++ b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
@@ -263,7 +263,6 @@ class MessengerPassTest extends TestCase
(new MessengerPass())->process($container);
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3));
- $this->assertSame(['message_bus'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
}
public function testItShouldNotThrowIfGeneratorIsReturnedInsteadOfArray()
diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/AddBusNameStampMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/AddBusNameStampMiddlewareTest.php
new file mode 100644
index 0000000000..71fe15b25e
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Middleware/AddBusNameStampMiddlewareTest.php
@@ -0,0 +1,33 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Middleware;
+
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware;
+use Symfony\Component\Messenger\Stamp\BusNameStamp;
+use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
+
+class AddBusNameStampMiddlewareTest extends MiddlewareTestCase
+{
+ public function testItSendsTheMessageToAssignedSender()
+ {
+ $middleware = new AddBusNameStampMiddleware('the_bus_name');
+ $envelope = new Envelope(new DummyMessage('the message'));
+
+ $finalEnvelope = $middleware->handle($envelope, $this->getStackMock());
+ /** @var BusNameStamp $busNameStamp */
+ $busNameStamp = $finalEnvelope->last(BusNameStamp::class);
+ $this->assertNotNull($busNameStamp);
+ $this->assertSame('the_bus_name', $busNameStamp->getBusName());
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/RoutableMessageBusTest.php b/src/Symfony/Component/Messenger/Tests/RoutableMessageBusTest.php
new file mode 100644
index 0000000000..517a57914c
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/RoutableMessageBusTest.php
@@ -0,0 +1,69 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests;
+
+use PHPUnit\Framework\TestCase;
+use Psr\Container\ContainerInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Exception\InvalidArgumentException;
+use Symfony\Component\Messenger\MessageBusInterface;
+use Symfony\Component\Messenger\RoutableMessageBus;
+use Symfony\Component\Messenger\Stamp\BusNameStamp;
+
+class RoutableMessageBusTest extends TestCase
+{
+ public function testItRoutesToTheCorrectBus()
+ {
+ $envelope = new Envelope(new \stdClass(), new BusNameStamp('foo_bus'));
+
+ $bus1 = $this->createMock(MessageBusInterface::class);
+ $bus2 = $this->createMock(MessageBusInterface::class);
+
+ $container = $this->createMock(ContainerInterface::class);
+ $container->expects($this->once())->method('has')->with('foo_bus')->willReturn(true);
+ $container->expects($this->once())->method('get')->will($this->returnValue($bus2));
+
+ $bus1->expects($this->never())->method('dispatch');
+ $bus2->expects($this->once())->method('dispatch')->with($envelope)->willReturn($envelope);
+
+ $routableBus = new RoutableMessageBus($container);
+ $this->assertSame($envelope, $routableBus->dispatch($envelope));
+ }
+
+ public function testItExceptionOnMissingStamp()
+ {
+ $this->expectException(InvalidArgumentException::class);
+ $this->expectExceptionMessage('does not contain a BusNameStamp');
+
+ $envelope = new Envelope(new \stdClass());
+
+ $container = $this->createMock(ContainerInterface::class);
+ $container->expects($this->never())->method('has');
+
+ $routableBus = new RoutableMessageBus($container);
+ $routableBus->dispatch($envelope);
+ }
+
+ public function testItExceptionOnBusNotFound()
+ {
+ $this->expectException(InvalidArgumentException::class);
+ $this->expectExceptionMessage('Invalid bus name');
+
+ $envelope = new Envelope(new \stdClass(), new BusNameStamp('foo_bus'));
+
+ $container = $this->createMock(ContainerInterface::class);
+ $container->expects($this->once())->method('has')->willReturn(false);
+
+ $routableBus = new RoutableMessageBus($container);
+ $routableBus->dispatch($envelope);
+ }
+}