[Messenger] Ability to distinguish retry and delay actions

This commit is contained in:
theravel 2021-03-28 22:48:20 +02:00 committed by Nyholm
parent c469ea6fff
commit 417aaab6ee
No known key found for this signature in database
GPG Key ID: D6332DE2B6F8FA38
6 changed files with 150 additions and 51 deletions

View File

@ -6,6 +6,7 @@ CHANGELOG
* Deprecated the `prefetch_count` parameter, it has no effect and will be removed in Symfony 6.0. * Deprecated the `prefetch_count` parameter, it has no effect and will be removed in Symfony 6.0.
* `AmqpReceiver` implements `QueueReceiverInterface` to fetch messages from a specific set of queues. * `AmqpReceiver` implements `QueueReceiverInterface` to fetch messages from a specific set of queues.
* Add ability to distinguish retry and delay actions
5.2.0 5.2.0
----- -----

View File

@ -140,6 +140,45 @@ class AmqpExtIntegrationTest extends TestCase
$receiver->ack($envelope); $receiver->ack($envelope);
} }
public function testRetryAffectsOnlyOriginalQueue()
{
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), [
'exchange' => [
'name' => 'messages_topic',
'type' => 'topic',
'default_publish_routing_key' => 'topic_routing_key',
],
'queues' => [
'A' => ['binding_keys' => ['topic_routing_key']],
'B' => ['binding_keys' => ['topic_routing_key']],
],
]);
$connection->setup();
$connection->purgeQueues();
$serializer = $this->createSerializer();
$sender = new AmqpSender($connection, $serializer);
$receiver = new AmqpReceiver($connection, $serializer);
// initial delivery: should receive in both queues
$sender->send(new Envelope(new DummyMessage('Payload')));
$receivedEnvelopes = $this->receiveWithQueueName($receiver);
$this->assertCount(2, $receivedEnvelopes);
$this->assertArrayHasKey('A', $receivedEnvelopes);
$this->assertArrayHasKey('B', $receivedEnvelopes);
// retry: should receive in only "A" queue
$retryEnvelope = $receivedEnvelopes['A']
->with(new DelayStamp(10))
->with(new RedeliveryStamp(1));
$sender->send($retryEnvelope);
$retriedEnvelopes = $this->receiveWithQueueName($receiver);
$this->assertCount(1, $retriedEnvelopes);
$this->assertArrayHasKey('A', $retriedEnvelopes);
}
public function testItReceivesSignals() public function testItReceivesSignals()
{ {
$serializer = $this->createSerializer(); $serializer = $this->createSerializer();
@ -255,4 +294,19 @@ TXT
return $envelopes; return $envelopes;
} }
private function receiveWithQueueName(AmqpReceiver $receiver)
{
// let RabbitMQ receive messages
usleep(100 * 1000); // 100ms
$receivedEnvelopes = [];
foreach ($receiver->get() as $envelope) {
$queueName = $envelope->last(AmqpReceivedStamp::class)->getQueueName();
$receivedEnvelopes[$queueName] = $envelope;
$receiver->ack($envelope);
}
return $receivedEnvelopes;
}
} }

View File

