From 5bc336416797a065662e8bbc0859c8d33f052005 Mon Sep 17 00:00:00 2001 From: Tobias Schultze Date: Fri, 14 Jun 2019 19:29:31 +0200 Subject: [PATCH] [Messenger] fix AMQP delay queue to be per exchange --- .../Transport/AmqpExt/ConnectionTest.php | 111 +++++++++--------- .../Transport/AmqpExt/Connection.php | 34 +++--- 2 files changed, 70 insertions(+), 75 deletions(-) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php index 9462529b94..e7dee84219 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php @@ -23,6 +23,8 @@ use Symfony\Component\Messenger\Transport\AmqpExt\Connection; */ class ConnectionTest extends TestCase { + private const DEFAULT_EXCHANGE_NAME = 'messages'; + /** * @expectedException \InvalidArgumentException * @expectedExceptionMessage The given AMQP DSN "amqp://:" is invalid. @@ -40,9 +42,9 @@ class ConnectionTest extends TestCase 'port' => 5672, 'vhost' => '/', ], [ - 'name' => 'messages', + 'name' => self::DEFAULT_EXCHANGE_NAME, ], [ - 'messages' => [], + self::DEFAULT_EXCHANGE_NAME => [], ]), Connection::fromDsn('amqp://') ); @@ -196,7 +198,7 @@ class ConnectionTest extends TestCase $amqpChannel->expects($this->once())->method('isConnected')->willReturn(true); $amqpConnection->expects($this->once())->method('connect'); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory); + $connection = Connection::fromDsn('amqp://localhost', [], $factory); $connection->publish('body'); } @@ -213,7 +215,7 @@ class ConnectionTest extends TestCase $amqpChannel->expects($this->once())->method('isConnected')->willReturn(true); $amqpConnection->expects($this->once())->method('pconnect'); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages?persistent=true', [], $factory); + $connection = Connection::fromDsn('amqp://localhost?persistent=true', [], $factory); $connection->publish('body'); } @@ -226,13 +228,12 @@ class ConnectionTest extends TestCase $amqpExchange = $this->createMock(\AMQPExchange::class) ); - $amqpExchange->method('getName')->willReturn('exchange_name'); $amqpExchange->expects($this->once())->method('declareExchange'); $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]); $amqpQueue->expects($this->once())->method('declareQueue'); - $amqpQueue->expects($this->once())->method('bind')->with('exchange_name', null); + $amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory); + $connection = Connection::fromDsn('amqp://localhost', [], $factory); $connection->publish('body'); } @@ -250,21 +251,20 @@ class ConnectionTest extends TestCase $factory->method('createExchange')->willReturn($amqpExchange); $factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1)); - $amqpExchange->method('getName')->willReturn('exchange_name'); $amqpExchange->expects($this->once())->method('declareExchange'); $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => []]); $amqpQueue0->expects($this->once())->method('declareQueue'); $amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive( - ['exchange_name', 'binding_key0'], - ['exchange_name', 'binding_key1'] + [self::DEFAULT_EXCHANGE_NAME, 'binding_key0'], + [self::DEFAULT_EXCHANGE_NAME, 'binding_key1'] ); $amqpQueue1->expects($this->once())->method('declareQueue'); $amqpQueue1->expects($this->exactly(2))->method('bind')->withConsecutive( - ['exchange_name', 'binding_key2'], - ['exchange_name', 'binding_key3'] + [self::DEFAULT_EXCHANGE_NAME, 'binding_key2'], + [self::DEFAULT_EXCHANGE_NAME, 'binding_key3'] ); - $dsn = 'amqp://localhost/%2f/messages?'. + $dsn = 'amqp://localhost?'. 'exchange[default_publish_routing_key]=routing_key&'. 'queues[queue0][binding_keys][0]=binding_key0&'. 'queues[queue0][binding_keys][1]=binding_key1&'. @@ -284,18 +284,17 @@ class ConnectionTest extends TestCase $amqpExchange = $this->createMock(\AMQPExchange::class) ); - $amqpExchange->method('getName')->willReturn('exchange_name'); $amqpExchange->expects($this->never())->method('declareExchange'); $amqpQueue->expects($this->never())->method('declareQueue'); $amqpQueue->expects($this->never())->method('bind'); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => 'false'], $factory); + $connection = Connection::fromDsn('amqp://localhost', ['auto_setup' => 'false'], $factory); $connection->publish('body'); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => false], $factory); + $connection = Connection::fromDsn('amqp://localhost', ['auto_setup' => false], $factory); $connection->publish('body'); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages?auto_setup=false', [], $factory); + $connection = Connection::fromDsn('amqp://localhost?auto_setup=false', [], $factory); $connection->publish('body'); } @@ -312,9 +311,9 @@ class ConnectionTest extends TestCase $amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true); $amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages?prefetch_count=2', [], $factory); + $connection = Connection::fromDsn('amqp://localhost?prefetch_count=2', [], $factory); $connection->setup(); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['prefetch_count' => 2], $factory); + $connection = Connection::fromDsn('amqp://localhost', ['prefetch_count' => 2], $factory); $connection->setup(); } @@ -329,29 +328,29 @@ class ConnectionTest extends TestCase $factory->method('createChannel')->willReturn($amqpChannel); $factory->method('createQueue')->willReturn($delayQueue); $factory->method('createExchange')->will($this->onConsecutiveCalls( - $delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(), - $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock() + $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(), + $delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock() )); - $amqpExchange->expects($this->once())->method('setName')->with('messages'); - $amqpExchange->method('getName')->willReturn('messages'); + $amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME); + $amqpExchange->expects($this->once())->method('declareExchange'); $delayExchange->expects($this->once())->method('setName')->with('delay'); $delayExchange->expects($this->once())->method('declareExchange'); - $delayExchange->method('getName')->willReturn('delay'); - $delayQueue->expects($this->once())->method('setName')->with('delay_queue__5000'); + $delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__5000'); $delayQueue->expects($this->once())->method('setArguments')->with([ 'x-message-ttl' => 5000, - 'x-dead-letter-exchange' => 'messages', + 'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME, + 'x-dead-letter-routing-key' => '', ]); $delayQueue->expects($this->once())->method('declareQueue'); - $delayQueue->expects($this->once())->method('bind')->with('delay', 'delay__5000'); + $delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__5000'); - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]); + $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory); + $connection = Connection::fromDsn('amqp://localhost', [], $factory); $connection->publish('{}', ['x-some-headers' => 'foo'], 5000); } @@ -366,16 +365,15 @@ class ConnectionTest extends TestCase $factory->method('createChannel')->willReturn($amqpChannel); $factory->method('createQueue')->willReturn($delayQueue); $factory->method('createExchange')->will($this->onConsecutiveCalls( - $delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(), - $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock() + $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(), + $delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock() )); - $amqpExchange->expects($this->once())->method('setName')->with('messages'); - $amqpExchange->method('getName')->willReturn('messages'); + $amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME); + $amqpExchange->expects($this->once())->method('declareExchange'); $delayExchange->expects($this->once())->method('setName')->with('delay'); $delayExchange->expects($this->once())->method('declareExchange'); - $delayExchange->method('getName')->willReturn('delay'); $connectionOptions = [ 'retry' => [ @@ -383,24 +381,25 @@ class ConnectionTest extends TestCase ], ]; - $connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory); + $connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory); - $delayQueue->expects($this->once())->method('setName')->with('delay_queue__120000'); + $delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__120000'); $delayQueue->expects($this->once())->method('setArguments')->with([ 'x-message-ttl' => 120000, - 'x-dead-letter-exchange' => 'messages', + 'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME, + 'x-dead-letter-routing-key' => '', ]); $delayQueue->expects($this->once())->method('declareQueue'); - $delayQueue->expects($this->once())->method('bind')->with('delay', 'delay__120000'); + $delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__120000'); - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay__120000', AMQP_NOPARAM, ['headers' => []]); + $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]); $connection->publish('{}', [], 120000); } /** * @expectedException \AMQPException - * @expectedExceptionMessage Could not connect to the AMQP server. Please verify the provided DSN. ({"delay":{"routing_key_pattern":"delay_%routing_key%_%delay%","exchange_name":"delay","queue_name_pattern":"delay_queue_%routing_key%_%delay%"},"host":"localhost","port":5672,"vhost":"\/","login":"user","password":"********"}) + * @expectedExceptionMessage Could not connect to the AMQP server. Please verify the provided DSN. ({"host":"localhost","port":5672,"vhost":"\/","login":"user","password":"********"}) */ public function testObfuscatePasswordInDsn() { @@ -415,7 +414,7 @@ class ConnectionTest extends TestCase new \AMQPConnectionException('Oups.') ); - $connection = Connection::fromDsn('amqp://user:secretpassword@localhost/%2f/messages', [], $factory); + $connection = Connection::fromDsn('amqp://user:secretpassword@localhost', [], $factory); $connection->channel(); } @@ -430,7 +429,7 @@ class ConnectionTest extends TestCase $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key'); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=routing_key', [], $factory); + $connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=routing_key', [], $factory); $connection->publish('body'); } @@ -445,7 +444,7 @@ class ConnectionTest extends TestCase $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key'); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory); + $connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=default_routing_key', [], $factory); $connection->publish('body', [], 0, new AmqpStamp('routing_key')); } @@ -460,16 +459,15 @@ class ConnectionTest extends TestCase $factory->method('createChannel')->willReturn($amqpChannel); $factory->method('createQueue')->willReturn($delayQueue); $factory->method('createExchange')->will($this->onConsecutiveCalls( - $delayExchange = $this->createMock(\AMQPExchange::class), - $amqpExchange = $this->createMock(\AMQPExchange::class) + $amqpExchange = $this->createMock(\AMQPExchange::class), + $delayExchange = $this->createMock(\AMQPExchange::class) )); - $amqpExchange->expects($this->once())->method('setName')->with('messages'); - $amqpExchange->method('getName')->willReturn('messages'); + $amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME); + $amqpExchange->expects($this->once())->method('declareExchange'); $delayExchange->expects($this->once())->method('setName')->with('delay'); $delayExchange->expects($this->once())->method('declareExchange'); - $delayExchange->method('getName')->willReturn('delay'); $connectionOptions = [ 'retry' => [ @@ -477,22 +475,19 @@ class ConnectionTest extends TestCase ], ]; - $connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory); + $connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory); - $delayQueue->expects($this->once())->method('setName')->with('delay_queue_routing_key_120000'); + $delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages_routing_key_120000'); $delayQueue->expects($this->once())->method('setArguments')->with([ 'x-message-ttl' => 120000, - 'x-dead-letter-exchange' => 'messages', + 'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME, + 'x-dead-letter-routing-key' => 'routing_key', ]); - $delayQueue->expects($this->once())->method('setArgument')->with( - 'x-dead-letter-routing-key', - 'routing_key' - ); $delayQueue->expects($this->once())->method('declareQueue'); - $delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_routing_key_120000'); + $delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages_routing_key_120000'); - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_routing_key_120000', AMQP_NOPARAM, ['headers' => []]); + $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]); $connection->publish('{}', [], 120000, new AmqpStamp('routing_key')); } @@ -512,7 +507,7 @@ class ConnectionTest extends TestCase ['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]] ); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory); + $connection = Connection::fromDsn('amqp://localhost', [], $factory); $connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', AMQP_IMMEDIATE, ['delivery_mode' => 2])); } } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index e35cbaa1e3..6f6b8b22a6 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -62,9 +62,9 @@ class Connection { $this->connectionOptions = array_replace_recursive([ 'delay' => [ - 'routing_key_pattern' => 'delay_%routing_key%_%delay%', + 'routing_key_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%', 'exchange_name' => 'delay', - 'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%', + 'queue_name_pattern' => 'delay_queue_%exchange_name%_%routing_key%_%delay%', ], ], $connectionOptions); $this->exchangeOptions = $exchangeOptions; @@ -93,8 +93,8 @@ class Connection * * flags: Exchange flags (Default: AMQP_DURABLE) * * arguments: Extra arguments * * delay: - * * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%") - * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%") + * * routing_key_pattern: The pattern of the routing key (Default: "delay_%exchange_name%_%routing_key%_%delay%") + * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%exchange_name%_%routing_key%_%delay%") * * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay") * * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true) * * prefetch_count: set channel prefetch count @@ -246,12 +246,12 @@ class Connection $this->clear(); } - $exchange = $this->getDelayExchange(); - $exchange->declareExchange(); + $this->exchange()->declareExchange(); // setup normal exchange for delay queue to DLX messages to + $this->getDelayExchange()->declareExchange(); $queue = $this->createDelayQueue($delay, $routingKey); $queue->declareQueue(); - $queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay, $routingKey)); + $queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey)); } private function getDelayExchange(): \AMQPExchange @@ -278,27 +278,26 @@ class Connection { $queue = $this->amqpFactory->createQueue($this->channel()); $queue->setName(str_replace( - ['%delay%', '%routing_key%'], - [$delay, $routingKey ?? ''], + ['%delay%', '%exchange_name%', '%routing_key%'], + [$delay, $this->exchangeOptions['name'], $routingKey ?? ''], $this->connectionOptions['delay']['queue_name_pattern'] )); $queue->setArguments([ 'x-message-ttl' => $delay, - 'x-dead-letter-exchange' => $this->exchange()->getName(), + 'x-dead-letter-exchange' => $this->exchangeOptions['name'], + // after being released from to DLX, make sure the original routing key will be used + // we must use an empty string instead of null for the argument to be picked up + 'x-dead-letter-routing-key' => $routingKey ?? '', ]); - // after being released from to DLX, make sure the original routing key will be used - // we must use an empty string instead of null for the argument to be picked up - $queue->setArgument('x-dead-letter-routing-key', $routingKey ?? ''); - return $queue; } private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): string { return str_replace( - ['%delay%', '%routing_key%'], - [$delay, $finalRoutingKey ?? ''], + ['%delay%', '%exchange_name%', '%routing_key%'], + [$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''], $this->connectionOptions['delay']['routing_key_pattern'] ); } @@ -353,7 +352,7 @@ class Connection foreach ($this->queuesOptions as $queueName => $queueConfig) { $this->queue($queueName)->declareQueue(); foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) { - $this->queue($queueName)->bind($this->exchange()->getName(), $bindingKey); + $this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey); } } } @@ -377,6 +376,7 @@ class Connection } catch (\AMQPConnectionException $e) { $credentials = $this->connectionOptions; $credentials['password'] = '********'; + unset($credentials['delay']); throw new \AMQPException(sprintf('Could not connect to the AMQP server. Please verify the provided DSN. (%s)', json_encode($credentials)), 0, $e); }