diff --git a/plugins/StompQueue/README b/plugins/StompQueue/README new file mode 100644 index 0000000000..9685272ad3 --- /dev/null +++ b/plugins/StompQueue/README @@ -0,0 +1,42 @@ +StompQueuePlugin wraps the StompQueueManager class which is a queue manager +that uses STOMP as a communication method to some form of backing storage. + +Installation +============ + +This plugin is replaces other queue manager plugins, such as UnQueue, +which enabled by default and which should, but is not required to be +disabled. + +addPlugin('StompQueue', ['servers' => ['your-redis-instance-and-port'], + 'vhost' => 'your-vhost', + 'username' => 'your-username', + 'password' => 'your-password']); + +Options +======= + +servers (default: null) - array of server addresses to use +vhost (default: '') - configured vhost -- required +username (default: 'guest') -- configured username -- don't use the default +password (default: 'guest') -- configured password -- don't use the default +basename (default: "queue:gnusocial-{$site_name}") -- prefix for all queue names, +useful to avoid collisions. Cannot contain `/` +control (default: 'gnusocial:control') -- control channel name. Cannot contain `/` +breakout (default: null) -- array of queue names which should be broken out into a previously unused server +useTransactions (default: false) -- whether to use transactions, allowing rollbacks in case of failure +useAcks (default: false) -- whether to explicitly use acknowledgements when receiving a message. +Usefull to avoid timeouts and possibly reduce load on the STOMP server +manualFailover (default: false) -- whether to coordinate failover in PHP or to let all servers act +as one coordinated unit +defaultIdx (default: 0) -- index in the servers array which is used by default. Will be updated in case of an error +persistent (default: []) -- list of queues which should be persistent + +Example +======= + +In config.php + +addPlugin('StompQueue', ['servers' => 'tcp://localhost:61613', 'vhost' => '/', + // Please don't actually use the default credentials + 'username' => 'guest', 'password' => 'guest']); \ No newline at end of file diff --git a/plugins/StompQueue/StompQueuePlugin.php b/plugins/StompQueue/StompQueuePlugin.php index 483ccb4e9e..4038e223e8 100644 --- a/plugins/StompQueue/StompQueuePlugin.php +++ b/plugins/StompQueue/StompQueuePlugin.php @@ -36,7 +36,7 @@ class StompQueuePlugin extends Plugin public $password = 'guest'; public $basename = ''; public $control = 'gnusocial:control'; - public $breakout; + public $breakout = []; public $useTransactions = false; public $useAcks = false; public $manualFailover = false; @@ -62,7 +62,7 @@ class StompQueuePlugin extends Plugin public function onPluginVersion(array &$versions): bool { $versions[] = array('name' => 'StompQueue', - 'version' => self::VERSION, + 'version' => self::PLUGIN_VERSION, 'author' => 'Miguel Dantas', 'description' => // TRANS: Plugin description. diff --git a/plugins/StompQueue/classes/StompQueueManager.php b/plugins/StompQueue/classes/StompQueueManager.php index ab29345912..300e141ef4 100644 --- a/plugins/StompQueue/classes/StompQueueManager.php +++ b/plugins/StompQueue/classes/StompQueueManager.php @@ -35,7 +35,6 @@ class StompQueueManager extends QueueManager public function start($master) { parent::start($master); - common_debug("Starting STOMP queue manager"); return $this->_ensureConn(); } @@ -172,6 +171,7 @@ class StompQueueManager extends QueueManager */ protected function subscribe(StatefulStomp $st) { + $this->_ensureConn(); $host = $st->getClient()->getConnection()->getHost(); foreach ($this->subscriptions() as $sub) { if (!in_array($sub, $this->subscriptions)) { @@ -293,40 +293,19 @@ class StompQueueManager extends QueueManager } /** - * Send any sockets we're listening on to the IO manager - * to wait for input. + * Poll every 10 seconds for new events during idle periods. + * We'll look in more often when there's data available. + * Must be greater than 0 for the poll method to be called * - * @return array of resources + * @return int seconds */ - public function getSockets() + public function pollInterval() { - $sockets = []; - foreach ($this->stomps as $st) { - if ($st) { - $sockets[] = $st->getClient()->getConnection(); - } - } - return $sockets; + return 10; } /** - * Get the Stomp connection object associated with the given socket. - * @param resource $socket - * @return int index into connections list - * @throws Exception - */ - protected function connectionFromSocket($socket) - { - foreach ($this->stomps as $i => $st) { - if ($st && $st->getConnection() === $socket) { - return $i; - } - } - throw new Exception(__CLASS__ . " asked to read from unrecognized socket"); - } - - /** - * Handle and acknowledge an event that's come in through a queue. + * Poll a queue and Handle an event * * If the queue handler reports failure, the message is requeued for later. * Missing notices or handler classes will drop the message. @@ -334,15 +313,22 @@ class StompQueueManager extends QueueManager * Side effects: in multi-site mode, may reset site configuration to * match the site that queued the event. * - * @param Frame $frame * @return bool success * @throws ConfigException * @throws NoConfigException * @throws ServerException * @throws StompException */ - protected function handleItem($frame): bool + public function poll(): bool { + $this->_ensureConn(); + + $frame = $this->stomps[$this->pl->defaultIdx]->getClient()->readFrame(); + + if (empty($frame)) { + return false; + } + $host = $this->stomps[$this->pl->defaultIdx]->getHost(); $message = unserialize(base64_decode($frame->body)); @@ -396,7 +382,7 @@ class StompQueueManager extends QueueManager $this->stats('requeued', $queue); } - return $ok; + return false; } /** @@ -584,7 +570,7 @@ class StompQueueManager extends QueueManager * @throws NoConfigException * @throws ServerException */ - function switchSite($site) + private function switchSite(string $site): void { if ($site != GNUsocial::currentSite()) { $this->stats('switch'); @@ -601,12 +587,12 @@ class StompQueueManager extends QueueManager * files might not. * * @param $nickname - * @return void true to continue; false to stop further processing. + * @return bool true to continue; false to stop further processing. * @throws ConfigException * @throws NoConfigException * @throws ServerException */ - protected function updateSiteConfig($nickname) + protected function updateSiteConfig($nickname): bool { $sn = Status_network::getKV('nickname', $nickname); if ($sn) {