[Messenger] Allow to configure the db index on Redis transport
This commit is contained in:
parent
8f92594576
commit
115a9bbeda
|
@ -10,6 +10,7 @@ CHANGELOG
|
||||||
* `InMemoryTransport` handle acknowledged and rejected messages.
|
* `InMemoryTransport` handle acknowledged and rejected messages.
|
||||||
* Made all dispatched worker event classes final.
|
* Made all dispatched worker event classes final.
|
||||||
* Added support for `from_transport` attribute on `messenger.message_handler` tag.
|
* Added support for `from_transport` attribute on `messenger.message_handler` tag.
|
||||||
|
* Added support for passing `dbindex` as a query parameter to the redis transport DSN.
|
||||||
|
|
||||||
4.3.0
|
4.3.0
|
||||||
-----
|
-----
|
||||||
|
|
|
@ -104,6 +104,15 @@ class ConnectionTest extends TestCase
|
||||||
Connection::fromDsn('redis://password@localhost/queue', [], $redis);
|
Connection::fromDsn('redis://password@localhost/queue', [], $redis);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testDbIndex()
|
||||||
|
{
|
||||||
|
$redis = new \Redis();
|
||||||
|
|
||||||
|
Connection::fromDsn('redis://password@localhost/queue?dbindex=2', [], $redis);
|
||||||
|
|
||||||
|
$this->assertSame(2, $redis->getDbNum());
|
||||||
|
}
|
||||||
|
|
||||||
public function testFirstGetPendingMessagesThenNewMessages()
|
public function testFirstGetPendingMessagesThenNewMessages()
|
||||||
{
|
{
|
||||||
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
|
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
|
||||||
|
|
|
@ -32,6 +32,7 @@ class Connection
|
||||||
'consumer' => 'consumer',
|
'consumer' => 'consumer',
|
||||||
'auto_setup' => true,
|
'auto_setup' => true,
|
||||||
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
|
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
|
||||||
|
'dbindex' => 0,
|
||||||
];
|
];
|
||||||
|
|
||||||
private $connection;
|
private $connection;
|
||||||
|
@ -56,6 +57,10 @@ class Connection
|
||||||
$this->connection->auth($connectionCredentials['auth']);
|
$this->connection->auth($connectionCredentials['auth']);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (($dbIndex = $configuration['dbindex'] ?? self::DEFAULT_OPTIONS['dbindex']) && !$this->connection->select($dbIndex)) {
|
||||||
|
throw new InvalidArgumentException(sprintf('Redis connection failed: %s', $redis->getLastError()));
|
||||||
|
}
|
||||||
|
|
||||||
$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
|
$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
|
||||||
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
|
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
|
||||||
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
|
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
|
||||||
|
@ -97,12 +102,19 @@ class Connection
|
||||||
unset($redisOptions['stream_max_entries']);
|
unset($redisOptions['stream_max_entries']);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$dbIndex = null;
|
||||||
|
if (\array_key_exists('dbindex', $redisOptions)) {
|
||||||
|
$dbIndex = filter_var($redisOptions['dbindex'], FILTER_VALIDATE_INT);
|
||||||
|
unset($redisOptions['dbindex']);
|
||||||
|
}
|
||||||
|
|
||||||
return new self([
|
return new self([
|
||||||
'stream' => $stream,
|
'stream' => $stream,
|
||||||
'group' => $group,
|
'group' => $group,
|
||||||
'consumer' => $consumer,
|
'consumer' => $consumer,
|
||||||
'auto_setup' => $autoSetup,
|
'auto_setup' => $autoSetup,
|
||||||
'stream_max_entries' => $maxEntries,
|
'stream_max_entries' => $maxEntries,
|
||||||
|
'dbindex' => $dbIndex,
|
||||||
], $connectionCredentials, $redisOptions, $redis);
|
], $connectionCredentials, $redisOptions, $redis);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Reference in New Issue