Chaining senders with their aliases should work
This commit is contained in:
parent
47aba84899
commit
98bb64c32a
@ -1495,6 +1495,22 @@ class FrameworkExtension extends Extension
|
||||
throw new LogicException(sprintf('The default bus named "%s" is not defined. Define it or change the default bus name.', $config['default_bus']));
|
||||
}
|
||||
|
||||
$senderAliases = array();
|
||||
foreach ($config['transports'] as $name => $transport) {
|
||||
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {
|
||||
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enable it or install it by running "composer require symfony/serializer-pack".');
|
||||
}
|
||||
|
||||
$transportDefinition = (new Definition(TransportInterface::class))
|
||||
->setFactory(array(new Reference('messenger.transport_factory'), 'createTransport'))
|
||||
->setArguments(array($transport['dsn'], $transport['options']))
|
||||
->addTag('messenger.receiver', array('alias' => $name))
|
||||
->addTag('messenger.sender', array('alias' => $name))
|
||||
;
|
||||
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
|
||||
$senderAliases[$name] = $transportId;
|
||||
}
|
||||
|
||||
$messageToSenderIdMapping = array();
|
||||
$messageToSendAndHandleMapping = array();
|
||||
foreach ($config['routing'] as $message => $messageConfiguration) {
|
||||
@ -1503,8 +1519,11 @@ class FrameworkExtension extends Extension
|
||||
}
|
||||
|
||||
if (1 < \count($messageConfiguration['senders'])) {
|
||||
$senders = array_map(function ($sender) { return new Reference($sender); }, $messageConfiguration['senders']);
|
||||
$senders = array_map(function ($sender) use ($senderAliases) {
|
||||
return new Reference($senderAliases[$sender] ?? $sender);
|
||||
}, $messageConfiguration['senders']);
|
||||
$chainSenderDefinition = new Definition(ChainSender::class, array($senders));
|
||||
$chainSenderDefinition->addTag('messenger.sender');
|
||||
$chainSenderId = '.messenger.chain_sender.'.$message;
|
||||
$container->setDefinition($chainSenderId, $chainSenderDefinition);
|
||||
$messageToSenderIdMapping[$message] = $chainSenderId;
|
||||
@ -1517,20 +1536,6 @@ class FrameworkExtension extends Extension
|
||||
|
||||
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdMapping);
|
||||
$container->getDefinition('messenger.middleware.route_messages')->replaceArgument(1, $messageToSendAndHandleMapping);
|
||||
|
||||
foreach ($config['transports'] as $name => $transport) {
|
||||
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {
|
||||
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enable it or install it by running "composer require symfony/serializer-pack".');
|
||||
}
|
||||
|
||||
$transportDefinition = (new Definition(TransportInterface::class))
|
||||
->setFactory(array(new Reference('messenger.transport_factory'), 'createTransport'))
|
||||
->setArguments(array($transport['dsn'], $transport['options']))
|
||||
->addTag('messenger.receiver', array('alias' => $name))
|
||||
->addTag('messenger.sender', array('alias' => $name))
|
||||
;
|
||||
$container->setDefinition('messenger.transport.'.$name, $transportDefinition);
|
||||
}
|
||||
}
|
||||
|
||||
private function registerCacheConfiguration(array $config, ContainerBuilder $container)
|
||||
|
@ -1,7 +1,9 @@
|
||||
<?php
|
||||
|
||||
$container->loadFromExtension('framework', array(
|
||||
'serializer' => true,
|
||||
'messenger' => array(
|
||||
'serializer' => true,
|
||||
'routing' => array(
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage' => array('amqp', 'audit'),
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage' => array(
|
||||
@ -10,5 +12,8 @@ $container->loadFromExtension('framework', array(
|
||||
),
|
||||
'*' => 'amqp',
|
||||
),
|
||||
'transports' => array(
|
||||
'amqp' => 'amqp://localhost/%2f/messages',
|
||||
),
|
||||
),
|
||||
));
|
||||
|
@ -6,7 +6,9 @@
|
||||
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
||||
|
||||
<framework:config>
|
||||
<framework:serializer enabled="true" />
|
||||
<framework:messenger>
|
||||
<framework:serializer enabled="true" />
|
||||
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\DummyMessage">
|
||||
<framework:sender service="amqp" />
|
||||
<framework:sender service="audit" />
|
||||
@ -18,6 +20,7 @@
|
||||
<framework:routing message-class="*">
|
||||
<framework:sender service="amqp" />
|
||||
</framework:routing>
|
||||
<framework:transport name="amqp" dsn="amqp://localhost/%2f/messages" />
|
||||
</framework:messenger>
|
||||
</framework:config>
|
||||
</container>
|
||||
|
@ -1,8 +1,12 @@
|
||||
framework:
|
||||
serializer: true
|
||||
messenger:
|
||||
serializer: true
|
||||
routing:
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage': [amqp, audit]
|
||||
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage':
|
||||
senders: [amqp, audit]
|
||||
send_and_handle: true
|
||||
'*': amqp
|
||||
transports:
|
||||
amqp: 'amqp://localhost/%2f/messages'
|
||||
|
@ -573,6 +573,7 @@ abstract class FrameworkExtensionTest extends TestCase
|
||||
|
||||
$this->assertSame($messageToSenderIdsMapping, $senderLocatorDefinition->getArgument(1));
|
||||
$this->assertSame($messageToSendAndHandleMapping, $sendMessageMiddlewareDefinition->getArgument(1));
|
||||
$this->assertEquals(array(new Reference('messenger.transport.amqp'), new Reference('audit')), $container->getDefinition('.messenger.chain_sender.'.DummyMessage::class)->getArgument(0));
|
||||
}
|
||||
|
||||
/**
|
||||
|
Reference in New Issue
Block a user