[Messenger] implement several senders using a ChainSender

This commit is contained in:
Tobias Schultze 2018-04-21 21:29:51 +02:00
parent f59ce97eff
commit 198925ee4e
14 changed files with 204 additions and 90 deletions

View File

@ -986,7 +986,10 @@ class Configuration implements ConfigurationInterface
$newConfig = array();
foreach ($config as $k => $v) {
if (!\is_int($k)) {
$newConfig[$k] = array('senders' => \is_array($v) ? array_values($v) : array($v));
$newConfig[$k] = array(
'senders' => $v['senders'] ?? (\is_array($v) ? array_values($v) : array($v)),
'send_and_handle' => $v['send_and_handle'] ?? false,
);
} else {
$newConfig[$v['message-class']]['senders'] = array_map(
function ($a) {
@ -994,6 +997,7 @@ class Configuration implements ConfigurationInterface
},
array_values($v['sender'])
);
$newConfig[$v['message-class']]['send-and-handle'] = $v['send-and-handle'] ?? false;
}
}
@ -1006,6 +1010,7 @@ class Configuration implements ConfigurationInterface
->requiresAtLeastOneElement()
->prototype('scalar')->end()
->end()
->booleanNode('send_and_handle')->defaultFalse()->end()
->end()
->end()
->end()

View File

