[Messenger] make delay exchange and queues durable like the normal ones by default
This commit is contained in:
parent
9b66113701
commit
e5ecda6de1
@ -337,7 +337,7 @@ class ConnectionTest extends TestCase
|
|||||||
$amqpQueue->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
|
$amqpQueue->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
|
||||||
$amqpQueue->expects($this->once())->method('declareQueue');
|
$amqpQueue->expects($this->once())->method('declareQueue');
|
||||||
|
|
||||||
$delayExchange->expects($this->once())->method('setName')->with('delay');
|
$delayExchange->expects($this->once())->method('setName')->with('delays');
|
||||||
$delayExchange->expects($this->once())->method('declareExchange');
|
$delayExchange->expects($this->once())->method('declareExchange');
|
||||||
$delayExchange->expects($this->once())->method('publish');
|
$delayExchange->expects($this->once())->method('publish');
|
||||||
|
|
||||||
@ -371,7 +371,7 @@ class ConnectionTest extends TestCase
|
|||||||
]);
|
]);
|
||||||
|
|
||||||
$delayQueue->expects($this->once())->method('declareQueue');
|
$delayQueue->expects($this->once())->method('declareQueue');
|
||||||
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__5000');
|
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');
|
||||||
|
|
||||||
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
|
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
|
||||||
|
|
||||||
@ -413,7 +413,7 @@ class ConnectionTest extends TestCase
|
|||||||
]);
|
]);
|
||||||
|
|
||||||
$delayQueue->expects($this->once())->method('declareQueue');
|
$delayQueue->expects($this->once())->method('declareQueue');
|
||||||
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__120000');
|
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000');
|
||||||
|
|
||||||
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]);
|
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]);
|
||||||
$connection->publish('{}', [], 120000);
|
$connection->publish('{}', [], 120000);
|
||||||
@ -517,7 +517,7 @@ class ConnectionTest extends TestCase
|
|||||||
]);
|
]);
|
||||||
|
|
||||||
$delayQueue->expects($this->once())->method('declareQueue');
|
$delayQueue->expects($this->once())->method('declareQueue');
|
||||||
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages_routing_key_120000');
|
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000');
|
||||||
|
|
||||||
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
|
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
|
||||||
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
|
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
|
||||||
|
@ -62,7 +62,7 @@ class Connection
|
|||||||
{
|
{
|
||||||
$this->connectionOptions = array_replace_recursive([
|
$this->connectionOptions = array_replace_recursive([
|
||||||
'delay' => [
|
'delay' => [
|
||||||
'exchange_name' => 'delay',
|
'exchange_name' => 'delays',
|
||||||
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
|
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
|
||||||
],
|
],
|
||||||
], $connectionOptions);
|
], $connectionOptions);
|
||||||
@ -93,7 +93,7 @@ class Connection
|
|||||||
* * arguments: Extra arguments
|
* * arguments: Extra arguments
|
||||||
* * delay:
|
* * delay:
|
||||||
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
|
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
|
||||||
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delay")
|
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays")
|
||||||
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
|
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
|
||||||
* * prefetch_count: set channel prefetch count
|
* * prefetch_count: set channel prefetch count
|
||||||
*/
|
*/
|
||||||
@ -252,6 +252,11 @@ class Connection
|
|||||||
$this->amqpDelayExchange = $this->amqpFactory->createExchange($this->channel());
|
$this->amqpDelayExchange = $this->amqpFactory->createExchange($this->channel());
|
||||||
$this->amqpDelayExchange->setName($this->connectionOptions['delay']['exchange_name']);
|
$this->amqpDelayExchange->setName($this->connectionOptions['delay']['exchange_name']);
|
||||||
$this->amqpDelayExchange->setType(AMQP_EX_TYPE_DIRECT);
|
$this->amqpDelayExchange->setType(AMQP_EX_TYPE_DIRECT);
|
||||||
|
if ('delays' === $this->connectionOptions['delay']['exchange_name']) {
|
||||||
|
// only add the new flag when the name was not provided explicitly so we're using the new default name to prevent a redeclaration error
|
||||||
|
// the condition will be removed in 4.4
|
||||||
|
$this->amqpDelayExchange->setFlags(AMQP_DURABLE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return $this->amqpDelayExchange;
|
return $this->amqpDelayExchange;
|
||||||
@ -274,16 +279,24 @@ class Connection
|
|||||||
[$delay, $this->exchangeOptions['name'], $routingKey ?? ''],
|
[$delay, $this->exchangeOptions['name'], $routingKey ?? ''],
|
||||||
$this->connectionOptions['delay']['queue_name_pattern']
|
$this->connectionOptions['delay']['queue_name_pattern']
|
||||||
));
|
));
|
||||||
$queue->setArguments([
|
if ('delay_%exchange_name%_%routing_key%_%delay%' === $this->connectionOptions['delay']['queue_name_pattern']) {
|
||||||
'x-message-ttl' => $delay,
|
// the condition will be removed in 4.4
|
||||||
|
$queue->setFlags(AMQP_DURABLE);
|
||||||
|
$extraArguments = [
|
||||||
// delete the delay queue 10 seconds after the message expires
|
// delete the delay queue 10 seconds after the message expires
|
||||||
// publishing another message redeclares the queue which renews the lease
|
// publishing another message redeclares the queue which renews the lease
|
||||||
'x-expires' => $delay + 10000,
|
'x-expires' => $delay + 10000,
|
||||||
|
];
|
||||||
|
} else {
|
||||||
|
$extraArguments = [];
|
||||||
|
}
|
||||||
|
$queue->setArguments([
|
||||||
|
'x-message-ttl' => $delay,
|
||||||
'x-dead-letter-exchange' => $this->exchangeOptions['name'],
|
'x-dead-letter-exchange' => $this->exchangeOptions['name'],
|
||||||
// after being released from to DLX, make sure the original routing key will be used
|
// after being released from to DLX, make sure the original routing key will be used
|
||||||
// we must use an empty string instead of null for the argument to be picked up
|
// we must use an empty string instead of null for the argument to be picked up
|
||||||
'x-dead-letter-routing-key' => $routingKey ?? '',
|
'x-dead-letter-routing-key' => $routingKey ?? '',
|
||||||
]);
|
] + $extraArguments);
|
||||||
|
|
||||||
return $queue;
|
return $queue;
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user