From 31ad75564f26dfec803255ced859372a085c5c99 Mon Sep 17 00:00:00 2001 From: Hugo Sales Date: Thu, 16 Jul 2020 00:04:25 +0000 Subject: [PATCH] [CACHE][Redis] Add special support for redis (fixed size lists), set method and general fixes --- src/Core/Cache.php | 125 +++++++++++++++++++++++++++++++++------------ 1 file changed, 92 insertions(+), 33 deletions(-) diff --git a/src/Core/Cache.php b/src/Core/Cache.php index 7591b44e34..ea2915db69 100644 --- a/src/Core/Cache.php +++ b/src/Core/Cache.php @@ -1,6 +1,7 @@ . + // }}} namespace App\Core; use Functional as F; -use Symfony\Component\Cache\Adapter\AbstractAdapter; use Symfony\Component\Cache\Adapter\ChainAdapter; abstract class Cache { - protected static AbstractAdapter $pools; + protected static $pools; + protected static $redis; private static string $ENV_VAR = 'SOCIAL_CACHE_ADAPTER'; /** @@ -39,29 +41,57 @@ abstract class Cache die("A cache adapter is needed in the environment variable {$ENV_VAR}"); } + self::$pools = []; + self::$redis = []; + $adapters = []; foreach (explode(';', $_ENV[self::$ENV_VAR]) as $a) { - list($pool, $val) = explode('=', $a); + list($pool, $val) = explode('=', $a); + self::$pools[$pool] = []; + self::$redis[$pool] = []; foreach (explode(',', $val) as $dsn) { list($scheme, $rest) = explode('://', $dsn); - + $partial_to_dsn = function ($r) use ($scheme) { return $scheme . '://' . $r; }; switch ($scheme) { + case 'redis': + // Redis can have multiple servers, but we want to take proper advantage of + // redis, not just as a key value store, but using it's datastructures + $dsns = F\map(explode(',', $rest), + function ($r) use ($partial_to_dsn) { + // May be a Unix socket + return $r[0] == '/' ? $r : $partial_to_dsn($r); + }); + if (count($dsns) === 1) { + $class = \Redis; + $r = new \Redis(array_pop($dsns)); + } else { + $class = \RedisCluster; + $r = new \RedisCluster($dsns); + } + // Distribute reads randomly + $r->setOption($class::OPT_SLAVE_FAILOVER, $class::FAILOVER_DISTRIBUTE); + // Improved serializer + $r->setOption($class::OPT_SERIALIZER, $class::SERIALIZER_MSGPACK); + // Persistent connection + $r->setOption($class::OPT_TCP_KEEPALIVE, true); + // Use LZ4 for the improved decompression speed (while keeping an okay compression ratio) + $r->setOption($class::OPT_COMPRESSION, $class::COMPRESSION_LZ4); + self::$redis[$pool][] = $r; + $adapters[$pool][] = new Adapter\RedisAdapter($r); + break; case 'memcached': - // memcached can have multiple servers - $dsn = F\map(explode(',', $rest), function ($h) use ($scheme) { return $scheme . $h; }); - $adapters[$pool][] = new Adapter\MemcachedAdapter($dsn); + // memcached can also have multiple servers + $dsns = F\map(explode(',', $rest), $partial_to_dsn); + $adapters[$pool][] = new Adapter\MemcachedAdapter($dsns); break; case 'filesystem': $adapters[$pool][] = new Adapter\FilesystemAdapter($rest); break; - case 'redis': - $adapters[$pool][] = new Adapter\RedisAdapter($dsn); - break; case 'apcu': $adapters[$pool][] = new Adapter\ApcuAdapter(); break; case 'opcache': - $adapters[$pool][] = new Adapter\PhpArrayAdapter(stream_get_meta_data(tmpfile())['uri'], new FilesystemAdapter()); + $adapters[$pool][] = new Adapter\PhpArrayAdapter($rest, new FilesystemAdapter($rest . '.fallback')); break; case 'doctrine': $adapters[$pool][] = new Adapter\PdoAdapter($dsn); @@ -71,6 +101,7 @@ abstract class Cache return; } } + if (count($adapters[$pool]) === 1) { self::$pools[$pool] = array_pop($adapters[$pool]); } else { @@ -79,41 +110,69 @@ abstract class Cache } } + public static function set(string $key, mixed $value, string $pool = 'default') + { + // there's no set method, must be done this way + return self::$pools[$pool]->get($key, function ($i) use ($value) { return $value; }, INF); + } + public static function get(string $key, callable $calculate, string $pool = 'default', float $beta = 1.0) { - if (self::$pools[$pool] instanceof AbstractAdapter) { - return self::$pool->get($key, $calculate, $beta); + return self::$pools[$pool]->get($key, $calculate, $beta); + } + + public static function delete(string $key, string $pool = 'default'): bool + { + return self::$pools[$pool]->delete($key); + } + + public static function getList(string $key, callable $calculate, string $pool = 'default', int $max_count = 64, float $beta = 1.0): SplFixedArray + { + if (isset(self::$redis[$pool])) { + return SplFixedArray::fromArray(self::$redis[$pool]->lRange($key, 0, -1)); } else { + $keys = self::getKeyList($key, $max_count, $beta); + $list = new SplFixedArray($keys->count()); + foreach ($keys as $k) { + $list[] = self::get($k, $calculate, $pool, $beta); + } + return $list; } } - public static function delete(string $key): bool + public static function pushList(string $key, mixed $value, string $pool = 'default', int $max_count = 64, float $beta = 1.0): void { - return self::$pool->delete($key); - } - - public static function getList(string $key, callable $calculate, int $max_count = 64, float $beta = 1.0): SplFixedArray - { - $keys = self::getKeyList($key, $max_count, $beta); - - $list = new SplFixedArray($keys->count()); - foreach ($keys as $k) { - $list[] = self::get($k, $calculate, $beta); + if (isset(self::$redis[$pool])) { + self::$redis[$pool] + // doesn't need to be atomic, adding at one end, deleting at the other + ->multi(Redis::PIPELINE) + ->lPush($key, $value) + // trim to $max_count, unless it's 0 + ->lTrim($key, 0, $max_count != 0 ? $max_count : -1) + ->exec(); + } else { + $keys = self::getKeyList($key, $max_count, $beta); + $vkey = $key . ':' . count($keys); + self::set($vkey, $value); + $keys[] = $vkey; + self::set($key, $keys); } - - return $list; } - public static function deleteList(string $key, int $count = 0) + public static function deleteList(string $key, string $pool = 'default'): bool { - $keys = self::getKeyList($key, $max_count, $beta); - if (!F\every($keys, function ($k) { return self::delete($k); })) { - Log::warning("Some element of the list associated with {$key} was not deleted. There may be some memory leakage in the cache process"); + if (isset(self::$redis[$pool])) { + return self::$redis[$pool]->del($key) == 1; + } else { + $keys = self::getKeyList($key, $max_count, $beta); + if (!F\every($keys, function ($k) use ($pool) { return self::delete($k, $pool); })) { + Log::warning("Some element of the list associated with {$key} was not deleted. There may be some memory leakage in the cache process"); + } + return self::delete($key, $pool); } - return self::delete($key); } - private static function getKeyList(string $key, int $max_count, float $beta): array + private static function getKeyList(string $key, int $max_count, string $pool, float $beta): RingBuffer { // Get the current keys associated with a list. If the cache // is not primed, the function is called and returns an empty @@ -121,6 +180,6 @@ abstract class Cache return self::get($key, function (ItemInterface $i) use ($max_count) { return new RingBuffer($max_count); - }, $beta); + }, $pool, $beta); } }