[Cache] Fix Redis pipelining/multi-ops
This commit is contained in:
parent
94d059d530
commit
f1648e2484
@ -14,6 +14,7 @@ namespace Symfony\Component\Cache\Adapter;
|
||||
use Predis\Connection\Factory;
|
||||
use Predis\Connection\Aggregate\PredisCluster;
|
||||
use Predis\Connection\Aggregate\RedisCluster;
|
||||
use Predis\Response\Status;
|
||||
use Symfony\Component\Cache\Exception\InvalidArgumentException;
|
||||
|
||||
/**
|
||||
@ -136,11 +137,14 @@ class RedisAdapter extends AbstractAdapter
|
||||
protected function doFetch(array $ids)
|
||||
{
|
||||
if ($ids) {
|
||||
$values = $this->redis->mGet($ids);
|
||||
$index = 0;
|
||||
foreach ($ids as $id) {
|
||||
if ($value = $values[$index++]) {
|
||||
yield $id => parent::unserialize($value);
|
||||
$values = $this->pipeline(function () use ($ids) {
|
||||
foreach ($ids as $id) {
|
||||
yield 'get' => array($id);
|
||||
}
|
||||
});
|
||||
foreach ($values as $id => $v) {
|
||||
if ($v) {
|
||||
yield $id => parent::unserialize($v);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -251,61 +255,63 @@ class RedisAdapter extends AbstractAdapter
|
||||
return $failed;
|
||||
}
|
||||
|
||||
if (0 >= $lifetime) {
|
||||
$this->redis->mSet($serialized);
|
||||
|
||||
return $failed;
|
||||
}
|
||||
|
||||
$this->pipeline(function ($pipe) use (&$serialized, $lifetime) {
|
||||
$results = $this->pipeline(function () use ($serialized, $lifetime) {
|
||||
foreach ($serialized as $id => $value) {
|
||||
$pipe('setEx', $id, array($lifetime, $value));
|
||||
if (0 >= $lifetime) {
|
||||
yield 'set' => array($id, $value);
|
||||
} else {
|
||||
yield 'setEx' => array($id, $lifetime, $value);
|
||||
}
|
||||
}
|
||||
});
|
||||
foreach ($results as $id => $result) {
|
||||
if (true !== $result && (!$result instanceof Status || $result !== Status::get('OK'))) {
|
||||
$failed[] = $id;
|
||||
}
|
||||
}
|
||||
|
||||
return $failed;
|
||||
}
|
||||
|
||||
private function execute($command, $id, array $args, $redis = null)
|
||||
private function pipeline(\Closure $generator)
|
||||
{
|
||||
array_unshift($args, $id);
|
||||
call_user_func_array(array($redis ?: $this->redis, $command), $args);
|
||||
}
|
||||
$ids = array();
|
||||
|
||||
private function pipeline(\Closure $callback)
|
||||
{
|
||||
$redis = $this->redis;
|
||||
|
||||
try {
|
||||
if ($redis instanceof \Predis\Client) {
|
||||
$redis->pipeline(function ($pipe) use ($callback) {
|
||||
$this->redis = $pipe;
|
||||
$callback(array($this, 'execute'));
|
||||
});
|
||||
} elseif ($redis instanceof \RedisArray) {
|
||||
$connections = array();
|
||||
$callback(function ($command, $id, $args) use (&$connections) {
|
||||
if (!isset($connections[$h = $this->redis->_target($id)])) {
|
||||
$connections[$h] = $this->redis->_instance($h);
|
||||
$connections[$h]->multi(\Redis::PIPELINE);
|
||||
}
|
||||
$this->execute($command, $id, $args, $connections[$h]);
|
||||
});
|
||||
foreach ($connections as $c) {
|
||||
$c->exec();
|
||||
if ($this->redis instanceof \Predis\Client) {
|
||||
$results = $this->redis->pipeline(function ($redis) use ($generator, &$ids) {
|
||||
foreach ($generator() as $command => $args) {
|
||||
call_user_func_array(array($redis, $command), $args);
|
||||
$ids[] = $args[0];
|
||||
}
|
||||
} else {
|
||||
$pipe = $redis->multi(\Redis::PIPELINE);
|
||||
try {
|
||||
$callback(array($this, 'execute'));
|
||||
} finally {
|
||||
if ($pipe) {
|
||||
$redis->exec();
|
||||
}
|
||||
});
|
||||
} elseif ($this->redis instanceof \RedisArray) {
|
||||
$connections = $results = $ids = array();
|
||||
foreach ($generator() as $command => $args) {
|
||||
if (!isset($connections[$h = $this->redis->_target($args[0])])) {
|
||||
$connections[$h] = array($this->redis->_instance($h), -1);
|
||||
$connections[$h][0]->multi(\Redis::PIPELINE);
|
||||
}
|
||||
call_user_func_array(array($connections[$h][0], $command), $args);
|
||||
$results[] = array($h, ++$connections[$h][1]);
|
||||
$ids[] = $args[0];
|
||||
}
|
||||
} finally {
|
||||
$this->redis = $redis;
|
||||
foreach ($connections as $h => $c) {
|
||||
$connections[$h] = $c[0]->exec();
|
||||
}
|
||||
foreach ($results as $k => list($h, $c)) {
|
||||
$results[$k] = $connections[$h][$c];
|
||||
}
|
||||
} else {
|
||||
$this->redis->multi(\Redis::PIPELINE);
|
||||
foreach ($generator() as $command => $args) {
|
||||
call_user_func_array(array($this->redis, $command), $args);
|
||||
$ids[] = $args[0];
|
||||
}
|
||||
$results = $this->redis->exec();
|
||||
}
|
||||
|
||||
foreach ($ids as $k => $id) {
|
||||
yield $id => $results[$k];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,27 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Cache\Tests\Adapter;
|
||||
|
||||
class PredisClusterAdapterTest extends AbstractRedisAdapterTest
|
||||
{
|
||||
public static function setupBeforeClass()
|
||||
{
|
||||
parent::setupBeforeClass();
|
||||
self::$redis = new \Predis\Client(array(getenv('REDIS_HOST')));
|
||||
}
|
||||
|
||||
public static function tearDownAfterClass()
|
||||
{
|
||||
self::$redis->getConnection()->getConnectionByKey('foo')->executeCommand(self::$redis->createCommand('FLUSHDB'));
|
||||
self::$redis = null;
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user