diff --git a/lib/queue/liberalstomp.php b/lib/queue/liberalstomp.php deleted file mode 100644 index 70c22c17e6..0000000000 --- a/lib/queue/liberalstomp.php +++ /dev/null @@ -1,176 +0,0 @@ - - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -class LiberalStomp extends Stomp -{ - /** - * We need to be able to get the socket so advanced daemons can - * do a select() waiting for input both from the queue and from - * other sources such as an XMPP connection. - * - * @return resource - */ - function getSocket() - { - return $this->_socket; - } - - /** - * Return the host we're currently connected to. - * - * @return string - */ - function getServer() - { - $idx = $this->_currentHost; - if ($idx >= 0) { - $host = $this->_hosts[$idx]; - return "$host[0]:$host[1]"; - } else { - return '[unconnected]'; - } - } - - /** - * Make socket connection to the server - * We also set the stream to non-blocking mode, since we'll be - * select'ing to wait for updates. In blocking mode it seems - * to get confused sometimes. - * - * @throws StompException - */ - protected function _makeConnection () - { - parent::_makeConnection(); - stream_set_blocking($this->_socket, 0); - } - - /** - * Version 1.0.0 of the Stomp library gets confused if messages - * come in too fast over the connection. This version will read - * out as many frames as are ready to be read from the socket. - * - * Modified from Stomp::readFrame() - * - * @return StompFrame False when no frame to read - */ - public function readFrames () - { - if (!$this->hasFrameToRead()) { - return false; - } - - $rb = 1024; - $data = ''; - $end = false; - $frames = array(); - - do { - // @fixme this sometimes hangs in blocking mode... - // shouldn't we have been idle until we found there's more data? - $read = fread($this->_socket, $rb); - if ($read === false || ($read === '' && feof($this->_socket))) { - // @fixme possibly attempt an auto reconnect as old code? - throw new StompException("Error reading"); - //$this->_reconnect(); - // @fixme this will lose prior items - //return $this->readFrames(); - } - $data .= $read; - if (strpos($data, "\x00") !== false) { - // Frames are null-delimited, but some servers - // may append an extra \n according to old bug reports. - $data = str_replace("\x00\n", "\x00", $data); - $chunks = explode("\x00", $data); - - $data = array_pop($chunks); - $frames = array_merge($frames, $chunks); - if ($data == '') { - // We're at the end of a frame; stop reading. - break; - } else { - // In the middle of a frame; keep going. - } - } - // @fixme find out why this len < 2 check was there - //$len = strlen($data); - } while (true);//$len < 2 || $end == false); - - return array_map(array($this, 'parseFrame'), $frames); - } - - /** - * Parse a raw Stomp frame into an object. - * Extracted from Stomp::readFrame() - * - * @param string $data - * @return StompFrame - */ - function parseFrame($data) - { - list ($header, $body) = explode("\n\n", $data, 2); - $header = explode("\n", $header); - $headers = array(); - $command = null; - foreach ($header as $v) { - if (isset($command)) { - list ($name, $value) = explode(':', $v, 2); - $headers[$name] = $value; - } else { - $command = $v; - } - } - $frame = new StompFrame($command, $headers, trim($body)); - if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') { - require_once 'Stomp/Message/Map.php'; - return new StompMessageMap($frame); - } else { - return $frame; - } - return $frame; - } - - /** - * Write frame to server - * - * @param StompFrame $stompFrame - */ - protected function _writeFrame (StompFrame $stompFrame) - { - if (!is_resource($this->_socket)) { - require_once 'Stomp/Exception.php'; - throw new StompException('Socket connection hasn\'t been established'); - } - - $data = $stompFrame->__toString(); - - // Make sure the socket's in a writable state; if not, wait a bit. - stream_set_blocking($this->_socket, 1); - - $r = fwrite($this->_socket, $data, strlen($data)); - stream_set_blocking($this->_socket, 0); - if ($r === false || $r == 0) { - $this->_reconnect(); - $this->_writeFrame($stompFrame); - } - } - } - diff --git a/lib/queue/queuemanager.php b/lib/queue/queuemanager.php index 5158086a0b..e73211a08b 100644 --- a/lib/queue/queuemanager.php +++ b/lib/queue/queuemanager.php @@ -59,25 +59,9 @@ abstract class QueueManager extends IoManager if (empty(self::$qm)) { if (Event::handle('StartNewQueueManager', array(&self::$qm))) { - - $enabled = common_config('queue', 'enabled'); - $type = common_config('queue', 'subsystem'); - - if (!$enabled) { - // does everything immediately - self::$qm = new UnQueueManager(); - } else { - switch ($type) { - case 'db': - self::$qm = new DBQueueManager(); - break; - case 'stomp': - self::$qm = new StompQueueManager(); - break; - default: - throw new ServerException("No queue manager class for type '$type'"); - } - } + common_log(LOG_ERR, 'Some form of queue manager must be active' . + '(UnQueue does everything immediately and is the default)'); + throw new ServerException('Some form of queue manager must be active'); } } @@ -164,7 +148,7 @@ abstract class QueueManager extends IoManager * @param mixed $item * @return string */ - protected function encode($item) + protected function encode($item): string { return serialize($item); } @@ -176,7 +160,7 @@ abstract class QueueManager extends IoManager * @param string * @return mixed */ - protected function decode($frame) + protected function decode(string $frame) { $object = unserialize($frame); diff --git a/lib/queue/stompqueuemanager.php b/lib/queue/stompqueuemanager.php deleted file mode 100644 index b1afe176a1..0000000000 --- a/lib/queue/stompqueuemanager.php +++ /dev/null @@ -1,762 +0,0 @@ -. - * - * @category QueueManager - * @package StatusNet - * @author Evan Prodromou - * @author Sarven Capadisli - * @copyright 2009 StatusNet, Inc. - * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 - * @link http://status.net/ - */ - -require_once 'Stomp.php'; -require_once 'Stomp/Exception.php'; - -class StompQueueManager extends QueueManager -{ - protected $servers; - protected $username; - protected $password; - protected $base; - protected $control; - - protected $useTransactions; - protected $useAcks; - - protected $sites = array(); - protected $subscriptions = array(); - - protected $cons = array(); // all open connections - protected $disconnect = array(); - protected $transaction = array(); - protected $transactionCount = array(); - protected $defaultIdx = 0; - - function __construct() - { - parent::__construct(); - $server = common_config('queue', 'stomp_server'); - if (is_array($server)) { - $this->servers = $server; - } else { - $this->servers = array($server); - } - $this->username = common_config('queue', 'stomp_username'); - $this->password = common_config('queue', 'stomp_password'); - $this->base = common_config('queue', 'queue_basename'); - $this->control = common_config('queue', 'control_channel'); - $this->breakout = common_config('queue', 'breakout'); - $this->useTransactions = common_config('queue', 'stomp_transactions'); - $this->useAcks = common_config('queue', 'stomp_acks'); - } - - /** - * Tell the i/o master we only need a single instance to cover - * all sites running in this process. - */ - public static function multiSite() - { - return IoManager::INSTANCE_PER_PROCESS; - } - - /** - * Optional; ping any running queue handler daemons with a notification - * such as announcing a new site to handle or requesting clean shutdown. - * This avoids having to restart all the daemons manually to update configs - * and such. - * - * Currently only relevant for multi-site queue managers such as Stomp. - * - * @param string $event event key - * @param string $param optional parameter to append to key - * @return boolean success - */ - public function sendControlSignal($event, $param='') - { - $message = $event; - if ($param != '') { - $message .= ':' . $param; - } - $this->_connect(); - $con = $this->cons[$this->defaultIdx]; - $result = $con->send($this->control, - $message, - array ('created' => common_sql_now())); - if ($result) { - $this->_log(LOG_INFO, "Sent control ping to queue daemons: $message"); - return true; - } else { - $this->_log(LOG_ERR, "Failed sending control ping to queue daemons: $message"); - return false; - } - } - - /** - * Saves an object into the queue item table. - * - * @param mixed $object - * @param string $queue - * @param string $siteNickname optional override to drop into another site's queue - * - * @return boolean true on success - * @throws StompException on connection or send error - */ - public function enqueue($object, $queue, $siteNickname=null) - { - $this->_connect(); - if (common_config('queue', 'stomp_enqueue_on')) { - // We're trying to force all writes to a single server. - // WARNING: this might do odd things if that server connection dies. - $idx = array_search(common_config('queue', 'stomp_enqueue_on'), - $this->servers); - if ($idx === false) { - common_log(LOG_ERR, 'queue stomp_enqueue_on setting does not match our server list.'); - $idx = $this->defaultIdx; - } - } else { - $idx = $this->defaultIdx; - } - return $this->_doEnqueue($object, $queue, $idx, $siteNickname); - } - - /** - * Saves a notice object reference into the queue item table - * on the given connection. - * - * @return boolean true on success - * @throws StompException on connection or send error - */ - protected function _doEnqueue($object, $queue, $idx, $siteNickname=null) - { - $rep = $this->logrep($object); - $envelope = array('site' => $siteNickname ? $siteNickname : common_config('site', 'nickname'), - 'handler' => $queue, - 'payload' => $this->encode($object)); - $msg = base64_encode(serialize($envelope)); - - $props = array('created' => common_sql_now()); - if ($this->isPersistent($queue)) { - $props['persistent'] = 'true'; - } - - $con = $this->cons[$idx]; - $host = $con->getServer(); - $target = $this->queueName($queue); - $result = $con->send($target, $msg, $props); - - if (!$result) { - $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host $target"); - return false; - } - - $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host $target"); - $this->stats('enqueued', $queue); - return true; - } - - /** - * Determine whether messages to this queue should be marked as persistent. - * Actual persistent storage depends on the queue server's configuration. - * @param string $queue - * @return bool - */ - protected function isPersistent($queue) - { - $mode = common_config('queue', 'stomp_persistent'); - if (is_array($mode)) { - return in_array($queue, $mode); - } else { - return (bool)$mode; - } - } - - /** - * Send any sockets we're listening on to the IO manager - * to wait for input. - * - * @return array of resources - */ - public function getSockets() - { - $sockets = array(); - foreach ($this->cons as $con) { - if ($con) { - $sockets[] = $con->getSocket(); - } - } - return $sockets; - } - - /** - * Get the Stomp connection object associated with the given socket. - * @param resource $socket - * @return int index into connections list - * @throws Exception - */ - protected function connectionFromSocket($socket) - { - foreach ($this->cons as $i => $con) { - if ($con && $con->getSocket() === $socket) { - return $i; - } - } - throw new Exception(__CLASS__ . " asked to read from unrecognized socket"); - } - - /** - * We've got input to handle on our socket! - * Read any waiting Stomp frame(s) and process them. - * - * @param resource $socket - * @return boolean ok on success - */ - public function handleInput($socket) - { - $idx = $this->connectionFromSocket($socket); - $con = $this->cons[$idx]; - $host = $con->getServer(); - $this->defaultIdx = $idx; - - $ok = true; - try { - $frames = $con->readFrames(); - } catch (StompException $e) { - $this->_log(LOG_ERR, "Lost connection to $host: " . $e->getMessage()); - fclose($socket); // ??? - $this->cons[$idx] = null; - $this->transaction[$idx] = null; - $this->disconnect[$idx] = time(); - return false; - } - foreach ($frames as $frame) { - $dest = $frame->headers['destination']; - if ($dest == $this->control) { - if (!$this->handleControlSignal($frame)) { - // We got a control event that requests a shutdown; - // close out and stop handling anything else! - break; - } - } else { - $ok = $this->handleItem($frame) && $ok; - } - $this->ack($idx, $frame); - $this->commit($idx); - $this->begin($idx); - } - return $ok; - } - - /** - * Attempt to reconnect in background if we lost a connection. - */ - function idle() - { - $now = time(); - foreach ($this->cons as $idx => $con) { - if (empty($con)) { - $age = $now - $this->disconnect[$idx]; - if ($age >= 60) { - $this->_reconnect($idx); - } - } - } - return true; - } - - /** - * Initialize our connection and subscribe to all the queues - * we're going to need to handle... If multiple queue servers - * are configured for failover, we'll listen to all of them. - * - * Side effects: in multi-site mode, may reset site configuration. - * - * @param IoMaster $master process/event controller - * @return bool return false on failure - */ - public function start($master) - { - parent::start($master); - $this->_connectAll(); - - foreach ($this->cons as $i => $con) { - if ($con) { - $this->doSubscribe($con); - $this->begin($i); - } - } - return true; - } - - /** - * Close out any active connections. - * - * @return bool return false on failure - */ - public function finish() - { - // If there are any outstanding delivered messages we haven't processed, - // free them for another thread to take. - foreach ($this->cons as $i => $con) { - if ($con) { - $this->rollback($i); - $con->disconnect(); - $this->cons[$i] = null; - } - } - return true; - } - - /** - * Lazy open a single connection to Stomp queue server. - * If multiple servers are configured, we let the Stomp client library - * worry about finding a working connection among them. - */ - protected function _connect() - { - if (empty($this->cons)) { - $list = $this->servers; - if (count($list) > 1) { - shuffle($list); // Randomize to spread load - $url = 'failover://(' . implode(',', $list) . ')'; - } else { - $url = $list[0]; - } - $con = $this->_doConnect($url); - $this->cons = array($con); - $this->transactionCount = array(0); - $this->transaction = array(null); - $this->disconnect = array(null); - } - } - - /** - * Lazy open connections to all Stomp servers, if in manual failover - * mode. This means the queue servers don't speak to each other, so - * we have to listen to all of them to make sure we get all events. - */ - protected function _connectAll() - { - if (!common_config('queue', 'stomp_manual_failover')) { - return $this->_connect(); - } - if (empty($this->cons)) { - $this->cons = array(); - $this->transactionCount = array(); - $this->transaction = array(); - foreach ($this->servers as $idx => $server) { - try { - $this->cons[] = $this->_doConnect($server); - $this->disconnect[] = null; - } catch (Exception $e) { - // s'okay, we'll live - $this->cons[] = null; - $this->disconnect[] = time(); - } - $this->transactionCount[] = 0; - $this->transaction[] = null; - } - if (empty($this->cons)) { - throw new ServerException("No queue servers reachable..."); - return false; - } - } - } - - /** - * Attempt to manually reconnect to the Stomp server for the given - * slot. If successful, set up our subscriptions on it. - */ - protected function _reconnect($idx) - { - try { - $con = $this->_doConnect($this->servers[$idx]); - } catch (Exception $e) { - $this->_log(LOG_ERR, $e->getMessage()); - $con = null; - } - if ($con) { - $this->cons[$idx] = $con; - $this->disconnect[$idx] = null; - - $this->doSubscribe($con); - $this->begin($idx); - } else { - // Try again later... - $this->disconnect[$idx] = time(); - } - } - - protected function _doConnect($server) - { - $this->_log(LOG_INFO, "Connecting to '$server' as '$this->username'..."); - $con = new LiberalStomp($server); - - if ($con->connect($this->username, $this->password)) { - $this->_log(LOG_INFO, "Connected."); - } else { - $this->_log(LOG_ERR, 'Failed to connect to queue server'); - throw new ServerException('Failed to connect to queue server'); - } - - return $con; - } - - /** - * Set up all our raw queue subscriptions on the given connection - * @param LiberalStomp $con - */ - protected function doSubscribe(LiberalStomp $con) - { - $host = $con->getServer(); - foreach ($this->subscriptions() as $sub) { - $this->_log(LOG_INFO, "Subscribing to $sub on $host"); - $con->subscribe($sub); - } - } - - /** - * Grab a full list of stomp-side queue subscriptions. - * Will include: - * - control broadcast channel - * - shared group queues for active groups - * - per-handler and per-site breakouts from $config['queue']['breakout'] - * that are rooted in the active groups. - * - * @return array of strings - */ - protected function subscriptions() - { - $subs = array(); - $subs[] = $this->control; - - foreach ($this->activeGroups as $group) { - $subs[] = $this->base . $group; - } - - foreach ($this->breakout as $spec) { - $parts = explode('/', $spec); - if (count($parts) < 2 || count($parts) > 3) { - common_log(LOG_ERR, "Bad queue breakout specifier $spec"); - } - if (in_array($parts[0], $this->activeGroups)) { - $subs[] = $this->base . $spec; - } - } - return array_unique($subs); - } - - /** - * Handle and acknowledge an event that's come in through a queue. - * - * If the queue handler reports failure, the message is requeued for later. - * Missing notices or handler classes will drop the message. - * - * Side effects: in multi-site mode, may reset site configuration to - * match the site that queued the event. - * - * @param StompFrame $frame - * @return bool success - */ - protected function handleItem($frame) - { - $host = $this->cons[$this->defaultIdx]->getServer(); - $message = unserialize(base64_decode($frame->body)); - - if ($message === false) { - $this->_log(LOG_ERR, "Can't unserialize frame: {$frame->body}"); - $this->_log(LOG_ERR, "Unserializable frame length: " . strlen($frame->body)); - return false; - } - - $site = $message['site']; - $queue = $message['handler']; - - if ($this->isDeadLetter($frame, $message)) { - $this->stats('deadletter', $queue); - return false; - } - - // @fixme detect failing site switches - $this->switchSite($site); - - try { - $item = $this->decode($message['payload']); - } catch (Exception $e) { - $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host"); - $this->stats('baditem', $queue); - return false; - } - $info = $this->logrep($item) . " posted at " . - $frame->headers['created'] . " in queue $queue from $host"; - $this->_log(LOG_DEBUG, "Dequeued $info"); - - try { - $handler = $this->getHandler($queue); - $ok = $handler->handle($item); - } catch (NoQueueHandlerException $e) { - $this->_log(LOG_ERR, "Missing handler class; skipping $info"); - $this->stats('badhandler', $queue); - return false; - } catch (Exception $e) { - $this->_log(LOG_ERR, "Exception on queue $queue: " . $e->getMessage()); - $ok = false; - } - - if ($ok) { - $this->_log(LOG_INFO, "Successfully handled $info"); - $this->stats('handled', $queue); - } else { - $this->_log(LOG_WARNING, "Failed handling $info"); - // Requeing moves the item to the end of the line for its next try. - // @fixme add a manual retry count - $this->enqueue($item, $queue); - $this->stats('requeued', $queue); - } - - return $ok; - } - - /** - * Check if a redelivered message has been run through enough - * that we're going to give up on it. - * - * @param StompFrame $frame - * @param array $message unserialized message body - * @return boolean true if we should discard - */ - protected function isDeadLetter($frame, $message) - { - if (isset($frame->headers['redelivered']) && $frame->headers['redelivered'] == 'true') { - // Message was redelivered, possibly indicating a previous failure. - $msgId = $frame->headers['message-id']; - $site = $message['site']; - $queue = $message['handler']; - $msgInfo = "message $msgId for $site in queue $queue"; - - $deliveries = $this->incDeliveryCount($msgId); - if ($deliveries > common_config('queue', 'max_retries')) { - $info = "DEAD-LETTER FILE: Gave up after retry $deliveries on $msgInfo"; - - $outdir = common_config('queue', 'dead_letter_dir'); - if ($outdir) { - $filename = $outdir . "/$site-$queue-" . rawurlencode($msgId); - $info .= ": dumping to $filename"; - file_put_contents($filename, $message['payload']); - } - - common_log(LOG_ERR, $info); - return true; - } else { - common_log(LOG_INFO, "retry $deliveries on $msgInfo"); - } - } - return false; - } - - /** - * Update count of times we've re-encountered this message recently, - * triggered when we get a message marked as 'redelivered'. - * - * Requires a CLI-friendly cache configuration. - * - * @param string $msgId message-id header from message - * @return int number of retries recorded - */ - function incDeliveryCount($msgId) - { - $count = 0; - $cache = Cache::instance(); - if ($cache) { - $key = 'statusnet:stomp:message-retries:' . $msgId; - $count = $cache->increment($key); - if (!$count) { - $count = 1; - $cache->set($key, $count, null, 3600); - $got = $cache->get($key); - } - } - return $count; - } - - /** - * Process a control signal broadcast. - * - * @param int $idx connection index - * @param array $frame Stomp frame - * @return bool true to continue; false to stop further processing. - */ - protected function handleControlSignal($idx, $frame) - { - $message = trim($frame->body); - if (strpos($message, ':') !== false) { - list($event, $param) = explode(':', $message, 2); - } else { - $event = $message; - $param = ''; - } - - $shutdown = false; - - if ($event == 'shutdown') { - $this->master->requestShutdown(); - $shutdown = true; - } else if ($event == 'restart') { - $this->master->requestRestart(); - $shutdown = true; - } else if ($event == 'update') { - $this->updateSiteConfig($param); - } else { - $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message"); - } - return $shutdown; - } - - /** - * Switch site, if necessary, and reset current handler assignments - * @param string $site - */ - function switchSite($site) - { - if ($site != GNUsocial::currentSite()) { - $this->stats('switch'); - GNUsocial::switchSite($site); - $this->initialize(); - } - } - - /** - * (Re)load runtime configuration for a given site by nickname, - * triggered by a broadcast to the 'statusnet-control' topic. - * - * Configuration changes in database should update, but config - * files might not. - * - * @param array $frame Stomp frame - * @return bool true to continue; false to stop further processing. - */ - protected function updateSiteConfig($nickname) - { - $sn = Status_network::getKV('nickname', $nickname); - if ($sn) { - $this->switchSite($nickname); - if (!in_array($nickname, $this->sites)) { - $this->addSite(); - } - $this->stats('siteupdate'); - } else { - $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname"); - } - } - - /** - * Combines the queue_basename from configuration with the - * group name for this queue to give eg: - * - * /queue/statusnet/main - * /queue/statusnet/main/distrib - * /queue/statusnet/xmpp/xmppout/site01 - * - * @param string $queue - * @return string - */ - protected function queueName($queue) - { - $group = $this->queueGroup($queue); - $site = GNUsocial::currentSite(); - - $specs = array("$group/$queue/$site", - "$group/$queue"); - foreach ($specs as $spec) { - if (in_array($spec, $this->breakout)) { - return $this->base . $spec; - } - } - return $this->base . $group; - } - - /** - * Get the breakout mode for the given queue on the current site. - * - * @param string $queue - * @return string one of 'shared', 'handler', 'site' - */ - protected function breakoutMode($queue) - { - $breakout = common_config('queue', 'breakout'); - if (isset($breakout[$queue])) { - return $breakout[$queue]; - } else if (isset($breakout['*'])) { - return $breakout['*']; - } else { - return 'shared'; - } - } - - protected function begin($idx) - { - if ($this->useTransactions) { - if (!empty($this->transaction[$idx])) { - throw new Exception("Tried to start transaction in the middle of a transaction"); - } - $this->transactionCount[$idx]++; - $this->transaction[$idx] = $this->master->id . '-' . $this->transactionCount[$idx] . '-' . time(); - $this->cons[$idx]->begin($this->transaction[$idx]); - } - } - - protected function ack($idx, $frame) - { - if ($this->useAcks) { - if ($this->useTransactions) { - if (empty($this->transaction[$idx])) { - throw new Exception("Tried to ack but not in a transaction"); - } - $this->cons[$idx]->ack($frame, $this->transaction[$idx]); - } else { - $this->cons[$idx]->ack($frame); - } - } - } - - protected function commit($idx) - { - if ($this->useTransactions) { - if (empty($this->transaction[$idx])) { - throw new Exception("Tried to commit but not in a transaction"); - } - $this->cons[$idx]->commit($this->transaction[$idx]); - $this->transaction[$idx] = null; - } - } - - protected function rollback($idx) - { - if ($this->useTransactions) { - if (empty($this->transaction[$idx])) { - throw new Exception("Tried to rollback but not in a transaction"); - } - $this->cons[$idx]->commit($this->transaction[$idx]); - $this->transaction[$idx] = null; - } - } -} - diff --git a/lib/util/default.php b/lib/util/default.php index 1de401d090..dfefaf2958 100644 --- a/lib/util/default.php +++ b/lib/util/default.php @@ -361,6 +361,7 @@ $default = 'Poll' => [], 'SimpleCaptcha' => [], 'TagSub' => [], + 'UnQueue' => [], 'WebFinger' => [], ], 'locale_path' => false, // Set to a path to use *instead of* each plugin's own locale subdirectories diff --git a/plugins/DBQueue/DBQueuePlugin.php b/plugins/DBQueue/DBQueuePlugin.php new file mode 100644 index 0000000000..59779dba9f --- /dev/null +++ b/plugins/DBQueue/DBQueuePlugin.php @@ -0,0 +1,49 @@ +. + +/** + * DB interface for GNU social queues + * + * @package GNUsocial + * @author Miguel Dantas + * @copyright 2019 Free Software Foundation, Inc http://www.fsf.org + * @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later + */ + +defined('GNUSOCIAL') || die(); + +class DBQueuePlugin extends Plugin +{ + const PLUGIN_VERSION = '0.0.1'; + + public function onStartNewQueueManager(?QueueManager &$qm) + { + common_debug("Starting DB queue manager."); + $qm = new DBQueueManager(); + return false; + } + + public function onPluginVersion(array &$versions): bool + { + $versions[] = array('name' => 'DBQueue', + 'version' => self::PLUGIN_VERSION, + 'author' => 'Miguel Dantas', + 'description' => + // TRANS: Plugin description. + _m('Plugin using the database as a backend for GNU social queues')); + return true; + } +}; diff --git a/plugins/DBQueue/README b/plugins/DBQueue/README new file mode 100644 index 0000000000..10913a8907 --- /dev/null +++ b/plugins/DBQueue/README @@ -0,0 +1,18 @@ +DBQueuePlugin wraps the DBQueueManager class which is a queue manager +that uses the database as it's backing storage. + +Installation +============ + +This plugin is replaces other queue manager plugins, such as UnQueue, +which enabled by default and which should, but is not required to be +disabled. + +addPlugin('DBQueue'); + +Example +======= + +In config.php + +addPlugin('DBQueue'); \ No newline at end of file diff --git a/lib/queue/dbqueuemanager.php b/plugins/DBQueue/classes/DBQueueManager.php similarity index 91% rename from lib/queue/dbqueuemanager.php rename to plugins/DBQueue/classes/DBQueueManager.php index 0fc7305c0a..bf8e7fe5d4 100644 --- a/lib/queue/dbqueuemanager.php +++ b/plugins/DBQueue/classes/DBQueueManager.php @@ -50,13 +50,13 @@ class DBQueueManager extends QueueManager } $this->stats('enqueued', $queue); - return true; } /** * Poll every 10 seconds for new events during idle periods. * We'll look in more often when there's data available. + * Must be greater than 0 for the poll method to be called * * @return int seconds */ @@ -69,9 +69,9 @@ class DBQueueManager extends QueueManager * Run a polling cycle during idle processing in the input loop. * @return boolean true if we should poll again for more data immediately */ - public function poll() + public function poll(): bool { - //$this->_log(LOG_DEBUG, 'Checking for notices...'); + $this->_log(LOG_DEBUG, 'Checking for notices...'); $qi = Queue_item::top($this->activeQueues(), $this->getIgnoredTransports()); if (!$qi instanceof Queue_item) { //$this->_log(LOG_DEBUG, 'No notices waiting; idling.'); @@ -88,7 +88,7 @@ class DBQueueManager extends QueueManager $rep = $this->logrep($item); $this->_log(LOG_DEBUG, 'Got '._ve($rep).' for transport '._ve($qi->transport)); - + try { $handler = $this->getHandler($qi->transport); $result = $handler->handle($item); @@ -96,13 +96,16 @@ class DBQueueManager extends QueueManager $this->noHandlerFound($qi, $rep); return true; } catch (NoResultException $e) { - $this->_log(LOG_ERR, "[{$qi->transport}:$rep] ".get_class($e).' thrown ('._ve($e->getMessage()).'), ignoring queue_item '._ve($qi->getID())); + $this->_log(LOG_ERR, "[{$qi->transport}:$rep] ".get_class($e).' thrown ('. + _ve($e->getMessage()).'), ignoring queue_item '._ve($qi->getID())); $result = true; } catch (AlreadyFulfilledException $e) { - $this->_log(LOG_ERR, "[{$qi->transport}:$rep] ".get_class($e).' thrown ('._ve($e->getMessage()).'), ignoring queue_item '._ve($qi->getID())); + $this->_log(LOG_ERR, "[{$qi->transport}:$rep] ".get_class($e).' thrown ('. + _ve($e->getMessage()).'), ignoring queue_item '._ve($qi->getID())); $result = true; } catch (Exception $e) { - $this->_log(LOG_ERR, "[{$qi->transport}:$rep] Exception (".get_class($e).') thrown: '._ve($e->getMessage())); + $this->_log(LOG_ERR, "[{$qi->transport}:$rep] Exception (". + get_class($e).') thrown: '._ve($e->getMessage())); $result = false; } diff --git a/plugins/OpportunisticQM/OpportunisticQMPlugin.php b/plugins/OpportunisticQM/OpportunisticQMPlugin.php index 8f42849d9a..4a67a37602 100644 --- a/plugins/OpportunisticQM/OpportunisticQMPlugin.php +++ b/plugins/OpportunisticQM/OpportunisticQMPlugin.php @@ -1,17 +1,16 @@ connect('main/runqueue', - ['action' => 'runqueue']); + $m->connect('main/runqueue', ['action' => 'runqueue']); } /** @@ -26,23 +25,21 @@ class OpportunisticQMPlugin extends Plugin { global $_startTime; - $args = array( - 'qmkey' => common_config('opportunisticqm', 'qmkey'), - 'max_execution_time' => $this->secs_per_action, - 'started_at' => $this->rel_to_pageload ? $_startTime : null, - 'verbosity' => $this->verbosity, - ); - $qm = new OpportunisticQueueManager($args); + $args = ['qmkey' => common_config('opportunisticqm', 'qmkey'), + 'max_execution_time' => $this->secs_per_action, + 'started_at' => $this->rel_to_pageload ? $_startTime : null, + 'verbosity' => $this->verbosity]; + $qm = new OpportunisticQueueManager($args); $qm->runQueue(); return true; } public function onPluginVersion(array &$versions): bool { - $versions[] = array('name' => 'OpportunisticQM', - 'version' => self::PLUGIN_VERSION, - 'author' => 'Mikael Nordfeldth', - 'homepage' => 'http://www.gnu.org/software/social/', + $versions[] = array('name' => 'OpportunisticQM', + 'version' => self::PLUGIN_VERSION, + 'author' => 'Mikael Nordfeldth', + 'homepage' => 'http://www.gnu.org/software/social/', 'description' => // TRANS: Plugin description. _m('Opportunistic queue manager plugin for background processing.')); diff --git a/plugins/OpportunisticQM/actions/runqueue.php b/plugins/OpportunisticQM/actions/runqueue.php index e6ce5b84c0..9b028c723e 100644 --- a/plugins/OpportunisticQM/actions/runqueue.php +++ b/plugins/OpportunisticQM/actions/runqueue.php @@ -23,16 +23,14 @@ class RunqueueAction extends Action { protected $qm = null; - protected function prepare(array $args=array()) + protected function prepare(array $args = []) { parent::prepare($args); - $args = array(); + $args = []; - foreach (array('qmkey') as $key) { - if ($this->arg($key) !== null) { - $args[$key] = $this->arg($key); - } + if ($this->arg('qmkey') !== null) { + $args['qmkey'] = $this->arg('qmkey'); } try { diff --git a/plugins/OpportunisticQM/lib/opportunisticqueuemanager.php b/plugins/OpportunisticQM/lib/opportunisticqueuemanager.php index c0d9125f10..520cc52af2 100644 --- a/plugins/OpportunisticQM/lib/opportunisticqueuemanager.php +++ b/plugins/OpportunisticQM/lib/opportunisticqueuemanager.php @@ -14,7 +14,7 @@ * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0 * @link http://status.net/ */ -class OpportunisticQueueManager extends DBQueueManager +class OpportunisticQueueManager extends QueueManager { protected $qmkey = false; protected $max_execution_time = null; @@ -26,9 +26,10 @@ class OpportunisticQueueManager extends DBQueueManager protected $verbosity = null; - const MAXEXECTIME = 20; // typically just used for the /main/cron action, only used if php.ini max_execution_time is 0 + // typically just used for the /main/cron action, only used if php.ini max_execution_time is 0 + const MAXEXECTIME = 20; - public function __construct(array $args=array()) { + public function __construct(array $args = []) { foreach (get_class_vars(get_class($this)) as $key=>$val) { if (array_key_exists($key, $args)) { $this->$key = $args[$key]; @@ -45,12 +46,14 @@ class OpportunisticQueueManager extends DBQueueManager } if ($this->max_execution_margin === null) { - $this->max_execution_margin = common_config('http', 'connect_timeout') + 1; // think PHP's max exec time, minus this value to have time for timeouts etc. + // think PHP's max exec time, minus this value to have time for timeouts etc. + $this->max_execution_margin = common_config('http', 'connect_timeout') + 1; } return parent::__construct(); } + protected function verifyKey() { if ($this->qmkey !== common_config('opportunisticqm', 'qmkey')) { @@ -58,6 +61,12 @@ class OpportunisticQueueManager extends DBQueueManager } } + public function enqueue($object, $key) + { + // Nothing to do, should never get called + throw new ServerException('OpportunisticQueueManager::enqueue should never be called'); + } + public function canContinue() { $time_passed = time() - $this->started_at; @@ -67,7 +76,8 @@ class OpportunisticQueueManager extends DBQueueManager return false; } // If too much time has passed, stop - if ($time_passed >= $this->max_execution_time || $time_passed > ini_get('max_execution_time') - $this->max_execution_margin) { + if ($time_passed >= $this->max_execution_time || + $time_passed > ini_get('max_execution_time') - $this->max_execution_margin) { return false; } // If we have a max-item-limit, check if it has been passed @@ -80,25 +90,15 @@ class OpportunisticQueueManager extends DBQueueManager public function poll() { - $this->handled_items++; - if (!parent::poll()) { - throw new RunQueueOutOfWorkException(); + $qm = $this->get(); + if ($qm->pollInterval() <= 0 && ! $qm instanceof UnQueueManager) { + // Special case for UnQueueManager as it is a default plugin + // and does not require queues, since it does everything immediately + throw new ServerException('OpportunisticQM cannot work together' . + 'with queues that do not implement polling'); } - return true; - } - - // OpportunisticQM shouldn't discard items it can't handle, we're - // only here to take care of what we _can_ handle! - protected function noHandlerFound(Queue_item $qi, $rep=null) { - $this->_log(LOG_WARNING, "[{$qi->transport}:item {$qi->id}] Releasing claim for queue item without a handler"); - $this->_fail($qi, true); // true here means "releaseOnly", so no error statistics since it's not an _error_ - } - - protected function _fail(Queue_item $qi, $releaseOnly=false) - { - parent::_fail($qi, $releaseOnly); - $this->_log(LOG_DEBUG, "[{$qi->transport}:item {$qi->id}] Ignoring this transport for the rest of this execution"); - $this->ignoreTransport($qi->transport); + ++$this->handled_items; + return $qm->poll(); } /** @@ -110,17 +110,22 @@ class OpportunisticQueueManager extends DBQueueManager public function runQueue() { while ($this->canContinue()) { - try { - $this->poll(); - } catch (RunQueueOutOfWorkException $e) { + if (!$this->poll()) { + // Out of work return true; } } + if ($this->handled_items > 0) { - common_debug('Opportunistic queue manager passed execution time/item handling limit without being out of work.'); + common_debug('Opportunistic queue manager passed execution time/item ' . + 'handling limit without being out of work.'); + return true; } elseif ($this->verbosity > 1) { - common_debug('Opportunistic queue manager did not have time to start on this action (max: '.$this->max_execution_time.' exceeded: '.abs(time()-$this->started_at).').'); + common_debug('Opportunistic queue manager did not have time to start ' . + 'on this action (max: ' . $this->max_execution_time . + ' exceeded: ' . abs(time() - $this->started_at) . ').'); } + return false; } } diff --git a/plugins/RedisQueue/README b/plugins/RedisQueue/README new file mode 100644 index 0000000000..655b393044 --- /dev/null +++ b/plugins/RedisQueue/README @@ -0,0 +1,18 @@ +RedisQueuePlugin wraps the RedisQueueManager class which is a queue manager +that uses Redis as it's backing storage. + +Installation +============ + +This plugin is replaces other queue manager plugins, such as UnQueue, +which enabled by default and which should, but is not required to be +disabled. + +addPlugin('RedisQueue', ['server' => 'your-redis-instance-and-port']); + +Example +======= + +In config.php + +addPlugin('RedisQueue', ['server' => 'tcp://localhost:6379']); \ No newline at end of file diff --git a/plugins/RedisQueue/RedisQueuePlugin.php b/plugins/RedisQueue/RedisQueuePlugin.php new file mode 100644 index 0000000000..6437a08eb6 --- /dev/null +++ b/plugins/RedisQueue/RedisQueuePlugin.php @@ -0,0 +1,51 @@ +. + +/** + * Redis interface for GNU social queues + * + * @package GNUsocial + * @author Miguel Dantas + * @copyright 2019 Free Software Foundation, Inc http://www.fsf.org + * @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later + */ + +defined('GNUSOCIAL') || die(); + +class RedisQueuePlugin extends Plugin +{ + const PLUGIN_VERSION = '0.0.1'; + + // settings which can be set in config.php with addPlugin('RedisQueue', ['param'=>'value', ...]); + public $server = null; + + public function onStartNewQueueManager(?QueueManager &$qm) + { + $qm = new RedisQueueManager($this->server); + return false; + } + + public function onPluginVersion(array &$versions): bool + { + $versions[] = array('name' => 'RedisQueue', + 'version' => self::PLUGIN_VERSION, + 'author' => 'Miguel Dantas', + 'description' => + // TRANS: Plugin description. + _m('Plugin implementing Redis as a backend for GNU social queues')); + return true; + } +}; diff --git a/plugins/RedisQueue/classes/RedisQueueManager.php b/plugins/RedisQueue/classes/RedisQueueManager.php new file mode 100644 index 0000000000..b4adca7e2f --- /dev/null +++ b/plugins/RedisQueue/classes/RedisQueueManager.php @@ -0,0 +1,66 @@ +server = $server; + $this->queue = 'gnusocial:' . common_config('site', 'name'); + } + + private function _ensureConn() + { + if ($this->client === null) { + $this->client = new Client($this->server); + } + } + + public function pollInterval() + { + return 10; + } + + public function enqueue($object, $queue) + { + $this->_ensureConn(); + $ret = $this->client->rpush($this->queue, $this->encode([$queue, $object])); + if (empty($ret)) { + common_log(LOG_ERR, "Unable to insert object into Redis queue {$queue}"); + } else { + common_debug("The Redis queue for {$queue} has length {$ret}"); + } + } + + public function poll() + { + try { + $this->_ensureConn(); + $ret = $this->client->lpop($this->queue); + if (!empty($ret)) { + list($queue, $object) = $this->decode($ret); + } else { + return false; + } + } catch (Exception $e) { + $this->_log(LOG_INFO, "[Queue {$queue}] Discarding: " . _ve($e->getMessage())); + return false; + } + + try { + $handler = $this->getHandler($queue); + $handler->handle($object); + common_debug("Redis Queue handled item from {$queue} queue"); + return true; + } catch (Exception $e) { + $this->_log(LOG_ERR, "[Queue: {$queue}] `" . get_class($e) . '` thrown: ' . _ve($e->getMessage())); + return false; + } + } +}; diff --git a/plugins/StompQueue/README b/plugins/StompQueue/README new file mode 100644 index 0000000000..9685272ad3 --- /dev/null +++ b/plugins/StompQueue/README @@ -0,0 +1,42 @@ +StompQueuePlugin wraps the StompQueueManager class which is a queue manager +that uses STOMP as a communication method to some form of backing storage. + +Installation +============ + +This plugin is replaces other queue manager plugins, such as UnQueue, +which enabled by default and which should, but is not required to be +disabled. + +addPlugin('StompQueue', ['servers' => ['your-redis-instance-and-port'], + 'vhost' => 'your-vhost', + 'username' => 'your-username', + 'password' => 'your-password']); + +Options +======= + +servers (default: null) - array of server addresses to use +vhost (default: '') - configured vhost -- required +username (default: 'guest') -- configured username -- don't use the default +password (default: 'guest') -- configured password -- don't use the default +basename (default: "queue:gnusocial-{$site_name}") -- prefix for all queue names, +useful to avoid collisions. Cannot contain `/` +control (default: 'gnusocial:control') -- control channel name. Cannot contain `/` +breakout (default: null) -- array of queue names which should be broken out into a previously unused server +useTransactions (default: false) -- whether to use transactions, allowing rollbacks in case of failure +useAcks (default: false) -- whether to explicitly use acknowledgements when receiving a message. +Usefull to avoid timeouts and possibly reduce load on the STOMP server +manualFailover (default: false) -- whether to coordinate failover in PHP or to let all servers act +as one coordinated unit +defaultIdx (default: 0) -- index in the servers array which is used by default. Will be updated in case of an error +persistent (default: []) -- list of queues which should be persistent + +Example +======= + +In config.php + +addPlugin('StompQueue', ['servers' => 'tcp://localhost:61613', 'vhost' => '/', + // Please don't actually use the default credentials + 'username' => 'guest', 'password' => 'guest']); \ No newline at end of file diff --git a/plugins/StompQueue/StompQueuePlugin.php b/plugins/StompQueue/StompQueuePlugin.php new file mode 100644 index 0000000000..4038e223e8 --- /dev/null +++ b/plugins/StompQueue/StompQueuePlugin.php @@ -0,0 +1,72 @@ +. + +/** + * STOMP interface for GNU social queues + * + * @package GNUsocial + * @author Miguel Dantas + * @copyright 2019 Free Software Foundation, Inc http://www.fsf.org + * @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later + */ + +defined('GNUSOCIAL') || die(); + +class StompQueuePlugin extends Plugin +{ + const PLUGIN_VERSION = '0.0.1'; + + // settings which can be set in config.php with addPlugin('StompQueue', ['param'=>'value', ...]); + public $servers = null; + public $vhost = ''; + public $username = 'guest'; + public $password = 'guest'; + public $basename = ''; + public $control = 'gnusocial:control'; + public $breakout = []; + public $useTransactions = false; + public $useAcks = false; + public $manualFailover = false; + public $defaultIdx = 0; + public $persistent = []; + + public function onStartNewQueueManager(?QueueManager &$qm) + { + if (empty($this->servers)) { + throw new ServerException('Invalid STOMP server address'); + } elseif (!is_array($this->servers)) { + $this->servers = [$this->servers]; + } + + if (empty($this->basename)) { + $this->basename = 'queue:gnusocial-' . common_config('site', 'name') . ':'; + } + + $qm = new StompQueueManager($this); + return false; + } + + public function onPluginVersion(array &$versions): bool + { + $versions[] = array('name' => 'StompQueue', + 'version' => self::PLUGIN_VERSION, + 'author' => 'Miguel Dantas', + 'description' => + // TRANS: Plugin description. + _m('Plugin implementing STOMP as a backend for GNU social queues')); + return true; + } +}; diff --git a/plugins/StompQueue/classes/StompQueueManager.php b/plugins/StompQueue/classes/StompQueueManager.php new file mode 100644 index 0000000000..300e141ef4 --- /dev/null +++ b/plugins/StompQueue/classes/StompQueueManager.php @@ -0,0 +1,608 @@ +pl = $pl; + } + + /** + * Initialize our connection and subscribe to all the queues + * we're going to need to handle... If multiple queue servers + * are configured for failover, we'll listen to all of them. + * + * Side effects: in multi-site mode, may reset site configuration. + * + * @param IoMaster $master process/event controller + * @return bool return false on failure + * @throws ServerException + */ + public function start($master) + { + parent::start($master); + return $this->_ensureConn(); + } + + /** + * Close out any active connections. + * + * @return bool return false on failure + * @throws Exception + */ + public function finish() + { + foreach ($this->stomps as $idx => $stomp) { + common_log(LOG_INFO, "Unsubscribing on: " . $stomp->getClient()->getConnection()->getHost()); + $stomp->unsubscribe(); + } + // If there are any outstanding delivered messages we haven't processed, + // free them for another thread to take. + foreach ($this->stomps as $i => $st) { + if ($st) { + $this->rollback($i); + $st->getClient()->disconnect(); + $this->stomps[$i] = null; + } + } + return true; + } + + + /** + * Lazy open connections to all STOMP servers, if in manual failover + * mode. This means the queue servers don't speak to each other, so + * we have to listen to all of them to make sure we get all events. + */ + private function _ensureConn() + { + if ($this->stomps === null) { + if (!$this->pl->manualFailover) { + $list = $this->pl->servers; + if (count($list) > 1) { + shuffle($list); // Randomize to spread load + $url = 'failover://(' . implode(',', $list) . ')'; + } else { + $url = $list[0]; + } + $st = $this->_doConnect($url); + $this->stomps = [$st]; + $this->transactionCount = [0]; + $this->transaction = [null]; + $this->disconnect = [null]; + } else { + $this->stomps = []; + $this->transactionCount = []; + $this->transaction = []; + foreach ($this->pl->servers as $_ => $url) { + try { + $this->stomps[] = $this->_doConnect($url); + $this->disconnect[] = null; // If the previous succeeded + } catch (Exception $e) { + // s'okay, we'll live + $this->stomps[] = null; + $this->disconnect[] = time(); + } + $this->transactionCount[] = 0; + $this->transaction[] = null; + } + } + // Done attempting connections + if (empty($this->stomps)) { + throw new ServerException("No STOMP queue servers reachable"); + } else { + foreach ($this->stomps as $i => $st) { + if ($st) { + $this->subscribe($st); + $this->begin($i); + } + } + } + } + return true; + } + + protected function _doConnect($server_url) + { + common_debug("STOMP: connecting to '{$server_url}' as '{$this->pl->username}'..."); + $cl = new Client($server_url); + $cl->setLogin($this->pl->username, $this->pl->password); + $cl->setVhostname($this->pl->vhost); + + try { + $cl->connect(); + common_debug("STOMP connected."); + } catch (StompException $e) { + common_log(LOG_ERR, 'Failed to connect to STOMP queue server'); + throw new ServerException('Failed to connect to STOMP queue server'); + } + + return new StatefulStomp($cl); + } + + /** + * Grab a full list of stomp-side queue subscriptions. + * Will include: + * - control broadcast channel + * - shared group queues for active groups + * - per-handler and per-site breakouts that are rooted in the active groups. + * + * @return array of strings + */ + protected function subscriptions(): array + { + $subs = []; + $subs[] = $this->pl->control; + + foreach ($this->activeGroups as $group) { + $subs[] = $this->pl->basename . $group; + } + + foreach ($this->pl->breakout as $spec) { + $parts = explode(':', $spec); + if (count($parts) < 2 || count($parts) > 3) { + common_log(LOG_ERR, "Bad queue breakout specifier '{$spec}'"); + } + if (in_array($parts[0], $this->activeGroups)) { + $subs[] = $this->pl->basename . $spec; + } + } + + return array_unique($subs); + } + + /** + * Set up all our raw queue subscriptions on the given connection + * @param Client $st + */ + protected function subscribe(StatefulStomp $st) + { + $this->_ensureConn(); + $host = $st->getClient()->getConnection()->getHost(); + foreach ($this->subscriptions() as $sub) { + if (!in_array($sub, $this->subscriptions)) { + $this->_log(LOG_INFO, "Subscribing to '{$sub}' on '{$host}'"); + try { + $st->subscribe($sub); + } catch (Exception $e) { + common_log(LOG_ERR, "STOMP received exception: " . get_class($e) . + " while trying to subscribe: " . $e->getMessage()); + throw $e; + } + + $this->subscriptions[] = $sub; + } + } + } + + protected function begin($idx) + { + if ($this->pl->useTransactions) { + if (!empty($this->transaction[$idx])) { + throw new Exception("Tried to start transaction in the middle of a transaction"); + } + $this->transactionCount[$idx]++; + $this->transaction[$idx] = $this->master->id . '-' . $this->transactionCount[$idx] . '-' . time(); + $this->stomps[$idx]->begin($this->transaction[$idx]); + } + } + + protected function ack($idx, $frame) + { + if ($this->pl->useAcks) { + if ($this->pl->useTransactions) { + if (empty($this->transaction[$idx])) { + throw new Exception("Tried to ack but not in a transaction"); + } + $this->stomps[$idx]->ack($frame, $this->transaction[$idx]); + } else { + $this->stomps[$idx]->ack($frame); + } + } + } + + protected function commit($idx) + { + if ($this->useTransactions) { + if (empty($this->transaction[$idx])) { + throw new Exception("Tried to commit but not in a transaction"); + } + $this->stomps[$idx]->commit($this->transaction[$idx]); + $this->transaction[$idx] = null; + } + } + + protected function rollback($idx) + { + if ($this->useTransactions) { + if (empty($this->transaction[$idx])) { + throw new Exception("Tried to rollback but not in a transaction"); + } + $this->stomps[$idx]->abort($this->transaction[$idx]); + $this->transaction[$idx] = null; + } + } + + /** + * Saves an object into the queue item table. + * + * @param mixed $object + * @param string $queue + * @param string $siteNickname optional override to drop into another site's queue + * @throws Exception + */ + public function enqueue($object, $queue, $siteNickname = null) + { + $this->_ensureConn(); + $idx = $this->pl->defaultIdx; + $rep = $this->logrep($object); + $envelope = ['site' => $siteNickname ?: common_config('site', 'nickname'), + 'handler' => $queue, + 'payload' => $this->encode($object)]; + $msg = base64_encode(serialize($envelope)); + + $props = ['created' => common_sql_now()]; + if ($this->isPersistent($queue)) { + $props['persistent'] = 'true'; + } + + $st = $this->stomps[$idx]; + $host = $st->getClient()->getConnection()->getHost(); + $target = $this->queueName($queue); + + $result = $st->send($target, new Message($msg), $props); + + if (!$result) { + common_log(LOG_ERR, "STOMP error sending $rep to $queue queue on $host $target"); + return false; + } + + common_debug("STOMP complete remote queueing $rep for queue `$queue` on host `$host` on channel `$target`"); + $this->stats('enqueued', $queue); + return true; + } + + /** + * Determine whether messages to this queue should be marked as persistent. + * Actual persistent storage depends on the queue server's configuration. + * @param string $queue + * @return bool + */ + protected function isPersistent($queue) + { + $mode = $this->pl->persistent; + if (is_array($mode)) { + return in_array($queue, $mode); + } else { + return (bool)$mode; + } + } + + /** + * Poll every 10 seconds for new events during idle periods. + * We'll look in more often when there's data available. + * Must be greater than 0 for the poll method to be called + * + * @return int seconds + */ + public function pollInterval() + { + return 10; + } + + /** + * Poll a queue and Handle an event + * + * If the queue handler reports failure, the message is requeued for later. + * Missing notices or handler classes will drop the message. + * + * Side effects: in multi-site mode, may reset site configuration to + * match the site that queued the event. + * + * @return bool success + * @throws ConfigException + * @throws NoConfigException + * @throws ServerException + * @throws StompException + */ + public function poll(): bool + { + $this->_ensureConn(); + + $frame = $this->stomps[$this->pl->defaultIdx]->getClient()->readFrame(); + + if (empty($frame)) { + return false; + } + + $host = $this->stomps[$this->pl->defaultIdx]->getHost(); + $message = unserialize(base64_decode($frame->body)); + + if ($message === false) { + common_log(LOG_ERR, "STOMP can't unserialize frame: {$frame->body}\n" . + 'Unserializable frame length: ' . strlen($frame->body)); + return false; + } + + $site = $message['site']; + $queue = $message['handler']; + + if ($this->isDeadLetter($frame, $message)) { + $this->stats('deadletter', $queue); + return false; + } + + // @fixme detect failing site switches + $this->switchSite($site); + + try { + $item = $this->decode($message['payload']); + } catch (Exception $e) { + common_log(LOG_ERR, "Skipping empty or deleted item in queue {$queue} from {$host}"); + $this->stats('baditem', $queue); + return false; + } + + $info = $this->logrep($item) . ' posted at ' . + $frame->getHeaders()['created'] . " in queue {$queue} from {$host}"; + try { + $handler = $this->getHandler($queue); + $ok = $handler->handle($item); + } catch (NoQueueHandlerException $e) { + common_log(LOG_ERR, "Missing handler class; skipping $info"); + $this->stats('badhandler', $queue); + return false; + } catch (Exception $e) { + common_log(LOG_ERR, "Exception on queue $queue: " . $e->getMessage()); + $ok = false; + } + + if ($ok) { + common_log(LOG_INFO, "Successfully handled $info"); + $this->stats('handled', $queue); + } else { + common_log(LOG_WARNING, "Failed handling $info"); + // Requeing moves the item to the end of the line for its next try. + // @fixme add a manual retry count + $this->enqueue($item, $queue); + $this->stats('requeued', $queue); + } + + return false; + } + + /** + * Check if a redelivered message has been run through enough + * that we're going to give up on it. + * + * @param Frame $frame + * @param array $message unserialized message body + * @return bool true if we should discard + */ + protected function isDeadLetter($frame, $message) + { + if (isset($frame->getHeaders()['redelivered']) && $frame->getHeaders()['redelivered'] == 'true') { + // Message was redelivered, possibly indicating a previous failure. + $msgId = $frame->getHeaders()['message-id']; + $site = $message['site']; + $queue = $message['handler']; + $msgInfo = "message $msgId for $site in queue $queue"; + + $deliveries = $this->incDeliveryCount($msgId); + if ($deliveries > common_config('queue', 'max_retries')) { + $info = "DEAD-LETTER FILE: Gave up after retry $deliveries on $msgInfo"; + + $outdir = common_config('queue', 'dead_letter_dir'); + if ($outdir) { + $filename = $outdir . "/$site-$queue-" . rawurlencode($msgId); + $info .= ": dumping to $filename"; + file_put_contents($filename, $message['payload']); + } + + common_log(LOG_ERR, $info); + return true; + } else { + common_log(LOG_INFO, "retry $deliveries on $msgInfo"); + } + } + return false; + } + + /** + * Update count of times we've re-encountered this message recently, + * triggered when we get a message marked as 'redelivered'. + * + * Requires a CLI-friendly cache configuration. + * + * @param string $msgId message-id header from message + * @return int number of retries recorded + */ + function incDeliveryCount($msgId) + { + $count = 0; + $cache = Cache::instance(); + if ($cache) { + $key = 'gnusocial:stomp:message-retries:' . $msgId; + $count = $cache->increment($key); + if (!$count) { + $count = 1; + $cache->set($key, $count, null, 3600); + } + } + return $count; + } + + /** + * Combines the queue_basename from configuration with the + * group name for this queue to give eg: + * + * /queue/statusnet/main + * /queue/statusnet/main/distrib + * /queue/statusnet/xmpp/xmppout/site01 + * + * @param string $queue + * @return string + * @throws Exception + */ + protected function queueName(string $queue): string + { + $group = $this->queueGroup($queue); + $site = GNUsocial::currentSite(); + + foreach (["$group:$queue:$site", "$group:$queue"] as $spec) { + if (in_array($spec, $this->breakout)) { + return $this->pl->basename . $spec; + } + } + return $this->pl->basename . $group; + } + + /** + * Get the breakout mode for the given queue on the current site. + * + * @param string $queue + * @return string one of 'shared', 'handler', 'site' + */ + protected function breakoutMode($queue) + { + if (isset($this->pl->breakout[$queue])) { + return $this->pl->breakout[$queue]; + } else if (isset($this->pl->breakout['*'])) { + return $this->pl->breakout['*']; + } else { + return 'shared'; + } + } + + /** + * Tell the i/o master we only need a single instance to cover + * all sites running in this process. + */ + public static function multiSite() + { + return IoManager::INSTANCE_PER_PROCESS; + } + + /** + * Optional; ping any running queue handler daemons with a notification + * such as announcing a new site to handle or requesting clean shutdown. + * This avoids having to restart all the daemons manually to update configs + * and such. + * + * Currently only relevant for multi-site queue managers such as Stomp. + * + * @param string $event event key + * @param string $param optional parameter to append to key + * @return bool success + */ + public function sendControlSignal($event, $param = '') + { + $message = $event; + if ($param != '') { + $message .= ':' . $param; + } + $this->_ensureConn(); + $st = $this->stomps[$this->pl->defaultIdx]; + $result = $st->send($this->pl->control, $message, ['created' => common_sql_now()]); + if ($result) { + common_log(LOG_INFO, "Sent control ping to STOMP queue daemons: $message"); + return true; + } else { + common_log(LOG_ERR, "Failed sending control ping to STOMP queue daemons: $message"); + return false; + } + } + + /** + * Process a control signal broadcast. + * + * @param int $idx connection index + * @param array $frame Stomp frame + * @return bool true to continue; false to stop further processing. + * @throws ConfigException + * @throws NoConfigException + * @throws ServerException + */ + protected function handleControlSignal(int $idx, $frame): bool + { + $message = trim($frame->body); + if (strpos($message, ':') !== false) { + list($event, $param) = explode(':', $message, 2); + } else { + $event = $message; + $param = ''; + } + + $shutdown = false; + + if ($event == 'shutdown') { + $this->master->requestShutdown(); + $shutdown = true; + } else if ($event == 'restart') { + $this->master->requestRestart(); + $shutdown = true; + } else if ($event == 'update') { + $this->updateSiteConfig($param); + } else { + common_log(LOG_ERR, "Ignoring unrecognized control message: $message"); + } + return $shutdown; + } + + /** + * Switch site, if necessary, and reset current handler assignments + * @param string $site + * @throws ConfigException + * @throws NoConfigException + * @throws ServerException + */ + private function switchSite(string $site): void + { + if ($site != GNUsocial::currentSite()) { + $this->stats('switch'); + GNUsocial::switchSite($site); + $this->initialize(); + } + } + + /** + * (Re)load runtime configuration for a given site by nickname, + * triggered by a broadcast to the 'statusnet-control' topic. + * + * Configuration changes in database should update, but config + * files might not. + * + * @param $nickname + * @return bool true to continue; false to stop further processing. + * @throws ConfigException + * @throws NoConfigException + * @throws ServerException + */ + protected function updateSiteConfig($nickname): bool + { + $sn = Status_network::getKV('nickname', $nickname); + if ($sn) { + $this->switchSite($nickname); + if (!in_array($nickname, $this->sites)) { + $this->addSite(); + } + $this->stats('siteupdate'); + } else { + common_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname"); + } + } +}; diff --git a/plugins/UnQueue/README b/plugins/UnQueue/README new file mode 100644 index 0000000000..ed90e921cd --- /dev/null +++ b/plugins/UnQueue/README @@ -0,0 +1,14 @@ +UnQueuePlugin wraps the UnQueueManager class which is a queue manager that does all work immediately. + +Installation +============ + +This plugin is enabled by default and cannot be disabled unless another queue manager is in use. +Disabling is not necessary but recommended in such cases. + +Example +======= + +In config.php + +addPlugin('UnQueue'); \ No newline at end of file diff --git a/plugins/UnQueue/UnQueuePlugin.php b/plugins/UnQueue/UnQueuePlugin.php new file mode 100644 index 0000000000..1c6822a075 --- /dev/null +++ b/plugins/UnQueue/UnQueuePlugin.php @@ -0,0 +1,48 @@ +. + +/** + * Immediate action queue + * + * @package GNUsocial + * @author Miguel Dantas + * @copyright 2019 Free Software Foundation, Inc http://www.fsf.org + * @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later + */ + +defined('GNUSOCIAL') || die(); + +class UnQueuePlugin extends Plugin +{ + const PLUGIN_VERSION = '0.0.1'; + + public function onStartNewQueueManager(?QueueManager &$qm) + { + $qm = new UnQueueManager(); + return false; + } + + public function onPluginVersion(array &$versions): bool + { + $versions[] = array('name' => 'UnQueue', + 'version' => self::PLUGIN_VERSION, + 'author' => 'Miguel Dantas', + 'description' => + // TRANS: Plugin description. + _m('Plugin using the database as a backend for GNU social queues')); + return true; + } +}; diff --git a/lib/queue/unqueuemanager.php b/plugins/UnQueue/classes/UnQueueManager.php similarity index 100% rename from lib/queue/unqueuemanager.php rename to plugins/UnQueue/classes/UnQueueManager.php