[Messenger] Allow to configure the transport
This commit is contained in:
parent
d2f8df8054
commit
1a3f0bbb14
@ -1008,6 +1008,21 @@ class Configuration implements ConfigurationInterface
|
|||||||
->end()
|
->end()
|
||||||
->end()
|
->end()
|
||||||
->end()
|
->end()
|
||||||
|
->arrayNode('serializer')
|
||||||
|
->canBeDisabled()
|
||||||
|
->addDefaultsIfNotSet()
|
||||||
|
->children()
|
||||||
|
->scalarNode('format')->defaultValue('json')->end()
|
||||||
|
->arrayNode('context')
|
||||||
|
->normalizeKeys(false)
|
||||||
|
->useAttributeAsKey('name')
|
||||||
|
->defaultValue(array())
|
||||||
|
->prototype('variable')->end()
|
||||||
|
->end()
|
||||||
|
->end()
|
||||||
|
->end()
|
||||||
|
->scalarNode('encoder')->defaultValue('messenger.transport.serializer')->end()
|
||||||
|
->scalarNode('decoder')->defaultValue('messenger.transport.serializer')->end()
|
||||||
->arrayNode('middlewares')
|
->arrayNode('middlewares')
|
||||||
->addDefaultsIfNotSet()
|
->addDefaultsIfNotSet()
|
||||||
->children()
|
->children()
|
||||||
|
@ -273,7 +273,7 @@ class FrameworkExtension extends Extension
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ($this->isConfigEnabled($container, $config['messenger'])) {
|
if ($this->isConfigEnabled($container, $config['messenger'])) {
|
||||||
$this->registerMessengerConfiguration($config['messenger'], $container, $loader);
|
$this->registerMessengerConfiguration($config['messenger'], $container, $loader, $config['serializer']);
|
||||||
} else {
|
} else {
|
||||||
$container->removeDefinition('console.command.messenger_consume_messages');
|
$container->removeDefinition('console.command.messenger_consume_messages');
|
||||||
}
|
}
|
||||||
@ -1438,7 +1438,7 @@ class FrameworkExtension extends Extension
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader)
|
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader, array $serializerConfig)
|
||||||
{
|
{
|
||||||
if (!interface_exists(MessageBusInterface::class)) {
|
if (!interface_exists(MessageBusInterface::class)) {
|
||||||
throw new LogicException('Messenger support cannot be enabled as the Messenger component is not installed.');
|
throw new LogicException('Messenger support cannot be enabled as the Messenger component is not installed.');
|
||||||
@ -1446,6 +1446,21 @@ class FrameworkExtension extends Extension
|
|||||||
|
|
||||||
$loader->load('messenger.xml');
|
$loader->load('messenger.xml');
|
||||||
|
|
||||||
|
if ($this->isConfigEnabled($container, $config['serializer'])) {
|
||||||
|
if (count($config['adapters']) > 0 && !$this->isConfigEnabled($container, $serializerConfig)) {
|
||||||
|
throw new LogicException('Using the default encoder/decoder, Symfony Messenger requires the Serializer. Enable it or install it by running "composer require symfony/serializer-pack".');
|
||||||
|
}
|
||||||
|
|
||||||
|
$container->getDefinition('messenger.transport.serializer')
|
||||||
|
->replaceArgument(1, $config['serializer']['format'])
|
||||||
|
->replaceArgument(2, $config['serializer']['context']);
|
||||||
|
} else {
|
||||||
|
$container->removeDefinition('messenger.transport.serializer');
|
||||||
|
}
|
||||||
|
|
||||||
|
$container->setAlias('messenger.transport.encoder', $config['encoder']);
|
||||||
|
$container->setAlias('messenger.transport.decoder', $config['decoder']);
|
||||||
|
|
||||||
$messageToSenderIdsMapping = array();
|
$messageToSenderIdsMapping = array();
|
||||||
foreach ($config['routing'] as $message => $messageConfiguration) {
|
foreach ($config['routing'] as $message => $messageConfiguration) {
|
||||||
$messageToSenderIdsMapping[$message] = $messageConfiguration['senders'];
|
$messageToSenderIdsMapping[$message] = $messageConfiguration['senders'];
|
||||||
|
@ -43,13 +43,12 @@
|
|||||||
</service>
|
</service>
|
||||||
|
|
||||||
<!-- Message encoding/decoding -->
|
<!-- Message encoding/decoding -->
|
||||||
<service id="messenger.transport.serialize_message_with_type_in_headers" class="Symfony\Component\Messenger\Transport\Serialization\Serializer">
|
<service id="messenger.transport.serializer" class="Symfony\Component\Messenger\Transport\Serialization\Serializer">
|
||||||
<argument type="service" id="serializer" />
|
<argument type="service" id="serializer" />
|
||||||
|
<argument /> <!-- Format -->
|
||||||
|
<argument type="collection" /> <!-- Context -->
|
||||||
</service>
|
</service>
|
||||||
|
|
||||||
<service id="messenger.transport.default_encoder" alias="messenger.transport.serialize_message_with_type_in_headers" public="true" />
|
|
||||||
<service id="messenger.transport.default_decoder" alias="messenger.transport.serialize_message_with_type_in_headers" public="true" />
|
|
||||||
|
|
||||||
<!-- Logging & Debug -->
|
<!-- Logging & Debug -->
|
||||||
<service id="messenger.middleware.debug.logging" class="Symfony\Component\Messenger\Debug\LoggingMiddleware">
|
<service id="messenger.middleware.debug.logging" class="Symfony\Component\Messenger\Debug\LoggingMiddleware">
|
||||||
<argument type="service" id="logger" />
|
<argument type="service" id="logger" />
|
||||||
@ -79,8 +78,8 @@
|
|||||||
</service>
|
</service>
|
||||||
|
|
||||||
<service id="messenger.adapter.amqp.factory" class="Symfony\Component\Messenger\Adapter\AmqpExt\AmqpAdapterFactory">
|
<service id="messenger.adapter.amqp.factory" class="Symfony\Component\Messenger\Adapter\AmqpExt\AmqpAdapterFactory">
|
||||||
<argument type="service" id="messenger.transport.default_encoder" />
|
<argument type="service" id="messenger.transport.encoder" />
|
||||||
<argument type="service" id="messenger.transport.default_decoder" />
|
<argument type="service" id="messenger.transport.decoder" />
|
||||||
<argument>%kernel.debug%</argument>
|
<argument>%kernel.debug%</argument>
|
||||||
|
|
||||||
<tag name="messenger.adapter_factory" />
|
<tag name="messenger.adapter_factory" />
|
||||||
|
@ -352,12 +352,23 @@
|
|||||||
|
|
||||||
<xsd:complexType name="messenger">
|
<xsd:complexType name="messenger">
|
||||||
<xsd:sequence>
|
<xsd:sequence>
|
||||||
|
<xsd:element name="serializer" type="messenger_serializer" minOccurs="0" />
|
||||||
|
<xsd:element name="encoder" type="xsd:string" minOccurs="0" />
|
||||||
|
<xsd:element name="decoder" type="xsd:string" minOccurs="0" />
|
||||||
<xsd:element name="routing" type="messenger_routing" minOccurs="0" maxOccurs="unbounded" />
|
<xsd:element name="routing" type="messenger_routing" minOccurs="0" maxOccurs="unbounded" />
|
||||||
<xsd:element name="middlewares" type="messenger_middleware" minOccurs="0" maxOccurs="unbounded" />
|
<xsd:element name="middlewares" type="messenger_middleware" minOccurs="0" maxOccurs="unbounded" />
|
||||||
<xsd:element name="adapter" type="messenger_adapter" minOccurs="0" maxOccurs="unbounded" />
|
<xsd:element name="adapter" type="messenger_adapter" minOccurs="0" maxOccurs="unbounded" />
|
||||||
</xsd:sequence>
|
</xsd:sequence>
|
||||||
</xsd:complexType>
|
</xsd:complexType>
|
||||||
|
|
||||||
|
<xsd:complexType name="messenger_serializer">
|
||||||
|
<xsd:sequence>
|
||||||
|
<xsd:element name="context" type="metadata" minOccurs="0" maxOccurs="unbounded" />
|
||||||
|
</xsd:sequence>
|
||||||
|
<xsd:attribute name="format" type="xsd:string" />
|
||||||
|
<xsd:attribute name="enabled" type="xsd:boolean" />
|
||||||
|
</xsd:complexType>
|
||||||
|
|
||||||
<xsd:complexType name="messenger_routing">
|
<xsd:complexType name="messenger_routing">
|
||||||
<xsd:choice minOccurs="0" maxOccurs="unbounded">
|
<xsd:choice minOccurs="0" maxOccurs="unbounded">
|
||||||
<xsd:element name="sender" type="messenger_routing_sender" />
|
<xsd:element name="sender" type="messenger_routing_sender" />
|
||||||
|
@ -18,6 +18,7 @@ use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
|
|||||||
use Symfony\Component\Config\Definition\Processor;
|
use Symfony\Component\Config\Definition\Processor;
|
||||||
use Symfony\Component\Lock\Store\SemaphoreStore;
|
use Symfony\Component\Lock\Store\SemaphoreStore;
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
|
use Symfony\Component\Serializer\Serializer;
|
||||||
|
|
||||||
class ConfigurationTest extends TestCase
|
class ConfigurationTest extends TestCase
|
||||||
{
|
{
|
||||||
@ -259,6 +260,13 @@ class ConfigurationTest extends TestCase
|
|||||||
),
|
),
|
||||||
),
|
),
|
||||||
'adapters' => array(),
|
'adapters' => array(),
|
||||||
|
'serializer' => array(
|
||||||
|
'enabled' => true,
|
||||||
|
'format' => 'json',
|
||||||
|
'context' => array(),
|
||||||
|
),
|
||||||
|
'encoder' => 'messenger.transport.serializer',
|
||||||
|
'decoder' => 'messenger.transport.serializer',
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
<?php
|
<?php
|
||||||
|
|
||||||
$container->loadFromExtension('framework', array(
|
$container->loadFromExtension('framework', array(
|
||||||
|
'serializer' => true,
|
||||||
'messenger' => array(
|
'messenger' => array(
|
||||||
'adapters' => array(
|
'adapters' => array(
|
||||||
'default' => 'amqp://localhost/%2f/messages',
|
'default' => 'amqp://localhost/%2f/messages',
|
||||||
|
@ -0,0 +1,10 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
$container->loadFromExtension('framework', array(
|
||||||
|
'messenger' => array(
|
||||||
|
'serializer' => array(
|
||||||
|
'format' => 'csv',
|
||||||
|
'context' => array('enable_max_depth' => true),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
));
|
@ -0,0 +1,15 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
$container->loadFromExtension('framework', array(
|
||||||
|
'serializer' => array(
|
||||||
|
'enabled' => false,
|
||||||
|
),
|
||||||
|
'messenger' => array(
|
||||||
|
'serializer' => array(
|
||||||
|
'enabled' => true,
|
||||||
|
),
|
||||||
|
'adapters' => array(
|
||||||
|
'default' => 'amqp://localhost/%2f/messages',
|
||||||
|
),
|
||||||
|
),
|
||||||
|
));
|
@ -6,6 +6,7 @@
|
|||||||
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
||||||
|
|
||||||
<framework:config>
|
<framework:config>
|
||||||
|
<framework:serializer enabled="true" />
|
||||||
<framework:messenger>
|
<framework:messenger>
|
||||||
<framework:adapter name="default" dsn="amqp://localhost/%2f/messages" />
|
<framework:adapter name="default" dsn="amqp://localhost/%2f/messages" />
|
||||||
<framework:adapter name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name">
|
<framework:adapter name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name">
|
||||||
|
@ -0,0 +1,17 @@
|
|||||||
|
<?xml version="1.0" encoding="utf-8" ?>
|
||||||
|
<container xmlns="http://symfony.com/schema/dic/services"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
||||||
|
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
|
||||||
|
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
||||||
|
|
||||||
|
<framework:config>
|
||||||
|
<framework:messenger>
|
||||||
|
<framework:serializer format="csv">
|
||||||
|
<framework:context>
|
||||||
|
<framework:enable_max_depth>true</framework:enable_max_depth>
|
||||||
|
</framework:context>
|
||||||
|
</framework:serializer>
|
||||||
|
</framework:messenger>
|
||||||
|
</framework:config>
|
||||||
|
</container>
|
@ -0,0 +1,15 @@
|
|||||||
|
<?xml version="1.0" encoding="utf-8" ?>
|
||||||
|
<container xmlns="http://symfony.com/schema/dic/services"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
||||||
|
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
|
||||||
|
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
||||||
|
|
||||||
|
<framework:config>
|
||||||
|
<framework:serializer enabled="false" />
|
||||||
|
<framework:messenger>
|
||||||
|
<framework:serializer enabled="true" />
|
||||||
|
<framework:adapter name="default" dsn="amqp://localhost/%2f/messages" />
|
||||||
|
</framework:messenger>
|
||||||
|
</framework:config>
|
||||||
|
</container>
|
@ -1,4 +1,5 @@
|
|||||||
framework:
|
framework:
|
||||||
|
serializer: true
|
||||||
messenger:
|
messenger:
|
||||||
adapters:
|
adapters:
|
||||||
default: 'amqp://localhost/%2f/messages'
|
default: 'amqp://localhost/%2f/messages'
|
||||||
|
@ -0,0 +1,6 @@
|
|||||||
|
framework:
|
||||||
|
messenger:
|
||||||
|
serializer:
|
||||||
|
format: csv
|
||||||
|
context:
|
||||||
|
enable_max_depth: true
|
@ -0,0 +1,8 @@
|
|||||||
|
framework:
|
||||||
|
serializer:
|
||||||
|
enabled: false
|
||||||
|
messenger:
|
||||||
|
serializer:
|
||||||
|
enabled: true
|
||||||
|
adapters:
|
||||||
|
default: 'amqp://localhost/%2f/messages'
|
@ -569,6 +569,27 @@ abstract class FrameworkExtensionTest extends TestCase
|
|||||||
$this->assertSame(array('queue_name' => 'Queue'), $receiverArguments[1]);
|
$this->assertSame(array('queue_name' => 'Queue'), $receiverArguments[1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @expectedException \Symfony\Component\DependencyInjection\Exception\LogicException
|
||||||
|
* @expectedExceptionMessage Using the default encoder/decoder, Symfony Messenger requires the Serializer. Enable it or install it by running "composer require symfony/serializer-pack".
|
||||||
|
*/
|
||||||
|
public function testMessengerTransportConfigurationWithoutSerializer()
|
||||||
|
{
|
||||||
|
$this->createContainerFromFile('messenger_transport_no_serializer');
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testMessengerTransportConfiguration()
|
||||||
|
{
|
||||||
|
$container = $this->createContainerFromFile('messenger_transport');
|
||||||
|
|
||||||
|
$this->assertSame('messenger.transport.serializer', (string) $container->getAlias('messenger.transport.encoder'));
|
||||||
|
$this->assertSame('messenger.transport.serializer', (string) $container->getAlias('messenger.transport.decoder'));
|
||||||
|
|
||||||
|
$serializerTransportDefinition = $container->getDefinition('messenger.transport.serializer');
|
||||||
|
$this->assertSame('csv', $serializerTransportDefinition->getArgument(1));
|
||||||
|
$this->assertSame(array('enable_max_depth' => true), $serializerTransportDefinition->getArgument(2));
|
||||||
|
}
|
||||||
|
|
||||||
public function testTranslator()
|
public function testTranslator()
|
||||||
{
|
{
|
||||||
$container = $this->createContainerFromFile('full');
|
$container = $this->createContainerFromFile('full');
|
||||||
|
@ -53,12 +53,6 @@ class MessengerPass implements CompilerPassInterface
|
|||||||
$container->removeDefinition('messenger.middleware.debug.logging');
|
$container->removeDefinition('messenger.middleware.debug.logging');
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!$container->has('serializer')) {
|
|
||||||
$container->removeDefinition('messenger.transport.serialize_message_with_type_in_headers');
|
|
||||||
$container->removeAlias('messenger.transport.default_encoder');
|
|
||||||
$container->removeAlias('messenger.transport.default_decoder');
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->registerReceivers($container);
|
$this->registerReceivers($container);
|
||||||
$this->registerSenders($container);
|
$this->registerSenders($container);
|
||||||
$this->registerHandlers($container);
|
$this->registerHandlers($container);
|
||||||
|
@ -44,4 +44,21 @@ class SerializerTest extends TestCase
|
|||||||
$this->assertArrayHasKey('type', $encoded['headers']);
|
$this->assertArrayHasKey('type', $encoded['headers']);
|
||||||
$this->assertEquals(DummyMessage::class, $encoded['headers']['type']);
|
$this->assertEquals(DummyMessage::class, $encoded['headers']['type']);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testUsesTheCustomFormatAndContext()
|
||||||
|
{
|
||||||
|
$message = new DummyMessage('Foo');
|
||||||
|
|
||||||
|
$serializer = $this->getMockBuilder(SerializerComponent\SerializerInterface::class)->getMock();
|
||||||
|
$serializer->expects($this->once())->method('serialize')->with($message, 'csv', array('foo' => 'bar'))->willReturn('Yay');
|
||||||
|
$serializer->expects($this->once())->method('deserialize')->with('Yay', DummyMessage::class, 'csv', array('foo' => 'bar'))->willReturn($message);
|
||||||
|
|
||||||
|
$encoder = new Serializer($serializer, 'csv', array('foo' => 'bar'));
|
||||||
|
|
||||||
|
$encoded = $encoder->encode($message);
|
||||||
|
$decoded = $encoder->decode($encoded);
|
||||||
|
|
||||||
|
$this->assertSame('Yay', $encoded['body']);
|
||||||
|
$this->assertSame($message, $decoded);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,11 +20,13 @@ class Serializer implements DecoderInterface, EncoderInterface
|
|||||||
{
|
{
|
||||||
private $serializer;
|
private $serializer;
|
||||||
private $format;
|
private $format;
|
||||||
|
private $context;
|
||||||
|
|
||||||
public function __construct(SerializerInterface $serializer, string $format = 'json')
|
public function __construct(SerializerInterface $serializer, string $format = 'json', array $context = array())
|
||||||
{
|
{
|
||||||
$this->serializer = $serializer;
|
$this->serializer = $serializer;
|
||||||
$this->format = $format;
|
$this->format = $format;
|
||||||
|
$this->context = $context;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -40,7 +42,7 @@ class Serializer implements DecoderInterface, EncoderInterface
|
|||||||
throw new \InvalidArgumentException('Encoded message does not have a `type` header.');
|
throw new \InvalidArgumentException('Encoded message does not have a `type` header.');
|
||||||
}
|
}
|
||||||
|
|
||||||
return $this->serializer->deserialize($encodedMessage['body'], $encodedMessage['headers']['type'], $this->format);
|
return $this->serializer->deserialize($encodedMessage['body'], $encodedMessage['headers']['type'], $this->format, $this->context);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -49,7 +51,7 @@ class Serializer implements DecoderInterface, EncoderInterface
|
|||||||
public function encode($message): array
|
public function encode($message): array
|
||||||
{
|
{
|
||||||
return array(
|
return array(
|
||||||
'body' => $this->serializer->serialize($message, $this->format),
|
'body' => $this->serializer->serialize($message, $this->format, $this->context),
|
||||||
'headers' => array('type' => \get_class($message)),
|
'headers' => array('type' => \get_class($message)),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user