[Cache] Fix Redis pipelining/multi-ops

This commit is contained in:
Nicolas Grekas 2017-03-07 12:10:36 +01:00
parent 94d059d530
commit f1648e2484
2 changed files with 81 additions and 48 deletions

View File

@ -14,6 +14,7 @@ namespace Symfony\Component\Cache\Adapter;
use Predis\Connection\Factory; use Predis\Connection\Factory;
use Predis\Connection\Aggregate\PredisCluster; use Predis\Connection\Aggregate\PredisCluster;
use Predis\Connection\Aggregate\RedisCluster; use Predis\Connection\Aggregate\RedisCluster;
use Predis\Response\Status;
use Symfony\Component\Cache\Exception\InvalidArgumentException; use Symfony\Component\Cache\Exception\InvalidArgumentException;
/** /**
@ -136,11 +137,14 @@ class RedisAdapter extends AbstractAdapter
protected function doFetch(array $ids) protected function doFetch(array $ids)
{ {
if ($ids) { if ($ids) {
$values = $this->redis->mGet($ids); $values = $this->pipeline(function () use ($ids) {
$index = 0; foreach ($ids as $id) {
foreach ($ids as $id) { yield 'get' => array($id);
if ($value = $values[$index++]) { }
yield $id => parent::unserialize($value); });
foreach ($values as $id => $v) {
if ($v) {
yield $id => parent::unserialize($v);
} }
} }
} }
@ -251,61 +255,63 @@ class RedisAdapter extends AbstractAdapter
return $failed; return $failed;
} }
if (0 >= $lifetime) { $results = $this->pipeline(function () use ($serialized, $lifetime) {
$this->redis->mSet($serialized);
return $failed;
}
$this->pipeline(function ($pipe) use (&$serialized, $lifetime) {
foreach ($serialized as $id => $value) { foreach ($serialized as $id => $value) {
$pipe('setEx', $id, array($lifetime, $value)); if (0 >= $lifetime) {
yield 'set' => array($id, $value);
} else {
yield 'setEx' => array($id, $lifetime, $value);
}
} }
}); });
foreach ($results as $id => $result) {
if (true !== $result && (!$result instanceof Status || $result !== Status::get('OK'))) {
$failed[] = $id;
}
}
return $failed; return $failed;
} }
private function execute($command, $id, array $args, $redis = null) private function pipeline(\Closure $generator)
{ {
array_unshift($args, $id); $ids = array();
call_user_func_array(array($redis ?: $this->redis, $command), $args);
}
private function pipeline(\Closure $callback) if ($this->redis instanceof \Predis\Client) {
{ $results = $this->redis->pipeline(function ($redis) use ($generator, &$ids) {
$redis = $this->redis; foreach ($generator() as $command => $args) {
call_user_func_array(array($redis, $command), $args);
try { $ids[] = $args[0];
if ($redis instanceof \Predis\Client) {
$redis->pipeline(function ($pipe) use ($callback) {
$this->redis = $pipe;
$callback(array($this, 'execute'));
});
} elseif ($redis instanceof \RedisArray) {
$connections = array();
$callback(function ($command, $id, $args) use (&$connections) {
if (!isset($connections[$h = $this->redis->_target($id)])) {
$connections[$h] = $this->redis->_instance($h);
$connections[$h]->multi(\Redis::PIPELINE);
}
$this->execute($command, $id, $args, $connections[$h]);
});
foreach ($connections as $c) {
$c->exec();
} }
} else { });
$pipe = $redis->multi(\Redis::PIPELINE); } elseif ($this->redis instanceof \RedisArray) {
try { $connections = $results = $ids = array();
$callback(array($this, 'execute')); foreach ($generator() as $command => $args) {
} finally { if (!isset($connections[$h = $this->redis->_target($args[0])])) {
if ($pipe) { $connections[$h] = array($this->redis->_instance($h), -1);
$redis->exec(); $connections[$h][0]->multi(\Redis::PIPELINE);
}
} }
call_user_func_array(array($connections[$h][0], $command), $args);
$results[] = array($h, ++$connections[$h][1]);
$ids[] = $args[0];
} }
} finally { foreach ($connections as $h => $c) {
$this->redis = $redis; $connections[$h] = $c[0]->exec();
}
foreach ($results as $k => list($h, $c)) {
$results[$k] = $connections[$h][$c];
}
} else {
$this->redis->multi(\Redis::PIPELINE);
foreach ($generator() as $command => $args) {
call_user_func_array(array($this->redis, $command), $args);
$ids[] = $args[0];
}
$results = $this->redis->exec();
}
foreach ($ids as $k => $id) {
yield $id => $results[$k];
} }
} }
} }

View File

@ -0,0 +1,27 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Cache\Tests\Adapter;
class PredisClusterAdapterTest extends AbstractRedisAdapterTest
{
public static function setupBeforeClass()
{
parent::setupBeforeClass();
self::$redis = new \Predis\Client(array(getenv('REDIS_HOST')));
}
public static function tearDownAfterClass()
{
self::$redis->getConnection()->getConnectionByKey('foo')->executeCommand(self::$redis->createCommand('FLUSHDB'));
self::$redis = null;
}
}