@ -63,6 +63,7 @@ use Symfony\Component\Lock\StoreInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\ChainSender;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\PropertyAccess\PropertyAccessor;
@ -1494,16 +1495,28 @@ class FrameworkExtension extends Extension
throw new LogicException(sprintf('The default bus named "%s" is not defined. Define it or change the default bus name.', $config['default_bus']));
}
$messageToSenderIdsMapping = array();
$messageToSenderIdMapping = array();
$messageToSendAndHandleMapping = array();
foreach ($config['routing'] as $message => $messageConfiguration) {
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
throw new LogicException(sprintf('Messenger routing configuration contains a mistake: message "%s" does not exist. It needs to match an existing class or interface.', $message));
}
$messageToSenderIdsMapping[$message] = $messageConfiguration['senders'];
if (1 < \count($messageConfiguration['senders'])) {
$senders = array_map(function ($sender) { return new Reference($sender); }, $messageConfiguration['senders']);
$chainSenderDefinition = new Definition(ChainSender::class, array($senders));
$chainSenderId = '.messenger.chain_sender.'.$message;
$container->setDefinition($chainSenderId, $chainSenderDefinition);
$messageToSenderIdMapping[$message] = $chainSenderId;
} else {
$messageToSenderIdMapping[$message] = $messageConfiguration['senders'][0];
}
$messageToSendAndHandleMapping[$message] = $messageConfiguration['send_and_handle'];
}
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping);
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdMapping);
$container->getDefinition('messenger.middleware.route_messages')->replaceArgument(1, $messageToSendAndHandleMapping);
foreach ($config['transports'] as $name => $transport) {
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {

View File

@ -15,10 +15,11 @@
<!-- Asynchronous -->
<service id="messenger.asynchronous.routing.sender_locator" class="Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator">
<argument type="service" id="messenger.sender_locator" />
<argument /> <!-- Message to sender ID mapping -->
<argument type="collection" /> <!-- Message to sender ID mapping -->
</service>
<service id="messenger.middleware.route_messages" class="Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware">
<argument type="service" id="messenger.asynchronous.routing.sender_locator" />
<argument type="collection" /> <!-- Message to send and handle mapping -->
</service>
<!-- Message encoding/decoding -->

View File

@ -375,6 +375,7 @@
<xsd:element name="sender" type="messenger_routing_sender" />
</xsd:choice>
<xsd:attribute name="message-class" type="xsd:string" use="required"/>
<xsd:attribute name="send-and-handle" type="xsd:boolean" default="false"/>
</xsd:complexType>
<xsd:complexType name="messenger_routing_sender">

View File

@ -3,8 +3,11 @@
$container->loadFromExtension('framework', array(
'messenger' => array(
'routing' => array(
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage' => array('amqp'),
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage' => array('amqp', 'audit', null),
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage' => array('amqp', 'audit'),
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage' => array(
'senders' => array('amqp', 'audit'),
'send_and_handle' => true,
),
'*' => 'amqp',
),
),

View File

@ -9,11 +9,11 @@
<framework:messenger>
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\DummyMessage">
<framework:sender service="amqp" />
<framework:sender service="audit" />
</framework:routing>
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\SecondMessage">
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\SecondMessage" send-and-handle="true">
<framework:sender service="amqp" />
<framework:sender service="audit" />
<framework:sender service="null" />
</framework:routing>
<framework:routing message-class="*">
<framework:sender service="amqp" />

View File

@ -1,6 +1,8 @@
framework:
messenger:
routing:
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage': amqp
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage': [amqp, audit, ~]
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage': [amqp, audit]
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage':
senders: [amqp, audit]
send_and_handle: true
'*': amqp

View File

@ -558,14 +558,21 @@ abstract class FrameworkExtensionTest extends TestCase
{
$container = $this->createContainerFromFile('messenger_routing');
$senderLocatorDefinition = $container->getDefinition('messenger.asynchronous.routing.sender_locator');
$sendMessageMiddlewareDefinition = $container->getDefinition('messenger.middleware.route_messages');
$messageToSenderIdsMapping = array(
DummyMessage::class => array('amqp'),
SecondMessage::class => array('amqp', 'audit', null),
'*' => array('amqp'),
DummyMessage::class => '.messenger.chain_sender.'.DummyMessage::class,
SecondMessage::class => '.messenger.chain_sender.'.SecondMessage::class,
'*' => 'amqp',
);
$messageToSendAndHandleMapping = array(
DummyMessage::class => false,
SecondMessage::class => true,
'*' => false,
);
$this->assertSame($messageToSenderIdsMapping, $senderLocatorDefinition->getArgument(1));
$this->assertSame($messageToSendAndHandleMapping, $sendMessageMiddlewareDefinition->getArgument(1));
}
/**

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Asynchronous\Middleware;
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator;
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
@ -19,14 +20,17 @@ use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
* @author Tobias Schultze <http://tobion.de>
*/
class SendMessageMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
{
private $senderLocator;
private $messagesToSendAndHandleMapping;
public function __construct(SenderLocatorInterface $senderLocator)
public function __construct(SenderLocatorInterface $senderLocator, array $messagesToSendAndHandleMapping = array())
{
$this->senderLocator = $senderLocator;
$this->messagesToSendAndHandleMapping = $messagesToSendAndHandleMapping;
}
/**
@ -40,20 +44,21 @@ class SendMessageMiddleware implements MiddlewareInterface, EnvelopeAwareInterfa
return $next($message);
}
if (!empty($senders = $this->senderLocator->getSendersForMessage($envelope->getMessage()))) {
foreach ($senders as $sender) {
if (null === $sender) {
continue;
}
$sender = $this->senderLocator->getSenderForMessage($envelope->getMessage());
$sender->send($envelope);
}
if ($sender) {
$sender->send($envelope);
if (!\in_array(null, $senders, true)) {
if (!$this->mustSendAndHandle($envelope->getMessage())) {
return;
}
}
return $next($message);
}
private function mustSendAndHandle($message): bool
{
return (bool) SenderLocator::getValueFromMessageRouting($this->messagesToSendAndHandleMapping, $message);
}
}

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Asynchronous\Routing;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
@ -19,42 +20,47 @@ use Psr\Container\ContainerInterface;
class SenderLocator implements SenderLocatorInterface
{
private $senderServiceLocator;
private $messageToSenderIdsMapping;
private $messageToSenderIdMapping;
public function __construct(ContainerInterface $senderServiceLocator, array $messageToSenderIdsMapping)
public function __construct(ContainerInterface $senderServiceLocator, array $messageToSenderIdMapping)
{
$this->senderServiceLocator = $senderServiceLocator;
$this->messageToSenderIdsMapping = $messageToSenderIdsMapping;
$this->messageToSenderIdMapping = $messageToSenderIdMapping;
}
/**
* {@inheritdoc}
*/
public function getSendersForMessage($message): array
public function getSenderForMessage($message): ?SenderInterface
{
$senders = array();
foreach ($this->getSenderIds($message) as $senderId) {
$senders[] = $this->senderServiceLocator->get($senderId);
}
$senderId = $this->getSenderId($message);
return $senders;
return $senderId ? $this->senderServiceLocator->get($senderId) : null;
}
private function getSenderIds($message): array
private function getSenderId($message): ?string
{
if (isset($this->messageToSenderIdsMapping[\get_class($message)])) {
return $this->messageToSenderIdsMapping[\get_class($message)];
return self::getValueFromMessageRouting($this->messageToSenderIdMapping, $message);
}
/**
* @internal
*/
public static function getValueFromMessageRouting(array $mapping, $message)
{
if (isset($mapping[\get_class($message)])) {
return $mapping[\get_class($message)];
}
if ($messageToSenderIdsMapping = array_intersect_key($this->messageToSenderIdsMapping, class_parents($message))) {
return current($messageToSenderIdsMapping);
if ($parentsMapping = array_intersect_key($mapping, class_parents($message))) {
return current($parentsMapping);
}
if ($messageToSenderIdsMapping = array_intersect_key($this->messageToSenderIdsMapping, class_implements($message))) {
return current($messageToSenderIdsMapping);
if ($interfaceMapping = array_intersect_key($mapping, class_implements($message))) {
return current($interfaceMapping);
}
if (isset($this->messageToSenderIdsMapping['*'])) {
return $this->messageToSenderIdsMapping['*'];
if (isset($mapping['*'])) {
return $mapping['*'];
}
return array();
return null;
}
}

View File

@ -15,6 +15,7 @@ use Symfony\Component\Messenger\Transport\SenderInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
* @author Tobias Schultze <http://tobion.de>
*
* @experimental in 4.1
*/
@ -25,7 +26,7 @@ interface SenderLocatorInterface
*
* @param object $message
*
* @return SenderInterface[]
* @return SenderInterface|null
*/
public function getSendersForMessage($message): array;
public function getSenderForMessage($message): ?SenderInterface;
}

View File

@ -16,7 +16,9 @@ use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Asynchronous\Routing\ChildDummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
class SendMessageMiddlewareTest extends TestCase
@ -27,9 +29,7 @@ class SendMessageMiddlewareTest extends TestCase
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
$sender,
)));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender));
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
$next->expects($this->never())->method($this->anything());
@ -43,9 +43,7 @@ class SendMessageMiddlewareTest extends TestCase
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
$sender,
)));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender));
$sender->expects($this->once())->method('send')->with($envelope);
$next->expects($this->never())->method($this->anything());
@ -53,16 +51,63 @@ class SendMessageMiddlewareTest extends TestCase
$middleware->handle($envelope, $next);
}
public function testItAlsoCallsTheNextMiddlewareIfASenderIsNull()
public function testItAlsoCallsTheNextMiddlewareBasedOnTheMessageClass()
{
$message = new DummyMessage('Hey');
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
$sender,
null,
)));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender), array(
DummyMessage::class => true,
));
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
$next->expects($this->once())->method($this->anything());
$middleware->handle($message, $next);
}
public function testItAlsoCallsTheNextMiddlewareBasedOnTheMessageParentClass()
{
$message = new ChildDummyMessage('Hey');
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender), array(
DummyMessage::class => true,
));
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
$next->expects($this->once())->method($this->anything());
$middleware->handle($message, $next);
}
public function testItAlsoCallsTheNextMiddlewareBasedOnTheMessageInterface()
{
$message = new DummyMessage('Hey');
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender), array(
DummyMessageInterface::class => true,
));
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
$next->expects($this->once())->method($this->anything());
$middleware->handle($message, $next);
}
public function testItAlsoCallsTheNextMiddlewareBasedOnWildcard()
{
$message = new DummyMessage('Hey');
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender), array(
'*' => true,
));
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
$next->expects($this->once())->method($this->anything());
@ -75,7 +120,7 @@ class SendMessageMiddlewareTest extends TestCase
$message = new DummyMessage('Hey');
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array()));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(null));
$next->expects($this->once())->method($this->anything());
@ -89,9 +134,7 @@ class SendMessageMiddlewareTest extends TestCase
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
$sender,
)));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator($sender));
$sender->expects($this->never())->method('send');
$next->expects($this->once())->method('__invoke')->with($envelope);
@ -102,15 +145,15 @@ class SendMessageMiddlewareTest extends TestCase
class InMemorySenderLocator implements SenderLocatorInterface
{
private $senders;
private $sender;
public function __construct(array $senders)
public function __construct(?SenderInterface $sender)
{
$this->senders = $senders;
$this->sender = $sender;
}
public function getSendersForMessage($message): array
public function getSenderForMessage($message): ?SenderInterface
{
return $this->senders;
return $this->sender;
}
}

