From 38f19a960c39200c9bd27e28a07c6b36491eda35 Mon Sep 17 00:00:00 2001 From: Ryan Weaver Date: Mon, 28 Oct 2019 11:14:08 -0400 Subject: [PATCH] Revert "[Messenger] Removing "sync" transport and replacing it with much nicer config trick" This reverts commit 3d4e59a10b0b0e8e47ac48b6a6dd2b79484b0843. --- .../FrameworkExtension.php | 46 ++++++------- .../Resources/config/messenger.xml | 5 ++ .../Fixtures/php/messenger_sync_transport.php | 15 ----- .../Fixtures/xml/messenger_sync_transport.xml | 24 ------- .../Fixtures/yml/messenger_sync_transport.yml | 10 --- .../FrameworkExtensionTest.php | 13 ---- src/Symfony/Component/Messenger/CHANGELOG.md | 1 - .../Sync/SyncTransportFactoryTest.php | 30 +++++++++ .../Transport/Sync/SyncTransportTest.php | 41 ++++++++++++ .../Transport/Sync/SyncTransport.php | 65 +++++++++++++++++++ .../Transport/Sync/SyncTransportFactory.php | 40 ++++++++++++ 11 files changed, 202 insertions(+), 88 deletions(-) delete mode 100644 src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_sync_transport.php delete mode 100644 src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_sync_transport.xml delete mode 100644 src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_sync_transport.yml create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportFactoryTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportTest.php create mode 100644 src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php create mode 100644 src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index 0dae2eebcd..246828f410 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1775,27 +1775,18 @@ class FrameworkExtension extends Extension $container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']); } - $senderReferences = []; - $syncTransports = []; + $senderAliases = []; $transportRetryReferences = []; foreach ($config['transports'] as $name => $transport) { $serializerId = $transport['serializer'] ?? 'messenger.default_serializer'; - if (0 === strpos($transport['dsn'], 'sync://')) { - $syncTransports[] = $name; - } else { - $transportDefinition = (new Definition(TransportInterface::class)) - ->setFactory([new Reference('messenger.transport_factory'), 'createTransport']) - ->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)]) - ->addTag('messenger.receiver', ['alias' => $name]) - ; - $container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition); - - // alias => service_id - $senderReferences[$name] = new Reference($transportId); - // service_id => service_id - $senderReferences[$transportId] = new Reference($transportId); - } + $transportDefinition = (new Definition(TransportInterface::class)) + ->setFactory([new Reference('messenger.transport_factory'), 'createTransport']) + ->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)]) + ->addTag('messenger.receiver', ['alias' => $name]) + ; + $container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition); + $senderAliases[$name] = $transportId; if (null !== $transport['retry_strategy']['service']) { $transportRetryReferences[$name] = new Reference($transport['retry_strategy']['service']); @@ -1813,25 +1804,30 @@ class FrameworkExtension extends Extension } } + $senderReferences = []; + // alias => service_id + foreach ($senderAliases as $alias => $serviceId) { + $senderReferences[$alias] = new Reference($serviceId); + } + // service_id => service_id + foreach ($senderAliases as $serviceId) { + $senderReferences[$serviceId] = new Reference($serviceId); + } + $messageToSendersMapping = []; foreach ($config['routing'] as $message => $messageConfiguration) { if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) { throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message)); } - // filter out "sync" senders - $realSenders = []; + // make sure senderAliases contains all senders foreach ($messageConfiguration['senders'] as $sender) { - if (isset($senderReferences[$sender])) { - $realSenders[] = $sender; - } elseif (!\in_array($sender, $syncTransports, true)) { + if (!isset($senderReferences[$sender])) { throw new LogicException(sprintf('Invalid Messenger routing configuration: the "%s" class is being routed to a sender called "%s". This is not a valid transport or service id.', $message, $sender)); } } - if ($realSenders) { - $messageToSendersMapping[$message] = $realSenders; - } + $messageToSendersMapping[$message] = $messageConfiguration['senders']; } $container->getDefinition('messenger.senders_locator') diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index 6debf4a3c2..2284dd7963 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -75,6 +75,11 @@ + + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_sync_transport.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_sync_transport.php deleted file mode 100644 index 09f8c53fc3..0000000000 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_sync_transport.php +++ /dev/null @@ -1,15 +0,0 @@ -loadFromExtension('framework', [ - 'messenger' => [ - 'transports' => [ - 'amqp' => 'amqp://localhost/%2f/messages', - 'sync' => 'sync://', - ], - 'routing' => [ - 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage' => ['amqp'], - 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage' => ['sync'], - 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage' => ['amqp', 'sync'], - ], - ], -]); diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_sync_transport.xml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_sync_transport.xml deleted file mode 100644 index 2345cc2342..0000000000 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_sync_transport.xml +++ /dev/null @@ -1,24 +0,0 @@ - - - - - - - - - - - - - - - - - - - - diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_sync_transport.yml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_sync_transport.yml deleted file mode 100644 index f7b4fdefe0..0000000000 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_sync_transport.yml +++ /dev/null @@ -1,10 +0,0 @@ -framework: - messenger: - transports: - amqp: 'amqp://localhost/%2f/messages' - sync: 'sync://' - - routing: - 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage': amqp - 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage': sync - 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage': [amqp, sync] diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index fe49656032..1e97f61d7c 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -16,8 +16,6 @@ use Psr\Log\LoggerAwareInterface; use Symfony\Bundle\FrameworkBundle\DependencyInjection\Compiler\AddAnnotationsCachedReaderPass; use Symfony\Bundle\FrameworkBundle\DependencyInjection\FrameworkExtension; use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage; -use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage; -use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage; use Symfony\Bundle\FrameworkBundle\Tests\TestCase; use Symfony\Bundle\FullStack; use Symfony\Component\Cache\Adapter\AdapterInterface; @@ -784,17 +782,6 @@ abstract class FrameworkExtensionTest extends TestCase $this->createContainerFromFile('messenger_routing_invalid_transport'); } - public function testMessengerSyncTransport() - { - $container = $this->createContainerFromFile('messenger_sync_transport'); - $senderLocatorDefinition = $container->getDefinition('messenger.senders_locator'); - - $sendersMapping = $senderLocatorDefinition->getArgument(0); - $this->assertEquals(['amqp'], $sendersMapping[DummyMessage::class]); - $this->assertArrayNotHasKey(SecondMessage::class, $sendersMapping); - $this->assertEquals(['amqp'], $sendersMapping[FooMessage::class]); - } - public function testTranslator() { $container = $this->createContainerFromFile('full'); diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 68d180d0e7..30e195f7ee 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -4,7 +4,6 @@ CHANGELOG 4.4.0 ----- - * [BC BREAK] The `SyncTransport` and `SyncTransportFactory` classes were removed. * Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor, pass a `RoutableMessageBus` instance instead. * Added support for auto trimming of Redis streams. diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportFactoryTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportFactoryTest.php new file mode 100644 index 0000000000..021c7ae970 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportFactoryTest.php @@ -0,0 +1,30 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Symfony\Component\Messenger\Transport\Sync\SyncTransport; +use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory; + +class SyncTransportFactoryTest extends TestCase +{ + public function testCreateTransport() + { + $serializer = $this->createMock(SerializerInterface::class); + $bus = $this->createMock(MessageBusInterface::class); + $factory = new SyncTransportFactory($bus); + $transport = $factory->createTransport('sync://', [], $serializer); + $this->assertInstanceOf(SyncTransport::class, $transport); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportTest.php new file mode 100644 index 0000000000..13549e2758 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportTest.php @@ -0,0 +1,41 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Transport\Sync\SyncTransport; + +class SyncTransportTest extends TestCase +{ + public function testSend() + { + $bus = $this->createMock(MessageBusInterface::class); + $bus->expects($this->once()) + ->method('dispatch') + ->with($this->callback(function ($arg) { + $this->assertInstanceOf(Envelope::class, $arg); + + return true; + })) + ->willReturnArgument(0); + $message = new \stdClass(); + $envelope = new Envelope($message); + $transport = new SyncTransport($bus); + $envelope = $transport->send($envelope); + + $this->assertSame($message, $envelope->getMessage()); + $this->assertNotNull($envelope->last(ReceivedStamp::class)); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php new file mode 100644 index 0000000000..67af903913 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php @@ -0,0 +1,65 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Sync; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\InvalidArgumentException; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\SentStamp; +use Symfony\Component\Messenger\Transport\TransportInterface; + +/** + * Transport that immediately marks messages as received and dispatches for handling. + * + * @author Ryan Weaver + */ +class SyncTransport implements TransportInterface +{ + private $messageBus; + + public function __construct(MessageBusInterface $messageBus) + { + $this->messageBus = $messageBus; + } + + public function get(): iterable + { + throw new InvalidArgumentException('You cannot receive messages from the Messenger SyncTransport.'); + } + + public function stop(): void + { + throw new InvalidArgumentException('You cannot call stop() on the Messenger SyncTransport.'); + } + + public function ack(Envelope $envelope): void + { + throw new InvalidArgumentException('You cannot call ack() on the Messenger SyncTransport.'); + } + + public function reject(Envelope $envelope): void + { + throw new InvalidArgumentException('You cannot call reject() on the Messenger SyncTransport.'); + } + + public function send(Envelope $envelope): Envelope + { + /** @var SentStamp|null $sentStamp */ + $sentStamp = $envelope->last(SentStamp::class); + $alias = null === $sentStamp ? 'sync' : ($sentStamp->getSenderAlias() ?: $sentStamp->getSenderClass()); + + $envelope = $envelope->with(new ReceivedStamp($alias)); + + return $this->messageBus->dispatch($envelope); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php new file mode 100644 index 0000000000..1784bfb797 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php @@ -0,0 +1,40 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Sync; + +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Symfony\Component\Messenger\Transport\TransportFactoryInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; + +/** + * @author Ryan Weaver + */ +class SyncTransportFactory implements TransportFactoryInterface +{ + private $messageBus; + + public function __construct(MessageBusInterface $messageBus) + { + $this->messageBus = $messageBus; + } + + public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface + { + return new SyncTransport($this->messageBus); + } + + public function supports(string $dsn, array $options): bool + { + return 0 === strpos($dsn, 'sync://'); + } +}