feature #36864 [Messenger] Ability to distinguish retry and delay actions (theravel)

This PR was squashed before being merged into the 5.3-dev branch.

Discussion
----------

[Messenger] Ability to distinguish retry and delay actions

Added ability to distinguish retry and delay actions so that different "x-dead-letter-exchange" exchange name will be used in different scenarios.

| Q             | A
| ------------- | ---
| Branch?       | 5.x
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       | -
| License       | MIT
| Doc PR        |

This is a bug which existed since v4.4. The following scenario is possible:

- There are two queues: `A` and `B`, both are bound to the same routing key via "topic" exchange (two different applications for example).
- A message is published to this routing key to "topic" exchange.
- Consumer of queue `A` handles it correctly and acknowledges the message.
- Consumer of queue `B` throws and exception and message goes to retry (for example to queue `delay_delays_key_5`).
- Once message expired in `delay_delays_key_5`, it is delivered again to both `A` and `B` (**again** consumed by consumer `A`).

Expected: behavior of consumer `B` should not cause message duplication to queue `A`.

It is required to make a change of name of temporary delay queue (otherwise "delay" and "retry" queues have incompatible declaration arguments). I left `queue_name_pattern` as is to keep settings of connection backward compatible, but changed internals of queue name construction.

Commits
-------

417aaab6ee [Messenger] Ability to distinguish retry and delay actions
This commit is contained in:
Nyholm 2021-05-10 22:46:16 +02:00
commit 649d115f0a
No known key found for this signature in database
GPG Key ID: D6332DE2B6F8FA38
6 changed files with 150 additions and 51 deletions

View File

@ -6,6 +6,7 @@ CHANGELOG
* Deprecated the `prefetch_count` parameter, it has no effect and will be removed in Symfony 6.0.
* `AmqpReceiver` implements `QueueReceiverInterface` to fetch messages from a specific set of queues.
* Add ability to distinguish retry and delay actions
5.2.0
-----

View File

@ -140,6 +140,45 @@ class AmqpExtIntegrationTest extends TestCase
$receiver->ack($envelope);
}
public function testRetryAffectsOnlyOriginalQueue()
{
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), [
'exchange' => [
'name' => 'messages_topic',
'type' => 'topic',
'default_publish_routing_key' => 'topic_routing_key',
],
'queues' => [
'A' => ['binding_keys' => ['topic_routing_key']],
'B' => ['binding_keys' => ['topic_routing_key']],
],
]);
$connection->setup();
$connection->purgeQueues();
$serializer = $this->createSerializer();
$sender = new AmqpSender($connection, $serializer);
$receiver = new AmqpReceiver($connection, $serializer);
// initial delivery: should receive in both queues
$sender->send(new Envelope(new DummyMessage('Payload')));
$receivedEnvelopes = $this->receiveWithQueueName($receiver);
$this->assertCount(2, $receivedEnvelopes);
$this->assertArrayHasKey('A', $receivedEnvelopes);
$this->assertArrayHasKey('B', $receivedEnvelopes);
// retry: should receive in only "A" queue
$retryEnvelope = $receivedEnvelopes['A']
->with(new DelayStamp(10))
->with(new RedeliveryStamp(1));
$sender->send($retryEnvelope);
$retriedEnvelopes = $this->receiveWithQueueName($receiver);
$this->assertCount(1, $retriedEnvelopes);
$this->assertArrayHasKey('A', $retriedEnvelopes);
}
public function testItReceivesSignals()
{
$serializer = $this->createSerializer();
@ -255,4 +294,19 @@ TXT
return $envelopes;
}
private function receiveWithQueueName(AmqpReceiver $receiver)
{
// let RabbitMQ receive messages
usleep(100 * 1000); // 100ms
$receivedEnvelopes = [];
foreach ($receiver->get() as $envelope) {
$queueName = $envelope->last(AmqpReceivedStamp::class)->getQueueName();
$receivedEnvelopes[$queueName] = $envelope;
$receiver->ack($envelope);
}
return $receivedEnvelopes;
}
}

View File

@ -493,38 +493,32 @@ class ConnectionTest extends TestCase
public function testItDelaysTheMessage()
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__5000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 5000,
'x-expires' => 5000 + 10000,
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => '',
$delayExchange = $this->createMock(\AMQPExchange::class);
$delayExchange->expects($this->once())
->method('publish')
->with('{}', 'delay_messages__5000_delay', AMQP_NOPARAM, [
'headers' => ['x-some-headers' => 'foo'],
'delivery_mode' => 2,
'timestamp' => time(),
]);
$connection = $this->createDelayOrRetryConnection($delayExchange, self::DEFAULT_EXCHANGE_NAME, 'delay_messages__5000_delay');
$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', \AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2, 'timestamp' => time()]);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
}
public function testItRetriesTheMessage()
{
$delayExchange = $this->createMock(\AMQPExchange::class);
$delayExchange->expects($this->once())
->method('publish')
->with('{}', 'delay_messages__5000_retry', AMQP_NOPARAM);
$connection = $this->createDelayOrRetryConnection($delayExchange, '', 'delay_messages__5000_retry');
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, null, '');
$connection->publish('{}', [], 5000, $amqpStamp);
}
public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
@ -550,7 +544,7 @@ class ConnectionTest extends TestCase
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000_delay');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-expires' => 120000 + 10000,
@ -559,9 +553,9 @@ class ConnectionTest extends TestCase
]);
$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000_delay');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000_delay', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
$connection->publish('{}', [], 120000);
}
@ -690,7 +684,7 @@ class ConnectionTest extends TestCase
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000_delay');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-expires' => 120000 + 10000,
@ -699,9 +693,9 @@ class ConnectionTest extends TestCase
]);
$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000_delay');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000_delay', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
}
@ -769,6 +763,37 @@ class ConnectionTest extends TestCase
$connection = Connection::fromDsn('amqp://localhost?confirm_timeout=0.5', [], $factory);
$connection->publish('body');
}
private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPExchange::class),
$delayExchange
));
$delayQueue->expects($this->once())->method('setName')->with($delayQueueName);
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 5000,
'x-expires' => 5000 + 10000,
'x-dead-letter-exchange' => $deadLetterExchangeName,
'x-dead-letter-routing-key' => '',
]);
$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', $delayQueueName);
return Connection::fromDsn('amqp://localhost', [], $factory);
}
}
class TestAmqpFactory extends AmqpFactory

