bug #21908 [Cache] Fix Redis pipelining/multi-ops (nicolas-grekas)
This PR was merged into the 3.2 branch.
Discussion
----------
[Cache] Fix Redis pipelining/multi-ops
| Q | A
| ------------- | ---
| Branch? | 3.2
| Bug fix? | yes
| New feature? | no
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | #21858
| License | MIT
| Doc PR | -
`MSET` doesn't work on clustered connections. Let's use pipelining instead.
Commits
-------
f1648e2
[Cache] Fix Redis pipelining/multi-ops
This commit is contained in:
commit
3265ed4a66
@ -14,6 +14,7 @@ namespace Symfony\Component\Cache\Adapter;
|
|||||||
use Predis\Connection\Factory;
|
use Predis\Connection\Factory;
|
||||||
use Predis\Connection\Aggregate\PredisCluster;
|
use Predis\Connection\Aggregate\PredisCluster;
|
||||||
use Predis\Connection\Aggregate\RedisCluster;
|
use Predis\Connection\Aggregate\RedisCluster;
|
||||||
|
use Predis\Response\Status;
|
||||||
use Symfony\Component\Cache\Exception\InvalidArgumentException;
|
use Symfony\Component\Cache\Exception\InvalidArgumentException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -136,11 +137,14 @@ class RedisAdapter extends AbstractAdapter
|
|||||||
protected function doFetch(array $ids)
|
protected function doFetch(array $ids)
|
||||||
{
|
{
|
||||||
if ($ids) {
|
if ($ids) {
|
||||||
$values = $this->redis->mGet($ids);
|
$values = $this->pipeline(function () use ($ids) {
|
||||||
$index = 0;
|
foreach ($ids as $id) {
|
||||||
foreach ($ids as $id) {
|
yield 'get' => array($id);
|
||||||
if ($value = $values[$index++]) {
|
}
|
||||||
yield $id => parent::unserialize($value);
|
});
|
||||||
|
foreach ($values as $id => $v) {
|
||||||
|
if ($v) {
|
||||||
|
yield $id => parent::unserialize($v);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -251,61 +255,63 @@ class RedisAdapter extends AbstractAdapter
|
|||||||
return $failed;
|
return $failed;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (0 >= $lifetime) {
|
$results = $this->pipeline(function () use ($serialized, $lifetime) {
|
||||||
$this->redis->mSet($serialized);
|
|
||||||
|
|
||||||
return $failed;
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->pipeline(function ($pipe) use (&$serialized, $lifetime) {
|
|
||||||
foreach ($serialized as $id => $value) {
|
foreach ($serialized as $id => $value) {
|
||||||
$pipe('setEx', $id, array($lifetime, $value));
|
if (0 >= $lifetime) {
|
||||||
|
yield 'set' => array($id, $value);
|
||||||
|
} else {
|
||||||
|
yield 'setEx' => array($id, $lifetime, $value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
foreach ($results as $id => $result) {
|
||||||
|
if (true !== $result && (!$result instanceof Status || $result !== Status::get('OK'))) {
|
||||||
|
$failed[] = $id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return $failed;
|
return $failed;
|
||||||
}
|
}
|
||||||
|
|
||||||
private function execute($command, $id, array $args, $redis = null)
|
private function pipeline(\Closure $generator)
|
||||||
{
|
{
|
||||||
array_unshift($args, $id);
|
$ids = array();
|
||||||
call_user_func_array(array($redis ?: $this->redis, $command), $args);
|
|
||||||
}
|
|
||||||
|
|
||||||
private function pipeline(\Closure $callback)
|
if ($this->redis instanceof \Predis\Client) {
|
||||||
{
|
$results = $this->redis->pipeline(function ($redis) use ($generator, &$ids) {
|
||||||
$redis = $this->redis;
|
foreach ($generator() as $command => $args) {
|
||||||
|
call_user_func_array(array($redis, $command), $args);
|
||||||
try {
|
$ids[] = $args[0];
|
||||||
if ($redis instanceof \Predis\Client) {
|
|
||||||
$redis->pipeline(function ($pipe) use ($callback) {
|
|
||||||
$this->redis = $pipe;
|
|
||||||
$callback(array($this, 'execute'));
|
|
||||||
});
|
|
||||||
} elseif ($redis instanceof \RedisArray) {
|
|
||||||
$connections = array();
|
|
||||||
$callback(function ($command, $id, $args) use (&$connections) {
|
|
||||||
if (!isset($connections[$h = $this->redis->_target($id)])) {
|
|
||||||
$connections[$h] = $this->redis->_instance($h);
|
|
||||||
$connections[$h]->multi(\Redis::PIPELINE);
|
|
||||||
}
|
|
||||||
$this->execute($command, $id, $args, $connections[$h]);
|
|
||||||
});
|
|
||||||
foreach ($connections as $c) {
|
|
||||||
$c->exec();
|
|
||||||
}
|
}
|
||||||
} else {
|
});
|
||||||
$pipe = $redis->multi(\Redis::PIPELINE);
|
} elseif ($this->redis instanceof \RedisArray) {
|
||||||
try {
|
$connections = $results = $ids = array();
|
||||||
$callback(array($this, 'execute'));
|
foreach ($generator() as $command => $args) {
|
||||||
} finally {
|
if (!isset($connections[$h = $this->redis->_target($args[0])])) {
|
||||||
if ($pipe) {
|
$connections[$h] = array($this->redis->_instance($h), -1);
|
||||||
$redis->exec();
|
$connections[$h][0]->multi(\Redis::PIPELINE);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
call_user_func_array(array($connections[$h][0], $command), $args);
|
||||||
|
$results[] = array($h, ++$connections[$h][1]);
|
||||||
|
$ids[] = $args[0];
|
||||||
}
|
}
|
||||||
} finally {
|
foreach ($connections as $h => $c) {
|
||||||
$this->redis = $redis;
|
$connections[$h] = $c[0]->exec();
|
||||||
|
}
|
||||||
|
foreach ($results as $k => list($h, $c)) {
|
||||||
|
$results[$k] = $connections[$h][$c];
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
$this->redis->multi(\Redis::PIPELINE);
|
||||||
|
foreach ($generator() as $command => $args) {
|
||||||
|
call_user_func_array(array($this->redis, $command), $args);
|
||||||
|
$ids[] = $args[0];
|
||||||
|
}
|
||||||
|
$results = $this->redis->exec();
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach ($ids as $k => $id) {
|
||||||
|
yield $id => $results[$k];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,27 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Cache\Tests\Adapter;
|
||||||
|
|
||||||
|
class PredisClusterAdapterTest extends AbstractRedisAdapterTest
|
||||||
|
{
|
||||||
|
public static function setupBeforeClass()
|
||||||
|
{
|
||||||
|
parent::setupBeforeClass();
|
||||||
|
self::$redis = new \Predis\Client(array(getenv('REDIS_HOST')));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static function tearDownAfterClass()
|
||||||
|
{
|
||||||
|
self::$redis->getConnection()->getConnectionByKey('foo')->executeCommand(self::$redis->createCommand('FLUSHDB'));
|
||||||
|
self::$redis = null;
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user