[Cache] fix eventual consistency when using RedisTagAwareAdapter with a cluster
This commit is contained in:
parent
a9e6087e8d
commit
5f2d5e0437
@ -23,17 +23,13 @@ use Symfony\Component\Cache\Marshaller\TagAwareMarshaller;
|
|||||||
use Symfony\Component\Cache\Traits\RedisTrait;
|
use Symfony\Component\Cache\Traits\RedisTrait;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stores tag id <> cache id relationship as a Redis Set, lookup on invalidation using RENAME+SMEMBERS.
|
* Stores tag id <> cache id relationship as a Redis Set.
|
||||||
*
|
*
|
||||||
* Set (tag relation info) is stored without expiry (non-volatile), while cache always gets an expiry (volatile) even
|
* Set (tag relation info) is stored without expiry (non-volatile), while cache always gets an expiry (volatile) even
|
||||||
* if not set by caller. Thus if you configure redis with the right eviction policy you can be safe this tag <> cache
|
* if not set by caller. Thus if you configure redis with the right eviction policy you can be safe this tag <> cache
|
||||||
* relationship survives eviction (cache cleanup when Redis runs out of memory).
|
* relationship survives eviction (cache cleanup when Redis runs out of memory).
|
||||||
*
|
*
|
||||||
* Requirements:
|
* Redis server 2.8+ with any `volatile-*` eviction policy, OR `noeviction` if you're sure memory will NEVER fill up
|
||||||
* - Client: PHP Redis or Predis
|
|
||||||
* Note: Due to lack of RENAME support it is NOT recommended to use Cluster on Predis, instead use phpredis.
|
|
||||||
* - Server: Redis 2.8+
|
|
||||||
* Configured with any `volatile-*` eviction policy, OR `noeviction` if it will NEVER fill up memory
|
|
||||||
*
|
*
|
||||||
* Design limitations:
|
* Design limitations:
|
||||||
* - Max 4 billion cache keys per cache tag as limited by Redis Set datatype.
|
* - Max 4 billion cache keys per cache tag as limited by Redis Set datatype.
|
||||||
@ -49,11 +45,6 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter
|
|||||||
{
|
{
|
||||||
use RedisTrait;
|
use RedisTrait;
|
||||||
|
|
||||||
/**
|
|
||||||
* Limits for how many keys are deleted in batch.
|
|
||||||
*/
|
|
||||||
private const BULK_DELETE_LIMIT = 10000;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* On cache items without a lifetime set, we set it to 100 days. This is to make sure cache items are
|
* On cache items without a lifetime set, we set it to 100 days. This is to make sure cache items are
|
||||||
* preferred to be evicted over tag Sets, if eviction policy is configured according to requirements.
|
* preferred to be evicted over tag Sets, if eviction policy is configured according to requirements.
|
||||||
@ -96,7 +87,7 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter
|
|||||||
{
|
{
|
||||||
$eviction = $this->getRedisEvictionPolicy();
|
$eviction = $this->getRedisEvictionPolicy();
|
||||||
if ('noeviction' !== $eviction && 0 !== strpos($eviction, 'volatile-')) {
|
if ('noeviction' !== $eviction && 0 !== strpos($eviction, 'volatile-')) {
|
||||||
throw new LogicException(sprintf('Redis maxmemory-policy setting "%s" is *not* supported by RedisTagAwareAdapter, use "noeviction" or "volatile-*" eviction policies.', $eviction));
|
throw new LogicException(sprintf('Redis maxmemory-policy setting "%s" is *not* supported by RedisTagAwareAdapter, use "noeviction" or "volatile-*" eviction policies.', $eviction));
|
||||||
}
|
}
|
||||||
|
|
||||||
// serialize values
|
// serialize values
|
||||||
@ -159,15 +150,9 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter
|
|||||||
return v:sub(14, 13 + v:byte(13) + v:byte(12) * 256 + v:byte(11) * 65536)
|
return v:sub(14, 13 + v:byte(13) + v:byte(12) * 256 + v:byte(11) * 65536)
|
||||||
EOLUA;
|
EOLUA;
|
||||||
|
|
||||||
if ($this->redis instanceof \Predis\ClientInterface) {
|
$results = $this->pipeline(function () use ($ids, $lua) {
|
||||||
$evalArgs = [$lua, 1, &$id];
|
|
||||||
} else {
|
|
||||||
$evalArgs = [$lua, [&$id], 1];
|
|
||||||
}
|
|
||||||
|
|
||||||
$results = $this->pipeline(function () use ($ids, &$id, $evalArgs) {
|
|
||||||
foreach ($ids as $id) {
|
foreach ($ids as $id) {
|
||||||
yield 'eval' => $evalArgs;
|
yield 'eval' => $this->redis instanceof \Predis\ClientInterface ? [$lua, 1, $id] : [$lua, [$id], 1];
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -185,12 +170,15 @@ EOLUA;
|
|||||||
*/
|
*/
|
||||||
protected function doDeleteTagRelations(array $tagData): bool
|
protected function doDeleteTagRelations(array $tagData): bool
|
||||||
{
|
{
|
||||||
$this->pipeline(static function () use ($tagData) {
|
$results = $this->pipeline(static function () use ($tagData) {
|
||||||
foreach ($tagData as $tagId => $idList) {
|
foreach ($tagData as $tagId => $idList) {
|
||||||
array_unshift($idList, $tagId);
|
array_unshift($idList, $tagId);
|
||||||
yield 'sRem' => $idList;
|
yield 'sRem' => $idList;
|
||||||
}
|
}
|
||||||
})->rewind();
|
});
|
||||||
|
foreach ($results as $result) {
|
||||||
|
// no-op
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -200,79 +188,83 @@ EOLUA;
|
|||||||
*/
|
*/
|
||||||
protected function doInvalidate(array $tagIds): bool
|
protected function doInvalidate(array $tagIds): bool
|
||||||
{
|
{
|
||||||
if (!$this->redis instanceof \Predis\ClientInterface || !$this->redis->getConnection() instanceof PredisCluster) {
|
// This script scans the set of items linked to tag: it empties the set
|
||||||
$movedTagSetIds = $this->renameKeys($this->redis, $tagIds);
|
// and removes the linked items. When the set is still not empty after
|
||||||
} else {
|
// the scan, it means we're in cluster mode and that the linked items
|
||||||
$clusterConnection = $this->redis->getConnection();
|
// are on other nodes: we move the links to a temporary set and we
|
||||||
$tagIdsByConnection = new \SplObjectStorage();
|
// gargage collect that set from the client side.
|
||||||
$movedTagSetIds = [];
|
|
||||||
|
$lua = <<<'EOLUA'
|
||||||
|
local cursor = '0'
|
||||||
|
local id = KEYS[1]
|
||||||
|
repeat
|
||||||
|
local result = redis.call('SSCAN', id, cursor, 'COUNT', 5000);
|
||||||
|
cursor = result[1];
|
||||||
|
local rems = {}
|
||||||
|
|
||||||
|
for _, v in ipairs(result[2]) do
|
||||||
|
local ok, _ = pcall(redis.call, 'DEL', ARGV[1]..v)
|
||||||
|
if ok then
|
||||||
|
table.insert(rems, v)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
if 0 < #rems then
|
||||||
|
redis.call('SREM', id, unpack(rems))
|
||||||
|
end
|
||||||
|
until '0' == cursor;
|
||||||
|
|
||||||
|
redis.call('SUNIONSTORE', '{'..id..'}'..id, id)
|
||||||
|
redis.call('DEL', id)
|
||||||
|
|
||||||
|
return redis.call('SSCAN', '{'..id..'}'..id, '0', 'COUNT', 5000)
|
||||||
|
EOLUA;
|
||||||
|
|
||||||
|
$results = $this->pipeline(function () use ($tagIds, $lua) {
|
||||||
|
if ($this->redis instanceof \Predis\ClientInterface) {
|
||||||
|
$prefix = $this->redis->getOptions()->prefix ? $this->redis->getOptions()->prefix->getPrefix() : '';
|
||||||
|
} elseif (\is_array($prefix = $this->redis->getOption(\Redis::OPT_PREFIX) ?? '')) {
|
||||||
|
$prefix = current($prefix);
|
||||||
|
}
|
||||||
|
|
||||||
foreach ($tagIds as $id) {
|
foreach ($tagIds as $id) {
|
||||||
$connection = $clusterConnection->getConnectionByKey($id);
|
yield 'eval' => $this->redis instanceof \Predis\ClientInterface ? [$lua, 1, $id, $prefix] : [$lua, [$id, $prefix], 1];
|
||||||
$slot = $tagIdsByConnection[$connection] ?? $tagIdsByConnection[$connection] = new \ArrayObject();
|
|
||||||
$slot[] = $id;
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach ($tagIdsByConnection as $connection) {
|
|
||||||
$slot = $tagIdsByConnection[$connection];
|
|
||||||
$movedTagSetIds = array_merge($movedTagSetIds, $this->renameKeys(new $this->redis($connection, $this->redis->getOptions()), $slot->getArrayCopy()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// No Sets found
|
|
||||||
if (!$movedTagSetIds) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now safely take the time to read the keys in each set and collect ids we need to delete
|
|
||||||
$tagIdSets = $this->pipeline(static function () use ($movedTagSetIds) {
|
|
||||||
foreach ($movedTagSetIds as $movedTagId) {
|
|
||||||
yield 'sMembers' => [$movedTagId];
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Return combination of the temporary Tag Set ids and their values (cache ids)
|
$lua = <<<'EOLUA'
|
||||||
$ids = array_merge($movedTagSetIds, ...iterator_to_array($tagIdSets, false));
|
local id = KEYS[1]
|
||||||
|
local cursor = table.remove(ARGV)
|
||||||
|
redis.call('SREM', '{'..id..'}'..id, unpack(ARGV))
|
||||||
|
|
||||||
// Delete cache in chunks to avoid overloading the connection
|
return redis.call('SSCAN', '{'..id..'}'..id, cursor, 'COUNT', 5000)
|
||||||
foreach (array_chunk(array_unique($ids), self::BULK_DELETE_LIMIT) as $chunkIds) {
|
EOLUA;
|
||||||
$this->doDelete($chunkIds);
|
|
||||||
|
foreach ($results as $id => [$cursor, $ids]) {
|
||||||
|
while ($ids || '0' !== $cursor) {
|
||||||
|
$this->doDelete($ids);
|
||||||
|
|
||||||
|
$evalArgs = [$id, $cursor];
|
||||||
|
array_splice($evalArgs, 1, 0, $ids);
|
||||||
|
|
||||||
|
if ($this->redis instanceof \Predis\ClientInterface) {
|
||||||
|
array_unshift($evalArgs, $lua, 1);
|
||||||
|
} else {
|
||||||
|
$evalArgs = [$lua, $evalArgs, 1];
|
||||||
|
}
|
||||||
|
|
||||||
|
$results = $this->pipeline(function () use ($evalArgs) {
|
||||||
|
yield 'eval' => $evalArgs;
|
||||||
|
});
|
||||||
|
|
||||||
|
foreach ($results as [$cursor, $ids]) {
|
||||||
|
// no-op
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Renames several keys in order to be able to operate on them without risk of race conditions.
|
|
||||||
*
|
|
||||||
* Filters out keys that do not exist before returning new keys.
|
|
||||||
*
|
|
||||||
* @see https://redis.io/commands/rename
|
|
||||||
* @see https://redis.io/topics/cluster-spec#keys-hash-tags
|
|
||||||
*
|
|
||||||
* @return array Filtered list of the valid moved keys (only those that existed)
|
|
||||||
*/
|
|
||||||
private function renameKeys($redis, array $ids): array
|
|
||||||
{
|
|
||||||
$newIds = [];
|
|
||||||
$uniqueToken = bin2hex(random_bytes(10));
|
|
||||||
|
|
||||||
$results = $this->pipeline(static function () use ($ids, $uniqueToken) {
|
|
||||||
foreach ($ids as $id) {
|
|
||||||
yield 'rename' => [$id, '{'.$id.'}'.$uniqueToken];
|
|
||||||
}
|
|
||||||
}, $redis);
|
|
||||||
|
|
||||||
foreach ($results as $id => $result) {
|
|
||||||
if (true === $result || ($result instanceof Status && Status::get('OK') === $result)) {
|
|
||||||
// Only take into account if ok (key existed), will be false on phpredis if it did not exist
|
|
||||||
$newIds[] = '{'.$id.'}'.$uniqueToken;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return $newIds;
|
|
||||||
}
|
|
||||||
|
|
||||||
private function getRedisEvictionPolicy(): string
|
private function getRedisEvictionPolicy(): string
|
||||||
{
|
{
|
||||||
if (null !== $this->redisEvictionPolicy) {
|
if (null !== $this->redisEvictionPolicy) {
|
||||||
|
@ -19,6 +19,7 @@ use Symfony\Component\Cache\Adapter\AdapterInterface;
|
|||||||
use Symfony\Component\Cache\Adapter\ArrayAdapter;
|
use Symfony\Component\Cache\Adapter\ArrayAdapter;
|
||||||
use Symfony\Component\Cache\Adapter\FilesystemAdapter;
|
use Symfony\Component\Cache\Adapter\FilesystemAdapter;
|
||||||
use Symfony\Component\Cache\Adapter\TagAwareAdapter;
|
use Symfony\Component\Cache\Adapter\TagAwareAdapter;
|
||||||
|
use Symfony\Component\Cache\LockRegistry;
|
||||||
use Symfony\Component\Cache\Tests\Fixtures\PrunableAdapter;
|
use Symfony\Component\Cache\Tests\Fixtures\PrunableAdapter;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -199,6 +200,8 @@ class TagAwareAdapterTest extends AdapterTestCase
|
|||||||
|
|
||||||
public function testLog()
|
public function testLog()
|
||||||
{
|
{
|
||||||
|
$lockFiles = LockRegistry::setFiles([__FILE__]);
|
||||||
|
|
||||||
$logger = $this->createMock(LoggerInterface::class);
|
$logger = $this->createMock(LoggerInterface::class);
|
||||||
$logger
|
$logger
|
||||||
->expects($this->atLeastOnce())
|
->expects($this->atLeastOnce())
|
||||||
@ -209,6 +212,8 @@ class TagAwareAdapterTest extends AdapterTestCase
|
|||||||
|
|
||||||
// Computing will produce at least one log
|
// Computing will produce at least one log
|
||||||
$cache->get('foo', static function (): string { return 'ccc'; });
|
$cache->get('foo', static function (): string { return 'ccc'; });
|
||||||
|
|
||||||
|
LockRegistry::setFiles($lockFiles);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -18,6 +18,9 @@ class LockRegistryTest extends TestCase
|
|||||||
{
|
{
|
||||||
public function testFiles()
|
public function testFiles()
|
||||||
{
|
{
|
||||||
|
if ('\\' === \DIRECTORY_SEPARATOR) {
|
||||||
|
$this->markTestSkipped('LockRegistry is disabled on Windows');
|
||||||
|
}
|
||||||
$lockFiles = LockRegistry::setFiles([]);
|
$lockFiles = LockRegistry::setFiles([]);
|
||||||
LockRegistry::setFiles($lockFiles);
|
LockRegistry::setFiles($lockFiles);
|
||||||
$expected = array_map('realpath', glob(__DIR__.'/../Adapter/*'));
|
$expected = array_map('realpath', glob(__DIR__.'/../Adapter/*'));
|
||||||
|
@ -363,12 +363,6 @@ trait RedisTrait
|
|||||||
protected function doClear($namespace)
|
protected function doClear($namespace)
|
||||||
{
|
{
|
||||||
$cleared = true;
|
$cleared = true;
|
||||||
if ($this->redis instanceof \Predis\ClientInterface) {
|
|
||||||
$evalArgs = [0, $namespace];
|
|
||||||
} else {
|
|
||||||
$evalArgs = [[$namespace], 0];
|
|
||||||
}
|
|
||||||
|
|
||||||
$hosts = $this->getHosts();
|
$hosts = $this->getHosts();
|
||||||
$host = reset($hosts);
|
$host = reset($hosts);
|
||||||
if ($host instanceof \Predis\Client && $host->getConnection() instanceof ReplicationInterface) {
|
if ($host instanceof \Predis\Client && $host->getConnection() instanceof ReplicationInterface) {
|
||||||
@ -385,17 +379,20 @@ trait RedisTrait
|
|||||||
$info = $host->info('Server');
|
$info = $host->info('Server');
|
||||||
$info = $info['Server'] ?? $info;
|
$info = $info['Server'] ?? $info;
|
||||||
|
|
||||||
|
$pattern = $namespace.'*';
|
||||||
|
|
||||||
if (!version_compare($info['redis_version'], '2.8', '>=')) {
|
if (!version_compare($info['redis_version'], '2.8', '>=')) {
|
||||||
// As documented in Redis documentation (http://redis.io/commands/keys) using KEYS
|
// As documented in Redis documentation (http://redis.io/commands/keys) using KEYS
|
||||||
// can hang your server when it is executed against large databases (millions of items).
|
// can hang your server when it is executed against large databases (millions of items).
|
||||||
// Whenever you hit this scale, you should really consider upgrading to Redis 2.8 or above.
|
// Whenever you hit this scale, you should really consider upgrading to Redis 2.8 or above.
|
||||||
$cleared = $host->eval("local keys=redis.call('KEYS',ARGV[1]..'*') for i=1,#keys,5000 do redis.call('DEL',unpack(keys,i,math.min(i+4999,#keys))) end return 1", $evalArgs[0], $evalArgs[1]) && $cleared;
|
$args = $this->redis instanceof \Predis\ClientInterface ? [0, $pattern] : [[$pattern], 0];
|
||||||
|
$cleared = $host->eval("local keys=redis.call('KEYS',ARGV[1]) for i=1,#keys,5000 do redis.call('DEL',unpack(keys,i,math.min(i+4999,#keys))) end return 1", $args[0], $args[1]) && $cleared;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
$cursor = null;
|
$cursor = null;
|
||||||
do {
|
do {
|
||||||
$keys = $host instanceof \Predis\ClientInterface ? $host->scan($cursor, 'MATCH', $namespace.'*', 'COUNT', 1000) : $host->scan($cursor, $namespace.'*', 1000);
|
$keys = $host instanceof \Predis\ClientInterface ? $host->scan($cursor, 'MATCH', $pattern, 'COUNT', 1000) : $host->scan($cursor, $pattern, 1000);
|
||||||
if (isset($keys[1]) && \is_array($keys[1])) {
|
if (isset($keys[1]) && \is_array($keys[1])) {
|
||||||
$cursor = $keys[0];
|
$cursor = $keys[0];
|
||||||
$keys = $keys[1];
|
$keys = $keys[1];
|
||||||
@ -507,6 +504,11 @@ trait RedisTrait
|
|||||||
$results = $redis->exec();
|
$results = $redis->exec();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!$redis instanceof \Predis\ClientInterface && 'eval' === $command && $redis->getLastError()) {
|
||||||
|
$e = new \RedisException($redis->getLastError());
|
||||||
|
$results = array_map(function ($v) use ($e) { return false === $v ? $e : $v; }, $results);
|
||||||
|
}
|
||||||
|
|
||||||
foreach ($ids as $k => $id) {
|
foreach ($ids as $k => $id) {
|
||||||
yield $id => $results[$k];
|
yield $id => $results[$k];
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user