bug #31355 [Messenger] Adding final routing key to delay queue name (weaverryan)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Adding final routing key to delay queue name

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | yes
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #31241
| License       | MIT
| Doc PR        | n/a

Fixes #31241.

When we delay, we create a queue whose `x-message-ttl` matches the delay length and `x-dead-letter-routing-key` matches the original routing key used for the message. However, before this PR, the original routing key was not part of that queue's name. The result is that if two messages were delayed by the same length, but with different routing keys, the second would try to "redeclare" the existing delay queue with a new `x-dead-letter-routing-key`, resulting in an error similar to:

> Server channel error: 406, message: PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-routing-key' for queue 'delay_queue_1000'

Integration test was improved to catch this.

Cheers!

Commits
-------

9940e71ae1 fixing a bug where the delay queue name did not contain the final routing key
This commit is contained in:
Fabien Potencier 2019-05-07 12:54:07 +02:00
commit 224ab703c1
2 changed files with 80 additions and 26 deletions

View File

@ -19,7 +19,9 @@ use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Process\PhpProcess;
@ -84,8 +86,10 @@ class AmqpExtIntegrationTest extends TestCase
$sender = new AmqpSender($connection, $serializer);
$receiver = new AmqpReceiver($connection, $serializer);
// send a first message
$sender->send($first = new Envelope(new DummyMessage('First')));
// receive it immediately and imitate a redeliver with 2 second delay
$envelopes = iterator_to_array($receiver->get());
/** @var Envelope $envelope */
$envelope = $envelopes[0];
@ -95,25 +99,36 @@ class AmqpExtIntegrationTest extends TestCase
$sender->send($newEnvelope);
$receiver->ack($envelope);
$envelopes = [];
$startTime = time();
// wait for next message, but only for max 3 seconds
while (0 === \count($envelopes) && $startTime + 3 > time()) {
$envelopes = iterator_to_array($receiver->get());
}
// send a 2nd message with a shorter delay and custom routing key
$customRoutingKeyMessage = new DummyMessage('custom routing key');
$envelopeCustomRoutingKey = new Envelope($customRoutingKeyMessage, [
new DelayStamp(1000),
new AmqpStamp('my_custom_routing_key'),
]);
$sender->send($envelopeCustomRoutingKey);
// wait for next message (but max at 3 seconds)
$startTime = microtime(true);
$envelopes = $this->receiveEnvelopes($receiver, 3);
// duration should be about 1 second
$this->assertApproximateDuration($startTime, 1);
// this should be the custom routing key message first
$this->assertCount(1, $envelopes);
/** @var Envelope $envelope */
$envelope = $envelopes[0];
/* @var Envelope $envelope */
$receiver->ack($envelopes[0]);
$this->assertEquals($customRoutingKeyMessage, $envelopes[0]->getMessage());
// should have a 2 second delay
$this->assertGreaterThanOrEqual($startTime + 2, time());
// but only a 2 second delay
$this->assertLessThan($startTime + 4, time());
// wait for final message (but max at 3 seconds)
$envelopes = $this->receiveEnvelopes($receiver, 3);
// duration should be about 2 seconds
$this->assertApproximateDuration($startTime, 2);
/** @var RedeliveryStamp|null $retryStamp */
/* @var RedeliveryStamp|null $retryStamp */
// verify the stamp still exists from the last send
$retryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertCount(1, $envelopes);
$retryStamp = $envelopes[0]->last(RedeliveryStamp::class);
$this->assertNotNull($retryStamp);
$this->assertSame(1, $retryStamp->getRetryCount());
@ -206,4 +221,29 @@ TXT
new SerializerComponent\Serializer([new ObjectNormalizer(), new ArrayDenormalizer()], ['json' => new JsonEncoder()])
);
}
private function assertApproximateDuration($startTime, int $expectedDuration)
{
$actualDuration = microtime(true) - $startTime;
if (\method_exists([$this, 'assertEqualsWithDelta'])) {
$this->assertEqualsWithDelta($expectedDuration, $actualDuration, 'Duration was not within expected range', .5);
} else {
$this->assertEquals($expectedDuration, $actualDuration, 'Duration was not within expected range', .5);
}
}
/**
* @return Envelope[]
*/
private function receiveEnvelopes(ReceiverInterface $receiver, int $timeout): array
{
$envelopes = [];
$startTime = microtime(true);
while (0 === \count($envelopes) && $startTime + $timeout > time()) {
$envelopes = iterator_to_array($receiver->get());
}
return $envelopes;
}
}

View File

@ -79,8 +79,8 @@ class Connection
* * flags: Exchange flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * delay:
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%delay%")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%delay%")
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%")
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
@ -90,9 +90,9 @@ class Connection
{
$this->connectionOptions = array_replace_recursive([
'delay' => [
'routing_key_pattern' => 'delay_%delay%',
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
'exchange_name' => 'delay',
'queue_name_pattern' => 'delay_queue_%delay%',
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
],
], $connectionOptions);
$this->exchangeOptions = $exchangeOptions;
@ -186,7 +186,7 @@ class Connection
$this->publishOnExchange(
$this->exchange(),
$body,
(null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey(),
$this->getRoutingKeyForMessage($amqpStamp),
[
'headers' => $headers,
],
@ -209,14 +209,16 @@ class Connection
*/
private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
{
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);
if ($this->shouldSetup()) {
$this->setupDelay($delay, null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null);
$this->setupDelay($delay, $routingKey);
}
$this->publishOnExchange(
$this->getDelayExchange(),
$body,
$this->getRoutingKeyForDelay($delay),
$this->getRoutingKeyForDelay($delay, $routingKey),
[
'headers' => $headers,
],
@ -245,7 +247,7 @@ class Connection
$queue = $this->createDelayQueue($delay, $routingKey);
$queue->declareQueue();
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay));
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay, $routingKey));
}
private function getDelayExchange(): \AMQPExchange
@ -271,13 +273,16 @@ class Connection
private function createDelayQueue(int $delay, ?string $routingKey)
{
$queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace('%delay%', $delay, $this->connectionOptions['delay']['queue_name_pattern']));
$queue->setName(str_replace(
['%delay%', '%routing_key%'],
[$delay, $routingKey ?: ''],
$this->connectionOptions['delay']['queue_name_pattern']
));
$queue->setArguments([
'x-message-ttl' => $delay,
'x-dead-letter-exchange' => $this->exchange()->getName(),
]);
$routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey();
if (null !== $routingKey) {
// after being released from to DLX, this routing key will be used
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
@ -286,9 +291,13 @@ class Connection
return $queue;
}
private function getRoutingKeyForDelay(int $delay): string
private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): string
{
return str_replace('%delay%', $delay, $this->connectionOptions['delay']['routing_key_pattern']);
return str_replace(
['%delay%', '%routing_key%'],
[$delay, $finalRoutingKey ?: ''],
$this->connectionOptions['delay']['routing_key_pattern']
);
}
/**
@ -444,4 +453,9 @@ class Connection
$this->queue($queueName)->purge();
}
}
private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
{
return (null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey();
}
}