Merge branch '4.1'

* 4.1:
  [Messenger] Middleware factories support in config
  [HttpKernel] Make TraceableValueResolver $stopwatch mandatory
  [Messenger] Improve the profiler panel
  [Workflow] Added DefinitionBuilder::setMetadataStore().
  [Messenger][DX] Uses custom method names for handlers
  [Messenger] remove autoconfiguration for Sender/ReceiverInterface
  [Messenger] Make sure default receiver name is set before command configuration
  [HttpKernel] Fix services are no longer injected into __invoke controllers method
  Rename tag attribute "name" by "alias"
  Autoconfiguring TransportFactoryInterface classes
  [Messenger] Fix new AMQP Transport test with Envelope
  Fixed return senders based on the message parents/interfaces
  [Messenger] Make sure Sender and Receiver locators have valid services
  [Workflow] add is deprecated since Symfony 4.1. Use addWorkflow() instead
  [Messenger] Fix TraceableBus with envelope
  Ensure the envelope is passed back and can be altered Ensure that the middlewares can also update the message within the envelope
  feature #26945 [Messenger] Support configuring messages when dispatching (ogizanagi)
  Add more tests around the AMQP transport
This commit is contained in:
Samuel ROZE 2018-05-14 20:20:21 +01:00
commit 4a7e6fa9bc
77 changed files with 1807 additions and 327 deletions

View File

@ -0,0 +1,37 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Bridge\Doctrine\Messenger;
use Symfony\Bridge\Doctrine\ManagerRegistry;
/**
* Create a Doctrine ORM transaction middleware to be used in a message bus from an entity manager name.
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
* @final
*/
class DoctrineTransactionMiddlewareFactory
{
private $managerRegistry;
public function __construct(ManagerRegistry $managerRegistry)
{
$this->managerRegistry = $managerRegistry;
}
public function createMiddleware(string $managerName): DoctrineTransactionMiddleware
{
return new DoctrineTransactionMiddleware($this->managerRegistry, $managerName);
}
}

View File

@ -1061,7 +1061,36 @@ class Configuration implements ConfigurationInterface
})
->end()
->defaultValue(array())
->prototype('scalar')->end()
->prototype('array')
->beforeNormalization()
->always()
->then(function ($middleware): array {
if (!\is_array($middleware)) {
return array('id' => $middleware);
}
if (isset($middleware['id'])) {
return $middleware;
}
if (\count($middleware) > 1) {
throw new \InvalidArgumentException(sprintf('There is an error at path "framework.messenger" in one of the buses middleware definitions: expected a single entry for a middleware item config, with factory id as key and arguments as value. Got "%s".', json_encode($middleware)));
}
return array(
'id' => key($middleware),
'arguments' => current($middleware),
);
})
->end()
->fixXmlConfig('argument')
->children()
->scalarNode('id')->isRequired()->cannotBeEmpty()->end()
->arrayNode('arguments')
->normalizeKeys(false)
->defaultValue(array())
->prototype('variable')
->end()
->end()
->end()
->end()
->end()
->end()

View File

