Fixing a bug where a transport could receive a message and dispatch it to a different bus
This commit is contained in:
parent
6ff11858b1
commit
ef077cf26c
@ -1616,8 +1616,14 @@ class FrameworkExtension extends Extension
|
||||
}
|
||||
|
||||
$defaultMiddleware = [
|
||||
'before' => [['id' => 'dispatch_after_current_bus']],
|
||||
'after' => [['id' => 'send_message'], ['id' => 'handle_message']],
|
||||
'before' => [
|
||||
['id' => 'add_bus_name_stamp_middleware'],
|
||||
['id' => 'dispatch_after_current_bus'],
|
||||
],
|
||||
'after' => [
|
||||
['id' => 'send_message'],
|
||||
['id' => 'handle_message'],
|
||||
],
|
||||
];
|
||||
foreach ($config['buses'] as $busId => $bus) {
|
||||
$middleware = $bus['middleware'];
|
||||
@ -1628,6 +1634,10 @@ class FrameworkExtension extends Extension
|
||||
} else {
|
||||
unset($defaultMiddleware['after'][1]['arguments']);
|
||||
}
|
||||
|
||||
// argument to add_bus_name_stamp_middleware
|
||||
$defaultMiddleware['before'][0]['arguments'] = [$busId];
|
||||
|
||||
$middleware = array_merge($defaultMiddleware['before'], $middleware, $defaultMiddleware['after']);
|
||||
}
|
||||
|
||||
|
@ -81,7 +81,6 @@
|
||||
<argument type="service" id="messenger.receiver_locator" />
|
||||
<argument type="service" id="logger" on-invalid="null" />
|
||||
<argument type="collection" /> <!-- Receiver names -->
|
||||
<argument type="collection" /> <!-- Message bus names -->
|
||||
<argument type="service" id="messenger.retry_strategy_locator" />
|
||||
<argument type="service" id="event_dispatcher" />
|
||||
|
||||
|
@ -39,6 +39,8 @@
|
||||
</call>
|
||||
</service>
|
||||
|
||||
<service id="messenger.middleware.add_bus_name_stamp_middleware" class="Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware" abstract="true" />
|
||||
|
||||
<service id="messenger.middleware.dispatch_after_current_bus" class="Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware" />
|
||||
|
||||
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">
|
||||
|
@ -727,6 +727,7 @@ abstract class FrameworkExtensionTest extends TestCase
|
||||
$this->assertTrue($container->has('messenger.bus.commands'));
|
||||
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
|
||||
$this->assertEquals([
|
||||
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
|
||||
['id' => 'dispatch_after_current_bus'],
|
||||
['id' => 'send_message'],
|
||||
['id' => 'handle_message'],
|
||||
@ -734,6 +735,7 @@ abstract class FrameworkExtensionTest extends TestCase
|
||||
$this->assertTrue($container->has('messenger.bus.events'));
|
||||
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
|
||||
$this->assertEquals([
|
||||
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
|
||||
['id' => 'dispatch_after_current_bus'],
|
||||
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
|
||||
['id' => 'send_message'],
|
||||
|
@ -4,6 +4,11 @@ CHANGELOG
|
||||
4.3.0
|
||||
-----
|
||||
|
||||
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
|
||||
and `BusNameStamp` were added, which allow you to add a bus identifier
|
||||
to the `Envelope` then find the correct bus when receiving from
|
||||
the transport. See `ConsumeMessagesCommand`.
|
||||
* An optional `ConsumeMessagesCommand` constructor argument was removed.
|
||||
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
|
||||
`ack()` and `reject()`.
|
||||
* [BC BREAK] Error handling was moved from the receivers into
|
||||
|
@ -21,6 +21,7 @@ use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||
use Symfony\Component\Messenger\RoutableMessageBus;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
|
||||
@ -39,17 +40,15 @@ class ConsumeMessagesCommand extends Command
|
||||
private $receiverLocator;
|
||||
private $logger;
|
||||
private $receiverNames;
|
||||
private $busNames;
|
||||
private $retryStrategyLocator;
|
||||
private $eventDispatcher;
|
||||
|
||||
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
|
||||
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
|
||||
{
|
||||
$this->busLocator = $busLocator;
|
||||
$this->receiverLocator = $receiverLocator;
|
||||
$this->logger = $logger;
|
||||
$this->receiverNames = $receiverNames;
|
||||
$this->busNames = $busNames;
|
||||
$this->retryStrategyLocator = $retryStrategyLocator;
|
||||
$this->eventDispatcher = $eventDispatcher;
|
||||
|
||||
@ -62,7 +61,6 @@ class ConsumeMessagesCommand extends Command
|
||||
protected function configure(): void
|
||||
{
|
||||
$defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null;
|
||||
$defaultBusName = 1 === \count($this->busNames) ? current($this->busNames) : null;
|
||||
|
||||
$this
|
||||
->setDefinition([
|
||||
@ -70,7 +68,7 @@ class ConsumeMessagesCommand extends Command
|
||||
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
|
||||
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
|
||||
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
|
||||
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', $defaultBusName),
|
||||
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
|
||||
])
|
||||
->setDescription('Consumes messages')
|
||||
->setHelp(<<<'EOF'
|
||||
@ -89,6 +87,12 @@ Use the --memory-limit option to stop the worker if it exceeds a given memory us
|
||||
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
|
||||
|
||||
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
|
||||
|
||||
Use the --bus option to specify the message bus to dispatch received messages
|
||||
to instead of trying to determine it automatically. This is required if the
|
||||
messages didn't originate from Messenger:
|
||||
|
||||
<info>php %command.full_name% <receiver-name> --bus=event_bus</info>
|
||||
EOF
|
||||
)
|
||||
;
|
||||
@ -112,24 +116,6 @@ EOF
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$busName = $input->getOption('bus');
|
||||
if ($this->busNames && !$this->busLocator->has($busName)) {
|
||||
if (null === $busName) {
|
||||
$io->block('Missing bus argument.', null, 'error', ' ', true);
|
||||
$input->setOption('bus', $io->choice('Select one of the available buses', $this->busNames));
|
||||
} elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
|
||||
$io->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);
|
||||
|
||||
if (1 === \count($alternatives)) {
|
||||
if ($io->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
|
||||
$input->setOption('bus', $alternatives[0]);
|
||||
}
|
||||
} else {
|
||||
$input->setOption('bus', $io->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -147,18 +133,19 @@ EOF
|
||||
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
|
||||
}
|
||||
|
||||
if (!$this->busLocator->has($busName = $input->getOption('bus'))) {
|
||||
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
|
||||
}
|
||||
|
||||
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
|
||||
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
|
||||
}
|
||||
|
||||
$receiver = $this->receiverLocator->get($receiverName);
|
||||
$bus = $this->busLocator->get($busName);
|
||||
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
|
||||
|
||||
if (null !== $input->getOption('bus')) {
|
||||
$bus = $this->busLocator->get($input->getOption('bus'));
|
||||
} else {
|
||||
$bus = new RoutableMessageBus($this->busLocator);
|
||||
}
|
||||
|
||||
$stopsWhen = [];
|
||||
if ($limit = $input->getOption('limit')) {
|
||||
$stopsWhen[] = "processed {$limit} messages";
|
||||
@ -176,7 +163,7 @@ EOF
|
||||
}
|
||||
|
||||
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
||||
$io->success(sprintf('Consuming messages from transport "%s" on bus "%s".', $receiverName, $busName));
|
||||
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
|
||||
|
||||
if ($stopsWhen) {
|
||||
$last = array_pop($stopsWhen);
|
||||
|
@ -248,8 +248,7 @@ class MessengerPass implements CompilerPassInterface
|
||||
|
||||
$container->getDefinition('console.command.messenger_consume_messages')
|
||||
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
|
||||
->replaceArgument(3, array_values($receiverNames))
|
||||
->replaceArgument(4, $busIds);
|
||||
->replaceArgument(3, array_values($receiverNames));
|
||||
}
|
||||
|
||||
$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
|
||||
|
@ -0,0 +1,39 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Middleware;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Stamp\BusNameStamp;
|
||||
|
||||
/**
|
||||
* Adds the BusNameStamp to the bus.
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*/
|
||||
class AddBusNameStampMiddleware implements MiddlewareInterface
|
||||
{
|
||||
private $busName;
|
||||
|
||||
public function __construct(string $busName)
|
||||
{
|
||||
$this->busName = $busName;
|
||||
}
|
||||
|
||||
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||
{
|
||||
$envelope = $envelope->with(new BusNameStamp($this->busName));
|
||||
|
||||
return $stack->next()->handle($envelope, $stack);
|
||||
}
|
||||
}
|
58
src/Symfony/Component/Messenger/RoutableMessageBus.php
Normal file
58
src/Symfony/Component/Messenger/RoutableMessageBus.php
Normal file
@ -0,0 +1,58 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger;
|
||||
|
||||
use Psr\Container\ContainerInterface;
|
||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||
use Symfony\Component\Messenger\Stamp\BusNameStamp;
|
||||
|
||||
/**
|
||||
* Bus of buses that is routable using a BusNameStamp.
|
||||
*
|
||||
* This is useful when passed to Worker: messages received
|
||||
* from the transport can be sent to the correct bus.
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*/
|
||||
class RoutableMessageBus implements MessageBusInterface
|
||||
{
|
||||
private $busLocator;
|
||||
|
||||
/**
|
||||
* @param ContainerInterface $busLocator A locator full of MessageBusInterface objects
|
||||
*/
|
||||
public function __construct(ContainerInterface $busLocator)
|
||||
{
|
||||
$this->busLocator = $busLocator;
|
||||
}
|
||||
|
||||
public function dispatch($envelope): Envelope
|
||||
{
|
||||
if (!$envelope instanceof Envelope) {
|
||||
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
|
||||
}
|
||||
|
||||
/** @var BusNameStamp $busNameStamp */
|
||||
$busNameStamp = $envelope->last(BusNameStamp::class);
|
||||
if (null === $busNameStamp) {
|
||||
throw new InvalidArgumentException('Envelope does not contain a BusNameStamp.');
|
||||
}
|
||||
|
||||
if (!$this->busLocator->has($busNameStamp->getBusName())) {
|
||||
throw new InvalidArgumentException(sprintf('Invalid bus name "%s" on BusNameStamp.', $busNameStamp->getBusName()));
|
||||
}
|
||||
|
||||
return $this->busLocator->get($busNameStamp->getBusName())->dispatch($envelope);
|
||||
}
|
||||
}
|
34
src/Symfony/Component/Messenger/Stamp/BusNameStamp.php
Normal file
34
src/Symfony/Component/Messenger/Stamp/BusNameStamp.php
Normal file
@ -0,0 +1,34 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Stamp;
|
||||
|
||||
/**
|
||||
* Stamp used to identify which bus it was passed to.
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*/
|
||||
class BusNameStamp implements StampInterface
|
||||
{
|
||||
private $busName;
|
||||
|
||||
public function __construct(string $busName)
|
||||
{
|
||||
$this->busName = $busName;
|
||||
}
|
||||
|
||||
public function getBusName(): string
|
||||
{
|
||||
return $this->busName;
|
||||
}
|
||||
}
|
@ -263,7 +263,6 @@ class MessengerPassTest extends TestCase
|
||||
(new MessengerPass())->process($container);
|
||||
|
||||
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3));
|
||||
$this->assertSame(['message_bus'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
|
||||
}
|
||||
|
||||
public function testItShouldNotThrowIfGeneratorIsReturnedInsteadOfArray()
|
||||
|
@ -0,0 +1,33 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Middleware;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware;
|
||||
use Symfony\Component\Messenger\Stamp\BusNameStamp;
|
||||
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
|
||||
class AddBusNameStampMiddlewareTest extends MiddlewareTestCase
|
||||
{
|
||||
public function testItSendsTheMessageToAssignedSender()
|
||||
{
|
||||
$middleware = new AddBusNameStampMiddleware('the_bus_name');
|
||||
$envelope = new Envelope(new DummyMessage('the message'));
|
||||
|
||||
$finalEnvelope = $middleware->handle($envelope, $this->getStackMock());
|
||||
/** @var BusNameStamp $busNameStamp */
|
||||
$busNameStamp = $finalEnvelope->last(BusNameStamp::class);
|
||||
$this->assertNotNull($busNameStamp);
|
||||
$this->assertSame('the_bus_name', $busNameStamp->getBusName());
|
||||
}
|
||||
}
|
@ -0,0 +1,69 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Psr\Container\ContainerInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\RoutableMessageBus;
|
||||
use Symfony\Component\Messenger\Stamp\BusNameStamp;
|
||||
|
||||
class RoutableMessageBusTest extends TestCase
|
||||
{
|
||||
public function testItRoutesToTheCorrectBus()
|
||||
{
|
||||
$envelope = new Envelope(new \stdClass(), new BusNameStamp('foo_bus'));
|
||||
|
||||
$bus1 = $this->createMock(MessageBusInterface::class);
|
||||
$bus2 = $this->createMock(MessageBusInterface::class);
|
||||
|
||||
$container = $this->createMock(ContainerInterface::class);
|
||||
$container->expects($this->once())->method('has')->with('foo_bus')->willReturn(true);
|
||||
$container->expects($this->once())->method('get')->will($this->returnValue($bus2));
|
||||
|
||||
$bus1->expects($this->never())->method('dispatch');
|
||||
$bus2->expects($this->once())->method('dispatch')->with($envelope)->willReturn($envelope);
|
||||
|
||||
$routableBus = new RoutableMessageBus($container);
|
||||
$this->assertSame($envelope, $routableBus->dispatch($envelope));
|
||||
}
|
||||
|
||||
public function testItExceptionOnMissingStamp()
|
||||
{
|
||||
$this->expectException(InvalidArgumentException::class);
|
||||
$this->expectExceptionMessage('does not contain a BusNameStamp');
|
||||
|
||||
$envelope = new Envelope(new \stdClass());
|
||||
|
||||
$container = $this->createMock(ContainerInterface::class);
|
||||
$container->expects($this->never())->method('has');
|
||||
|
||||
$routableBus = new RoutableMessageBus($container);
|
||||
$routableBus->dispatch($envelope);
|
||||
}
|
||||
|
||||
public function testItExceptionOnBusNotFound()
|
||||
{
|
||||
$this->expectException(InvalidArgumentException::class);
|
||||
$this->expectExceptionMessage('Invalid bus name');
|
||||
|
||||
$envelope = new Envelope(new \stdClass(), new BusNameStamp('foo_bus'));
|
||||
|
||||
$container = $this->createMock(ContainerInterface::class);
|
||||
$container->expects($this->once())->method('has')->willReturn(false);
|
||||
|
||||
$routableBus = new RoutableMessageBus($container);
|
||||
$routableBus->dispatch($envelope);
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user