[Messenger] implement several senders using a ChainSender
This commit is contained in:
parent
f59ce97eff
commit
198925ee4e
@ -986,7 +986,10 @@ class Configuration implements ConfigurationInterface
|
||||
$newConfig = array();
|
||||
foreach ($config as $k => $v) {
|
||||
if (!\is_int($k)) {
|
||||
$newConfig[$k] = array('senders' => \is_array($v) ? array_values($v) : array($v));
|
||||
$newConfig[$k] = array(
|
||||
'senders' => $v['senders'] ?? (\is_array($v) ? array_values($v) : array($v)),
|
||||
'send_and_handle' => $v['send_and_handle'] ?? false,
|
||||
);
|
||||
} else {
|
||||
$newConfig[$v['message-class']]['senders'] = array_map(
|
||||
function ($a) {
|
||||
@ -994,6 +997,7 @@ class Configuration implements ConfigurationInterface
|
||||
},
|
||||
array_values($v['sender'])
|
||||
);
|
||||
$newConfig[$v['message-class']]['send-and-handle'] = $v['send-and-handle'] ?? false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1006,6 +1010,7 @@ class Configuration implements ConfigurationInterface
|
||||
->requiresAtLeastOneElement()
|
||||
->prototype('scalar')->end()
|
||||
->end()
|
||||
->booleanNode('send_and_handle')->defaultFalse()->end()
|
||||
->end()
|
||||
->end()
|
||||
->end()
|
||||
|
@ -63,6 +63,7 @@ use Symfony\Component\Lock\StoreInterface;
|
||||
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
|
||||
use Symfony\Component\Messenger\MessageBus;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Transport\ChainSender;
|
||||
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
use Symfony\Component\PropertyAccess\PropertyAccessor;
|
||||
@ -1494,16 +1495,28 @@ class FrameworkExtension extends Extension
|
||||
throw new LogicException(sprintf('The default bus named "%s" is not defined. Define it or change the default bus name.', $config['default_bus']));
|
||||
}
|
||||
|
||||
$messageToSenderIdsMapping = array();
|
||||
$messageToSenderIdMapping = array();
|
||||
$messageToSendAndHandleMapping = array();
|
||||
foreach ($config['routing'] as $message => $messageConfiguration) {
|
||||
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
|
||||
throw new LogicException(sprintf('Messenger routing configuration contains a mistake: message "%s" does not exist. It needs to match an existing class or interface.', $message));
|
||||
}
|
||||
|
||||
$messageToSenderIdsMapping[$message] = $messageConfiguration['senders'];
|
||||
if (1 < \count($messageConfiguration['senders'])) {
|
||||
$senders = array_map(function ($sender) { return new Reference($sender); }, $messageConfiguration['senders']);
|
||||
$chainSenderDefinition = new Definition(ChainSender::class, array($senders));
|
||||
$chainSenderId = '.messenger.chain_sender.'.$message;
|
||||
$container->setDefinition($chainSenderId, $chainSenderDefinition);
|
||||
$messageToSenderIdMapping[$message] = $chainSenderId;
|
||||
} else {
|
||||
$messageToSenderIdMapping[$message] = $messageConfiguration['senders'][0];
|
||||
}
|
||||
|
||||
$messageToSendAndHandleMapping[$message] = $messageConfiguration['send_and_handle'];
|
||||
}
|
||||
|
||||
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping);
|
||||
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdMapping);
|
||||
$container->getDefinition('messenger.middleware.route_messages')->replaceArgument(1, $messageToSendAndHandleMapping);
|
||||
|
||||
foreach ($config['transports'] as $name => $transport) {
|
||||
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {
|
||||
|
@ -15,10 +15,11 @@
|
||||
<!-- Asynchronous -->
|
||||
<service id="messenger.asynchronous.routing.sender_locator" class="Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator">
|
||||
<argument type="service" id="messenger.sender_locator" />
|
||||
<argument /> <!-- Message to sender ID mapping -->
|
||||
<argument type="collection" /> <!-- Message to sender ID mapping -->
|
||||
</service>
|
||||
<service id="messenger.middleware.route_messages" class="Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware">
|
||||
<argument type="service" id="messenger.asynchronous.routing.sender_locator" />
|
||||
<argument type="collection" /> <!-- Message to send and handle mapping -->
|
||||
</service>
|
||||
|
||||
<!-- Message encoding/decoding -->
|
||||
|
@ -375,6 +375,7 @@
|
||||
<xsd:element name="sender" type="messenger_routing_sender" />
|
||||
</xsd:choice>
|
||||
<xsd:attribute name="message-class" type="xsd:string" use="required"/>
|
||||
<xsd:attribute name="send-and-handle" type="xsd:boolean" default="false"/>
|
||||
</xsd:complexType>
|
||||
|
||||
<xsd:complexType name="messenger_routing_sender">
|
||||
|
@ -3,8 +3,11 @@
|
||||
$container->loadFromExtension('framework', array(
|
||||
'messenger' => array(
|
||||
'routing' => array(
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage' => array('amqp'),
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage' => array('amqp', 'audit', null),
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage' => array('amqp', 'audit'),
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage' => array(
|
||||
'senders' => array('amqp', 'audit'),
|
||||
'send_and_handle' => true,
|
||||
),
|
||||
'*' => 'amqp',
|
||||
),
|
||||
),
|
||||
|
@ -9,11 +9,11 @@
|
||||
<framework:messenger>
|
||||
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\DummyMessage">
|
||||
<framework:sender service="amqp" />
|
||||
<framework:sender service="audit" />
|
||||
</framework:routing>
|
||||
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\SecondMessage">
|
||||
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\SecondMessage" send-and-handle="true">
|
||||
<framework:sender service="amqp" />
|
||||
<framework:sender service="audit" />
|
||||
<framework:sender service="null" />
|
||||
</framework:routing>
|
||||
<framework:routing message-class="*">
|
||||
<framework:sender service="amqp" />
|
||||
|
@ -1,6 +1,8 @@
|
||||
framework:
|
||||
messenger:
|
||||
routing:
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage': amqp
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage': [amqp, audit, ~]
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage': [amqp, audit]
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage':
|
||||
senders: [amqp, audit]
|
||||
send_and_handle: true
|
||||
'*': amqp
|
||||
|
@ -558,14 +558,21 @@ abstract class FrameworkExtensionTest extends TestCase
|
||||
{
|
||||
$container = $this->createContainerFromFile('messenger_routing');
|
||||
$senderLocatorDefinition = $container->getDefinition('messenger.asynchronous.routing.sender_locator');
|
||||
$sendMessageMiddlewareDefinition = $container->getDefinition('messenger.middleware.route_messages');
|
||||
|
||||
$messageToSenderIdsMapping = array(
|
||||
DummyMessage::class => array('amqp'),
|
||||
SecondMessage::class => array('amqp', 'audit', null),
|
||||
'*' => array('amqp'),
|
||||
DummyMessage::class => '.messenger.chain_sender.'.DummyMessage::class,
|
||||
SecondMessage::class => '.messenger.chain_sender.'.SecondMessage::class,
|
||||
'*' => 'amqp',
|
||||
);
|
||||
$messageToSendAndHandleMapping = array(
|
||||
DummyMessage::class => false,
|
||||
SecondMessage::class => true,
|
||||
'*' => false,
|
||||
);
|
||||
|
||||
$this->assertSame($messageToSenderIdsMapping, $senderLocatorDefinition->getArgument(1));
|
||||
$this->assertSame($messageToSendAndHandleMapping, $sendMessageMiddlewareDefinition->getArgument(1));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -11,6 +11,7 @@
|
||||
|
||||
namespace Symfony\Component\Messenger\Asynchronous\Middleware;
|
||||
|
||||
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator;
|
||||
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
|
||||
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
@ -19,14 +20,17 @@ use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
|
||||
|
||||
/**
|
||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||
* @author Tobias Schultze <http://tobion.de>
|
||||
*/
|
||||
class SendMessageMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
|
||||
{
|
||||
private $senderLocator;
|
||||
private $messagesToSendAndHandleMapping;
|
||||
|
||||
public function __construct(SenderLocatorInterface $senderLocator)
|
||||
public function __construct(SenderLocatorInterface $senderLocator, array $messagesToSendAndHandleMapping = array())
|
||||
{
|
||||
$this->senderLocator = $senderLocator;
|
||||
$this->messagesToSendAndHandleMapping = $messagesToSendAndHandleMapping;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -40,20 +44,21 @@ class SendMessageMiddleware implements MiddlewareInterface, EnvelopeAwareInterfa
|
||||
return $next($message);
|
||||
}
|
||||
|
||||
if (!empty($senders = $this->senderLocator->getSendersForMessage($envelope->getMessage()))) {
|
||||
foreach ($senders as $sender) {
|
||||
if (null === $sender) {
|
||||
continue;
|
||||
}
|
||||
$sender = $this->senderLocator->getSenderForMessage($envelope->getMessage());
|
||||
|
||||
$sender->send($envelope);
|
||||
}
|
||||
if ($sender) {
|
||||
$sender->send($envelope);
|
||||
|
||||
if (!\in_array(null, $senders, true)) {
|
||||
if (!$this->mustSendAndHandle($envelope->getMessage())) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
return $next($message);
|
||||
}
|
||||
|
||||
private function mustSendAndHandle($message): bool
|
||||
{
|
||||
return (bool) SenderLocator::getValueFromMessageRouting($this->messagesToSendAndHandleMapping, $message);
|
||||
}
|
||||
}
|
||||
|
@ -12,6 +12,7 @@
|
||||
namespace Symfony\Component\Messenger\Asynchronous\Routing;
|
||||
|
||||
use Psr\Container\ContainerInterface;
|
||||
use Symfony\Component\Messenger\Transport\SenderInterface;
|
||||
|
||||
/**
|
||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||
@ -19,42 +20,47 @@ use Psr\Container\ContainerInterface;
|
||||
class SenderLocator implements SenderLocatorInterface
|
||||
{
|
||||
private $senderServiceLocator;
|
||||
private $messageToSenderIdsMapping;
|
||||
private $messageToSenderIdMapping;
|
||||
|
||||
public function __construct(ContainerInterface $senderServiceLocator, array $messageToSenderIdsMapping)
|
||||
public function __construct(ContainerInterface $senderServiceLocator, array $messageToSenderIdMapping)
|
||||
{
|
||||
$this->senderServiceLocator = $senderServiceLocator;
|
||||
$this->messageToSenderIdsMapping = $messageToSenderIdsMapping;
|
||||
$this->messageToSenderIdMapping = $messageToSenderIdMapping;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getSendersForMessage($message): array
|
||||
public function getSenderForMessage($message): ?SenderInterface
|
||||
{
|
||||
$senders = array();
|
||||
foreach ($this->getSenderIds($message) as $senderId) {
|
||||
$senders[] = $this->senderServiceLocator->get($senderId);
|
||||
}
|
||||
$senderId = $this->getSenderId($message);
|
||||
|
||||
return $senders;
|
||||
return $senderId ? $this->senderServiceLocator->get($senderId) : null;
|
||||
}
|
||||
|
||||
private function getSenderIds($message): array
|
||||
private function getSenderId($message): ?string
|
||||
{
|
||||
if (isset($this->messageToSenderIdsMapping[\get_class($message)])) {
|
||||
return $this->messageToSenderIdsMapping[\get_class($message)];
|
||||
return self::getValueFromMessageRouting($this->messageToSenderIdMapping, $message);
|
||||
}
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
public static function getValueFromMessageRouting(array $mapping, $message)
|
||||
{
|
||||
if (isset($mapping[\get_class($message)])) {
|
||||
return $mapping[\get_class($message)];
|
||||
}
|
||||
if ($messageToSenderIdsMapping = array_intersect_key($this->messageToSenderIdsMapping, class_parents($message))) {
|
||||
return current($messageToSenderIdsMapping);
|
||||
if ($parentsMapping = array_intersect_key($mapping, class_parents($message))) {
|
||||
return current($parentsMapping);
|
||||
}
|
||||
if ($messageToSenderIdsMapping = array_intersect_key($this->messageToSenderIdsMapping, class_implements($message))) {
|
||||
return current($messageToSenderIdsMapping);
|
||||
if ($interfaceMapping = array_intersect_key($mapping, class_implements($message))) {
|
||||
return current($interfaceMapping);
|
||||
}
|
||||
if (isset($this->messageToSenderIdsMapping['*'])) {
|
||||
return $this->messageToSenderIdsMapping['*'];
|
||||
if (isset($mapping['*'])) {
|
||||
return $mapping['*'];
|
||||
}
|
||||
|
||||
return array();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ use Symfony\Component\Messenger\Transport\SenderInterface;
|
||||
|
||||
/**
|
||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||
* @author Tobias Schultze <http://tobion.de>
|
||||
*
|
||||
* @experimental in 4.1
|
||||
*/
|
||||
@ -25,7 +26,7 @@ interface SenderLocatorInterface
|
||||
*
|
||||
* @param object $message
|
||||
*
|
||||
* @return SenderInterface[]
|
||||
* @return SenderInterface|null
|
||||
*/
|
||||
public function getSendersForMessage($message): array;
|
||||
public function getSenderForMessage($message): ?SenderInterface;
|
||||
}
|
||||
|
@ -16,7 +16,9 @@ use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware;
|
||||
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
|
||||
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Tests\Asynchronous\Routing\ChildDummyMessage;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageInterface;
|
||||
use Symfony\Component\Messenger\Transport\SenderInterface;
|
||||
|
||||
class SendMessageMiddlewareTest extends TestCase
|
||||
@ -27,9 +29,7 @@ class SendMessageMiddlewareTest extends TestCase
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
|
||||
|
||||
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
|
||||
$sender,
|
||||
)));
|
||||
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender));
|
||||
|
||||
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
|
||||
$next->expects($this->never())->method($this->anything());
|
||||
@ -43,9 +43,7 @@ class SendMessageMiddlewareTest extends TestCase
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
|
||||
|
||||
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
|
||||
$sender,
|
||||
)));
|
||||
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender));
|
||||
|
||||
$sender->expects($this->once())->method('send')->with($envelope);
|
||||
$next->expects($this->never())->method($this->anything());
|
||||
@ -53,16 +51,63 @@ class SendMessageMiddlewareTest extends TestCase
|
||||
$middleware->handle($envelope, $next);
|
||||
}
|
||||
|
||||
public function testItAlsoCallsTheNextMiddlewareIfASenderIsNull()
|
||||
public function testItAlsoCallsTheNextMiddlewareBasedOnTheMessageClass()
|
||||
{
|
||||
$message = new DummyMessage('Hey');
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
|
||||
|
||||
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
|
||||
$sender,
|
||||
null,
|
||||
)));
|
||||
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender), array(
|
||||
DummyMessage::class => true,
|
||||
));
|
||||
|
||||
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
|
||||
$next->expects($this->once())->method($this->anything());
|
||||
|
||||
$middleware->handle($message, $next);
|
||||
}
|
||||
|
||||
public function testItAlsoCallsTheNextMiddlewareBasedOnTheMessageParentClass()
|
||||
{
|
||||
$message = new ChildDummyMessage('Hey');
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
|
||||
|
||||
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender), array(
|
||||
DummyMessage::class => true,
|
||||
));
|
||||
|
||||
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
|
||||
$next->expects($this->once())->method($this->anything());
|
||||
|
||||
$middleware->handle($message, $next);
|
||||
}
|
||||
|
||||
public function testItAlsoCallsTheNextMiddlewareBasedOnTheMessageInterface()
|
||||
{
|
||||
$message = new DummyMessage('Hey');
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
|
||||
|
||||
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender), array(
|
||||
DummyMessageInterface::class => true,
|
||||
));
|
||||
|
||||
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
|
||||
$next->expects($this->once())->method($this->anything());
|
||||
|
||||
$middleware->handle($message, $next);
|
||||
}
|
||||
|
||||
public function testItAlsoCallsTheNextMiddlewareBasedOnWildcard()
|
||||
{
|
||||
$message = new DummyMessage('Hey');
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
|
||||
|
||||
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender), array(
|
||||
'*' => true,
|
||||
));
|
||||
|
||||
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
|
||||
$next->expects($this->once())->method($this->anything());
|
||||
@ -75,7 +120,7 @@ class SendMessageMiddlewareTest extends TestCase
|
||||
$message = new DummyMessage('Hey');
|
||||
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
|
||||
|
||||
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array()));
|
||||
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(null));
|
||||
|
||||
$next->expects($this->once())->method($this->anything());
|
||||
|
||||
@ -89,9 +134,7 @@ class SendMessageMiddlewareTest extends TestCase
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
|
||||
|
||||
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
|
||||
$sender,
|
||||
)));
|
||||
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender));
|
||||
|
||||
$sender->expects($this->never())->method('send');
|
||||
$next->expects($this->once())->method('__invoke')->with($envelope);
|
||||
@ -102,15 +145,15 @@ class SendMessageMiddlewareTest extends TestCase
|
||||
|
||||
class InMemorySenderLocator implements SenderLocatorInterface
|
||||
{
|
||||
private $senders;
|
||||
private $sender;
|
||||
|
||||
public function __construct(array $senders)
|
||||
public function __construct(?SenderInterface $sender)
|
||||
{
|
||||
$this->senders = $senders;
|
||||
$this->sender = $sender;
|
||||
}
|
||||
|
||||
public function getSendersForMessage($message): array
|
||||
public function getSenderForMessage($message): ?SenderInterface
|
||||
{
|
||||
return $this->senders;
|
||||
return $this->sender;
|
||||
}
|
||||
}
|
||||
|
@ -28,13 +28,11 @@ class SenderLocatorTest extends TestCase
|
||||
$container->set('my_amqp_sender', $sender);
|
||||
|
||||
$locator = new SenderLocator($container, array(
|
||||
DummyMessage::class => array(
|
||||
'my_amqp_sender',
|
||||
),
|
||||
DummyMessage::class => 'my_amqp_sender',
|
||||
));
|
||||
|
||||
$this->assertSame(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello')));
|
||||
$this->assertSame(array(), $locator->getSendersForMessage(new SecondMessage()));
|
||||
$this->assertSame($sender, $locator->getSenderForMessage(new DummyMessage('Hello')));
|
||||
$this->assertNull($locator->getSenderForMessage(new SecondMessage()));
|
||||
}
|
||||
|
||||
public function testItReturnsTheSenderBasedOnTheMessageParentClass()
|
||||
@ -48,15 +46,12 @@ class SenderLocatorTest extends TestCase
|
||||
$container->set('my_api_sender', $apiSender);
|
||||
|
||||
$locator = new SenderLocator($container, array(
|
||||
DummyMessageInterface::class => array(
|
||||
'my_api_sender',
|
||||
),
|
||||
DummyMessage::class => array(
|
||||
'my_amqp_sender',
|
||||
),
|
||||
DummyMessageInterface::class => 'my_api_sender',
|
||||
DummyMessage::class => 'my_amqp_sender',
|
||||
));
|
||||
$this->assertSame(array($sender), $locator->getSendersForMessage(new ChildDummyMessage('Hello')));
|
||||
$this->assertSame(array(), $locator->getSendersForMessage(new SecondMessage()));
|
||||
|
||||
$this->assertSame($sender, $locator->getSenderForMessage(new ChildDummyMessage('Hello')));
|
||||
$this->assertNull($locator->getSenderForMessage(new SecondMessage()));
|
||||
}
|
||||
|
||||
public function testItReturnsTheSenderBasedOnTheMessageInterface()
|
||||
@ -67,13 +62,11 @@ class SenderLocatorTest extends TestCase
|
||||
$container->set('my_amqp_sender', $sender);
|
||||
|
||||
$locator = new SenderLocator($container, array(
|
||||
DummyMessageInterface::class => array(
|
||||
'my_amqp_sender',
|
||||
),
|
||||
DummyMessageInterface::class => 'my_amqp_sender',
|
||||
));
|
||||
|
||||
$this->assertSame(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello')));
|
||||
$this->assertSame(array(), $locator->getSendersForMessage(new SecondMessage()));
|
||||
$this->assertSame($sender, $locator->getSenderForMessage(new DummyMessage('Hello')));
|
||||
$this->assertNull($locator->getSenderForMessage(new SecondMessage()));
|
||||
}
|
||||
|
||||
public function testItSupportsAWildcardInsteadOfTheMessageClass()
|
||||
@ -87,16 +80,12 @@ class SenderLocatorTest extends TestCase
|
||||
$container->set('my_api_sender', $apiSender);
|
||||
|
||||
$locator = new SenderLocator($container, array(
|
||||
DummyMessage::class => array(
|
||||
'my_amqp_sender',
|
||||
),
|
||||
'*' => array(
|
||||
'my_api_sender',
|
||||
),
|
||||
DummyMessage::class => 'my_amqp_sender',
|
||||
'*' => 'my_api_sender',
|
||||
));
|
||||
|
||||
$this->assertSame(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello')));
|
||||
$this->assertSame(array($apiSender), $locator->getSendersForMessage(new SecondMessage()));
|
||||
$this->assertSame($sender, $locator->getSenderForMessage(new DummyMessage('Hello')));
|
||||
$this->assertSame($apiSender, $locator->getSenderForMessage(new SecondMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
|
38
src/Symfony/Component/Messenger/Transport/ChainSender.php
Normal file
38
src/Symfony/Component/Messenger/Transport/ChainSender.php
Normal file
@ -0,0 +1,38 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport;
|
||||
|
||||
/**
|
||||
* @author Tobias Schultze <http://tobion.de>
|
||||
*/
|
||||
class ChainSender implements SenderInterface
|
||||
{
|
||||
private $senders;
|
||||
|
||||
/**
|
||||
* @param SenderInterface[] $senders
|
||||
*/
|
||||
public function __construct(iterable $senders)
|
||||
{
|
||||
$this->senders = $senders;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function send($message): void
|
||||
{
|
||||
foreach ($this->senders as $sender) {
|
||||
$sender->send($message);
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user