@ -63,8 +63,7 @@ use Symfony\Component\Lock\StoreInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\PropertyAccess\PropertyAccessor;
use Symfony\Component\PropertyInfo\PropertyAccessExtractorInterface;
@ -343,12 +342,10 @@ class FrameworkExtension extends Extension
->addTag('validator.constraint_validator');
$container->registerForAutoconfiguration(ObjectInitializerInterface::class)
->addTag('validator.initializer');
$container->registerForAutoconfiguration(ReceiverInterface::class)
->addTag('messenger.receiver');
$container->registerForAutoconfiguration(SenderInterface::class)
->addTag('messenger.sender');
$container->registerForAutoconfiguration(MessageHandlerInterface::class)
->addTag('messenger.message_handler');
$container->registerForAutoconfiguration(TransportFactoryInterface::class)
->addTag('messenger.transport_factory');
if (!$container->getParameter('kernel.debug')) {
// remove tagged iterator argument for resource checkers
@ -573,10 +570,10 @@ class FrameworkExtension extends Extension
foreach ($workflow['supports'] as $supportedClassName) {
$strategyDefinition = new Definition(Workflow\SupportStrategy\InstanceOfSupportStrategy::class, array($supportedClassName));
$strategyDefinition->setPublic(false);
$registryDefinition->addMethodCall('add', array(new Reference($workflowId), $strategyDefinition));
$registryDefinition->addMethodCall('addWorkflow', array(new Reference($workflowId), $strategyDefinition));
}
} elseif (isset($workflow['support_strategy'])) {
$registryDefinition->addMethodCall('add', array(new Reference($workflowId), new Reference($workflow['support_strategy'])));
$registryDefinition->addMethodCall('addWorkflow', array(new Reference($workflowId), new Reference($workflow['support_strategy'])));
}
// Enable the AuditTrail
@ -1471,12 +1468,17 @@ class FrameworkExtension extends Extension
$config['default_bus'] = key($config['buses']);
}
$defaultMiddleware = array('before' => array('logging'), 'after' => array('route_messages', 'call_message_handler'));
$defaultMiddleware = array(
'before' => array(array('id' => 'logging')),
'after' => array(array('id' => 'route_messages'), array('id' => 'call_message_handler')),
);
foreach ($config['buses'] as $busId => $bus) {
$middleware = $bus['default_middleware'] ? array_merge($defaultMiddleware['before'], $bus['middleware'], $defaultMiddleware['after']) : $bus['middleware'];
if (!$validationConfig['enabled'] && \in_array('messenger.middleware.validation', $middleware, true)) {
throw new LogicException('The Validation middleware is only available when the Validator component is installed and enabled. Try running "composer require symfony/validator".');
foreach ($middleware as $middlewareItem) {
if (!$validationConfig['enabled'] && 'messenger.middleware.validation' === $middlewareItem['id']) {
throw new LogicException('The Validation middleware is only available when the Validator component is installed and enabled. Try running "composer require symfony/validator".');
}
}
$container->setParameter($busId.'.middleware', $middleware);
@ -1511,8 +1513,8 @@ class FrameworkExtension extends Extension
$transportDefinition = (new Definition(TransportInterface::class))
->setFactory(array(new Reference('messenger.transport_factory'), 'createTransport'))
->setArguments(array($transport['dsn'], $transport['options']))
->addTag('messenger.receiver', array('name' => $name))
->addTag('messenger.sender', array('name' => $name))
->addTag('messenger.receiver', array('alias' => $name))
->addTag('messenger.sender', array('alias' => $name))
;
$container->setDefinition('messenger.transport.'.$name, $transportDefinition);
}

View File

@ -391,9 +391,16 @@
<xsd:complexType name="messenger_bus">
<xsd:sequence>
<xsd:element name="middleware" type="xsd:string" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="middleware" type="messenger_middleware" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence>
<xsd:attribute name="name" type="xsd:string" use="required"/>
<xsd:attribute name="default-middleware" type="xsd:boolean"/>
</xsd:complexType>
<xsd:complexType name="messenger_middleware">
<xsd:sequence>
<xsd:element name="argument" type="xsd:anyType" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence>
<xsd:attribute name="id" type="xsd:string" use="required"/>
</xsd:complexType>
</xsd:schema>

View File

@ -0,0 +1,16 @@
<?php
$container->loadFromExtension('framework', array(
'messenger' => array(
'buses' => array(
'command_bus' => array(
'middleware' => array(
array(
'foo' => array('qux'),
'bar' => array('baz'),
),
),
),
),
),
));

View File

@ -7,6 +7,7 @@ $container->loadFromExtension('framework', array(
'messenger.bus.commands' => null,
'messenger.bus.events' => array(
'middleware' => array(
array('with_factory' => array('foo', true, array('bar' => 'baz'))),
'allow_no_handler',
),
),

View File

@ -9,12 +9,19 @@
<framework:messenger default-bus="messenger.bus.commands">
<framework:bus name="messenger.bus.commands" />
<framework:bus name="messenger.bus.events">
<framework:middleware>allow_no_handler</framework:middleware>
<framework:middleware id="with_factory">
<framework:argument>foo</framework:argument>
<framework:argument>true</framework:argument>
<framework:argument>
<framework:bar>baz</framework:bar>
</framework:argument>
</framework:middleware>
<framework:middleware id="allow_no_handler" />
</framework:bus>
<framework:bus name="messenger.bus.queries" default-middleware="false">
<framework:middleware>route_messages</framework:middleware>
<framework:middleware>allow_no_handler</framework:middleware>
<framework:middleware>call_message_handler</framework:middleware>
<framework:middleware id="route_messages" />
<framework:middleware id="allow_no_handler" />
<framework:middleware id="call_message_handler" />
</framework:bus>
</framework:messenger>
</framework:config>

View File

@ -0,0 +1,7 @@
framework:
messenger:
buses:
command_bus:
middleware:
- foo: ['qux']
bar: ['baz']

View File

@ -5,6 +5,7 @@ framework:
messenger.bus.commands: ~
messenger.bus.events:
middleware:
- with_factory: [foo, true, { bar: baz }]
- "allow_no_handler"
messenger.bus.queries:
default_middleware: false

View File

@ -36,6 +36,7 @@ use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\HttpKernel\DependencyInjection\LoggerPass;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
use Symfony\Component\Messenger\Transport\TransportFactory;
use Symfony\Component\PropertyAccess\PropertyAccessor;
use Symfony\Component\Serializer\Mapping\Loader\AnnotationLoader;
use Symfony\Component\Serializer\Normalizer\DateIntervalNormalizer;
@ -528,6 +529,8 @@ abstract class FrameworkExtensionTest extends TestCase
$container = $this->createContainerFromFile('messenger');
$this->assertTrue($container->hasAlias('message_bus'));
$this->assertFalse($container->hasDefinition('messenger.transport.amqp.factory'));
$this->assertTrue($container->hasDefinition('messenger.transport_factory'));
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
}
public function testMessengerTransports()
@ -536,8 +539,8 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertTrue($container->hasDefinition('messenger.transport.default'));
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.receiver'));
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.sender'));
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.sender'));
$this->assertEquals(array(array('alias' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
$this->assertEquals(array(array('alias' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.sender'));
$this->assertTrue($container->hasDefinition('messenger.transport.customised'));
$transportFactory = $container->getDefinition('messenger.transport.customised')->getFactory();
@ -601,18 +604,41 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertTrue($container->has('messenger.bus.commands'));
$this->assertSame(array(), $container->getDefinition('messenger.bus.commands')->getArgument(0));
$this->assertEquals(array('logging', 'route_messages', 'call_message_handler'), $container->getParameter('messenger.bus.commands.middleware'));
$this->assertEquals(array(
array('id' => 'logging'),
array('id' => 'route_messages'),
array('id' => 'call_message_handler'),
), $container->getParameter('messenger.bus.commands.middleware'));
$this->assertTrue($container->has('messenger.bus.events'));
$this->assertSame(array(), $container->getDefinition('messenger.bus.events')->getArgument(0));
$this->assertEquals(array('logging', 'allow_no_handler', 'route_messages', 'call_message_handler'), $container->getParameter('messenger.bus.events.middleware'));
$this->assertEquals(array(
array('id' => 'logging'),
array('id' => 'with_factory', 'arguments' => array('foo', true, array('bar' => 'baz'))),
array('id' => 'allow_no_handler', 'arguments' => array()),
array('id' => 'route_messages'),
array('id' => 'call_message_handler'),
), $container->getParameter('messenger.bus.events.middleware'));
$this->assertTrue($container->has('messenger.bus.queries'));
$this->assertSame(array(), $container->getDefinition('messenger.bus.queries')->getArgument(0));
$this->assertEquals(array('route_messages', 'allow_no_handler', 'call_message_handler'), $container->getParameter('messenger.bus.queries.middleware'));
$this->assertEquals(array(
array('id' => 'route_messages', 'arguments' => array()),
array('id' => 'allow_no_handler', 'arguments' => array()),
array('id' => 'call_message_handler', 'arguments' => array()),
), $container->getParameter('messenger.bus.queries.middleware'));
$this->assertTrue($container->hasAlias('message_bus'));
$this->assertSame('messenger.bus.commands', (string) $container->getAlias('message_bus'));
}
/**
* @expectedException \InvalidArgumentException
* @expectedExceptionMessage There is an error at path "framework.messenger" in one of the buses middleware definitions: expected a single entry for a middleware item config, with factory id as key and arguments as value. Got "{"foo":["qux"],"bar":["baz"]}"
*/
public function testMessengerMiddlewareFactoryErroneousFormat()
{
$this->createContainerFromFile('messenger_middleware_factory_erroneous_format');
}
public function testTranslator()
{
$container = $this->createContainerFromFile('full');

View File

@ -27,4 +27,9 @@ class XmlFrameworkExtensionTest extends FrameworkExtensionTest
{
$this->markTestSkipped('The assets key cannot be set to false using the XML configuration format.');
}
public function testMessengerMiddlewareFactoryErroneousFormat()
{
$this->markTestSkipped('XML configuration will not allow eeroneous format.');
}
}

View File

@ -51,7 +51,7 @@
"symfony/templating": "~3.4|~4.0",
"symfony/validator": "^4.1",
"symfony/var-dumper": "~3.4|~4.0",
"symfony/workflow": "~3.4|~4.0",
"symfony/workflow": "^4.1",
"symfony/yaml": "~3.4|~4.0",
"symfony/property-info": "~3.4|~4.0",
"symfony/lock": "~3.4|~4.0",
@ -72,7 +72,7 @@
"symfony/stopwatch": "<3.4",
"symfony/translation": "<3.4",
"symfony/validator": "<4.1",
"symfony/workflow": "<3.4"
"symfony/workflow": "<4.1"
},
"suggest": {
"ext-apcu": "For best performance of the system caches",

View File

@ -4,17 +4,33 @@
{% block toolbar %}
{% if collector.messages|length > 0 %}
{% set status_color = collector.exceptionsCount ? 'red' %}
{% set icon %}
{{ include('@WebProfiler/Icon/messenger.svg') }}
<span class="sf-toolbar-value">{{ collector.messages|length }}</span>
{% endset %}
{{ include('@WebProfiler/Profiler/toolbar_item.html.twig', { link: 'messenger' }) }}
{% set text %}
{% for bus in collector.buses %}
{% set exceptionsCount = collector.exceptionsCount(bus) %}
<div class="sf-toolbar-info-piece">
<b>{{ bus }}</b>
<span
title="{{ exceptionsCount }} message(s) with exceptions"
class="sf-toolbar-status sf-toolbar-status-{{ exceptionsCount ? 'red' }}"
>
{{ collector.messages(bus)|length }}
</span>
</div>
{% endfor %}
{% endset %}
{{ include('@WebProfiler/Profiler/toolbar_item.html.twig', { link: 'messenger', status: status_color }) }}
{% endif %}
{% endblock %}
{% block menu %}
<span class="label">
<span class="label {{ collector.exceptionsCount ? 'label-status-error' }}">
<span class="icon">{{ include('@WebProfiler/Icon/messenger.svg') }}</span>
<strong>Messages</strong>
@ -24,7 +40,28 @@
</span>
{% endblock %}
{% block head %}
{{ parent() }}
<style>
.message-item thead th { position: relative; cursor: pointer; user-select: none; padding-right: 35px; }
.message-item tbody tr td:first-child { width: 115px; }
.message-item .label { float: right; padding: 1px 5px; opacity: .75; margin-left: 5px; }
.message-item .toggle-button { position: absolute; right: 6px; top: 6px; opacity: .5; pointer-events: none }
.message-item .icon svg { height: 24px; width: 24px; }
.message-item .sf-toggle-off .icon-close, .sf-toggle-on .icon-open { display: none; }
.message-item .sf-toggle-off .icon-open, .sf-toggle-on .icon-close { display: block; }
.message-bus .badge.status-some-errors { line-height: 16px; border-bottom: 2px solid #B0413E; }
.message-item .sf-toggle-content.sf-toggle-visible { display: table-row-group; }
</style>
{% endblock %}
{% block panel %}
{% import _self as helper %}
<h2>Messages</h2>
{% if collector.messages is empty %}
@ -32,41 +69,99 @@
<p>No messages have been collected.</p>
</div>
{% else %}
<table>
<thead>
<tr>
<th>Bus</th>
<th>Message</th>
<th>Result</th>
</tr>
</thead>
<tbody>
{% for message in collector.messages %}
<tr>
<td>{{ message.bus }}</td>
<td>
{% if message.result.object is defined %}
{{ profiler_dump(message.message.object, maxDepth=2) }}
{% else %}
{{ message.message.type }}
{% endif %}
</td>
<td>
{% if message.result.object is defined %}
{{ profiler_dump(message.result.object, maxDepth=2) }}
{% elseif message.result.type is defined %}
{{ message.result.type }}
{% if message.result.value is defined %}
{{ message.result.value }}
{% endif %}
{% endif %}
{% if message.exception.type is defined %}
{{ message.exception.type }}
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
<div class="sf-tabs message-bus">
<div class="tab">
{% set messages = collector.messages %}
{% set exceptionsCount = collector.exceptionsCount %}
<h3 class="tab-title">All<span class="badge {{ exceptionsCount ? exceptionsCount == messages|length ? 'status-error' : 'status-some-errors' }}">{{ messages|length }}</span></h3>
<div class="tab-content">
<p class="text-muted">Ordered list of dispatched messages across all your buses</p>
{{ helper.render_bus_messages(messages, true) }}
</div>
</div>
{% for bus in collector.buses %}
<div class="tab message-bus">
{% set messages = collector.messages(bus) %}
{% set exceptionsCount = collector.exceptionsCount(bus) %}
<h3 class="tab-title">{{ bus }}<span class="badge {{ exceptionsCount ? exceptionsCount == messages|length ? 'status-error' : 'status-some-errors' }}">{{ messages|length }}</span></h3>
<div class="tab-content">
<p class="text-muted">Ordered list of messages dispatched on the <code>{{ bus }}</code> bus</p>
{{ helper.render_bus_messages(messages) }}
</div>
</div>
{% endfor %}
{% endif %}
{% endblock %}
{% macro render_bus_messages(messages, showBus = false) %}
{% set discr = random() %}
{% for i, dispatchCall in messages %}
<table class="message-item">
<thead>
<tr>
<th colspan="2" class="sf-toggle"
data-toggle-selector="#message-item-{{ discr }}-{{ i }}-details"
data-toggle-initial="{{ loop.first ? 'display' }}"
>
<span class="dump-inline">{{ profiler_dump(dispatchCall.message.type) }}</span>
{% if showBus %}
<span class="label">{{ dispatchCall.bus }}</span>
{% endif %}
{% if dispatchCall.exception is defined %}
<span class="label status-error">exception</span>
{% endif %}
<a class="toggle-button">
<span class="icon icon-close">{{ include('@Twig/images/icon-minus-square.svg') }}</span>
<span class="icon icon-open">{{ include('@Twig/images/icon-plus-square.svg') }}</span>
</a>
</th>
</tr>
</thead>
<tbody id="message-item-{{ discr }}-{{ i }}-details" class="sf-toggle-content">
{% if showBus %}
<tr>
<td class="text-bold">Bus</td>
<td>{{ dispatchCall.bus }}</td>
</tr>
{% endif %}
<tr>
<td class="text-bold">Message</td>
<td>{{ profiler_dump(dispatchCall.message.value, maxDepth=2) }}</td>
</tr>
<tr>
<td class="text-bold">Envelope items</td>
<td>
{% for item in dispatchCall.envelopeItems %}
{{ profiler_dump(item) }}
{% else %}
<span class="text-muted">No items</span>
{% endfor %}
</td>
</tr>
<tr>
<td class="text-bold">Result</td>
<td>
{% if dispatchCall.result is defined %}
{{ profiler_dump(dispatchCall.result.seek('value'), maxDepth=2) }}
{% elseif dispatchCall.exception is defined %}
<span class="text-danger">No result as an exception occurred</span>
{% endif %}
</td>
</tr>
{% if dispatchCall.exception is defined %}
<tr>
<td class="text-bold">Exception</td>
<td>
{{ profiler_dump(dispatchCall.exception.value, maxDepth=1) }}
</td>
</tr>
{% endif %}
</tbody>
</table>
{% endfor %}
{% endmacro %}

View File

@ -215,6 +215,9 @@ table tbody ul {
.text-muted {
color: #999;
}
.text-danger {
color: {{ colors.error|raw }};
}
.text-bold {
font-weight: bold;
}

View File

@ -26,10 +26,10 @@ final class TraceableValueResolver implements ArgumentValueResolverInterface
private $inner;
private $stopwatch;
public function __construct(ArgumentValueResolverInterface $inner, ?Stopwatch $stopwatch = null)
public function __construct(ArgumentValueResolverInterface $inner, Stopwatch $stopwatch)
{
$this->inner = $inner;
$this->stopwatch = $stopwatch ?? new Stopwatch();
$this->stopwatch = $stopwatch;
}
/**

View File

@ -15,7 +15,6 @@ use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\Compiler\PriorityTaggedServiceTrait;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\HttpKernel\Controller\ArgumentResolver\TraceableValueResolver;
use Symfony\Component\Stopwatch\Stopwatch;
@ -31,11 +30,13 @@ class ControllerArgumentValueResolverPass implements CompilerPassInterface
private $argumentResolverService;
private $argumentValueResolverTag;
private $traceableResolverStopwatch;
public function __construct(string $argumentResolverService = 'argument_resolver', string $argumentValueResolverTag = 'controller.argument_value_resolver')
public function __construct(string $argumentResolverService = 'argument_resolver', string $argumentValueResolverTag = 'controller.argument_value_resolver', string $traceableResolverStopwatch = 'debug.stopwatch')
{
$this->argumentResolverService = $argumentResolverService;
$this->argumentValueResolverTag = $argumentValueResolverTag;
$this->traceableResolverStopwatch = $traceableResolverStopwatch;
}
public function process(ContainerBuilder $container)
@ -46,12 +47,12 @@ class ControllerArgumentValueResolverPass implements CompilerPassInterface
$resolvers = $this->findAndSortTaggedServices($this->argumentValueResolverTag, $container);
if ($container->getParameter('kernel.debug') && class_exists(Stopwatch::class)) {
if ($container->getParameter('kernel.debug') && class_exists(Stopwatch::class) && $container->has($this->traceableResolverStopwatch)) {
foreach ($resolvers as $resolverReference) {
$id = (string) $resolverReference;
$container->register("debug.$id", TraceableValueResolver::class)
->setDecoratedService($id)
->setArguments(array(new Reference("debug.$id.inner"), new Reference('debug.stopwatch', ContainerInterface::NULL_ON_INVALID_REFERENCE)));
->setArguments(array(new Reference("debug.$id.inner"), new Reference($this->traceableResolverStopwatch)));
}
}

View File

@ -33,11 +33,13 @@ class RegisterControllerArgumentLocatorsPass implements CompilerPassInterface
{
private $resolverServiceId;
private $controllerTag;
private $controllerLocator;
public function __construct(string $resolverServiceId = 'argument_resolver.service', string $controllerTag = 'controller.service_arguments')
public function __construct(string $resolverServiceId = 'argument_resolver.service', string $controllerTag = 'controller.service_arguments', string $controllerLocator = 'argument_resolver.controller_locator')
{
$this->resolverServiceId = $resolverServiceId;
$this->controllerTag = $controllerTag;
$this->controllerLocator = $controllerLocator;
}
public function process(ContainerBuilder $container)
@ -179,6 +181,8 @@ class RegisterControllerArgumentLocatorsPass implements CompilerPassInterface
}
$container->getDefinition($this->resolverServiceId)
->replaceArgument(0, ServiceLocatorTagPass::register($container, $controllers));
->replaceArgument(0, $controllerLocatorRef = ServiceLocatorTagPass::register($container, $controllers));
$container->setAlias($this->controllerLocator, (string) $controllerLocatorRef);
}
}

View File

@ -21,21 +21,16 @@ use Symfony\Component\DependencyInjection\ContainerBuilder;
*/
class RemoveEmptyControllerArgumentLocatorsPass implements CompilerPassInterface
{
private $resolverServiceId;
private $controllerLocator;
public function __construct(string $resolverServiceId = 'argument_resolver.service')
public function __construct(string $controllerLocator = 'argument_resolver.controller_locator')
{
$this->resolverServiceId = $resolverServiceId;
$this->controllerLocator = $controllerLocator;
}
public function process(ContainerBuilder $container)
{
if (false === $container->hasDefinition($this->resolverServiceId)) {
return;
}
$serviceResolver = $container->getDefinition($this->resolverServiceId);
$controllerLocator = $container->getDefinition((string) $serviceResolver->getArgument(0));
$controllerLocator = $container->findDefinition($this->controllerLocator);
$controllers = $controllerLocator->getArgument(0);
foreach ($controllers as $controller => $argumentRef) {

View File

@ -17,6 +17,7 @@ use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\HttpKernel\Controller\ArgumentResolver;
use Symfony\Component\HttpKernel\DependencyInjection\ControllerArgumentValueResolverPass;
use Symfony\Component\Stopwatch\Stopwatch;
class ControllerArgumentValueResolverPassTest extends TestCase
{
@ -52,7 +53,7 @@ class ControllerArgumentValueResolverPassTest extends TestCase
$this->assertFalse($container->hasDefinition('n3.traceable'));
}
public function testInDebug()
public function testInDebugWithStopWatchDefinition()
{
$services = array(
'n3' => array(array()),
@ -68,6 +69,7 @@ class ControllerArgumentValueResolverPassTest extends TestCase
$definition = new Definition(ArgumentResolver::class, array(null, array()));
$container = new ContainerBuilder();
$container->register('debug.stopwatch', Stopwatch::class);
$container->setDefinition('argument_resolver', $definition);
foreach ($services as $id => list($tag)) {
@ -88,6 +90,24 @@ class ControllerArgumentValueResolverPassTest extends TestCase
$this->assertTrue($container->hasDefinition('n3'));
}
public function testInDebugWithouStopWatchDefinition()
{
$expected = array(new Reference('n1'));
$definition = new Definition(ArgumentResolver::class, array(null, array()));
$container = new ContainerBuilder();
$container->register('n1')->addTag('controller.argument_value_resolver');
$container->setDefinition('argument_resolver', $definition);
$container->setParameter('kernel.debug', true);
(new ControllerArgumentValueResolverPass())->process($container);
$this->assertEquals($expected, $definition->getArgument(1)->getValues());
$this->assertFalse($container->hasDefinition('debug.n1'));
$this->assertTrue($container->hasDefinition('n1'));
}
public function testReturningEmptyArrayWhenNoService()
{
$definition = new Definition(ArgumentResolver::class, array(null, array()));

View File

@ -13,12 +13,14 @@ namespace Symfony\Component\Messenger\Asynchronous\Middleware;
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class SendMessageMiddleware implements MiddlewareInterface
class SendMessageMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
{
private $senderLocator;
@ -32,17 +34,19 @@ class SendMessageMiddleware implements MiddlewareInterface
*/
public function handle($message, callable $next)
{
if ($message instanceof ReceivedMessage) {
return $next($message->getMessage());
$envelope = Envelope::wrap($message);
if ($envelope->get(ReceivedMessage::class)) {
// It's a received message. Do not send it back:
return $next($message);
}
if (!empty($senders = $this->senderLocator->getSendersForMessage($message))) {
if (!empty($senders = $this->senderLocator->getSendersForMessage($envelope->getMessage()))) {
foreach ($senders as $sender) {
if (null === $sender) {
continue;
}
$sender->send($message);
$sender->send($envelope);
}
if (!\in_array(null, $senders, true)) {

View File

@ -32,13 +32,29 @@ class SenderLocator implements SenderLocatorInterface
*/
public function getSendersForMessage($message): array
{
$senderIds = $this->messageToSenderIdsMapping[\get_class($message)] ?? $this->messageToSenderIdsMapping['*'] ?? array();
$senders = array();
foreach ($senderIds as $senderId) {
foreach ($this->getSenderIds($message) as $senderId) {
$senders[] = $this->senderServiceLocator->get($senderId);
}
return $senders;
}
private function getSenderIds($message): array
{
if (isset($this->messageToSenderIdsMapping[\get_class($message)])) {
return $this->messageToSenderIdsMapping[\get_class($message)];
}
if ($messageToSenderIdsMapping = array_intersect_key($this->messageToSenderIdsMapping, class_parents($message))) {
return current($messageToSenderIdsMapping);
}
if ($messageToSenderIdsMapping = array_intersect_key($this->messageToSenderIdsMapping, class_implements($message))) {
return current($messageToSenderIdsMapping);
}
if (isset($this->messageToSenderIdsMapping['*'])) {
return $this->messageToSenderIdsMapping['*'];
}
return array();
}
}

View File

@ -12,26 +12,26 @@
namespace Symfony\Component\Messenger\Asynchronous\Transport;
use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\EnvelopeItemInterface;
/**
* Wraps a received message. This is mainly used by the `SendMessageMiddleware` middleware to identify
* Marker config for a received message.
* This is mainly used by the `SendMessageMiddleware` middleware to identify
* a message should not be sent if it was just received.
*
* @see SendMessageMiddleware
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
final class ReceivedMessage
final class ReceivedMessage implements EnvelopeItemInterface
{
private $message;
public function __construct($message)
public function serialize()
{
$this->message = $message;
return '';
}
public function getMessage()
public function unserialize($serialized)
{
return $this->message;
// noop
}
}

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Asynchronous\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
/**
@ -27,12 +28,12 @@ class WrapIntoReceivedMessage implements ReceiverInterface
public function receive(callable $handler): void
{
$this->decoratedReceiver->receive(function ($message) use ($handler) {
if (null !== $message) {
$message = new ReceivedMessage($message);
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) {
if (null !== $envelope) {
$envelope = $envelope->with(new ReceivedMessage());
}
$handler($message);
$handler($envelope);
});
}

View File

@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Command;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Exception\RuntimeException;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
@ -22,7 +23,6 @@ use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
/**
@ -41,12 +41,12 @@ class ConsumeMessagesCommand extends Command
public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, string $defaultReceiverName = null)
{
parent::__construct();
$this->bus = $bus;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
$this->defaultReceiverName = $defaultReceiverName;
parent::__construct();
}
/**
@ -89,12 +89,10 @@ EOF
protected function execute(InputInterface $input, OutputInterface $output): void
{
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
throw new \RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
}
if (!($receiver = $this->receiverLocator->get($receiverName)) instanceof ReceiverInterface) {
throw new \RuntimeException(sprintf('Receiver "%s" is not a valid message consumer. It must implement the "%s" interface.', $receiverName, ReceiverInterface::class));
}
$receiver = $this->receiverLocator->get($receiverName);
if ($limit = $input->getOption('limit')) {
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
@ -117,9 +115,9 @@ EOF
$memoryLimit = strtolower($memoryLimit);
$max = strtolower(ltrim($memoryLimit, '+'));
if (0 === strpos($max, '0x')) {
$max = intval($max, 16);
$max = \intval($max, 16);
} elseif (0 === strpos($max, '0')) {
$max = intval($max, 8);
$max = \intval($max, 8);
} else {
$max = (int) $max;
}

View File

@ -16,6 +16,8 @@ use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\DataCollector\DataCollector;
use Symfony\Component\HttpKernel\DataCollector\LateDataCollectorInterface;
use Symfony\Component\Messenger\TraceableMessageBus;
use Symfony\Component\VarDumper\Caster\ClassStub;
use Symfony\Component\VarDumper\Cloner\Data;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
@ -44,13 +46,25 @@ class MessengerDataCollector extends DataCollector implements LateDataCollectorI
*/
public function lateCollect()
{
$this->data = array('messages' => array());
$this->data = array('messages' => array(), 'buses' => array_keys($this->traceableBuses));
$messages = array();
foreach ($this->traceableBuses as $busName => $bus) {
foreach ($bus->getDispatchedMessages() as $message) {
$this->data['messages'][] = $this->collectMessage($busName, $message);
$debugRepresentation = $this->cloneVar($this->collectMessage($busName, $message));
$messages[] = array($debugRepresentation, $message['callTime']);
}
}
// Order by call time
usort($messages, function (array $a, array $b): int {
return $a[1] > $b[1] ? 1 : -1;
});
// Keep the messages clones only
$this->data['messages'] = array_map(function (array $item): Data {
return $item[0];
}, $messages);
}
/**
@ -78,31 +92,19 @@ class MessengerDataCollector extends DataCollector implements LateDataCollectorI
$debugRepresentation = array(
'bus' => $busName,
'envelopeItems' => $tracedMessage['envelopeItems'] ?? null,
'message' => array(
'type' => \get_class($message),
'object' => $this->cloneVar($message),
'type' => new ClassStub(\get_class($message)),
'value' => $message,
),
);
if (array_key_exists('result', $tracedMessage)) {
$result = $tracedMessage['result'];
if (\is_object($result)) {
$debugRepresentation['result'] = array(
'type' => \get_class($result),
'object' => $this->cloneVar($result),
);
} elseif (\is_array($result)) {
$debugRepresentation['result'] = array(
'type' => 'array',
'object' => $this->cloneVar($result),
);
} else {
$debugRepresentation['result'] = array(
'type' => \gettype($result),
'value' => $result,
);
}
$debugRepresentation['result'] = array(
'type' => \is_object($result) ? \get_class($result) : gettype($result),
'value' => $result,
);
}
if (isset($tracedMessage['exception'])) {
@ -110,15 +112,31 @@ class MessengerDataCollector extends DataCollector implements LateDataCollectorI
$debugRepresentation['exception'] = array(
'type' => \get_class($exception),
'message' => $exception->getMessage(),
'value' => $exception,
);
}
return $debugRepresentation;
}
public function getMessages(): array
public function getExceptionsCount(string $bus = null): int
{
return $this->data['messages'] ?? array();
return array_reduce($this->getMessages($bus), function (int $carry, Data $message) {
return $carry += isset($message['exception']) ? 1 : 0;
}, 0);
}
public function getMessages(string $bus = null): array
{
$messages = $this->data['messages'] ?? array();
return $bus ? array_filter($messages, function (Data $message) use ($bus): bool {
return $bus === $message['bus'];
}) : $messages;
}
public function getBuses(): array
{
return $this->data['buses'];
}
}

View File

@ -23,6 +23,8 @@ use Symfony\Component\Messenger\Handler\ChainHandler;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
use Symfony\Component\Messenger\TraceableMessageBus;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
@ -61,7 +63,7 @@ class MessengerPass implements CompilerPassInterface
}
if ($container->hasDefinition('messenger.data_collector')) {
$this->registerBusToCollector($container, $busId, $tags[0]);
$this->registerBusToCollector($container, $busId);
}
}
@ -72,14 +74,27 @@ class MessengerPass implements CompilerPassInterface
private function registerHandlers(ContainerBuilder $container)
{
$definitions = array();
$handlersByMessage = array();
foreach ($container->findTaggedServiceIds($this->handlerTag, true) as $serviceId => $tags) {
foreach ($tags as $tag) {
$handles = isset($tag['handles']) ? array($tag['handles']) : $this->guessHandledClasses($r = $container->getReflectionClass($container->getDefinition($serviceId)->getClass()), $serviceId);
$r = $container->getReflectionClass($container->getDefinition($serviceId)->getClass());
if (isset($tag['handles'])) {
$handles = isset($tag['method']) ? array($tag['handles'] => $tag['method']) : array($tag['handles']);
} else {
$handles = $this->guessHandledClasses($r, $serviceId);
}
$priority = $tag['priority'] ?? 0;
foreach ($handles as $messageClass) {
foreach ($handles as $messageClass => $method) {
if (\is_int($messageClass)) {
$messageClass = $method;
$method = '__invoke';
}
if (\is_array($messageClass)) {
$messagePriority = $messageClass[1];
$messageClass = $messageClass[0];
@ -87,12 +102,27 @@ class MessengerPass implements CompilerPassInterface
$messagePriority = $priority;
}
if (!class_exists($messageClass)) {
$messageClassLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : sprintf($r->implementsInterface(MessageHandlerInterface::class) ? 'returned by method "%s::getHandledMessages()"' : 'used as argument type in method "%s::__invoke()"', $r->getName());
if (\is_array($method)) {
$messagePriority = $method[1];
$method = $method[0];
}
if (!\class_exists($messageClass)) {
$messageClassLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : $r->implementsInterface(MessageHandlerInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method);
throw new RuntimeException(sprintf('Invalid handler service "%s": message class "%s" %s does not exist.', $serviceId, $messageClass, $messageClassLocation));
}
if (!$r->hasMethod($method)) {
throw new RuntimeException(sprintf('Invalid handler service "%s": method "%s::%s()" does not exist.', $serviceId, $r->getName(), $method));
}
if ('__invoke' !== $method) {
$wrapperDefinition = (new Definition('callable'))->addArgument(array(new Reference($serviceId), $method))->setFactory('Closure::fromCallable');
$definitions[$serviceId = '.messenger.method_on_object_wrapper.'.ContainerBuilder::hash($messageClass.':'.$messagePriority.':'.$serviceId.':'.$method)] = $wrapperDefinition;
}
$handlersByMessage[$messageClass][$messagePriority][] = new Reference($serviceId);
}
}
@ -103,7 +133,6 @@ class MessengerPass implements CompilerPassInterface
$handlersByMessage[$message] = array_merge(...$handlersByMessage[$message]);
}
$definitions = array();
$handlersLocatorMapping = array();
foreach ($handlersByMessage as $message => $handlers) {
if (1 === \count($handlers)) {
@ -167,11 +196,16 @@ class MessengerPass implements CompilerPassInterface
$taggedReceivers = $container->findTaggedServiceIds($this->receiverTag);
foreach ($taggedReceivers as $id => $tags) {
$receiverClass = $container->findDefinition($id)->getClass();
if (!is_subclass_of($receiverClass, ReceiverInterface::class)) {
throw new RuntimeException(sprintf('Invalid receiver "%s": class "%s" must implement interface "%s".', $id, $receiverClass, ReceiverInterface::class));
}
$receiverMapping[$id] = new Reference($id);
foreach ($tags as $tag) {
if (isset($tag['name'])) {
$receiverMapping[$tag['name']] = $receiverMapping[$id];
if (isset($tag['alias'])) {
$receiverMapping[$tag['alias']] = $receiverMapping[$id];
}
}
}
@ -187,11 +221,16 @@ class MessengerPass implements CompilerPassInterface
{
$senderLocatorMapping = array();
foreach ($container->findTaggedServiceIds($this->senderTag) as $id => $tags) {
$senderClass = $container->findDefinition($id)->getClass();
if (!is_subclass_of($senderClass, SenderInterface::class)) {
throw new RuntimeException(sprintf('Invalid sender "%s": class "%s" must implement interface "%s".', $id, $senderClass, SenderInterface::class));
}
$senderLocatorMapping[$id] = new Reference($id);
foreach ($tags as $tag) {
if (isset($tag['name'])) {
$senderLocatorMapping[$tag['name']] = $senderLocatorMapping[$id];
if (isset($tag['alias'])) {
$senderLocatorMapping[$tag['alias']] = $senderLocatorMapping[$id];
}
}
}
@ -199,7 +238,7 @@ class MessengerPass implements CompilerPassInterface
$container->getDefinition('messenger.sender_locator')->replaceArgument(0, $senderLocatorMapping);
}
private function registerBusToCollector(ContainerBuilder $container, string $busId, array $tag)
private function registerBusToCollector(ContainerBuilder $container, string $busId)
{
$container->setDefinition(
$tracedBusId = 'debug.traced.'.$busId,
@ -209,24 +248,37 @@ class MessengerPass implements CompilerPassInterface
$container->getDefinition('messenger.data_collector')->addMethodCall('registerBus', array($busId, new Reference($tracedBusId)));
}
private function registerBusMiddleware(ContainerBuilder $container, string $busId, array $middleware)
private function registerBusMiddleware(ContainerBuilder $container, string $busId, array $middlewareCollection)
{
$container->getDefinition($busId)->replaceArgument(0, array_map(function (string $name) use ($container, $busId) {
if (!$container->has($messengerMiddlewareId = 'messenger.middleware.'.$name)) {
$messengerMiddlewareId = $name;
$middlewareReferences = array();
foreach ($middlewareCollection as $middlewareItem) {
$id = $middlewareItem['id'];
$arguments = $middlewareItem['arguments'] ?? array();
if (!$container->has($messengerMiddlewareId = 'messenger.middleware.'.$id)) {
$messengerMiddlewareId = $id;
}
if (!$container->has($messengerMiddlewareId)) {
throw new RuntimeException(sprintf('Invalid middleware "%s": define such service to be able to use it.', $name));
throw new RuntimeException(sprintf('Invalid middleware "%s": define such service to be able to use it.', $id));
}
if ($container->getDefinition($messengerMiddlewareId)->isAbstract()) {
if (($definition = $container->findDefinition($messengerMiddlewareId))->isAbstract()) {
$childDefinition = new ChildDefinition($messengerMiddlewareId);
$count = \count($definition->getArguments());
foreach (array_values($arguments ?? array()) as $key => $argument) {
// Parent definition can provide default arguments.
// Replace each explicitly or add if not set:
$key < $count ? $childDefinition->replaceArgument($key, $argument) : $childDefinition->addArgument($argument);
}
$container->setDefinition($messengerMiddlewareId = $busId.'.middleware.'.$name, $childDefinition);
$container->setDefinition($messengerMiddlewareId = $busId.'.middleware.'.$id, $childDefinition);
} elseif ($arguments) {
throw new RuntimeException(sprintf('Invalid middleware factory "%s": a middleware factory must be an abstract definition.', $id));
}
return new Reference($messengerMiddlewareId);
}, $middleware));
$middlewareReferences[] = new Reference($messengerMiddlewareId);
}
$container->getDefinition($busId)->replaceArgument(0, $middlewareReferences);
}
}

View File

@ -0,0 +1,89 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger;
/**
* A message wrapped in an envelope with items (configurations, markers, ...).
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
final class Envelope
{
private $items = array();
private $message;
/**
* @param object $message
* @param EnvelopeItemInterface[] $items
*/
public function __construct($message, array $items = array())
{
$this->message = $message;
foreach ($items as $item) {
$this->items[\get_class($item)] = $item;
}
}
/**
* Wrap a message into an envelope if not already wrapped.
*
* @param Envelope|object $message
*/
public static function wrap($message): self
{
return $message instanceof self ? $message : new self($message);
}
/**
* @return Envelope a new Envelope instance with additional item
*/
public function with(EnvelopeItemInterface $item): self
{
$cloned = clone $this;
$cloned->items[\get_class($item)] = $item;
return $cloned;
}
public function withMessage($message): self
{
$cloned = clone $this;
$cloned->message = $message;
return $cloned;
}
public function get(string $itemFqcn): ?EnvelopeItemInterface
{
return $this->items[$itemFqcn] ?? null;
}
/**
* @return EnvelopeItemInterface[] indexed by fqcn
*/
public function all(): array
{
return $this->items;
}
/**
* @return object The original message contained in the envelope
*/
public function getMessage()
{
return $this->message;
}
}

View File

@ -0,0 +1,23 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger;
/**
* A Messenger protagonist aware of the message envelope and its content.
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
interface EnvelopeAwareInterface
{
}

View File

@ -0,0 +1,24 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger;
/**
* An envelope item related to a message.
* This item must be serializable for transport.
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
interface EnvelopeItemInterface extends \Serializable
{
}

View File

@ -34,9 +34,14 @@ interface MessageSubscriberInterface extends MessageHandlerInterface
* [SecondMessage::class, -10],
* ];
*
* The `__invoke` method of the handler will be called as usual with the message to handle.
* It can also specify a method and/or a priority per message:
*
* @return array
* return [
* FirstMessage::class => 'firstMessageMethod',
* SecondMessage::class => ['secondMessageMethod', 20],
* ];
*
* The `__invoke` method of the handler will be called as usual with the message to handle.
*/
public static function getHandledMessages(): array;
public static function getHandledMessages(): iterable;
}

View File

@ -44,10 +44,10 @@ class MessageBus implements MessageBusInterface
throw new InvalidArgumentException(sprintf('Invalid type for message argument. Expected object, but got "%s".', \gettype($message)));
}
return \call_user_func($this->callableForNextMiddleware(0), $message);
return \call_user_func($this->callableForNextMiddleware(0, Envelope::wrap($message)), $message);
}
private function callableForNextMiddleware(int $index): callable
private function callableForNextMiddleware(int $index, Envelope $currentEnvelope): callable
{
if (null === $this->indexedMiddlewareHandlers) {
$this->indexedMiddlewareHandlers = \is_array($this->middlewareHandlers) ? array_values($this->middlewareHandlers) : iterator_to_array($this->middlewareHandlers, false);
@ -59,8 +59,19 @@ class MessageBus implements MessageBusInterface
$middleware = $this->indexedMiddlewareHandlers[$index];
return function ($message) use ($middleware, $index) {
return $middleware->handle($message, $this->callableForNextMiddleware($index + 1));
return function ($message) use ($middleware, $index, $currentEnvelope) {
if ($message instanceof Envelope) {
$currentEnvelope = $message;
} else {
$message = $currentEnvelope->withMessage($message);
}
if (!$middleware instanceof EnvelopeAwareInterface) {
// Do not provide the envelope if the middleware cannot read it:
$message = $message->getMessage();
}
return $middleware->handle($message, $this->callableForNextMiddleware($index + 1, $currentEnvelope));
};
}
}

View File

@ -23,7 +23,7 @@ interface MessageBusInterface
*
* The bus can return a value coming from handlers, but is not required to do so.
*
* @param object $message
* @param object|Envelope $message The message or the message pre-wrapped in an envelope
*
* @return mixed
*/

View File

@ -0,0 +1,58 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Middleware\Configuration;
use Symfony\Component\Messenger\EnvelopeItemInterface;
use Symfony\Component\Validator\Constraints\GroupSequence;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
final class ValidationConfiguration implements EnvelopeItemInterface
{
private $groups;
/**
* @param string[]|GroupSequence $groups
*/
public function __construct($groups)
{
$this->groups = $groups;
}
public function getGroups()
{
return $this->groups;
}
public function serialize()
{
$isGroupSequence = $this->groups instanceof GroupSequence;
return serialize(array(
'groups' => $isGroupSequence ? $this->groups->groups : $this->groups,
'is_group_sequence' => $isGroupSequence,
));
}
public function unserialize($serialized)
{
list(
'groups' => $groups,
'is_group_sequence' => $isGroupSequence
) = unserialize($serialized, array('allowed_classes' => false));
$this->__construct($isGroupSequence ? new GroupSequence($groups) : $groups);
}
}

View File

@ -11,7 +11,6 @@
namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Psr\Log\LoggerInterface;
/**
@ -51,10 +50,6 @@ class LoggingMiddleware implements MiddlewareInterface
private function createContext($message): array
{
if ($message instanceof ReceivedMessage) {
$message = $message->getMessage();
}
return array(
'message' => $message,
'class' => \get_class($message),

View File

@ -11,13 +11,16 @@
namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\Exception\ValidationFailedException;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Validator\Validator\ValidatorInterface;
/**
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
*/
class ValidationMiddleware implements MiddlewareInterface
class ValidationMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
{
private $validator;
@ -28,9 +31,17 @@ class ValidationMiddleware implements MiddlewareInterface
public function handle($message, callable $next)
{
$violations = $this->validator->validate($message);
$envelope = Envelope::wrap($message);
$subject = $envelope->getMessage();
$groups = null;
/** @var ValidationConfiguration|null $validationConfig */
if ($validationConfig = $envelope->get(ValidationConfiguration::class)) {
$groups = $validationConfig->getGroups();
}
$violations = $this->validator->validate($subject, null, $groups);
if (\count($violations)) {
throw new ValidationFailedException($message, $violations);
throw new ValidationFailedException($subject, $violations);
}
return $next($message);

View File

@ -15,6 +15,7 @@ use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\SenderInterface;
@ -30,12 +31,28 @@ class SendMessageMiddlewareTest extends TestCase
$sender,
)));
$sender->expects($this->once())->method('send')->with($message);
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
$next->expects($this->never())->method($this->anything());
$middleware->handle($message, $next);
}
public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage()
{
$envelope = Envelope::wrap(new DummyMessage('Hey'));
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$middleware = new SendMessageMiddleware(new InMemorySenderLocator(array(
$sender,
)));
$sender->expects($this->once())->method('send')->with($envelope);
$next->expects($this->never())->method($this->anything());
$middleware->handle($envelope, $next);
}
public function testItAlsoCallsTheNextMiddlewareIfASenderIsNull()
{
$message = new DummyMessage('Hey');
@ -47,7 +64,7 @@ class SendMessageMiddlewareTest extends TestCase
null,
)));
$sender->expects($this->once())->method('send')->with($message);
$sender->expects($this->once())->method('send')->with(Envelope::wrap($message));
$next->expects($this->once())->method($this->anything());
$middleware->handle($message, $next);
@ -67,8 +84,7 @@ class SendMessageMiddlewareTest extends TestCase
public function testItSkipsReceivedMessages()
{
$innerMessage = new DummyMessage('Hey');
$message = new ReceivedMessage($innerMessage);
$envelope = Envelope::wrap(new DummyMessage('Hey'))->with(new ReceivedMessage());
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
@ -78,9 +94,9 @@ class SendMessageMiddlewareTest extends TestCase
)));
$sender->expects($this->never())->method('send');
$next->expects($this->once())->method('__invoke')->with($innerMessage);
$next->expects($this->once())->method('__invoke')->with($envelope);
$middleware->handle($message, $next);
$middleware->handle($envelope, $next);
}
}

View File

@ -15,6 +15,7 @@ use PHPUnit\Framework\TestCase;
use Symfony\Component\DependencyInjection\Container;
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageInterface;
use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
use Symfony\Component\Messenger\Transport\SenderInterface;
@ -32,8 +33,47 @@ class SenderLocatorTest extends TestCase
),
));
$this->assertEquals(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello')));
$this->assertEquals(array(), $locator->getSendersForMessage(new SecondMessage()));
$this->assertSame(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello')));
$this->assertSame(array(), $locator->getSendersForMessage(new SecondMessage()));
}
public function testItReturnsTheSenderBasedOnTheMessageParentClass()
{
$container = new Container();
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$container->set('my_amqp_sender', $sender);
$apiSender = $this->getMockBuilder(SenderInterface::class)->getMock();
$container->set('my_api_sender', $apiSender);
$locator = new SenderLocator($container, array(
DummyMessageInterface::class => array(
'my_api_sender',
),
DummyMessage::class => array(
'my_amqp_sender',
),
));
$this->assertSame(array($sender), $locator->getSendersForMessage(new ChildDummyMessage('Hello')));
$this->assertSame(array(), $locator->getSendersForMessage(new SecondMessage()));
}
public function testItReturnsTheSenderBasedOnTheMessageInterface()
{
$container = new Container();
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$container->set('my_amqp_sender', $sender);
$locator = new SenderLocator($container, array(
DummyMessageInterface::class => array(
'my_amqp_sender',
),
));
$this->assertSame(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello')));
$this->assertSame(array(), $locator->getSendersForMessage(new SecondMessage()));
}
public function testItSupportsAWildcardInsteadOfTheMessageClass()
@ -55,7 +95,11 @@ class SenderLocatorTest extends TestCase
),
));
$this->assertEquals(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello')));
$this->assertEquals(array($apiSender), $locator->getSendersForMessage(new SecondMessage()));
$this->assertSame(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello')));
$this->assertSame(array($apiSender), $locator->getSendersForMessage(new SecondMessage()));
}
}
class ChildDummyMessage extends DummyMessage
{
}

View File

@ -0,0 +1,30 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Asynchronous\Serialization;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class SerializerConfigurationTest extends TestCase
{
public function testSerialiazable()
{
$config = new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('Default', 'Extra')));
$this->assertTrue(is_subclass_of(SerializerConfiguration::class, \Serializable::class, true));
$this->assertEquals($config, unserialize(serialize($config)));
}
}

View File

@ -0,0 +1,36 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Command;
use PHPUnit\Framework\TestCase;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
use Symfony\Component\Messenger\MessageBus;
class ConsumeMessagesCommandTest extends TestCase
{
public function testConfigurationWithDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(MessageBus::class), $this->createMock(ServiceLocator::class), null, 'messenger.transport.amqp');
$inputArgument = $command->getDefinition()->getArgument('receiver');
$this->assertFalse($inputArgument->isRequired());
$this->assertSame('messenger.transport.amqp', $inputArgument->getDefault());
}
public function testConfigurationWithoutDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(MessageBus::class), $this->createMock(ServiceLocator::class));
$inputArgument = $command->getDefinition()->getArgument('receiver');
$this->assertTrue($inputArgument->isRequired());
$this->assertNull($inputArgument->getDefault());
}
}

