bug #41804 [Cache] fix eventual consistency when using RedisTagAwareAdapter with a cluster (nicolas-grekas)

This PR was merged into the 4.4 branch.

Discussion
----------

[Cache] fix eventual consistency when using RedisTagAwareAdapter with a cluster

| Q             | A
| ------------- | ---
| Branch?       | 4.4
| Bug fix?      | yes
| New feature?  | no
| Deprecations? | no
| Tickets       | -
| License       | MIT
| Doc PR        | -

Right now, if the Symfony app stops in the middle of the invalidation logic, we lose the keys to invalidate.

This PR fixes the invalidation logic by making it eventually consistent, and also more scalable thanks to using `SSCAN` instead of `SMEMBERS` when iterating over the items to delete.

The eventual consistency happens when the same tag is invalidated again. We could improve this eg by garbage collecting also when saving and deleting an item but I'll let this as an exercise for a future contributor :)

/cc `@andrerom` in case you'd like to have a look.

Commits
-------

5f2d5e0437 [Cache] fix eventual consistency when using RedisTagAwareAdapter with a cluster
This commit is contained in:
Nicolas Grekas 2021-06-23 20:14:15 +02:00
commit c7cd08aa6e
4 changed files with 94 additions and 92 deletions

View File

@ -23,17 +23,13 @@ use Symfony\Component\Cache\Marshaller\TagAwareMarshaller;
use Symfony\Component\Cache\Traits\RedisTrait;
/**
* Stores tag id <> cache id relationship as a Redis Set, lookup on invalidation using RENAME+SMEMBERS.
* Stores tag id <> cache id relationship as a Redis Set.
*
* Set (tag relation info) is stored without expiry (non-volatile), while cache always gets an expiry (volatile) even
* if not set by caller. Thus if you configure redis with the right eviction policy you can be safe this tag <> cache
* relationship survives eviction (cache cleanup when Redis runs out of memory).
*
* Requirements:
* - Client: PHP Redis or Predis
* Note: Due to lack of RENAME support it is NOT recommended to use Cluster on Predis, instead use phpredis.
* - Server: Redis 2.8+
* Configured with any `volatile-*` eviction policy, OR `noeviction` if it will NEVER fill up memory
* Redis server 2.8+ with any `volatile-*` eviction policy, OR `noeviction` if you're sure memory will NEVER fill up
*
* Design limitations:
* - Max 4 billion cache keys per cache tag as limited by Redis Set datatype.
@ -49,11 +45,6 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter
{
use RedisTrait;
/**
* Limits for how many keys are deleted in batch.
*/
private const BULK_DELETE_LIMIT = 10000;
/**
* On cache items without a lifetime set, we set it to 100 days. This is to make sure cache items are
* preferred to be evicted over tag Sets, if eviction policy is configured according to requirements.
@ -96,7 +87,7 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter
{
$eviction = $this->getRedisEvictionPolicy();
if ('noeviction' !== $eviction && 0 !== strpos($eviction, 'volatile-')) {
throw new LogicException(sprintf('Redis maxmemory-policy setting "%s" is *not* supported by RedisTagAwareAdapter, use "noeviction" or "volatile-*" eviction policies.', $eviction));
throw new LogicException(sprintf('Redis maxmemory-policy setting "%s" is *not* supported by RedisTagAwareAdapter, use "noeviction" or "volatile-*" eviction policies.', $eviction));
}
// serialize values
@ -159,15 +150,9 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter
return v:sub(14, 13 + v:byte(13) + v:byte(12) * 256 + v:byte(11) * 65536)
EOLUA;
if ($this->redis instanceof \Predis\ClientInterface) {
$evalArgs = [$lua, 1, &$id];
} else {
$evalArgs = [$lua, [&$id], 1];
}
$results = $this->pipeline(function () use ($ids, &$id, $evalArgs) {
$results = $this->pipeline(function () use ($ids, $lua) {
foreach ($ids as $id) {
yield 'eval' => $evalArgs;
yield 'eval' => $this->redis instanceof \Predis\ClientInterface ? [$lua, 1, $id] : [$lua, [$id], 1];
}
});
@ -185,12 +170,15 @@ EOLUA;
*/
protected function doDeleteTagRelations(array $tagData): bool
{
$this->pipeline(static function () use ($tagData) {
$results = $this->pipeline(static function () use ($tagData) {
foreach ($tagData as $tagId => $idList) {
array_unshift($idList, $tagId);
yield 'sRem' => $idList;
}
})->rewind();
});
foreach ($results as $result) {
// no-op
}
return true;
}
@ -200,79 +188,83 @@ EOLUA;
*/
protected function doInvalidate(array $tagIds): bool
{
if (!$this->redis instanceof \Predis\ClientInterface || !$this->redis->getConnection() instanceof PredisCluster) {
$movedTagSetIds = $this->renameKeys($this->redis, $tagIds);
} else {
$clusterConnection = $this->redis->getConnection();
$tagIdsByConnection = new \SplObjectStorage();
$movedTagSetIds = [];
// This script scans the set of items linked to tag: it empties the set
// and removes the linked items. When the set is still not empty after
// the scan, it means we're in cluster mode and that the linked items
// are on other nodes: we move the links to a temporary set and we
// gargage collect that set from the client side.
$lua = <<<'EOLUA'
local cursor = '0'
local id = KEYS[1]
repeat
local result = redis.call('SSCAN', id, cursor, 'COUNT', 5000);
cursor = result[1];
local rems = {}
for _, v in ipairs(result[2]) do
local ok, _ = pcall(redis.call, 'DEL', ARGV[1]..v)
if ok then
table.insert(rems, v)
end
end
if 0 < #rems then
redis.call('SREM', id, unpack(rems))
end
until '0' == cursor;
redis.call('SUNIONSTORE', '{'..id..'}'..id, id)
redis.call('DEL', id)
return redis.call('SSCAN', '{'..id..'}'..id, '0', 'COUNT', 5000)
EOLUA;
$results = $this->pipeline(function () use ($tagIds, $lua) {
if ($this->redis instanceof \Predis\ClientInterface) {
$prefix = $this->redis->getOptions()->prefix ? $this->redis->getOptions()->prefix->getPrefix() : '';
} elseif (\is_array($prefix = $this->redis->getOption(\Redis::OPT_PREFIX) ?? '')) {
$prefix = current($prefix);
}
foreach ($tagIds as $id) {
$connection = $clusterConnection->getConnectionByKey($id);
$slot = $tagIdsByConnection[$connection] ?? $tagIdsByConnection[$connection] = new \ArrayObject();
$slot[] = $id;
}
foreach ($tagIdsByConnection as $connection) {
$slot = $tagIdsByConnection[$connection];
$movedTagSetIds = array_merge($movedTagSetIds, $this->renameKeys(new $this->redis($connection, $this->redis->getOptions()), $slot->getArrayCopy()));
}
}
// No Sets found
if (!$movedTagSetIds) {
return false;
}
// Now safely take the time to read the keys in each set and collect ids we need to delete
$tagIdSets = $this->pipeline(static function () use ($movedTagSetIds) {
foreach ($movedTagSetIds as $movedTagId) {
yield 'sMembers' => [$movedTagId];
yield 'eval' => $this->redis instanceof \Predis\ClientInterface ? [$lua, 1, $id, $prefix] : [$lua, [$id, $prefix], 1];
}
});
// Return combination of the temporary Tag Set ids and their values (cache ids)
$ids = array_merge($movedTagSetIds, ...iterator_to_array($tagIdSets, false));
$lua = <<<'EOLUA'
local id = KEYS[1]
local cursor = table.remove(ARGV)
redis.call('SREM', '{'..id..'}'..id, unpack(ARGV))
// Delete cache in chunks to avoid overloading the connection
foreach (array_chunk(array_unique($ids), self::BULK_DELETE_LIMIT) as $chunkIds) {
$this->doDelete($chunkIds);
return redis.call('SSCAN', '{'..id..'}'..id, cursor, 'COUNT', 5000)
EOLUA;
foreach ($results as $id => [$cursor, $ids]) {
while ($ids || '0' !== $cursor) {
$this->doDelete($ids);
$evalArgs = [$id, $cursor];
array_splice($evalArgs, 1, 0, $ids);
if ($this->redis instanceof \Predis\ClientInterface) {
array_unshift($evalArgs, $lua, 1);
} else {
$evalArgs = [$lua, $evalArgs, 1];
}
$results = $this->pipeline(function () use ($evalArgs) {
yield 'eval' => $evalArgs;
});
foreach ($results as [$cursor, $ids]) {
// no-op
}
}
}
return true;
}
/**
* Renames several keys in order to be able to operate on them without risk of race conditions.
*
* Filters out keys that do not exist before returning new keys.
*
* @see https://redis.io/commands/rename
* @see https://redis.io/topics/cluster-spec#keys-hash-tags
*
* @return array Filtered list of the valid moved keys (only those that existed)
*/
private function renameKeys($redis, array $ids): array
{
$newIds = [];
$uniqueToken = bin2hex(random_bytes(10));
$results = $this->pipeline(static function () use ($ids, $uniqueToken) {
foreach ($ids as $id) {
yield 'rename' => [$id, '{'.$id.'}'.$uniqueToken];
}
}, $redis);
foreach ($results as $id => $result) {
if (true === $result || ($result instanceof Status && Status::get('OK') === $result)) {
// Only take into account if ok (key existed), will be false on phpredis if it did not exist
$newIds[] = '{'.$id.'}'.$uniqueToken;
}
}
return $newIds;
}
private function getRedisEvictionPolicy(): string
{
if (null !== $this->redisEvictionPolicy) {

View File

@ -19,6 +19,7 @@ use Symfony\Component\Cache\Adapter\AdapterInterface;
use Symfony\Component\Cache\Adapter\ArrayAdapter;
use Symfony\Component\Cache\Adapter\FilesystemAdapter;
use Symfony\Component\Cache\Adapter\TagAwareAdapter;
use Symfony\Component\Cache\LockRegistry;
use Symfony\Component\Cache\Tests\Fixtures\PrunableAdapter;
/**
@ -199,6 +200,8 @@ class TagAwareAdapterTest extends AdapterTestCase
public function testLog()
{
$lockFiles = LockRegistry::setFiles([__FILE__]);
$logger = $this->createMock(LoggerInterface::class);
$logger
->expects($this->atLeastOnce())
@ -209,6 +212,8 @@ class TagAwareAdapterTest extends AdapterTestCase
// Computing will produce at least one log
$cache->get('foo', static function (): string { return 'ccc'; });
LockRegistry::setFiles($lockFiles);
}
/**

View File

@ -18,6 +18,9 @@ class LockRegistryTest extends TestCase
{
public function testFiles()
{
if ('\\' === \DIRECTORY_SEPARATOR) {
$this->markTestSkipped('LockRegistry is disabled on Windows');
}
$lockFiles = LockRegistry::setFiles([]);
LockRegistry::setFiles($lockFiles);
$expected = array_map('realpath', glob(__DIR__.'/../Adapter/*'));

View File

@ -363,12 +363,6 @@ trait RedisTrait
protected function doClear($namespace)
{
$cleared = true;
if ($this->redis instanceof \Predis\ClientInterface) {
$evalArgs = [0, $namespace];
} else {
$evalArgs = [[$namespace], 0];
}
$hosts = $this->getHosts();
$host = reset($hosts);
if ($host instanceof \Predis\Client && $host->getConnection() instanceof ReplicationInterface) {
@ -385,17 +379,20 @@ trait RedisTrait
$info = $host->info('Server');
$info = $info['Server'] ?? $info;
$pattern = $namespace.'*';
if (!version_compare($info['redis_version'], '2.8', '>=')) {
// As documented in Redis documentation (http://redis.io/commands/keys) using KEYS
// can hang your server when it is executed against large databases (millions of items).
// Whenever you hit this scale, you should really consider upgrading to Redis 2.8 or above.
$cleared = $host->eval("local keys=redis.call('KEYS',ARGV[1]..'*') for i=1,#keys,5000 do redis.call('DEL',unpack(keys,i,math.min(i+4999,#keys))) end return 1", $evalArgs[0], $evalArgs[1]) && $cleared;
$args = $this->redis instanceof \Predis\ClientInterface ? [0, $pattern] : [[$pattern], 0];
$cleared = $host->eval("local keys=redis.call('KEYS',ARGV[1]) for i=1,#keys,5000 do redis.call('DEL',unpack(keys,i,math.min(i+4999,#keys))) end return 1", $args[0], $args[1]) && $cleared;
continue;
}
$cursor = null;
do {
$keys = $host instanceof \Predis\ClientInterface ? $host->scan($cursor, 'MATCH', $namespace.'*', 'COUNT', 1000) : $host->scan($cursor, $namespace.'*', 1000);
$keys = $host instanceof \Predis\ClientInterface ? $host->scan($cursor, 'MATCH', $pattern, 'COUNT', 1000) : $host->scan($cursor, $pattern, 1000);
if (isset($keys[1]) && \is_array($keys[1])) {
$cursor = $keys[0];
$keys = $keys[1];
@ -507,6 +504,11 @@ trait RedisTrait
$results = $redis->exec();
}
if (!$redis instanceof \Predis\ClientInterface && 'eval' === $command && $redis->getLastError()) {
$e = new \RedisException($redis->getLastError());
$results = array_map(function ($v) use ($e) { return false === $v ? $e : $v; }, $results);
}
foreach ($ids as $k => $id) {
yield $id => $results[$k];
}