diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php index a81f54c4bc..f51070c1d9 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php @@ -308,7 +308,7 @@ class ConnectionTest extends TestCase ); // makes sure the channel looks connected, so it's not re-created - $amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true); + $amqpChannel->expects($this->any())->method('isConnected')->willReturn(true); $amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2); $connection = Connection::fromDsn('amqp://localhost?prefetch_count=2', [], $factory); @@ -317,30 +317,57 @@ class ConnectionTest extends TestCase $connection->setup(); } - public function testItDelaysTheMessage() + public function testAutoSetupWithDelayDeclaresExchangeQueuesAndDelay() { $amqpConnection = $this->createMock(\AMQPConnection::class); $amqpChannel = $this->createMock(\AMQPChannel::class); - $delayQueue = $this->createMock(\AMQPQueue::class); $factory = $this->createMock(AmqpFactory::class); $factory->method('createConnection')->willReturn($amqpConnection); $factory->method('createChannel')->willReturn($amqpChannel); - $factory->method('createQueue')->willReturn($delayQueue); + $factory->method('createQueue')->will($this->onConsecutiveCalls( + $amqpQueue = $this->createMock(\AMQPQueue::class), + $delayQueue = $this->createMock(\AMQPQueue::class) + )); $factory->method('createExchange')->will($this->onConsecutiveCalls( - $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(), - $delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock() + $amqpExchange = $this->createMock(\AMQPExchange::class), + $delayExchange = $this->createMock(\AMQPExchange::class) )); $amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME); $amqpExchange->expects($this->once())->method('declareExchange'); + $amqpQueue->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME); + $amqpQueue->expects($this->once())->method('declareQueue'); $delayExchange->expects($this->once())->method('setName')->with('delay'); $delayExchange->expects($this->once())->method('declareExchange'); + $delayExchange->expects($this->once())->method('publish'); - $delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__5000'); + $connection = Connection::fromDsn('amqp://localhost', [], $factory); + $connection->publish('{}', ['x-some-headers' => 'foo'], 5000); + } + + public function testItDelaysTheMessage() + { + $amqpConnection = $this->createMock(\AMQPConnection::class); + $amqpChannel = $this->createMock(\AMQPChannel::class); + + $factory = $this->createMock(AmqpFactory::class); + $factory->method('createConnection')->willReturn($amqpConnection); + $factory->method('createChannel')->willReturn($amqpChannel); + $factory->method('createQueue')->will($this->onConsecutiveCalls( + $this->createMock(\AMQPQueue::class), + $delayQueue = $this->createMock(\AMQPQueue::class) + )); + $factory->method('createExchange')->will($this->onConsecutiveCalls( + $this->createMock(\AMQPExchange::class), + $delayExchange = $this->createMock(\AMQPExchange::class) + )); + + $delayQueue->expects($this->once())->method('setName')->with('delay_messages__5000'); $delayQueue->expects($this->once())->method('setArguments')->with([ 'x-message-ttl' => 5000, + 'x-expires' => 5000 + 10000, 'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME, 'x-dead-letter-routing-key' => '', ]); @@ -358,22 +385,18 @@ class ConnectionTest extends TestCase { $amqpConnection = $this->createMock(\AMQPConnection::class); $amqpChannel = $this->createMock(\AMQPChannel::class); - $delayQueue = $this->createMock(\AMQPQueue::class); $factory = $this->createMock(AmqpFactory::class); $factory->method('createConnection')->willReturn($amqpConnection); $factory->method('createChannel')->willReturn($amqpChannel); - $factory->method('createQueue')->willReturn($delayQueue); - $factory->method('createExchange')->will($this->onConsecutiveCalls( - $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(), - $delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock() + $factory->method('createQueue')->will($this->onConsecutiveCalls( + $this->createMock(\AMQPQueue::class), + $delayQueue = $this->createMock(\AMQPQueue::class) + )); + $factory->method('createExchange')->will($this->onConsecutiveCalls( + $this->createMock(\AMQPExchange::class), + $delayExchange = $this->createMock(\AMQPExchange::class) )); - - $amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME); - $amqpExchange->expects($this->once())->method('declareExchange'); - - $delayExchange->expects($this->once())->method('setName')->with('delay'); - $delayExchange->expects($this->once())->method('declareExchange'); $connectionOptions = [ 'retry' => [ @@ -383,9 +406,10 @@ class ConnectionTest extends TestCase $connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory); - $delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__120000'); + $delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000'); $delayQueue->expects($this->once())->method('setArguments')->with([ 'x-message-ttl' => 120000, + 'x-expires' => 120000 + 10000, 'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME, 'x-dead-letter-routing-key' => '', ]); @@ -467,23 +491,19 @@ class ConnectionTest extends TestCase { $amqpConnection = $this->createMock(\AMQPConnection::class); $amqpChannel = $this->createMock(\AMQPChannel::class); - $delayQueue = $this->createMock(\AMQPQueue::class); $factory = $this->createMock(AmqpFactory::class); $factory->method('createConnection')->willReturn($amqpConnection); $factory->method('createChannel')->willReturn($amqpChannel); - $factory->method('createQueue')->willReturn($delayQueue); + $factory->method('createQueue')->will($this->onConsecutiveCalls( + $this->createMock(\AMQPQueue::class), + $delayQueue = $this->createMock(\AMQPQueue::class) + )); $factory->method('createExchange')->will($this->onConsecutiveCalls( - $amqpExchange = $this->createMock(\AMQPExchange::class), + $this->createMock(\AMQPExchange::class), $delayExchange = $this->createMock(\AMQPExchange::class) )); - $amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME); - $amqpExchange->expects($this->once())->method('declareExchange'); - - $delayExchange->expects($this->once())->method('setName')->with('delay'); - $delayExchange->expects($this->once())->method('declareExchange'); - $connectionOptions = [ 'retry' => [ 'dead_routing_key' => 'my_dead_routing_key', @@ -492,9 +512,10 @@ class ConnectionTest extends TestCase $connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory); - $delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages_routing_key_120000'); + $delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000'); $delayQueue->expects($this->once())->method('setArguments')->with([ 'x-message-ttl' => 120000, + 'x-expires' => 120000 + 10000, 'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME, 'x-dead-letter-routing-key' => 'routing_key', ]); diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php index cc97af135e..67df49b396 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php @@ -45,10 +45,7 @@ class AmqpSender implements SenderInterface /** @var DelayStamp|null $delayStamp */ $delayStamp = $envelope->last(DelayStamp::class); - $delay = 0; - if (null !== $delayStamp) { - $delay = $delayStamp->getDelay(); - } + $delay = $delayStamp ? $delayStamp->getDelay() : 0; $amqpStamp = $envelope->last(AmqpStamp::class); if (isset($encodedMessage['headers']['Content-Type'])) { diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index 3c7a91f621..3f0212f47f 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -62,9 +62,8 @@ class Connection { $this->connectionOptions = array_replace_recursive([ 'delay' => [ - 'routing_key_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%', 'exchange_name' => 'delay', - 'queue_name_pattern' => 'delay_queue_%exchange_name%_%routing_key%_%delay%', + 'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%', ], ], $connectionOptions); $this->exchangeOptions = $exchangeOptions; @@ -93,9 +92,8 @@ class Connection * * flags: Exchange flags (Default: AMQP_DURABLE) * * arguments: Extra arguments * * delay: - * * routing_key_pattern: The pattern of the routing key (Default: "delay_%exchange_name%_%routing_key%_%delay%") - * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%exchange_name%_%routing_key%_%delay%") - * * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay") + * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%") + * * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delay") * * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true) * * prefetch_count: set channel prefetch count */ @@ -171,20 +169,20 @@ class Connection } /** - * @param int $delay The delay in milliseconds - * * @throws \AMQPException */ - public function publish(string $body, array $headers = [], int $delay = 0, AmqpStamp $amqpStamp = null): void + public function publish(string $body, array $headers = [], int $delayInMs = 0, AmqpStamp $amqpStamp = null): void { - if (0 !== $delay) { - $this->publishWithDelay($body, $headers, $delay, $amqpStamp); + $this->clearWhenDisconnected(); + + if (0 !== $delayInMs) { + $this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp); return; } if ($this->shouldSetup()) { - $this->setup(); + $this->setupExchangeAndQueues(); } $this->publishOnExchange( @@ -213,9 +211,7 @@ class Connection { $routingKey = $this->getRoutingKeyForMessage($amqpStamp); - if ($this->shouldSetup()) { - $this->setupDelay($delay, $routingKey); - } + $this->setupDelay($delay, $routingKey); $this->publishOnExchange( $this->getDelayExchange(), @@ -241,15 +237,12 @@ class Connection private function setupDelay(int $delay, ?string $routingKey) { - if (!$this->channel()->isConnected()) { - $this->clear(); + if ($this->shouldSetup()) { + $this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to } - $this->exchange()->declareExchange(); // setup normal exchange for delay queue to DLX messages to - $this->getDelayExchange()->declareExchange(); - $queue = $this->createDelayQueue($delay, $routingKey); - $queue->declareQueue(); + $queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance $queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey)); } @@ -283,6 +276,9 @@ class Connection )); $queue->setArguments([ 'x-message-ttl' => $delay, + // delete the delay queue 10 seconds after the message expires + // publishing another message redeclares the queue which renews the lease + 'x-expires' => $delay + 10000, 'x-dead-letter-exchange' => $this->exchangeOptions['name'], // after being released from to DLX, make sure the original routing key will be used // we must use an empty string instead of null for the argument to be picked up @@ -297,7 +293,7 @@ class Connection return str_replace( ['%delay%', '%exchange_name%', '%routing_key%'], [$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''], - $this->connectionOptions['delay']['routing_key_pattern'] + $this->connectionOptions['delay']['queue_name_pattern'] ); } @@ -308,8 +304,10 @@ class Connection */ public function get(string $queueName): ?\AMQPEnvelope { + $this->clearWhenDisconnected(); + if ($this->shouldSetup()) { - $this->setup(); + $this->setupExchangeAndQueues(); } try { @@ -319,7 +317,7 @@ class Connection } catch (\AMQPQueueException $e) { if (404 === $e->getCode() && $this->shouldSetup()) { // If we get a 404 for the queue, it means we need to setup the exchange & queue. - $this->setup(); + $this->setupExchangeAndQueues(); return $this->get(); } @@ -342,10 +340,12 @@ class Connection public function setup(): void { - if (!$this->channel()->isConnected()) { - $this->clear(); - } + $this->setupExchangeAndQueues(); + $this->getDelayExchange()->declareExchange(); + } + private function setupExchangeAndQueues(): void + { $this->exchange()->declareExchange(); foreach ($this->queuesOptions as $queueName => $queueConfig) { @@ -424,12 +424,14 @@ class Connection return $this->amqpExchange; } - private function clear(): void + private function clearWhenDisconnected(): void { - $this->amqpChannel = null; - $this->amqpQueues = []; - $this->amqpExchange = null; - $this->amqpDelayExchange = null; + if (!$this->channel()->isConnected()) { + $this->amqpChannel = null; + $this->amqpQueues = []; + $this->amqpExchange = null; + $this->amqpDelayExchange = null; + } } private function shouldSetup(): bool