bug #32052 [Messenger] fix AMQP delay queue to be per exchange (Tobion)

This PR was squashed before being merged into the 4.3 branch (closes #32052).

Discussion
----------

[Messenger] fix AMQP delay queue to be per exchange

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | no <!-- please update src/**/CHANGELOG.md files -->
| BC breaks?    | no     <!-- see https://symfony.com/bc -->
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass?   | yes    <!-- please add some, will be required by reviewers -->
| Fixed tickets | #32050
| License       | MIT
| Doc PR        |

this makes the delay/retry work when having several exchanges or renaming your exchange.

also the delay setup did not declare the target exchange. so if you only do delayed messages for a connection, auto-setup forgot to actually create the target exchange.

Commits
-------

5bc3364167 [Messenger] fix AMQP delay queue to be per exchange
This commit is contained in:
Fabien Potencier 2019-06-17 18:44:05 +02:00
commit 12b852f7ab
2 changed files with 70 additions and 75 deletions

View File

@ -23,6 +23,8 @@ use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
*/
class ConnectionTest extends TestCase
{
private const DEFAULT_EXCHANGE_NAME = 'messages';
/**
* @expectedException \InvalidArgumentException
* @expectedExceptionMessage The given AMQP DSN "amqp://:" is invalid.
@ -40,9 +42,9 @@ class ConnectionTest extends TestCase
'port' => 5672,
'vhost' => '/',
], [
'name' => 'messages',
'name' => self::DEFAULT_EXCHANGE_NAME,
], [
'messages' => [],
self::DEFAULT_EXCHANGE_NAME => [],
]),
Connection::fromDsn('amqp://')
);
@ -196,7 +198,7 @@ class ConnectionTest extends TestCase
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
$amqpConnection->expects($this->once())->method('connect');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body');
}
@ -213,7 +215,7 @@ class ConnectionTest extends TestCase
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
$amqpConnection->expects($this->once())->method('pconnect');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?persistent=true', [], $factory);
$connection = Connection::fromDsn('amqp://localhost?persistent=true', [], $factory);
$connection->publish('body');
}
@ -226,13 +228,12 @@ class ConnectionTest extends TestCase
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]);
$amqpQueue->expects($this->once())->method('declareQueue');
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', null);
$amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body');
}
@ -250,21 +251,20 @@ class ConnectionTest extends TestCase
$factory->method('createExchange')->willReturn($amqpExchange);
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));
$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => []]);
$amqpQueue0->expects($this->once())->method('declareQueue');
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
['exchange_name', 'binding_key0'],
['exchange_name', 'binding_key1']
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0'],
[self::DEFAULT_EXCHANGE_NAME, 'binding_key1']
);
$amqpQueue1->expects($this->once())->method('declareQueue');
$amqpQueue1->expects($this->exactly(2))->method('bind')->withConsecutive(
['exchange_name', 'binding_key2'],
['exchange_name', 'binding_key3']
[self::DEFAULT_EXCHANGE_NAME, 'binding_key2'],
[self::DEFAULT_EXCHANGE_NAME, 'binding_key3']
);
$dsn = 'amqp://localhost/%2f/messages?'.
$dsn = 'amqp://localhost?'.
'exchange[default_publish_routing_key]=routing_key&'.
'queues[queue0][binding_keys][0]=binding_key0&'.
'queues[queue0][binding_keys][1]=binding_key1&'.
@ -284,18 +284,17 @@ class ConnectionTest extends TestCase
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->never())->method('declareExchange');
$amqpQueue->expects($this->never())->method('declareQueue');
$amqpQueue->expects($this->never())->method('bind');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => 'false'], $factory);
$connection = Connection::fromDsn('amqp://localhost', ['auto_setup' => 'false'], $factory);
$connection->publish('body');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => false], $factory);
$connection = Connection::fromDsn('amqp://localhost', ['auto_setup' => false], $factory);
$connection->publish('body');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?auto_setup=false', [], $factory);
$connection = Connection::fromDsn('amqp://localhost?auto_setup=false', [], $factory);
$connection->publish('body');
}
@ -312,9 +311,9 @@ class ConnectionTest extends TestCase
$amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true);
$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?prefetch_count=2', [], $factory);
$connection = Connection::fromDsn('amqp://localhost?prefetch_count=2', [], $factory);
$connection->setup();
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['prefetch_count' => 2], $factory);
$connection = Connection::fromDsn('amqp://localhost', ['prefetch_count' => 2], $factory);
$connection->setup();
}
@ -329,29 +328,29 @@ class ConnectionTest extends TestCase
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
));
$amqpExchange->expects($this->once())->method('setName')->with('messages');
$amqpExchange->method('getName')->willReturn('messages');
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');
$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->method('getName')->willReturn('delay');
$delayQueue->expects($this->once())->method('setName')->with('delay_queue__5000');
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__5000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 5000,
'x-dead-letter-exchange' => 'messages',
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => '',
]);
$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay__5000');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__5000');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
}
@ -366,16 +365,15 @@ class ConnectionTest extends TestCase
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
));
$amqpExchange->expects($this->once())->method('setName')->with('messages');
$amqpExchange->method('getName')->willReturn('messages');
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');
$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->method('getName')->willReturn('delay');
$connectionOptions = [
'retry' => [
@ -383,24 +381,25 @@ class ConnectionTest extends TestCase
],
];
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
$delayQueue->expects($this->once())->method('setName')->with('delay_queue__120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__120000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-dead-letter-exchange' => 'messages',
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => '',
]);
$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay__120000');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__120000');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay__120000', AMQP_NOPARAM, ['headers' => []]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]);
$connection->publish('{}', [], 120000);
}
/**
* @expectedException \AMQPException
* @expectedExceptionMessage Could not connect to the AMQP server. Please verify the provided DSN. ({"delay":{"routing_key_pattern":"delay_%routing_key%_%delay%","exchange_name":"delay","queue_name_pattern":"delay_queue_%routing_key%_%delay%"},"host":"localhost","port":5672,"vhost":"\/","login":"user","password":"********"})
* @expectedExceptionMessage Could not connect to the AMQP server. Please verify the provided DSN. ({"host":"localhost","port":5672,"vhost":"\/","login":"user","password":"********"})
*/
public function testObfuscatePasswordInDsn()
{
@ -415,7 +414,7 @@ class ConnectionTest extends TestCase
new \AMQPConnectionException('Oups.')
);
$connection = Connection::fromDsn('amqp://user:secretpassword@localhost/%2f/messages', [], $factory);
$connection = Connection::fromDsn('amqp://user:secretpassword@localhost', [], $factory);
$connection->channel();
}
@ -430,7 +429,7 @@ class ConnectionTest extends TestCase
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=routing_key', [], $factory);
$connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=routing_key', [], $factory);
$connection->publish('body');
}
@ -445,7 +444,7 @@ class ConnectionTest extends TestCase
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
$connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
$connection->publish('body', [], 0, new AmqpStamp('routing_key'));
}
@ -460,16 +459,15 @@ class ConnectionTest extends TestCase
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$delayExchange = $this->createMock(\AMQPExchange::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
$amqpExchange = $this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));
$amqpExchange->expects($this->once())->method('setName')->with('messages');
$amqpExchange->method('getName')->willReturn('messages');
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');
$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->method('getName')->willReturn('delay');
$connectionOptions = [
'retry' => [
@ -477,22 +475,19 @@ class ConnectionTest extends TestCase
],
];
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_routing_key_120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages_routing_key_120000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-dead-letter-exchange' => 'messages',
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => 'routing_key',
]);
$delayQueue->expects($this->once())->method('setArgument')->with(
'x-dead-letter-routing-key',
'routing_key'
);
$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_routing_key_120000');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages_routing_key_120000');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
}
@ -512,7 +507,7 @@ class ConnectionTest extends TestCase
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]]
);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', AMQP_IMMEDIATE, ['delivery_mode' => 2]));
}
}

