diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
index 0dae2eebcd..246828f410 100644
--- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
@@ -1775,27 +1775,18 @@ class FrameworkExtension extends Extension
$container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']);
}
- $senderReferences = [];
- $syncTransports = [];
+ $senderAliases = [];
$transportRetryReferences = [];
foreach ($config['transports'] as $name => $transport) {
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
- if (0 === strpos($transport['dsn'], 'sync://')) {
- $syncTransports[] = $name;
- } else {
- $transportDefinition = (new Definition(TransportInterface::class))
- ->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
- ->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
- ->addTag('messenger.receiver', ['alias' => $name])
- ;
- $container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
-
- // alias => service_id
- $senderReferences[$name] = new Reference($transportId);
- // service_id => service_id
- $senderReferences[$transportId] = new Reference($transportId);
- }
+ $transportDefinition = (new Definition(TransportInterface::class))
+ ->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
+ ->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
+ ->addTag('messenger.receiver', ['alias' => $name])
+ ;
+ $container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
+ $senderAliases[$name] = $transportId;
if (null !== $transport['retry_strategy']['service']) {
$transportRetryReferences[$name] = new Reference($transport['retry_strategy']['service']);
@@ -1813,25 +1804,30 @@ class FrameworkExtension extends Extension
}
}
+ $senderReferences = [];
+ // alias => service_id
+ foreach ($senderAliases as $alias => $serviceId) {
+ $senderReferences[$alias] = new Reference($serviceId);
+ }
+ // service_id => service_id
+ foreach ($senderAliases as $serviceId) {
+ $senderReferences[$serviceId] = new Reference($serviceId);
+ }
+
$messageToSendersMapping = [];
foreach ($config['routing'] as $message => $messageConfiguration) {
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
}
- // filter out "sync" senders
- $realSenders = [];
+ // make sure senderAliases contains all senders
foreach ($messageConfiguration['senders'] as $sender) {
- if (isset($senderReferences[$sender])) {
- $realSenders[] = $sender;
- } elseif (!\in_array($sender, $syncTransports, true)) {
+ if (!isset($senderReferences[$sender])) {
throw new LogicException(sprintf('Invalid Messenger routing configuration: the "%s" class is being routed to a sender called "%s". This is not a valid transport or service id.', $message, $sender));
}
}
- if ($realSenders) {
- $messageToSendersMapping[$message] = $realSenders;
- }
+ $messageToSendersMapping[$message] = $messageConfiguration['senders'];
}
$container->getDefinition('messenger.senders_locator')
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
index 6debf4a3c2..2284dd7963 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
@@ -75,6 +75,11 @@
+
+
+
+
+
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_sync_transport.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_sync_transport.php
deleted file mode 100644
index 09f8c53fc3..0000000000
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_sync_transport.php
+++ /dev/null
@@ -1,15 +0,0 @@
-loadFromExtension('framework', [
- 'messenger' => [
- 'transports' => [
- 'amqp' => 'amqp://localhost/%2f/messages',
- 'sync' => 'sync://',
- ],
- 'routing' => [
- 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage' => ['amqp'],
- 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage' => ['sync'],
- 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage' => ['amqp', 'sync'],
- ],
- ],
-]);
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_sync_transport.xml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_sync_transport.xml
deleted file mode 100644
index 2345cc2342..0000000000
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_sync_transport.xml
+++ /dev/null
@@ -1,24 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_sync_transport.yml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_sync_transport.yml
deleted file mode 100644
index f7b4fdefe0..0000000000
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_sync_transport.yml
+++ /dev/null
@@ -1,10 +0,0 @@
-framework:
- messenger:
- transports:
- amqp: 'amqp://localhost/%2f/messages'
- sync: 'sync://'
-
- routing:
- 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage': amqp
- 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage': sync
- 'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage': [amqp, sync]
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
index fe49656032..1e97f61d7c 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
@@ -16,8 +16,6 @@ use Psr\Log\LoggerAwareInterface;
use Symfony\Bundle\FrameworkBundle\DependencyInjection\Compiler\AddAnnotationsCachedReaderPass;
use Symfony\Bundle\FrameworkBundle\DependencyInjection\FrameworkExtension;
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage;
-use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage;
-use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage;
use Symfony\Bundle\FrameworkBundle\Tests\TestCase;
use Symfony\Bundle\FullStack;
use Symfony\Component\Cache\Adapter\AdapterInterface;
@@ -784,17 +782,6 @@ abstract class FrameworkExtensionTest extends TestCase
$this->createContainerFromFile('messenger_routing_invalid_transport');
}
- public function testMessengerSyncTransport()
- {
- $container = $this->createContainerFromFile('messenger_sync_transport');
- $senderLocatorDefinition = $container->getDefinition('messenger.senders_locator');
-
- $sendersMapping = $senderLocatorDefinition->getArgument(0);
- $this->assertEquals(['amqp'], $sendersMapping[DummyMessage::class]);
- $this->assertArrayNotHasKey(SecondMessage::class, $sendersMapping);
- $this->assertEquals(['amqp'], $sendersMapping[FooMessage::class]);
- }
-
public function testTranslator()
{
$container = $this->createContainerFromFile('full');
diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md
index 68d180d0e7..30e195f7ee 100644
--- a/src/Symfony/Component/Messenger/CHANGELOG.md
+++ b/src/Symfony/Component/Messenger/CHANGELOG.md
@@ -4,7 +4,6 @@ CHANGELOG
4.4.0
-----
- * [BC BREAK] The `SyncTransport` and `SyncTransportFactory` classes were removed.
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
pass a `RoutableMessageBus` instance instead.
* Added support for auto trimming of Redis streams.
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportFactoryTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportFactoryTest.php
new file mode 100644
index 0000000000..021c7ae970
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportFactoryTest.php
@@ -0,0 +1,30 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\MessageBusInterface;
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
+use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
+use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory;
+
+class SyncTransportFactoryTest extends TestCase
+{
+ public function testCreateTransport()
+ {
+ $serializer = $this->createMock(SerializerInterface::class);
+ $bus = $this->createMock(MessageBusInterface::class);
+ $factory = new SyncTransportFactory($bus);
+ $transport = $factory->createTransport('sync://', [], $serializer);
+ $this->assertInstanceOf(SyncTransport::class, $transport);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportTest.php
new file mode 100644
index 0000000000..13549e2758
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportTest.php
@@ -0,0 +1,41 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\MessageBusInterface;
+use Symfony\Component\Messenger\Stamp\ReceivedStamp;
+use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
+
+class SyncTransportTest extends TestCase
+{
+ public function testSend()
+ {
+ $bus = $this->createMock(MessageBusInterface::class);
+ $bus->expects($this->once())
+ ->method('dispatch')
+ ->with($this->callback(function ($arg) {
+ $this->assertInstanceOf(Envelope::class, $arg);
+
+ return true;
+ }))
+ ->willReturnArgument(0);
+ $message = new \stdClass();
+ $envelope = new Envelope($message);
+ $transport = new SyncTransport($bus);
+ $envelope = $transport->send($envelope);
+
+ $this->assertSame($message, $envelope->getMessage());
+ $this->assertNotNull($envelope->last(ReceivedStamp::class));
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php
new file mode 100644
index 0000000000..67af903913
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php
@@ -0,0 +1,65 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Transport\Sync;
+
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Exception\InvalidArgumentException;
+use Symfony\Component\Messenger\MessageBusInterface;
+use Symfony\Component\Messenger\Stamp\ReceivedStamp;
+use Symfony\Component\Messenger\Stamp\SentStamp;
+use Symfony\Component\Messenger\Transport\TransportInterface;
+
+/**
+ * Transport that immediately marks messages as received and dispatches for handling.
+ *
+ * @author Ryan Weaver
+ */
+class SyncTransport implements TransportInterface
+{
+ private $messageBus;
+
+ public function __construct(MessageBusInterface $messageBus)
+ {
+ $this->messageBus = $messageBus;
+ }
+
+ public function get(): iterable
+ {
+ throw new InvalidArgumentException('You cannot receive messages from the Messenger SyncTransport.');
+ }
+
+ public function stop(): void
+ {
+ throw new InvalidArgumentException('You cannot call stop() on the Messenger SyncTransport.');
+ }
+
+ public function ack(Envelope $envelope): void
+ {
+ throw new InvalidArgumentException('You cannot call ack() on the Messenger SyncTransport.');
+ }
+
+ public function reject(Envelope $envelope): void
+ {
+ throw new InvalidArgumentException('You cannot call reject() on the Messenger SyncTransport.');
+ }
+
+ public function send(Envelope $envelope): Envelope
+ {
+ /** @var SentStamp|null $sentStamp */
+ $sentStamp = $envelope->last(SentStamp::class);
+ $alias = null === $sentStamp ? 'sync' : ($sentStamp->getSenderAlias() ?: $sentStamp->getSenderClass());
+
+ $envelope = $envelope->with(new ReceivedStamp($alias));
+
+ return $this->messageBus->dispatch($envelope);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php
new file mode 100644
index 0000000000..1784bfb797
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php
@@ -0,0 +1,40 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Transport\Sync;
+
+use Symfony\Component\Messenger\MessageBusInterface;
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
+use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
+use Symfony\Component\Messenger\Transport\TransportInterface;
+
+/**
+ * @author Ryan Weaver
+ */
+class SyncTransportFactory implements TransportFactoryInterface
+{
+ private $messageBus;
+
+ public function __construct(MessageBusInterface $messageBus)
+ {
+ $this->messageBus = $messageBus;
+ }
+
+ public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
+ {
+ return new SyncTransport($this->messageBus);
+ }
+
+ public function supports(string $dsn, array $options): bool
+ {
+ return 0 === strpos($dsn, 'sync://');
+ }
+}