Messenger: validate options for AMQP and Redis Connections

This commit is contained in:
Nicolas PHILIPPE 2019-12-10 16:47:33 +01:00 committed by Fabien Potencier
parent fc30e610d5
commit bc4f7d701f
12 changed files with 155 additions and 6 deletions

View File

@ -35,6 +35,7 @@ Messenger
* Deprecated AmqpExt transport. It has moved to a separate package. Run `composer require symfony/amqp-messenger` to use the new classes.
* Deprecated Doctrine transport. It has moved to a separate package. Run `composer require symfony/doctrine-messenger` to use the new classes.
* Deprecated RedisExt transport. It has moved to a separate package. Run `composer require symfony/redis-messenger` to use the new classes.
* Deprecated use of invalid options in Redis and AMQP connections.
Routing
-------

View File

@ -35,6 +35,7 @@ Messenger
* Removed AmqpExt transport. Run `composer require symfony/amqp-messenger` to keep the transport in your application.
* Removed Doctrine transport. Run `composer require symfony/doctrine-messenger` to keep the transport in your application.
* Removed RedisExt transport. Run `composer require symfony/redis-messenger` to keep the transport in your application.
* Use of invalid options in Redis and AMQP connections now throws an error.
Routing
-------

View File

@ -5,3 +5,4 @@ CHANGELOG
-----
* Introduced the AMQP bridge.
* Deprecated use of invalid options

View File

@ -36,8 +36,8 @@ class AmqpTransportFactoryTest extends TestCase
$factory = new AmqpTransportFactory();
$serializer = $this->createMock(SerializerInterface::class);
$expectedTransport = new AmqpTransport(Connection::fromDsn('amqp://localhost', ['foo' => 'bar']), $serializer);
$expectedTransport = new AmqpTransport(Connection::fromDsn('amqp://localhost', ['host' => 'localhost']), $serializer);
$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', ['foo' => 'bar'], $serializer));
$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', ['host' => 'localhost'], $serializer));
}
}

View File