View File

@ -16,14 +16,22 @@ use Symfony\Component\Messenger\DataCollector\MessengerDataCollector;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\TraceableMessageBus;
use Symfony\Component\VarDumper\Test\VarDumperTestTrait;
use Symfony\Component\VarDumper\Cloner\Data;
use Symfony\Component\VarDumper\Dumper\CliDumper;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class MessengerDataCollectorTest extends TestCase
{
use VarDumperTestTrait;
/** @var CliDumper */
private $dumper;
protected function setUp()
{
$this->dumper = new CliDumper();
$this->dumper->setColors(false);
}
/**
* @dataProvider getHandleTestData
@ -46,17 +54,18 @@ class MessengerDataCollectorTest extends TestCase
$messages = $collector->getMessages();
$this->assertCount(1, $messages);
$this->assertDumpMatchesFormat($expected, $messages[0]);
$this->assertStringMatchesFormat($expected, $this->getDataAsString($messages[0]));
}
public function getHandleTestData()
{
$messageDump = <<<DUMP
"bus" => "default"
"envelopeItems" => null
"message" => array:2 [
"type" => "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage"
"object" => Symfony\Component\VarDumper\Cloner\Data {%A
%A+class: "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage"%A
"value" => Symfony\Component\Messenger\Tests\Fixtures\DummyMessage %A
-message: "dummy message"
}
]
DUMP;
@ -64,7 +73,7 @@ DUMP;
yield 'no returned value' => array(
null,
<<<DUMP
array:3 [
array:4 [
$messageDump
"result" => array:2 [
"type" => "NULL"
@ -77,7 +86,7 @@ DUMP
yield 'scalar returned value' => array(
'returned value',
<<<DUMP
array:3 [
array:4 [
$messageDump
"result" => array:2 [
"type" => "string"
@ -90,11 +99,13 @@ DUMP
yield 'array returned value' => array(
array('returned value'),
<<<DUMP
array:3 [
array:4 [
$messageDump
"result" => array:2 [
"type" => "array"
"object" => Symfony\Component\VarDumper\Cloner\Data {%A
"value" => array:1 [
0 => "returned value"
]
]
]
DUMP
@ -123,21 +134,66 @@ DUMP
$messages = $collector->getMessages();
$this->assertCount(1, $messages);
$this->assertDumpMatchesFormat(<<<DUMP
array:3 [
$this->assertStringMatchesFormat(<<<DUMP
array:4 [
"bus" => "default"
"envelopeItems" => null
"message" => array:2 [
"type" => "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage"
"object" => Symfony\Component\VarDumper\Cloner\Data {%A
%A+class: "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage"%A
"value" => Symfony\Component\Messenger\Tests\Fixtures\DummyMessage %A
-message: "dummy message"
}
]
"exception" => array:2 [
"type" => "RuntimeException"
"message" => "foo"
"value" => RuntimeException %A
]
]
]
DUMP
, $messages[0]);
, $this->getDataAsString($messages[0]));
}
public function testKeepsOrderedDispatchCalls()
{
$firstBus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$firstBus = new TraceableMessageBus($firstBus);
$secondBus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$secondBus = new TraceableMessageBus($secondBus);
$collector = new MessengerDataCollector();
$collector->registerBus('first bus', $firstBus);
$collector->registerBus('second bus', $secondBus);
$firstBus->dispatch(new DummyMessage('#1'));
$secondBus->dispatch(new DummyMessage('#2'));
$secondBus->dispatch(new DummyMessage('#3'));
$firstBus->dispatch(new DummyMessage('#4'));
$secondBus->dispatch(new DummyMessage('#5'));
$collector->lateCollect();
$messages = $collector->getMessages();
$this->assertCount(5, $messages);
$this->assertSame('#1', $messages[0]['message']['value']['message']);
$this->assertSame('first bus', $messages[0]['bus']);
$this->assertSame('#2', $messages[1]['message']['value']['message']);
$this->assertSame('second bus', $messages[1]['bus']);
$this->assertSame('#3', $messages[2]['message']['value']['message']);
$this->assertSame('second bus', $messages[2]['bus']);
$this->assertSame('#4', $messages[3]['message']['value']['message']);
$this->assertSame('first bus', $messages[3]['bus']);
$this->assertSame('#5', $messages[4]['message']['value']['message']);
$this->assertSame('second bus', $messages[4]['bus']);
}
private function getDataAsString(Data $data): string
{
return rtrim($this->dumper->dump($data, true));
}
}

View File

@ -13,10 +13,12 @@ namespace Symfony\Component\Messenger\Tests\DependencyInjection;
use PHPUnit\Framework\TestCase;
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
use Symfony\Component\DependencyInjection\Compiler\ResolveChildDefinitionsPass;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Handler\Locator\ContainerHandlerLocator;
@ -97,10 +99,57 @@ class MessengerPassTest extends TestCase
$this->assertEquals(array(new Reference(PrioritizedHandler::class), new Reference(HandlerWithMultipleMessages::class)), $definition->getArgument(0));
}
public function testGetClassesAndMethodsAndPrioritiesFromTheSubscriber()
{
$container = $this->getContainerBuilder();
$container
->register(HandlerMappingMethods::class, HandlerMappingMethods::class)
->addTag('messenger.message_handler')
;
$container
->register(PrioritizedHandler::class, PrioritizedHandler::class)
->addTag('messenger.message_handler')
;
(new MessengerPass())->process($container);
$handlerLocatorDefinition = $container->getDefinition($container->getDefinition('messenger.handler_resolver')->getArgument(0));
$handlerMapping = $handlerLocatorDefinition->getArgument(0);
$this->assertArrayHasKey('handler.'.DummyMessage::class, $handlerMapping);
$this->assertArrayHasKey('handler.'.SecondMessage::class, $handlerMapping);
$dummyHandlerReference = (string) $handlerMapping['handler.'.DummyMessage::class]->getValues()[0];
$dummyHandlerDefinition = $container->getDefinition($dummyHandlerReference);
$this->assertSame('callable', $dummyHandlerDefinition->getClass());
$this->assertEquals(array(new Reference(HandlerMappingMethods::class), 'dummyMethod'), $dummyHandlerDefinition->getArgument(0));
$this->assertSame(array('Closure', 'fromCallable'), $dummyHandlerDefinition->getFactory());
$secondHandlerReference = (string) $handlerMapping['handler.'.SecondMessage::class]->getValues()[0];
$secondHandlerDefinition = $container->getDefinition($secondHandlerReference);
$this->assertSame(ChainHandler::class, $secondHandlerDefinition->getClass());
$this->assertEquals(new Reference(PrioritizedHandler::class), $secondHandlerDefinition->getArgument(0)[1]);
}
/**
* @expectedException \Symfony\Component\DependencyInjection\Exception\RuntimeException
* @expectedExceptionMessage Invalid handler service "Symfony\Component\Messenger\Tests\DependencyInjection\HandlerMappingWithNonExistentMethod": method "Symfony\Component\Messenger\Tests\DependencyInjection\HandlerMappingWithNonExistentMethod::dummyMethod()" does not exist.
*/
public function testThrowsExceptionIfTheHandlerMethodDoesNotExist()
{
$container = $this->getContainerBuilder();
$container
->register(HandlerMappingWithNonExistentMethod::class, HandlerMappingWithNonExistentMethod::class)
->addTag('messenger.message_handler')
;
(new MessengerPass())->process($container);
}
public function testItRegistersReceivers()
{
$container = $this->getContainerBuilder();
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', array('name' => 'amqp'));
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', array('alias' => 'amqp'));
(new MessengerPass())->process($container);
@ -127,7 +176,7 @@ class MessengerPassTest extends TestCase
null,
));
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', array('name' => 'amqp'));
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', array('alias' => 'amqp'));
(new MessengerPass())->process($container);
@ -144,8 +193,8 @@ class MessengerPassTest extends TestCase
null,
));
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', array('name' => 'amqp'));
$container->register(DummyReceiver::class, DummyReceiver::class)->addTag('messenger.receiver', array('name' => 'dummy'));
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', array('alias' => 'amqp'));
$container->register(DummyReceiver::class, DummyReceiver::class)->addTag('messenger.receiver', array('alias' => 'dummy'));
(new MessengerPass())->process($container);
@ -155,7 +204,7 @@ class MessengerPassTest extends TestCase
public function testItRegistersSenders()
{
$container = $this->getContainerBuilder();
$container->register(AmqpSender::class, AmqpSender::class)->addTag('messenger.sender', array('name' => 'amqp'));
$container->register(AmqpSender::class, AmqpSender::class)->addTag('messenger.sender', array('alias' => 'amqp'));
(new MessengerPass())->process($container);
@ -172,6 +221,19 @@ class MessengerPassTest extends TestCase
$this->assertEquals(array(AmqpSender::class => new Reference(AmqpSender::class)), $container->getDefinition('messenger.sender_locator')->getArgument(0));
}
/**
* @expectedException \Symfony\Component\DependencyInjection\Exception\RuntimeException
* @expectedExceptionMessage Invalid sender "app.messenger.sender": class "Symfony\Component\Messenger\Tests\DependencyInjection\InvalidSender" must implement interface "Symfony\Component\Messenger\Transport\SenderInterface".
*/
public function testItDoesNotRegisterInvalidSender()
{
$container = $this->getContainerBuilder();
$container->register('app.messenger.sender', InvalidSender::class)
->addTag('messenger.sender');
(new MessengerPass())->process($container);
}
/**
* @expectedException \Symfony\Component\DependencyInjection\Exception\RuntimeException
* @expectedExceptionMessage Invalid handler service "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessageHandler": message class "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessage" used as argument type in method "Symfony\Component\Messenger\Tests\DependencyInjection\UndefinedMessageHandler::__invoke()" does not exist.
@ -298,14 +360,42 @@ class MessengerPassTest extends TestCase
$container = $this->getContainerBuilder();
$container->register($fooBusId = 'messenger.bus.foo', MessageBusInterface::class)->setArgument(0, array())->addTag('messenger.bus');
$container->register('messenger.middleware.allow_no_handler', AllowNoHandlerMiddleware::class)->setAbstract(true);
$container->register('middleware_with_factory', UselessMiddleware::class)->addArgument('some_default')->setAbstract(true);
$container->register('middleware_with_factory_using_default', UselessMiddleware::class)->addArgument('some_default')->setAbstract(true);
$container->register(UselessMiddleware::class, UselessMiddleware::class);
$container->setParameter($middlewareParameter = $fooBusId.'.middleware', array(UselessMiddleware::class, 'allow_no_handler'));
$container->setParameter($middlewareParameter = $fooBusId.'.middleware', array(
array('id' => UselessMiddleware::class),
array('id' => 'middleware_with_factory', 'arguments' => array('foo', 'bar')),
array('id' => 'middleware_with_factory_using_default'),
array('id' => 'allow_no_handler'),
));
(new MessengerPass())->process($container);
(new ResolveChildDefinitionsPass())->process($container);
$this->assertTrue($container->hasDefinition($childMiddlewareId = $fooBusId.'.middleware.allow_no_handler'));
$this->assertEquals(array(new Reference(UselessMiddleware::class), new Reference($childMiddlewareId)), $container->getDefinition($fooBusId)->getArgument(0));
$this->assertTrue($container->hasDefinition($factoryChildMiddlewareId = $fooBusId.'.middleware.middleware_with_factory'));
$this->assertEquals(
array('foo', 'bar'),
$container->getDefinition($factoryChildMiddlewareId)->getArguments(),
'parent default argument is overridden, and next ones appended'
);
$this->assertTrue($container->hasDefinition($factoryWithDefaultChildMiddlewareId = $fooBusId.'.middleware.middleware_with_factory_using_default'));
$this->assertEquals(
array('some_default'),
$container->getDefinition($factoryWithDefaultChildMiddlewareId)->getArguments(),
'parent default argument is used'
);
$this->assertEquals(array(
new Reference(UselessMiddleware::class),
new Reference($factoryChildMiddlewareId),
new Reference($factoryWithDefaultChildMiddlewareId),
new Reference($childMiddlewareId),
), $container->getDefinition($fooBusId)->getArgument(0));
$this->assertFalse($container->hasParameter($middlewareParameter));
}
@ -317,7 +407,25 @@ class MessengerPassTest extends TestCase
{
$container = $this->getContainerBuilder();
$container->register($fooBusId = 'messenger.bus.foo', MessageBusInterface::class)->setArgument(0, array())->addTag('messenger.bus');
$container->setParameter($middlewareParameter = $fooBusId.'.middleware', array('not_defined_middleware'));
$container->setParameter($middlewareParameter = $fooBusId.'.middleware', array(
array('id' => 'not_defined_middleware', 'arguments' => array()),
));
(new MessengerPass())->process($container);
}
/**
* @expectedException \Symfony\Component\DependencyInjection\Exception\RuntimeException
* @expectedExceptionMessage Invalid middleware factory "not_an_abstract_definition": a middleware factory must be an abstract definition.
*/
public function testMiddlewareFactoryDefinitionMustBeAbstract()
{
$container = $this->getContainerBuilder();
$container->register('not_an_abstract_definition', UselessMiddleware::class);
$container->register($fooBusId = 'messenger.bus.foo', MessageBusInterface::class)->setArgument(0, array())->addTag('messenger.bus', array('name' => 'foo'));
$container->setParameter($middlewareParameter = $fooBusId.'.middleware', array(
array('id' => 'not_an_abstract_definition', 'arguments' => array('foo')),
));
(new MessengerPass())->process($container);
}
@ -357,7 +465,7 @@ class DummyReceiver implements ReceiverInterface
public function receive(callable $handler): void
{
for ($i = 0; $i < 3; ++$i) {
$handler(new DummyMessage("Dummy $i"));
$handler(Envelope::wrap(new DummyMessage("Dummy $i")));
}
}
@ -366,6 +474,14 @@ class DummyReceiver implements ReceiverInterface
}
}
class InvalidReceiver
{
}
class InvalidSender
{
}
class UndefinedMessageHandler
{
public function __invoke(UndefinedMessage $message)
@ -375,7 +491,7 @@ class UndefinedMessageHandler
class UndefinedMessageHandlerViaInterface implements MessageSubscriberInterface
{
public static function getHandledMessages(): array
public static function getHandledMessages(): iterable
{
return array(UndefinedMessage::class);
}
@ -412,31 +528,72 @@ class BuiltinArgumentTypeHandler
class HandlerWithMultipleMessages implements MessageSubscriberInterface
{
public static function getHandledMessages(): array
public static function getHandledMessages(): iterable
{
return array(
DummyMessage::class,
SecondMessage::class,
);
}
public function __invoke()
{
}
}
class PrioritizedHandler implements MessageSubscriberInterface
{
public static function getHandledMessages(): array
public static function getHandledMessages(): iterable
{
return array(
array(SecondMessage::class, 10),
);
}
public function __invoke()
{
}
}
class HandlerMappingMethods implements MessageSubscriberInterface
{
public static function getHandledMessages(): iterable
{
return array(
DummyMessage::class => 'dummyMethod',
SecondMessage::class => array('secondMessage', 20),
);
}
public function dummyMethod()
{
}
public function secondMessage()
{
}
}
class HandlerMappingWithNonExistentMethod implements MessageSubscriberInterface
{
public static function getHandledMessages(): iterable
{
return array(
DummyMessage::class => 'dummyMethod',
);
}
}
class HandleNoMessageHandler implements MessageSubscriberInterface
{
public static function getHandledMessages(): array
public static function getHandledMessages(): iterable
{
return array();
}
public function __invoke()
{
}
}
class UselessMiddleware implements MiddlewareInterface

View File

@ -0,0 +1,82 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class EnvelopeTest extends TestCase
{
public function testConstruct()
{
$envelope = new Envelope($dummy = new DummyMessage('dummy'), array(
$receivedConfig = new ReceivedMessage(),
));
$this->assertSame($dummy, $envelope->getMessage());
$this->assertArrayHasKey(ReceivedMessage::class, $configs = $envelope->all());
$this->assertSame($receivedConfig, $configs[ReceivedMessage::class]);
}
public function testWrap()
{
$first = Envelope::wrap($dummy = new DummyMessage('dummy'));
$this->assertInstanceOf(Envelope::class, $first);
$this->assertSame($dummy, $first->getMessage());
$envelope = Envelope::wrap($first);
$this->assertSame($first, $envelope);
}
public function testWithReturnsNewInstance()
{
$envelope = Envelope::wrap($dummy = new DummyMessage('dummy'));
$this->assertNotSame($envelope, $envelope->with(new ReceivedMessage()));
}
public function testGet()
{
$envelope = Envelope::wrap($dummy = new DummyMessage('dummy'))
->with($config = new ReceivedMessage())
;
$this->assertSame($config, $envelope->get(ReceivedMessage::class));
$this->assertNull($envelope->get(ValidationConfiguration::class));
}
public function testAll()
{
$envelope = Envelope::wrap($dummy = new DummyMessage('dummy'))
->with($receivedConfig = new ReceivedMessage())
->with($validationConfig = new ValidationConfiguration(array('foo')))
;
$configs = $envelope->all();
$this->assertArrayHasKey(ReceivedMessage::class, $configs);
$this->assertSame($receivedConfig, $configs[ReceivedMessage::class]);
$this->assertArrayHasKey(ValidationConfiguration::class, $configs);
$this->assertSame($validationConfig, $configs[ValidationConfiguration::class]);
}
}
class FooConfigurationConsumer implements EnvelopeAwareInterface
{
}

View File

@ -0,0 +1,33 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Fixtures;
use Symfony\Component\Messenger\EnvelopeItemInterface;
class AnEnvelopeItem implements EnvelopeItemInterface
{
/**
* {@inheritdoc}
*/
public function serialize()
{
return '';
}
/**
* {@inheritdoc}
*/
public function unserialize($serialized)
{
// noop
}
}

View File

@ -2,7 +2,7 @@
namespace Symfony\Component\Messenger\Tests\Fixtures;
class DummyMessage
class DummyMessage implements DummyMessageInterface
{
private $message;

View File

@ -0,0 +1,7 @@
<?php
namespace Symfony\Component\Messenger\Tests\Fixtures;
interface DummyMessageInterface
{
}

View File

@ -12,9 +12,13 @@
namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Tests\Fixtures\AnEnvelopeItem;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
class MessageBusTest extends TestCase
@ -61,4 +65,98 @@ class MessageBusTest extends TestCase
$this->assertEquals($responseFromDepthMiddleware, $bus->dispatch($message));
}
public function testItKeepsTheEnvelopeEvenThroughAMiddlewareThatIsNotEnvelopeAware()
{
$message = new DummyMessage('Hello');
$envelope = new Envelope($message, array(new ReceivedMessage()));
$firstMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
$firstMiddleware->expects($this->once())
->method('handle')
->with($message, $this->anything())
->will($this->returnCallback(function ($message, $next) {
return $next($message);
}));
$secondMiddleware = $this->getMockBuilder(array(MiddlewareInterface::class, EnvelopeAwareInterface::class))->getMock();
$secondMiddleware->expects($this->once())
->method('handle')
->with($envelope, $this->anything())
;
$bus = new MessageBus(array(
$firstMiddleware,
$secondMiddleware,
));
$bus->dispatch($envelope);
}
public function testThatAMiddlewareCanAddSomeItemsToTheEnvelope()
{
$message = new DummyMessage('Hello');
$envelope = new Envelope($message, array(new ReceivedMessage()));
$envelopeWithAnotherItem = $envelope->with(new AnEnvelopeItem());
$firstMiddleware = $this->getMockBuilder(array(MiddlewareInterface::class, EnvelopeAwareInterface::class))->getMock();
$firstMiddleware->expects($this->once())
->method('handle')
->with($envelope, $this->anything())
->will($this->returnCallback(function ($message, $next) {
return $next($message->with(new AnEnvelopeItem()));
}));
$secondMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
$secondMiddleware->expects($this->once())
->method('handle')
->with($message, $this->anything())
->will($this->returnCallback(function ($message, $next) {
return $next($message);
}));
$thirdMiddleware = $this->getMockBuilder(array(MiddlewareInterface::class, EnvelopeAwareInterface::class))->getMock();
$thirdMiddleware->expects($this->once())
->method('handle')
->with($envelopeWithAnotherItem, $this->anything())
;
$bus = new MessageBus(array(
$firstMiddleware,
$secondMiddleware,
$thirdMiddleware,
));
$bus->dispatch($envelope);
}
public function testThatAMiddlewareCanUpdateTheMessageWhileKeepingTheEnvelopeItems()
{
$message = new DummyMessage('Hello');
$envelope = new Envelope($message, $items = array(new ReceivedMessage()));
$changedMessage = new DummyMessage('Changed');
$expectedEnvelope = new Envelope($changedMessage, $items);
$firstMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
$firstMiddleware->expects($this->once())
->method('handle')
->with($message, $this->anything())
->will($this->returnCallback(function ($message, $next) use ($changedMessage) {
return $next($changedMessage);
}));
$secondMiddleware = $this->getMockBuilder(array(MiddlewareInterface::class, EnvelopeAwareInterface::class))->getMock();
$secondMiddleware->expects($this->once())
->method('handle')
->with($expectedEnvelope, $this->anything())
;
$bus = new MessageBus(array(
$firstMiddleware,
$secondMiddleware,
));
$bus->dispatch($envelope);
}
}

View File

@ -0,0 +1,38 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Middleware\Configuration;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Validator\Constraints\GroupSequence;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class ValidationConfigurationTest extends TestCase
{
public function testConfig()
{
$config = new ValidationConfiguration($groups = array('Default', 'Extra'));
$this->assertSame($groups, $config->getGroups());
$config = new ValidationConfiguration($groups = new GroupSequence(array('Default', 'Then')));
$this->assertSame($groups, $config->getGroups());
}
public function testSerialiazable()
{
$this->assertTrue(is_subclass_of(ValidationConfiguration::class, \Serializable::class, true));
$this->assertEquals($config = new ValidationConfiguration(array('Default', 'Extra')), unserialize(serialize($config)));
$this->assertEquals($config = new ValidationConfiguration(new GroupSequence(array('Default', 'Then'))), unserialize(serialize($config)));
}
}

View File

@ -12,6 +12,8 @@
namespace Symfony\Component\Messenger\Tests\Middleware;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Messenger\Middleware\ValidationMiddleware;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Validator\ConstraintViolationListInterface;
@ -43,6 +45,30 @@ class ValidationMiddlewareTest extends TestCase
$this->assertSame('Hello', $result);
}
public function testValidateWithConfigurationAndNextMiddleware()
{
$envelope = Envelope::wrap($message = new DummyMessage('Hey'))->with(new ValidationConfiguration($groups = array('Default', 'Extra')));
$validator = $this->createMock(ValidatorInterface::class);
$validator
->expects($this->once())
->method('validate')
->with($message, null, $groups)
->willReturn($this->createMock(ConstraintViolationListInterface::class))
;
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$next
->expects($this->once())
->method('__invoke')
->with($envelope)
->willReturn('Hello')
;
$result = (new ValidationMiddleware($validator))->handle($envelope, $next);
$this->assertSame('Hello', $result);
}
/**
* @expectedException \Symfony\Component\Messenger\Exception\ValidationFailedException
* @expectedExceptionMessage Message of type "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage" failed validation.

View File

@ -12,7 +12,9 @@
namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Tests\Fixtures\AnEnvelopeItem;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\TraceableMessageBus;
@ -27,7 +29,29 @@ class TraceableMessageBusTest extends TestCase
$traceableBus = new TraceableMessageBus($bus);
$this->assertSame($result, $traceableBus->dispatch($message));
$this->assertSame(array(array('message' => $message, 'result' => $result)), $traceableBus->getDispatchedMessages());
$this->assertCount(1, $tracedMessages = $traceableBus->getDispatchedMessages());
$this->assertArraySubset(array(
'message' => $message,
'result' => $result,
'envelopeItems' => null,
), $tracedMessages[0], true);
}
public function testItTracesResultWithEnvelope()
{
$envelope = Envelope::wrap($message = new DummyMessage('Hello'))->with($envelopeItem = new AnEnvelopeItem());
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->once())->method('dispatch')->with($envelope)->willReturn($result = array('foo' => 'bar'));
$traceableBus = new TraceableMessageBus($bus);
$this->assertSame($result, $traceableBus->dispatch($envelope));
$this->assertCount(1, $tracedMessages = $traceableBus->getDispatchedMessages());
$this->assertArraySubset(array(
'message' => $message,
'result' => $result,
'envelopeItems' => array($envelopeItem),
), $tracedMessages[0], true);
}
public function testItTracesExceptions()
@ -45,6 +69,11 @@ class TraceableMessageBusTest extends TestCase
$this->assertSame($exception, $e);
}
$this->assertSame(array(array('message' => $message, 'exception' => $exception)), $traceableBus->getDispatchedMessages());
$this->assertCount(1, $tracedMessages = $traceableBus->getDispatchedMessages());
$this->assertArraySubset(array(
'message' => $message,
'exception' => $exception,
'envelopeItems' => null,
), $tracedMessages[0], true);
}
}

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
@ -50,12 +51,12 @@ class AmqpExtIntegrationTest extends TestCase
$sender = new AmqpSender($serializer, $connection);
$receiver = new AmqpReceiver($serializer, $connection);
$sender->send($firstMessage = new DummyMessage('First'));
$sender->send($secondMessage = new DummyMessage('Second'));
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
$receivedMessages = 0;
$receiver->receive(function ($message) use ($receiver, &$receivedMessages, $firstMessage, $secondMessage) {
$this->assertEquals(0 == $receivedMessages ? $firstMessage : $secondMessage, $message);
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
$this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope);
if (2 === ++$receivedMessages) {
$receiver->stop();
@ -74,7 +75,7 @@ class AmqpExtIntegrationTest extends TestCase
$connection->queue()->purge();
$sender = new AmqpSender($serializer, $connection);
$sender->send(new DummyMessage('Hello'));
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
$amqpReadTimeout = 30;
$dsn = getenv('MESSENGER_AMQP_DSN').'?read_timeout='.$amqpReadTimeout;
@ -98,7 +99,15 @@ class AmqpExtIntegrationTest extends TestCase
$this->assertFalse($process->isRunning());
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
$this->assertEquals($expectedOutput."Get message: Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage\nDone.\n", $process->getOutput());
$this->assertSame($expectedOutput.<<<'TXT'
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
with items: [
"Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage"
]
Done.
TXT
, $process->getOutput());
}
/**
@ -114,12 +123,11 @@ class AmqpExtIntegrationTest extends TestCase
$connection->setup();
$connection->queue()->purge();
$sender = new AmqpSender($serializer, $connection);
$receiver = new AmqpReceiver($serializer, $connection);
$receivedMessages = 0;
$receiver->receive(function ($message) use ($receiver, &$receivedMessages) {
$this->assertNull($message);
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
$this->assertNull($envelope);
if (2 === ++$receivedMessages) {
$receiver->stop();

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
@ -44,8 +45,8 @@ class AmqpReceiverTest extends TestCase
$connection->expects($this->once())->method('ack')->with($envelope);
$receiver = new AmqpReceiver($serializer, $connection);
$receiver->receive(function ($message) use ($receiver) {
$this->assertEquals(new DummyMessage('Hi'), $message);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
$receiver->stop();
});
}

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
@ -24,16 +25,16 @@ class AmqpSenderTest extends TestCase
{
public function testItSendsTheEncodedMessage()
{
$message = new DummyMessage('Oy');
$envelope = Envelope::wrap(new DummyMessage('Oy'));
$encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class));
$encoder = $this->getMockBuilder(EncoderInterface::class)->getMock();
$encoder->method('encode')->with($message)->willReturnOnConsecutiveCalls($encoded);
$encoder->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']);
$sender = new AmqpSender($encoder, $connection);
$sender->send($message);
$sender->send($envelope);
}
}

View File

@ -0,0 +1,48 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransportFactory;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
class AmqpTransportFactoryTest extends TestCase
{
public function testSupportsOnlyAmqpTransports()
{
$factory = new AmqpTransportFactory(
$this->getMockBuilder(EncoderInterface::class)->getMock(),
$this->getMockBuilder(DecoderInterface::class)->getMock(),
true
);
$this->assertTrue($factory->supports('amqp://localhost', array()));
$this->assertFalse($factory->supports('sqs://localhost', array()));
$this->assertFalse($factory->supports('invalid-dsn', array()));
}
public function testItCreatesTheTransport()
{
$factory = new AmqpTransportFactory(
$encoder = $this->getMockBuilder(EncoderInterface::class)->getMock(),
$decoder = $this->getMockBuilder(DecoderInterface::class)->getMock(),
true
);
$expectedTransport = new AmqpTransport($encoder, $decoder, Connection::fromDsn('amqp://localhost', array('foo' => 'bar'), true), array('foo' => 'bar'), true);
$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', array('foo' => 'bar')));
}
}

View File

@ -0,0 +1,67 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @requires extension amqp
*/
class AmqpTransportTest extends TestCase
{
public function testItIsATransport()
{
$transport = $this->getTransport();
$this->assertInstanceOf(TransportInterface::class, $transport);
}
public function testReceivesMessages()
{
$transport = $this->getTransport(
null,
$decoder = $this->getMockBuilder(DecoderInterface::class)->getMock(),
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock()
);
$decodedMessage = new DummyMessage('Decoded.');
$amqpEnvelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$amqpEnvelope->method('getBody')->willReturn('body');
$amqpEnvelope->method('getHeaders')->willReturn(array('my' => 'header'));
$decoder->method('decode')->with(array('body' => 'body', 'headers' => array('my' => 'header')))->willReturn(Envelope::wrap($decodedMessage));
$connection->method('get')->willReturn($amqpEnvelope);
$transport->receive(function (Envelope $envelope) use ($transport, $decodedMessage) {
$this->assertSame($decodedMessage, $envelope->getMessage());
$transport->stop();
});
}
private function getTransport(EncoderInterface $encoder = null, DecoderInterface $decoder = null, Connection $connection = null)
{
$encoder = $encoder ?: $this->getMockBuilder(EncoderInterface::class)->getMock();
$decoder = $decoder ?: $this->getMockBuilder(DecoderInterface::class)->getMock();
$connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
return new AmqpTransport($encoder, $decoder, $connection);
}
}

