bug #32631 [Messenger] expire delay queue and fix auto_setup logic (Tobion)

This PR was merged into the 4.3 branch.

Discussion
----------

[Messenger] expire delay queue and fix auto_setup logic

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | yes
| BC breaks?    | no     <!-- see https://symfony.com/bc -->
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass?   | yes    <!-- please add some, will be required by reviewers -->
| Fixed tickets | #32588
| License       | MIT
| Doc PR        |

Tested successfully

Commits
-------

7aee83a71f [Messenger] expire delay queue and fix auto_setup logic
This commit is contained in:
Tobias Schultze 2019-07-28 16:14:21 +02:00
commit d7d6d923ef
3 changed files with 84 additions and 64 deletions

View File

@ -308,7 +308,7 @@ class ConnectionTest extends TestCase
);
// makes sure the channel looks connected, so it's not re-created
$amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true);
$amqpChannel->expects($this->any())->method('isConnected')->willReturn(true);
$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
$connection = Connection::fromDsn('amqp://localhost?prefetch_count=2', [], $factory);
@ -317,30 +317,57 @@ class ConnectionTest extends TestCase
$connection->setup();
}
public function testItDelaysTheMessage()
public function testAutoSetupWithDelayDeclaresExchangeQueuesAndDelay()
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$delayQueue = $this->createMock(\AMQPQueue::class);
$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$amqpQueue = $this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpExchange = $this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');
$amqpQueue->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpQueue->expects($this->once())->method('declareQueue');
$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->expects($this->once())->method('publish');
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__5000');
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
}
public function testItDelaysTheMessage()
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__5000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 5000,
'x-expires' => 5000 + 10000,
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => '',
]);
@ -358,22 +385,18 @@ class ConnectionTest extends TestCase
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$delayQueue = $this->createMock(\AMQPQueue::class);
$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');
$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$connectionOptions = [
'retry' => [
@ -383,9 +406,10 @@ class ConnectionTest extends TestCase
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-expires' => 120000 + 10000,
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => '',
]);
@ -467,23 +491,19 @@ class ConnectionTest extends TestCase
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$delayQueue = $this->createMock(\AMQPQueue::class);
$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$amqpExchange = $this->createMock(\AMQPExchange::class),
$this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');
$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$connectionOptions = [
'retry' => [
'dead_routing_key' => 'my_dead_routing_key',
@ -492,9 +512,10 @@ class ConnectionTest extends TestCase
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages_routing_key_120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-expires' => 120000 + 10000,
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => 'routing_key',
]);

View File

@ -45,10 +45,7 @@ class AmqpSender implements SenderInterface
/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
$delay = 0;
if (null !== $delayStamp) {
$delay = $delayStamp->getDelay();
}
$delay = $delayStamp ? $delayStamp->getDelay() : 0;
$amqpStamp = $envelope->last(AmqpStamp::class);
if (isset($encodedMessage['headers']['Content-Type'])) {

View File

@ -62,9 +62,8 @@ class Connection
{
$this->connectionOptions = array_replace_recursive([
'delay' => [
'routing_key_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
'exchange_name' => 'delay',
'queue_name_pattern' => 'delay_queue_%exchange_name%_%routing_key%_%delay%',
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
],
], $connectionOptions);
$this->exchangeOptions = $exchangeOptions;
@ -93,9 +92,8 @@ class Connection
* * flags: Exchange flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * delay:
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%exchange_name%_%routing_key%_%delay%")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%exchange_name%_%routing_key%_%delay%")
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delay")
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
* * prefetch_count: set channel prefetch count
*/
@ -171,20 +169,20 @@ class Connection
}
/**
* @param int $delay The delay in milliseconds
*
* @throws \AMQPException
*/
public function publish(string $body, array $headers = [], int $delay = 0, AmqpStamp $amqpStamp = null): void
public function publish(string $body, array $headers = [], int $delayInMs = 0, AmqpStamp $amqpStamp = null): void
{
if (0 !== $delay) {
$this->publishWithDelay($body, $headers, $delay, $amqpStamp);
$this->clearWhenDisconnected();
if (0 !== $delayInMs) {
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
return;
}
if ($this->shouldSetup()) {
$this->setup();
$this->setupExchangeAndQueues();
}
$this->publishOnExchange(
@ -213,9 +211,7 @@ class Connection
{
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);
if ($this->shouldSetup()) {
$this->setupDelay($delay, $routingKey);
}
$this->setupDelay($delay, $routingKey);
$this->publishOnExchange(
$this->getDelayExchange(),
@ -241,15 +237,12 @@ class Connection
private function setupDelay(int $delay, ?string $routingKey)
{
if (!$this->channel()->isConnected()) {
$this->clear();
if ($this->shouldSetup()) {
$this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to
}
$this->exchange()->declareExchange(); // setup normal exchange for delay queue to DLX messages to
$this->getDelayExchange()->declareExchange();
$queue = $this->createDelayQueue($delay, $routingKey);
$queue->declareQueue();
$queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey));
}
@ -283,6 +276,9 @@ class Connection
));
$queue->setArguments([
'x-message-ttl' => $delay,
// delete the delay queue 10 seconds after the message expires
// publishing another message redeclares the queue which renews the lease
'x-expires' => $delay + 10000,
'x-dead-letter-exchange' => $this->exchangeOptions['name'],
// after being released from to DLX, make sure the original routing key will be used
// we must use an empty string instead of null for the argument to be picked up
@ -297,7 +293,7 @@ class Connection
return str_replace(
['%delay%', '%exchange_name%', '%routing_key%'],
[$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''],
$this->connectionOptions['delay']['routing_key_pattern']
$this->connectionOptions['delay']['queue_name_pattern']
);
}
@ -308,8 +304,10 @@ class Connection
*/
public function get(string $queueName): ?\AMQPEnvelope
{
$this->clearWhenDisconnected();
if ($this->shouldSetup()) {
$this->setup();
$this->setupExchangeAndQueues();
}
try {
@ -319,7 +317,7 @@ class Connection
} catch (\AMQPQueueException $e) {
if (404 === $e->getCode() && $this->shouldSetup()) {
// If we get a 404 for the queue, it means we need to setup the exchange & queue.
$this->setup();
$this->setupExchangeAndQueues();
return $this->get();
}
@ -342,10 +340,12 @@ class Connection
public function setup(): void
{
if (!$this->channel()->isConnected()) {
$this->clear();
}
$this->setupExchangeAndQueues();
$this->getDelayExchange()->declareExchange();
}
private function setupExchangeAndQueues(): void
{
$this->exchange()->declareExchange();
foreach ($this->queuesOptions as $queueName => $queueConfig) {
@ -424,12 +424,14 @@ class Connection
return $this->amqpExchange;
}
private function clear(): void
private function clearWhenDisconnected(): void
{
$this->amqpChannel = null;
$this->amqpQueues = [];
$this->amqpExchange = null;
$this->amqpDelayExchange = null;
if (!$this->channel()->isConnected()) {
$this->amqpChannel = null;
$this->amqpQueues = [];
$this->amqpExchange = null;
$this->amqpDelayExchange = null;
}
}
private function shouldSetup(): bool