From be2eb6fcc74b462ab1cd4d33999270d941367752 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Pineau?= Date: Thu, 12 Dec 2019 16:55:12 +0100 Subject: [PATCH] [Messenger][AMQP] Use delivery_mode=2 by default --- .../Transport/AmqpExt/ConnectionTest.php | 27 ++++++++++++++----- .../Transport/AmqpExt/Connection.php | 1 + 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php index 614cbb82e7..489020a6dc 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php @@ -227,7 +227,7 @@ class ConnectionTest extends TestCase ); $amqpExchange->expects($this->once())->method('declareExchange'); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]); + $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]); $amqpQueue->expects($this->once())->method('declareQueue'); $amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null); @@ -250,7 +250,7 @@ class ConnectionTest extends TestCase $factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1)); $amqpExchange->expects($this->once())->method('declareExchange'); - $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => []]); + $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]); $amqpQueue0->expects($this->once())->method('declareQueue'); $amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive( [self::DEFAULT_EXCHANGE_NAME, 'binding_key0'], @@ -373,7 +373,7 @@ class ConnectionTest extends TestCase $delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000'); - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]); + $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2]); $connection = Connection::fromDsn('amqp://localhost', [], $factory); $connection->publish('{}', ['x-some-headers' => 'foo'], 5000); @@ -415,7 +415,7 @@ class ConnectionTest extends TestCase $delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000'); - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]); + $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]); $connection->publish('{}', [], 120000); } @@ -447,12 +447,27 @@ class ConnectionTest extends TestCase $amqpExchange = $this->createMock(\AMQPExchange::class) ); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y']]); + $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2]); $connection = Connection::fromDsn('amqp://localhost', [], $factory); $connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']])); } + public function testAmqpStampDelireryModeIsUsed() + { + $factory = new TestAmqpFactory( + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1]); + + $connection = Connection::fromDsn('amqp://localhost', [], $factory); + $connection->publish('body', [], 0, new AmqpStamp(null, AMQP_NOPARAM, ['delivery_mode' => 1])); + } + public function testItCanPublishWithTheDefaultRoutingKey() { $factory = new TestAmqpFactory( @@ -519,7 +534,7 @@ class ConnectionTest extends TestCase $delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000'); - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]); + $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]); $connection->publish('{}', [], 120000, new AmqpStamp('routing_key')); } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index 4124eb3604..51cc3877d7 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -231,6 +231,7 @@ class Connection { $attributes = $amqpStamp ? $amqpStamp->getAttributes() : []; $attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers); + $attributes['delivery_mode'] = $attributes['delivery_mode'] ?? 2; $exchange->publish( $body,