View File

@ -12,10 +12,9 @@ if (!file_exists($autoload)) {
require_once $autoload;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Serializer as SerializerComponent;
@ -27,13 +26,14 @@ $serializer = new Serializer(
);
$connection = Connection::fromDsn(getenv('DSN'));
$sender = new AmqpSender($serializer, $connection);
$receiver = new AmqpReceiver($serializer, $connection);
$worker = new Worker($receiver, new class() implements MessageBusInterface {
public function dispatch($message)
public function dispatch($envelope)
{
echo 'Get message: '.get_class($message)."\n";
echo 'Get envelope with message: '.get_class($envelope->getMessage())."\n";
echo sprintf("with items: %s\n", json_encode(array_keys($envelope->all()), JSON_PRETTY_PRINT));
sleep(30);
echo "Done.\n";
}

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
@ -25,7 +26,7 @@ class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase
public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
{
$callable = function ($handler) {
$handler(new DummyMessage('API'));
$handler(Envelope::wrap(new DummyMessage('API')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
@ -58,7 +59,7 @@ class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase
public function testReceiverLogsMemoryExceededWhenLoggerIsGiven()
{
$callable = function ($handler) {
$handler(new DummyMessage('API'));
$handler(Envelope::wrap(new DummyMessage('API')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
@ -25,9 +26,9 @@ class StopWhenMessageCountIsExceededReceiverTest extends TestCase
public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop)
{
$callable = function ($handler) {
$handler(new DummyMessage('First message'));
$handler(new DummyMessage('Second message'));
$handler(new DummyMessage('Third message'));
$handler(Envelope::wrap(new DummyMessage('First message')));
$handler(Envelope::wrap(new DummyMessage('Second message')));
$handler(Envelope::wrap(new DummyMessage('Third message')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
@ -78,7 +79,7 @@ class StopWhenMessageCountIsExceededReceiverTest extends TestCase
public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven()
{
$callable = function ($handler) {
$handler(new DummyMessage('First message'));
$handler(Envelope::wrap(new DummyMessage('First message')));
};
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)

View File

@ -12,8 +12,11 @@
namespace Symfony\Component\Messenger\Tests\Transport\Serialization;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
@ -26,9 +29,23 @@ class SerializerTest extends TestCase
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$message = new DummyMessage('Hello');
$envelope = Envelope::wrap(new DummyMessage('Hello'));
$this->assertEquals($message, $serializer->decode($serializer->encode($message)));
$this->assertEquals($envelope, $serializer->decode($serializer->encode($envelope)));
}
public function testEncodedWithConfigurationIsDecodable()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$envelope = Envelope::wrap(new DummyMessage('Hello'))
->with(new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('foo'))))
->with(new ValidationConfiguration(array('foo', 'bar')))
;
$this->assertEquals($envelope, $serializer->decode($serializer->encode($envelope)));
}
public function testEncodedIsHavingTheBodyAndTypeHeader()
@ -37,11 +54,12 @@ class SerializerTest extends TestCase
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$encoded = $serializer->encode(new DummyMessage('Hello'));
$encoded = $serializer->encode(Envelope::wrap(new DummyMessage('Hello')));
$this->assertArrayHasKey('body', $encoded);
$this->assertArrayHasKey('headers', $encoded);
$this->assertArrayHasKey('type', $encoded['headers']);
$this->assertArrayNotHasKey('X-Message-Envelope-Items', $encoded['headers']);
$this->assertEquals(DummyMessage::class, $encoded['headers']['type']);
}
@ -55,10 +73,31 @@ class SerializerTest extends TestCase
$encoder = new Serializer($serializer, 'csv', array('foo' => 'bar'));
$encoded = $encoder->encode($message);
$encoded = $encoder->encode(Envelope::wrap($message));
$decoded = $encoder->decode($encoded);
$this->assertSame('Yay', $encoded['body']);
$this->assertSame($message, $decoded);
$this->assertSame($message, $decoded->getMessage());
}
public function testEncodedWithSerializationConfiguration()
{
$serializer = new Serializer(
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
);
$envelope = Envelope::wrap(new DummyMessage('Hello'))
->with(new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('foo'))))
->with(new ValidationConfiguration(array('foo', 'bar')))
;
$encoded = $serializer->encode($envelope);
$this->assertArrayHasKey('body', $encoded);
$this->assertArrayHasKey('headers', $encoded);
$this->assertArrayHasKey('type', $encoded['headers']);
$this->assertEquals(DummyMessage::class, $encoded['headers']['type']);
$this->assertArrayHasKey('X-Message-Envelope-Items', $encoded['headers']);
$this->assertSame('a:2:{s:75:"Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration";C:75:"Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration":59:{a:1:{s:7:"context";a:1:{s:6:"groups";a:1:{i:0;s:3:"foo";}}}}s:76:"Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration";C:76:"Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration":82:{a:2:{s:6:"groups";a:2:{i:0;s:3:"foo";i:1;s:3:"bar";}s:17:"is_group_sequence";b:0;}}}', $encoded['headers']['X-Message-Envelope-Items']);
}
}

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
@ -22,29 +23,33 @@ class WorkerTest extends TestCase
{
public function testWorkerDispatchTheReceivedMessage()
{
$receiver = new CallbackReceiver(function ($handler) {
$handler(new DummyMessage('API'));
$handler(new DummyMessage('IPA'));
$apiMessage = new DummyMessage('API');
$ipaMessage = new DummyMessage('IPA');
$receiver = new CallbackReceiver(function ($handler) use ($apiMessage, $ipaMessage) {
$handler(Envelope::wrap($apiMessage));
$handler(Envelope::wrap($ipaMessage));
});
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API')));
$bus->expects($this->at(1))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('IPA')));
$bus->expects($this->at(0))->method('dispatch')->with(Envelope::wrap($apiMessage)->with(new ReceivedMessage()));
$bus->expects($this->at(1))->method('dispatch')->with(Envelope::wrap($ipaMessage)->with(new ReceivedMessage()));
$worker = new Worker($receiver, $bus);
$worker->run();
}
public function testWorkerDoesNotWrapMessagesAlreadyWrappedInReceivedMessages()
public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
{
$receiver = new CallbackReceiver(function ($handler) {
$handler(new ReceivedMessage(new DummyMessage('API')));
$envelop = Envelope::wrap(new DummyMessage('API'))->with(new ReceivedMessage());
$receiver = new CallbackReceiver(function ($handler) use ($envelop) {
$handler($envelop);
});
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API')));
$bus->expects($this->at(0))->method('dispatch')->with($envelop);
$worker = new Worker($receiver, $bus);
$worker->run();
@ -54,7 +59,7 @@ class WorkerTest extends TestCase
{
$receiver = new CallbackReceiver(function ($handler) {
try {
$handler(new DummyMessage('Hello'));
$handler(Envelope::wrap(new DummyMessage('Hello')));
$this->assertTrue(false, 'This should not be called because the exception is sent back to the generator.');
} catch (\InvalidArgumentException $e) {

View File

@ -29,19 +29,27 @@ class TraceableMessageBus implements MessageBusInterface
*/
public function dispatch($message)
{
$callTime = microtime(true);
$messageToTrace = $message instanceof Envelope ? $message->getMessage() : $message;
$envelopeItems = $message instanceof Envelope ? array_values($message->all()) : null;
try {
$result = $this->decoratedBus->dispatch($message);
$this->dispatchedMessages[] = array(
'message' => $message,
'envelopeItems' => $envelopeItems,
'message' => $messageToTrace,
'result' => $result,
'callTime' => $callTime,
);
return $result;
} catch (\Throwable $e) {
$this->dispatchedMessages[] = array(
'message' => $message,
'envelopeItems' => $envelopeItems,
'message' => $messageToTrace,
'exception' => $e,
'callTime' => $callTime,
);
throw $e;

View File

@ -22,13 +22,13 @@ use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
*/
class AmqpReceiver implements ReceiverInterface
{
private $messageDecoder;
private $decoder;
private $connection;
private $shouldStop;
public function __construct(DecoderInterface $messageDecoder, Connection $connection)
public function __construct(DecoderInterface $decoder, Connection $connection)
{
$this->messageDecoder = $messageDecoder;
$this->decoder = $decoder;
$this->connection = $connection;
}
@ -38,8 +38,8 @@ class AmqpReceiver implements ReceiverInterface
public function receive(callable $handler): void
{
while (!$this->shouldStop) {
$message = $this->connection->get();
if (null === $message) {
$AMQPEnvelope = $this->connection->get();
if (null === $AMQPEnvelope) {
$handler(null);
usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000);
@ -51,18 +51,18 @@ class AmqpReceiver implements ReceiverInterface
}
try {
$handler($this->messageDecoder->decode(array(
'body' => $message->getBody(),
'headers' => $message->getHeaders(),
$handler($this->decoder->decode(array(
'body' => $AMQPEnvelope->getBody(),
'headers' => $AMQPEnvelope->getHeaders(),
)));
$this->connection->ack($message);
$this->connection->ack($AMQPEnvelope);
} catch (RejectMessageExceptionInterface $e) {
$this->connection->reject($message);
$this->connection->reject($AMQPEnvelope);
throw $e;
} catch (\Throwable $e) {
$this->connection->nack($message, AMQP_REQUEUE);
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
throw $e;
} finally {

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
@ -21,21 +22,21 @@ use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
*/
class AmqpSender implements SenderInterface
{
private $messageEncoder;
private $encoder;
private $connection;
public function __construct(EncoderInterface $messageEncoder, Connection $connection)
public function __construct(EncoderInterface $encoder, Connection $connection)
{
$this->messageEncoder = $messageEncoder;
$this->encoder = $encoder;
$this->connection = $connection;
}
/**
* {@inheritdoc}
*/
public function send($message): void
public function send(Envelope $envelope)
{
$encodedMessage = $this->messageEncoder->encode($message);
$encodedMessage = $this->encoder->encode($envelope);
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
}

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
@ -22,20 +23,15 @@ class AmqpTransport implements TransportInterface
{
private $encoder;
private $decoder;
private $dsn;
private $options;
private $debug;
private $connection;
private $receiver;
private $sender;
public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, string $dsn, array $options, bool $debug)
public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, Connection $connection)
{
$this->encoder = $encoder;
$this->decoder = $decoder;
$this->dsn = $dsn;
$this->options = $options;
$this->debug = $debug;
$this->connection = $connection;
}
/**
@ -57,23 +53,18 @@ class AmqpTransport implements TransportInterface
/**
* {@inheritdoc}
*/
public function send($message): void
public function send(Envelope $envelope): void
{
($this->sender ?? $this->getSender())->send($message);
($this->sender ?? $this->getSender())->send($envelope);
}
private function getReceiver()
{
return $this->receiver = new AmqpReceiver($this->decoder, $this->connection ?? $this->getConnection());
return $this->receiver = new AmqpReceiver($this->decoder, $this->connection);
}
private function getSender()
{
return $this->sender = new AmqpSender($this->encoder, $this->connection ?? $this->getConnection());
}
private function getConnection()
{
return $this->connection = Connection::fromDsn($this->dsn, $this->options, $this->debug);
return $this->sender = new AmqpSender($this->encoder, $this->connection);
}
}

View File

@ -34,7 +34,7 @@ class AmqpTransportFactory implements TransportFactoryInterface
public function createTransport(string $dsn, array $options): TransportInterface
{
return new AmqpTransport($this->encoder, $this->decoder, $dsn, $options, $this->debug);
return new AmqpTransport($this->encoder, $this->decoder, Connection::fromDsn($dsn, $options, $this->debug));
}
public function supports(string $dsn, array $options): bool

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\Enhancers;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
/**
@ -36,8 +37,8 @@ class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface
public function receive(callable $handler): void
{
$this->decoratedReceiver->receive(function ($message) use ($handler) {
$handler($message);
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) {
$handler($envelope);
$memoryResolver = $this->memoryResolver;
if ($memoryResolver() > $this->memoryLimit) {

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\Enhancers;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
/**
@ -34,10 +35,10 @@ class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface
{
$receivedMessages = 0;
$this->decoratedReceiver->receive(function ($message) use ($handler, &$receivedMessages) {
$handler($message);
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, &$receivedMessages) {
$handler($envelope);
if (null !== $message && ++$receivedMessages >= $this->maximumNumberOfMessages) {
if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Receiver stopped due to maximum count of {count} exceeded', array('count' => $this->maximumNumberOfMessages));

View File

@ -21,10 +21,10 @@ interface ReceiverInterface
/**
* Receive some messages to the given handler.
*
* The handler will have, as argument, the received message. Note that this message
* can be `null` if the timeout to receive something has expired.
* The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message.
* Note that this envelope can be `null` if the timeout to receive something has expired.
*/
public function receive(callable $handler) : void;
public function receive(callable $handler): void;
/**
* Stop receiving some messages.

View File

@ -11,6 +11,8 @@
namespace Symfony\Component\Messenger\Transport;
use Symfony\Component\Messenger\Envelope;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*
@ -19,9 +21,9 @@ namespace Symfony\Component\Messenger\Transport;
interface SenderInterface
{
/**
* Sends the given message.
* Sends the given envelope.
*
* @param object $message
* @param Envelope $envelope
*/
public function send($message): void;
public function send(Envelope $envelope);
}

View File

@ -11,6 +11,8 @@
namespace Symfony\Component\Messenger\Transport\Serialization;
use Symfony\Component\Messenger\Envelope;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*
@ -19,16 +21,16 @@ namespace Symfony\Component\Messenger\Transport\Serialization;
interface DecoderInterface
{
/**
* Decodes the message from an encoded-form.
* Decodes an envelope and its message from an encoded-form.
*
* The `$encodedMessage` parameter is a key-value array that
* describes the message, that will be used by the different transports.
* The `$encodedEnvelope` parameter is a key-value array that
* describes the envelope and its content, that will be used by the different transports.
*
* The most common keys are:
* - `body` (string) - the message body
* - `headers` (string<string>) - a key/value pair of headers
*
* @return object
* @return Envelope
*/
public function decode(array $encodedMessage);
public function decode(array $encodedEnvelope): Envelope;
}

View File

@ -10,6 +10,7 @@
*/
namespace Symfony\Component\Messenger\Transport\Serialization;
use Symfony\Component\Messenger\Envelope;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
@ -19,14 +20,14 @@ namespace Symfony\Component\Messenger\Transport\Serialization;
interface EncoderInterface
{
/**
* Encodes a message to a common format understandable by transports. The encoded array should only
* contain scalar and arrays.
* Encodes an envelope content (message & items) to a common format understandable by transports.
* The encoded array should only contain scalar and arrays.
*
* The most common keys of the encoded array are:
* - `body` (string) - the message body
* - `headers` (string<string>) - a key/value pair of headers
*
* @param object $message The object that is put on the MessageBus by the user
* @param Envelope $envelope The envelop containing the message put on the MessageBus by the user
*/
public function encode($message): array;
public function encode(Envelope $envelope): array;
}

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Transport\Serialization;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Serializer\SerializerInterface;
/**
@ -32,27 +33,48 @@ class Serializer implements DecoderInterface, EncoderInterface
/**
* {@inheritdoc}
*/
public function decode(array $encodedMessage)
public function decode(array $encodedEnvelope): Envelope
{
if (empty($encodedMessage['body']) || empty($encodedMessage['headers'])) {
throw new \InvalidArgumentException('Encoded message should have at least a `body` and some `headers`.');
if (empty($encodedEnvelope['body']) || empty($encodedEnvelope['headers'])) {
throw new \InvalidArgumentException('Encoded envelope should have at least a `body` and some `headers`.');
}
if (empty($encodedMessage['headers']['type'])) {
throw new \InvalidArgumentException('Encoded message does not have a `type` header.');
if (empty($encodedEnvelope['headers']['type'])) {
throw new \InvalidArgumentException('Encoded envelope does not have a `type` header.');
}
return $this->serializer->deserialize($encodedMessage['body'], $encodedMessage['headers']['type'], $this->format, $this->context);
$envelopeItems = isset($encodedEnvelope['headers']['X-Message-Envelope-Items']) ? unserialize($encodedEnvelope['headers']['X-Message-Envelope-Items']) : array();
$context = $this->context;
/** @var SerializerConfiguration|null $serializerConfig */
if ($serializerConfig = $envelopeItems[SerializerConfiguration::class] ?? null) {
$context = $serializerConfig->getContext() + $context;
}
$message = $this->serializer->deserialize($encodedEnvelope['body'], $encodedEnvelope['headers']['type'], $this->format, $context);
return new Envelope($message, $envelopeItems);
}
/**
* {@inheritdoc}
*/
public function encode($message): array
public function encode(Envelope $envelope): array
{
$context = $this->context;
/** @var SerializerConfiguration|null $serializerConfig */
if ($serializerConfig = $envelope->get(SerializerConfiguration::class)) {
$context = $serializerConfig->getContext() + $context;
}
$headers = array('type' => \get_class($envelope->getMessage()));
if ($configurations = $envelope->all()) {
$headers['X-Message-Envelope-Items'] = serialize($configurations);
}
return array(
'body' => $this->serializer->serialize($message, $this->format, $this->context),
'headers' => array('type' => \get_class($message)),
'body' => $this->serializer->serialize($envelope->getMessage(), $this->format, $context),
'headers' => $headers,
);
}
}

View File

@ -0,0 +1,46 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Serialization;
use Symfony\Component\Messenger\EnvelopeItemInterface;
/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
final class SerializerConfiguration implements EnvelopeItemInterface
{
private $context;
public function __construct(array $context)
{
$this->context = $context;
}
public function getContext(): array
{
return $this->context;
}
public function serialize()
{
return serialize(array('context' => $this->context));
}
public function unserialize($serialized)
{
list('context' => $context) = unserialize($serialized, array('allowed_classes' => false));
$this->__construct($context);
}
}

View File

@ -41,16 +41,12 @@ class Worker
});
}
$this->receiver->receive(function ($message) {
if (null === $message) {
$this->receiver->receive(function (?Envelope $envelope) {
if (null === $envelope) {
return;
}
if (!$message instanceof ReceivedMessage) {
$message = new ReceivedMessage($message);
}
$this->bus->dispatch($message);
$this->bus->dispatch($envelope->with(new ReceivedMessage()));
});
}
}

View File

@ -20,6 +20,7 @@
},
"require-dev": {
"psr/log": "~1.0",
"symfony/console": "~3.4|~4.0",
"symfony/dependency-injection": "~3.4.6|~4.0",
"symfony/http-kernel": "~3.4|~4.0",
"symfony/process": "~3.4|~4.0",

View File

@ -30,7 +30,6 @@ final class Definition
/**
* @param string[] $places
* @param Transition[] $transitions
* @param string|null $initialPlace
*/
public function __construct(array $places, array $transitions, string $initialPlace = null, MetadataStoreInterface $metadataStore = null)
{

View File

@ -11,6 +11,8 @@
namespace Symfony\Component\Workflow;
use Symfony\Component\Workflow\Metadata\MetadataStoreInterface;
/**
* Builds a definition.
*
@ -23,6 +25,7 @@ class DefinitionBuilder
private $places = array();
private $transitions = array();
private $initialPlace;
private $metadataStore;
/**
* @param string[] $places
@ -39,7 +42,7 @@ class DefinitionBuilder
*/
public function build()
{
return new Definition($this->places, $this->transitions, $this->initialPlace);
return new Definition($this->places, $this->transitions, $this->initialPlace, $this->metadataStore);
}
/**
@ -52,6 +55,7 @@ class DefinitionBuilder
$this->places = array();
$this->transitions = array();
$this->initialPlace = null;
$this->metadataStore = null;
return $this;
}
@ -122,6 +126,16 @@ class DefinitionBuilder
return $this;
}
/**
* @return $this
*/
public function setMetadataStore(MetadataStoreInterface $metadataStore)
{
$this->metadataStore = $metadataStore;
return $this;
}
/**
* @deprecated since Symfony 4.1, use the clear() method instead.
*

View File

@ -4,6 +4,7 @@ namespace Symfony\Component\Workflow\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Workflow\DefinitionBuilder;
use Symfony\Component\Workflow\Metadata\InMemoryMetadataStore;
use Symfony\Component\Workflow\Transition;
class DefinitionBuilderTest extends TestCase
@ -44,4 +45,14 @@ class DefinitionBuilderTest extends TestCase
$this->assertEquals('a', $definition->getPlaces()['a']);
$this->assertEquals('b', $definition->getPlaces()['b']);
}
public function testSetMetadataStore()
{
$builder = new DefinitionBuilder(array('a'));
$metadataStore = new InMemoryMetadataStore();
$builder->setMetadataStore($metadataStore);
$definition = $builder->build();
$this->assertSame($metadataStore, $definition->getMetadataStore());
}
}