feature #34069 [Messenger] Removing "sync" transport and replacing it with config trick (weaverryan)
This PR was merged into the 4.4 branch.
Discussion
----------
[Messenger] Removing "sync" transport and replacing it with config trick
| Q | A
| ------------- | ---
| Branch? | 4.4
| Bug fix? | no (but removes some hackiness)
| New feature? | no
| Deprecations? | yes (BC break yes)
| Tickets | Helps #34066
| License | MIT
| Doc PR | not needed
I introduced `SyncTransport` in 4.3 as a way to allow people to, for example, route a message to an `async` transport and then override that to a `sync` transport in the dev environment. The implementation was always a bit of a hack, and makes this one transport behave different than others *and* makes messages routed to this transport behave a *bit* different than sync messages that are simply not routed at all.
Thanks for an idea from Nicolas, this fixes that. Gone are `SyncTransport`. Instead, the `sync://` transport string is still supported, but it's just a "config" trick: messages routed to transports that are "sync" are simply filtered out of the "senders" in the DI extension. Nice & clean.
Commits
-------
3d4e59a10b
[Messenger] Removing "sync" transport and replacing it with much nicer config trick
This commit is contained in:
commit
90906c5062
@ -1774,18 +1774,27 @@ class FrameworkExtension extends Extension
|
|||||||
$container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']);
|
$container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']);
|
||||||
}
|
}
|
||||||
|
|
||||||
$senderAliases = [];
|
$senderReferences = [];
|
||||||
|
$syncTransports = [];
|
||||||
$transportRetryReferences = [];
|
$transportRetryReferences = [];
|
||||||
foreach ($config['transports'] as $name => $transport) {
|
foreach ($config['transports'] as $name => $transport) {
|
||||||
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
|
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
|
||||||
|
|
||||||
$transportDefinition = (new Definition(TransportInterface::class))
|
if (0 === strpos($transport['dsn'], 'sync://')) {
|
||||||
->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
|
$syncTransports[] = $name;
|
||||||
->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
|
} else {
|
||||||
->addTag('messenger.receiver', ['alias' => $name])
|
$transportDefinition = (new Definition(TransportInterface::class))
|
||||||
;
|
->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
|
||||||
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
|
->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
|
||||||
$senderAliases[$name] = $transportId;
|
->addTag('messenger.receiver', ['alias' => $name])
|
||||||
|
;
|
||||||
|
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
|
||||||
|
|
||||||
|
// alias => service_id
|
||||||
|
$senderReferences[$name] = new Reference($transportId);
|
||||||
|
// service_id => service_id
|
||||||
|
$senderReferences[$transportId] = new Reference($transportId);
|
||||||
|
}
|
||||||
|
|
||||||
if (null !== $transport['retry_strategy']['service']) {
|
if (null !== $transport['retry_strategy']['service']) {
|
||||||
$transportRetryReferences[$name] = new Reference($transport['retry_strategy']['service']);
|
$transportRetryReferences[$name] = new Reference($transport['retry_strategy']['service']);
|
||||||
@ -1803,30 +1812,25 @@ class FrameworkExtension extends Extension
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
$senderReferences = [];
|
|
||||||
// alias => service_id
|
|
||||||
foreach ($senderAliases as $alias => $serviceId) {
|
|
||||||
$senderReferences[$alias] = new Reference($serviceId);
|
|
||||||
}
|
|
||||||
// service_id => service_id
|
|
||||||
foreach ($senderAliases as $serviceId) {
|
|
||||||
$senderReferences[$serviceId] = new Reference($serviceId);
|
|
||||||
}
|
|
||||||
|
|
||||||
$messageToSendersMapping = [];
|
$messageToSendersMapping = [];
|
||||||
foreach ($config['routing'] as $message => $messageConfiguration) {
|
foreach ($config['routing'] as $message => $messageConfiguration) {
|
||||||
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
|
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
|
||||||
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
|
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
|
||||||
}
|
}
|
||||||
|
|
||||||
// make sure senderAliases contains all senders
|
// filter out "sync" senders
|
||||||
|
$realSenders = [];
|
||||||
foreach ($messageConfiguration['senders'] as $sender) {
|
foreach ($messageConfiguration['senders'] as $sender) {
|
||||||
if (!isset($senderReferences[$sender])) {
|
if (isset($senderReferences[$sender])) {
|
||||||
|
$realSenders[] = $sender;
|
||||||
|
} elseif (!\in_array($sender, $syncTransports, true)) {
|
||||||
throw new LogicException(sprintf('Invalid Messenger routing configuration: the "%s" class is being routed to a sender called "%s". This is not a valid transport or service id.', $message, $sender));
|
throw new LogicException(sprintf('Invalid Messenger routing configuration: the "%s" class is being routed to a sender called "%s". This is not a valid transport or service id.', $message, $sender));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
$messageToSendersMapping[$message] = $messageConfiguration['senders'];
|
if ($realSenders) {
|
||||||
|
$messageToSendersMapping[$message] = $realSenders;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
$container->getDefinition('messenger.senders_locator')
|
$container->getDefinition('messenger.senders_locator')
|
||||||
|
@ -73,11 +73,6 @@
|
|||||||
<tag name="messenger.transport_factory" />
|
<tag name="messenger.transport_factory" />
|
||||||
</service>
|
</service>
|
||||||
|
|
||||||
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
|
|
||||||
<tag name="messenger.transport_factory" />
|
|
||||||
<argument type="service" id="messenger.routable_message_bus" />
|
|
||||||
</service>
|
|
||||||
|
|
||||||
<service id="messenger.transport.in_memory.factory" class="Symfony\Component\Messenger\Transport\InMemoryTransportFactory">
|
<service id="messenger.transport.in_memory.factory" class="Symfony\Component\Messenger\Transport\InMemoryTransportFactory">
|
||||||
<tag name="messenger.transport_factory" />
|
<tag name="messenger.transport_factory" />
|
||||||
<tag name="kernel.reset" method="reset" />
|
<tag name="kernel.reset" method="reset" />
|
||||||
|
@ -0,0 +1,15 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
$container->loadFromExtension('framework', [
|
||||||
|
'messenger' => [
|
||||||
|
'transports' => [
|
||||||
|
'amqp' => 'amqp://localhost/%2f/messages',
|
||||||
|
'sync' => 'sync://',
|
||||||
|
],
|
||||||
|
'routing' => [
|
||||||
|
'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage' => ['amqp'],
|
||||||
|
'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage' => ['sync'],
|
||||||
|
'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage' => ['amqp', 'sync'],
|
||||||
|
],
|
||||||
|
],
|
||||||
|
]);
|
@ -0,0 +1,24 @@
|
|||||||
|
<?xml version="1.0" encoding="utf-8" ?>
|
||||||
|
<container xmlns="http://symfony.com/schema/dic/services"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
||||||
|
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
|
||||||
|
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
||||||
|
|
||||||
|
<framework:config>
|
||||||
|
<framework:messenger>
|
||||||
|
<framework:routing message-class="Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage">
|
||||||
|
<framework:sender service="amqp" />
|
||||||
|
</framework:routing>
|
||||||
|
<framework:routing message-class="Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage">
|
||||||
|
<framework:sender service="sync" />
|
||||||
|
</framework:routing>
|
||||||
|
<framework:routing message-class="Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage">
|
||||||
|
<framework:sender service="amqp" />
|
||||||
|
<framework:sender service="sync" />
|
||||||
|
</framework:routing>
|
||||||
|
<framework:transport name="amqp" dsn="amqp://localhost/%2f/messages" />
|
||||||
|
<framework:transport name="sync" dsn="sync://" />
|
||||||
|
</framework:messenger>
|
||||||
|
</framework:config>
|
||||||
|
</container>
|
@ -0,0 +1,10 @@
|
|||||||
|
framework:
|
||||||
|
messenger:
|
||||||
|
transports:
|
||||||
|
amqp: 'amqp://localhost/%2f/messages'
|
||||||
|
sync: 'sync://'
|
||||||
|
|
||||||
|
routing:
|
||||||
|
'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage': amqp
|
||||||
|
'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage': sync
|
||||||
|
'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage': [amqp, sync]
|
@ -16,6 +16,8 @@ use Psr\Log\LoggerAwareInterface;
|
|||||||
use Symfony\Bundle\FrameworkBundle\DependencyInjection\Compiler\AddAnnotationsCachedReaderPass;
|
use Symfony\Bundle\FrameworkBundle\DependencyInjection\Compiler\AddAnnotationsCachedReaderPass;
|
||||||
use Symfony\Bundle\FrameworkBundle\DependencyInjection\FrameworkExtension;
|
use Symfony\Bundle\FrameworkBundle\DependencyInjection\FrameworkExtension;
|
||||||
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage;
|
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage;
|
||||||
|
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage;
|
||||||
|
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage;
|
||||||
use Symfony\Bundle\FrameworkBundle\Tests\TestCase;
|
use Symfony\Bundle\FrameworkBundle\Tests\TestCase;
|
||||||
use Symfony\Bundle\FullStack;
|
use Symfony\Bundle\FullStack;
|
||||||
use Symfony\Component\Cache\Adapter\AdapterInterface;
|
use Symfony\Component\Cache\Adapter\AdapterInterface;
|
||||||
@ -780,6 +782,17 @@ abstract class FrameworkExtensionTest extends TestCase
|
|||||||
$this->createContainerFromFile('messenger_routing_invalid_transport');
|
$this->createContainerFromFile('messenger_routing_invalid_transport');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testMessengerSyncTransport()
|
||||||
|
{
|
||||||
|
$container = $this->createContainerFromFile('messenger_sync_transport');
|
||||||
|
$senderLocatorDefinition = $container->getDefinition('messenger.senders_locator');
|
||||||
|
|
||||||
|
$sendersMapping = $senderLocatorDefinition->getArgument(0);
|
||||||
|
$this->assertEquals(['amqp'], $sendersMapping[DummyMessage::class]);
|
||||||
|
$this->assertArrayNotHasKey(SecondMessage::class, $sendersMapping);
|
||||||
|
$this->assertEquals(['amqp'], $sendersMapping[FooMessage::class]);
|
||||||
|
}
|
||||||
|
|
||||||
public function testTranslator()
|
public function testTranslator()
|
||||||
{
|
{
|
||||||
$container = $this->createContainerFromFile('full');
|
$container = $this->createContainerFromFile('full');
|
||||||
|
@ -4,6 +4,7 @@ CHANGELOG
|
|||||||
4.4.0
|
4.4.0
|
||||||
-----
|
-----
|
||||||
|
|
||||||
|
* [BC BREAK] The `SyncTransport` and `SyncTransportFactory` classes were removed.
|
||||||
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
|
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
|
||||||
pass a `RoutableMessageBus` instance instead.
|
pass a `RoutableMessageBus` instance instead.
|
||||||
* Added support for auto trimming of Redis streams.
|
* Added support for auto trimming of Redis streams.
|
||||||
|
@ -1,30 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
|
||||||
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
|
|
||||||
use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory;
|
|
||||||
|
|
||||||
class SyncTransportFactoryTest extends TestCase
|
|
||||||
{
|
|
||||||
public function testCreateTransport()
|
|
||||||
{
|
|
||||||
$serializer = $this->createMock(SerializerInterface::class);
|
|
||||||
$bus = $this->createMock(MessageBusInterface::class);
|
|
||||||
$factory = new SyncTransportFactory($bus);
|
|
||||||
$transport = $factory->createTransport('sync://', [], $serializer);
|
|
||||||
$this->assertInstanceOf(SyncTransport::class, $transport);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,41 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
|
||||||
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
|
|
||||||
|
|
||||||
class SyncTransportTest extends TestCase
|
|
||||||
{
|
|
||||||
public function testSend()
|
|
||||||
{
|
|
||||||
$bus = $this->createMock(MessageBusInterface::class);
|
|
||||||
$bus->expects($this->once())
|
|
||||||
->method('dispatch')
|
|
||||||
->with($this->callback(function ($arg) {
|
|
||||||
$this->assertInstanceOf(Envelope::class, $arg);
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}))
|
|
||||||
->willReturnArgument(0);
|
|
||||||
$message = new \stdClass();
|
|
||||||
$envelope = new Envelope($message);
|
|
||||||
$transport = new SyncTransport($bus);
|
|
||||||
$envelope = $transport->send($envelope);
|
|
||||||
|
|
||||||
$this->assertSame($message, $envelope->getMessage());
|
|
||||||
$this->assertNotNull($envelope->last(ReceivedStamp::class));
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,65 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\Sync;
|
|
||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
|
||||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
|
||||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
|
||||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Transport that immediately marks messages as received and dispatches for handling.
|
|
||||||
*
|
|
||||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
|
||||||
*/
|
|
||||||
class SyncTransport implements TransportInterface
|
|
||||||
{
|
|
||||||
private $messageBus;
|
|
||||||
|
|
||||||
public function __construct(MessageBusInterface $messageBus)
|
|
||||||
{
|
|
||||||
$this->messageBus = $messageBus;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function get(): iterable
|
|
||||||
{
|
|
||||||
throw new InvalidArgumentException('You cannot receive messages from the Messenger SyncTransport.');
|
|
||||||
}
|
|
||||||
|
|
||||||
public function stop(): void
|
|
||||||
{
|
|
||||||
throw new InvalidArgumentException('You cannot call stop() on the Messenger SyncTransport.');
|
|
||||||
}
|
|
||||||
|
|
||||||
public function ack(Envelope $envelope): void
|
|
||||||
{
|
|
||||||
throw new InvalidArgumentException('You cannot call ack() on the Messenger SyncTransport.');
|
|
||||||
}
|
|
||||||
|
|
||||||
public function reject(Envelope $envelope): void
|
|
||||||
{
|
|
||||||
throw new InvalidArgumentException('You cannot call reject() on the Messenger SyncTransport.');
|
|
||||||
}
|
|
||||||
|
|
||||||
public function send(Envelope $envelope): Envelope
|
|
||||||
{
|
|
||||||
/** @var SentStamp|null $sentStamp */
|
|
||||||
$sentStamp = $envelope->last(SentStamp::class);
|
|
||||||
$alias = null === $sentStamp ? 'sync' : ($sentStamp->getSenderAlias() ?: $sentStamp->getSenderClass());
|
|
||||||
|
|
||||||
$envelope = $envelope->with(new ReceivedStamp($alias));
|
|
||||||
|
|
||||||
return $this->messageBus->dispatch($envelope);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,40 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\Sync;
|
|
||||||
|
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
|
||||||
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
|
||||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
|
||||||
*/
|
|
||||||
class SyncTransportFactory implements TransportFactoryInterface
|
|
||||||
{
|
|
||||||
private $messageBus;
|
|
||||||
|
|
||||||
public function __construct(MessageBusInterface $messageBus)
|
|
||||||
{
|
|
||||||
$this->messageBus = $messageBus;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
|
|
||||||
{
|
|
||||||
return new SyncTransport($this->messageBus);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function supports(string $dsn, array $options): bool
|
|
||||||
{
|
|
||||||
return 0 === strpos($dsn, 'sync://');
|
|
||||||
}
|
|
||||||
}
|
|
Reference in New Issue
Block a user