feature #30652 Fixing a bug where messenger:consume could send message to wrong bus (weaverryan)

This PR was merged into the 4.3-dev branch.

Discussion
----------

Fixing a bug where messenger:consume could send message to wrong bus

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | yes
| New feature?  | arguably, yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #30631
| License       | MIT
| Doc PR        | Not needed

This fixes #30631, where you can run `messener:consume` and accidentally sent received messages into the wrong bus.

The fix (done via middleware) is to attach a "bus name" to the `Envelope` and use it when the message is received to find that bus.

Commits
-------

ef077cf26c Fixing a bug where a transport could receive a message and dispatch it to a different bus
This commit is contained in:
Fabien Potencier 2019-03-23 15:33:29 +01:00
commit e512b7ecff
13 changed files with 272 additions and 36 deletions

View File

@ -1616,8 +1616,14 @@ class FrameworkExtension extends Extension
}
$defaultMiddleware = [
'before' => [['id' => 'dispatch_after_current_bus']],
'after' => [['id' => 'send_message'], ['id' => 'handle_message']],
'before' => [
['id' => 'add_bus_name_stamp_middleware'],
['id' => 'dispatch_after_current_bus'],
],
'after' => [
['id' => 'send_message'],
['id' => 'handle_message'],
],
];
foreach ($config['buses'] as $busId => $bus) {
$middleware = $bus['middleware'];
@ -1628,6 +1634,10 @@ class FrameworkExtension extends Extension
} else {
unset($defaultMiddleware['after'][1]['arguments']);
}
// argument to add_bus_name_stamp_middleware
$defaultMiddleware['before'][0]['arguments'] = [$busId];
$middleware = array_merge($defaultMiddleware['before'], $middleware, $defaultMiddleware['after']);
}

View File

@ -81,7 +81,6 @@
<argument type="service" id="messenger.receiver_locator" />
<argument type="service" id="logger" on-invalid="null" />
<argument type="collection" /> <!-- Receiver names -->
<argument type="collection" /> <!-- Message bus names -->
<argument type="service" id="messenger.retry_strategy_locator" />
<argument type="service" id="event_dispatcher" />

View File

@ -39,6 +39,8 @@
</call>
</service>
<service id="messenger.middleware.add_bus_name_stamp_middleware" class="Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware" abstract="true" />
<service id="messenger.middleware.dispatch_after_current_bus" class="Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware" />
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">

View File

@ -727,6 +727,7 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertTrue($container->has('messenger.bus.commands'));
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
['id' => 'dispatch_after_current_bus'],
['id' => 'send_message'],
['id' => 'handle_message'],
@ -734,6 +735,7 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertTrue($container->has('messenger.bus.events'));
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
['id' => 'dispatch_after_current_bus'],
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
['id' => 'send_message'],

View File

@ -3,7 +3,12 @@ CHANGELOG
4.3.0
-----
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
and `BusNameStamp` were added, which allow you to add a bus identifier
to the `Envelope` then find the correct bus when receiving from
the transport. See `ConsumeMessagesCommand`.
* An optional `ConsumeMessagesCommand` constructor argument was removed.
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
`ack()` and `reject()`.
* [BC BREAK] Error handling was moved from the receivers into

View File

