From 6f9fdaf7e4a218a88095a102e603bfc3cfd7d52d Mon Sep 17 00:00:00 2001 From: "fedor.f" Date: Wed, 13 Feb 2019 11:30:45 +0200 Subject: [PATCH] publish message with custom queue options : flags | attributes --- .../Tests/Transport/AmqpExt/ConnectionTest.php | 18 ++++++++++++++++++ .../Messenger/Transport/AmqpExt/Connection.php | 5 ++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php index b7d1be4453..b459c16331 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php @@ -233,6 +233,24 @@ class ConnectionTest extends TestCase $connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', [], true, $factory); $connection->publish('body'); } + + public function testPublishWithQueueOptions() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(), + $amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(), + $amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(), + $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock() + ); + $headers = [ + 'type' => '*', + ]; + $amqpExchange->expects($this->once())->method('publish') + ->with('body', null, 1, ['delivery_mode' => 2, 'headers' => $headers]); + + $connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[flags]=1', [], true, $factory); + $connection->publish('body', $headers); + } } class TestAmqpFactory extends AmqpFactory diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index cffc49df29..d56b105a2b 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -133,7 +133,10 @@ class Connection $this->setup(); } - $this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, AMQP_NOPARAM, ['headers' => $headers]); + $flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM; + $attributes = array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]); + + $this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes); } /**