diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php index 5d8cd58c98..614cbb82e7 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php @@ -337,7 +337,7 @@ class ConnectionTest extends TestCase $amqpQueue->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME); $amqpQueue->expects($this->once())->method('declareQueue'); - $delayExchange->expects($this->once())->method('setName')->with('delay'); + $delayExchange->expects($this->once())->method('setName')->with('delays'); $delayExchange->expects($this->once())->method('declareExchange'); $delayExchange->expects($this->once())->method('publish'); @@ -371,7 +371,7 @@ class ConnectionTest extends TestCase ]); $delayQueue->expects($this->once())->method('declareQueue'); - $delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__5000'); + $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000'); $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]); @@ -413,7 +413,7 @@ class ConnectionTest extends TestCase ]); $delayQueue->expects($this->once())->method('declareQueue'); - $delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__120000'); + $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000'); $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]); $connection->publish('{}', [], 120000); @@ -517,7 +517,7 @@ class ConnectionTest extends TestCase ]); $delayQueue->expects($this->once())->method('declareQueue'); - $delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages_routing_key_120000'); + $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000'); $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]); $connection->publish('{}', [], 120000, new AmqpStamp('routing_key')); diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index 3f0212f47f..e27741bf58 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -62,7 +62,7 @@ class Connection { $this->connectionOptions = array_replace_recursive([ 'delay' => [ - 'exchange_name' => 'delay', + 'exchange_name' => 'delays', 'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%', ], ], $connectionOptions); @@ -93,7 +93,7 @@ class Connection * * arguments: Extra arguments * * delay: * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%") - * * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delay") + * * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays") * * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true) * * prefetch_count: set channel prefetch count */ @@ -252,6 +252,11 @@ class Connection $this->amqpDelayExchange = $this->amqpFactory->createExchange($this->channel()); $this->amqpDelayExchange->setName($this->connectionOptions['delay']['exchange_name']); $this->amqpDelayExchange->setType(AMQP_EX_TYPE_DIRECT); + if ('delays' === $this->connectionOptions['delay']['exchange_name']) { + // only add the new flag when the name was not provided explicitly so we're using the new default name to prevent a redeclaration error + // the condition will be removed in 4.4 + $this->amqpDelayExchange->setFlags(AMQP_DURABLE); + } } return $this->amqpDelayExchange; @@ -274,16 +279,24 @@ class Connection [$delay, $this->exchangeOptions['name'], $routingKey ?? ''], $this->connectionOptions['delay']['queue_name_pattern'] )); + if ('delay_%exchange_name%_%routing_key%_%delay%' === $this->connectionOptions['delay']['queue_name_pattern']) { + // the condition will be removed in 4.4 + $queue->setFlags(AMQP_DURABLE); + $extraArguments = [ + // delete the delay queue 10 seconds after the message expires + // publishing another message redeclares the queue which renews the lease + 'x-expires' => $delay + 10000, + ]; + } else { + $extraArguments = []; + } $queue->setArguments([ 'x-message-ttl' => $delay, - // delete the delay queue 10 seconds after the message expires - // publishing another message redeclares the queue which renews the lease - 'x-expires' => $delay + 10000, 'x-dead-letter-exchange' => $this->exchangeOptions['name'], // after being released from to DLX, make sure the original routing key will be used // we must use an empty string instead of null for the argument to be picked up 'x-dead-letter-routing-key' => $routingKey ?? '', - ]); + ] + $extraArguments); return $queue; }