Fixing a bug where a transport could receive a message and dispatch it to a different bus
This commit is contained in:
parent
6ff11858b1
commit
ef077cf26c
@ -1616,8 +1616,14 @@ class FrameworkExtension extends Extension
|
|||||||
}
|
}
|
||||||
|
|
||||||
$defaultMiddleware = [
|
$defaultMiddleware = [
|
||||||
'before' => [['id' => 'dispatch_after_current_bus']],
|
'before' => [
|
||||||
'after' => [['id' => 'send_message'], ['id' => 'handle_message']],
|
['id' => 'add_bus_name_stamp_middleware'],
|
||||||
|
['id' => 'dispatch_after_current_bus'],
|
||||||
|
],
|
||||||
|
'after' => [
|
||||||
|
['id' => 'send_message'],
|
||||||
|
['id' => 'handle_message'],
|
||||||
|
],
|
||||||
];
|
];
|
||||||
foreach ($config['buses'] as $busId => $bus) {
|
foreach ($config['buses'] as $busId => $bus) {
|
||||||
$middleware = $bus['middleware'];
|
$middleware = $bus['middleware'];
|
||||||
@ -1628,6 +1634,10 @@ class FrameworkExtension extends Extension
|
|||||||
} else {
|
} else {
|
||||||
unset($defaultMiddleware['after'][1]['arguments']);
|
unset($defaultMiddleware['after'][1]['arguments']);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// argument to add_bus_name_stamp_middleware
|
||||||
|
$defaultMiddleware['before'][0]['arguments'] = [$busId];
|
||||||
|
|
||||||
$middleware = array_merge($defaultMiddleware['before'], $middleware, $defaultMiddleware['after']);
|
$middleware = array_merge($defaultMiddleware['before'], $middleware, $defaultMiddleware['after']);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,7 +81,6 @@
|
|||||||
<argument type="service" id="messenger.receiver_locator" />
|
<argument type="service" id="messenger.receiver_locator" />
|
||||||
<argument type="service" id="logger" on-invalid="null" />
|
<argument type="service" id="logger" on-invalid="null" />
|
||||||
<argument type="collection" /> <!-- Receiver names -->
|
<argument type="collection" /> <!-- Receiver names -->
|
||||||
<argument type="collection" /> <!-- Message bus names -->
|
|
||||||
<argument type="service" id="messenger.retry_strategy_locator" />
|
<argument type="service" id="messenger.retry_strategy_locator" />
|
||||||
<argument type="service" id="event_dispatcher" />
|
<argument type="service" id="event_dispatcher" />
|
||||||
|
|
||||||
|
@ -39,6 +39,8 @@
|
|||||||
</call>
|
</call>
|
||||||
</service>
|
</service>
|
||||||
|
|
||||||
|
<service id="messenger.middleware.add_bus_name_stamp_middleware" class="Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware" abstract="true" />
|
||||||
|
|
||||||
<service id="messenger.middleware.dispatch_after_current_bus" class="Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware" />
|
<service id="messenger.middleware.dispatch_after_current_bus" class="Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware" />
|
||||||
|
|
||||||
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">
|
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">
|
||||||
|
@ -727,6 +727,7 @@ abstract class FrameworkExtensionTest extends TestCase
|
|||||||
$this->assertTrue($container->has('messenger.bus.commands'));
|
$this->assertTrue($container->has('messenger.bus.commands'));
|
||||||
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
|
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
|
||||||
$this->assertEquals([
|
$this->assertEquals([
|
||||||
|
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
|
||||||
['id' => 'dispatch_after_current_bus'],
|
['id' => 'dispatch_after_current_bus'],
|
||||||
['id' => 'send_message'],
|
['id' => 'send_message'],
|
||||||
['id' => 'handle_message'],
|
['id' => 'handle_message'],
|
||||||
@ -734,6 +735,7 @@ abstract class FrameworkExtensionTest extends TestCase
|
|||||||
$this->assertTrue($container->has('messenger.bus.events'));
|
$this->assertTrue($container->has('messenger.bus.events'));
|
||||||
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
|
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
|
||||||
$this->assertEquals([
|
$this->assertEquals([
|
||||||
|
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
|
||||||
['id' => 'dispatch_after_current_bus'],
|
['id' => 'dispatch_after_current_bus'],
|
||||||
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
|
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
|
||||||
['id' => 'send_message'],
|
['id' => 'send_message'],
|
||||||
|
@ -4,6 +4,11 @@ CHANGELOG
|
|||||||
4.3.0
|
4.3.0
|
||||||
-----
|
-----
|
||||||
|
|
||||||
|
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
|
||||||
|
and `BusNameStamp` were added, which allow you to add a bus identifier
|
||||||
|
to the `Envelope` then find the correct bus when receiving from
|
||||||
|
the transport. See `ConsumeMessagesCommand`.
|
||||||
|
* An optional `ConsumeMessagesCommand` constructor argument was removed.
|
||||||
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
|
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
|
||||||
`ack()` and `reject()`.
|
`ack()` and `reject()`.
|
||||||
* [BC BREAK] Error handling was moved from the receivers into
|
* [BC BREAK] Error handling was moved from the receivers into
|
||||||
|
@ -21,6 +21,7 @@ use Symfony\Component\Console\Input\InputOption;
|
|||||||
use Symfony\Component\Console\Output\OutputInterface;
|
use Symfony\Component\Console\Output\OutputInterface;
|
||||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||||
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||||
|
use Symfony\Component\Messenger\RoutableMessageBus;
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
|
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
|
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
|
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
|
||||||
@ -39,17 +40,15 @@ class ConsumeMessagesCommand extends Command
|
|||||||
private $receiverLocator;
|
private $receiverLocator;
|
||||||
private $logger;
|
private $logger;
|
||||||
private $receiverNames;
|
private $receiverNames;
|
||||||
private $busNames;
|
|
||||||
private $retryStrategyLocator;
|
private $retryStrategyLocator;
|
||||||
private $eventDispatcher;
|
private $eventDispatcher;
|
||||||
|
|
||||||
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
|
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
|
||||||
{
|
{
|
||||||
$this->busLocator = $busLocator;
|
$this->busLocator = $busLocator;
|
||||||
$this->receiverLocator = $receiverLocator;
|
$this->receiverLocator = $receiverLocator;
|
||||||
$this->logger = $logger;
|
$this->logger = $logger;
|
||||||
$this->receiverNames = $receiverNames;
|
$this->receiverNames = $receiverNames;
|
||||||
$this->busNames = $busNames;
|
|
||||||
$this->retryStrategyLocator = $retryStrategyLocator;
|
$this->retryStrategyLocator = $retryStrategyLocator;
|
||||||
$this->eventDispatcher = $eventDispatcher;
|
$this->eventDispatcher = $eventDispatcher;
|
||||||
|
|
||||||
@ -62,7 +61,6 @@ class ConsumeMessagesCommand extends Command
|
|||||||
protected function configure(): void
|
protected function configure(): void
|
||||||
{
|
{
|
||||||
$defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null;
|
$defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null;
|
||||||
$defaultBusName = 1 === \count($this->busNames) ? current($this->busNames) : null;
|
|
||||||
|
|
||||||
$this
|
$this
|
||||||
->setDefinition([
|
->setDefinition([
|
||||||
@ -70,7 +68,7 @@ class ConsumeMessagesCommand extends Command
|
|||||||
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
|
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
|
||||||
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
|
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
|
||||||
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
|
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
|
||||||
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', $defaultBusName),
|
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
|
||||||
])
|
])
|
||||||
->setDescription('Consumes messages')
|
->setDescription('Consumes messages')
|
||||||
->setHelp(<<<'EOF'
|
->setHelp(<<<'EOF'
|
||||||
@ -89,6 +87,12 @@ Use the --memory-limit option to stop the worker if it exceeds a given memory us
|
|||||||
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
|
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
|
||||||
|
|
||||||
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
|
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
|
||||||
|
|
||||||
|
Use the --bus option to specify the message bus to dispatch received messages
|
||||||
|
to instead of trying to determine it automatically. This is required if the
|
||||||
|
messages didn't originate from Messenger:
|
||||||
|
|
||||||
|
<info>php %command.full_name% <receiver-name> --bus=event_bus</info>
|
||||||
EOF
|
EOF
|
||||||
)
|
)
|
||||||
;
|
;
|
||||||
@ -112,24 +116,6 @@ EOF
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
$busName = $input->getOption('bus');
|
|
||||||
if ($this->busNames && !$this->busLocator->has($busName)) {
|
|
||||||
if (null === $busName) {
|
|
||||||
$io->block('Missing bus argument.', null, 'error', ' ', true);
|
|
||||||
$input->setOption('bus', $io->choice('Select one of the available buses', $this->busNames));
|
|
||||||
} elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
|
|
||||||
$io->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);
|
|
||||||
|
|
||||||
if (1 === \count($alternatives)) {
|
|
||||||
if ($io->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
|
|
||||||
$input->setOption('bus', $alternatives[0]);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
$input->setOption('bus', $io->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -147,18 +133,19 @@ EOF
|
|||||||
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
|
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!$this->busLocator->has($busName = $input->getOption('bus'))) {
|
|
||||||
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
|
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
|
||||||
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
|
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
|
||||||
}
|
}
|
||||||
|
|
||||||
$receiver = $this->receiverLocator->get($receiverName);
|
$receiver = $this->receiverLocator->get($receiverName);
|
||||||
$bus = $this->busLocator->get($busName);
|
|
||||||
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
|
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
|
||||||
|
|
||||||
|
if (null !== $input->getOption('bus')) {
|
||||||
|
$bus = $this->busLocator->get($input->getOption('bus'));
|
||||||
|
} else {
|
||||||
|
$bus = new RoutableMessageBus($this->busLocator);
|
||||||
|
}
|
||||||
|
|
||||||
$stopsWhen = [];
|
$stopsWhen = [];
|
||||||
if ($limit = $input->getOption('limit')) {
|
if ($limit = $input->getOption('limit')) {
|
||||||
$stopsWhen[] = "processed {$limit} messages";
|
$stopsWhen[] = "processed {$limit} messages";
|
||||||
@ -176,7 +163,7 @@ EOF
|
|||||||
}
|
}
|
||||||
|
|
||||||
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
|
||||||
$io->success(sprintf('Consuming messages from transport "%s" on bus "%s".', $receiverName, $busName));
|
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
|
||||||
|
|
||||||
if ($stopsWhen) {
|
if ($stopsWhen) {
|
||||||
$last = array_pop($stopsWhen);
|
$last = array_pop($stopsWhen);
|
||||||
|
@ -248,8 +248,7 @@ class MessengerPass implements CompilerPassInterface
|
|||||||
|
|
||||||
$container->getDefinition('console.command.messenger_consume_messages')
|
$container->getDefinition('console.command.messenger_consume_messages')
|
||||||
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
|
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
|
||||||
->replaceArgument(3, array_values($receiverNames))
|
->replaceArgument(3, array_values($receiverNames));
|
||||||
->replaceArgument(4, $busIds);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
|
$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
|
||||||
|
@ -0,0 +1,39 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Middleware;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Stamp\BusNameStamp;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds the BusNameStamp to the bus.
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*
|
||||||
|
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||||
|
*/
|
||||||
|
class AddBusNameStampMiddleware implements MiddlewareInterface
|
||||||
|
{
|
||||||
|
private $busName;
|
||||||
|
|
||||||
|
public function __construct(string $busName)
|
||||||
|
{
|
||||||
|
$this->busName = $busName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||||
|
{
|
||||||
|
$envelope = $envelope->with(new BusNameStamp($this->busName));
|
||||||
|
|
||||||
|
return $stack->next()->handle($envelope, $stack);
|
||||||
|
}
|
||||||
|
}
|
58
src/Symfony/Component/Messenger/RoutableMessageBus.php
Normal file
58
src/Symfony/Component/Messenger/RoutableMessageBus.php
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger;
|
||||||
|
|
||||||
|
use Psr\Container\ContainerInterface;
|
||||||
|
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||||
|
use Symfony\Component\Messenger\Stamp\BusNameStamp;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bus of buses that is routable using a BusNameStamp.
|
||||||
|
*
|
||||||
|
* This is useful when passed to Worker: messages received
|
||||||
|
* from the transport can be sent to the correct bus.
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*
|
||||||
|
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||||
|
*/
|
||||||
|
class RoutableMessageBus implements MessageBusInterface
|
||||||
|
{
|
||||||
|
private $busLocator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param ContainerInterface $busLocator A locator full of MessageBusInterface objects
|
||||||
|
*/
|
||||||
|
public function __construct(ContainerInterface $busLocator)
|
||||||
|
{
|
||||||
|
$this->busLocator = $busLocator;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function dispatch($envelope): Envelope
|
||||||
|
{
|
||||||
|
if (!$envelope instanceof Envelope) {
|
||||||
|
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @var BusNameStamp $busNameStamp */
|
||||||
|
$busNameStamp = $envelope->last(BusNameStamp::class);
|
||||||
|
if (null === $busNameStamp) {
|
||||||
|
throw new InvalidArgumentException('Envelope does not contain a BusNameStamp.');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$this->busLocator->has($busNameStamp->getBusName())) {
|
||||||
|
throw new InvalidArgumentException(sprintf('Invalid bus name "%s" on BusNameStamp.', $busNameStamp->getBusName()));
|
||||||
|
}
|
||||||
|
|
||||||
|
return $this->busLocator->get($busNameStamp->getBusName())->dispatch($envelope);
|
||||||
|
}
|
||||||
|
}
|
34
src/Symfony/Component/Messenger/Stamp/BusNameStamp.php
Normal file
34
src/Symfony/Component/Messenger/Stamp/BusNameStamp.php
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Stamp;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stamp used to identify which bus it was passed to.
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*
|
||||||
|
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||||
|
*/
|
||||||
|
class BusNameStamp implements StampInterface
|
||||||
|
{
|
||||||
|
private $busName;
|
||||||
|
|
||||||
|
public function __construct(string $busName)
|
||||||
|
{
|
||||||
|
$this->busName = $busName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getBusName(): string
|
||||||
|
{
|
||||||
|
return $this->busName;
|
||||||
|
}
|
||||||
|
}
|
@ -263,7 +263,6 @@ class MessengerPassTest extends TestCase
|
|||||||
(new MessengerPass())->process($container);
|
(new MessengerPass())->process($container);
|
||||||
|
|
||||||
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3));
|
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3));
|
||||||
$this->assertSame(['message_bus'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testItShouldNotThrowIfGeneratorIsReturnedInsteadOfArray()
|
public function testItShouldNotThrowIfGeneratorIsReturnedInsteadOfArray()
|
||||||
|
@ -0,0 +1,33 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Middleware;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware;
|
||||||
|
use Symfony\Component\Messenger\Stamp\BusNameStamp;
|
||||||
|
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
|
|
||||||
|
class AddBusNameStampMiddlewareTest extends MiddlewareTestCase
|
||||||
|
{
|
||||||
|
public function testItSendsTheMessageToAssignedSender()
|
||||||
|
{
|
||||||
|
$middleware = new AddBusNameStampMiddleware('the_bus_name');
|
||||||
|
$envelope = new Envelope(new DummyMessage('the message'));
|
||||||
|
|
||||||
|
$finalEnvelope = $middleware->handle($envelope, $this->getStackMock());
|
||||||
|
/** @var BusNameStamp $busNameStamp */
|
||||||
|
$busNameStamp = $finalEnvelope->last(BusNameStamp::class);
|
||||||
|
$this->assertNotNull($busNameStamp);
|
||||||
|
$this->assertSame('the_bus_name', $busNameStamp->getBusName());
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,69 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Psr\Container\ContainerInterface;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||||
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
|
use Symfony\Component\Messenger\RoutableMessageBus;
|
||||||
|
use Symfony\Component\Messenger\Stamp\BusNameStamp;
|
||||||
|
|
||||||
|
class RoutableMessageBusTest extends TestCase
|
||||||
|
{
|
||||||
|
public function testItRoutesToTheCorrectBus()
|
||||||
|
{
|
||||||
|
$envelope = new Envelope(new \stdClass(), new BusNameStamp('foo_bus'));
|
||||||
|
|
||||||
|
$bus1 = $this->createMock(MessageBusInterface::class);
|
||||||
|
$bus2 = $this->createMock(MessageBusInterface::class);
|
||||||
|
|
||||||
|
$container = $this->createMock(ContainerInterface::class);
|
||||||
|
$container->expects($this->once())->method('has')->with('foo_bus')->willReturn(true);
|
||||||
|
$container->expects($this->once())->method('get')->will($this->returnValue($bus2));
|
||||||
|
|
||||||
|
$bus1->expects($this->never())->method('dispatch');
|
||||||
|
$bus2->expects($this->once())->method('dispatch')->with($envelope)->willReturn($envelope);
|
||||||
|
|
||||||
|
$routableBus = new RoutableMessageBus($container);
|
||||||
|
$this->assertSame($envelope, $routableBus->dispatch($envelope));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testItExceptionOnMissingStamp()
|
||||||
|
{
|
||||||
|
$this->expectException(InvalidArgumentException::class);
|
||||||
|
$this->expectExceptionMessage('does not contain a BusNameStamp');
|
||||||
|
|
||||||
|
$envelope = new Envelope(new \stdClass());
|
||||||
|
|
||||||
|
$container = $this->createMock(ContainerInterface::class);
|
||||||
|
$container->expects($this->never())->method('has');
|
||||||
|
|
||||||
|
$routableBus = new RoutableMessageBus($container);
|
||||||
|
$routableBus->dispatch($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testItExceptionOnBusNotFound()
|
||||||
|
{
|
||||||
|
$this->expectException(InvalidArgumentException::class);
|
||||||
|
$this->expectExceptionMessage('Invalid bus name');
|
||||||
|
|
||||||
|
$envelope = new Envelope(new \stdClass(), new BusNameStamp('foo_bus'));
|
||||||
|
|
||||||
|
$container = $this->createMock(ContainerInterface::class);
|
||||||
|
$container->expects($this->once())->method('has')->willReturn(false);
|
||||||
|
|
||||||
|
$routableBus = new RoutableMessageBus($container);
|
||||||
|
$routableBus->dispatch($envelope);
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user