[Messenger] Define multiple buses from the framework.messenger.buses configuration

This commit is contained in:
Samuel ROZE 2018-04-08 15:33:26 +01:00
parent 4429c9b20f
commit e5deb8499b
25 changed files with 518 additions and 184 deletions

View File

@ -972,6 +972,7 @@ class Configuration implements ConfigurationInterface
->info('Messenger configuration') ->info('Messenger configuration')
->{!class_exists(FullStack::class) && interface_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}() ->{!class_exists(FullStack::class) && interface_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}()
->fixXmlConfig('adapter') ->fixXmlConfig('adapter')
->fixXmlConfig('bus', 'buses')
->children() ->children()
->arrayNode('routing') ->arrayNode('routing')
->useAttributeAsKey('message_class') ->useAttributeAsKey('message_class')
@ -1023,14 +1024,6 @@ class Configuration implements ConfigurationInterface
->end() ->end()
->scalarNode('encoder')->defaultValue('messenger.transport.serializer')->end() ->scalarNode('encoder')->defaultValue('messenger.transport.serializer')->end()
->scalarNode('decoder')->defaultValue('messenger.transport.serializer')->end() ->scalarNode('decoder')->defaultValue('messenger.transport.serializer')->end()
->arrayNode('middlewares')
->addDefaultsIfNotSet()
->children()
->arrayNode('validation')
->{!class_exists(FullStack::class) && class_exists(Validation::class) ? 'canBeDisabled' : 'canBeEnabled'}()
->end()
->end()
->end()
->arrayNode('adapters') ->arrayNode('adapters')
->useAttributeAsKey('name') ->useAttributeAsKey('name')
->arrayPrototype() ->arrayPrototype()
@ -1052,6 +1045,22 @@ class Configuration implements ConfigurationInterface
->end() ->end()
->end() ->end()
->end() ->end()
->scalarNode('default_bus')->defaultValue(null)->end()
->arrayNode('buses')
->defaultValue(array('default' => array('default_middlewares' => true, 'middlewares' => array())))
->useAttributeAsKey('name')
->prototype('array')
->fixXmlConfig('middleware')
->addDefaultsIfNotSet()
->children()
->booleanNode('default_middlewares')->defaultTrue()->end()
->arrayNode('middlewares')
->defaultValue(array())
->prototype('scalar')->end()
->end()
->end()
->end()
->end()
->end() ->end()
->end() ->end()
->end() ->end()

View File

