fixing a bug where the delay queue name did not contain the final routing key
This commit is contained in:
parent
bec45edbea
commit
9940e71ae1
@ -19,7 +19,9 @@ use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
|
||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
use Symfony\Component\Process\PhpProcess;
|
||||
@ -84,8 +86,10 @@ class AmqpExtIntegrationTest extends TestCase
|
||||
$sender = new AmqpSender($connection, $serializer);
|
||||
$receiver = new AmqpReceiver($connection, $serializer);
|
||||
|
||||
// send a first message
|
||||
$sender->send($first = new Envelope(new DummyMessage('First')));
|
||||
|
||||
// receive it immediately and imitate a redeliver with 2 second delay
|
||||
$envelopes = iterator_to_array($receiver->get());
|
||||
/** @var Envelope $envelope */
|
||||
$envelope = $envelopes[0];
|
||||
@ -95,25 +99,36 @@ class AmqpExtIntegrationTest extends TestCase
|
||||
$sender->send($newEnvelope);
|
||||
$receiver->ack($envelope);
|
||||
|
||||
$envelopes = [];
|
||||
$startTime = time();
|
||||
// wait for next message, but only for max 3 seconds
|
||||
while (0 === \count($envelopes) && $startTime + 3 > time()) {
|
||||
$envelopes = iterator_to_array($receiver->get());
|
||||
}
|
||||
// send a 2nd message with a shorter delay and custom routing key
|
||||
$customRoutingKeyMessage = new DummyMessage('custom routing key');
|
||||
$envelopeCustomRoutingKey = new Envelope($customRoutingKeyMessage, [
|
||||
new DelayStamp(1000),
|
||||
new AmqpStamp('my_custom_routing_key'),
|
||||
]);
|
||||
$sender->send($envelopeCustomRoutingKey);
|
||||
|
||||
// wait for next message (but max at 3 seconds)
|
||||
$startTime = microtime(true);
|
||||
$envelopes = $this->receiveEnvelopes($receiver, 3);
|
||||
|
||||
// duration should be about 1 second
|
||||
$this->assertApproximateDuration($startTime, 1);
|
||||
|
||||
// this should be the custom routing key message first
|
||||
$this->assertCount(1, $envelopes);
|
||||
/** @var Envelope $envelope */
|
||||
$envelope = $envelopes[0];
|
||||
/* @var Envelope $envelope */
|
||||
$receiver->ack($envelopes[0]);
|
||||
$this->assertEquals($customRoutingKeyMessage, $envelopes[0]->getMessage());
|
||||
|
||||
// should have a 2 second delay
|
||||
$this->assertGreaterThanOrEqual($startTime + 2, time());
|
||||
// but only a 2 second delay
|
||||
$this->assertLessThan($startTime + 4, time());
|
||||
// wait for final message (but max at 3 seconds)
|
||||
$envelopes = $this->receiveEnvelopes($receiver, 3);
|
||||
// duration should be about 2 seconds
|
||||
$this->assertApproximateDuration($startTime, 2);
|
||||
|
||||
/** @var RedeliveryStamp|null $retryStamp */
|
||||
/* @var RedeliveryStamp|null $retryStamp */
|
||||
// verify the stamp still exists from the last send
|
||||
$retryStamp = $envelope->last(RedeliveryStamp::class);
|
||||
$this->assertCount(1, $envelopes);
|
||||
$retryStamp = $envelopes[0]->last(RedeliveryStamp::class);
|
||||
$this->assertNotNull($retryStamp);
|
||||
$this->assertSame(1, $retryStamp->getRetryCount());
|
||||
|
||||
@ -206,4 +221,29 @@ TXT
|
||||
new SerializerComponent\Serializer([new ObjectNormalizer(), new ArrayDenormalizer()], ['json' => new JsonEncoder()])
|
||||
);
|
||||
}
|
||||
|
||||
private function assertApproximateDuration($startTime, int $expectedDuration)
|
||||
{
|
||||
$actualDuration = microtime(true) - $startTime;
|
||||
|
||||
if (\method_exists([$this, 'assertEqualsWithDelta'])) {
|
||||
$this->assertEqualsWithDelta($expectedDuration, $actualDuration, 'Duration was not within expected range', .5);
|
||||
} else {
|
||||
$this->assertEquals($expectedDuration, $actualDuration, 'Duration was not within expected range', .5);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Envelope[]
|
||||
*/
|
||||
private function receiveEnvelopes(ReceiverInterface $receiver, int $timeout): array
|
||||
{
|
||||
$envelopes = [];
|
||||
$startTime = microtime(true);
|
||||
while (0 === \count($envelopes) && $startTime + $timeout > time()) {
|
||||
$envelopes = iterator_to_array($receiver->get());
|
||||
}
|
||||
|
||||
return $envelopes;
|
||||
}
|
||||
}
|
||||
|
@ -79,8 +79,8 @@ class Connection
|
||||
* * flags: Exchange flags (Default: AMQP_DURABLE)
|
||||
* * arguments: Extra arguments
|
||||
* * delay:
|
||||
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%delay%")
|
||||
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%delay%")
|
||||
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%")
|
||||
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%")
|
||||
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
|
||||
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
|
||||
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
|
||||
@ -90,9 +90,9 @@ class Connection
|
||||
{
|
||||
$this->connectionOptions = array_replace_recursive([
|
||||
'delay' => [
|
||||
'routing_key_pattern' => 'delay_%delay%',
|
||||
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
|
||||
'exchange_name' => 'delay',
|
||||
'queue_name_pattern' => 'delay_queue_%delay%',
|
||||
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
|
||||
],
|
||||
], $connectionOptions);
|
||||
$this->exchangeOptions = $exchangeOptions;
|
||||
@ -186,7 +186,7 @@ class Connection
|
||||
$this->publishOnExchange(
|
||||
$this->exchange(),
|
||||
$body,
|
||||
(null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey(),
|
||||
$this->getRoutingKeyForMessage($amqpStamp),
|
||||
[
|
||||
'headers' => $headers,
|
||||
],
|
||||
@ -209,14 +209,16 @@ class Connection
|
||||
*/
|
||||
private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
|
||||
{
|
||||
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);
|
||||
|
||||
if ($this->shouldSetup()) {
|
||||
$this->setupDelay($delay, null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null);
|
||||
$this->setupDelay($delay, $routingKey);
|
||||
}
|
||||
|
||||
$this->publishOnExchange(
|
||||
$this->getDelayExchange(),
|
||||
$body,
|
||||
$this->getRoutingKeyForDelay($delay),
|
||||
$this->getRoutingKeyForDelay($delay, $routingKey),
|
||||
[
|
||||
'headers' => $headers,
|
||||
],
|
||||
@ -245,7 +247,7 @@ class Connection
|
||||
|
||||
$queue = $this->createDelayQueue($delay, $routingKey);
|
||||
$queue->declareQueue();
|
||||
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay));
|
||||
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay, $routingKey));
|
||||
}
|
||||
|
||||
private function getDelayExchange(): \AMQPExchange
|
||||
@ -271,13 +273,16 @@ class Connection
|
||||
private function createDelayQueue(int $delay, ?string $routingKey)
|
||||
{
|
||||
$queue = $this->amqpFactory->createQueue($this->channel());
|
||||
$queue->setName(str_replace('%delay%', $delay, $this->connectionOptions['delay']['queue_name_pattern']));
|
||||
$queue->setName(str_replace(
|
||||
['%delay%', '%routing_key%'],
|
||||
[$delay, $routingKey ?: ''],
|
||||
$this->connectionOptions['delay']['queue_name_pattern']
|
||||
));
|
||||
$queue->setArguments([
|
||||
'x-message-ttl' => $delay,
|
||||
'x-dead-letter-exchange' => $this->exchange()->getName(),
|
||||
]);
|
||||
|
||||
$routingKey = $routingKey ?? $this->getDefaultPublishRoutingKey();
|
||||
if (null !== $routingKey) {
|
||||
// after being released from to DLX, this routing key will be used
|
||||
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
|
||||
@ -286,9 +291,13 @@ class Connection
|
||||
return $queue;
|
||||
}
|
||||
|
||||
private function getRoutingKeyForDelay(int $delay): string
|
||||
private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): string
|
||||
{
|
||||
return str_replace('%delay%', $delay, $this->connectionOptions['delay']['routing_key_pattern']);
|
||||
return str_replace(
|
||||
['%delay%', '%routing_key%'],
|
||||
[$delay, $finalRoutingKey ?: ''],
|
||||
$this->connectionOptions['delay']['routing_key_pattern']
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -444,4 +453,9 @@ class Connection
|
||||
$this->queue($queueName)->purge();
|
||||
}
|
||||
}
|
||||
|
||||
private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
|
||||
{
|
||||
return (null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey();
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user