diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md index 34bb9547ea..1e5bcf3d35 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md @@ -6,6 +6,7 @@ CHANGELOG * Deprecated the `prefetch_count` parameter, it has no effect and will be removed in Symfony 6.0. * `AmqpReceiver` implements `QueueReceiverInterface` to fetch messages from a specific set of queues. + * Add ability to distinguish retry and delay actions 5.2.0 ----- diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php index c7e612301d..a80694ee30 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php @@ -140,6 +140,45 @@ class AmqpExtIntegrationTest extends TestCase $receiver->ack($envelope); } + public function testRetryAffectsOnlyOriginalQueue() + { + $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), [ + 'exchange' => [ + 'name' => 'messages_topic', + 'type' => 'topic', + 'default_publish_routing_key' => 'topic_routing_key', + ], + 'queues' => [ + 'A' => ['binding_keys' => ['topic_routing_key']], + 'B' => ['binding_keys' => ['topic_routing_key']], + ], + ]); + $connection->setup(); + $connection->purgeQueues(); + + $serializer = $this->createSerializer(); + $sender = new AmqpSender($connection, $serializer); + $receiver = new AmqpReceiver($connection, $serializer); + + // initial delivery: should receive in both queues + $sender->send(new Envelope(new DummyMessage('Payload'))); + + $receivedEnvelopes = $this->receiveWithQueueName($receiver); + $this->assertCount(2, $receivedEnvelopes); + $this->assertArrayHasKey('A', $receivedEnvelopes); + $this->assertArrayHasKey('B', $receivedEnvelopes); + + // retry: should receive in only "A" queue + $retryEnvelope = $receivedEnvelopes['A'] + ->with(new DelayStamp(10)) + ->with(new RedeliveryStamp(1)); + $sender->send($retryEnvelope); + + $retriedEnvelopes = $this->receiveWithQueueName($receiver); + $this->assertCount(1, $retriedEnvelopes); + $this->assertArrayHasKey('A', $retriedEnvelopes); + } + public function testItReceivesSignals() { $serializer = $this->createSerializer(); @@ -255,4 +294,19 @@ TXT return $envelopes; } + + private function receiveWithQueueName(AmqpReceiver $receiver) + { + // let RabbitMQ receive messages + usleep(100 * 1000); // 100ms + + $receivedEnvelopes = []; + foreach ($receiver->get() as $envelope) { + $queueName = $envelope->last(AmqpReceivedStamp::class)->getQueueName(); + $receivedEnvelopes[$queueName] = $envelope; + $receiver->ack($envelope); + } + + return $receivedEnvelopes; + } } diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index 543494acfb..299e443538 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -493,38 +493,32 @@ class ConnectionTest extends TestCase public function testItDelaysTheMessage() { - $amqpConnection = $this->createMock(\AMQPConnection::class); - $amqpChannel = $this->createMock(\AMQPChannel::class); + $delayExchange = $this->createMock(\AMQPExchange::class); + $delayExchange->expects($this->once()) + ->method('publish') + ->with('{}', 'delay_messages__5000_delay', AMQP_NOPARAM, [ + 'headers' => ['x-some-headers' => 'foo'], + 'delivery_mode' => 2, + 'timestamp' => time(), + ]); + $connection = $this->createDelayOrRetryConnection($delayExchange, self::DEFAULT_EXCHANGE_NAME, 'delay_messages__5000_delay'); - $factory = $this->createMock(AmqpFactory::class); - $factory->method('createConnection')->willReturn($amqpConnection); - $factory->method('createChannel')->willReturn($amqpChannel); - $factory->method('createQueue')->will($this->onConsecutiveCalls( - $this->createMock(\AMQPQueue::class), - $delayQueue = $this->createMock(\AMQPQueue::class) - )); - $factory->method('createExchange')->will($this->onConsecutiveCalls( - $this->createMock(\AMQPExchange::class), - $delayExchange = $this->createMock(\AMQPExchange::class) - )); - - $delayQueue->expects($this->once())->method('setName')->with('delay_messages__5000'); - $delayQueue->expects($this->once())->method('setArguments')->with([ - 'x-message-ttl' => 5000, - 'x-expires' => 5000 + 10000, - 'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME, - 'x-dead-letter-routing-key' => '', - ]); - - $delayQueue->expects($this->once())->method('declareQueue'); - $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000'); - - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', \AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2, 'timestamp' => time()]); - - $connection = Connection::fromDsn('amqp://localhost', [], $factory); $connection->publish('{}', ['x-some-headers' => 'foo'], 5000); } + public function testItRetriesTheMessage() + { + $delayExchange = $this->createMock(\AMQPExchange::class); + $delayExchange->expects($this->once()) + ->method('publish') + ->with('{}', 'delay_messages__5000_retry', AMQP_NOPARAM); + $connection = $this->createDelayOrRetryConnection($delayExchange, '', 'delay_messages__5000_retry'); + + $amqpEnvelope = $this->createMock(\AMQPEnvelope::class); + $amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, null, ''); + $connection->publish('{}', [], 5000, $amqpStamp); + } + public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs() { $amqpConnection = $this->createMock(\AMQPConnection::class); @@ -550,7 +544,7 @@ class ConnectionTest extends TestCase $connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory); - $delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000'); + $delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000_delay'); $delayQueue->expects($this->once())->method('setArguments')->with([ 'x-message-ttl' => 120000, 'x-expires' => 120000 + 10000, @@ -559,9 +553,9 @@ class ConnectionTest extends TestCase ]); $delayQueue->expects($this->once())->method('declareQueue'); - $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000'); + $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000_delay'); - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); + $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000_delay', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $connection->publish('{}', [], 120000); } @@ -690,7 +684,7 @@ class ConnectionTest extends TestCase $connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory); - $delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000'); + $delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000_delay'); $delayQueue->expects($this->once())->method('setArguments')->with([ 'x-message-ttl' => 120000, 'x-expires' => 120000 + 10000, @@ -699,9 +693,9 @@ class ConnectionTest extends TestCase ]); $delayQueue->expects($this->once())->method('declareQueue'); - $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000'); + $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000_delay'); - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); + $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000_delay', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $connection->publish('{}', [], 120000, new AmqpStamp('routing_key')); } @@ -769,6 +763,37 @@ class ConnectionTest extends TestCase $connection = Connection::fromDsn('amqp://localhost?confirm_timeout=0.5', [], $factory); $connection->publish('body'); } + + private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection + { + $amqpConnection = $this->createMock(\AMQPConnection::class); + $amqpChannel = $this->createMock(\AMQPChannel::class); + + $factory = $this->createMock(AmqpFactory::class); + $factory->method('createConnection')->willReturn($amqpConnection); + $factory->method('createChannel')->willReturn($amqpChannel); + $factory->method('createQueue')->will($this->onConsecutiveCalls( + $this->createMock(\AMQPQueue::class), + $delayQueue = $this->createMock(\AMQPQueue::class) + )); + $factory->method('createExchange')->will($this->onConsecutiveCalls( + $this->createMock(\AMQPExchange::class), + $delayExchange + )); + + $delayQueue->expects($this->once())->method('setName')->with($delayQueueName); + $delayQueue->expects($this->once())->method('setArguments')->with([ + 'x-message-ttl' => 5000, + 'x-expires' => 5000 + 10000, + 'x-dead-letter-exchange' => $deadLetterExchangeName, + 'x-dead-letter-routing-key' => '', + ]); + + $delayQueue->expects($this->once())->method('declareQueue'); + $delayQueue->expects($this->once())->method('bind')->with('delays', $delayQueueName); + + return Connection::fromDsn('amqp://localhost', [], $factory); + } } class TestAmqpFactory extends AmqpFactory diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php index a77268bd05..713c9bc6b1 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php @@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Bridge\Amqp\Transport; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -58,7 +59,11 @@ class AmqpSender implements SenderInterface $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class); if ($amqpReceivedStamp instanceof AmqpReceivedStamp) { - $amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpReceivedStamp->getAmqpEnvelope(), $amqpStamp); + $amqpStamp = AmqpStamp::createFromAmqpEnvelope( + $amqpReceivedStamp->getAmqpEnvelope(), + $amqpStamp, + $envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null + ); } try { diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php index 7d013974cc..b0f8a30a84 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php @@ -22,6 +22,7 @@ final class AmqpStamp implements NonSendableStampInterface private $routingKey; private $flags; private $attributes; + private $isRetryAttempt = false; public function __construct(string $routingKey = null, int $flags = \AMQP_NOPARAM, array $attributes = []) { @@ -45,7 +46,7 @@ final class AmqpStamp implements NonSendableStampInterface return $this->attributes; } - public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null): self + public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null, string $retryRoutingKey = null): self { $attr = $previousStamp->attributes ?? []; @@ -62,7 +63,19 @@ final class AmqpStamp implements NonSendableStampInterface $attr['type'] = $attr['type'] ?? $amqpEnvelope->getType(); $attr['reply_to'] = $attr['reply_to'] ?? $amqpEnvelope->getReplyTo(); - return new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? \AMQP_NOPARAM, $attr); + if (null === $retryRoutingKey) { + $stamp = new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? AMQP_NOPARAM, $attr); + } else { + $stamp = new self($retryRoutingKey, $previousStamp->flags ?? AMQP_NOPARAM, $attr); + $stamp->isRetryAttempt = true; + } + + return $stamp; + } + + public function isRetryAttempt(): bool + { + return $this->isRetryAttempt; } public static function createWithAttributes(array $attributes, self $previousStamp = null): self diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index d2d6decf71..3187a18ecd 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -323,13 +323,14 @@ class Connection private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null) { $routingKey = $this->getRoutingKeyForMessage($amqpStamp); + $isRetryAttempt = $amqpStamp ? $amqpStamp->isRetryAttempt() : false; - $this->setupDelay($delay, $routingKey); + $this->setupDelay($delay, $routingKey, $isRetryAttempt); $this->publishOnExchange( $this->getDelayExchange(), $body, - $this->getRoutingKeyForDelay($delay, $routingKey), + $this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt), $headers, $amqpStamp ); @@ -354,15 +355,15 @@ class Connection } } - private function setupDelay(int $delay, ?string $routingKey) + private function setupDelay(int $delay, ?string $routingKey, bool $isRetryAttempt) { if ($this->autoSetup) { $this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to } - $queue = $this->createDelayQueue($delay, $routingKey); + $queue = $this->createDelayQueue($delay, $routingKey, $isRetryAttempt); $queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance - $queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey)); + $queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt)); } private function getDelayExchange(): \AMQPExchange @@ -386,21 +387,19 @@ class Connection * which is the original exchange, resulting on it being put back into * the original queue. */ - private function createDelayQueue(int $delay, ?string $routingKey): \AMQPQueue + private function createDelayQueue(int $delay, ?string $routingKey, bool $isRetryAttempt): \AMQPQueue { $queue = $this->amqpFactory->createQueue($this->channel()); - $queue->setName(str_replace( - ['%delay%', '%exchange_name%', '%routing_key%'], - [$delay, $this->exchangeOptions['name'], $routingKey ?? ''], - $this->connectionOptions['delay']['queue_name_pattern'] - )); + $queue->setName($this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt)); $queue->setFlags(\AMQP_DURABLE); $queue->setArguments([ 'x-message-ttl' => $delay, // delete the delay queue 10 seconds after the message expires // publishing another message redeclares the queue which renews the lease 'x-expires' => $delay + 10000, - 'x-dead-letter-exchange' => $this->exchangeOptions['name'], + // message should be broadcasted to all consumers during delay, but to only one queue during retry + // empty name is default direct exchange + 'x-dead-letter-exchange' => $isRetryAttempt ? '' : $this->exchangeOptions['name'], // after being released from to DLX, make sure the original routing key will be used // we must use an empty string instead of null for the argument to be picked up 'x-dead-letter-routing-key' => $routingKey ?? '', @@ -409,13 +408,15 @@ class Connection return $queue; } - private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): string + private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey, bool $isRetryAttempt): string { + $action = $isRetryAttempt ? '_retry' : '_delay'; + return str_replace( ['%delay%', '%exchange_name%', '%routing_key%'], [$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''], $this->connectionOptions['delay']['queue_name_pattern'] - ); + ).$action; } /**