From 115a9bbeda0cf4cc488ec97c52435241d084c9aa Mon Sep 17 00:00:00 2001 From: Robin Chalas Date: Tue, 1 Oct 2019 15:08:41 +0200 Subject: [PATCH] [Messenger] Allow to configure the db index on Redis transport --- src/Symfony/Component/Messenger/CHANGELOG.md | 1 + .../Tests/Transport/RedisExt/ConnectionTest.php | 9 +++++++++ .../Messenger/Transport/RedisExt/Connection.php | 12 ++++++++++++ 3 files changed, 22 insertions(+) diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 7a438586cc..30e195f7ee 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -10,6 +10,7 @@ CHANGELOG * `InMemoryTransport` handle acknowledged and rejected messages. * Made all dispatched worker event classes final. * Added support for `from_transport` attribute on `messenger.message_handler` tag. + * Added support for passing `dbindex` as a query parameter to the redis transport DSN. 4.3.0 ----- diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php index 0af92af0a9..73b6d51815 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php @@ -104,6 +104,15 @@ class ConnectionTest extends TestCase Connection::fromDsn('redis://password@localhost/queue', [], $redis); } + public function testDbIndex() + { + $redis = new \Redis(); + + Connection::fromDsn('redis://password@localhost/queue?dbindex=2', [], $redis); + + $this->assertSame(2, $redis->getDbNum()); + } + public function testFirstGetPendingMessagesThenNewMessages() { $redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock(); diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php index 20857c45f0..b3c6f6dc35 100644 --- a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php @@ -32,6 +32,7 @@ class Connection 'consumer' => 'consumer', 'auto_setup' => true, 'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries + 'dbindex' => 0, ]; private $connection; @@ -56,6 +57,10 @@ class Connection $this->connection->auth($connectionCredentials['auth']); } + if (($dbIndex = $configuration['dbindex'] ?? self::DEFAULT_OPTIONS['dbindex']) && !$this->connection->select($dbIndex)) { + throw new InvalidArgumentException(sprintf('Redis connection failed: %s', $redis->getLastError())); + } + $this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream']; $this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group']; $this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer']; @@ -97,12 +102,19 @@ class Connection unset($redisOptions['stream_max_entries']); } + $dbIndex = null; + if (\array_key_exists('dbindex', $redisOptions)) { + $dbIndex = filter_var($redisOptions['dbindex'], FILTER_VALIDATE_INT); + unset($redisOptions['dbindex']); + } + return new self([ 'stream' => $stream, 'group' => $group, 'consumer' => $consumer, 'auto_setup' => $autoSetup, 'stream_max_entries' => $maxEntries, + 'dbindex' => $dbIndex, ], $connectionCredentials, $redisOptions, $redis); }