diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index 2c8d080859..20ba862353 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1774,18 +1774,27 @@ class FrameworkExtension extends Extension $container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']); } - $senderAliases = []; + $senderReferences = []; + $syncTransports = []; $transportRetryReferences = []; foreach ($config['transports'] as $name => $transport) { $serializerId = $transport['serializer'] ?? 'messenger.default_serializer'; - $transportDefinition = (new Definition(TransportInterface::class)) - ->setFactory([new Reference('messenger.transport_factory'), 'createTransport']) - ->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)]) - ->addTag('messenger.receiver', ['alias' => $name]) - ; - $container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition); - $senderAliases[$name] = $transportId; + if (0 === strpos($transport['dsn'], 'sync://')) { + $syncTransports[] = $name; + } else { + $transportDefinition = (new Definition(TransportInterface::class)) + ->setFactory([new Reference('messenger.transport_factory'), 'createTransport']) + ->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)]) + ->addTag('messenger.receiver', ['alias' => $name]) + ; + $container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition); + + // alias => service_id + $senderReferences[$name] = new Reference($transportId); + // service_id => service_id + $senderReferences[$transportId] = new Reference($transportId); + } if (null !== $transport['retry_strategy']['service']) { $transportRetryReferences[$name] = new Reference($transport['retry_strategy']['service']); @@ -1803,30 +1812,25 @@ class FrameworkExtension extends Extension } } - $senderReferences = []; - // alias => service_id - foreach ($senderAliases as $alias => $serviceId) { - $senderReferences[$alias] = new Reference($serviceId); - } - // service_id => service_id - foreach ($senderAliases as $serviceId) { - $senderReferences[$serviceId] = new Reference($serviceId); - } - $messageToSendersMapping = []; foreach ($config['routing'] as $message => $messageConfiguration) { if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) { throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message)); } - // make sure senderAliases contains all senders + // filter out "sync" senders + $realSenders = []; foreach ($messageConfiguration['senders'] as $sender) { - if (!isset($senderReferences[$sender])) { + if (isset($senderReferences[$sender])) { + $realSenders[] = $sender; + } elseif (!\in_array($sender, $syncTransports, true)) { throw new LogicException(sprintf('Invalid Messenger routing configuration: the "%s" class is being routed to a sender called "%s". This is not a valid transport or service id.', $message, $sender)); } } - $messageToSendersMapping[$message] = $messageConfiguration['senders']; + if ($realSenders) { + $messageToSendersMapping[$message] = $realSenders; + } } $container->getDefinition('messenger.senders_locator') diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index b329aa075c..b5fe17b5f9 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -73,11 +73,6 @@ - - - - - diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_sync_transport.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_sync_transport.php new file mode 100644 index 0000000000..09f8c53fc3 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_sync_transport.php @@ -0,0 +1,15 @@ +loadFromExtension('framework', [ + 'messenger' => [ + 'transports' => [ + 'amqp' => 'amqp://localhost/%2f/messages', + 'sync' => 'sync://', + ], + 'routing' => [ + 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage' => ['amqp'], + 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage' => ['sync'], + 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage' => ['amqp', 'sync'], + ], + ], +]); diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_sync_transport.xml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_sync_transport.xml new file mode 100644 index 0000000000..2345cc2342 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_sync_transport.xml @@ -0,0 +1,24 @@ + + + + + + + + + + + + + + + + + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_sync_transport.yml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_sync_transport.yml new file mode 100644 index 0000000000..f7b4fdefe0 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_sync_transport.yml @@ -0,0 +1,10 @@ +framework: + messenger: + transports: + amqp: 'amqp://localhost/%2f/messages' + sync: 'sync://' + + routing: + 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage': amqp + 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage': sync + 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage': [amqp, sync] diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index 3725ef19f3..639143bae1 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -16,6 +16,8 @@ use Psr\Log\LoggerAwareInterface; use Symfony\Bundle\FrameworkBundle\DependencyInjection\Compiler\AddAnnotationsCachedReaderPass; use Symfony\Bundle\FrameworkBundle\DependencyInjection\FrameworkExtension; use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage; +use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage; +use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage; use Symfony\Bundle\FrameworkBundle\Tests\TestCase; use Symfony\Bundle\FullStack; use Symfony\Component\Cache\Adapter\AdapterInterface; @@ -780,6 +782,17 @@ abstract class FrameworkExtensionTest extends TestCase $this->createContainerFromFile('messenger_routing_invalid_transport'); } + public function testMessengerSyncTransport() + { + $container = $this->createContainerFromFile('messenger_sync_transport'); + $senderLocatorDefinition = $container->getDefinition('messenger.senders_locator'); + + $sendersMapping = $senderLocatorDefinition->getArgument(0); + $this->assertEquals(['amqp'], $sendersMapping[DummyMessage::class]); + $this->assertArrayNotHasKey(SecondMessage::class, $sendersMapping); + $this->assertEquals(['amqp'], $sendersMapping[FooMessage::class]); + } + public function testTranslator() { $container = $this->createContainerFromFile('full'); diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 30e195f7ee..68d180d0e7 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -4,6 +4,7 @@ CHANGELOG 4.4.0 ----- + * [BC BREAK] The `SyncTransport` and `SyncTransportFactory` classes were removed. * Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor, pass a `RoutableMessageBus` instance instead. * Added support for auto trimming of Redis streams. diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportFactoryTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportFactoryTest.php deleted file mode 100644 index 021c7ae970..0000000000 --- a/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportFactoryTest.php +++ /dev/null @@ -1,30 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt; - -use PHPUnit\Framework\TestCase; -use Symfony\Component\Messenger\MessageBusInterface; -use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; -use Symfony\Component\Messenger\Transport\Sync\SyncTransport; -use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory; - -class SyncTransportFactoryTest extends TestCase -{ - public function testCreateTransport() - { - $serializer = $this->createMock(SerializerInterface::class); - $bus = $this->createMock(MessageBusInterface::class); - $factory = new SyncTransportFactory($bus); - $transport = $factory->createTransport('sync://', [], $serializer); - $this->assertInstanceOf(SyncTransport::class, $transport); - } -} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportTest.php deleted file mode 100644 index 13549e2758..0000000000 --- a/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportTest.php +++ /dev/null @@ -1,41 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt; - -use PHPUnit\Framework\TestCase; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\MessageBusInterface; -use Symfony\Component\Messenger\Stamp\ReceivedStamp; -use Symfony\Component\Messenger\Transport\Sync\SyncTransport; - -class SyncTransportTest extends TestCase -{ - public function testSend() - { - $bus = $this->createMock(MessageBusInterface::class); - $bus->expects($this->once()) - ->method('dispatch') - ->with($this->callback(function ($arg) { - $this->assertInstanceOf(Envelope::class, $arg); - - return true; - })) - ->willReturnArgument(0); - $message = new \stdClass(); - $envelope = new Envelope($message); - $transport = new SyncTransport($bus); - $envelope = $transport->send($envelope); - - $this->assertSame($message, $envelope->getMessage()); - $this->assertNotNull($envelope->last(ReceivedStamp::class)); - } -} diff --git a/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php deleted file mode 100644 index 67af903913..0000000000 --- a/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php +++ /dev/null @@ -1,65 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Transport\Sync; - -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Exception\InvalidArgumentException; -use Symfony\Component\Messenger\MessageBusInterface; -use Symfony\Component\Messenger\Stamp\ReceivedStamp; -use Symfony\Component\Messenger\Stamp\SentStamp; -use Symfony\Component\Messenger\Transport\TransportInterface; - -/** - * Transport that immediately marks messages as received and dispatches for handling. - * - * @author Ryan Weaver - */ -class SyncTransport implements TransportInterface -{ - private $messageBus; - - public function __construct(MessageBusInterface $messageBus) - { - $this->messageBus = $messageBus; - } - - public function get(): iterable - { - throw new InvalidArgumentException('You cannot receive messages from the Messenger SyncTransport.'); - } - - public function stop(): void - { - throw new InvalidArgumentException('You cannot call stop() on the Messenger SyncTransport.'); - } - - public function ack(Envelope $envelope): void - { - throw new InvalidArgumentException('You cannot call ack() on the Messenger SyncTransport.'); - } - - public function reject(Envelope $envelope): void - { - throw new InvalidArgumentException('You cannot call reject() on the Messenger SyncTransport.'); - } - - public function send(Envelope $envelope): Envelope - { - /** @var SentStamp|null $sentStamp */ - $sentStamp = $envelope->last(SentStamp::class); - $alias = null === $sentStamp ? 'sync' : ($sentStamp->getSenderAlias() ?: $sentStamp->getSenderClass()); - - $envelope = $envelope->with(new ReceivedStamp($alias)); - - return $this->messageBus->dispatch($envelope); - } -} diff --git a/src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php deleted file mode 100644 index 1784bfb797..0000000000 --- a/src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php +++ /dev/null @@ -1,40 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Transport\Sync; - -use Symfony\Component\Messenger\MessageBusInterface; -use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; -use Symfony\Component\Messenger\Transport\TransportFactoryInterface; -use Symfony\Component\Messenger\Transport\TransportInterface; - -/** - * @author Ryan Weaver - */ -class SyncTransportFactory implements TransportFactoryInterface -{ - private $messageBus; - - public function __construct(MessageBusInterface $messageBus) - { - $this->messageBus = $messageBus; - } - - public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface - { - return new SyncTransport($this->messageBus); - } - - public function supports(string $dsn, array $options): bool - { - return 0 === strpos($dsn, 'sync://'); - } -}