[PLUGINS] Added StompQueue plugin, based on the implementation in lib/queue/stompqueuemanager. Updated to new STOMP library version. Dropped liberalstomp.php and stompqueuemanager.php
This commit is contained in:
		
				
					committed by
					
						 Hugo Sales
						Hugo Sales
					
				
			
			
				
	
			
			
			
						parent
						
							a3b228399b
						
					
				
				
					commit
					4644f6e96b
				
			| @@ -1,176 +0,0 @@ | ||||
| <?php | ||||
|  | ||||
| /** | ||||
|  * Based on code from Stomp PHP library, working around bugs in the base class. | ||||
|  * | ||||
|  * Original code is copyright 2005-2006 The Apache Software Foundation | ||||
|  * Modifications copyright 2009 StatusNet Inc by Brion Vibber <brion@status.net> | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  * http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
|  | ||||
| class LiberalStomp extends Stomp | ||||
| { | ||||
|     /** | ||||
|      * We need to be able to get the socket so advanced daemons can | ||||
|      * do a select() waiting for input both from the queue and from | ||||
|      * other sources such as an XMPP connection. | ||||
|      * | ||||
|      * @return resource | ||||
|      */ | ||||
|     function getSocket() | ||||
|     { | ||||
|         return $this->_socket; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Return the host we're currently connected to. | ||||
|      * | ||||
|      * @return string | ||||
|      */ | ||||
|     function getServer() | ||||
|     { | ||||
|         $idx = $this->_currentHost; | ||||
|         if ($idx >= 0) { | ||||
|             $host = $this->_hosts[$idx]; | ||||
|             return "$host[0]:$host[1]"; | ||||
|         } else { | ||||
|             return '[unconnected]'; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Make socket connection to the server | ||||
|      * We also set the stream to non-blocking mode, since we'll be | ||||
|      * select'ing to wait for updates. In blocking mode it seems | ||||
|      * to get confused sometimes. | ||||
|      * | ||||
|      * @throws StompException | ||||
|      */ | ||||
|     protected function _makeConnection () | ||||
|     { | ||||
|         parent::_makeConnection(); | ||||
|         stream_set_blocking($this->_socket, 0); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Version 1.0.0 of the Stomp library gets confused if messages | ||||
|      * come in too fast over the connection. This version will read | ||||
|      * out as many frames as are ready to be read from the socket. | ||||
|      * | ||||
|      * Modified from Stomp::readFrame() | ||||
|      * | ||||
|      * @return StompFrame False when no frame to read | ||||
|      */ | ||||
|     public function readFrames () | ||||
|     { | ||||
|         if (!$this->hasFrameToRead()) { | ||||
|             return false; | ||||
|         } | ||||
|          | ||||
|         $rb = 1024; | ||||
|         $data = ''; | ||||
|         $end = false; | ||||
|         $frames = array(); | ||||
|  | ||||
|         do { | ||||
|             // @fixme this sometimes hangs in blocking mode... | ||||
|             // shouldn't we have been idle until we found there's more data? | ||||
|             $read = fread($this->_socket, $rb); | ||||
|             if ($read === false || ($read === '' && feof($this->_socket))) { | ||||
|                 // @fixme possibly attempt an auto reconnect as old code? | ||||
|                 throw new StompException("Error reading"); | ||||
|                 //$this->_reconnect(); | ||||
|                 // @fixme this will lose prior items | ||||
|                 //return $this->readFrames(); | ||||
|             } | ||||
|             $data .= $read; | ||||
|             if (strpos($data, "\x00") !== false) { | ||||
|                 // Frames are null-delimited, but some servers | ||||
|                 // may append an extra \n according to old bug reports. | ||||
|                 $data = str_replace("\x00\n", "\x00", $data); | ||||
|                 $chunks = explode("\x00", $data); | ||||
|  | ||||
|                 $data = array_pop($chunks); | ||||
|                 $frames = array_merge($frames, $chunks); | ||||
|                 if ($data == '') { | ||||
|                     // We're at the end of a frame; stop reading. | ||||
|                     break; | ||||
|                 } else { | ||||
|                     // In the middle of a frame; keep going. | ||||
|                 } | ||||
|             } | ||||
|             // @fixme find out why this len < 2 check was there | ||||
|             //$len = strlen($data); | ||||
|         } while (true);//$len < 2 || $end == false); | ||||
|  | ||||
|         return array_map(array($this, 'parseFrame'), $frames); | ||||
|     } | ||||
|      | ||||
|     /** | ||||
|      * Parse a raw Stomp frame into an object. | ||||
|      * Extracted from Stomp::readFrame() | ||||
|      * | ||||
|      * @param string $data | ||||
|      * @return StompFrame | ||||
|      */ | ||||
|     function parseFrame($data) | ||||
|     { | ||||
|         list ($header, $body) = explode("\n\n", $data, 2); | ||||
|         $header = explode("\n", $header); | ||||
|         $headers = array(); | ||||
|         $command = null; | ||||
|         foreach ($header as $v) { | ||||
|             if (isset($command)) { | ||||
|                 list ($name, $value) = explode(':', $v, 2); | ||||
|                 $headers[$name] = $value; | ||||
|             } else { | ||||
|                 $command = $v; | ||||
|             } | ||||
|         } | ||||
|         $frame = new StompFrame($command, $headers, trim($body)); | ||||
|         if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') { | ||||
|             require_once 'Stomp/Message/Map.php'; | ||||
|             return new StompMessageMap($frame); | ||||
|         } else { | ||||
|             return $frame; | ||||
|         } | ||||
|         return $frame; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Write frame to server | ||||
|      * | ||||
|      * @param StompFrame $stompFrame | ||||
|      */ | ||||
|     protected function _writeFrame (StompFrame $stompFrame) | ||||
|     { | ||||
|         if (!is_resource($this->_socket)) { | ||||
|             require_once 'Stomp/Exception.php'; | ||||
|             throw new StompException('Socket connection hasn\'t been established'); | ||||
|         } | ||||
|  | ||||
|         $data = $stompFrame->__toString(); | ||||
|  | ||||
|         // Make sure the socket's in a writable state; if not, wait a bit. | ||||
|         stream_set_blocking($this->_socket, 1); | ||||
|  | ||||
|         $r = fwrite($this->_socket, $data, strlen($data)); | ||||
|         stream_set_blocking($this->_socket, 0); | ||||
|         if ($r === false || $r == 0) { | ||||
|             $this->_reconnect(); | ||||
|             $this->_writeFrame($stompFrame); | ||||
|         } | ||||
|     } | ||||
|  } | ||||
|  | ||||
| @@ -71,9 +71,6 @@ abstract class QueueManager extends IoManager | ||||
|                      case 'db': | ||||
|                         self::$qm = new DBQueueManager(); | ||||
|                         break; | ||||
|                      case 'stomp': | ||||
|                         self::$qm = new StompQueueManager(); | ||||
|                         break; | ||||
|                      default: | ||||
|                         throw new ServerException("No queue manager class for type '$type'"); | ||||
|                     } | ||||
|   | ||||
| @@ -1,762 +0,0 @@ | ||||
| <?php | ||||
| /** | ||||
|  * StatusNet, the distributed open-source microblogging tool | ||||
|  * | ||||
|  * Abstract class for queue managers | ||||
|  * | ||||
|  * PHP version 5 | ||||
|  * | ||||
|  * LICENCE: This program is free software: you can redistribute it and/or modify | ||||
|  * it under the terms of the GNU Affero General Public License as published by | ||||
|  * the Free Software Foundation, either version 3 of the License, or | ||||
|  * (at your option) any later version. | ||||
|  * | ||||
|  * This program is distributed in the hope that it will be useful, | ||||
|  * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
|  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
|  * GNU Affero General Public License for more details. | ||||
|  * | ||||
|  * You should have received a copy of the GNU Affero General Public License | ||||
|  * along with this program.  If not, see <http://www.gnu.org/licenses/>. | ||||
|  * | ||||
|  * @category  QueueManager | ||||
|  * @package   StatusNet | ||||
|  * @author    Evan Prodromou <evan@status.net> | ||||
|  * @author    Sarven Capadisli <csarven@status.net> | ||||
|  * @copyright 2009 StatusNet, Inc. | ||||
|  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 | ||||
|  * @link      http://status.net/ | ||||
|  */ | ||||
|  | ||||
| require_once 'Stomp.php'; | ||||
| require_once 'Stomp/Exception.php'; | ||||
|  | ||||
| class StompQueueManager extends QueueManager | ||||
| { | ||||
|     protected $servers; | ||||
|     protected $username; | ||||
|     protected $password; | ||||
|     protected $base; | ||||
|     protected $control; | ||||
|  | ||||
|     protected $useTransactions; | ||||
|     protected $useAcks; | ||||
|  | ||||
|     protected $sites = array(); | ||||
|     protected $subscriptions = array(); | ||||
|  | ||||
|     protected $cons = array(); // all open connections | ||||
|     protected $disconnect = array(); | ||||
|     protected $transaction = array(); | ||||
|     protected $transactionCount = array(); | ||||
|     protected $defaultIdx = 0; | ||||
|  | ||||
|     function __construct() | ||||
|     { | ||||
|         parent::__construct(); | ||||
|         $server = common_config('queue', 'stomp_server'); | ||||
|         if (is_array($server)) { | ||||
|             $this->servers = $server; | ||||
|         } else { | ||||
|             $this->servers = array($server); | ||||
|         } | ||||
|         $this->username        = common_config('queue', 'stomp_username'); | ||||
|         $this->password        = common_config('queue', 'stomp_password'); | ||||
|         $this->base            = common_config('queue', 'queue_basename'); | ||||
|         $this->control         = common_config('queue', 'control_channel'); | ||||
|         $this->breakout        = common_config('queue', 'breakout'); | ||||
|         $this->useTransactions = common_config('queue', 'stomp_transactions'); | ||||
|         $this->useAcks         = common_config('queue', 'stomp_acks'); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Tell the i/o master we only need a single instance to cover | ||||
|      * all sites running in this process. | ||||
|      */ | ||||
|     public static function multiSite() | ||||
|     { | ||||
|         return IoManager::INSTANCE_PER_PROCESS; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Optional; ping any running queue handler daemons with a notification | ||||
|      * such as announcing a new site to handle or requesting clean shutdown. | ||||
|      * This avoids having to restart all the daemons manually to update configs | ||||
|      * and such. | ||||
|      * | ||||
|      * Currently only relevant for multi-site queue managers such as Stomp. | ||||
|      * | ||||
|      * @param string $event event key | ||||
|      * @param string $param optional parameter to append to key | ||||
|      * @return boolean success | ||||
|      */ | ||||
|     public function sendControlSignal($event, $param='') | ||||
|     { | ||||
|         $message = $event; | ||||
|         if ($param != '') { | ||||
|             $message .= ':' . $param; | ||||
|         } | ||||
|         $this->_connect(); | ||||
|         $con = $this->cons[$this->defaultIdx]; | ||||
|         $result = $con->send($this->control, | ||||
|                              $message, | ||||
|                              array ('created' => common_sql_now())); | ||||
|         if ($result) { | ||||
|             $this->_log(LOG_INFO, "Sent control ping to queue daemons: $message"); | ||||
|             return true; | ||||
|         } else { | ||||
|             $this->_log(LOG_ERR, "Failed sending control ping to queue daemons: $message"); | ||||
|             return false; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Saves an object into the queue item table. | ||||
|      * | ||||
|      * @param mixed $object | ||||
|      * @param string $queue | ||||
|      * @param string $siteNickname optional override to drop into another site's queue | ||||
|      * | ||||
|      * @return boolean true on success | ||||
|      * @throws StompException on connection or send error | ||||
|      */ | ||||
|     public function enqueue($object, $queue, $siteNickname=null) | ||||
|     { | ||||
|         $this->_connect(); | ||||
|         if (common_config('queue', 'stomp_enqueue_on')) { | ||||
|             // We're trying to force all writes to a single server. | ||||
|             // WARNING: this might do odd things if that server connection dies. | ||||
|             $idx = array_search(common_config('queue', 'stomp_enqueue_on'), | ||||
|                                 $this->servers); | ||||
|             if ($idx === false) { | ||||
|                 common_log(LOG_ERR, 'queue stomp_enqueue_on setting does not match our server list.'); | ||||
|                 $idx = $this->defaultIdx; | ||||
|             } | ||||
|         } else { | ||||
|             $idx = $this->defaultIdx; | ||||
|         } | ||||
|         return $this->_doEnqueue($object, $queue, $idx, $siteNickname); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Saves a notice object reference into the queue item table | ||||
|      * on the given connection. | ||||
|      * | ||||
|      * @return boolean true on success | ||||
|      * @throws StompException on connection or send error | ||||
|      */ | ||||
|     protected function _doEnqueue($object, $queue, $idx, $siteNickname=null) | ||||
|     { | ||||
|         $rep = $this->logrep($object); | ||||
|         $envelope = array('site' => $siteNickname ? $siteNickname : common_config('site', 'nickname'), | ||||
|                           'handler' => $queue, | ||||
|                           'payload' => $this->encode($object)); | ||||
|         $msg = base64_encode(serialize($envelope)); | ||||
|  | ||||
|         $props = array('created' => common_sql_now()); | ||||
|         if ($this->isPersistent($queue)) { | ||||
|             $props['persistent'] = 'true'; | ||||
|         } | ||||
|  | ||||
|         $con = $this->cons[$idx]; | ||||
|         $host = $con->getServer(); | ||||
|         $target = $this->queueName($queue); | ||||
|         $result = $con->send($target, $msg, $props); | ||||
|  | ||||
|         if (!$result) { | ||||
|             $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host $target"); | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host $target"); | ||||
|         $this->stats('enqueued', $queue); | ||||
|         return true; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Determine whether messages to this queue should be marked as persistent. | ||||
|      * Actual persistent storage depends on the queue server's configuration. | ||||
|      * @param string $queue | ||||
|      * @return bool | ||||
|      */ | ||||
|     protected function isPersistent($queue) | ||||
|     { | ||||
|         $mode = common_config('queue', 'stomp_persistent'); | ||||
|         if (is_array($mode)) { | ||||
|             return in_array($queue, $mode); | ||||
|         } else { | ||||
|             return (bool)$mode; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Send any sockets we're listening on to the IO manager | ||||
|      * to wait for input. | ||||
|      * | ||||
|      * @return array of resources | ||||
|      */ | ||||
|     public function getSockets() | ||||
|     { | ||||
|         $sockets = array(); | ||||
|         foreach ($this->cons as $con) { | ||||
|             if ($con) { | ||||
|                 $sockets[] = $con->getSocket(); | ||||
|             } | ||||
|         } | ||||
|         return $sockets; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Get the Stomp connection object associated with the given socket. | ||||
|      * @param resource $socket | ||||
|      * @return int index into connections list | ||||
|      * @throws Exception | ||||
|      */ | ||||
|     protected function connectionFromSocket($socket) | ||||
|     { | ||||
|         foreach ($this->cons as $i => $con) { | ||||
|             if ($con && $con->getSocket() === $socket) { | ||||
|                 return $i; | ||||
|             } | ||||
|         } | ||||
|         throw new Exception(__CLASS__ . " asked to read from unrecognized socket"); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * We've got input to handle on our socket! | ||||
|      * Read any waiting Stomp frame(s) and process them. | ||||
|      * | ||||
|      * @param resource $socket | ||||
|      * @return boolean ok on success | ||||
|      */ | ||||
|     public function handleInput($socket) | ||||
|     { | ||||
|         $idx = $this->connectionFromSocket($socket); | ||||
|         $con = $this->cons[$idx]; | ||||
|         $host = $con->getServer(); | ||||
|         $this->defaultIdx = $idx; | ||||
|  | ||||
|         $ok = true; | ||||
|         try { | ||||
|             $frames = $con->readFrames(); | ||||
|         } catch (StompException $e) { | ||||
|             $this->_log(LOG_ERR, "Lost connection to $host: " . $e->getMessage()); | ||||
|             fclose($socket); // ??? | ||||
|             $this->cons[$idx] = null; | ||||
|             $this->transaction[$idx] = null; | ||||
|             $this->disconnect[$idx] = time(); | ||||
|             return false; | ||||
|         } | ||||
|         foreach ($frames as $frame) { | ||||
|             $dest = $frame->headers['destination']; | ||||
|             if ($dest == $this->control) { | ||||
|                 if (!$this->handleControlSignal($frame)) { | ||||
|                     // We got a control event that requests a shutdown; | ||||
|                     // close out and stop handling anything else! | ||||
|                     break; | ||||
|                 } | ||||
|             } else { | ||||
|                 $ok = $this->handleItem($frame) && $ok; | ||||
|             } | ||||
|             $this->ack($idx, $frame); | ||||
|             $this->commit($idx); | ||||
|             $this->begin($idx); | ||||
|         } | ||||
|         return $ok; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Attempt to reconnect in background if we lost a connection. | ||||
|      */ | ||||
|     function idle() | ||||
|     { | ||||
|         $now = time(); | ||||
|         foreach ($this->cons as $idx => $con) { | ||||
|             if (empty($con)) { | ||||
|                 $age = $now - $this->disconnect[$idx]; | ||||
|                 if ($age >= 60) { | ||||
|                     $this->_reconnect($idx); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         return true; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Initialize our connection and subscribe to all the queues | ||||
|      * we're going to need to handle... If multiple queue servers | ||||
|      * are configured for failover, we'll listen to all of them. | ||||
|      * | ||||
|      * Side effects: in multi-site mode, may reset site configuration. | ||||
|      * | ||||
|      * @param IoMaster $master process/event controller | ||||
|      * @return bool return false on failure | ||||
|      */ | ||||
|     public function start($master) | ||||
|     { | ||||
|         parent::start($master); | ||||
|         $this->_connectAll(); | ||||
|  | ||||
|         foreach ($this->cons as $i => $con) { | ||||
|             if ($con) { | ||||
|                 $this->doSubscribe($con); | ||||
|                 $this->begin($i); | ||||
|             } | ||||
|         } | ||||
|         return true; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Close out any active connections. | ||||
|      * | ||||
|      * @return bool return false on failure | ||||
|      */ | ||||
|     public function finish() | ||||
|     { | ||||
|         // If there are any outstanding delivered messages we haven't processed, | ||||
|         // free them for another thread to take. | ||||
|         foreach ($this->cons as $i => $con) { | ||||
|             if ($con) { | ||||
|                 $this->rollback($i); | ||||
|                 $con->disconnect(); | ||||
|                 $this->cons[$i] = null; | ||||
|             } | ||||
|         } | ||||
|         return true; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Lazy open a single connection to Stomp queue server. | ||||
|      * If multiple servers are configured, we let the Stomp client library | ||||
|      * worry about finding a working connection among them. | ||||
|      */ | ||||
|     protected function _connect() | ||||
|     { | ||||
|         if (empty($this->cons)) { | ||||
|             $list = $this->servers; | ||||
|             if (count($list) > 1) { | ||||
|                 shuffle($list); // Randomize to spread load | ||||
|                 $url = 'failover://(' . implode(',', $list) . ')'; | ||||
|             } else { | ||||
|                 $url = $list[0]; | ||||
|             } | ||||
|             $con = $this->_doConnect($url); | ||||
|             $this->cons = array($con); | ||||
|             $this->transactionCount = array(0); | ||||
|             $this->transaction = array(null); | ||||
|             $this->disconnect = array(null); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Lazy open connections to all Stomp servers, if in manual failover | ||||
|      * mode. This means the queue servers don't speak to each other, so | ||||
|      * we have to listen to all of them to make sure we get all events. | ||||
|      */ | ||||
|     protected function _connectAll() | ||||
|     { | ||||
|         if (!common_config('queue', 'stomp_manual_failover')) { | ||||
|             return $this->_connect(); | ||||
|         } | ||||
|         if (empty($this->cons)) { | ||||
|             $this->cons = array(); | ||||
|             $this->transactionCount = array(); | ||||
|             $this->transaction = array(); | ||||
|             foreach ($this->servers as $idx => $server) { | ||||
|                 try { | ||||
|                     $this->cons[] = $this->_doConnect($server); | ||||
|                     $this->disconnect[] = null; | ||||
|                 } catch (Exception $e) { | ||||
|                     // s'okay, we'll live | ||||
|                     $this->cons[] = null; | ||||
|                     $this->disconnect[] = time(); | ||||
|                 } | ||||
|                 $this->transactionCount[] = 0; | ||||
|                 $this->transaction[] = null; | ||||
|             } | ||||
|             if (empty($this->cons)) { | ||||
|                 throw new ServerException("No queue servers reachable..."); | ||||
|                 return false; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Attempt to manually reconnect to the Stomp server for the given | ||||
|      * slot. If successful, set up our subscriptions on it. | ||||
|      */ | ||||
|     protected function _reconnect($idx) | ||||
|     { | ||||
|         try { | ||||
|             $con = $this->_doConnect($this->servers[$idx]); | ||||
|         } catch (Exception $e) { | ||||
|             $this->_log(LOG_ERR, $e->getMessage()); | ||||
|             $con = null; | ||||
|         } | ||||
|         if ($con) { | ||||
|             $this->cons[$idx] = $con; | ||||
|             $this->disconnect[$idx] = null; | ||||
|  | ||||
|             $this->doSubscribe($con); | ||||
|             $this->begin($idx); | ||||
|         } else { | ||||
|             // Try again later... | ||||
|             $this->disconnect[$idx] = time(); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     protected function _doConnect($server) | ||||
|     { | ||||
|         $this->_log(LOG_INFO, "Connecting to '$server' as '$this->username'..."); | ||||
|         $con = new LiberalStomp($server); | ||||
|  | ||||
|         if ($con->connect($this->username, $this->password)) { | ||||
|             $this->_log(LOG_INFO, "Connected."); | ||||
|         } else { | ||||
|             $this->_log(LOG_ERR, 'Failed to connect to queue server'); | ||||
|             throw new ServerException('Failed to connect to queue server'); | ||||
|         } | ||||
|  | ||||
|         return $con; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Set up all our raw queue subscriptions on the given connection | ||||
|      * @param LiberalStomp $con | ||||
|      */ | ||||
|     protected function doSubscribe(LiberalStomp $con) | ||||
|     { | ||||
|         $host = $con->getServer(); | ||||
|         foreach ($this->subscriptions() as $sub) { | ||||
|             $this->_log(LOG_INFO, "Subscribing to $sub on $host"); | ||||
|             $con->subscribe($sub); | ||||
|         } | ||||
|     } | ||||
|      | ||||
|     /** | ||||
|      * Grab a full list of stomp-side queue subscriptions. | ||||
|      * Will include: | ||||
|      *  - control broadcast channel | ||||
|      *  - shared group queues for active groups | ||||
|      *  - per-handler and per-site breakouts from $config['queue']['breakout'] | ||||
|      *    that are rooted in the active groups. | ||||
|      * | ||||
|      * @return array of strings | ||||
|      */ | ||||
|     protected function subscriptions() | ||||
|     { | ||||
|         $subs = array(); | ||||
|         $subs[] = $this->control; | ||||
|  | ||||
|         foreach ($this->activeGroups as $group) { | ||||
|             $subs[] = $this->base . $group; | ||||
|         } | ||||
|  | ||||
|         foreach ($this->breakout as $spec) { | ||||
|             $parts = explode('/', $spec); | ||||
|             if (count($parts) < 2 || count($parts) > 3) { | ||||
|                 common_log(LOG_ERR, "Bad queue breakout specifier $spec"); | ||||
|             } | ||||
|             if (in_array($parts[0], $this->activeGroups)) { | ||||
|                 $subs[] = $this->base . $spec; | ||||
|             } | ||||
|         } | ||||
|         return array_unique($subs); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Handle and acknowledge an event that's come in through a queue. | ||||
|      * | ||||
|      * If the queue handler reports failure, the message is requeued for later. | ||||
|      * Missing notices or handler classes will drop the message. | ||||
|      * | ||||
|      * Side effects: in multi-site mode, may reset site configuration to | ||||
|      * match the site that queued the event. | ||||
|      * | ||||
|      * @param StompFrame $frame | ||||
|      * @return bool success | ||||
|      */ | ||||
|     protected function handleItem($frame) | ||||
|     { | ||||
|         $host = $this->cons[$this->defaultIdx]->getServer(); | ||||
|         $message = unserialize(base64_decode($frame->body)); | ||||
|  | ||||
|         if ($message === false) { | ||||
|             $this->_log(LOG_ERR, "Can't unserialize frame: {$frame->body}"); | ||||
|             $this->_log(LOG_ERR, "Unserializable frame length: " . strlen($frame->body)); | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         $site = $message['site']; | ||||
|         $queue = $message['handler']; | ||||
|  | ||||
|         if ($this->isDeadLetter($frame, $message)) { | ||||
|             $this->stats('deadletter', $queue); | ||||
| 	        return false; | ||||
|         } | ||||
|  | ||||
|         // @fixme detect failing site switches | ||||
|         $this->switchSite($site); | ||||
|  | ||||
|         try { | ||||
|             $item = $this->decode($message['payload']); | ||||
|         } catch (Exception $e) { | ||||
|             $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host"); | ||||
|             $this->stats('baditem', $queue); | ||||
|             return false; | ||||
|         } | ||||
|         $info = $this->logrep($item) . " posted at " . | ||||
|                 $frame->headers['created'] . " in queue $queue from $host"; | ||||
|         $this->_log(LOG_DEBUG, "Dequeued $info"); | ||||
|  | ||||
|         try { | ||||
|             $handler = $this->getHandler($queue); | ||||
|             $ok = $handler->handle($item); | ||||
|         } catch (NoQueueHandlerException $e) { | ||||
|             $this->_log(LOG_ERR, "Missing handler class; skipping $info"); | ||||
|             $this->stats('badhandler', $queue); | ||||
|             return false; | ||||
|         } catch (Exception $e) { | ||||
|             $this->_log(LOG_ERR, "Exception on queue $queue: " . $e->getMessage()); | ||||
|             $ok = false; | ||||
|         } | ||||
|  | ||||
|         if ($ok) { | ||||
|             $this->_log(LOG_INFO, "Successfully handled $info"); | ||||
|             $this->stats('handled', $queue); | ||||
|         } else { | ||||
|             $this->_log(LOG_WARNING, "Failed handling $info"); | ||||
|             // Requeing moves the item to the end of the line for its next try. | ||||
|             // @fixme add a manual retry count | ||||
|             $this->enqueue($item, $queue); | ||||
|             $this->stats('requeued', $queue); | ||||
|         } | ||||
|  | ||||
|         return $ok; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Check if a redelivered message has been run through enough | ||||
|      * that we're going to give up on it. | ||||
|      * | ||||
|      * @param StompFrame $frame | ||||
|      * @param array $message unserialized message body | ||||
|      * @return boolean true if we should discard | ||||
|      */ | ||||
|     protected function isDeadLetter($frame, $message) | ||||
|     { | ||||
|         if (isset($frame->headers['redelivered']) && $frame->headers['redelivered'] == 'true') { | ||||
| 	        // Message was redelivered, possibly indicating a previous failure. | ||||
|             $msgId = $frame->headers['message-id']; | ||||
|             $site = $message['site']; | ||||
|             $queue = $message['handler']; | ||||
| 	        $msgInfo = "message $msgId for $site in queue $queue"; | ||||
|  | ||||
| 	        $deliveries = $this->incDeliveryCount($msgId); | ||||
| 	        if ($deliveries > common_config('queue', 'max_retries')) { | ||||
| 		        $info = "DEAD-LETTER FILE: Gave up after retry $deliveries on $msgInfo"; | ||||
|  | ||||
| 		        $outdir = common_config('queue', 'dead_letter_dir'); | ||||
| 		        if ($outdir) { | ||||
|     		        $filename = $outdir . "/$site-$queue-" . rawurlencode($msgId); | ||||
|     		        $info .= ": dumping to $filename"; | ||||
|     		        file_put_contents($filename, $message['payload']); | ||||
| 		        } | ||||
|  | ||||
| 		        common_log(LOG_ERR, $info); | ||||
| 		        return true; | ||||
| 	        } else { | ||||
| 	            common_log(LOG_INFO, "retry $deliveries on $msgInfo"); | ||||
| 	        } | ||||
|         } | ||||
|         return false; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Update count of times we've re-encountered this message recently, | ||||
|      * triggered when we get a message marked as 'redelivered'. | ||||
|      * | ||||
|      * Requires a CLI-friendly cache configuration. | ||||
|      * | ||||
|      * @param string $msgId message-id header from message | ||||
|      * @return int number of retries recorded | ||||
|      */ | ||||
|     function incDeliveryCount($msgId) | ||||
|     { | ||||
| 	    $count = 0; | ||||
| 	    $cache = Cache::instance(); | ||||
| 	    if ($cache) { | ||||
| 		    $key = 'statusnet:stomp:message-retries:' . $msgId; | ||||
| 		    $count = $cache->increment($key); | ||||
| 		    if (!$count) { | ||||
| 			    $count = 1; | ||||
| 			    $cache->set($key, $count, null, 3600); | ||||
| 			    $got = $cache->get($key); | ||||
| 		    } | ||||
| 	    } | ||||
| 	    return $count; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Process a control signal broadcast. | ||||
|      * | ||||
|      * @param int $idx connection index | ||||
|      * @param array $frame Stomp frame | ||||
|      * @return bool true to continue; false to stop further processing. | ||||
|      */ | ||||
|     protected function handleControlSignal($idx, $frame) | ||||
|     { | ||||
|         $message = trim($frame->body); | ||||
|         if (strpos($message, ':') !== false) { | ||||
|             list($event, $param) = explode(':', $message, 2); | ||||
|         } else { | ||||
|             $event = $message; | ||||
|             $param = ''; | ||||
|         } | ||||
|  | ||||
|         $shutdown = false; | ||||
|  | ||||
|         if ($event == 'shutdown') { | ||||
|             $this->master->requestShutdown(); | ||||
|             $shutdown = true; | ||||
|         } else if ($event == 'restart') { | ||||
|             $this->master->requestRestart(); | ||||
|             $shutdown = true; | ||||
|         } else if ($event == 'update') { | ||||
|             $this->updateSiteConfig($param); | ||||
|         } else { | ||||
|             $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message"); | ||||
|         } | ||||
|         return $shutdown; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Switch site, if necessary, and reset current handler assignments | ||||
|      * @param string $site | ||||
|      */ | ||||
|     function switchSite($site) | ||||
|     { | ||||
|         if ($site != GNUsocial::currentSite()) { | ||||
|             $this->stats('switch'); | ||||
|             GNUsocial::switchSite($site); | ||||
|             $this->initialize(); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * (Re)load runtime configuration for a given site by nickname, | ||||
|      * triggered by a broadcast to the 'statusnet-control' topic. | ||||
|      * | ||||
|      * Configuration changes in database should update, but config | ||||
|      * files might not. | ||||
|      * | ||||
|      * @param array $frame Stomp frame | ||||
|      * @return bool true to continue; false to stop further processing. | ||||
|      */ | ||||
|     protected function updateSiteConfig($nickname) | ||||
|     { | ||||
|         $sn = Status_network::getKV('nickname', $nickname); | ||||
|         if ($sn) { | ||||
|             $this->switchSite($nickname); | ||||
|             if (!in_array($nickname, $this->sites)) { | ||||
|                 $this->addSite(); | ||||
|             } | ||||
|             $this->stats('siteupdate'); | ||||
|         } else { | ||||
|             $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname"); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Combines the queue_basename from configuration with the | ||||
|      * group name for this queue to give eg: | ||||
|      * | ||||
|      * /queue/statusnet/main | ||||
|      * /queue/statusnet/main/distrib | ||||
|      * /queue/statusnet/xmpp/xmppout/site01 | ||||
|      * | ||||
|      * @param string $queue | ||||
|      * @return string | ||||
|      */ | ||||
|     protected function queueName($queue) | ||||
|     { | ||||
|         $group = $this->queueGroup($queue); | ||||
|         $site = GNUsocial::currentSite(); | ||||
|  | ||||
|         $specs = array("$group/$queue/$site", | ||||
|                        "$group/$queue"); | ||||
|         foreach ($specs as $spec) { | ||||
|             if (in_array($spec, $this->breakout)) { | ||||
|                 return $this->base . $spec; | ||||
|             } | ||||
|         } | ||||
|         return $this->base . $group; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Get the breakout mode for the given queue on the current site. | ||||
|      * | ||||
|      * @param string $queue | ||||
|      * @return string one of 'shared', 'handler', 'site' | ||||
|      */ | ||||
|     protected function breakoutMode($queue) | ||||
|     { | ||||
|         $breakout = common_config('queue', 'breakout'); | ||||
|         if (isset($breakout[$queue])) { | ||||
|             return $breakout[$queue]; | ||||
|         } else if (isset($breakout['*'])) { | ||||
|             return $breakout['*']; | ||||
|         } else { | ||||
|             return 'shared'; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     protected function begin($idx) | ||||
|     { | ||||
|         if ($this->useTransactions) { | ||||
|             if (!empty($this->transaction[$idx])) { | ||||
|                 throw new Exception("Tried to start transaction in the middle of a transaction"); | ||||
|             } | ||||
|             $this->transactionCount[$idx]++; | ||||
|             $this->transaction[$idx] = $this->master->id . '-' . $this->transactionCount[$idx] . '-' . time(); | ||||
|             $this->cons[$idx]->begin($this->transaction[$idx]); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     protected function ack($idx, $frame) | ||||
|     { | ||||
|         if ($this->useAcks) { | ||||
|             if ($this->useTransactions) { | ||||
|                 if (empty($this->transaction[$idx])) { | ||||
|                     throw new Exception("Tried to ack but not in a transaction"); | ||||
|                 } | ||||
|                 $this->cons[$idx]->ack($frame, $this->transaction[$idx]); | ||||
|             } else { | ||||
|                 $this->cons[$idx]->ack($frame); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     protected function commit($idx) | ||||
|     { | ||||
|         if ($this->useTransactions) { | ||||
|             if (empty($this->transaction[$idx])) { | ||||
|                 throw new Exception("Tried to commit but not in a transaction"); | ||||
|             } | ||||
|             $this->cons[$idx]->commit($this->transaction[$idx]); | ||||
|             $this->transaction[$idx] = null; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     protected function rollback($idx) | ||||
|     { | ||||
|         if ($this->useTransactions) { | ||||
|             if (empty($this->transaction[$idx])) { | ||||
|                 throw new Exception("Tried to rollback but not in a transaction"); | ||||
|             } | ||||
|             $this->cons[$idx]->commit($this->transaction[$idx]); | ||||
|             $this->transaction[$idx] = null; | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
							
								
								
									
										72
									
								
								plugins/StompQueue/StompQueuePlugin.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										72
									
								
								plugins/StompQueue/StompQueuePlugin.php
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,72 @@ | ||||
| <?php | ||||
| // This file is part of GNU social - https://www.gnu.org/software/social | ||||
| // | ||||
| // GNU social is free software: you can redistribute it and/or modify | ||||
| // it under the terms of the GNU Affero General Public License as published by | ||||
| // the Free Software Foundation, either version 3 of the License, or | ||||
| // (at your option) any later version. | ||||
| // | ||||
| // GNU social is distributed in the hope that it will be useful, | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| // GNU Affero General Public License for more details. | ||||
| // | ||||
| // You should have received a copy of the GNU Affero General Public License | ||||
| // along with GNU social.  If not, see <http://www.gnu.org/licenses/>. | ||||
|  | ||||
| /** | ||||
|  * STOMP interface for GNU social queues | ||||
|  * | ||||
|  * @package   GNUsocial | ||||
|  * @author    Miguel Dantas <biodantasgs@gmail.com> | ||||
|  * @copyright 2019 Free Software Foundation, Inc http://www.fsf.org | ||||
|  * @license   https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later | ||||
|  */ | ||||
|  | ||||
| defined('GNUSOCIAL') || die(); | ||||
|  | ||||
| class StompQueuePlugin extends Plugin | ||||
| { | ||||
|     const PLUGIN_VERSION = '0.0.1'; | ||||
|  | ||||
|     // settings which can be set in config.php with addPlugin('StompQueue', ['param'=>'value', ...]); | ||||
|     public $servers = null; | ||||
|     public $vhost = ''; | ||||
|     public $username = 'guest'; | ||||
|     public $password = 'guest'; | ||||
|     public $basename = ''; | ||||
|     public $control = 'gnusocial:control'; | ||||
|     public $breakout; | ||||
|     public $useTransactions = false; | ||||
|     public $useAcks = false; | ||||
|     public $manualFailover = false; | ||||
|     public $defaultIdx = 0; | ||||
|     public $persistent = []; | ||||
|  | ||||
|     public function onStartNewQueueManager(?QueueManager &$qm) | ||||
|     { | ||||
|         if (empty($this->servers)) { | ||||
|             throw new ServerException('Invalid STOMP server address'); | ||||
|         } elseif (!is_array($this->servers)) { | ||||
|             $this->servers = [$this->servers]; | ||||
|         } | ||||
|  | ||||
|         if (empty($this->basename)) { | ||||
|             $this->basename = 'queue:gnusocial-' . common_config('site', 'name') . ':'; | ||||
|         } | ||||
|  | ||||
|         $qm = new StompQueueManager($this); | ||||
|         return false; | ||||
|     } | ||||
|  | ||||
|     public function onPluginVersion(array &$versions): bool | ||||
|     { | ||||
|         $versions[] = array('name' => 'StompQueue', | ||||
|                             'version' => self::VERSION, | ||||
|                             'author' => 'Miguel Dantas', | ||||
|                             'description' => | ||||
|                             // TRANS: Plugin description. | ||||
|                             _m('Plugin implementing STOMP as a backend for GNU social queues')); | ||||
|         return true; | ||||
|     } | ||||
| }; | ||||
							
								
								
									
										622
									
								
								plugins/StompQueue/classes/StompQueueManager.php
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										622
									
								
								plugins/StompQueue/classes/StompQueueManager.php
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,622 @@ | ||||
| <?php | ||||
|  | ||||
| use Stomp\Client; | ||||
| use Stomp\Exception\StompException; | ||||
| use Stomp\Network\Connection; | ||||
| use Stomp\Transport\Frame; | ||||
| use Stomp\Transport\Message; | ||||
| use Stomp\StatefulStomp; | ||||
|  | ||||
| class StompQueueManager extends QueueManager | ||||
| { | ||||
|     protected $pl = null; | ||||
|     protected $stomps = null; | ||||
|     protected $transaction; | ||||
|     protected $transactionCount; | ||||
|     protected $activeGroups; | ||||
|  | ||||
|     public function __construct($pl) | ||||
|     { | ||||
|         parent::__construct(); | ||||
|         $this->pl = $pl; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Initialize our connection and subscribe to all the queues | ||||
|      * we're going to need to handle... If multiple queue servers | ||||
|      * are configured for failover, we'll listen to all of them. | ||||
|      * | ||||
|      * Side effects: in multi-site mode, may reset site configuration. | ||||
|      * | ||||
|      * @param IoMaster $master process/event controller | ||||
|      * @return bool return false on failure | ||||
|      * @throws ServerException | ||||
|      */ | ||||
|     public function start($master) | ||||
|     { | ||||
|         parent::start($master); | ||||
|         common_debug("Starting STOMP queue manager"); | ||||
|         return $this->_ensureConn(); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Close out any active connections. | ||||
|      * | ||||
|      * @return bool return false on failure | ||||
|      * @throws Exception | ||||
|      */ | ||||
|     public function finish() | ||||
|     { | ||||
|         foreach ($this->stomps as $idx => $stomp) { | ||||
|             common_log(LOG_INFO, "Unsubscribing on: " . $stomp->getClient()->getConnection()->getHost()); | ||||
|             $stomp->unsubscribe(); | ||||
|         } | ||||
|         // If there are any outstanding delivered messages we haven't processed, | ||||
|         // free them for another thread to take. | ||||
|         foreach ($this->stomps as $i => $st) { | ||||
|             if ($st) { | ||||
|                 $this->rollback($i); | ||||
|                 $st->getClient()->disconnect(); | ||||
|                 $this->stomps[$i] = null; | ||||
|             } | ||||
|         } | ||||
|         return true; | ||||
|     } | ||||
|  | ||||
|  | ||||
|     /** | ||||
|      * Lazy open connections to all STOMP servers, if in manual failover | ||||
|      * mode. This means the queue servers don't speak to each other, so | ||||
|      * we have to listen to all of them to make sure we get all events. | ||||
|      */ | ||||
|     private function _ensureConn() | ||||
|     { | ||||
|         if ($this->stomps === null) { | ||||
|             if (!$this->pl->manualFailover) { | ||||
|                 $list = $this->pl->servers; | ||||
|                 if (count($list) > 1) { | ||||
|                     shuffle($list); // Randomize to spread load | ||||
|                     $url = 'failover://(' . implode(',', $list) . ')'; | ||||
|                 } else { | ||||
|                     $url = $list[0]; | ||||
|                 } | ||||
|                 $st = $this->_doConnect($url); | ||||
|                 $this->stomps = [$st]; | ||||
|                 $this->transactionCount = [0]; | ||||
|                 $this->transaction = [null]; | ||||
|                 $this->disconnect = [null]; | ||||
|             } else { | ||||
|                 $this->stomps = []; | ||||
|                 $this->transactionCount = []; | ||||
|                 $this->transaction = []; | ||||
|                 foreach ($this->pl->servers as $_ => $url) { | ||||
|                     try { | ||||
|                         $this->stomps[] = $this->_doConnect($url); | ||||
|                         $this->disconnect[] = null; // If the previous succeeded | ||||
|                     } catch (Exception $e) { | ||||
|                         // s'okay, we'll live | ||||
|                         $this->stomps[] = null; | ||||
|                         $this->disconnect[] = time(); | ||||
|                     } | ||||
|                     $this->transactionCount[] = 0; | ||||
|                     $this->transaction[] = null; | ||||
|                 } | ||||
|             } | ||||
|             // Done attempting connections | ||||
|             if (empty($this->stomps)) { | ||||
|                 throw new ServerException("No STOMP queue servers reachable"); | ||||
|             } else { | ||||
|                 foreach ($this->stomps as $i => $st) { | ||||
|                     if ($st) { | ||||
|                         $this->subscribe($st); | ||||
|                         $this->begin($i); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         return true; | ||||
|     } | ||||
|  | ||||
|     protected function _doConnect($server_url) | ||||
|     { | ||||
|         common_debug("STOMP: connecting to '{$server_url}' as '{$this->pl->username}'..."); | ||||
|         $cl = new Client($server_url); | ||||
|         $cl->setLogin($this->pl->username, $this->pl->password); | ||||
|         $cl->setVhostname($this->pl->vhost); | ||||
|  | ||||
|         try { | ||||
|             $cl->connect(); | ||||
|             common_debug("STOMP connected."); | ||||
|         } catch (StompException $e) { | ||||
|             common_log(LOG_ERR, 'Failed to connect to STOMP queue server'); | ||||
|             throw new ServerException('Failed to connect to STOMP queue server'); | ||||
|         } | ||||
|  | ||||
|         return new StatefulStomp($cl); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Grab a full list of stomp-side queue subscriptions. | ||||
|      * Will include: | ||||
|      *  - control broadcast channel | ||||
|      *  - shared group queues for active groups | ||||
|      *  - per-handler and per-site breakouts that are rooted in the active groups. | ||||
|      * | ||||
|      * @return array of strings | ||||
|      */ | ||||
|     protected function subscriptions(): array | ||||
|     { | ||||
|         $subs = []; | ||||
|         $subs[] = $this->pl->control; | ||||
|  | ||||
|         foreach ($this->activeGroups as $group) { | ||||
|             $subs[] = $this->pl->basename . $group; | ||||
|         } | ||||
|  | ||||
|         foreach ($this->pl->breakout as $spec) { | ||||
|             $parts = explode(':', $spec); | ||||
|             if (count($parts) < 2 || count($parts) > 3) { | ||||
|                 common_log(LOG_ERR, "Bad queue breakout specifier '{$spec}'"); | ||||
|             } | ||||
|             if (in_array($parts[0], $this->activeGroups)) { | ||||
|                 $subs[] = $this->pl->basename . $spec; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         return array_unique($subs); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Set up all our raw queue subscriptions on the given connection | ||||
|      * @param Client $st | ||||
|      */ | ||||
|     protected function subscribe(StatefulStomp $st) | ||||
|     { | ||||
|         $host = $st->getClient()->getConnection()->getHost(); | ||||
|         foreach ($this->subscriptions() as $sub) { | ||||
|             if (!in_array($sub, $this->subscriptions)) { | ||||
|                 $this->_log(LOG_INFO, "Subscribing to '{$sub}' on '{$host}'"); | ||||
|                 try { | ||||
|                     $st->subscribe($sub); | ||||
|                 } catch (Exception $e) { | ||||
|                     common_log(LOG_ERR, "STOMP received exception: " . get_class($e) . | ||||
|                                " while trying to subscribe: " . $e->getMessage()); | ||||
|                     throw $e; | ||||
|                 } | ||||
|  | ||||
|                 $this->subscriptions[] = $sub; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     protected function begin($idx) | ||||
|     { | ||||
|         if ($this->pl->useTransactions) { | ||||
|             if (!empty($this->transaction[$idx])) { | ||||
|                 throw new Exception("Tried to start transaction in the middle of a transaction"); | ||||
|             } | ||||
|             $this->transactionCount[$idx]++; | ||||
|             $this->transaction[$idx] = $this->master->id . '-' . $this->transactionCount[$idx] . '-' . time(); | ||||
|             $this->stomps[$idx]->begin($this->transaction[$idx]); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     protected function ack($idx, $frame) | ||||
|     { | ||||
|         if ($this->pl->useAcks) { | ||||
|             if ($this->pl->useTransactions) { | ||||
|                 if (empty($this->transaction[$idx])) { | ||||
|                     throw new Exception("Tried to ack but not in a transaction"); | ||||
|                 } | ||||
|                 $this->stomps[$idx]->ack($frame, $this->transaction[$idx]); | ||||
|             } else { | ||||
|                 $this->stomps[$idx]->ack($frame); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     protected function commit($idx) | ||||
|     { | ||||
|         if ($this->useTransactions) { | ||||
|             if (empty($this->transaction[$idx])) { | ||||
|                 throw new Exception("Tried to commit but not in a transaction"); | ||||
|             } | ||||
|             $this->stomps[$idx]->commit($this->transaction[$idx]); | ||||
|             $this->transaction[$idx] = null; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     protected function rollback($idx) | ||||
|     { | ||||
|         if ($this->useTransactions) { | ||||
|             if (empty($this->transaction[$idx])) { | ||||
|                 throw new Exception("Tried to rollback but not in a transaction"); | ||||
|             } | ||||
|             $this->stomps[$idx]->abort($this->transaction[$idx]); | ||||
|             $this->transaction[$idx] = null; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Saves an object into the queue item table. | ||||
|      * | ||||
|      * @param mixed $object | ||||
|      * @param string $queue | ||||
|      * @param string $siteNickname optional override to drop into another site's queue | ||||
|      * @throws Exception | ||||
|      */ | ||||
|     public function enqueue($object, $queue, $siteNickname = null) | ||||
|     { | ||||
|         $this->_ensureConn(); | ||||
|         $idx = $this->pl->defaultIdx; | ||||
|         $rep = $this->logrep($object); | ||||
|         $envelope = ['site' => $siteNickname ?: common_config('site', 'nickname'), | ||||
|                      'handler' => $queue, | ||||
|                      'payload' => $this->encode($object)]; | ||||
|         $msg = base64_encode(serialize($envelope)); | ||||
|  | ||||
|         $props = ['created' => common_sql_now()]; | ||||
|         if ($this->isPersistent($queue)) { | ||||
|             $props['persistent'] = 'true'; | ||||
|         } | ||||
|  | ||||
|         $st = $this->stomps[$idx]; | ||||
|         $host = $st->getClient()->getConnection()->getHost(); | ||||
|         $target = $this->queueName($queue); | ||||
|  | ||||
|         $result = $st->send($target, new Message($msg), $props); | ||||
|  | ||||
|         if (!$result) { | ||||
|             common_log(LOG_ERR, "STOMP error sending $rep to $queue queue on $host $target"); | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         common_debug("STOMP complete remote queueing $rep for queue `$queue` on host `$host` on channel `$target`"); | ||||
|         $this->stats('enqueued', $queue); | ||||
|         return true; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Determine whether messages to this queue should be marked as persistent. | ||||
|      * Actual persistent storage depends on the queue server's configuration. | ||||
|      * @param string $queue | ||||
|      * @return bool | ||||
|      */ | ||||
|     protected function isPersistent($queue) | ||||
|     { | ||||
|         $mode = $this->pl->persistent; | ||||
|         if (is_array($mode)) { | ||||
|             return in_array($queue, $mode); | ||||
|         } else { | ||||
|             return (bool)$mode; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Send any sockets we're listening on to the IO manager | ||||
|      * to wait for input. | ||||
|      * | ||||
|      * @return array of resources | ||||
|      */ | ||||
|     public function getSockets() | ||||
|     { | ||||
|         $sockets = []; | ||||
|         foreach ($this->stomps as $st) { | ||||
|             if ($st) { | ||||
|                 $sockets[] = $st->getClient()->getConnection(); | ||||
|             } | ||||
|         } | ||||
|         return $sockets; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Get the Stomp connection object associated with the given socket. | ||||
|      * @param resource $socket | ||||
|      * @return int index into connections list | ||||
|      * @throws Exception | ||||
|      */ | ||||
|     protected function connectionFromSocket($socket) | ||||
|     { | ||||
|         foreach ($this->stomps as $i => $st) { | ||||
|             if ($st && $st->getConnection() === $socket) { | ||||
|                 return $i; | ||||
|             } | ||||
|         } | ||||
|         throw new Exception(__CLASS__ . " asked to read from unrecognized socket"); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Handle and acknowledge an event that's come in through a queue. | ||||
|      * | ||||
|      * If the queue handler reports failure, the message is requeued for later. | ||||
|      * Missing notices or handler classes will drop the message. | ||||
|      * | ||||
|      * Side effects: in multi-site mode, may reset site configuration to | ||||
|      * match the site that queued the event. | ||||
|      * | ||||
|      * @param Frame $frame | ||||
|      * @return bool success | ||||
|      * @throws ConfigException | ||||
|      * @throws NoConfigException | ||||
|      * @throws ServerException | ||||
|      * @throws StompException | ||||
|      */ | ||||
|     protected function handleItem($frame): bool | ||||
|     { | ||||
|         $host = $this->stomps[$this->pl->defaultIdx]->getHost(); | ||||
|         $message = unserialize(base64_decode($frame->body)); | ||||
|  | ||||
|         if ($message === false) { | ||||
|             common_log(LOG_ERR, "STOMP can't unserialize frame: {$frame->body}\n" . | ||||
|                        'Unserializable frame length: ' . strlen($frame->body)); | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         $site = $message['site']; | ||||
|         $queue = $message['handler']; | ||||
|  | ||||
|         if ($this->isDeadLetter($frame, $message)) { | ||||
|             $this->stats('deadletter', $queue); | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         // @fixme detect failing site switches | ||||
|         $this->switchSite($site); | ||||
|  | ||||
|         try { | ||||
|             $item = $this->decode($message['payload']); | ||||
|         } catch (Exception $e) { | ||||
|             common_log(LOG_ERR, "Skipping empty or deleted item in queue {$queue} from {$host}"); | ||||
|             $this->stats('baditem', $queue); | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         $info = $this->logrep($item) . ' posted at ' . | ||||
|                 $frame->getHeaders()['created'] . " in queue {$queue} from {$host}"; | ||||
|         try { | ||||
|             $handler = $this->getHandler($queue); | ||||
|             $ok = $handler->handle($item); | ||||
|         } catch (NoQueueHandlerException $e) { | ||||
|             common_log(LOG_ERR, "Missing handler class; skipping $info"); | ||||
|             $this->stats('badhandler', $queue); | ||||
|             return false; | ||||
|         } catch (Exception $e) { | ||||
|             common_log(LOG_ERR, "Exception on queue $queue: " . $e->getMessage()); | ||||
|             $ok = false; | ||||
|         } | ||||
|  | ||||
|         if ($ok) { | ||||
|             common_log(LOG_INFO, "Successfully handled $info"); | ||||
|             $this->stats('handled', $queue); | ||||
|         } else { | ||||
|             common_log(LOG_WARNING, "Failed handling $info"); | ||||
|             // Requeing moves the item to the end of the line for its next try. | ||||
|             // @fixme add a manual retry count | ||||
|             $this->enqueue($item, $queue); | ||||
|             $this->stats('requeued', $queue); | ||||
|         } | ||||
|  | ||||
|         return $ok; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Check if a redelivered message has been run through enough | ||||
|      * that we're going to give up on it. | ||||
|      * | ||||
|      * @param Frame $frame | ||||
|      * @param array $message unserialized message body | ||||
|      * @return bool true if we should discard | ||||
|      */ | ||||
|     protected function isDeadLetter($frame, $message) | ||||
|     { | ||||
|         if (isset($frame->getHeaders()['redelivered']) && $frame->getHeaders()['redelivered'] == 'true') { | ||||
|             // Message was redelivered, possibly indicating a previous failure. | ||||
|             $msgId = $frame->getHeaders()['message-id']; | ||||
|             $site = $message['site']; | ||||
|             $queue = $message['handler']; | ||||
|             $msgInfo = "message $msgId for $site in queue $queue"; | ||||
|  | ||||
|             $deliveries = $this->incDeliveryCount($msgId); | ||||
|             if ($deliveries > common_config('queue', 'max_retries')) { | ||||
|                 $info = "DEAD-LETTER FILE: Gave up after retry $deliveries on $msgInfo"; | ||||
|  | ||||
|                 $outdir = common_config('queue', 'dead_letter_dir'); | ||||
|                 if ($outdir) { | ||||
|                     $filename = $outdir . "/$site-$queue-" . rawurlencode($msgId); | ||||
|                     $info .= ": dumping to $filename"; | ||||
|                     file_put_contents($filename, $message['payload']); | ||||
|                 } | ||||
|  | ||||
|                 common_log(LOG_ERR, $info); | ||||
|                 return true; | ||||
|             } else { | ||||
|                 common_log(LOG_INFO, "retry $deliveries on $msgInfo"); | ||||
|             } | ||||
|         } | ||||
|         return false; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Update count of times we've re-encountered this message recently, | ||||
|      * triggered when we get a message marked as 'redelivered'. | ||||
|      * | ||||
|      * Requires a CLI-friendly cache configuration. | ||||
|      * | ||||
|      * @param string $msgId message-id header from message | ||||
|      * @return int number of retries recorded | ||||
|      */ | ||||
|     function incDeliveryCount($msgId) | ||||
|     { | ||||
|         $count = 0; | ||||
|         $cache = Cache::instance(); | ||||
|         if ($cache) { | ||||
|             $key = 'gnusocial:stomp:message-retries:' . $msgId; | ||||
|             $count = $cache->increment($key); | ||||
|             if (!$count) { | ||||
|                 $count = 1; | ||||
|                 $cache->set($key, $count, null, 3600); | ||||
|             } | ||||
|         } | ||||
|         return $count; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Combines the queue_basename from configuration with the | ||||
|      * group name for this queue to give eg: | ||||
|      * | ||||
|      * /queue/statusnet/main | ||||
|      * /queue/statusnet/main/distrib | ||||
|      * /queue/statusnet/xmpp/xmppout/site01 | ||||
|      * | ||||
|      * @param string $queue | ||||
|      * @return string | ||||
|      * @throws Exception | ||||
|      */ | ||||
|     protected function queueName(string $queue): string | ||||
|     { | ||||
|         $group = $this->queueGroup($queue); | ||||
|         $site = GNUsocial::currentSite(); | ||||
|  | ||||
|         foreach (["$group:$queue:$site", "$group:$queue"] as $spec) { | ||||
|             if (in_array($spec, $this->breakout)) { | ||||
|                 return $this->pl->basename . $spec; | ||||
|             } | ||||
|         } | ||||
|         return $this->pl->basename . $group; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Get the breakout mode for the given queue on the current site. | ||||
|      * | ||||
|      * @param string $queue | ||||
|      * @return string one of 'shared', 'handler', 'site' | ||||
|      */ | ||||
|     protected function breakoutMode($queue) | ||||
|     { | ||||
|         if (isset($this->pl->breakout[$queue])) { | ||||
|             return $this->pl->breakout[$queue]; | ||||
|         } else if (isset($this->pl->breakout['*'])) { | ||||
|             return $this->pl->breakout['*']; | ||||
|         } else { | ||||
|             return 'shared'; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Tell the i/o master we only need a single instance to cover | ||||
|      * all sites running in this process. | ||||
|      */ | ||||
|     public static function multiSite() | ||||
|     { | ||||
|         return IoManager::INSTANCE_PER_PROCESS; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Optional; ping any running queue handler daemons with a notification | ||||
|      * such as announcing a new site to handle or requesting clean shutdown. | ||||
|      * This avoids having to restart all the daemons manually to update configs | ||||
|      * and such. | ||||
|      * | ||||
|      * Currently only relevant for multi-site queue managers such as Stomp. | ||||
|      * | ||||
|      * @param string $event event key | ||||
|      * @param string $param optional parameter to append to key | ||||
|      * @return bool success | ||||
|      */ | ||||
|     public function sendControlSignal($event, $param = '') | ||||
|     { | ||||
|         $message = $event; | ||||
|         if ($param != '') { | ||||
|             $message .= ':' . $param; | ||||
|         } | ||||
|         $this->_ensureConn(); | ||||
|         $st = $this->stomps[$this->pl->defaultIdx]; | ||||
|         $result = $st->send($this->pl->control, $message, ['created' => common_sql_now()]); | ||||
|         if ($result) { | ||||
|             common_log(LOG_INFO, "Sent control ping to STOMP queue daemons: $message"); | ||||
|             return true; | ||||
|         } else { | ||||
|             common_log(LOG_ERR, "Failed sending control ping to STOMP queue daemons: $message"); | ||||
|             return false; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Process a control signal broadcast. | ||||
|      * | ||||
|      * @param int $idx connection index | ||||
|      * @param array $frame Stomp frame | ||||
|      * @return bool true to continue; false to stop further processing. | ||||
|      * @throws ConfigException | ||||
|      * @throws NoConfigException | ||||
|      * @throws ServerException | ||||
|      */ | ||||
|     protected function handleControlSignal(int $idx, $frame): bool | ||||
|     { | ||||
|         $message = trim($frame->body); | ||||
|         if (strpos($message, ':') !== false) { | ||||
|             list($event, $param) = explode(':', $message, 2); | ||||
|         } else { | ||||
|             $event = $message; | ||||
|             $param = ''; | ||||
|         } | ||||
|  | ||||
|         $shutdown = false; | ||||
|  | ||||
|         if ($event == 'shutdown') { | ||||
|             $this->master->requestShutdown(); | ||||
|             $shutdown = true; | ||||
|         } else if ($event == 'restart') { | ||||
|             $this->master->requestRestart(); | ||||
|             $shutdown = true; | ||||
|         } else if ($event == 'update') { | ||||
|             $this->updateSiteConfig($param); | ||||
|         } else { | ||||
|             common_log(LOG_ERR, "Ignoring unrecognized control message: $message"); | ||||
|         } | ||||
|         return $shutdown; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Switch site, if necessary, and reset current handler assignments | ||||
|      * @param string $site | ||||
|      * @throws ConfigException | ||||
|      * @throws NoConfigException | ||||
|      * @throws ServerException | ||||
|      */ | ||||
|     function switchSite($site) | ||||
|     { | ||||
|         if ($site != GNUsocial::currentSite()) { | ||||
|             $this->stats('switch'); | ||||
|             GNUsocial::switchSite($site); | ||||
|             $this->initialize(); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * (Re)load runtime configuration for a given site by nickname, | ||||
|      * triggered by a broadcast to the 'statusnet-control' topic. | ||||
|      * | ||||
|      * Configuration changes in database should update, but config | ||||
|      * files might not. | ||||
|      * | ||||
|      * @param $nickname | ||||
|      * @return void true to continue; false to stop further processing. | ||||
|      * @throws ConfigException | ||||
|      * @throws NoConfigException | ||||
|      * @throws ServerException | ||||
|      */ | ||||
|     protected function updateSiteConfig($nickname) | ||||
|     { | ||||
|         $sn = Status_network::getKV('nickname', $nickname); | ||||
|         if ($sn) { | ||||
|             $this->switchSite($nickname); | ||||
|             if (!in_array($nickname, $this->sites)) { | ||||
|                 $this->addSite(); | ||||
|             } | ||||
|             $this->stats('siteupdate'); | ||||
|         } else { | ||||
|             common_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname"); | ||||
|         } | ||||
|     } | ||||
| }; | ||||
		Reference in New Issue
	
	Block a user