Revert "[Messenger] Removing "sync" transport and replacing it with much nicer config trick"
This reverts commit 3d4e59a10b
.
This commit is contained in:
parent
0472dbfccb
commit
38f19a960c
@ -1775,27 +1775,18 @@ class FrameworkExtension extends Extension
|
||||
$container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']);
|
||||
}
|
||||
|
||||
$senderReferences = [];
|
||||
$syncTransports = [];
|
||||
$senderAliases = [];
|
||||
$transportRetryReferences = [];
|
||||
foreach ($config['transports'] as $name => $transport) {
|
||||
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
|
||||
|
||||
if (0 === strpos($transport['dsn'], 'sync://')) {
|
||||
$syncTransports[] = $name;
|
||||
} else {
|
||||
$transportDefinition = (new Definition(TransportInterface::class))
|
||||
->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
|
||||
->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
|
||||
->addTag('messenger.receiver', ['alias' => $name])
|
||||
;
|
||||
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
|
||||
|
||||
// alias => service_id
|
||||
$senderReferences[$name] = new Reference($transportId);
|
||||
// service_id => service_id
|
||||
$senderReferences[$transportId] = new Reference($transportId);
|
||||
}
|
||||
$senderAliases[$name] = $transportId;
|
||||
|
||||
if (null !== $transport['retry_strategy']['service']) {
|
||||
$transportRetryReferences[$name] = new Reference($transport['retry_strategy']['service']);
|
||||
@ -1813,25 +1804,30 @@ class FrameworkExtension extends Extension
|
||||
}
|
||||
}
|
||||
|
||||
$senderReferences = [];
|
||||
// alias => service_id
|
||||
foreach ($senderAliases as $alias => $serviceId) {
|
||||
$senderReferences[$alias] = new Reference($serviceId);
|
||||
}
|
||||
// service_id => service_id
|
||||
foreach ($senderAliases as $serviceId) {
|
||||
$senderReferences[$serviceId] = new Reference($serviceId);
|
||||
}
|
||||
|
||||
$messageToSendersMapping = [];
|
||||
foreach ($config['routing'] as $message => $messageConfiguration) {
|
||||
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
|
||||
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
|
||||
}
|
||||
|
||||
// filter out "sync" senders
|
||||
$realSenders = [];
|
||||
// make sure senderAliases contains all senders
|
||||
foreach ($messageConfiguration['senders'] as $sender) {
|
||||
if (isset($senderReferences[$sender])) {
|
||||
$realSenders[] = $sender;
|
||||
} elseif (!\in_array($sender, $syncTransports, true)) {
|
||||
if (!isset($senderReferences[$sender])) {
|
||||
throw new LogicException(sprintf('Invalid Messenger routing configuration: the "%s" class is being routed to a sender called "%s". This is not a valid transport or service id.', $message, $sender));
|
||||
}
|
||||
}
|
||||
|
||||
if ($realSenders) {
|
||||
$messageToSendersMapping[$message] = $realSenders;
|
||||
}
|
||||
$messageToSendersMapping[$message] = $messageConfiguration['senders'];
|
||||
}
|
||||
|
||||
$container->getDefinition('messenger.senders_locator')
|
||||
|
@ -75,6 +75,11 @@
|
||||
<tag name="messenger.transport_factory" />
|
||||
</service>
|
||||
|
||||
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
|
||||
<tag name="messenger.transport_factory" />
|
||||
<argument type="service" id="messenger.routable_message_bus" />
|
||||
</service>
|
||||
|
||||
<service id="messenger.transport.in_memory.factory" class="Symfony\Component\Messenger\Transport\InMemoryTransportFactory">
|
||||
<tag name="messenger.transport_factory" />
|
||||
<tag name="kernel.reset" method="reset" />
|
||||
|
@ -1,15 +0,0 @@
|
||||
<?php
|
||||
|
||||
$container->loadFromExtension('framework', [
|
||||
'messenger' => [
|
||||
'transports' => [
|
||||
'amqp' => 'amqp://localhost/%2f/messages',
|
||||
'sync' => 'sync://',
|
||||
],
|
||||
'routing' => [
|
||||
'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage' => ['amqp'],
|
||||
'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage' => ['sync'],
|
||||
'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage' => ['amqp', 'sync'],
|
||||
],
|
||||
],
|
||||
]);
|
@ -1,24 +0,0 @@
|
||||
<?xml version="1.0" encoding="utf-8" ?>
|
||||
<container xmlns="http://symfony.com/schema/dic/services"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
||||
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
|
||||
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
||||
|
||||
<framework:config>
|
||||
<framework:messenger>
|
||||
<framework:routing message-class="Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage">
|
||||
<framework:sender service="amqp" />
|
||||
</framework:routing>
|
||||
<framework:routing message-class="Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage">
|
||||
<framework:sender service="sync" />
|
||||
</framework:routing>
|
||||
<framework:routing message-class="Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage">
|
||||
<framework:sender service="amqp" />
|
||||
<framework:sender service="sync" />
|
||||
</framework:routing>
|
||||
<framework:transport name="amqp" dsn="amqp://localhost/%2f/messages" />
|
||||
<framework:transport name="sync" dsn="sync://" />
|
||||
</framework:messenger>
|
||||
</framework:config>
|
||||
</container>
|
@ -1,10 +0,0 @@
|
||||
framework:
|
||||
messenger:
|
||||
transports:
|
||||
amqp: 'amqp://localhost/%2f/messages'
|
||||
sync: 'sync://'
|
||||
|
||||
routing:
|
||||
'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage': amqp
|
||||
'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage': sync
|
||||
'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage': [amqp, sync]
|
@ -16,8 +16,6 @@ use Psr\Log\LoggerAwareInterface;
|
||||
use Symfony\Bundle\FrameworkBundle\DependencyInjection\Compiler\AddAnnotationsCachedReaderPass;
|
||||
use Symfony\Bundle\FrameworkBundle\DependencyInjection\FrameworkExtension;
|
||||
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage;
|
||||
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage;
|
||||
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage;
|
||||
use Symfony\Bundle\FrameworkBundle\Tests\TestCase;
|
||||
use Symfony\Bundle\FullStack;
|
||||
use Symfony\Component\Cache\Adapter\AdapterInterface;
|
||||
@ -784,17 +782,6 @@ abstract class FrameworkExtensionTest extends TestCase
|
||||
$this->createContainerFromFile('messenger_routing_invalid_transport');
|
||||
}
|
||||
|
||||
public function testMessengerSyncTransport()
|
||||
{
|
||||
$container = $this->createContainerFromFile('messenger_sync_transport');
|
||||
$senderLocatorDefinition = $container->getDefinition('messenger.senders_locator');
|
||||
|
||||
$sendersMapping = $senderLocatorDefinition->getArgument(0);
|
||||
$this->assertEquals(['amqp'], $sendersMapping[DummyMessage::class]);
|
||||
$this->assertArrayNotHasKey(SecondMessage::class, $sendersMapping);
|
||||
$this->assertEquals(['amqp'], $sendersMapping[FooMessage::class]);
|
||||
}
|
||||
|
||||
public function testTranslator()
|
||||
{
|
||||
$container = $this->createContainerFromFile('full');
|
||||
|
@ -4,7 +4,6 @@ CHANGELOG
|
||||
4.4.0
|
||||
-----
|
||||
|
||||
* [BC BREAK] The `SyncTransport` and `SyncTransportFactory` classes were removed.
|
||||
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
|
||||
pass a `RoutableMessageBus` instance instead.
|
||||
* Added support for auto trimming of Redis streams.
|
||||
|
@ -0,0 +1,30 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
|
||||
use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory;
|
||||
|
||||
class SyncTransportFactoryTest extends TestCase
|
||||
{
|
||||
public function testCreateTransport()
|
||||
{
|
||||
$serializer = $this->createMock(SerializerInterface::class);
|
||||
$bus = $this->createMock(MessageBusInterface::class);
|
||||
$factory = new SyncTransportFactory($bus);
|
||||
$transport = $factory->createTransport('sync://', [], $serializer);
|
||||
$this->assertInstanceOf(SyncTransport::class, $transport);
|
||||
}
|
||||
}
|
@ -0,0 +1,41 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
||||
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
|
||||
|
||||
class SyncTransportTest extends TestCase
|
||||
{
|
||||
public function testSend()
|
||||
{
|
||||
$bus = $this->createMock(MessageBusInterface::class);
|
||||
$bus->expects($this->once())
|
||||
->method('dispatch')
|
||||
->with($this->callback(function ($arg) {
|
||||
$this->assertInstanceOf(Envelope::class, $arg);
|
||||
|
||||
return true;
|
||||
}))
|
||||
->willReturnArgument(0);
|
||||
$message = new \stdClass();
|
||||
$envelope = new Envelope($message);
|
||||
$transport = new SyncTransport($bus);
|
||||
$envelope = $transport->send($envelope);
|
||||
|
||||
$this->assertSame($message, $envelope->getMessage());
|
||||
$this->assertNotNull($envelope->last(ReceivedStamp::class));
|
||||
}
|
||||
}
|
@ -0,0 +1,65 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\Sync;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
* Transport that immediately marks messages as received and dispatches for handling.
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*/
|
||||
class SyncTransport implements TransportInterface
|
||||
{
|
||||
private $messageBus;
|
||||
|
||||
public function __construct(MessageBusInterface $messageBus)
|
||||
{
|
||||
$this->messageBus = $messageBus;
|
||||
}
|
||||
|
||||
public function get(): iterable
|
||||
{
|
||||
throw new InvalidArgumentException('You cannot receive messages from the Messenger SyncTransport.');
|
||||
}
|
||||
|
||||
public function stop(): void
|
||||
{
|
||||
throw new InvalidArgumentException('You cannot call stop() on the Messenger SyncTransport.');
|
||||
}
|
||||
|
||||
public function ack(Envelope $envelope): void
|
||||
{
|
||||
throw new InvalidArgumentException('You cannot call ack() on the Messenger SyncTransport.');
|
||||
}
|
||||
|
||||
public function reject(Envelope $envelope): void
|
||||
{
|
||||
throw new InvalidArgumentException('You cannot call reject() on the Messenger SyncTransport.');
|
||||
}
|
||||
|
||||
public function send(Envelope $envelope): Envelope
|
||||
{
|
||||
/** @var SentStamp|null $sentStamp */
|
||||
$sentStamp = $envelope->last(SentStamp::class);
|
||||
$alias = null === $sentStamp ? 'sync' : ($sentStamp->getSenderAlias() ?: $sentStamp->getSenderClass());
|
||||
|
||||
$envelope = $envelope->with(new ReceivedStamp($alias));
|
||||
|
||||
return $this->messageBus->dispatch($envelope);
|
||||
}
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\Sync;
|
||||
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*/
|
||||
class SyncTransportFactory implements TransportFactoryInterface
|
||||
{
|
||||
private $messageBus;
|
||||
|
||||
public function __construct(MessageBusInterface $messageBus)
|
||||
{
|
||||
$this->messageBus = $messageBus;
|
||||
}
|
||||
|
||||
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
|
||||
{
|
||||
return new SyncTransport($this->messageBus);
|
||||
}
|
||||
|
||||
public function supports(string $dsn, array $options): bool
|
||||
{
|
||||
return 0 === strpos($dsn, 'sync://');
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user