feature #28405 [Messenger] Uses a messenger serializer, not an individual encoder/decoder (sroze)
This PR was merged into the 4.2-dev branch.
Discussion
----------
[Messenger] Uses a messenger serializer, not an individual encoder/decoder
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | no
| BC breaks? | yes
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | ø
| License | MIT
| Doc PR | Will come
Makes the component-based even simpler.
**Before**
```php
$encoderDecoder = Serializer::create();
$middleware = [new SendMessageMiddleware(new SenderLocator([
Message::class => new AmqpTransport($encoderDecoder, $encoderDecoder, $connection),
]))];
```
**After**
```php
$middleware = [new SendMessageMiddleware(new SenderLocator([
Message::class => new AmqpTransport(Serializer::create(), $connection),
]))];
```
Commits
-------
5b93f5f45e
Uses a messenger serializer, not an individual encoder/decoder
This commit is contained in:
commit
0cd1da66d7
@ -85,6 +85,23 @@ FrameworkBundle
|
||||
and will allow dumping optimized routers and using Unicode classes in requirements.
|
||||
* Added support for the SameSite attribute for session cookies. It is highly recommended to set this setting (`framework.session.cookie_samesite`) to `lax` for increased security against CSRF attacks.
|
||||
* The `Controller` class has been deprecated, use `AbstractController` instead.
|
||||
* The Messenger encoder/decoder configuration has been changed for a unified Messenger serializer configuration.
|
||||
|
||||
Before:
|
||||
```yaml
|
||||
framework:
|
||||
messenger:
|
||||
encoder: your_encoder_service_id
|
||||
decoder: your_decoder_service_id
|
||||
```
|
||||
|
||||
After:
|
||||
```yaml
|
||||
framework:
|
||||
messenger:
|
||||
serializer:
|
||||
id: your_messenger_service_id
|
||||
```
|
||||
|
||||
Messenger
|
||||
---------
|
||||
@ -122,6 +139,9 @@ Messenger
|
||||
'priority' => 20,
|
||||
];
|
||||
```
|
||||
* The `EncoderInterface` and `DecoderInterface` interfaces have been replaced by a unified `Symfony\Component\Messenger\Transport\Serialization\SerializerInterface`.
|
||||
Each interface method have been merged untouched into the `Serializer` interface, so you can simply merge your two implementations together and implement the new interface.
|
||||
|
||||
|
||||
Security
|
||||
--------
|
||||
|
@ -11,7 +11,8 @@ CHANGELOG
|
||||
* Deprecated the `Symfony\Bundle\FrameworkBundle\Controller\Controller` class in favor of `Symfony\Bundle\FrameworkBundle\Controller\AbstractController`.
|
||||
* Enabled autoconfiguration for `Psr\Log\LoggerAwareInterface`
|
||||
* Added new "auto" mode for `framework.session.cookie_secure` to turn it on when HTTPS is used
|
||||
|
||||
* Removed the `framework.messenger.encoder` and `framework.messenger.decoder` options. Use the `framework.messenger.serializer.id` option to replace the Messenger serializer.
|
||||
|
||||
4.1.0
|
||||
-----
|
||||
|
||||
|
@ -1030,9 +1030,23 @@ class Configuration implements ConfigurationInterface
|
||||
->end()
|
||||
->end()
|
||||
->arrayNode('serializer')
|
||||
->{!class_exists(FullStack::class) && class_exists(Serializer::class) ? 'canBeDisabled' : 'canBeEnabled'}()
|
||||
->addDefaultsIfNotSet()
|
||||
->beforeNormalization()
|
||||
->always()
|
||||
->then(function ($config) {
|
||||
if (false === $config) {
|
||||
return array('id' => null);
|
||||
}
|
||||
|
||||
if (\is_string($config)) {
|
||||
return array('id' => $config);
|
||||
}
|
||||
|
||||
return $config;
|
||||
})
|
||||
->end()
|
||||
->children()
|
||||
->scalarNode('id')->defaultValue('messenger.transport.symfony_serializer')->end()
|
||||
->scalarNode('format')->defaultValue('json')->end()
|
||||
->arrayNode('context')
|
||||
->normalizeKeys(false)
|
||||
@ -1042,8 +1056,6 @@ class Configuration implements ConfigurationInterface
|
||||
->end()
|
||||
->end()
|
||||
->end()
|
||||
->scalarNode('encoder')->defaultValue('messenger.transport.serializer')->end()
|
||||
->scalarNode('decoder')->defaultValue('messenger.transport.serializer')->end()
|
||||
->arrayNode('transports')
|
||||
->useAttributeAsKey('name')
|
||||
->arrayPrototype()
|
||||
|
@ -1490,24 +1490,27 @@ class FrameworkExtension extends Extension
|
||||
|
||||
$loader->load('messenger.xml');
|
||||
|
||||
if ($this->isConfigEnabled($container, $config['serializer'])) {
|
||||
if (!$this->isConfigEnabled($container, $serializerConfig)) {
|
||||
throw new LogicException('The default Messenger serializer cannot be enabled as the Serializer support is not available. Try enable it or install it by running "composer require symfony/serializer-pack".');
|
||||
if (empty($config['transports'])) {
|
||||
$container->removeDefinition('messenger.transport.symfony_serializer');
|
||||
$container->removeDefinition('messenger.transport.amqp.factory');
|
||||
} else {
|
||||
if ('messenger.transport.symfony_serializer' === $config['serializer']['id']) {
|
||||
if (!$this->isConfigEnabled($container, $serializerConfig)) {
|
||||
throw new LogicException('The default Messenger serializer cannot be enabled as the Serializer support is not available. Try enable it or install it by running "composer require symfony/serializer-pack".');
|
||||
}
|
||||
|
||||
$container->getDefinition('messenger.transport.symfony_serializer')
|
||||
->replaceArgument(1, $config['serializer']['format'])
|
||||
->replaceArgument(2, $config['serializer']['context']);
|
||||
}
|
||||
|
||||
$container->getDefinition('messenger.transport.serializer')
|
||||
->replaceArgument(1, $config['serializer']['format'])
|
||||
->replaceArgument(2, $config['serializer']['context']);
|
||||
} else {
|
||||
$container->removeDefinition('messenger.transport.serializer');
|
||||
if ('messenger.transport.serializer' === $config['encoder'] || 'messenger.transport.serializer' === $config['decoder']) {
|
||||
if ($config['serializer']['id']) {
|
||||
$container->setAlias('messenger.transport.serializer', $config['serializer']['id']);
|
||||
} else {
|
||||
$container->removeDefinition('messenger.transport.amqp.factory');
|
||||
}
|
||||
}
|
||||
|
||||
$container->setAlias('messenger.transport.encoder', $config['encoder']);
|
||||
$container->setAlias('messenger.transport.decoder', $config['decoder']);
|
||||
|
||||
if (null === $config['default_bus']) {
|
||||
if (\count($config['buses']) > 1) {
|
||||
throw new LogicException(sprintf('You need to define a default bus with the "default_bus" configuration. Possible values: %s', implode(', ', array_keys($config['buses']))));
|
||||
|
@ -18,7 +18,7 @@
|
||||
</service>
|
||||
|
||||
<!-- Message encoding/decoding -->
|
||||
<service id="messenger.transport.serializer" class="Symfony\Component\Messenger\Transport\Serialization\Serializer">
|
||||
<service id="messenger.transport.symfony_serializer" class="Symfony\Component\Messenger\Transport\Serialization\Serializer">
|
||||
<argument type="service" id="serializer" />
|
||||
<argument /> <!-- Format -->
|
||||
<argument type="collection" /> <!-- Context -->
|
||||
@ -61,8 +61,7 @@
|
||||
</service>
|
||||
|
||||
<service id="messenger.transport.amqp.factory" class="Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransportFactory">
|
||||
<argument type="service" id="messenger.transport.encoder" />
|
||||
<argument type="service" id="messenger.transport.decoder" />
|
||||
<argument type="service" id="messenger.transport.serializer" />
|
||||
<argument>%kernel.debug%</argument>
|
||||
|
||||
<tag name="messenger.transport_factory" />
|
||||
|
@ -378,7 +378,7 @@
|
||||
<xsd:element name="context" type="metadata" minOccurs="0" maxOccurs="unbounded" />
|
||||
</xsd:sequence>
|
||||
<xsd:attribute name="format" type="xsd:string" />
|
||||
<xsd:attribute name="enabled" type="xsd:boolean" />
|
||||
<xsd:attribute name="id" type="xsd:string" />
|
||||
</xsd:complexType>
|
||||
|
||||
<xsd:complexType name="messenger_routing">
|
||||
|
@ -293,12 +293,10 @@ class ConfigurationTest extends TestCase
|
||||
'routing' => array(),
|
||||
'transports' => array(),
|
||||
'serializer' => array(
|
||||
'enabled' => !class_exists(FullStack::class),
|
||||
'id' => 'messenger.transport.symfony_serializer',
|
||||
'format' => 'json',
|
||||
'context' => array(),
|
||||
),
|
||||
'encoder' => 'messenger.transport.serializer',
|
||||
'decoder' => 'messenger.transport.serializer',
|
||||
'default_bus' => null,
|
||||
'buses' => array('messenger.bus.default' => array('default_middleware' => true, 'middleware' => array())),
|
||||
),
|
||||
|
@ -2,9 +2,7 @@
|
||||
|
||||
$container->loadFromExtension('framework', array(
|
||||
'messenger' => array(
|
||||
'serializer' => array(
|
||||
'enabled' => false,
|
||||
),
|
||||
'serializer' => false,
|
||||
'transports' => array(
|
||||
'default' => 'amqp://localhost/%2f/messages',
|
||||
),
|
||||
|
@ -3,7 +3,6 @@
|
||||
$container->loadFromExtension('framework', array(
|
||||
'serializer' => true,
|
||||
'messenger' => array(
|
||||
'serializer' => true,
|
||||
'routing' => array(
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage' => array('amqp', 'audit'),
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage' => array(
|
||||
|
@ -7,5 +7,8 @@ $container->loadFromExtension('framework', array(
|
||||
'format' => 'csv',
|
||||
'context' => array('enable_max_depth' => true),
|
||||
),
|
||||
'transports' => array(
|
||||
'default' => 'amqp://localhost/%2f/messages',
|
||||
),
|
||||
),
|
||||
));
|
||||
|
@ -5,9 +5,6 @@ $container->loadFromExtension('framework', array(
|
||||
'enabled' => false,
|
||||
),
|
||||
'messenger' => array(
|
||||
'serializer' => array(
|
||||
'enabled' => true,
|
||||
),
|
||||
'transports' => array(
|
||||
'default' => 'amqp://localhost/%2f/messages',
|
||||
),
|
||||
|
@ -3,7 +3,6 @@
|
||||
$container->loadFromExtension('framework', array(
|
||||
'serializer' => true,
|
||||
'messenger' => array(
|
||||
'serializer' => true,
|
||||
'transports' => array(
|
||||
'default' => 'amqp://localhost/%2f/messages',
|
||||
'customised' => array(
|
||||
|
@ -4,7 +4,4 @@ $container->loadFromExtension('framework', array(
|
||||
'serializer' => array(
|
||||
'enabled' => false,
|
||||
),
|
||||
'messenger' => array(
|
||||
'serializer' => false,
|
||||
),
|
||||
));
|
||||
|
@ -7,7 +7,6 @@
|
||||
|
||||
<framework:config>
|
||||
<framework:messenger>
|
||||
<framework:serializer enabled="false" />
|
||||
<framework:routing message-class="Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage">
|
||||
<framework:sender service="sender.bar" />
|
||||
<framework:sender service="sender.biz" />
|
||||
|
@ -7,7 +7,7 @@
|
||||
|
||||
<framework:config>
|
||||
<framework:messenger>
|
||||
<framework:serializer enabled="false" />
|
||||
<framework:serializer id="null" />
|
||||
<framework:transport name="default" dsn="amqp://localhost/%2f/messages" />
|
||||
</framework:messenger>
|
||||
</framework:config>
|
||||
|
@ -8,7 +8,6 @@
|
||||
<framework:config>
|
||||
<framework:serializer enabled="true" />
|
||||
<framework:messenger>
|
||||
<framework:serializer enabled="true" />
|
||||
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\DummyMessage">
|
||||
<framework:sender service="amqp" />
|
||||
<framework:sender service="audit" />
|
||||
|
@ -13,6 +13,7 @@
|
||||
<framework:enable_max_depth>true</framework:enable_max_depth>
|
||||
</framework:context>
|
||||
</framework:serializer>
|
||||
<framework:transport name="default" dsn="amqp://localhost/%2f/messages" />
|
||||
</framework:messenger>
|
||||
</framework:config>
|
||||
</container>
|
||||
|
@ -8,7 +8,6 @@
|
||||
<framework:config>
|
||||
<framework:serializer enabled="false" />
|
||||
<framework:messenger>
|
||||
<framework:serializer enabled="true" />
|
||||
<framework:transport name="default" dsn="amqp://localhost/%2f/messages" />
|
||||
</framework:messenger>
|
||||
</framework:config>
|
||||
|
@ -8,7 +8,6 @@
|
||||
<framework:config>
|
||||
<framework:serializer enabled="true" />
|
||||
<framework:messenger>
|
||||
<framework:serializer enabled="true" />
|
||||
<framework:transport name="default" dsn="amqp://localhost/%2f/messages" />
|
||||
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name">
|
||||
<framework:options>
|
||||
|
@ -7,8 +7,5 @@
|
||||
|
||||
<framework:config>
|
||||
<framework:serializer enabled="false" />
|
||||
<framework:messenger>
|
||||
<framework:serializer enabled="false" />
|
||||
</framework:messenger>
|
||||
</framework:config>
|
||||
</container>
|
||||
|
@ -1,7 +1,6 @@
|
||||
framework:
|
||||
serializer: true
|
||||
messenger:
|
||||
serializer: true
|
||||
routing:
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage': [amqp, audit]
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage':
|
||||
|
@ -5,3 +5,5 @@ framework:
|
||||
format: csv
|
||||
context:
|
||||
enable_max_depth: true
|
||||
transports:
|
||||
default: 'amqp://localhost/%2f/messages'
|
||||
|
@ -2,7 +2,5 @@ framework:
|
||||
serializer:
|
||||
enabled: false
|
||||
messenger:
|
||||
serializer:
|
||||
enabled: true
|
||||
transports:
|
||||
default: 'amqp://localhost/%2f/messages'
|
||||
|
@ -1,7 +1,6 @@
|
||||
framework:
|
||||
serializer: true
|
||||
messenger:
|
||||
serializer: true
|
||||
transports:
|
||||
default: 'amqp://localhost/%2f/messages'
|
||||
customised:
|
||||
|
@ -1,5 +1,3 @@
|
||||
framework:
|
||||
serializer:
|
||||
enabled: false
|
||||
messenger:
|
||||
serializer: false
|
||||
|
@ -604,10 +604,9 @@ abstract class FrameworkExtensionTest extends TestCase
|
||||
{
|
||||
$container = $this->createContainerFromFile('messenger_transport');
|
||||
|
||||
$this->assertSame('messenger.transport.serializer', (string) $container->getAlias('messenger.transport.encoder'));
|
||||
$this->assertSame('messenger.transport.serializer', (string) $container->getAlias('messenger.transport.decoder'));
|
||||
$this->assertSame('messenger.transport.symfony_serializer', (string) $container->getAlias('messenger.transport.serializer'));
|
||||
|
||||
$serializerTransportDefinition = $container->getDefinition('messenger.transport.serializer');
|
||||
$serializerTransportDefinition = $container->getDefinition('messenger.transport.symfony_serializer');
|
||||
$this->assertSame('csv', $serializerTransportDefinition->getArgument(1));
|
||||
$this->assertSame(array('enable_max_depth' => true), $serializerTransportDefinition->getArgument(2));
|
||||
}
|
||||
|
@ -13,3 +13,4 @@ CHANGELOG
|
||||
* `EnvelopeItemInterface` doesn't extend `Serializable` anymore
|
||||
* [BC BREAK] The `ConsumeMessagesCommand` class now takes an instance of `Psr\Container\ContainerInterface`
|
||||
as first constructor argument
|
||||
* [BC BREAK] The `EncoderInterface` and `DecoderInterface` have been replaced by a unified `Symfony\Component\Messenger\Transport\Serialization\SerializerInterface`.
|
||||
|
@ -16,7 +16,7 @@ use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
|
||||
/**
|
||||
* @requires extension amqp
|
||||
@ -28,13 +28,13 @@ class AmqpSenderTest extends TestCase
|
||||
$envelope = Envelope::wrap(new DummyMessage('Oy'));
|
||||
$encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class));
|
||||
|
||||
$encoder = $this->getMockBuilder(EncoderInterface::class)->getMock();
|
||||
$encoder->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
|
||||
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
|
||||
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
|
||||
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']);
|
||||
|
||||
$sender = new AmqpSender($encoder, $connection);
|
||||
$sender = new AmqpSender($serializer, $connection);
|
||||
$sender->send($envelope);
|
||||
}
|
||||
}
|
||||
|
@ -15,16 +15,14 @@ use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransportFactory;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
|
||||
class AmqpTransportFactoryTest extends TestCase
|
||||
{
|
||||
public function testSupportsOnlyAmqpTransports()
|
||||
{
|
||||
$factory = new AmqpTransportFactory(
|
||||
$this->getMockBuilder(EncoderInterface::class)->getMock(),
|
||||
$this->getMockBuilder(DecoderInterface::class)->getMock(),
|
||||
$this->getMockBuilder(SerializerInterface::class)->getMock(),
|
||||
true
|
||||
);
|
||||
|
||||
@ -36,12 +34,11 @@ class AmqpTransportFactoryTest extends TestCase
|
||||
public function testItCreatesTheTransport()
|
||||
{
|
||||
$factory = new AmqpTransportFactory(
|
||||
$encoder = $this->getMockBuilder(EncoderInterface::class)->getMock(),
|
||||
$decoder = $this->getMockBuilder(DecoderInterface::class)->getMock(),
|
||||
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(),
|
||||
true
|
||||
);
|
||||
|
||||
$expectedTransport = new AmqpTransport($encoder, $decoder, Connection::fromDsn('amqp://localhost', array('foo' => 'bar'), true), array('foo' => 'bar'), true);
|
||||
$expectedTransport = new AmqpTransport($serializer, Connection::fromDsn('amqp://localhost', array('foo' => 'bar'), true));
|
||||
|
||||
$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', array('foo' => 'bar')));
|
||||
}
|
||||
|
@ -16,8 +16,7 @@ use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
@ -35,8 +34,7 @@ class AmqpTransportTest extends TestCase
|
||||
public function testReceivesMessages()
|
||||
{
|
||||
$transport = $this->getTransport(
|
||||
null,
|
||||
$decoder = $this->getMockBuilder(DecoderInterface::class)->getMock(),
|
||||
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(),
|
||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock()
|
||||
);
|
||||
|
||||
@ -46,7 +44,7 @@ class AmqpTransportTest extends TestCase
|
||||
$amqpEnvelope->method('getBody')->willReturn('body');
|
||||
$amqpEnvelope->method('getHeaders')->willReturn(array('my' => 'header'));
|
||||
|
||||
$decoder->method('decode')->with(array('body' => 'body', 'headers' => array('my' => 'header')))->willReturn(Envelope::wrap($decodedMessage));
|
||||
$serializer->method('decode')->with(array('body' => 'body', 'headers' => array('my' => 'header')))->willReturn(Envelope::wrap($decodedMessage));
|
||||
$connection->method('get')->willReturn($amqpEnvelope);
|
||||
|
||||
$transport->receive(function (Envelope $envelope) use ($transport, $decodedMessage) {
|
||||
@ -56,12 +54,11 @@ class AmqpTransportTest extends TestCase
|
||||
});
|
||||
}
|
||||
|
||||
private function getTransport(EncoderInterface $encoder = null, DecoderInterface $decoder = null, Connection $connection = null)
|
||||
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)
|
||||
{
|
||||
$encoder = $encoder ?: $this->getMockBuilder(EncoderInterface::class)->getMock();
|
||||
$decoder = $decoder ?: $this->getMockBuilder(DecoderInterface::class)->getMock();
|
||||
$serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock();
|
||||
$connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||
|
||||
return new AmqpTransport($encoder, $decoder, $connection);
|
||||
return new AmqpTransport($serializer, $connection);
|
||||
}
|
||||
}
|
||||
|
@ -13,7 +13,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
|
||||
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
|
||||
/**
|
||||
* Symfony Messenger receiver to get messages from AMQP brokers using PHP's AMQP extension.
|
||||
@ -22,13 +22,13 @@ use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
|
||||
*/
|
||||
class AmqpReceiver implements ReceiverInterface
|
||||
{
|
||||
private $decoder;
|
||||
private $serializer;
|
||||
private $connection;
|
||||
private $shouldStop;
|
||||
|
||||
public function __construct(DecoderInterface $decoder, Connection $connection)
|
||||
public function __construct(SerializerInterface $serializer, Connection $connection)
|
||||
{
|
||||
$this->decoder = $decoder;
|
||||
$this->serializer = $serializer;
|
||||
$this->connection = $connection;
|
||||
}
|
||||
|
||||
@ -51,7 +51,7 @@ class AmqpReceiver implements ReceiverInterface
|
||||
}
|
||||
|
||||
try {
|
||||
$handler($this->decoder->decode(array(
|
||||
$handler($this->serializer->decode(array(
|
||||
'body' => $AMQPEnvelope->getBody(),
|
||||
'headers' => $AMQPEnvelope->getHeaders(),
|
||||
)));
|
||||
|
@ -13,7 +13,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Transport\SenderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
|
||||
/**
|
||||
* Symfony Messenger sender to send messages to AMQP brokers using PHP's AMQP extension.
|
||||
@ -22,12 +22,12 @@ use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
|
||||
*/
|
||||
class AmqpSender implements SenderInterface
|
||||
{
|
||||
private $encoder;
|
||||
private $serializer;
|
||||
private $connection;
|
||||
|
||||
public function __construct(EncoderInterface $encoder, Connection $connection)
|
||||
public function __construct(SerializerInterface $serializer, Connection $connection)
|
||||
{
|
||||
$this->encoder = $encoder;
|
||||
$this->serializer = $serializer;
|
||||
$this->connection = $connection;
|
||||
}
|
||||
|
||||
@ -36,7 +36,7 @@ class AmqpSender implements SenderInterface
|
||||
*/
|
||||
public function send(Envelope $envelope)
|
||||
{
|
||||
$encodedMessage = $this->encoder->encode($envelope);
|
||||
$encodedMessage = $this->serializer->encode($envelope);
|
||||
|
||||
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
|
||||
}
|
||||
|
@ -12,8 +12,7 @@
|
||||
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
@ -21,16 +20,14 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
*/
|
||||
class AmqpTransport implements TransportInterface
|
||||
{
|
||||
private $encoder;
|
||||
private $decoder;
|
||||
private $serializer;
|
||||
private $connection;
|
||||
private $receiver;
|
||||
private $sender;
|
||||
|
||||
public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, Connection $connection)
|
||||
public function __construct(SerializerInterface $serializer, Connection $connection)
|
||||
{
|
||||
$this->encoder = $encoder;
|
||||
$this->decoder = $decoder;
|
||||
$this->serializer = $serializer;
|
||||
$this->connection = $connection;
|
||||
}
|
||||
|
||||
@ -60,11 +57,11 @@ class AmqpTransport implements TransportInterface
|
||||
|
||||
private function getReceiver()
|
||||
{
|
||||
return $this->receiver = new AmqpReceiver($this->decoder, $this->connection);
|
||||
return $this->receiver = new AmqpReceiver($this->serializer, $this->connection);
|
||||
}
|
||||
|
||||
private function getSender()
|
||||
{
|
||||
return $this->sender = new AmqpSender($this->encoder, $this->connection);
|
||||
return $this->sender = new AmqpSender($this->serializer, $this->connection);
|
||||
}
|
||||
}
|
||||
|
@ -11,8 +11,7 @@
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||
|
||||
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
@ -21,20 +20,18 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
*/
|
||||
class AmqpTransportFactory implements TransportFactoryInterface
|
||||
{
|
||||
private $encoder;
|
||||
private $decoder;
|
||||
private $serializer;
|
||||
private $debug;
|
||||
|
||||
public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, bool $debug)
|
||||
public function __construct(SerializerInterface $serializer, bool $debug)
|
||||
{
|
||||
$this->encoder = $encoder;
|
||||
$this->decoder = $decoder;
|
||||
$this->serializer = $serializer;
|
||||
$this->debug = $debug;
|
||||
}
|
||||
|
||||
public function createTransport(string $dsn, array $options): TransportInterface
|
||||
{
|
||||
return new AmqpTransport($this->encoder, $this->decoder, Connection::fromDsn($dsn, $options, $this->debug));
|
||||
return new AmqpTransport($this->serializer, Connection::fromDsn($dsn, $options, $this->debug));
|
||||
}
|
||||
|
||||
public function supports(string $dsn, array $options): bool
|
||||
|
@ -1,32 +0,0 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\Serialization;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
|
||||
/**
|
||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||
*/
|
||||
interface EncoderInterface
|
||||
{
|
||||
/**
|
||||
* Encodes an envelope content (message & items) to a common format understandable by transports.
|
||||
* The encoded array should only contain scalar and arrays.
|
||||
*
|
||||
* The most common keys of the encoded array are:
|
||||
* - `body` (string) - the message body
|
||||
* - `headers` (string<string>) - a key/value pair of headers
|
||||
*
|
||||
* @param Envelope $envelope The envelop containing the message put on the MessageBus by the user
|
||||
*/
|
||||
public function encode(Envelope $envelope): array;
|
||||
}
|
@ -17,18 +17,18 @@ use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
||||
use Symfony\Component\Serializer\Encoder\XmlEncoder;
|
||||
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
||||
use Symfony\Component\Serializer\Serializer as SymfonySerializer;
|
||||
use Symfony\Component\Serializer\SerializerInterface;
|
||||
use Symfony\Component\Serializer\SerializerInterface as SymfonySerializerInterface;
|
||||
|
||||
/**
|
||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||
*/
|
||||
class Serializer implements DecoderInterface, EncoderInterface
|
||||
class Serializer implements SerializerInterface
|
||||
{
|
||||
private $serializer;
|
||||
private $format;
|
||||
private $context;
|
||||
|
||||
public function __construct(SerializerInterface $serializer, string $format = 'json', array $context = array())
|
||||
public function __construct(SymfonySerializerInterface $serializer, string $format = 'json', array $context = array())
|
||||
{
|
||||
$this->serializer = $serializer;
|
||||
$this->format = $format;
|
||||
|
@ -16,7 +16,7 @@ use Symfony\Component\Messenger\Envelope;
|
||||
/**
|
||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||
*/
|
||||
interface DecoderInterface
|
||||
interface SerializerInterface
|
||||
{
|
||||
/**
|
||||
* Decodes an envelope and its message from an encoded-form.
|
||||
@ -27,8 +27,16 @@ interface DecoderInterface
|
||||
* The most common keys are:
|
||||
* - `body` (string) - the message body
|
||||
* - `headers` (string<string>) - a key/value pair of headers
|
||||
*
|
||||
* @return Envelope
|
||||
*/
|
||||
public function decode(array $encodedEnvelope): Envelope;
|
||||
|
||||
/**
|
||||
* Encodes an envelope content (message & items) to a common format understandable by transports.
|
||||
* The encoded array should only contain scalars and arrays.
|
||||
*
|
||||
* The most common keys of the encoded array are:
|
||||
* - `body` (string) - the message body
|
||||
* - `headers` (string<string>) - a key/value pair of headers
|
||||
*/
|
||||
public function encode(Envelope $envelope): array;
|
||||
}
|
Reference in New Issue
Block a user