diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php index b7d1be4453..b8809368e5 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php @@ -233,6 +233,24 @@ class ConnectionTest extends TestCase $connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', [], true, $factory); $connection->publish('body'); } + + public function testPublishWithQueueOptions() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $amqpQueue = $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + $headers = [ + 'type' => '*', + ]; + $amqpExchange->expects($this->once())->method('publish') + ->with('body', null, 1, ['delivery_mode' => 2, 'headers' => ['token' => 'uuid', 'type' => '*']]); + + $connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], true, $factory); + $connection->publish('body', $headers); + } } class TestAmqpFactory extends AmqpFactory diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index cffc49df29..d56b105a2b 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -133,7 +133,10 @@ class Connection $this->setup(); } - $this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, AMQP_NOPARAM, ['headers' => $headers]); + $flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM; + $attributes = array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]); + + $this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes); } /**