[Messenger] fixed RabbitMQ arguments not passed as integer values
This commit is contained in:
parent
48e5df87a3
commit
f19c035aaa
@ -12,6 +12,7 @@
|
|||||||
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||||
|
|
||||||
@ -96,17 +97,30 @@ class ConnectionTest extends TestCase
|
|||||||
|
|
||||||
$amqpQueue->expects($this->once())->method('setArguments')->with([
|
$amqpQueue->expects($this->once())->method('setArguments')->with([
|
||||||
'x-dead-letter-exchange' => 'dead-exchange',
|
'x-dead-letter-exchange' => 'dead-exchange',
|
||||||
'x-message-ttl' => '1200',
|
'x-delay' => 100,
|
||||||
|
'x-expires' => 150,
|
||||||
|
'x-max-length' => 200,
|
||||||
|
'x-max-length-bytes' => 300,
|
||||||
|
'x-max-priority' => 4,
|
||||||
|
'x-message-ttl' => 100,
|
||||||
]);
|
]);
|
||||||
|
|
||||||
$amqpExchange->expects($this->once())->method('setArguments')->with([
|
$amqpExchange->expects($this->once())->method('setArguments')->with([
|
||||||
'alternate-exchange' => 'alternate',
|
'alternate-exchange' => 'alternate',
|
||||||
]);
|
]);
|
||||||
|
|
||||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[arguments][x-dead-letter-exchange]=dead-exchange', [
|
$dsn = 'amqp://localhost/%2f/messages?'.
|
||||||
|
'queue[arguments][x-dead-letter-exchange]=dead-exchange&'.
|
||||||
|
'queue[arguments][x-message-ttl]=100&'.
|
||||||
|
'queue[arguments][x-delay]=100&'.
|
||||||
|
'queue[arguments][x-expires]=150&'
|
||||||
|
;
|
||||||
|
$connection = Connection::fromDsn($dsn, [
|
||||||
'queue' => [
|
'queue' => [
|
||||||
'arguments' => [
|
'arguments' => [
|
||||||
'x-message-ttl' => '1200',
|
'x-max-length' => '200',
|
||||||
|
'x-max-length-bytes' => '300',
|
||||||
|
'x-max-priority' => '4',
|
||||||
],
|
],
|
||||||
],
|
],
|
||||||
'exchange' => [
|
'exchange' => [
|
||||||
@ -118,6 +132,36 @@ class ConnectionTest extends TestCase
|
|||||||
$connection->publish('body');
|
$connection->publish('body');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function invalidQueueArgumentsDataProvider(): iterable
|
||||||
|
{
|
||||||
|
$baseDsn = 'amqp://localhost/%2f/messages';
|
||||||
|
yield [$baseDsn.'?queue[arguments][x-delay]=not-a-number', []];
|
||||||
|
yield [$baseDsn.'?queue[arguments][x-expires]=not-a-number', []];
|
||||||
|
yield [$baseDsn.'?queue[arguments][x-max-length]=not-a-number', []];
|
||||||
|
yield [$baseDsn.'?queue[arguments][x-max-length-bytes]=not-a-number', []];
|
||||||
|
yield [$baseDsn.'?queue[arguments][x-max-priority]=not-a-number', []];
|
||||||
|
yield [$baseDsn.'?queue[arguments][x-message-ttl]=not-a-number', []];
|
||||||
|
|
||||||
|
// Ensure the exception is thrown when the arguments are passed via the array options
|
||||||
|
yield [$baseDsn, ['queue' => ['arguments' => ['x-delay' => 'not-a-number']]]];
|
||||||
|
yield [$baseDsn, ['queue' => ['arguments' => ['x-expires' => 'not-a-number']]]];
|
||||||
|
yield [$baseDsn, ['queue' => ['arguments' => ['x-max-length' => 'not-a-number']]]];
|
||||||
|
yield [$baseDsn, ['queue' => ['arguments' => ['x-max-length-bytes' => 'not-a-number']]]];
|
||||||
|
yield [$baseDsn, ['queue' => ['arguments' => ['x-max-priority' => 'not-a-number']]]];
|
||||||
|
yield [$baseDsn, ['queue' => ['arguments' => ['x-message-ttl' => 'not-a-number']]]];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @dataProvider invalidQueueArgumentsDataProvider
|
||||||
|
*/
|
||||||
|
public function testFromDsnWithInvalidValueOnQueueArguments(string $dsn, array $options)
|
||||||
|
{
|
||||||
|
$this->expectException(InvalidArgumentException::class);
|
||||||
|
$this->expectExceptionMessage('Integer expected for queue argument');
|
||||||
|
|
||||||
|
Connection::fromDsn($dsn, $options);
|
||||||
|
}
|
||||||
|
|
||||||
public function testItUsesANormalConnectionByDefault()
|
public function testItUsesANormalConnectionByDefault()
|
||||||
{
|
{
|
||||||
$factory = new TestAmqpFactory(
|
$factory = new TestAmqpFactory(
|
||||||
|
@ -24,6 +24,15 @@ use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
|||||||
*/
|
*/
|
||||||
class Connection
|
class Connection
|
||||||
{
|
{
|
||||||
|
private const ARGUMENTS_AS_INTEGER = [
|
||||||
|
'x-delay',
|
||||||
|
'x-expires',
|
||||||
|
'x-max-length',
|
||||||
|
'x-max-length-bytes',
|
||||||
|
'x-max-priority',
|
||||||
|
'x-message-ttl',
|
||||||
|
];
|
||||||
|
|
||||||
private $connectionCredentials;
|
private $connectionCredentials;
|
||||||
private $exchangeConfiguration;
|
private $exchangeConfiguration;
|
||||||
private $queueConfiguration;
|
private $queueConfiguration;
|
||||||
@ -89,12 +98,32 @@ class Connection
|
|||||||
|
|
||||||
$exchangeOptions = $amqpOptions['exchange'];
|
$exchangeOptions = $amqpOptions['exchange'];
|
||||||
$queueOptions = $amqpOptions['queue'];
|
$queueOptions = $amqpOptions['queue'];
|
||||||
|
|
||||||
unset($amqpOptions['queue'], $amqpOptions['exchange']);
|
unset($amqpOptions['queue'], $amqpOptions['exchange']);
|
||||||
|
|
||||||
|
if (\is_array($queueOptions['arguments'] ?? false)) {
|
||||||
|
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
|
||||||
|
}
|
||||||
|
|
||||||
return new self($amqpOptions, $exchangeOptions, $queueOptions, $debug, $amqpFactory);
|
return new self($amqpOptions, $exchangeOptions, $queueOptions, $debug, $amqpFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static function normalizeQueueArguments(array $arguments): array
|
||||||
|
{
|
||||||
|
foreach (self::ARGUMENTS_AS_INTEGER as $key) {
|
||||||
|
if (!array_key_exists($key, $arguments)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!\is_numeric($arguments[$key])) {
|
||||||
|
throw new InvalidArgumentException(sprintf('Integer expected for queue argument "%s", %s given.', $key, \gettype($arguments[$key])));
|
||||||
|
}
|
||||||
|
|
||||||
|
$arguments[$key] = (int) $arguments[$key];
|
||||||
|
}
|
||||||
|
|
||||||
|
return $arguments;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws \AMQPException
|
* @throws \AMQPException
|
||||||
*/
|
*/
|
||||||
|
Reference in New Issue
Block a user