diff --git a/lib/queue/queuemanager.php b/lib/queue/queuemanager.php index 5158086a0b..49c56ad09b 100644 --- a/lib/queue/queuemanager.php +++ b/lib/queue/queuemanager.php @@ -164,7 +164,7 @@ abstract class QueueManager extends IoManager * @param mixed $item * @return string */ - protected function encode($item) + protected function encode($item): string { return serialize($item); } @@ -176,7 +176,7 @@ abstract class QueueManager extends IoManager * @param string * @return mixed */ - protected function decode($frame) + protected function decode(string $frame) { $object = unserialize($frame); diff --git a/plugins/RedisQueue/README b/plugins/RedisQueue/README new file mode 100644 index 0000000000..655b393044 --- /dev/null +++ b/plugins/RedisQueue/README @@ -0,0 +1,18 @@ +RedisQueuePlugin wraps the RedisQueueManager class which is a queue manager +that uses Redis as it's backing storage. + +Installation +============ + +This plugin is replaces other queue manager plugins, such as UnQueue, +which enabled by default and which should, but is not required to be +disabled. + +addPlugin('RedisQueue', ['server' => 'your-redis-instance-and-port']); + +Example +======= + +In config.php + +addPlugin('RedisQueue', ['server' => 'tcp://localhost:6379']); \ No newline at end of file diff --git a/plugins/RedisQueue/RedisQueuePlugin.php b/plugins/RedisQueue/RedisQueuePlugin.php new file mode 100644 index 0000000000..6437a08eb6 --- /dev/null +++ b/plugins/RedisQueue/RedisQueuePlugin.php @@ -0,0 +1,51 @@ +. + +/** + * Redis interface for GNU social queues + * + * @package GNUsocial + * @author Miguel Dantas + * @copyright 2019 Free Software Foundation, Inc http://www.fsf.org + * @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later + */ + +defined('GNUSOCIAL') || die(); + +class RedisQueuePlugin extends Plugin +{ + const PLUGIN_VERSION = '0.0.1'; + + // settings which can be set in config.php with addPlugin('RedisQueue', ['param'=>'value', ...]); + public $server = null; + + public function onStartNewQueueManager(?QueueManager &$qm) + { + $qm = new RedisQueueManager($this->server); + return false; + } + + public function onPluginVersion(array &$versions): bool + { + $versions[] = array('name' => 'RedisQueue', + 'version' => self::PLUGIN_VERSION, + 'author' => 'Miguel Dantas', + 'description' => + // TRANS: Plugin description. + _m('Plugin implementing Redis as a backend for GNU social queues')); + return true; + } +}; diff --git a/plugins/RedisQueue/classes/RedisQueueManager.php b/plugins/RedisQueue/classes/RedisQueueManager.php new file mode 100644 index 0000000000..2eaa7fb408 --- /dev/null +++ b/plugins/RedisQueue/classes/RedisQueueManager.php @@ -0,0 +1,66 @@ +server = $server; + $this->queue = 'gnusocial:' . common_config('site', 'name'); + } + + private function _ensureConn() + { + if ($this->client === null) { + $this->client = new Client($this->server); + } + } + + public function pollInterval() + { + return 10; + } + + public function enqueue($object, $queue) + { + $this->_ensureConn(); + $ret = $this->client->rpush($this->queue, $this->encode([$queue, $object])); + if (empty($ret)) { + common_log(LOG_ERR, "Unable to insert object into Redis queue {$queue}"); + } else { + common_debug("The Redis queue for {$queue} has length {$ret}"); + } + } + + public function poll() + { + common_debug("STARTING POLL"); + try { + $this->_ensureConn(); + $ret = $this->client->lpop($this->queue); + if (!empty($ret)) { + list($queue, $object) = $this->decode($ret); + } else { + return false; + } + } catch (Exception $e) { + $this->_log(LOG_INFO, "[Queue {$queue}] Discarding: " . _ve($e->getMessage())); + return false; + } + + try { + $handler = $this->getHandler($queue); + $handler->handle($object); + common_debug("Redis Queue handled item from {$queue} queue"); + return true; + } catch (Exception $e) { + $this->_log(LOG_ERR, "[Queue: {$queue}] `" . get_class($e) . '` thrown: ' . _ve($e->getMessage())); + return false; + } + } +};