@ -493,38 +493,32 @@ class ConnectionTest extends TestCase
public function testItDelaysTheMessage() public function testItDelaysTheMessage()
{ {
$amqpConnection = $this->createMock(\AMQPConnection::class); $delayExchange = $this->createMock(\AMQPExchange::class);
$amqpChannel = $this->createMock(\AMQPChannel::class); $delayExchange->expects($this->once())
->method('publish')
$factory = $this->createMock(AmqpFactory::class); ->with('{}', 'delay_messages__5000_delay', AMQP_NOPARAM, [
$factory->method('createConnection')->willReturn($amqpConnection); 'headers' => ['x-some-headers' => 'foo'],
$factory->method('createChannel')->willReturn($amqpChannel); 'delivery_mode' => 2,
$factory->method('createQueue')->will($this->onConsecutiveCalls( 'timestamp' => time(),
$this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__5000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 5000,
'x-expires' => 5000 + 10000,
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => '',
]); ]);
$connection = $this->createDelayOrRetryConnection($delayExchange, self::DEFAULT_EXCHANGE_NAME, 'delay_messages__5000_delay');
$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', \AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2, 'timestamp' => time()]);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000); $connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
} }
public function testItRetriesTheMessage()
{
$delayExchange = $this->createMock(\AMQPExchange::class);
$delayExchange->expects($this->once())
->method('publish')
->with('{}', 'delay_messages__5000_retry', AMQP_NOPARAM);
$connection = $this->createDelayOrRetryConnection($delayExchange, '', 'delay_messages__5000_retry');
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, null, '');
$connection->publish('{}', [], 5000, $amqpStamp);
}
public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs() public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
{ {
$amqpConnection = $this->createMock(\AMQPConnection::class); $amqpConnection = $this->createMock(\AMQPConnection::class);
@ -550,7 +544,7 @@ class ConnectionTest extends TestCase
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory); $connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000'); $delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000_delay');
$delayQueue->expects($this->once())->method('setArguments')->with([ $delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000, 'x-message-ttl' => 120000,
'x-expires' => 120000 + 10000, 'x-expires' => 120000 + 10000,
@ -559,9 +553,9 @@ class ConnectionTest extends TestCase
]); ]);
$delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000'); $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000_delay');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000_delay', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
$connection->publish('{}', [], 120000); $connection->publish('{}', [], 120000);
} }
@ -690,7 +684,7 @@ class ConnectionTest extends TestCase
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory); $connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000'); $delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000_delay');
$delayQueue->expects($this->once())->method('setArguments')->with([ $delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000, 'x-message-ttl' => 120000,
'x-expires' => 120000 + 10000, 'x-expires' => 120000 + 10000,
@ -699,9 +693,9 @@ class ConnectionTest extends TestCase
]); ]);
$delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000'); $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000_delay');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000_delay', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key')); $connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
} }
@ -769,6 +763,37 @@ class ConnectionTest extends TestCase
$connection = Connection::fromDsn('amqp://localhost?confirm_timeout=0.5', [], $factory); $connection = Connection::fromDsn('amqp://localhost?confirm_timeout=0.5', [], $factory);
$connection->publish('body'); $connection->publish('body');
} }
private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPExchange::class),
$delayExchange
));
$delayQueue->expects($this->once())->method('setName')->with($delayQueueName);
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 5000,
'x-expires' => 5000 + 10000,
'x-dead-letter-exchange' => $deadLetterExchangeName,
'x-dead-letter-routing-key' => '',
]);
$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', $delayQueueName);
return Connection::fromDsn('amqp://localhost', [], $factory);
}
} }
class TestAmqpFactory extends AmqpFactory class TestAmqpFactory extends AmqpFactory

View File

@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@ -58,7 +59,11 @@ class AmqpSender implements SenderInterface
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class); $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if ($amqpReceivedStamp instanceof AmqpReceivedStamp) { if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpReceivedStamp->getAmqpEnvelope(), $amqpStamp); $amqpStamp = AmqpStamp::createFromAmqpEnvelope(
$amqpReceivedStamp->getAmqpEnvelope(),
$amqpStamp,
$envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null
);
} }
try { try {

View File

@ -22,6 +22,7 @@ final class AmqpStamp implements NonSendableStampInterface
private $routingKey; private $routingKey;
private $flags; private $flags;
private $attributes; private $attributes;
private $isRetryAttempt = false;
public function __construct(string $routingKey = null, int $flags = \AMQP_NOPARAM, array $attributes = []) public function __construct(string $routingKey = null, int $flags = \AMQP_NOPARAM, array $attributes = [])
{ {
@ -45,7 +46,7 @@ final class AmqpStamp implements NonSendableStampInterface
return $this->attributes; return $this->attributes;
} }
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null): self public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null, string $retryRoutingKey = null): self
{ {
$attr = $previousStamp->attributes ?? []; $attr = $previousStamp->attributes ?? [];
@ -62,7 +63,19 @@ final class AmqpStamp implements NonSendableStampInterface
$attr['type'] = $attr['type'] ?? $amqpEnvelope->getType(); $attr['type'] = $attr['type'] ?? $amqpEnvelope->getType();
$attr['reply_to'] = $attr['reply_to'] ?? $amqpEnvelope->getReplyTo(); $attr['reply_to'] = $attr['reply_to'] ?? $amqpEnvelope->getReplyTo();
return new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? \AMQP_NOPARAM, $attr); if (null === $retryRoutingKey) {
$stamp = new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? AMQP_NOPARAM, $attr);
} else {
$stamp = new self($retryRoutingKey, $previousStamp->flags ?? AMQP_NOPARAM, $attr);
$stamp->isRetryAttempt = true;
}
return $stamp;
}
public function isRetryAttempt(): bool
{
return $this->isRetryAttempt;
} }
public static function createWithAttributes(array $attributes, self $previousStamp = null): self public static function createWithAttributes(array $attributes, self $previousStamp = null): self

