fixing a bug where the delay queue name did not contain the final routing key

This commit is contained in:
Ryan Weaver 2019-05-01 15:33:07 -04:00
parent bec45edbea
commit 9940e71ae1
2 changed files with 80 additions and 26 deletions

View File

@ -19,7 +19,9 @@ use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Process\PhpProcess;
@ -84,8 +86,10 @@ class AmqpExtIntegrationTest extends TestCase
$sender = new AmqpSender($connection, $serializer);
$receiver = new AmqpReceiver($connection, $serializer);
// send a first message
$sender->send($first = new Envelope(new DummyMessage('First')));
// receive it immediately and imitate a redeliver with 2 second delay
$envelopes = iterator_to_array($receiver->get());
/** @var Envelope $envelope */
$envelope = $envelopes[0];
@ -95,25 +99,36 @@ class AmqpExtIntegrationTest extends TestCase
$sender->send($newEnvelope);
$receiver->ack($envelope);
$envelopes = [];
$startTime = time();
// wait for next message, but only for max 3 seconds
while (0 === \count($envelopes) && $startTime + 3 > time()) {
$envelopes = iterator_to_array($receiver->get());
}
// send a 2nd message with a shorter delay and custom routing key
$customRoutingKeyMessage = new DummyMessage('custom routing key');
$envelopeCustomRoutingKey = new Envelope($customRoutingKeyMessage, [
new DelayStamp(1000),
new AmqpStamp('my_custom_routing_key'),
]);
$sender->send($envelopeCustomRoutingKey);
// wait for next message (but max at 3 seconds)
$startTime = microtime(true);
$envelopes = $this->receiveEnvelopes($receiver, 3);
// duration should be about 1 second
$this->assertApproximateDuration($startTime, 1);
// this should be the custom routing key message first
$this->assertCount(1, $envelopes);
/** @var Envelope $envelope */
$envelope = $envelopes[0];
/* @var Envelope $envelope */
$receiver->ack($envelopes[0]);
$this->assertEquals($customRoutingKeyMessage, $envelopes[0]->getMessage());
// should have a 2 second delay
$this->assertGreaterThanOrEqual($startTime + 2, time());
// but only a 2 second delay
$this->assertLessThan($startTime + 4, time());
// wait for final message (but max at 3 seconds)
$envelopes = $this->receiveEnvelopes($receiver, 3);
// duration should be about 2 seconds
$this->assertApproximateDuration($startTime, 2);
/** @var RedeliveryStamp|null $retryStamp */
/* @var RedeliveryStamp|null $retryStamp */
// verify the stamp still exists from the last send
$retryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertCount(1, $envelopes);
$retryStamp = $envelopes[0]->last(RedeliveryStamp::class);
$this->assertNotNull($retryStamp);
$this->assertSame(1, $retryStamp->getRetryCount());
@ -206,4 +221,29 @@ TXT
new SerializerComponent\Serializer([new ObjectNormalizer(), new ArrayDenormalizer()], ['json' => new JsonEncoder()])
);
}
private function assertApproximateDuration($startTime, int $expectedDuration)
{
$actualDuration = microtime(true) - $startTime;
if (\method_exists([$this, 'assertEqualsWithDelta'])) {
$this->assertEqualsWithDelta($expectedDuration, $actualDuration, 'Duration was not within expected range', .5);
} else {
$this->assertEquals($expectedDuration, $actualDuration, 'Duration was not within expected range', .5);
}
}
/**
* @return Envelope[]
*/
private function receiveEnvelopes(ReceiverInterface $receiver, int $timeout): array
{
$envelopes = [];
$startTime = microtime(true);
while (0 === \count($envelopes) && $startTime + $timeout > time()) {
$envelopes = iterator_to_array($receiver->get());
}
return $envelopes;
}
}

View File

@ -79,8 +79,8 @@ class Connection
* * flags: Exchange flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * delay:
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%delay%")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%delay%")
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%")
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
@ -90,9 +90,9 @@ class Connection
{
$this->connectionOptions = array_replace_recursive([
'delay' => [
'routing_key_pattern' => 'delay_%delay%',
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
'exchange_name' => 'delay',
'queue_name_pattern' => 'delay_queue_%delay%',
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
],
], $connectionOptions);
$this->exchangeOptions = $exchangeOptions;
@ -186,7 +186,7 @@ class Connection
$this->publishOnExchange(
$this->exchange(),
$body,
(null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey(),
$this->getRoutingKeyForMessage($amqpStamp),
[
'headers' => $headers,
],
@ -209,14 +209,16 @@ class Connection
*/
private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
{
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);
if ($this->shouldSetup()) {
$this->setupDelay($delay, null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null);
$this->setupDelay($delay, $routingKey);
}
$this->publishOnExchange(
$this->getDelayExchange(),
$body,
$this->getRoutingKeyForDelay($delay),
$this->getRoutingKeyForDelay($delay, $routingKey),
[
'headers' => $headers,
],
@ -245,7 +247,7 @@ class Connection
$queue = $this->createDelayQueue($delay, $routingKey);
$queue->declareQueue();
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay));
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay, $routingKey));
}
private function getDelayExchange(): \AMQPExchange
@ -271,13 +273,16 @@ class Connection
private function createDelayQueue(int $delay, ?string $routingKey)
{
$queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace('%delay%', $delay, $this->connectionOptions['delay']['queue_name_pattern']));
$queue->setName(str_replace(
['%delay%', '%routing_key%'],
[$delay, $routingKey ?: ''],
$this->connectionOptions['delay']['queue_name_pattern']
));
$queue->setArguments([
'x-message-ttl' => $delay,
'x-dead-letter-exchange' => $this->exchange()->getName(),
]);
$routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey();
if (null !== $routingKey) {
// after being released from to DLX, this routing key will be used
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
@ -286,9 +291,13 @@ class Connection
return $queue;
}
private function getRoutingKeyForDelay(int $delay): string
private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): string
{
return str_replace('%delay%', $delay, $this->connectionOptions['delay']['routing_key_pattern']);
return str_replace(
['%delay%', '%routing_key%'],
[$delay, $finalRoutingKey ?: ''],
$this->connectionOptions['delay']['routing_key_pattern']
);
}
/**
@ -444,4 +453,9 @@ class Connection
$this->queue($queueName)->purge();
}
}
private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
{
return (null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey();
}
}