diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php index cd0fc3999e..b7d1be4453 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt; use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Exception\InvalidArgumentException; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; @@ -96,17 +97,30 @@ class ConnectionTest extends TestCase $amqpQueue->expects($this->once())->method('setArguments')->with([ 'x-dead-letter-exchange' => 'dead-exchange', - 'x-message-ttl' => '1200', + 'x-delay' => 100, + 'x-expires' => 150, + 'x-max-length' => 200, + 'x-max-length-bytes' => 300, + 'x-max-priority' => 4, + 'x-message-ttl' => 100, ]); $amqpExchange->expects($this->once())->method('setArguments')->with([ 'alternate-exchange' => 'alternate', ]); - $connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[arguments][x-dead-letter-exchange]=dead-exchange', [ + $dsn = 'amqp://localhost/%2f/messages?'. + 'queue[arguments][x-dead-letter-exchange]=dead-exchange&'. + 'queue[arguments][x-message-ttl]=100&'. + 'queue[arguments][x-delay]=100&'. + 'queue[arguments][x-expires]=150&' + ; + $connection = Connection::fromDsn($dsn, [ 'queue' => [ 'arguments' => [ - 'x-message-ttl' => '1200', + 'x-max-length' => '200', + 'x-max-length-bytes' => '300', + 'x-max-priority' => '4', ], ], 'exchange' => [ @@ -118,6 +132,36 @@ class ConnectionTest extends TestCase $connection->publish('body'); } + public function invalidQueueArgumentsDataProvider(): iterable + { + $baseDsn = 'amqp://localhost/%2f/messages'; + yield [$baseDsn.'?queue[arguments][x-delay]=not-a-number', []]; + yield [$baseDsn.'?queue[arguments][x-expires]=not-a-number', []]; + yield [$baseDsn.'?queue[arguments][x-max-length]=not-a-number', []]; + yield [$baseDsn.'?queue[arguments][x-max-length-bytes]=not-a-number', []]; + yield [$baseDsn.'?queue[arguments][x-max-priority]=not-a-number', []]; + yield [$baseDsn.'?queue[arguments][x-message-ttl]=not-a-number', []]; + + // Ensure the exception is thrown when the arguments are passed via the array options + yield [$baseDsn, ['queue' => ['arguments' => ['x-delay' => 'not-a-number']]]]; + yield [$baseDsn, ['queue' => ['arguments' => ['x-expires' => 'not-a-number']]]]; + yield [$baseDsn, ['queue' => ['arguments' => ['x-max-length' => 'not-a-number']]]]; + yield [$baseDsn, ['queue' => ['arguments' => ['x-max-length-bytes' => 'not-a-number']]]]; + yield [$baseDsn, ['queue' => ['arguments' => ['x-max-priority' => 'not-a-number']]]]; + yield [$baseDsn, ['queue' => ['arguments' => ['x-message-ttl' => 'not-a-number']]]]; + } + + /** + * @dataProvider invalidQueueArgumentsDataProvider + */ + public function testFromDsnWithInvalidValueOnQueueArguments(string $dsn, array $options) + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Integer expected for queue argument'); + + Connection::fromDsn($dsn, $options); + } + public function testItUsesANormalConnectionByDefault() { $factory = new TestAmqpFactory( diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index 2f09d727c9..cffc49df29 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -24,6 +24,15 @@ use Symfony\Component\Messenger\Exception\InvalidArgumentException; */ class Connection { + private const ARGUMENTS_AS_INTEGER = [ + 'x-delay', + 'x-expires', + 'x-max-length', + 'x-max-length-bytes', + 'x-max-priority', + 'x-message-ttl', + ]; + private $connectionCredentials; private $exchangeConfiguration; private $queueConfiguration; @@ -89,12 +98,32 @@ class Connection $exchangeOptions = $amqpOptions['exchange']; $queueOptions = $amqpOptions['queue']; - unset($amqpOptions['queue'], $amqpOptions['exchange']); + if (\is_array($queueOptions['arguments'] ?? false)) { + $queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']); + } + return new self($amqpOptions, $exchangeOptions, $queueOptions, $debug, $amqpFactory); } + private static function normalizeQueueArguments(array $arguments): array + { + foreach (self::ARGUMENTS_AS_INTEGER as $key) { + if (!array_key_exists($key, $arguments)) { + continue; + } + + if (!\is_numeric($arguments[$key])) { + throw new InvalidArgumentException(sprintf('Integer expected for queue argument "%s", %s given.', $key, \gettype($arguments[$key]))); + } + + $arguments[$key] = (int) $arguments[$key]; + } + + return $arguments; + } + /** * @throws \AMQPException */