View File

@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@ -58,7 +59,11 @@ class AmqpSender implements SenderInterface
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpReceivedStamp->getAmqpEnvelope(), $amqpStamp);
$amqpStamp = AmqpStamp::createFromAmqpEnvelope(
$amqpReceivedStamp->getAmqpEnvelope(),
$amqpStamp,
$envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null
);
}
try {

View File

@ -22,6 +22,7 @@ final class AmqpStamp implements NonSendableStampInterface
private $routingKey;
private $flags;
private $attributes;
private $isRetryAttempt = false;
public function __construct(string $routingKey = null, int $flags = \AMQP_NOPARAM, array $attributes = [])
{
@ -45,7 +46,7 @@ final class AmqpStamp implements NonSendableStampInterface
return $this->attributes;
}
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null): self
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null, string $retryRoutingKey = null): self
{
$attr = $previousStamp->attributes ?? [];
@ -62,7 +63,19 @@ final class AmqpStamp implements NonSendableStampInterface
$attr['type'] = $attr['type'] ?? $amqpEnvelope->getType();
$attr['reply_to'] = $attr['reply_to'] ?? $amqpEnvelope->getReplyTo();
return new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? \AMQP_NOPARAM, $attr);
if (null === $retryRoutingKey) {
$stamp = new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? AMQP_NOPARAM, $attr);
} else {
$stamp = new self($retryRoutingKey, $previousStamp->flags ?? AMQP_NOPARAM, $attr);
$stamp->isRetryAttempt = true;
}
return $stamp;
}
public function isRetryAttempt(): bool
{
return $this->isRetryAttempt;
}
public static function createWithAttributes(array $attributes, self $previousStamp = null): self

View File

@ -323,13 +323,14 @@ class Connection
private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
{
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);
$isRetryAttempt = $amqpStamp ? $amqpStamp->isRetryAttempt() : false;
$this->setupDelay($delay, $routingKey);
$this->setupDelay($delay, $routingKey, $isRetryAttempt);
$this->publishOnExchange(
$this->getDelayExchange(),
$body,
$this->getRoutingKeyForDelay($delay, $routingKey),
$this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt),
$headers,
$amqpStamp
);
@ -354,15 +355,15 @@ class Connection
}
}
private function setupDelay(int $delay, ?string $routingKey)
private function setupDelay(int $delay, ?string $routingKey, bool $isRetryAttempt)
{
if ($this->autoSetupDelayExchange) {
$this->setupDelayExchange();
}
$queue = $this->createDelayQueue($delay, $routingKey);
$queue = $this->createDelayQueue($delay, $routingKey, $isRetryAttempt);
$queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey));
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt));
}
private function getDelayExchange(): \AMQPExchange
@ -386,21 +387,19 @@ class Connection
* which is the original exchange, resulting on it being put back into
* the original queue.
*/
private function createDelayQueue(int $delay, ?string $routingKey): \AMQPQueue
private function createDelayQueue(int $delay, ?string $routingKey, bool $isRetryAttempt): \AMQPQueue
{
$queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace(
['%delay%', '%exchange_name%', '%routing_key%'],
[$delay, $this->exchangeOptions['name'], $routingKey ?? ''],
$this->connectionOptions['delay']['queue_name_pattern']
));
$queue->setName($this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt));
$queue->setFlags(\AMQP_DURABLE);
$queue->setArguments([
'x-message-ttl' => $delay,
// delete the delay queue 10 seconds after the message expires
// publishing another message redeclares the queue which renews the lease
'x-expires' => $delay + 10000,
'x-dead-letter-exchange' => $this->exchangeOptions['name'],
// message should be broadcasted to all consumers during delay, but to only one queue during retry
// empty name is default direct exchange
'x-dead-letter-exchange' => $isRetryAttempt ? '' : $this->exchangeOptions['name'],
// after being released from to DLX, make sure the original routing key will be used
// we must use an empty string instead of null for the argument to be picked up
'x-dead-letter-routing-key' => $routingKey ?? '',
@ -409,13 +408,15 @@ class Connection
return $queue;
}
private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): string
private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey, bool $isRetryAttempt): string
{
$action = $isRetryAttempt ? '_retry' : '_delay';
return str_replace(
['%delay%', '%exchange_name%', '%routing_key%'],
[$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''],
$this->connectionOptions['delay']['queue_name_pattern']
);
).$action;
}
/**