View File

@ -323,13 +323,14 @@ class Connection
private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null) private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
{ {
$routingKey = $this->getRoutingKeyForMessage($amqpStamp); $routingKey = $this->getRoutingKeyForMessage($amqpStamp);
$isRetryAttempt = $amqpStamp ? $amqpStamp->isRetryAttempt() : false;
$this->setupDelay($delay, $routingKey); $this->setupDelay($delay, $routingKey, $isRetryAttempt);
$this->publishOnExchange( $this->publishOnExchange(
$this->getDelayExchange(), $this->getDelayExchange(),
$body, $body,
$this->getRoutingKeyForDelay($delay, $routingKey), $this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt),
$headers, $headers,
$amqpStamp $amqpStamp
); );
@ -354,15 +355,15 @@ class Connection
} }
} }
private function setupDelay(int $delay, ?string $routingKey) private function setupDelay(int $delay, ?string $routingKey, bool $isRetryAttempt)
{ {
if ($this->autoSetup) { if ($this->autoSetup) {
$this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to $this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to
} }
$queue = $this->createDelayQueue($delay, $routingKey); $queue = $this->createDelayQueue($delay, $routingKey, $isRetryAttempt);
$queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance $queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey)); $queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt));
} }
private function getDelayExchange(): \AMQPExchange private function getDelayExchange(): \AMQPExchange
@ -386,21 +387,19 @@ class Connection
* which is the original exchange, resulting on it being put back into * which is the original exchange, resulting on it being put back into
* the original queue. * the original queue.
*/ */
private function createDelayQueue(int $delay, ?string $routingKey): \AMQPQueue private function createDelayQueue(int $delay, ?string $routingKey, bool $isRetryAttempt): \AMQPQueue
{ {
$queue = $this->amqpFactory->createQueue($this->channel()); $queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace( $queue->setName($this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt));
['%delay%', '%exchange_name%', '%routing_key%'],
[$delay, $this->exchangeOptions['name'], $routingKey ?? ''],
$this->connectionOptions['delay']['queue_name_pattern']
));
$queue->setFlags(\AMQP_DURABLE); $queue->setFlags(\AMQP_DURABLE);
$queue->setArguments([ $queue->setArguments([
'x-message-ttl' => $delay, 'x-message-ttl' => $delay,
// delete the delay queue 10 seconds after the message expires // delete the delay queue 10 seconds after the message expires
// publishing another message redeclares the queue which renews the lease // publishing another message redeclares the queue which renews the lease
'x-expires' => $delay + 10000, 'x-expires' => $delay + 10000,
'x-dead-letter-exchange' => $this->exchangeOptions['name'], // message should be broadcasted to all consumers during delay, but to only one queue during retry
// empty name is default direct exchange
'x-dead-letter-exchange' => $isRetryAttempt ? '' : $this->exchangeOptions['name'],
// after being released from to DLX, make sure the original routing key will be used // after being released from to DLX, make sure the original routing key will be used
// we must use an empty string instead of null for the argument to be picked up // we must use an empty string instead of null for the argument to be picked up
'x-dead-letter-routing-key' => $routingKey ?? '', 'x-dead-letter-routing-key' => $routingKey ?? '',
@ -409,13 +408,15 @@ class Connection
return $queue; return $queue;
} }
private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): string private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey, bool $isRetryAttempt): string
{ {
$action = $isRetryAttempt ? '_retry' : '_delay';
return str_replace( return str_replace(
['%delay%', '%exchange_name%', '%routing_key%'], ['%delay%', '%exchange_name%', '%routing_key%'],
[$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''], [$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''],
$this->connectionOptions['delay']['queue_name_pattern'] $this->connectionOptions['delay']['queue_name_pattern']
); ).$action;
} }
/** /**