View File

@ -62,9 +62,9 @@ class Connection
{
$this->connectionOptions = array_replace_recursive([
'delay' => [
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
'routing_key_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
'exchange_name' => 'delay',
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
'queue_name_pattern' => 'delay_queue_%exchange_name%_%routing_key%_%delay%',
],
], $connectionOptions);
$this->exchangeOptions = $exchangeOptions;
@ -93,8 +93,8 @@ class Connection
* * flags: Exchange flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * delay:
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%")
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%exchange_name%_%routing_key%_%delay%")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%exchange_name%_%routing_key%_%delay%")
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
* * prefetch_count: set channel prefetch count
@ -246,12 +246,12 @@ class Connection
$this->clear();
}
$exchange = $this->getDelayExchange();
$exchange->declareExchange();
$this->exchange()->declareExchange(); // setup normal exchange for delay queue to DLX messages to
$this->getDelayExchange()->declareExchange();
$queue = $this->createDelayQueue($delay, $routingKey);
$queue->declareQueue();
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay, $routingKey));
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey));
}
private function getDelayExchange(): \AMQPExchange
@ -278,27 +278,26 @@ class Connection
{
$queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace(
['%delay%', '%routing_key%'],
[$delay, $routingKey ?? ''],
['%delay%', '%exchange_name%', '%routing_key%'],
[$delay, $this->exchangeOptions['name'], $routingKey ?? ''],
$this->connectionOptions['delay']['queue_name_pattern']
));
$queue->setArguments([
'x-message-ttl' => $delay,
'x-dead-letter-exchange' => $this->exchange()->getName(),
'x-dead-letter-exchange' => $this->exchangeOptions['name'],
// after being released from to DLX, make sure the original routing key will be used
// we must use an empty string instead of null for the argument to be picked up
'x-dead-letter-routing-key' => $routingKey ?? '',
]);
// after being released from to DLX, make sure the original routing key will be used
// we must use an empty string instead of null for the argument to be picked up
$queue->setArgument('x-dead-letter-routing-key', $routingKey ?? '');
return $queue;
}
private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): string
{
return str_replace(
['%delay%', '%routing_key%'],
[$delay, $finalRoutingKey ?? ''],
['%delay%', '%exchange_name%', '%routing_key%'],
[$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''],
$this->connectionOptions['delay']['routing_key_pattern']
);
}
@ -353,7 +352,7 @@ class Connection
foreach ($this->queuesOptions as $queueName => $queueConfig) {
$this->queue($queueName)->declareQueue();
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
$this->queue($queueName)->bind($this->exchange()->getName(), $bindingKey);
$this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey);
}
}
}
@ -377,6 +376,7 @@ class Connection
} catch (\AMQPConnectionException $e) {
$credentials = $this->connectionOptions;
$credentials['password'] = '********';
unset($credentials['delay']);
throw new \AMQPException(sprintf('Could not connect to the AMQP server. Please verify the provided DSN. (%s)', json_encode($credentials)), 0, $e);
}