[Messenger] expire delay queue and fix auto_setup logic
This commit is contained in:
parent
327fb95828
commit
7aee83a71f
@ -308,7 +308,7 @@ class ConnectionTest extends TestCase
|
|||||||
);
|
);
|
||||||
|
|
||||||
// makes sure the channel looks connected, so it's not re-created
|
// makes sure the channel looks connected, so it's not re-created
|
||||||
$amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true);
|
$amqpChannel->expects($this->any())->method('isConnected')->willReturn(true);
|
||||||
|
|
||||||
$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
|
$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
|
||||||
$connection = Connection::fromDsn('amqp://localhost?prefetch_count=2', [], $factory);
|
$connection = Connection::fromDsn('amqp://localhost?prefetch_count=2', [], $factory);
|
||||||
@ -317,30 +317,57 @@ class ConnectionTest extends TestCase
|
|||||||
$connection->setup();
|
$connection->setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testItDelaysTheMessage()
|
public function testAutoSetupWithDelayDeclaresExchangeQueuesAndDelay()
|
||||||
{
|
{
|
||||||
$amqpConnection = $this->createMock(\AMQPConnection::class);
|
$amqpConnection = $this->createMock(\AMQPConnection::class);
|
||||||
$amqpChannel = $this->createMock(\AMQPChannel::class);
|
$amqpChannel = $this->createMock(\AMQPChannel::class);
|
||||||
$delayQueue = $this->createMock(\AMQPQueue::class);
|
|
||||||
|
|
||||||
$factory = $this->createMock(AmqpFactory::class);
|
$factory = $this->createMock(AmqpFactory::class);
|
||||||
$factory->method('createConnection')->willReturn($amqpConnection);
|
$factory->method('createConnection')->willReturn($amqpConnection);
|
||||||
$factory->method('createChannel')->willReturn($amqpChannel);
|
$factory->method('createChannel')->willReturn($amqpChannel);
|
||||||
$factory->method('createQueue')->willReturn($delayQueue);
|
$factory->method('createQueue')->will($this->onConsecutiveCalls(
|
||||||
|
$amqpQueue = $this->createMock(\AMQPQueue::class),
|
||||||
|
$delayQueue = $this->createMock(\AMQPQueue::class)
|
||||||
|
));
|
||||||
$factory->method('createExchange')->will($this->onConsecutiveCalls(
|
$factory->method('createExchange')->will($this->onConsecutiveCalls(
|
||||||
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
|
$amqpExchange = $this->createMock(\AMQPExchange::class),
|
||||||
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
|
$delayExchange = $this->createMock(\AMQPExchange::class)
|
||||||
));
|
));
|
||||||
|
|
||||||
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
|
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
|
||||||
$amqpExchange->expects($this->once())->method('declareExchange');
|
$amqpExchange->expects($this->once())->method('declareExchange');
|
||||||
|
$amqpQueue->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
|
||||||
|
$amqpQueue->expects($this->once())->method('declareQueue');
|
||||||
|
|
||||||
$delayExchange->expects($this->once())->method('setName')->with('delay');
|
$delayExchange->expects($this->once())->method('setName')->with('delay');
|
||||||
$delayExchange->expects($this->once())->method('declareExchange');
|
$delayExchange->expects($this->once())->method('declareExchange');
|
||||||
|
$delayExchange->expects($this->once())->method('publish');
|
||||||
|
|
||||||
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__5000');
|
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
|
||||||
|
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testItDelaysTheMessage()
|
||||||
|
{
|
||||||
|
$amqpConnection = $this->createMock(\AMQPConnection::class);
|
||||||
|
$amqpChannel = $this->createMock(\AMQPChannel::class);
|
||||||
|
|
||||||
|
$factory = $this->createMock(AmqpFactory::class);
|
||||||
|
$factory->method('createConnection')->willReturn($amqpConnection);
|
||||||
|
$factory->method('createChannel')->willReturn($amqpChannel);
|
||||||
|
$factory->method('createQueue')->will($this->onConsecutiveCalls(
|
||||||
|
$this->createMock(\AMQPQueue::class),
|
||||||
|
$delayQueue = $this->createMock(\AMQPQueue::class)
|
||||||
|
));
|
||||||
|
$factory->method('createExchange')->will($this->onConsecutiveCalls(
|
||||||
|
$this->createMock(\AMQPExchange::class),
|
||||||
|
$delayExchange = $this->createMock(\AMQPExchange::class)
|
||||||
|
));
|
||||||
|
|
||||||
|
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__5000');
|
||||||
$delayQueue->expects($this->once())->method('setArguments')->with([
|
$delayQueue->expects($this->once())->method('setArguments')->with([
|
||||||
'x-message-ttl' => 5000,
|
'x-message-ttl' => 5000,
|
||||||
|
'x-expires' => 5000 + 10000,
|
||||||
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
|
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
|
||||||
'x-dead-letter-routing-key' => '',
|
'x-dead-letter-routing-key' => '',
|
||||||
]);
|
]);
|
||||||
@ -358,22 +385,18 @@ class ConnectionTest extends TestCase
|
|||||||
{
|
{
|
||||||
$amqpConnection = $this->createMock(\AMQPConnection::class);
|
$amqpConnection = $this->createMock(\AMQPConnection::class);
|
||||||
$amqpChannel = $this->createMock(\AMQPChannel::class);
|
$amqpChannel = $this->createMock(\AMQPChannel::class);
|
||||||
$delayQueue = $this->createMock(\AMQPQueue::class);
|
|
||||||
|
|
||||||
$factory = $this->createMock(AmqpFactory::class);
|
$factory = $this->createMock(AmqpFactory::class);
|
||||||
$factory->method('createConnection')->willReturn($amqpConnection);
|
$factory->method('createConnection')->willReturn($amqpConnection);
|
||||||
$factory->method('createChannel')->willReturn($amqpChannel);
|
$factory->method('createChannel')->willReturn($amqpChannel);
|
||||||
$factory->method('createQueue')->willReturn($delayQueue);
|
$factory->method('createQueue')->will($this->onConsecutiveCalls(
|
||||||
$factory->method('createExchange')->will($this->onConsecutiveCalls(
|
$this->createMock(\AMQPQueue::class),
|
||||||
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
|
$delayQueue = $this->createMock(\AMQPQueue::class)
|
||||||
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
|
));
|
||||||
|
$factory->method('createExchange')->will($this->onConsecutiveCalls(
|
||||||
|
$this->createMock(\AMQPExchange::class),
|
||||||
|
$delayExchange = $this->createMock(\AMQPExchange::class)
|
||||||
));
|
));
|
||||||
|
|
||||||
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
|
|
||||||
$amqpExchange->expects($this->once())->method('declareExchange');
|
|
||||||
|
|
||||||
$delayExchange->expects($this->once())->method('setName')->with('delay');
|
|
||||||
$delayExchange->expects($this->once())->method('declareExchange');
|
|
||||||
|
|
||||||
$connectionOptions = [
|
$connectionOptions = [
|
||||||
'retry' => [
|
'retry' => [
|
||||||
@ -383,9 +406,10 @@ class ConnectionTest extends TestCase
|
|||||||
|
|
||||||
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
|
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
|
||||||
|
|
||||||
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__120000');
|
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000');
|
||||||
$delayQueue->expects($this->once())->method('setArguments')->with([
|
$delayQueue->expects($this->once())->method('setArguments')->with([
|
||||||
'x-message-ttl' => 120000,
|
'x-message-ttl' => 120000,
|
||||||
|
'x-expires' => 120000 + 10000,
|
||||||
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
|
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
|
||||||
'x-dead-letter-routing-key' => '',
|
'x-dead-letter-routing-key' => '',
|
||||||
]);
|
]);
|
||||||
@ -467,23 +491,19 @@ class ConnectionTest extends TestCase
|
|||||||
{
|
{
|
||||||
$amqpConnection = $this->createMock(\AMQPConnection::class);
|
$amqpConnection = $this->createMock(\AMQPConnection::class);
|
||||||
$amqpChannel = $this->createMock(\AMQPChannel::class);
|
$amqpChannel = $this->createMock(\AMQPChannel::class);
|
||||||
$delayQueue = $this->createMock(\AMQPQueue::class);
|
|
||||||
|
|
||||||
$factory = $this->createMock(AmqpFactory::class);
|
$factory = $this->createMock(AmqpFactory::class);
|
||||||
$factory->method('createConnection')->willReturn($amqpConnection);
|
$factory->method('createConnection')->willReturn($amqpConnection);
|
||||||
$factory->method('createChannel')->willReturn($amqpChannel);
|
$factory->method('createChannel')->willReturn($amqpChannel);
|
||||||
$factory->method('createQueue')->willReturn($delayQueue);
|
$factory->method('createQueue')->will($this->onConsecutiveCalls(
|
||||||
|
$this->createMock(\AMQPQueue::class),
|
||||||
|
$delayQueue = $this->createMock(\AMQPQueue::class)
|
||||||
|
));
|
||||||
$factory->method('createExchange')->will($this->onConsecutiveCalls(
|
$factory->method('createExchange')->will($this->onConsecutiveCalls(
|
||||||
$amqpExchange = $this->createMock(\AMQPExchange::class),
|
$this->createMock(\AMQPExchange::class),
|
||||||
$delayExchange = $this->createMock(\AMQPExchange::class)
|
$delayExchange = $this->createMock(\AMQPExchange::class)
|
||||||
));
|
));
|
||||||
|
|
||||||
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
|
|
||||||
$amqpExchange->expects($this->once())->method('declareExchange');
|
|
||||||
|
|
||||||
$delayExchange->expects($this->once())->method('setName')->with('delay');
|
|
||||||
$delayExchange->expects($this->once())->method('declareExchange');
|
|
||||||
|
|
||||||
$connectionOptions = [
|
$connectionOptions = [
|
||||||
'retry' => [
|
'retry' => [
|
||||||
'dead_routing_key' => 'my_dead_routing_key',
|
'dead_routing_key' => 'my_dead_routing_key',
|
||||||
@ -492,9 +512,10 @@ class ConnectionTest extends TestCase
|
|||||||
|
|
||||||
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
|
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
|
||||||
|
|
||||||
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages_routing_key_120000');
|
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000');
|
||||||
$delayQueue->expects($this->once())->method('setArguments')->with([
|
$delayQueue->expects($this->once())->method('setArguments')->with([
|
||||||
'x-message-ttl' => 120000,
|
'x-message-ttl' => 120000,
|
||||||
|
'x-expires' => 120000 + 10000,
|
||||||
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
|
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
|
||||||
'x-dead-letter-routing-key' => 'routing_key',
|
'x-dead-letter-routing-key' => 'routing_key',
|
||||||
]);
|
]);
|
||||||
|
@ -45,10 +45,7 @@ class AmqpSender implements SenderInterface
|
|||||||
|
|
||||||
/** @var DelayStamp|null $delayStamp */
|
/** @var DelayStamp|null $delayStamp */
|
||||||
$delayStamp = $envelope->last(DelayStamp::class);
|
$delayStamp = $envelope->last(DelayStamp::class);
|
||||||
$delay = 0;
|
$delay = $delayStamp ? $delayStamp->getDelay() : 0;
|
||||||
if (null !== $delayStamp) {
|
|
||||||
$delay = $delayStamp->getDelay();
|
|
||||||
}
|
|
||||||
|
|
||||||
$amqpStamp = $envelope->last(AmqpStamp::class);
|
$amqpStamp = $envelope->last(AmqpStamp::class);
|
||||||
if (isset($encodedMessage['headers']['Content-Type'])) {
|
if (isset($encodedMessage['headers']['Content-Type'])) {
|
||||||
|
@ -62,9 +62,8 @@ class Connection
|
|||||||
{
|
{
|
||||||
$this->connectionOptions = array_replace_recursive([
|
$this->connectionOptions = array_replace_recursive([
|
||||||
'delay' => [
|
'delay' => [
|
||||||
'routing_key_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
|
|
||||||
'exchange_name' => 'delay',
|
'exchange_name' => 'delay',
|
||||||
'queue_name_pattern' => 'delay_queue_%exchange_name%_%routing_key%_%delay%',
|
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
|
||||||
],
|
],
|
||||||
], $connectionOptions);
|
], $connectionOptions);
|
||||||
$this->exchangeOptions = $exchangeOptions;
|
$this->exchangeOptions = $exchangeOptions;
|
||||||
@ -93,9 +92,8 @@ class Connection
|
|||||||
* * flags: Exchange flags (Default: AMQP_DURABLE)
|
* * flags: Exchange flags (Default: AMQP_DURABLE)
|
||||||
* * arguments: Extra arguments
|
* * arguments: Extra arguments
|
||||||
* * delay:
|
* * delay:
|
||||||
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%exchange_name%_%routing_key%_%delay%")
|
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
|
||||||
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%exchange_name%_%routing_key%_%delay%")
|
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delay")
|
||||||
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
|
|
||||||
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
|
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
|
||||||
* * prefetch_count: set channel prefetch count
|
* * prefetch_count: set channel prefetch count
|
||||||
*/
|
*/
|
||||||
@ -171,20 +169,20 @@ class Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param int $delay The delay in milliseconds
|
|
||||||
*
|
|
||||||
* @throws \AMQPException
|
* @throws \AMQPException
|
||||||
*/
|
*/
|
||||||
public function publish(string $body, array $headers = [], int $delay = 0, AmqpStamp $amqpStamp = null): void
|
public function publish(string $body, array $headers = [], int $delayInMs = 0, AmqpStamp $amqpStamp = null): void
|
||||||
{
|
{
|
||||||
if (0 !== $delay) {
|
$this->clearWhenDisconnected();
|
||||||
$this->publishWithDelay($body, $headers, $delay, $amqpStamp);
|
|
||||||
|
if (0 !== $delayInMs) {
|
||||||
|
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($this->shouldSetup()) {
|
if ($this->shouldSetup()) {
|
||||||
$this->setup();
|
$this->setupExchangeAndQueues();
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->publishOnExchange(
|
$this->publishOnExchange(
|
||||||
@ -213,9 +211,7 @@ class Connection
|
|||||||
{
|
{
|
||||||
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);
|
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);
|
||||||
|
|
||||||
if ($this->shouldSetup()) {
|
$this->setupDelay($delay, $routingKey);
|
||||||
$this->setupDelay($delay, $routingKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->publishOnExchange(
|
$this->publishOnExchange(
|
||||||
$this->getDelayExchange(),
|
$this->getDelayExchange(),
|
||||||
@ -241,15 +237,12 @@ class Connection
|
|||||||
|
|
||||||
private function setupDelay(int $delay, ?string $routingKey)
|
private function setupDelay(int $delay, ?string $routingKey)
|
||||||
{
|
{
|
||||||
if (!$this->channel()->isConnected()) {
|
if ($this->shouldSetup()) {
|
||||||
$this->clear();
|
$this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->exchange()->declareExchange(); // setup normal exchange for delay queue to DLX messages to
|
|
||||||
$this->getDelayExchange()->declareExchange();
|
|
||||||
|
|
||||||
$queue = $this->createDelayQueue($delay, $routingKey);
|
$queue = $this->createDelayQueue($delay, $routingKey);
|
||||||
$queue->declareQueue();
|
$queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
|
||||||
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey));
|
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -283,6 +276,9 @@ class Connection
|
|||||||
));
|
));
|
||||||
$queue->setArguments([
|
$queue->setArguments([
|
||||||
'x-message-ttl' => $delay,
|
'x-message-ttl' => $delay,
|
||||||
|
// delete the delay queue 10 seconds after the message expires
|
||||||
|
// publishing another message redeclares the queue which renews the lease
|
||||||
|
'x-expires' => $delay + 10000,
|
||||||
'x-dead-letter-exchange' => $this->exchangeOptions['name'],
|
'x-dead-letter-exchange' => $this->exchangeOptions['name'],
|
||||||
// after being released from to DLX, make sure the original routing key will be used
|
// after being released from to DLX, make sure the original routing key will be used
|
||||||
// we must use an empty string instead of null for the argument to be picked up
|
// we must use an empty string instead of null for the argument to be picked up
|
||||||
@ -297,7 +293,7 @@ class Connection
|
|||||||
return str_replace(
|
return str_replace(
|
||||||
['%delay%', '%exchange_name%', '%routing_key%'],
|
['%delay%', '%exchange_name%', '%routing_key%'],
|
||||||
[$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''],
|
[$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''],
|
||||||
$this->connectionOptions['delay']['routing_key_pattern']
|
$this->connectionOptions['delay']['queue_name_pattern']
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -308,8 +304,10 @@ class Connection
|
|||||||
*/
|
*/
|
||||||
public function get(string $queueName): ?\AMQPEnvelope
|
public function get(string $queueName): ?\AMQPEnvelope
|
||||||
{
|
{
|
||||||
|
$this->clearWhenDisconnected();
|
||||||
|
|
||||||
if ($this->shouldSetup()) {
|
if ($this->shouldSetup()) {
|
||||||
$this->setup();
|
$this->setupExchangeAndQueues();
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -319,7 +317,7 @@ class Connection
|
|||||||
} catch (\AMQPQueueException $e) {
|
} catch (\AMQPQueueException $e) {
|
||||||
if (404 === $e->getCode() && $this->shouldSetup()) {
|
if (404 === $e->getCode() && $this->shouldSetup()) {
|
||||||
// If we get a 404 for the queue, it means we need to setup the exchange & queue.
|
// If we get a 404 for the queue, it means we need to setup the exchange & queue.
|
||||||
$this->setup();
|
$this->setupExchangeAndQueues();
|
||||||
|
|
||||||
return $this->get();
|
return $this->get();
|
||||||
}
|
}
|
||||||
@ -342,10 +340,12 @@ class Connection
|
|||||||
|
|
||||||
public function setup(): void
|
public function setup(): void
|
||||||
{
|
{
|
||||||
if (!$this->channel()->isConnected()) {
|
$this->setupExchangeAndQueues();
|
||||||
$this->clear();
|
$this->getDelayExchange()->declareExchange();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function setupExchangeAndQueues(): void
|
||||||
|
{
|
||||||
$this->exchange()->declareExchange();
|
$this->exchange()->declareExchange();
|
||||||
|
|
||||||
foreach ($this->queuesOptions as $queueName => $queueConfig) {
|
foreach ($this->queuesOptions as $queueName => $queueConfig) {
|
||||||
@ -424,12 +424,14 @@ class Connection
|
|||||||
return $this->amqpExchange;
|
return $this->amqpExchange;
|
||||||
}
|
}
|
||||||
|
|
||||||
private function clear(): void
|
private function clearWhenDisconnected(): void
|
||||||
{
|
{
|
||||||
$this->amqpChannel = null;
|
if (!$this->channel()->isConnected()) {
|
||||||
$this->amqpQueues = [];
|
$this->amqpChannel = null;
|
||||||
$this->amqpExchange = null;
|
$this->amqpQueues = [];
|
||||||
$this->amqpDelayExchange = null;
|
$this->amqpExchange = null;
|
||||||
|
$this->amqpDelayExchange = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private function shouldSetup(): bool
|
private function shouldSetup(): bool
|
||||||
|
Reference in New Issue
Block a user