From ef077cf26c4124b7c1f16220a6b05e1979219978 Mon Sep 17 00:00:00 2001 From: Ryan Weaver Date: Fri, 22 Mar 2019 14:25:21 -0400 Subject: [PATCH] Fixing a bug where a transport could receive a message and dispatch it to a different bus --- .../FrameworkExtension.php | 14 +++- .../Resources/config/console.xml | 1 - .../Resources/config/messenger.xml | 2 + .../FrameworkExtensionTest.php | 2 + src/Symfony/Component/Messenger/CHANGELOG.md | 7 +- .../Command/ConsumeMessagesCommand.php | 45 +++++------- .../DependencyInjection/MessengerPass.php | 3 +- .../Middleware/AddBusNameStampMiddleware.php | 39 +++++++++++ .../Messenger/RoutableMessageBus.php | 58 ++++++++++++++++ .../Messenger/Stamp/BusNameStamp.php | 34 +++++++++ .../DependencyInjection/MessengerPassTest.php | 1 - .../AddBusNameStampMiddlewareTest.php | 33 +++++++++ .../Tests/RoutableMessageBusTest.php | 69 +++++++++++++++++++ 13 files changed, 272 insertions(+), 36 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Middleware/AddBusNameStampMiddleware.php create mode 100644 src/Symfony/Component/Messenger/RoutableMessageBus.php create mode 100644 src/Symfony/Component/Messenger/Stamp/BusNameStamp.php create mode 100644 src/Symfony/Component/Messenger/Tests/Middleware/AddBusNameStampMiddlewareTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/RoutableMessageBusTest.php diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index 0e12127272..fa505bbb0c 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1616,8 +1616,14 @@ class FrameworkExtension extends Extension } $defaultMiddleware = [ - 'before' => [['id' => 'dispatch_after_current_bus']], - 'after' => [['id' => 'send_message'], ['id' => 'handle_message']], + 'before' => [ + ['id' => 'add_bus_name_stamp_middleware'], + ['id' => 'dispatch_after_current_bus'], + ], + 'after' => [ + ['id' => 'send_message'], + ['id' => 'handle_message'], + ], ]; foreach ($config['buses'] as $busId => $bus) { $middleware = $bus['middleware']; @@ -1628,6 +1634,10 @@ class FrameworkExtension extends Extension } else { unset($defaultMiddleware['after'][1]['arguments']); } + + // argument to add_bus_name_stamp_middleware + $defaultMiddleware['before'][0]['arguments'] = [$busId]; + $middleware = array_merge($defaultMiddleware['before'], $middleware, $defaultMiddleware['after']); } diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml index eeb36961f7..35d9f813c4 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml @@ -81,7 +81,6 @@ - diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index 3d471c9338..b4e6cd69ef 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -39,6 +39,8 @@ + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index dd626d62c4..ddd9d64286 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -727,6 +727,7 @@ abstract class FrameworkExtensionTest extends TestCase $this->assertTrue($container->has('messenger.bus.commands')); $this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0)); $this->assertEquals([ + ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']], ['id' => 'dispatch_after_current_bus'], ['id' => 'send_message'], ['id' => 'handle_message'], @@ -734,6 +735,7 @@ abstract class FrameworkExtensionTest extends TestCase $this->assertTrue($container->has('messenger.bus.events')); $this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0)); $this->assertEquals([ + ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']], ['id' => 'dispatch_after_current_bus'], ['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]], ['id' => 'send_message'], diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 40f8c1dc04..519966d3e9 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -3,7 +3,12 @@ CHANGELOG 4.3.0 ----- - + + * New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware` + and `BusNameStamp` were added, which allow you to add a bus identifier + to the `Envelope` then find the correct bus when receiving from + the transport. See `ConsumeMessagesCommand`. + * An optional `ConsumeMessagesCommand` constructor argument was removed. * [BC BREAK] 2 new methods were added to `ReceiverInterface`: `ack()` and `reject()`. * [BC BREAK] Error handling was moved from the receivers into diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index a973a2337d..ca1973afd4 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -21,6 +21,7 @@ use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; use Symfony\Component\EventDispatcher\EventDispatcherInterface; +use Symfony\Component\Messenger\RoutableMessageBus; use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver; use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver; use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver; @@ -39,17 +40,15 @@ class ConsumeMessagesCommand extends Command private $receiverLocator; private $logger; private $receiverNames; - private $busNames; private $retryStrategyLocator; private $eventDispatcher; - public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null) + public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null) { $this->busLocator = $busLocator; $this->receiverLocator = $receiverLocator; $this->logger = $logger; $this->receiverNames = $receiverNames; - $this->busNames = $busNames; $this->retryStrategyLocator = $retryStrategyLocator; $this->eventDispatcher = $eventDispatcher; @@ -62,7 +61,6 @@ class ConsumeMessagesCommand extends Command protected function configure(): void { $defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null; - $defaultBusName = 1 === \count($this->busNames) ? current($this->busNames) : null; $this ->setDefinition([ @@ -70,7 +68,7 @@ class ConsumeMessagesCommand extends Command new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'), new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'), new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'), - new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', $defaultBusName), + new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'), ]) ->setDescription('Consumes messages') ->setHelp(<<<'EOF' @@ -89,6 +87,12 @@ Use the --memory-limit option to stop the worker if it exceeds a given memory us Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached: php %command.full_name% --time-limit=3600 + +Use the --bus option to specify the message bus to dispatch received messages +to instead of trying to determine it automatically. This is required if the +messages didn't originate from Messenger: + + php %command.full_name% --bus=event_bus EOF ) ; @@ -112,24 +116,6 @@ EOF } } } - - $busName = $input->getOption('bus'); - if ($this->busNames && !$this->busLocator->has($busName)) { - if (null === $busName) { - $io->block('Missing bus argument.', null, 'error', ' ', true); - $input->setOption('bus', $io->choice('Select one of the available buses', $this->busNames)); - } elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) { - $io->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true); - - if (1 === \count($alternatives)) { - if ($io->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) { - $input->setOption('bus', $alternatives[0]); - } - } else { - $input->setOption('bus', $io->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0])); - } - } - } } /** @@ -147,18 +133,19 @@ EOF throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName)); } - if (!$this->busLocator->has($busName = $input->getOption('bus'))) { - throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName)); - } - if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) { throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName)); } $receiver = $this->receiverLocator->get($receiverName); - $bus = $this->busLocator->get($busName); $retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null; + if (null !== $input->getOption('bus')) { + $bus = $this->busLocator->get($input->getOption('bus')); + } else { + $bus = new RoutableMessageBus($this->busLocator); + } + $stopsWhen = []; if ($limit = $input->getOption('limit')) { $stopsWhen[] = "processed {$limit} messages"; @@ -176,7 +163,7 @@ EOF } $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); - $io->success(sprintf('Consuming messages from transport "%s" on bus "%s".', $receiverName, $busName)); + $io->success(sprintf('Consuming messages from transport "%s".', $receiverName)); if ($stopsWhen) { $last = array_pop($stopsWhen); diff --git a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php index 8c4773462e..92ddfc2d4a 100644 --- a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php +++ b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php @@ -248,8 +248,7 @@ class MessengerPass implements CompilerPassInterface $container->getDefinition('console.command.messenger_consume_messages') ->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses)) - ->replaceArgument(3, array_values($receiverNames)) - ->replaceArgument(4, $busIds); + ->replaceArgument(3, array_values($receiverNames)); } $container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping); diff --git a/src/Symfony/Component/Messenger/Middleware/AddBusNameStampMiddleware.php b/src/Symfony/Component/Messenger/Middleware/AddBusNameStampMiddleware.php new file mode 100644 index 0000000000..94392e3c98 --- /dev/null +++ b/src/Symfony/Component/Messenger/Middleware/AddBusNameStampMiddleware.php @@ -0,0 +1,39 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Middleware; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\BusNameStamp; + +/** + * Adds the BusNameStamp to the bus. + * + * @experimental in 4.3 + * + * @author Ryan Weaver + */ +class AddBusNameStampMiddleware implements MiddlewareInterface +{ + private $busName; + + public function __construct(string $busName) + { + $this->busName = $busName; + } + + public function handle(Envelope $envelope, StackInterface $stack): Envelope + { + $envelope = $envelope->with(new BusNameStamp($this->busName)); + + return $stack->next()->handle($envelope, $stack); + } +} diff --git a/src/Symfony/Component/Messenger/RoutableMessageBus.php b/src/Symfony/Component/Messenger/RoutableMessageBus.php new file mode 100644 index 0000000000..3b1aba9751 --- /dev/null +++ b/src/Symfony/Component/Messenger/RoutableMessageBus.php @@ -0,0 +1,58 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +use Psr\Container\ContainerInterface; +use Symfony\Component\Messenger\Exception\InvalidArgumentException; +use Symfony\Component\Messenger\Stamp\BusNameStamp; + +/** + * Bus of buses that is routable using a BusNameStamp. + * + * This is useful when passed to Worker: messages received + * from the transport can be sent to the correct bus. + * + * @experimental in 4.3 + * + * @author Ryan Weaver + */ +class RoutableMessageBus implements MessageBusInterface +{ + private $busLocator; + + /** + * @param ContainerInterface $busLocator A locator full of MessageBusInterface objects + */ + public function __construct(ContainerInterface $busLocator) + { + $this->busLocator = $busLocator; + } + + public function dispatch($envelope): Envelope + { + if (!$envelope instanceof Envelope) { + throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope'); + } + + /** @var BusNameStamp $busNameStamp */ + $busNameStamp = $envelope->last(BusNameStamp::class); + if (null === $busNameStamp) { + throw new InvalidArgumentException('Envelope does not contain a BusNameStamp.'); + } + + if (!$this->busLocator->has($busNameStamp->getBusName())) { + throw new InvalidArgumentException(sprintf('Invalid bus name "%s" on BusNameStamp.', $busNameStamp->getBusName())); + } + + return $this->busLocator->get($busNameStamp->getBusName())->dispatch($envelope); + } +} diff --git a/src/Symfony/Component/Messenger/Stamp/BusNameStamp.php b/src/Symfony/Component/Messenger/Stamp/BusNameStamp.php new file mode 100644 index 0000000000..eee3f9e846 --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/BusNameStamp.php @@ -0,0 +1,34 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Stamp; + +/** + * Stamp used to identify which bus it was passed to. + * + * @experimental in 4.3 + * + * @author Ryan Weaver + */ +class BusNameStamp implements StampInterface +{ + private $busName; + + public function __construct(string $busName) + { + $this->busName = $busName; + } + + public function getBusName(): string + { + return $this->busName; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php index 24820df0c6..c6ef2c0cec 100644 --- a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php +++ b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php @@ -263,7 +263,6 @@ class MessengerPassTest extends TestCase (new MessengerPass())->process($container); $this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3)); - $this->assertSame(['message_bus'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4)); } public function testItShouldNotThrowIfGeneratorIsReturnedInsteadOfArray() diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/AddBusNameStampMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/AddBusNameStampMiddlewareTest.php new file mode 100644 index 0000000000..71fe15b25e --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Middleware/AddBusNameStampMiddlewareTest.php @@ -0,0 +1,33 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Middleware; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware; +use Symfony\Component\Messenger\Stamp\BusNameStamp; +use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; + +class AddBusNameStampMiddlewareTest extends MiddlewareTestCase +{ + public function testItSendsTheMessageToAssignedSender() + { + $middleware = new AddBusNameStampMiddleware('the_bus_name'); + $envelope = new Envelope(new DummyMessage('the message')); + + $finalEnvelope = $middleware->handle($envelope, $this->getStackMock()); + /** @var BusNameStamp $busNameStamp */ + $busNameStamp = $finalEnvelope->last(BusNameStamp::class); + $this->assertNotNull($busNameStamp); + $this->assertSame('the_bus_name', $busNameStamp->getBusName()); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/RoutableMessageBusTest.php b/src/Symfony/Component/Messenger/Tests/RoutableMessageBusTest.php new file mode 100644 index 0000000000..517a57914c --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/RoutableMessageBusTest.php @@ -0,0 +1,69 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests; + +use PHPUnit\Framework\TestCase; +use Psr\Container\ContainerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\InvalidArgumentException; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\RoutableMessageBus; +use Symfony\Component\Messenger\Stamp\BusNameStamp; + +class RoutableMessageBusTest extends TestCase +{ + public function testItRoutesToTheCorrectBus() + { + $envelope = new Envelope(new \stdClass(), new BusNameStamp('foo_bus')); + + $bus1 = $this->createMock(MessageBusInterface::class); + $bus2 = $this->createMock(MessageBusInterface::class); + + $container = $this->createMock(ContainerInterface::class); + $container->expects($this->once())->method('has')->with('foo_bus')->willReturn(true); + $container->expects($this->once())->method('get')->will($this->returnValue($bus2)); + + $bus1->expects($this->never())->method('dispatch'); + $bus2->expects($this->once())->method('dispatch')->with($envelope)->willReturn($envelope); + + $routableBus = new RoutableMessageBus($container); + $this->assertSame($envelope, $routableBus->dispatch($envelope)); + } + + public function testItExceptionOnMissingStamp() + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('does not contain a BusNameStamp'); + + $envelope = new Envelope(new \stdClass()); + + $container = $this->createMock(ContainerInterface::class); + $container->expects($this->never())->method('has'); + + $routableBus = new RoutableMessageBus($container); + $routableBus->dispatch($envelope); + } + + public function testItExceptionOnBusNotFound() + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Invalid bus name'); + + $envelope = new Envelope(new \stdClass(), new BusNameStamp('foo_bus')); + + $container = $this->createMock(ContainerInterface::class); + $container->expects($this->once())->method('has')->willReturn(false); + + $routableBus = new RoutableMessageBus($container); + $routableBus->dispatch($envelope); + } +}