feature #26941 [Messenger] Allow to configure the transport (sroze)
This PR was squashed before being merged into the 4.1-dev branch (closes #26941).
Discussion
----------
[Messenger] Allow to configure the transport
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | ish
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | #26900, #26908, #26935
| License | MIT
| Doc PR | ø
We allow users to configure the encoder/decoder used by the built-in adapter(s). This also adds the support of configuring the default's encoder/decoder format and context.
Commits
-------
1a3f0bbb14
[Messenger] Allow to configure the transport
This commit is contained in:
commit
fe199317da
@ -1008,6 +1008,21 @@ class Configuration implements ConfigurationInterface
|
||||
->end()
|
||||
->end()
|
||||
->end()
|
||||
->arrayNode('serializer')
|
||||
->canBeDisabled()
|
||||
->addDefaultsIfNotSet()
|
||||
->children()
|
||||
->scalarNode('format')->defaultValue('json')->end()
|
||||
->arrayNode('context')
|
||||
->normalizeKeys(false)
|
||||
->useAttributeAsKey('name')
|
||||
->defaultValue(array())
|
||||
->prototype('variable')->end()
|
||||
->end()
|
||||
->end()
|
||||
->end()
|
||||
->scalarNode('encoder')->defaultValue('messenger.transport.serializer')->end()
|
||||
->scalarNode('decoder')->defaultValue('messenger.transport.serializer')->end()
|
||||
->arrayNode('middlewares')
|
||||
->addDefaultsIfNotSet()
|
||||
->children()
|
||||
|
@ -273,7 +273,7 @@ class FrameworkExtension extends Extension
|
||||
}
|
||||
|
||||
if ($this->isConfigEnabled($container, $config['messenger'])) {
|
||||
$this->registerMessengerConfiguration($config['messenger'], $container, $loader);
|
||||
$this->registerMessengerConfiguration($config['messenger'], $container, $loader, $config['serializer']);
|
||||
} else {
|
||||
$container->removeDefinition('console.command.messenger_consume_messages');
|
||||
}
|
||||
@ -1438,7 +1438,7 @@ class FrameworkExtension extends Extension
|
||||
}
|
||||
}
|
||||
|
||||
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader)
|
||||
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader, array $serializerConfig)
|
||||
{
|
||||
if (!interface_exists(MessageBusInterface::class)) {
|
||||
throw new LogicException('Messenger support cannot be enabled as the Messenger component is not installed.');
|
||||
@ -1446,6 +1446,21 @@ class FrameworkExtension extends Extension
|
||||
|
||||
$loader->load('messenger.xml');
|
||||
|
||||
if ($this->isConfigEnabled($container, $config['serializer'])) {
|
||||
if (count($config['adapters']) > 0 && !$this->isConfigEnabled($container, $serializerConfig)) {
|
||||
throw new LogicException('Using the default encoder/decoder, Symfony Messenger requires the Serializer. Enable it or install it by running "composer require symfony/serializer-pack".');
|
||||
}
|
||||
|
||||
$container->getDefinition('messenger.transport.serializer')
|
||||
->replaceArgument(1, $config['serializer']['format'])
|
||||
->replaceArgument(2, $config['serializer']['context']);
|
||||
} else {
|
||||
$container->removeDefinition('messenger.transport.serializer');
|
||||
}
|
||||
|
||||
$container->setAlias('messenger.transport.encoder', $config['encoder']);
|
||||
$container->setAlias('messenger.transport.decoder', $config['decoder']);
|
||||
|
||||
$messageToSenderIdsMapping = array();
|
||||
foreach ($config['routing'] as $message => $messageConfiguration) {
|
||||
$messageToSenderIdsMapping[$message] = $messageConfiguration['senders'];
|
||||
|
@ -43,13 +43,12 @@
|
||||
</service>
|
||||
|
||||
<!-- Message encoding/decoding -->
|
||||
<service id="messenger.transport.serialize_message_with_type_in_headers" class="Symfony\Component\Messenger\Transport\Serialization\Serializer">
|
||||
<service id="messenger.transport.serializer" class="Symfony\Component\Messenger\Transport\Serialization\Serializer">
|
||||
<argument type="service" id="serializer" />
|
||||
<argument /> <!-- Format -->
|
||||
<argument type="collection" /> <!-- Context -->
|
||||
</service>
|
||||
|
||||
<service id="messenger.transport.default_encoder" alias="messenger.transport.serialize_message_with_type_in_headers" public="true" />
|
||||
<service id="messenger.transport.default_decoder" alias="messenger.transport.serialize_message_with_type_in_headers" public="true" />
|
||||
|
||||
<!-- Logging & Debug -->
|
||||
<service id="messenger.middleware.debug.logging" class="Symfony\Component\Messenger\Debug\LoggingMiddleware">
|
||||
<argument type="service" id="logger" />
|
||||
@ -79,8 +78,8 @@
|
||||
</service>
|
||||
|
||||
<service id="messenger.adapter.amqp.factory" class="Symfony\Component\Messenger\Adapter\AmqpExt\AmqpAdapterFactory">
|
||||
<argument type="service" id="messenger.transport.default_encoder" />
|
||||
<argument type="service" id="messenger.transport.default_decoder" />
|
||||
<argument type="service" id="messenger.transport.encoder" />
|
||||
<argument type="service" id="messenger.transport.decoder" />
|
||||
<argument>%kernel.debug%</argument>
|
||||
|
||||
<tag name="messenger.adapter_factory" />
|
||||
|
@ -352,12 +352,23 @@
|
||||
|
||||
<xsd:complexType name="messenger">
|
||||
<xsd:sequence>
|
||||
<xsd:element name="serializer" type="messenger_serializer" minOccurs="0" />
|
||||
<xsd:element name="encoder" type="xsd:string" minOccurs="0" />
|
||||
<xsd:element name="decoder" type="xsd:string" minOccurs="0" />
|
||||
<xsd:element name="routing" type="messenger_routing" minOccurs="0" maxOccurs="unbounded" />
|
||||
<xsd:element name="middlewares" type="messenger_middleware" minOccurs="0" maxOccurs="unbounded" />
|
||||
<xsd:element name="adapter" type="messenger_adapter" minOccurs="0" maxOccurs="unbounded" />
|
||||
</xsd:sequence>
|
||||
</xsd:complexType>
|
||||
|
||||
<xsd:complexType name="messenger_serializer">
|
||||
<xsd:sequence>
|
||||
<xsd:element name="context" type="metadata" minOccurs="0" maxOccurs="unbounded" />
|
||||
</xsd:sequence>
|
||||
<xsd:attribute name="format" type="xsd:string" />
|
||||
<xsd:attribute name="enabled" type="xsd:boolean" />
|
||||
</xsd:complexType>
|
||||
|
||||
<xsd:complexType name="messenger_routing">
|
||||
<xsd:choice minOccurs="0" maxOccurs="unbounded">
|
||||
<xsd:element name="sender" type="messenger_routing_sender" />
|
||||
|
@ -18,6 +18,7 @@ use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
|
||||
use Symfony\Component\Config\Definition\Processor;
|
||||
use Symfony\Component\Lock\Store\SemaphoreStore;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Serializer\Serializer;
|
||||
|
||||
class ConfigurationTest extends TestCase
|
||||
{
|
||||
@ -259,6 +260,13 @@ class ConfigurationTest extends TestCase
|
||||
),
|
||||
),
|
||||
'adapters' => array(),
|
||||
'serializer' => array(
|
||||
'enabled' => true,
|
||||
'format' => 'json',
|
||||
'context' => array(),
|
||||
),
|
||||
'encoder' => 'messenger.transport.serializer',
|
||||
'decoder' => 'messenger.transport.serializer',
|
||||
),
|
||||
);
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
<?php
|
||||
|
||||
$container->loadFromExtension('framework', array(
|
||||
'serializer' => true,
|
||||
'messenger' => array(
|
||||
'adapters' => array(
|
||||
'default' => 'amqp://localhost/%2f/messages',
|
||||
|
@ -0,0 +1,10 @@
|
||||
<?php
|
||||
|
||||
$container->loadFromExtension('framework', array(
|
||||
'messenger' => array(
|
||||
'serializer' => array(
|
||||
'format' => 'csv',
|
||||
'context' => array('enable_max_depth' => true),
|
||||
),
|
||||
),
|
||||
));
|
@ -0,0 +1,15 @@
|
||||
<?php
|
||||
|
||||
$container->loadFromExtension('framework', array(
|
||||
'serializer' => array(
|
||||
'enabled' => false,
|
||||
),
|
||||
'messenger' => array(
|
||||
'serializer' => array(
|
||||
'enabled' => true,
|
||||
),
|
||||
'adapters' => array(
|
||||
'default' => 'amqp://localhost/%2f/messages',
|
||||
),
|
||||
),
|
||||
));
|
@ -6,6 +6,7 @@
|
||||
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
||||
|
||||
<framework:config>
|
||||
<framework:serializer enabled="true" />
|
||||
<framework:messenger>
|
||||
<framework:adapter name="default" dsn="amqp://localhost/%2f/messages" />
|
||||
<framework:adapter name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name">
|
||||
|
@ -0,0 +1,17 @@
|
||||
<?xml version="1.0" encoding="utf-8" ?>
|
||||
<container xmlns="http://symfony.com/schema/dic/services"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
||||
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
|
||||
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
||||
|
||||
<framework:config>
|
||||
<framework:messenger>
|
||||
<framework:serializer format="csv">
|
||||
<framework:context>
|
||||
<framework:enable_max_depth>true</framework:enable_max_depth>
|
||||
</framework:context>
|
||||
</framework:serializer>
|
||||
</framework:messenger>
|
||||
</framework:config>
|
||||
</container>
|
@ -0,0 +1,15 @@
|
||||
<?xml version="1.0" encoding="utf-8" ?>
|
||||
<container xmlns="http://symfony.com/schema/dic/services"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
||||
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
|
||||
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
||||
|
||||
<framework:config>
|
||||
<framework:serializer enabled="false" />
|
||||
<framework:messenger>
|
||||
<framework:serializer enabled="true" />
|
||||
<framework:adapter name="default" dsn="amqp://localhost/%2f/messages" />
|
||||
</framework:messenger>
|
||||
</framework:config>
|
||||
</container>
|
@ -1,4 +1,5 @@
|
||||
framework:
|
||||
serializer: true
|
||||
messenger:
|
||||
adapters:
|
||||
default: 'amqp://localhost/%2f/messages'
|
||||
|
@ -0,0 +1,6 @@
|
||||
framework:
|
||||
messenger:
|
||||
serializer:
|
||||
format: csv
|
||||
context:
|
||||
enable_max_depth: true
|
@ -0,0 +1,8 @@
|
||||
framework:
|
||||
serializer:
|
||||
enabled: false
|
||||
messenger:
|
||||
serializer:
|
||||
enabled: true
|
||||
adapters:
|
||||
default: 'amqp://localhost/%2f/messages'
|
@ -569,6 +569,27 @@ abstract class FrameworkExtensionTest extends TestCase
|
||||
$this->assertSame(array('queue' => array('name' => 'Queue')), $receiverArguments[1]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Symfony\Component\DependencyInjection\Exception\LogicException
|
||||
* @expectedExceptionMessage Using the default encoder/decoder, Symfony Messenger requires the Serializer. Enable it or install it by running "composer require symfony/serializer-pack".
|
||||
*/
|
||||
public function testMessengerTransportConfigurationWithoutSerializer()
|
||||
{
|
||||
$this->createContainerFromFile('messenger_transport_no_serializer');
|
||||
}
|
||||
|
||||
public function testMessengerTransportConfiguration()
|
||||
{
|
||||
$container = $this->createContainerFromFile('messenger_transport');
|
||||
|
||||
$this->assertSame('messenger.transport.serializer', (string) $container->getAlias('messenger.transport.encoder'));
|
||||
$this->assertSame('messenger.transport.serializer', (string) $container->getAlias('messenger.transport.decoder'));
|
||||
|
||||
$serializerTransportDefinition = $container->getDefinition('messenger.transport.serializer');
|
||||
$this->assertSame('csv', $serializerTransportDefinition->getArgument(1));
|
||||
$this->assertSame(array('enable_max_depth' => true), $serializerTransportDefinition->getArgument(2));
|
||||
}
|
||||
|
||||
public function testTranslator()
|
||||
{
|
||||
$container = $this->createContainerFromFile('full');
|
||||
|
@ -53,12 +53,6 @@ class MessengerPass implements CompilerPassInterface
|
||||
$container->removeDefinition('messenger.middleware.debug.logging');
|
||||
}
|
||||
|
||||
if (!$container->has('serializer')) {
|
||||
$container->removeDefinition('messenger.transport.serialize_message_with_type_in_headers');
|
||||
$container->removeAlias('messenger.transport.default_encoder');
|
||||
$container->removeAlias('messenger.transport.default_decoder');
|
||||
}
|
||||
|
||||
$this->registerReceivers($container);
|
||||
$this->registerSenders($container);
|
||||
$this->registerHandlers($container);
|
||||
|
@ -44,4 +44,21 @@ class SerializerTest extends TestCase
|
||||
$this->assertArrayHasKey('type', $encoded['headers']);
|
||||
$this->assertEquals(DummyMessage::class, $encoded['headers']['type']);
|
||||
}
|
||||
|
||||
public function testUsesTheCustomFormatAndContext()
|
||||
{
|
||||
$message = new DummyMessage('Foo');
|
||||
|
||||
$serializer = $this->getMockBuilder(SerializerComponent\SerializerInterface::class)->getMock();
|
||||
$serializer->expects($this->once())->method('serialize')->with($message, 'csv', array('foo' => 'bar'))->willReturn('Yay');
|
||||
$serializer->expects($this->once())->method('deserialize')->with('Yay', DummyMessage::class, 'csv', array('foo' => 'bar'))->willReturn($message);
|
||||
|
||||
$encoder = new Serializer($serializer, 'csv', array('foo' => 'bar'));
|
||||
|
||||
$encoded = $encoder->encode($message);
|
||||
$decoded = $encoder->decode($encoded);
|
||||
|
||||
$this->assertSame('Yay', $encoded['body']);
|
||||
$this->assertSame($message, $decoded);
|
||||
}
|
||||
}
|
||||
|
@ -20,11 +20,13 @@ class Serializer implements DecoderInterface, EncoderInterface
|
||||
{
|
||||
private $serializer;
|
||||
private $format;
|
||||
private $context;
|
||||
|
||||
public function __construct(SerializerInterface $serializer, string $format = 'json')
|
||||
public function __construct(SerializerInterface $serializer, string $format = 'json', array $context = array())
|
||||
{
|
||||
$this->serializer = $serializer;
|
||||
$this->format = $format;
|
||||
$this->context = $context;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -40,7 +42,7 @@ class Serializer implements DecoderInterface, EncoderInterface
|
||||
throw new \InvalidArgumentException('Encoded message does not have a `type` header.');
|
||||
}
|
||||
|
||||
return $this->serializer->deserialize($encodedMessage['body'], $encodedMessage['headers']['type'], $this->format);
|
||||
return $this->serializer->deserialize($encodedMessage['body'], $encodedMessage['headers']['type'], $this->format, $this->context);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -49,7 +51,7 @@ class Serializer implements DecoderInterface, EncoderInterface
|
||||
public function encode($message): array
|
||||
{
|
||||
return array(
|
||||
'body' => $this->serializer->serialize($message, $this->format),
|
||||
'body' => $this->serializer->serialize($message, $this->format, $this->context),
|
||||
'headers' => array('type' => \get_class($message)),
|
||||
);
|
||||
}
|
||||
|
Reference in New Issue
Block a user