View File

@ -28,13 +28,11 @@ class SenderLocatorTest extends TestCase
$container->set('my_amqp_sender', $sender);
$locator = new SenderLocator($container, array(
DummyMessage::class => array(
'my_amqp_sender',
),
DummyMessage::class => 'my_amqp_sender',
));
$this->assertSame(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello')));
$this->assertSame(array(), $locator->getSendersForMessage(new SecondMessage()));
$this->assertSame($sender, $locator->getSenderForMessage(new DummyMessage('Hello')));
$this->assertNull($locator->getSenderForMessage(new SecondMessage()));
}
public function testItReturnsTheSenderBasedOnTheMessageParentClass()
@ -48,15 +46,12 @@ class SenderLocatorTest extends TestCase
$container->set('my_api_sender', $apiSender);
$locator = new SenderLocator($container, array(
DummyMessageInterface::class => array(
'my_api_sender',
),
DummyMessage::class => array(
'my_amqp_sender',
),
DummyMessageInterface::class => 'my_api_sender',
DummyMessage::class => 'my_amqp_sender',
));
$this->assertSame(array($sender), $locator->getSendersForMessage(new ChildDummyMessage('Hello')));
$this->assertSame(array(), $locator->getSendersForMessage(new SecondMessage()));
$this->assertSame($sender, $locator->getSenderForMessage(new ChildDummyMessage('Hello')));
$this->assertNull($locator->getSenderForMessage(new SecondMessage()));
}
public function testItReturnsTheSenderBasedOnTheMessageInterface()
@ -67,13 +62,11 @@ class SenderLocatorTest extends TestCase
$container->set('my_amqp_sender', $sender);
$locator = new SenderLocator($container, array(
DummyMessageInterface::class => array(
'my_amqp_sender',
),
DummyMessageInterface::class => 'my_amqp_sender',
));
$this->assertSame(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello')));
$this->assertSame(array(), $locator->getSendersForMessage(new SecondMessage()));
$this->assertSame($sender, $locator->getSenderForMessage(new DummyMessage('Hello')));
$this->assertNull($locator->getSenderForMessage(new SecondMessage()));
}
public function testItSupportsAWildcardInsteadOfTheMessageClass()
@ -87,16 +80,12 @@ class SenderLocatorTest extends TestCase
$container->set('my_api_sender', $apiSender);
$locator = new SenderLocator($container, array(
DummyMessage::class => array(
'my_amqp_sender',
),
'*' => array(
'my_api_sender',
),
DummyMessage::class => 'my_amqp_sender',
'*' => 'my_api_sender',
));
$this->assertSame(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello')));
$this->assertSame(array($apiSender), $locator->getSendersForMessage(new SecondMessage()));
$this->assertSame($sender, $locator->getSenderForMessage(new DummyMessage('Hello')));
$this->assertSame($apiSender, $locator->getSenderForMessage(new SecondMessage()));
}
}

View File

@ -0,0 +1,38 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport;
/**
* @author Tobias Schultze <http://tobion.de>
*/
class ChainSender implements SenderInterface
{
private $senders;
/**
* @param SenderInterface[] $senders
*/
public function __construct(iterable $senders)
{
$this->senders = $senders;
}
/**
* {@inheritdoc}
*/
public function send($message): void
{
foreach ($this->senders as $sender) {
$sender->send($message);
}
}
}