From 3211caa5b55b7d8ef3b11de4426258e890264dd8 Mon Sep 17 00:00:00 2001 From: Cedrick Oka Date: Fri, 25 Jan 2019 13:17:45 +0000 Subject: [PATCH] Allow exchange type headers binding --- .../Transport/AmqpExt/ConnectionTest.php | 27 +++++++++++++++++++ .../Transport/AmqpExt/Connection.php | 3 ++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php index 5d8cd58c98..43cce8a740 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php @@ -273,6 +273,33 @@ class ConnectionTest extends TestCase $connection->publish('body'); } + public function testBindingArguments() + { + $amqpConnection = $this->createMock(\AMQPConnection::class); + $amqpChannel = $this->createMock(\AMQPChannel::class); + $amqpExchange = $this->createMock(\AMQPExchange::class); + $amqpQueue = $this->createMock(\AMQPQueue::class); + + $factory = $this->createMock(AmqpFactory::class); + $factory->method('createConnection')->willReturn($amqpConnection); + $factory->method('createChannel')->willReturn($amqpChannel); + $factory->method('createExchange')->willReturn($amqpExchange); + $factory->method('createQueue')->willReturn($amqpQueue); + + $amqpExchange->expects($this->once())->method('declareExchange'); + $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]); + $amqpQueue->expects($this->once())->method('declareQueue'); + $amqpQueue->expects($this->exactly(1))->method('bind')->withConsecutive( + [self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all']] + ); + + $dsn = 'amqp://localhost?exchange[type]=headers'. + '&queues[queue0][binding_arguments][x-match]=all'; + + $connection = Connection::fromDsn($dsn, [], $factory); + $connection->publish('body'); + } + public function testItCanDisableTheSetup() { $factory = new TestAmqpFactory( diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index e4736db7e2..fb51b57cfc 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -81,6 +81,7 @@ class Connection * * password: Password to use the connect to the AMQP service * * queues[name]: An array of queues, keyed by the name * * binding_keys: The binding keys (if any) to bind to this queue + * * binding_arguments: Arguments to be used while binding the queue. * * flags: Queue flags (Default: AMQP_DURABLE) * * arguments: Extra arguments * * exchange: @@ -349,7 +350,7 @@ class Connection foreach ($this->queuesOptions as $queueName => $queueConfig) { $this->queue($queueName)->declareQueue(); foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) { - $this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey); + $this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []); } } }