@ -102,6 +102,42 @@ class ConnectionTest extends TestCase
);
}
/**
* @group legacy
* @expectedDeprecation Invalid option(s) "foo" passed to the AMQP Messenger transport. Passing invalid options is deprecated since Symfony 5.1.
*/
public function testDeprecationIfInvalidOptionIsPassedWithDsn()
{
Connection::fromDsn('amqp://host?foo=bar');
}
/**
* @group legacy
* @expectedDeprecation Invalid option(s) "foo" passed to the AMQP Messenger transport. Passing invalid options is deprecated since Symfony 5.1.
*/
public function testDeprecationIfInvalidOptionIsPassedAsArgument()
{
Connection::fromDsn('amqp://host', ['foo' => 'bar']);
}
/**
* @group legacy
* @expectedDeprecation Invalid queue option(s) "foo" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated since Symfony 5.1.
*/
public function testDeprecationIfInvalidQueueOptionIsPassed()
{
Connection::fromDsn('amqp://host', ['queues' => ['queueName' => ['foo' => 'bar']]]);
}
/**
* @group legacy
* @expectedDeprecation Invalid exchange option(s) "foo" passed to the AMQP Messenger transport. Passing invalid exchange options is deprecated since Symfony 5.1.
*/
public function testDeprecationIfInvalidExchangeOptionIsPassed()
{
Connection::fromDsn('amqp://host', ['exchange' => ['foo' => 'bar']]);
}
public function testSetsParametersOnTheQueueAndExchange()
{
$factory = new TestAmqpFactory(

View File

@ -32,6 +32,47 @@ class Connection
'x-message-ttl',
];
private const AVAILABLE_OPTIONS = [
'host',
'port',
'vhost',
'user',
'password',
'queues',
'exchange',
'delay',
'auto_setup',
'prefetch_count',
'retry',
'persistent',
'frame_max',
'channel_max',
'heartbeat',
'read_timeout',
'write_timeout',
'connect_timeout',
'cacert',
'cert',
'key',
'verify',
'sasl_method',
];
private const AVAILABLE_QUEUE_OPTIONS = [
'binding_keys',
'binding_arguments',
'flags',
'arguments',
];
private const AVAILABLE_EXCHANGE_OPTIONS = [
'name',
'type',
'default_publish_routing_key',
'flags',
'arguments',
];
private $connectionOptions;
private $exchangeOptions;
private $queuesOptions;
@ -84,6 +125,9 @@ class Connection
* * vhost: Virtual Host to use with the AMQP service
* * user: Username to use to connect the the AMQP service
* * password: Password to use the connect to the AMQP service
* * read_timeout: Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
* * write_timeout: Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
* * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional.
* * queues[name]: An array of queues, keyed by the name
* * binding_keys: The binding keys (if any) to bind to this queue
* * binding_arguments: Arguments to be used while binding the queue.
@ -100,6 +144,22 @@ class Connection
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays")
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
* * prefetch_count: set channel prefetch count
*
* * Connection tuning options (see http://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.tune for details):
* * channel_max: Specifies highest channel number that the server permits. 0 means standard extension limit
* (see PHP_AMQP_MAX_CHANNELS constant)
* * frame_max: The largest frame size that the server proposes for the connection, including frame header
* and end-byte. 0 means standard extension limit (depends on librabbimq default frame size limit)
* * heartbeat: The delay, in seconds, of the connection heartbeat that the server wants.
* 0 means the server does not want a heartbeat. Note, librabbitmq has limited heartbeat support,
* which means heartbeats checked only during blocking calls.
*
* TLS support (see https://www.rabbitmq.com/ssl.html for details):
* * cacert: Path to the CA cert file in PEM format..
* * cert: Path to the client certificate in PEM foramt.
* * key: Path to the client key in PEM format.
* * verify: Enable or disable peer verification. If peer verification is enabled then the common name in the
* server certificate must match the server name. Peer verification is enabled by default.
*/
public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self
{
@ -125,6 +185,8 @@ class Connection
],
], $options, $parsedQuery);
self::validateOptions($amqpOptions);
if (isset($parsedUrl['user'])) {
$amqpOptions['login'] = $parsedUrl['user'];
}
@ -155,6 +217,30 @@ class Connection
return new self($amqpOptions, $exchangeOptions, $queuesOptions, $amqpFactory);
}
private static function validateOptions(array $options): void
{
if (0 < \count($invalidOptions = array_diff(array_keys($options), self::AVAILABLE_OPTIONS))) {
@trigger_error(sprintf('Invalid option(s) "%s" passed to the AMQP Messenger transport. Passing invalid options is deprecated since Symfony 5.1.', implode('", "', $invalidOptions)), E_USER_DEPRECATED);
}
if (\is_array($options['queues'] ?? false)) {
foreach ($options['queues'] as $queue) {
if (!\is_array($queue)) {
continue;
}
if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) {
@trigger_error(sprintf('Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated since Symfony 5.1.', implode('", "', $invalidQueueOptions)), E_USER_DEPRECATED);
}
}
}
if (\is_array($options['exchange'] ?? false)
&& 0 < \count($invalidExchangeOptions = array_diff(array_keys($options['exchange']), self::AVAILABLE_EXCHANGE_OPTIONS))) {
@trigger_error(sprintf('Invalid exchange option(s) "%s" passed to the AMQP Messenger transport. Passing invalid exchange options is deprecated since Symfony 5.1.', implode('", "', $invalidExchangeOptions)), E_USER_DEPRECATED);
}
}
private static function normalizeQueueArguments(array $arguments): array
{
foreach (self::ARGUMENTS_AS_INTEGER as $key) {

View File

@ -22,7 +22,6 @@ use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\Doctrine\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
class ConnectionTest extends TestCase
{
public function testGetAMessageWillChangeItsStatus()
@ -247,12 +246,14 @@ class ConnectionTest extends TestCase
public function testItThrowsAnExceptionIfAnExtraOptionsInDefined()
{
$this->expectException('Symfony\Component\Messenger\Exception\InvalidArgumentException');
$this->expectExceptionMessage('Unknown option found: [new_option]. Allowed options are [table_name, queue_name, redeliver_timeout, auto_setup]');
Connection::buildConfiguration('doctrine://default', ['new_option' => 'woops']);
}
public function testItThrowsAnExceptionIfAnExtraOptionsInDefinedInDSN()
{
$this->expectException('Symfony\Component\Messenger\Exception\InvalidArgumentException');
$this->expectExceptionMessage('Unknown option found in DSN: [new_option]. Allowed options are [table_name, queue_name, redeliver_timeout, auto_setup]');
Connection::buildConfiguration('doctrine://default?new_option=woops');
}

View File

@ -85,7 +85,7 @@ class Connection
// check for extra keys in options
$optionsExtraKeys = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS));
if (0 < \count($optionsExtraKeys)) {
throw new InvalidArgumentException(sprintf('Unknown option found : [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
throw new InvalidArgumentException(sprintf('Unknown option found: [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
}
// check for extra keys in options

View File

@ -6,3 +6,4 @@ CHANGELOG
* Introduced the Redis bridge.
* Added TLS option in the DSN. Example: `redis://127.0.0.1?tls=1`
* Deprecated use of invalid options

View File

@ -108,6 +108,15 @@ class ConnectionTest extends TestCase
);
}
/**
* @expectedDeprecation Invalid option(s) "foo" passed to the Redis Messenger transport. Passing invalid options is deprecated since Symfony 5.1.
* @group legacy
*/
public function testDeprecationIfInvalidOptionIsPassedWithDsn()
{
Connection::fromDsn('redis://localhost/queue?foo=bar');
}
public function testKeepGettingPendingMessages()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();

View File

@ -35,8 +35,8 @@ class RedisTransportFactoryTest extends TestCase
{
$factory = new RedisTransportFactory();
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$expectedTransport = new RedisTransport(Connection::fromDsn('redis://localhost', ['foo' => 'bar']), $serializer);
$expectedTransport = new RedisTransport(Connection::fromDsn('redis://localhost', ['stream' => 'bar']), $serializer);
$this->assertEquals($expectedTransport, $factory->createTransport('redis://localhost', ['foo' => 'bar'], $serializer));
$this->assertEquals($expectedTransport, $factory->createTransport('redis://localhost', ['stream' => 'bar'], $serializer));
}
}

View File

@ -34,6 +34,7 @@ class Connection
'auto_setup' => true,
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
'dbindex' => 0,
'tls' => false,
];
private $connection;
@ -86,6 +87,8 @@ class Connection
parse_str($parsedUrl['query'], $redisOptions);
}
self::validateOptions($redisOptions);
$autoSetup = null;
if (\array_key_exists('auto_setup', $redisOptions)) {
$autoSetup = filter_var($redisOptions['auto_setup'], FILTER_VALIDATE_BOOLEAN);
@ -144,6 +147,16 @@ class Connection
return new self($configuration, $connectionCredentials, $redisOptions, $redis);
}
private static function validateOptions(array $options): void
{
$availableOptions = array_keys(self::DEFAULT_OPTIONS);
$availableOptions[] = 'serializer';
if (0 < \count($invalidOptions = array_diff(array_keys($options), $availableOptions))) {
@trigger_error(sprintf('Invalid option(s) "%s" passed to the Redis Messenger transport. Passing invalid options is deprecated since Symfony 5.1.', implode('", "', $invalidOptions)), E_USER_DEPRECATED);
}
}
public function get(): ?array
{
if ($this->autoSetup) {