@ -61,6 +61,7 @@ use Symfony\Component\Lock\LockInterface;
use Symfony\Component\Lock\Store\StoreFactory; use Symfony\Component\Lock\Store\StoreFactory;
use Symfony\Component\Lock\StoreInterface; use Symfony\Component\Lock\StoreInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface; use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface; use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface; use Symfony\Component\Messenger\Transport\SenderInterface;
@ -273,7 +274,7 @@ class FrameworkExtension extends Extension
} }
if ($this->isConfigEnabled($container, $config['messenger'])) { if ($this->isConfigEnabled($container, $config['messenger'])) {
$this->registerMessengerConfiguration($config['messenger'], $container, $loader, $config['serializer']); $this->registerMessengerConfiguration($config['messenger'], $container, $loader, $config['serializer'], $config['validation']);
} else { } else {
$container->removeDefinition('console.command.messenger_consume_messages'); $container->removeDefinition('console.command.messenger_consume_messages');
} }
@ -1438,7 +1439,7 @@ class FrameworkExtension extends Extension
} }
} }
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader, array $serializerConfig) private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader, array $serializerConfig, array $validationConfig)
{ {
if (!interface_exists(MessageBusInterface::class)) { if (!interface_exists(MessageBusInterface::class)) {
throw new LogicException('Messenger support cannot be enabled as the Messenger component is not installed.'); throw new LogicException('Messenger support cannot be enabled as the Messenger component is not installed.');
@ -1461,6 +1462,37 @@ class FrameworkExtension extends Extension
$container->setAlias('messenger.transport.encoder', $config['encoder']); $container->setAlias('messenger.transport.encoder', $config['encoder']);
$container->setAlias('messenger.transport.decoder', $config['decoder']); $container->setAlias('messenger.transport.decoder', $config['decoder']);
if (null === $config['default_bus']) {
if (count($config['buses']) > 1) {
throw new LogicException(sprintf('You need to define a default bus with the "default_bus" configuration. Possible values: %s', implode(', ', array_keys($config['buses']))));
}
$config['default_bus'] = array_keys($config['buses'])[0];
}
$defaultMiddlewares = array('before' => array('logging'), 'after' => array('route_messages', 'call_message_handler'));
foreach ($config['buses'] as $name => $bus) {
$busId = 'messenger.bus.'.$name;
$middlewares = $bus['default_middlewares'] ? array_merge($defaultMiddlewares['before'], $bus['middlewares'], $defaultMiddlewares['after']) : $bus['middlewares'];
if (in_array('messenger.middleware.validation', $middlewares) && !$validationConfig['enabled']) {
throw new LogicException('The Validation middleware is only available when the Validator component is installed and enabled. Try running "composer require symfony/validator".');
}
$container->setParameter($busId.'.middlewares', $middlewares);
$container->setDefinition($busId, (new Definition(MessageBus::class, array(array())))->addTag('messenger.bus', array('name' => $name)));
if ($name === $config['default_bus']) {
$container->setAlias('message_bus', $busId);
$container->setAlias(MessageBusInterface::class, $busId);
}
}
if (!$container->hasAlias('message_bus')) {
throw new LogicException(sprintf('The default bus named "%s" is not defined. Define it or change the default bus name.', $config['default_bus']));
}
$messageToSenderIdsMapping = array(); $messageToSenderIdsMapping = array();
foreach ($config['routing'] as $message => $messageConfiguration) { foreach ($config['routing'] as $message => $messageConfiguration) {
$messageToSenderIdsMapping[$message] = $messageConfiguration['senders']; $messageToSenderIdsMapping[$message] = $messageConfiguration['senders'];
@ -1468,14 +1500,6 @@ class FrameworkExtension extends Extension
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping); $container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping);
if ($config['middlewares']['validation']['enabled']) {
if (!$container->has('validator')) {
throw new LogicException('The Validation middleware is only available when the Validator component is installed and enabled. Try running "composer require symfony/validator".');
}
} else {
$container->removeDefinition('messenger.middleware.validator');
}
foreach ($config['adapters'] as $name => $adapter) { foreach ($config['adapters'] as $name => $adapter) {
$container->setDefinition('messenger.sender.'.$name, (new Definition(SenderInterface::class))->setFactory(array( $container->setDefinition('messenger.sender.'.$name, (new Definition(SenderInterface::class))->setFactory(array(
new Reference('messenger.adapter_factory'), new Reference('messenger.adapter_factory'),

View File

@ -7,39 +7,18 @@
<services> <services>
<defaults public="false" /> <defaults public="false" />
<!-- Bus -->
<service id="message_bus" class="Symfony\Component\Messenger\MessageBus" public="true">
<argument type="tagged" tag="messenger.bus_middleware" /> <!-- Middlewares -->
</service>
<service id="Symfony\Component\Messenger\MessageBusInterface" alias="message_bus" />
<!-- Handlers --> <!-- Handlers -->
<service id="messenger.handler_resolver" class="Symfony\Component\Messenger\ContainerHandlerLocator"> <service id="messenger.handler_resolver" class="Symfony\Component\Messenger\ContainerHandlerLocator">
<argument type="service" id="service_container"/> <argument type="service" id="service_container"/>
</service> </service>
<service id="messenger.middleware.call_message_handler" class="Symfony\Component\Messenger\Middleware\HandleMessageMiddleware">
<argument type="service" id="messenger.handler_resolver" />
<tag name="messenger.bus_middleware" priority="-10" />
</service>
<service id="messenger.middleware.validator" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">
<argument type="service" id="validator" />
<tag name="messenger.bus_middleware" priority="100" />
</service>
<!-- Asynchronous --> <!-- Asynchronous -->
<service id="messenger.asynchronous.routing.sender_locator" class="Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator"> <service id="messenger.asynchronous.routing.sender_locator" class="Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator">
<argument type="service" id="messenger.sender_locator" /> <argument type="service" id="messenger.sender_locator" />
<argument /> <!-- Message to sender ID mapping --> <argument /> <!-- Message to sender ID mapping -->
</service> </service>
<service id="messenger.asynchronous.middleware.send_message_to_producer" class="Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware"> <service id="messenger.middleware.route_messages" class="Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware">
<argument type="service" id="messenger.asynchronous.routing.sender_locator" /> <argument type="service" id="messenger.asynchronous.routing.sender_locator" />
<tag name="messenger.bus_middleware" priority="-5" />
</service> </service>
<!-- Message encoding/decoding --> <!-- Message encoding/decoding -->
@ -49,17 +28,25 @@
<argument type="collection" /> <!-- Context --> <argument type="collection" /> <!-- Context -->
</service> </service>
<!-- Middlewares -->
<service id="messenger.middleware.tolerate_no_handler" class="Symfony\Component\Messenger\Middleware\TolerateNoHandler" abstract="true" />
<service id="messenger.middleware.call_message_handler" class="Symfony\Component\Messenger\Middleware\HandleMessageMiddleware" abstract="true">
<argument type="service" id="messenger.handler_resolver" />
</service>
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware" abstract="true">
<argument type="service" id="validator" />
</service>
<!-- Logging & Debug --> <!-- Logging & Debug -->
<service id="messenger.middleware.debug.logging" class="Symfony\Component\Messenger\Debug\LoggingMiddleware"> <service id="messenger.middleware.logging" class="Symfony\Component\Messenger\Middleware\LoggingMiddleware" abstract="true">
<argument type="service" id="logger" /> <argument type="service" id="logger" />
<tag name="messenger.bus_middleware" priority="10" />
<tag name="monolog.logger" channel="messenger" /> <tag name="monolog.logger" channel="messenger" />
</service> </service>
<service id="data_collector.messenger" class="Symfony\Component\Messenger\DataCollector\MessengerDataCollector"> <service id="messenger.data_collector" class="Symfony\Component\Messenger\DataCollector\MessengerDataCollector">
<tag name="data_collector" template="@WebProfiler/Collector/messenger.html.twig" id="messenger" priority="100" /> <tag name="data_collector" template="@WebProfiler/Collector/messenger.html.twig" id="messenger" priority="100" />
<tag name="messenger.bus_middleware" />
</service> </service>
<!-- Discovery --> <!-- Discovery -->

View File

@ -356,9 +356,10 @@
<xsd:element name="encoder" type="xsd:string" minOccurs="0" /> <xsd:element name="encoder" type="xsd:string" minOccurs="0" />
<xsd:element name="decoder" type="xsd:string" minOccurs="0" /> <xsd:element name="decoder" type="xsd:string" minOccurs="0" />
<xsd:element name="routing" type="messenger_routing" minOccurs="0" maxOccurs="unbounded" /> <xsd:element name="routing" type="messenger_routing" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="middlewares" type="messenger_middleware" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="adapter" type="messenger_adapter" minOccurs="0" maxOccurs="unbounded" /> <xsd:element name="adapter" type="messenger_adapter" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="bus" type="messenger_bus" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence> </xsd:sequence>
<xsd:attribute name="default-bus" type="xsd:string" />
</xsd:complexType> </xsd:complexType>
<xsd:complexType name="messenger_serializer"> <xsd:complexType name="messenger_serializer">
@ -388,13 +389,16 @@
<xsd:attribute name="dsn" type="xsd:string" /> <xsd:attribute name="dsn" type="xsd:string" />
</xsd:complexType> </xsd:complexType>
<xsd:complexType name="messenger_middleware"> <xsd:complexType name="messenger_adapter_option">
<xsd:sequence> <xsd:attribute name="name" type="xsd:string" />
<xsd:element name="validation" type="messenger_validation" minOccurs="0" maxOccurs="1" /> <xsd:attribute name="value" type="xsd:string" />
</xsd:sequence>
</xsd:complexType> </xsd:complexType>
<xsd:complexType name="messenger_validation"> <xsd:complexType name="messenger_bus">
<xsd:attribute name="enabled" type="xsd:boolean" /> <xsd:sequence>
<xsd:element name="middleware" type="xsd:string" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence>
<xsd:attribute name="name" type="xsd:string" use="required"/>
<xsd:attribute name="default-middlewares" type="xsd:boolean"/>
</xsd:complexType> </xsd:complexType>
</xsd:schema> </xsd:schema>

View File

@ -254,11 +254,6 @@ class ConfigurationTest extends TestCase
'messenger' => array( 'messenger' => array(
'enabled' => !class_exists(FullStack::class) && interface_exists(MessageBusInterface::class), 'enabled' => !class_exists(FullStack::class) && interface_exists(MessageBusInterface::class),
'routing' => array(), 'routing' => array(),
'middlewares' => array(
'validation' => array(
'enabled' => !class_exists(FullStack::class),
),
),
'adapters' => array(), 'adapters' => array(),
'serializer' => array( 'serializer' => array(
'enabled' => true, 'enabled' => true,
@ -267,6 +262,8 @@ class ConfigurationTest extends TestCase
), ),
'encoder' => 'messenger.transport.serializer', 'encoder' => 'messenger.transport.serializer',
'decoder' => 'messenger.transport.serializer', 'decoder' => 'messenger.transport.serializer',
'default_bus' => null,
'buses' => array('default' => array('default_middlewares' => true, 'middlewares' => array())),
), ),
); );
} }

View File

@ -0,0 +1,23 @@
<?php
$container->loadFromExtension('framework', array(
'messenger' => array(
'default_bus' => 'commands',
'buses' => array(
'commands' => null,
'events' => array(
'middlewares' => array(
'tolerate_no_handler',
),
),
'queries' => array(
'default_middlewares' => false,
'middlewares' => array(
'route_messages',
'tolerate_no_handler',
'call_message_handler',
),
),
),
),
));

View File

@ -1,11 +0,0 @@
<?php
$container->loadFromExtension('framework', array(
'messenger' => array(
'middlewares' => array(
'validation' => array(
'enabled' => false,
),
),
),
));

View File

@ -1,12 +0,0 @@
<?php
$container->loadFromExtension('framework', array(
'validation' => array('enabled' => true),
'messenger' => array(
'middlewares' => array(
'validation' => array(
'enabled' => true,
),
),
),
));

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger default-bus="commands">
<framework:bus name="commands" />
<framework:bus name="events">
<framework:middleware>tolerate_no_handler</framework:middleware>
</framework:bus>
<framework:bus name="queries" default-middlewares="false">
<framework:middleware>route_messages</framework:middleware>
<framework:middleware>tolerate_no_handler</framework:middleware>
<framework:middleware>call_message_handler</framework:middleware>
</framework:bus>
</framework:messenger>
</framework:config>
</container>

View File

@ -1,15 +0,0 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<framework:middlewares>
<framework:validation enabled="false"/>
</framework:middlewares>
</framework:messenger>
</framework:config>
</container>

View File

@ -1,16 +0,0 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:validation enabled="true"/>
<framework:messenger>
<framework:middlewares>
<framework:validation enabled="true"/>
</framework:middlewares>
</framework:messenger>
</framework:config>
</container>

View File

@ -0,0 +1,14 @@
framework:
messenger:
default_bus: commands
buses:
commands: ~
events:
middlewares:
- "tolerate_no_handler"
queries:
default_middlewares: false
middlewares:
- "route_messages"
- "tolerate_no_handler"
- "call_message_handler"

View File

@ -1,5 +0,0 @@
framework:
messenger:
middlewares:
validation:
enabled: false

View File

@ -1,7 +0,0 @@
framework:
validation:
enabled: true
messenger:
middlewares:
validation:
enabled: true

View File

@ -524,20 +524,7 @@ abstract class FrameworkExtensionTest extends TestCase
public function testMessenger() public function testMessenger()
{ {
$container = $this->createContainerFromFile('messenger'); $container = $this->createContainerFromFile('messenger');
$this->assertTrue($container->hasDefinition('message_bus')); $this->assertTrue($container->has('message_bus'));
$this->assertFalse($container->hasDefinition('messenger.middleware.doctrine_transaction'));
}
public function testMessengerValidationEnabled()
{
$container = $this->createContainerFromFile('messenger_validation_enabled');
$this->assertTrue($definition = $container->hasDefinition('messenger.middleware.validator'));
}
public function testMessengerValidationDisabled()
{
$container = $this->createContainerFromFile('messenger_validation_disabled');
$this->assertFalse($container->hasDefinition('messenger.middleware.validator'));
} }
public function testMessengerAdapter() public function testMessengerAdapter()
@ -590,6 +577,24 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertSame(array('enable_max_depth' => true), $serializerTransportDefinition->getArgument(2)); $this->assertSame(array('enable_max_depth' => true), $serializerTransportDefinition->getArgument(2));
} }
public function testMessengerWithMultipleBuses()
{
$container = $this->createContainerFromFile('messenger_multiple_buses');
$this->assertTrue($container->has('messenger.bus.commands'));
$this->assertSame(array(), $container->getDefinition('messenger.bus.commands')->getArgument(0));
$this->assertEquals(array('logging', 'route_messages', 'call_message_handler'), $container->getParameter('messenger.bus.commands.middlewares'));
$this->assertTrue($container->has('messenger.bus.events'));
$this->assertSame(array(), $container->getDefinition('messenger.bus.events')->getArgument(0));
$this->assertEquals(array('logging', 'tolerate_no_handler', 'route_messages', 'call_message_handler'), $container->getParameter('messenger.bus.events.middlewares'));
$this->assertTrue($container->has('messenger.bus.queries'));
$this->assertSame(array(), $container->getDefinition('messenger.bus.queries')->getArgument(0));
$this->assertEquals(array('route_messages', 'tolerate_no_handler', 'call_message_handler'), $container->getParameter('messenger.bus.queries.middlewares'));
$this->assertTrue($container->hasAlias('message_bus'));
$this->assertSame('messenger.bus.commands', (string) $container->getAlias('message_bus'));
}
public function testTranslator() public function testTranslator()
{ {
$container = $this->createContainerFromFile('full'); $container = $this->createContainerFromFile('full');

View File

@ -14,21 +14,34 @@ namespace Symfony\Component\Messenger\DataCollector;
use Symfony\Component\HttpFoundation\Request; use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response; use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\DataCollector\DataCollector; use Symfony\Component\HttpKernel\DataCollector\DataCollector;
use Symfony\Component\Messenger\MiddlewareInterface; use Symfony\Component\Messenger\TraceableMessageBus;
/** /**
* @author Samuel Roze <samuel.roze@gmail.com> * @author Samuel Roze <samuel.roze@gmail.com>
* *
* @experimental in 4.1 * @experimental in 4.1
*/ */
class MessengerDataCollector extends DataCollector implements MiddlewareInterface class MessengerDataCollector extends DataCollector
{ {
private $traceableBuses = array();
public function registerBus(string $name, TraceableMessageBus $bus)
{
$this->traceableBuses[$name] = $bus;
}
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function collect(Request $request, Response $response, \Exception $exception = null) public function collect(Request $request, Response $response, \Exception $exception = null)
{ {
// noop $this->data = array('messages' => array());
foreach ($this->traceableBuses as $busName => $bus) {
foreach ($bus->getDispatchedMessages() as $message) {
$this->data['messages'][] = $this->collectMessage($busName, $message);
}
}
} }
/** /**
@ -47,21 +60,20 @@ class MessengerDataCollector extends DataCollector implements MiddlewareInterfac
$this->data = array(); $this->data = array();
} }
/** private function collectMessage(string $busName, array $tracedMessage)
* {@inheritdoc}
*/
public function handle($message, callable $next)
{ {
$message = $tracedMessage['message'];
$debugRepresentation = array( $debugRepresentation = array(
'bus' => $busName,
'message' => array( 'message' => array(
'type' => \get_class($message), 'type' => \get_class($message),
'object' => $this->cloneVar($message), 'object' => $this->cloneVar($message),
), ),
); );
$exception = null; if (array_key_exists('result', $tracedMessage)) {
try { $result = $tracedMessage['result'];
$result = $next($message);
if (\is_object($result)) { if (\is_object($result)) {
$debugRepresentation['result'] = array( $debugRepresentation['result'] = array(
@ -79,20 +91,18 @@ class MessengerDataCollector extends DataCollector implements MiddlewareInterfac
'value' => $result, 'value' => $result,
); );
} }
} catch (\Throwable $exception) { }
if (isset($tracedMessage['exception'])) {
$exception = $tracedMessage['exception'];
$debugRepresentation['exception'] = array( $debugRepresentation['exception'] = array(
'type' => \get_class($exception), 'type' => \get_class($exception),
'message' => $exception->getMessage(), 'message' => $exception->getMessage(),
); );
} }
$this->data['messages'][] = $debugRepresentation; return $debugRepresentation;
if (null !== $exception) {
throw $exception;
}
return $result;
} }
public function getMessages(): array public function getMessages(): array

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\DependencyInjection; namespace Symfony\Component\Messenger\DependencyInjection;
use Symfony\Component\DependencyInjection\ChildDefinition;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface; use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\Compiler\PriorityTaggedServiceTrait; use Symfony\Component\DependencyInjection\Compiler\PriorityTaggedServiceTrait;
use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass; use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass;
@ -21,6 +22,7 @@ use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\Messenger\Handler\ChainHandler; use Symfony\Component\Messenger\Handler\ChainHandler;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface; use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface; use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
use Symfony\Component\Messenger\TraceableMessageBus;
/** /**
* @author Samuel Roze <samuel.roze@gmail.com> * @author Samuel Roze <samuel.roze@gmail.com>
@ -29,15 +31,17 @@ class MessengerPass implements CompilerPassInterface
{ {
use PriorityTaggedServiceTrait; use PriorityTaggedServiceTrait;
private $messageBusService;
private $messageHandlerResolverService;
private $handlerTag; private $handlerTag;
private $busTag;
private $senderTag;
private $receiverTag;
public function __construct(string $messageBusService = 'message_bus', string $messageHandlerResolverService = 'messenger.handler_resolver', string $handlerTag = 'messenger.message_handler') public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $senderTag = 'messenger.sender', string $receiverTag = 'messenger.receiver')
{ {
$this->messageBusService = $messageBusService;
$this->messageHandlerResolverService = $messageHandlerResolverService;
$this->handlerTag = $handlerTag; $this->handlerTag = $handlerTag;
$this->busTag = $busTag;
$this->senderTag = $senderTag;
$this->receiverTag = $receiverTag;
} }
/** /**
@ -45,12 +49,20 @@ class MessengerPass implements CompilerPassInterface
*/ */
public function process(ContainerBuilder $container) public function process(ContainerBuilder $container)
{ {
if (!$container->hasDefinition($this->messageBusService)) { if (!$container->hasDefinition('messenger.handler_resolver')) {
return; return;
} }
if (!$container->getParameter('kernel.debug') || !$container->hasAlias('logger')) { foreach ($container->findTaggedServiceIds($this->busTag) as $busId => $tags) {
$container->removeDefinition('messenger.middleware.debug.logging'); if ($container->hasParameter($busMiddlewaresParameter = $busId.'.middlewares')) {
$this->registerBusMiddlewares($container, $busId, $container->getParameter($busMiddlewaresParameter));
$container->getParameterBag()->remove($busMiddlewaresParameter);
}
if ($container->hasDefinition('messenger.data_collector')) {
$this->registerBusToCollector($container, $busId, $tags[0]);
}
} }
$this->registerReceivers($container); $this->registerReceivers($container);
@ -110,7 +122,7 @@ class MessengerPass implements CompilerPassInterface
$handlersLocatorMapping['handler.'.$message] = $handler; $handlersLocatorMapping['handler.'.$message] = $handler;
} }
$handlerResolver = $container->getDefinition($this->messageHandlerResolverService); $handlerResolver = $container->getDefinition('messenger.handler_resolver');
$handlerResolver->replaceArgument(0, ServiceLocatorTagPass::register($container, $handlersLocatorMapping)); $handlerResolver->replaceArgument(0, ServiceLocatorTagPass::register($container, $handlersLocatorMapping));
} }
@ -149,7 +161,7 @@ class MessengerPass implements CompilerPassInterface
private function registerReceivers(ContainerBuilder $container) private function registerReceivers(ContainerBuilder $container)
{ {
$receiverMapping = array(); $receiverMapping = array();
foreach ($container->findTaggedServiceIds('messenger.receiver') as $id => $tags) { foreach ($container->findTaggedServiceIds($this->receiverTag) as $id => $tags) {
foreach ($tags as $tag) { foreach ($tags as $tag) {
$receiverMapping[$id] = new Reference($id); $receiverMapping[$id] = new Reference($id);
@ -165,7 +177,7 @@ class MessengerPass implements CompilerPassInterface
private function registerSenders(ContainerBuilder $container) private function registerSenders(ContainerBuilder $container)
{ {
$senderLocatorMapping = array(); $senderLocatorMapping = array();
foreach ($container->findTaggedServiceIds('messenger.sender') as $id => $tags) { foreach ($container->findTaggedServiceIds($this->senderTag) as $id => $tags) {
foreach ($tags as $tag) { foreach ($tags as $tag) {
$senderLocatorMapping[$id] = new Reference($id); $senderLocatorMapping[$id] = new Reference($id);
@ -177,4 +189,35 @@ class MessengerPass implements CompilerPassInterface
$container->getDefinition('messenger.sender_locator')->replaceArgument(0, $senderLocatorMapping); $container->getDefinition('messenger.sender_locator')->replaceArgument(0, $senderLocatorMapping);
} }
private function registerBusToCollector(ContainerBuilder $container, string $busId, array $tag)
{
$container->setDefinition(
$tracedBusId = 'debug.traced.'.$busId,
(new Definition(TraceableMessageBus::class, array(new Reference($tracedBusId.'.inner'))))->setDecoratedService($busId)
);
$container->getDefinition('messenger.data_collector')->addMethodCall('registerBus', array($tag['name'] ?? $busId, new Reference($tracedBusId)));
}
private function registerBusMiddlewares(ContainerBuilder $container, string $busId, array $middlewares)
{
$container->getDefinition($busId)->replaceArgument(0, array_map(function (string $name) use ($container, $busId) {
if (!$container->has($messengerMiddlewareId = 'messenger.middleware.'.$name)) {
$messengerMiddlewareId = $name;
}
if (!$container->has($messengerMiddlewareId)) {
throw new RuntimeException(sprintf('Invalid middleware "%s": define such service to be able to use it.', $name));
}
if ($container->getDefinition($messengerMiddlewareId)->isAbstract()) {
$childDefinition = new ChildDefinition($messengerMiddlewareId);
$container->setDefinition($messengerMiddlewareId = $busId.'.middleware.'.$name, $childDefinition);
}
return new Reference($messengerMiddlewareId);
}, $middlewares));
}
} }

View File

@ -9,7 +9,7 @@
* file that was distributed with this source code. * file that was distributed with this source code.
*/ */
namespace Symfony\Component\Messenger\Debug; namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\MiddlewareInterface; use Symfony\Component\Messenger\MiddlewareInterface;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;

View File

@ -0,0 +1,30 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
use Symfony\Component\Messenger\MiddlewareInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class TolerateNoHandler implements MiddlewareInterface
{
public function handle($message, callable $next)
{
try {
return $next($message);
} catch (NoHandlerForMessageException $e) {
// We tolerate not having a handler for this message.
}
}
}

View File

@ -12,8 +12,12 @@
namespace Symfony\Component\Messenger\Tests\DataCollector; namespace Symfony\Component\Messenger\Tests\DataCollector;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\DataCollector\MessengerDataCollector; use Symfony\Component\Messenger\DataCollector\MessengerDataCollector;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\TraceableMessageBus;
use Symfony\Component\VarDumper\Test\VarDumperTestTrait; use Symfony\Component\VarDumper\Test\VarDumperTestTrait;
/** /**
@ -28,13 +32,18 @@ class MessengerDataCollectorTest extends TestCase
*/ */
public function testHandle($returnedValue, $expected) public function testHandle($returnedValue, $expected)
{ {
$collector = new MessengerDataCollector();
$message = new DummyMessage('dummy message'); $message = new DummyMessage('dummy message');
$next = $this->createPartialMock(\stdClass::class, array('__invoke')); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$next->expects($this->once())->method('__invoke')->with($message)->willReturn($returnedValue); $bus->method('dispatch')->with($message)->willReturn($returnedValue);
$bus = new TraceableMessageBus($bus);
$this->assertSame($returnedValue, $collector->handle($message, $next)); $collector = new MessengerDataCollector();
$collector->registerBus('default', $bus);
$bus->dispatch($message);
$collector->collect(Request::create('/'), new Response());
$messages = $collector->getMessages(); $messages = $collector->getMessages();
$this->assertCount(1, $messages); $this->assertCount(1, $messages);
@ -45,6 +54,7 @@ class MessengerDataCollectorTest extends TestCase
public function getHandleTestData() public function getHandleTestData()
{ {
$messageDump = <<<DUMP $messageDump = <<<DUMP
"bus" => "default"
"message" => array:2 [ "message" => array:2 [
"type" => "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage" "type" => "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage"
"object" => Symfony\Component\VarDumper\Cloner\Data {%A "object" => Symfony\Component\VarDumper\Cloner\Data {%A
@ -56,7 +66,7 @@ DUMP;
yield 'no returned value' => array( yield 'no returned value' => array(
null, null,
<<<DUMP <<<DUMP
array:2 [ array:3 [
$messageDump $messageDump
"result" => array:2 [ "result" => array:2 [
"type" => "NULL" "type" => "NULL"
@ -69,7 +79,7 @@ DUMP
yield 'scalar returned value' => array( yield 'scalar returned value' => array(
'returned value', 'returned value',
<<<DUMP <<<DUMP
array:2 [ array:3 [
$messageDump $messageDump
"result" => array:2 [ "result" => array:2 [
"type" => "string" "type" => "string"
@ -82,7 +92,7 @@ DUMP
yield 'array returned value' => array( yield 'array returned value' => array(
array('returned value'), array('returned value'),
<<<DUMP <<<DUMP
array:2 [ array:3 [
$messageDump $messageDump
"result" => array:2 [ "result" => array:2 [
"type" => "array" "type" => "array"
@ -95,24 +105,29 @@ DUMP
public function testHandleWithException() public function testHandleWithException()
{ {
$collector = new MessengerDataCollector();
$message = new DummyMessage('dummy message'); $message = new DummyMessage('dummy message');
$expectedException = new \RuntimeException('foo'); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke')); $bus->method('dispatch')->with($message)->will($this->throwException(new \RuntimeException('foo')));
$next->expects($this->once())->method('__invoke')->with($message)->willThrowException($expectedException); $bus = new TraceableMessageBus($bus);
$collector = new MessengerDataCollector();
$collector->registerBus('default', $bus);
try { try {
$collector->handle($message, $next); $bus->dispatch($message);
} catch (\Throwable $actualException) { } catch (\Throwable $e) {
$this->assertSame($expectedException, $actualException); // Ignore.
} }
$collector->collect(Request::create('/'), new Response());
$messages = $collector->getMessages(); $messages = $collector->getMessages();
$this->assertCount(1, $messages); $this->assertCount(1, $messages);
$this->assertDumpMatchesFormat(<<<DUMP $this->assertDumpMatchesFormat(<<<DUMP
array:2 [ array:3 [
"bus" => "default"
"message" => array:2 [ "message" => array:2 [
"type" => "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage" "type" => "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage"
"object" => Symfony\Component\VarDumper\Cloner\Data {%A "object" => Symfony\Component\VarDumper\Cloner\Data {%A

View File

@ -19,9 +19,13 @@ use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpReceiver; use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpSender; use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\ContainerHandlerLocator; use Symfony\Component\Messenger\ContainerHandlerLocator;
use Symfony\Component\Messenger\DataCollector\MessengerDataCollector;
use Symfony\Component\Messenger\DependencyInjection\MessengerPass; use Symfony\Component\Messenger\DependencyInjection\MessengerPass;
use Symfony\Component\Messenger\Handler\ChainHandler; use Symfony\Component\Messenger\Handler\ChainHandler;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface; use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Middleware\TolerateNoHandler;
use Symfony\Component\Messenger\MiddlewareInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage; use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
use Symfony\Component\Messenger\Transport\ReceiverInterface; use Symfony\Component\Messenger\Transport\ReceiverInterface;
@ -237,11 +241,58 @@ class MessengerPassTest extends TestCase
(new MessengerPass())->process($container); (new MessengerPass())->process($container);
} }
public function testRegistersTraceableBusesToCollector()
{
$dataCollector = $this->getMockBuilder(MessengerDataCollector::class)->getMock();
$container = $this->getContainerBuilder();
$container->register('messenger.data_collector', $dataCollector);
$container->register($fooBusId = 'messenger.bus.foo', MessageBusInterface::class)->addTag('messenger.bus', array('name' => 'foo'));
$container->register($barBusId = 'messenger.bus.bar', MessageBusInterface::class)->addTag('messenger.bus');
$container->setParameter('kernel.debug', true);
(new MessengerPass())->process($container);
$this->assertTrue($container->hasDefinition($debuggedFooBusId = 'debug.traced.'.$fooBusId));
$this->assertSame(array($fooBusId, null, 0), $container->getDefinition($debuggedFooBusId)->getDecoratedService());
$this->assertTrue($container->hasDefinition($debuggedBarBusId = 'debug.traced.'.$barBusId));
$this->assertSame(array($barBusId, null, 0), $container->getDefinition($debuggedBarBusId)->getDecoratedService());
$this->assertEquals(array(array('registerBus', array('foo', new Reference($debuggedFooBusId))), array('registerBus', array('messenger.bus.bar', new Reference($debuggedBarBusId)))), $container->getDefinition('messenger.data_collector')->getMethodCalls());
}
public function testRegistersMiddlewaresFromServices()
{
$container = $this->getContainerBuilder();
$container->register($fooBusId = 'messenger.bus.foo', MessageBusInterface::class)->setArgument(0, array())->addTag('messenger.bus', array('name' => 'foo'));
$container->register('messenger.middleware.tolerate_no_handler', TolerateNoHandler::class)->setAbstract(true);
$container->register(UselessMiddleware::class, UselessMiddleware::class);
$container->setParameter($middlewaresParameter = $fooBusId.'.middlewares', array(UselessMiddleware::class, 'tolerate_no_handler'));
(new MessengerPass())->process($container);
$this->assertTrue($container->hasDefinition($childMiddlewareId = $fooBusId.'.middleware.tolerate_no_handler'));
$this->assertEquals(array(new Reference(UselessMiddleware::class), new Reference($childMiddlewareId)), $container->getDefinition($fooBusId)->getArgument(0));
$this->assertFalse($container->hasParameter($middlewaresParameter));
}
/**
* @expectedException \Symfony\Component\DependencyInjection\Exception\RuntimeException
* @expectedExceptionMessage Invalid middleware "not_defined_middleware": define such service to be able to use it.
*/
public function testCannotRegistersAnUndefinedMiddleware()
{
$container = $this->getContainerBuilder();
$container->register($fooBusId = 'messenger.bus.foo', MessageBusInterface::class)->setArgument(0, array())->addTag('messenger.bus', array('name' => 'foo'));
$container->setParameter($middlewaresParameter = $fooBusId.'.middlewares', array('not_defined_middleware'));
(new MessengerPass())->process($container);
}
private function getContainerBuilder(): ContainerBuilder private function getContainerBuilder(): ContainerBuilder
{ {
$container = new ContainerBuilder(); $container = new ContainerBuilder();
$container->setParameter('kernel.debug', true); $container->setParameter('kernel.debug', true);
$container->register('message_bus', ContainerHandlerLocator::class);
$container $container
->register('messenger.sender_locator', ServiceLocator::class) ->register('messenger.sender_locator', ServiceLocator::class)
@ -354,3 +405,11 @@ class HandleNoMessageHandler implements MessageSubscriberInterface
return array(); return array();
} }
} }
class UselessMiddleware implements MiddlewareInterface
{
public function handle($message, callable $next)
{
return $next($message);
}
}

View File

@ -9,11 +9,11 @@
* file that was distributed with this source code. * file that was distributed with this source code.
*/ */
namespace Symfony\Component\Messenger\Tests\Debug; namespace Symfony\Component\Messenger\Tests\Middleware;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Debug\LoggingMiddleware; use Symfony\Component\Messenger\Middleware\LoggingMiddleware;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
class LoggingMiddlewareTest extends TestCase class LoggingMiddlewareTest extends TestCase

View File

@ -0,0 +1,54 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Middleware;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
use Symfony\Component\Messenger\Middleware\TolerateNoHandler;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
class TolerateNoHandlerTest extends TestCase
{
public function testItCallsNextMiddlewareAndReturnsItsResult()
{
$message = new DummyMessage('Hey');
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$next->expects($this->once())->method('__invoke')->with($message)->willReturn('Foo');
$middleware = new TolerateNoHandler();
$this->assertSame('Foo', $middleware->handle($message, $next));
}
public function testItCatchesTheNoHandlerException()
{
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$next->expects($this->once())->method('__invoke')->will($this->throwException(new NoHandlerForMessageException()));
$middleware = new TolerateNoHandler();
$this->assertNull($middleware->handle(new DummyMessage('Hey'), $next));
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage Something went wrong.
*/
public function testItDoesNotCatchOtherExceptions()
{
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$next->expects($this->once())->method('__invoke')->will($this->throwException(new \RuntimeException('Something went wrong.')));
$middleware = new TolerateNoHandler();
$middleware->handle(new DummyMessage('Hey'), $next);
}
}

View File

@ -0,0 +1,50 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\TraceableMessageBus;
class TraceableMessageBusTest extends TestCase
{
public function testItTracesResult()
{
$message = new DummyMessage('Hello');
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->once())->method('dispatch')->with($message)->willReturn($result = array('foo' => 'bar'));
$traceableBus = new TraceableMessageBus($bus);
$this->assertSame($result, $traceableBus->dispatch($message));
$this->assertSame(array(array('message' => $message, 'result' => $result)), $traceableBus->getDispatchedMessages());
}
public function testItTracesExceptions()
{
$message = new DummyMessage('Hello');
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->once())->method('dispatch')->with($message)->will($this->throwException($exception = new \RuntimeException('Meh.')));
$traceableBus = new TraceableMessageBus($bus);
try {
$traceableBus->dispatch($message);
} catch (\RuntimeException $e) {
$this->assertSame($exception, $e);
}
$this->assertSame(array(array('message' => $message, 'exception' => $exception)), $traceableBus->getDispatchedMessages());
}
}

View File

@ -0,0 +1,55 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class TraceableMessageBus implements MessageBusInterface
{
private $decoratedBus;
private $dispatchedMessages = array();
public function __construct(MessageBusInterface $decoratedBus)
{
$this->decoratedBus = $decoratedBus;
}
/**
* {@inheritdoc}
*/
public function dispatch($message)
{
try {
$result = $this->decoratedBus->dispatch($message);
$this->dispatchedMessages[] = array(
'message' => $message,
'result' => $result,
);
return $result;
} catch (\Throwable $e) {
$this->dispatchedMessages[] = array(
'message' => $message,
'exception' => $e,
);
throw $e;
}
}
public function getDispatchedMessages(): array
{
return $this->dispatchedMessages;
}
}