@ -21,6 +21,7 @@ use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
@ -39,17 +40,15 @@ class ConsumeMessagesCommand extends Command
private $receiverLocator;
private $logger;
private $receiverNames;
private $busNames;
private $retryStrategyLocator;
private $eventDispatcher;
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
{
$this->busLocator = $busLocator;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
$this->receiverNames = $receiverNames;
$this->busNames = $busNames;
$this->retryStrategyLocator = $retryStrategyLocator;
$this->eventDispatcher = $eventDispatcher;
@ -62,7 +61,6 @@ class ConsumeMessagesCommand extends Command
protected function configure(): void
{
$defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null;
$defaultBusName = 1 === \count($this->busNames) ? current($this->busNames) : null;
$this
->setDefinition([
@ -70,7 +68,7 @@ class ConsumeMessagesCommand extends Command
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', $defaultBusName),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
])
->setDescription('Consumes messages')
->setHelp(<<<'EOF'
@ -89,6 +87,12 @@ Use the --memory-limit option to stop the worker if it exceeds a given memory us
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
Use the --bus option to specify the message bus to dispatch received messages
to instead of trying to determine it automatically. This is required if the
messages didn't originate from Messenger:
<info>php %command.full_name% <receiver-name> --bus=event_bus</info>
EOF
)
;
@ -112,24 +116,6 @@ EOF
}
}
}
$busName = $input->getOption('bus');
if ($this->busNames && !$this->busLocator->has($busName)) {
if (null === $busName) {
$io->block('Missing bus argument.', null, 'error', ' ', true);
$input->setOption('bus', $io->choice('Select one of the available buses', $this->busNames));
} elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
$io->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);
if (1 === \count($alternatives)) {
if ($io->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
$input->setOption('bus', $alternatives[0]);
}
} else {
$input->setOption('bus', $io->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
}
}
}
}
/**
@ -147,18 +133,19 @@ EOF
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
}
if (!$this->busLocator->has($busName = $input->getOption('bus'))) {
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
}
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
}
$receiver = $this->receiverLocator->get($receiverName);
$bus = $this->busLocator->get($busName);
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
if (null !== $input->getOption('bus')) {
$bus = $this->busLocator->get($input->getOption('bus'));
} else {
$bus = new RoutableMessageBus($this->busLocator);
}
$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
$stopsWhen[] = "processed {$limit} messages";
@ -176,7 +163,7 @@ EOF
}
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport "%s" on bus "%s".', $receiverName, $busName));
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
if ($stopsWhen) {
$last = array_pop($stopsWhen);

View File

@ -248,8 +248,7 @@ class MessengerPass implements CompilerPassInterface
$container->getDefinition('console.command.messenger_consume_messages')
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
->replaceArgument(3, array_values($receiverNames))
->replaceArgument(4, $busIds);
->replaceArgument(3, array_values($receiverNames));
}
$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);

View File

@ -0,0 +1,39 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
/**
* Adds the BusNameStamp to the bus.
*
* @experimental in 4.3
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class AddBusNameStampMiddleware implements MiddlewareInterface
{
private $busName;
public function __construct(string $busName)
{
$this->busName = $busName;
}
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$envelope = $envelope->with(new BusNameStamp($this->busName));
return $stack->next()->handle($envelope, $stack);
}
}

View File

@ -0,0 +1,58 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
/**
* Bus of buses that is routable using a BusNameStamp.
*
* This is useful when passed to Worker: messages received
* from the transport can be sent to the correct bus.
*
* @experimental in 4.3
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class RoutableMessageBus implements MessageBusInterface
{
private $busLocator;
/**
* @param ContainerInterface $busLocator A locator full of MessageBusInterface objects
*/
public function __construct(ContainerInterface $busLocator)
{
$this->busLocator = $busLocator;
}
public function dispatch($envelope): Envelope
{
if (!$envelope instanceof Envelope) {
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
}
/** @var BusNameStamp $busNameStamp */
$busNameStamp = $envelope->last(BusNameStamp::class);
if (null === $busNameStamp) {
throw new InvalidArgumentException('Envelope does not contain a BusNameStamp.');
}
if (!$this->busLocator->has($busNameStamp->getBusName())) {
throw new InvalidArgumentException(sprintf('Invalid bus name "%s" on BusNameStamp.', $busNameStamp->getBusName()));
}
return $this->busLocator->get($busNameStamp->getBusName())->dispatch($envelope);
}
}

View File

@ -0,0 +1,34 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Stamp;
/**
* Stamp used to identify which bus it was passed to.
*
* @experimental in 4.3
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class BusNameStamp implements StampInterface
{
private $busName;
public function __construct(string $busName)
{
$this->busName = $busName;
}
public function getBusName(): string
{
return $this->busName;
}
}

View File

@ -263,7 +263,6 @@ class MessengerPassTest extends TestCase
(new MessengerPass())->process($container);
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3));
$this->assertSame(['message_bus'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
}
public function testItShouldNotThrowIfGeneratorIsReturnedInsteadOfArray()

View File

@ -0,0 +1,33 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Middleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
class AddBusNameStampMiddlewareTest extends MiddlewareTestCase
{
public function testItSendsTheMessageToAssignedSender()
{
$middleware = new AddBusNameStampMiddleware('the_bus_name');
$envelope = new Envelope(new DummyMessage('the message'));
$finalEnvelope = $middleware->handle($envelope, $this->getStackMock());
/** @var BusNameStamp $busNameStamp */
$busNameStamp = $finalEnvelope->last(BusNameStamp::class);
$this->assertNotNull($busNameStamp);
$this->assertSame('the_bus_name', $busNameStamp->getBusName());
}
}

View File

@ -0,0 +1,69 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
class RoutableMessageBusTest extends TestCase
{
public function testItRoutesToTheCorrectBus()
{
$envelope = new Envelope(new \stdClass(), new BusNameStamp('foo_bus'));
$bus1 = $this->createMock(MessageBusInterface::class);
$bus2 = $this->createMock(MessageBusInterface::class);
$container = $this->createMock(ContainerInterface::class);
$container->expects($this->once())->method('has')->with('foo_bus')->willReturn(true);
$container->expects($this->once())->method('get')->will($this->returnValue($bus2));
$bus1->expects($this->never())->method('dispatch');
$bus2->expects($this->once())->method('dispatch')->with($envelope)->willReturn($envelope);
$routableBus = new RoutableMessageBus($container);
$this->assertSame($envelope, $routableBus->dispatch($envelope));
}
public function testItExceptionOnMissingStamp()
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('does not contain a BusNameStamp');
$envelope = new Envelope(new \stdClass());
$container = $this->createMock(ContainerInterface::class);
$container->expects($this->never())->method('has');
$routableBus = new RoutableMessageBus($container);
$routableBus->dispatch($envelope);
}
public function testItExceptionOnBusNotFound()
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('Invalid bus name');
$envelope = new Envelope(new \stdClass(), new BusNameStamp('foo_bus'));
$container = $this->createMock(ContainerInterface::class);
$container->expects($this->once())->method('has')->willReturn(false);
$routableBus = new RoutableMessageBus($container);
$routableBus->dispatch($envelope);
}
}