feature #28713 [Cache] added support for connecting to Redis clusters via DSN (nicolas-grekas)
This PR was merged into the 4.2-dev branch.
Discussion
----------
[Cache] added support for connecting to Redis clusters via DSN
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | -
| License | MIT
| Doc PR | -
Replaces #28300 and #28175
This PR allows configuring a cluster of Redis servers using all available options of either the phpredis extension or the Predis package:
- the `redis_cluster=0/1` boolean option configures whether the client should use the Redis cluster protocol;
- several hosts can be provided using a syntax very similar to #28598, enabling consistent hashing distribution of keys;
- `failover=error/distribute/slaves` can be set to direct reads at slave servers;
- extra options are passed as is to the driver (e.g. `profile=2.8`)
- Predis per-server settings are also possible, using e.g. `host[localhost][alias]=foo` in the query string, or `host[localhost]=alias%3Dfoo` (ie PHP query arrays or urlencoded key/value pairs)
Commits
-------
a42e8774d6
[Cache] added support for connecting to Redis clusters via DSN
This commit is contained in:
commit
620094a12c
@ -99,7 +99,7 @@
|
||||
"doctrine/doctrine-bundle": "~1.4",
|
||||
"monolog/monolog": "~1.11",
|
||||
"ocramius/proxy-manager": "~0.4|~1.0|~2.0",
|
||||
"predis/predis": "~1.0",
|
||||
"predis/predis": "~1.1",
|
||||
"egulias/email-validator": "~1.2,>=1.2.8|~2.0",
|
||||
"symfony/phpunit-bridge": "~3.4|~4.0",
|
||||
"symfony/security-acl": "~2.8|~3.0",
|
||||
|
@ -136,7 +136,7 @@ abstract class AbstractAdapter implements AdapterInterface, CacheInterface, Logg
|
||||
if (!\is_string($dsn)) {
|
||||
throw new InvalidArgumentException(sprintf('The %s() method expect argument #1 to be string, %s given.', __METHOD__, \gettype($dsn)));
|
||||
}
|
||||
if (0 === strpos($dsn, 'redis://')) {
|
||||
if (0 === strpos($dsn, 'redis:')) {
|
||||
return RedisAdapter::createConnection($dsn, $options);
|
||||
}
|
||||
if (0 === strpos($dsn, 'memcached:')) {
|
||||
|
@ -4,7 +4,8 @@ CHANGELOG
|
||||
4.2.0
|
||||
-----
|
||||
|
||||
* added support for configuring multiple Memcached servers in one DSN
|
||||
* added support for connecting to Redis clusters via DSN
|
||||
* added support for configuring multiple Memcached servers via DSN
|
||||
* added `MarshallerInterface` and `DefaultMarshaller` to allow changing the serializer and provide one that automatically uses igbinary when available
|
||||
* added `CacheInterface`, which provides stampede protection via probabilistic early expiration and should become the preferred way to use a cache
|
||||
* added sub-second expiry accuracy for backends that support it
|
||||
|
@ -34,21 +34,13 @@ class PredisAdapterTest extends AbstractRedisAdapterTest
|
||||
|
||||
$params = array(
|
||||
'scheme' => 'tcp',
|
||||
'host' => $redisHost,
|
||||
'path' => '',
|
||||
'dbindex' => '1',
|
||||
'host' => 'localhost',
|
||||
'port' => 6379,
|
||||
'class' => 'Predis\Client',
|
||||
'timeout' => 3,
|
||||
'persistent' => 0,
|
||||
'persistent_id' => null,
|
||||
'read_timeout' => 0,
|
||||
'retry_interval' => 0,
|
||||
'compression' => true,
|
||||
'tcp_keepalive' => 0,
|
||||
'lazy' => false,
|
||||
'timeout' => 3,
|
||||
'read_write_timeout' => 0,
|
||||
'tcp_nodelay' => true,
|
||||
'database' => '1',
|
||||
'password' => null,
|
||||
);
|
||||
$this->assertSame($params, $connection->getParameters()->toArray());
|
||||
}
|
||||
|
@ -11,6 +11,8 @@
|
||||
|
||||
namespace Symfony\Component\Cache\Tests\Adapter;
|
||||
|
||||
use Symfony\Component\Cache\Adapter\RedisAdapter;
|
||||
|
||||
class PredisRedisClusterAdapterTest extends AbstractRedisAdapterTest
|
||||
{
|
||||
public static function setupBeforeClass()
|
||||
@ -18,7 +20,8 @@ class PredisRedisClusterAdapterTest extends AbstractRedisAdapterTest
|
||||
if (!$hosts = getenv('REDIS_CLUSTER_HOSTS')) {
|
||||
self::markTestSkipped('REDIS_CLUSTER_HOSTS env var is not defined.');
|
||||
}
|
||||
self::$redis = new \Predis\Client(explode(' ', $hosts), array('cluster' => 'redis'));
|
||||
|
||||
self::$redis = RedisAdapter::createConnection('redis:?host['.str_replace(' ', ']&host[', $hosts).']', array('class' => \Predis\Client::class, 'redis_cluster' => true));
|
||||
}
|
||||
|
||||
public static function tearDownAfterClass()
|
||||
|
@ -33,6 +33,11 @@ class RedisAdapterTest extends AbstractRedisAdapterTest
|
||||
|
||||
public function testCreateConnection()
|
||||
{
|
||||
$redis = RedisAdapter::createConnection('redis:?host[h1]&host[h2]&host[/foo:]');
|
||||
$this->assertInstanceOf(\RedisArray::class, $redis);
|
||||
$this->assertSame(array('h1:6379', 'h2:6379', '/foo'), $redis->_hosts());
|
||||
@$redis = null; // some versions of phpredis connect on destruct, let's silence the warning
|
||||
|
||||
$redisHost = getenv('REDIS_HOST');
|
||||
|
||||
$redis = RedisAdapter::createConnection('redis://'.$redisHost);
|
||||
|
@ -11,6 +11,10 @@
|
||||
|
||||
namespace Symfony\Component\Cache\Tests\Adapter;
|
||||
|
||||
use Symfony\Component\Cache\Adapter\AbstractAdapter;
|
||||
use Symfony\Component\Cache\Adapter\RedisAdapter;
|
||||
use Symfony\Component\Cache\Traits\RedisClusterProxy;
|
||||
|
||||
class RedisClusterAdapterTest extends AbstractRedisAdapterTest
|
||||
{
|
||||
public static function setupBeforeClass()
|
||||
@ -22,6 +26,33 @@ class RedisClusterAdapterTest extends AbstractRedisAdapterTest
|
||||
self::markTestSkipped('REDIS_CLUSTER_HOSTS env var is not defined.');
|
||||
}
|
||||
|
||||
self::$redis = new \RedisCluster(null, explode(' ', $hosts));
|
||||
self::$redis = AbstractAdapter::createConnection('redis:?host['.str_replace(' ', ']&host[', $hosts).']', array('lazy' => true, 'redis_cluster' => true));
|
||||
}
|
||||
|
||||
public function createCachePool($defaultLifetime = 0)
|
||||
{
|
||||
$this->assertInstanceOf(RedisClusterProxy::class, self::$redis);
|
||||
$adapter = new RedisAdapter(self::$redis, str_replace('\\', '.', __CLASS__), $defaultLifetime);
|
||||
|
||||
return $adapter;
|
||||
}
|
||||
|
||||
/**
|
||||
* @dataProvider provideFailedCreateConnection
|
||||
* @expectedException \Symfony\Component\Cache\Exception\InvalidArgumentException
|
||||
* @expectedExceptionMessage Redis connection failed
|
||||
*/
|
||||
public function testFailedCreateConnection($dsn)
|
||||
{
|
||||
RedisAdapter::createConnection($dsn);
|
||||
}
|
||||
|
||||
public function provideFailedCreateConnection()
|
||||
{
|
||||
return array(
|
||||
array('redis://localhost:1234?redis_cluster=1'),
|
||||
array('redis://foo@localhost?redis_cluster=1'),
|
||||
array('redis://localhost/123?redis_cluster=1'),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -13,7 +13,6 @@ namespace Symfony\Component\Cache\Traits;
|
||||
|
||||
use Predis\Connection\Aggregate\ClusterInterface;
|
||||
use Predis\Connection\Aggregate\RedisCluster;
|
||||
use Predis\Connection\Factory;
|
||||
use Predis\Response\Status;
|
||||
use Symfony\Component\Cache\Exception\CacheException;
|
||||
use Symfony\Component\Cache\Exception\InvalidArgumentException;
|
||||
@ -37,7 +36,10 @@ trait RedisTrait
|
||||
'retry_interval' => 0,
|
||||
'compression' => true,
|
||||
'tcp_keepalive' => 0,
|
||||
'lazy' => false,
|
||||
'lazy' => null,
|
||||
'redis_cluster' => false,
|
||||
'dbindex' => 0,
|
||||
'failover' => 'none',
|
||||
);
|
||||
private $redis;
|
||||
private $marshaller;
|
||||
@ -53,7 +55,7 @@ trait RedisTrait
|
||||
throw new InvalidArgumentException(sprintf('RedisAdapter namespace contains "%s" but only characters in [-+_.A-Za-z0-9] are allowed.', $match[0]));
|
||||
}
|
||||
if (!$redisClient instanceof \Redis && !$redisClient instanceof \RedisArray && !$redisClient instanceof \RedisCluster && !$redisClient instanceof \Predis\Client && !$redisClient instanceof RedisProxy && !$redisClient instanceof RedisClusterProxy) {
|
||||
throw new InvalidArgumentException(sprintf('%s() expects parameter 1 to be Redis, RedisArray, RedisCluster or Predis\Client, %s given', __METHOD__, \is_object($redisClient) ? \get_class($redisClient) : \gettype($redisClient)));
|
||||
throw new InvalidArgumentException(sprintf('%s() expects parameter 1 to be Redis, RedisArray, RedisCluster or Predis\Client, %s given.', __METHOD__, \is_object($redisClient) ? \get_class($redisClient) : \gettype($redisClient)));
|
||||
}
|
||||
$this->redis = $redisClient;
|
||||
$this->marshaller = $marshaller ?? new DefaultMarshaller();
|
||||
@ -74,57 +76,87 @@ trait RedisTrait
|
||||
*
|
||||
* @throws InvalidArgumentException when the DSN is invalid
|
||||
*
|
||||
* @return \Redis|\Predis\Client According to the "class" option
|
||||
* @return \Redis|\RedisCluster|\Predis\Client According to the "class" option
|
||||
*/
|
||||
public static function createConnection($dsn, array $options = array())
|
||||
{
|
||||
if (0 !== strpos($dsn, 'redis://')) {
|
||||
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s does not start with "redis://"', $dsn));
|
||||
}
|
||||
$params = preg_replace_callback('#^redis://(?:(?:[^:@]*+:)?([^@]*+)@)?#', function ($m) use (&$auth) {
|
||||
if (isset($m[1])) {
|
||||
$auth = $m[1];
|
||||
if (0 !== strpos($dsn, 'redis:')) {
|
||||
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s does not start with "redis:".', $dsn));
|
||||
}
|
||||
|
||||
return 'file://';
|
||||
if (!\extension_loaded('redis') && !class_exists(\Predis\Client::class)) {
|
||||
throw new CacheException(sprintf('Cannot find the "redis" extension nor the "predis/predis" package: %s', $dsn));
|
||||
}
|
||||
|
||||
$params = preg_replace_callback('#^redis:(//)?(?:(?:[^:@]*+:)?([^@]*+)@)?#', function ($m) use (&$auth) {
|
||||
if (isset($m[2])) {
|
||||
$auth = $m[2];
|
||||
}
|
||||
|
||||
return 'file:'.($m[1] ?? '');
|
||||
}, $dsn);
|
||||
if (false === $params = parse_url($params)) {
|
||||
|
||||
if (false === $params = parse_url($dsn)) {
|
||||
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn));
|
||||
}
|
||||
if (!isset($params['host']) && !isset($params['path'])) {
|
||||
|
||||
$query = $hosts = array();
|
||||
|
||||
if (isset($params['query'])) {
|
||||
parse_str($params['query'], $query);
|
||||
|
||||
if (isset($query['host'])) {
|
||||
if (!\is_array($hosts = $query['host'])) {
|
||||
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn));
|
||||
}
|
||||
if (isset($params['path']) && preg_match('#/(\d+)$#', $params['path'], $m)) {
|
||||
foreach ($hosts as $host => $parameters) {
|
||||
if (\is_string($parameters)) {
|
||||
parse_str($parameters, $parameters);
|
||||
}
|
||||
if (false === $i = strrpos($host, ':')) {
|
||||
$hosts[$host] = array('scheme' => 'tcp', 'host' => $host, 'port' => 6379) + $parameters;
|
||||
} elseif ($port = (int) substr($host, 1 + $i)) {
|
||||
$hosts[$host] = array('scheme' => 'tcp', 'host' => substr($host, 0, $i), 'port' => $port) + $parameters;
|
||||
} else {
|
||||
$hosts[$host] = array('scheme' => 'unix', 'path' => substr($host, 0, $i)) + $parameters;
|
||||
}
|
||||
}
|
||||
$hosts = array_values($hosts);
|
||||
}
|
||||
}
|
||||
|
||||
if (isset($params['host']) || isset($params['path'])) {
|
||||
if (!isset($params['dbindex']) && isset($params['path']) && preg_match('#/(\d+)$#', $params['path'], $m)) {
|
||||
$params['dbindex'] = $m[1];
|
||||
$params['path'] = substr($params['path'], 0, -\strlen($m[0]));
|
||||
}
|
||||
|
||||
if (isset($params['host'])) {
|
||||
$scheme = 'tcp';
|
||||
array_unshift($hosts, array('scheme' => 'tcp', 'host' => $params['host'], 'port' => $params['port'] ?? 6379));
|
||||
} else {
|
||||
$scheme = 'unix';
|
||||
array_unshift($hosts, array('scheme' => 'unix', 'path' => $params['path']));
|
||||
}
|
||||
$params += array(
|
||||
'host' => isset($params['host']) ? $params['host'] : $params['path'],
|
||||
'port' => isset($params['host']) ? 6379 : null,
|
||||
'dbindex' => 0,
|
||||
);
|
||||
if (isset($params['query'])) {
|
||||
parse_str($params['query'], $query);
|
||||
$params += $query;
|
||||
}
|
||||
$params += $options + self::$defaultConnectionOptions;
|
||||
if (null === $params['class'] && !\extension_loaded('redis') && !class_exists(\Predis\Client::class)) {
|
||||
throw new CacheException(sprintf('Cannot find the "redis" extension, and "predis/predis" is not installed: %s', $dsn));
|
||||
|
||||
if (!$hosts) {
|
||||
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn));
|
||||
}
|
||||
|
||||
$params += $query + $options + self::$defaultConnectionOptions;
|
||||
|
||||
if (null === $params['class'] && \extension_loaded('redis')) {
|
||||
$class = $params['redis_cluster'] ? \RedisCluster::class : (1 < \count($hosts) ? \RedisArray::class : \Redis::class);
|
||||
} else {
|
||||
$class = null === $params['class'] ? \Predis\Client::class : $params['class'];
|
||||
}
|
||||
$class = null === $params['class'] ? (\extension_loaded('redis') ? \Redis::class : \Predis\Client::class) : $params['class'];
|
||||
|
||||
if (is_a($class, \Redis::class, true)) {
|
||||
$connect = $params['persistent'] || $params['persistent_id'] ? 'pconnect' : 'connect';
|
||||
$redis = new $class();
|
||||
|
||||
$initializer = function ($redis) use ($connect, $params, $dsn, $auth) {
|
||||
$initializer = function ($redis) use ($connect, $params, $dsn, $auth, $hosts) {
|
||||
try {
|
||||
@$redis->{$connect}($params['host'], $params['port'], $params['timeout'], $params['persistent_id'], $params['retry_interval']);
|
||||
@$redis->{$connect}($hosts[0]['host'], $hosts[0]['port'], $params['timeout'], (string) $params['persistent_id'], $params['retry_interval']);
|
||||
} catch (\RedisException $e) {
|
||||
throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e->getMessage(), $dsn));
|
||||
}
|
||||
@ -160,15 +192,82 @@ trait RedisTrait
|
||||
} else {
|
||||
$initializer($redis);
|
||||
}
|
||||
} elseif (is_a($class, \RedisArray::class, true)) {
|
||||
foreach ($hosts as $i => $host) {
|
||||
$hosts[$i] = 'tcp' === $host['scheme'] ? $host['host'].':'.$host['port'] : $host['path'];
|
||||
}
|
||||
$params['lazy_connect'] = $params['lazy'] ?? true;
|
||||
$params['connect_timeout'] = $params['timeout'];
|
||||
|
||||
try {
|
||||
$redis = new $class($hosts, $params);
|
||||
} catch (\RedisClusterException $e) {
|
||||
throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e->getMessage(), $dsn));
|
||||
}
|
||||
|
||||
if (0 < $params['tcp_keepalive'] && \defined('Redis::OPT_TCP_KEEPALIVE')) {
|
||||
$redis->setOption(\Redis::OPT_TCP_KEEPALIVE, $params['tcp_keepalive']);
|
||||
}
|
||||
if ($params['compression'] && \defined('Redis::COMPRESSION_LZF')) {
|
||||
$redis->setOption(\Redis::OPT_COMPRESSION, \Redis::COMPRESSION_LZF);
|
||||
}
|
||||
} elseif (is_a($class, \RedisCluster::class, true)) {
|
||||
$initializer = function () use ($class, $params, $dsn, $hosts) {
|
||||
foreach ($hosts as $i => $host) {
|
||||
$hosts[$i] = 'tcp' === $host['scheme'] ? $host['host'].':'.$host['port'] : $host['path'];
|
||||
}
|
||||
|
||||
try {
|
||||
$redis = new $class(null, $hosts, $params['timeout'], $params['read_timeout'], (bool) $params['persistent']);
|
||||
} catch (\RedisClusterException $e) {
|
||||
throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e->getMessage(), $dsn));
|
||||
}
|
||||
|
||||
if (0 < $params['tcp_keepalive'] && \defined('Redis::OPT_TCP_KEEPALIVE')) {
|
||||
$redis->setOption(\Redis::OPT_TCP_KEEPALIVE, $params['tcp_keepalive']);
|
||||
}
|
||||
if ($params['compression'] && \defined('Redis::COMPRESSION_LZF')) {
|
||||
$redis->setOption(\Redis::OPT_COMPRESSION, \Redis::COMPRESSION_LZF);
|
||||
}
|
||||
switch ($params['failover']) {
|
||||
case 'error': $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, \RedisCluster::FAILOVER_ERROR); break;
|
||||
case 'distribute': $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, \RedisCluster::FAILOVER_DISTRIBUTE); break;
|
||||
case 'slaves': $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, \RedisCluster::FAILOVER_DISTRIBUTE_SLAVES); break;
|
||||
}
|
||||
|
||||
return $redis;
|
||||
};
|
||||
|
||||
$redis = $params['lazy'] ? new RedisClusterProxy($initializer) : $initializer();
|
||||
} elseif (is_a($class, \Predis\Client::class, true)) {
|
||||
$params['scheme'] = $scheme;
|
||||
$params['database'] = $params['dbindex'] ?: null;
|
||||
$params['password'] = $auth;
|
||||
$redis = new $class((new Factory())->create($params));
|
||||
if ($params['redis_cluster']) {
|
||||
$params['cluster'] = 'redis';
|
||||
}
|
||||
$params += array('parameters' => array());
|
||||
$params['parameters'] += array(
|
||||
'persistent' => $params['persistent'],
|
||||
'timeout' => $params['timeout'],
|
||||
'read_write_timeout' => $params['read_timeout'],
|
||||
'tcp_nodelay' => true,
|
||||
);
|
||||
if ($params['dbindex']) {
|
||||
$params['parameters']['database'] = $params['dbindex'];
|
||||
}
|
||||
if (null !== $auth) {
|
||||
$params['parameters']['password'] = $auth;
|
||||
}
|
||||
if (1 === \count($hosts) && !$params['redis_cluster']) {
|
||||
$hosts = $hosts[0];
|
||||
} elseif (\in_array($params['failover'], array('slaves', 'distribute'), true) && !isset($params['replication'])) {
|
||||
$params['replication'] = true;
|
||||
$hosts[0] += array('alias' => 'master');
|
||||
}
|
||||
|
||||
$redis = new $class($hosts, array_diff_key($params, self::$defaultConnectionOptions));
|
||||
} elseif (class_exists($class, false)) {
|
||||
throw new InvalidArgumentException(sprintf('"%s" is not a subclass of "Redis" or "Predis\Client"', $class));
|
||||
throw new InvalidArgumentException(sprintf('"%s" is not a subclass of "Redis", "RedisArray", "RedisCluster" nor "Predis\Client".', $class));
|
||||
} else {
|
||||
throw new InvalidArgumentException(sprintf('Class "%s" does not exist', $class));
|
||||
throw new InvalidArgumentException(sprintf('Class "%s" does not exist.', $class));
|
||||
}
|
||||
|
||||
return $redis;
|
||||
@ -183,7 +282,6 @@ trait RedisTrait
|
||||
return array();
|
||||
}
|
||||
|
||||
$i = -1;
|
||||
$result = array();
|
||||
|
||||
if ($this->redis instanceof \Predis\Client) {
|
||||
@ -244,6 +342,7 @@ trait RedisTrait
|
||||
$h->connect($host[0], $host[1]);
|
||||
}
|
||||
}
|
||||
|
||||
foreach ($hosts as $host) {
|
||||
if (!isset($namespace[0])) {
|
||||
$cleared = $host->flushDb() && $cleared;
|
||||
|
@ -32,7 +32,7 @@
|
||||
"cache/integration-tests": "dev-master",
|
||||
"doctrine/cache": "~1.6",
|
||||
"doctrine/dbal": "~2.5",
|
||||
"predis/predis": "~1.0",
|
||||
"predis/predis": "~1.1",
|
||||
"symfony/config": "~4.2",
|
||||
"symfony/dependency-injection": "~3.4",
|
||||
"symfony/var-dumper": "^4.1.1"
|
||||
|
Reference in New Issue
Block a user