| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | <?php | 
					
						
							|  |  |  | /** | 
					
						
							| 
									
										
										
										
											2009-08-25 18:12:20 -04:00
										 |  |  |  * StatusNet, the distributed open-source microblogging tool | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |  * | 
					
						
							|  |  |  |  * Queue manager implementation backed by a Stomp message queue server | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * PHP version 5 | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * LICENCE: This program is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  |  * it under the terms of the GNU Affero General Public License as published by | 
					
						
							|  |  |  |  * the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  |  * (at your option) any later version. | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * This program is distributed in the hope that it will be useful, | 
					
						
							|  |  |  |  * but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  |  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
					
						
							|  |  |  |  * GNU Affero General Public License for more details. | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * You should have received a copy of the GNU Affero General Public License | 
					
						
							|  |  |  |  * along with this program.  If not, see <http://www.gnu.org/licenses/>. | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * @category  QueueManager | 
					
						
							| 
									
										
										
										
											2009-08-25 18:12:20 -04:00
										 |  |  |  * @package   StatusNet | 
					
						
							| 
									
										
										
										
											2009-08-25 18:19:04 -04:00
										 |  |  |  * @author    Evan Prodromou <evan@status.net> | 
					
						
							|  |  |  |  * @author    Sarven Capadisli <csarven@status.net> | 
					
						
							| 
									
										
										
										
											2009-08-25 18:12:20 -04:00
										 |  |  |  * @copyright 2009 StatusNet, Inc. | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 | 
					
						
							| 
									
										
										
										
											2009-08-25 18:16:46 -04:00
										 |  |  |  * @link      http://status.net/ | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |  */ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | require_once 'Stomp.php'; | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  | require_once 'Stomp/Exception.php'; | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  | class StompQueueManager extends QueueManager | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | { | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
    // Connection settings; all of these are filled in by the constructor
    // from the 'queue' configuration section.
    protected $servers;   // 'stomp_server', always normalized to an array
    protected $username;  // 'stomp_username'
    protected $password;  // 'stomp_password'
    protected $base;      // 'queue_basename'
    protected $control;   // 'control_channel'; destination used for control signals

    protected $useTransactions; // 'stomp_transactions'
    protected $useAcks;         // 'stomp_acks'

    // NOTE(review): neither of these two is read or written in the part of
    // the class reviewed here; see the rest of the file for their usage.
    protected $sites = array();
    protected $subscriptions = array();

    protected $cons = array(); // all open connections
    // Per-connection bookkeeping, keyed by the same index as $cons.
    protected $disconnect = array();       // time() at which a connection was lost; idle() retries after 60s
    protected $transaction = array();      // reset to null for a connection when it is lost
    protected $transactionCount = array();
    // Index into $cons used for outgoing sends; handleInput() points it at
    // the connection it is currently reading from.
    protected $defaultIdx = 0;
					
						
							| 
									
										
										
										
											2010-01-22 12:35:05 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |     function __construct() | 
					
						
							|  |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         parent::__construct(); | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |         $server = common_config('queue', 'stomp_server'); | 
					
						
							|  |  |  |         if (is_array($server)) { | 
					
						
							|  |  |  |             $this->servers = $server; | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             $this->servers = array($server); | 
					
						
							|  |  |  |         } | 
					
						
							| 
									
										
										
										
											2010-03-29 15:46:41 -07:00
										 |  |  |         $this->username        = common_config('queue', 'stomp_username'); | 
					
						
							|  |  |  |         $this->password        = common_config('queue', 'stomp_password'); | 
					
						
							|  |  |  |         $this->base            = common_config('queue', 'queue_basename'); | 
					
						
							|  |  |  |         $this->control         = common_config('queue', 'control_channel'); | 
					
						
							|  |  |  |         $this->breakout        = common_config('queue', 'breakout'); | 
					
						
							|  |  |  |         $this->useTransactions = common_config('queue', 'stomp_transactions'); | 
					
						
							|  |  |  |         $this->useAcks         = common_config('queue', 'stomp_acks'); | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Tell the i/o master we only need a single instance to cover | 
					
						
							|  |  |  |      * all sites running in this process. | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     public static function multiSite() | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         return IoManager::INSTANCE_PER_PROCESS; | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Optional; ping any running queue handler daemons with a notification | 
					
						
							|  |  |  |      * such as announcing a new site to handle or requesting clean shutdown. | 
					
						
							|  |  |  |      * This avoids having to restart all the daemons manually to update configs | 
					
						
							|  |  |  |      * and such. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * Currently only relevant for multi-site queue managers such as Stomp. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param string $event event key | 
					
						
							|  |  |  |      * @param string $param optional parameter to append to key | 
					
						
							|  |  |  |      * @return boolean success | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     public function sendControlSignal($event, $param='') | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $message = $event; | 
					
						
							|  |  |  |         if ($param != '') { | 
					
						
							|  |  |  |             $message .= ':' . $param; | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |         $this->_connect(); | 
					
						
							| 
									
										
										
										
											2010-02-10 10:59:30 -08:00
										 |  |  |         $con = $this->cons[$this->defaultIdx]; | 
					
						
							|  |  |  |         $result = $con->send($this->control, | 
					
						
							|  |  |  |                              $message, | 
					
						
							|  |  |  |                              array ('created' => common_sql_now())); | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  |         if ($result) { | 
					
						
							|  |  |  |             $this->_log(LOG_INFO, "Sent control ping to queue daemons: $message"); | 
					
						
							|  |  |  |             return true; | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             $this->_log(LOG_ERR, "Failed sending control ping to queue daemons: $message"); | 
					
						
							|  |  |  |             return false; | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2010-01-14 00:19:25 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |      * Saves an object into the queue item table. | 
					
						
							| 
									
										
										
										
											2010-01-14 00:19:25 -08:00
										 |  |  |      * | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |      * @param mixed $object | 
					
						
							| 
									
										
										
										
											2010-01-14 00:19:25 -08:00
										 |  |  |      * @param string $queue | 
					
						
							| 
									
										
										
										
											2010-06-03 17:49:20 -07:00
										 |  |  |      * @param string $siteNickname optional override to drop into another site's queue | 
					
						
							| 
									
										
										
										
											2010-01-14 00:19:25 -08:00
										 |  |  |      * | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |      * @return boolean true on success | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |      * @throws StompException on connection or send error | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |      */ | 
					
						
							| 
									
										
										
										
											2010-06-03 17:49:20 -07:00
										 |  |  |     public function enqueue($object, $queue, $siteNickname=null) | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |     { | 
					
						
							|  |  |  |         $this->_connect(); | 
					
						
							| 
									
										
										
										
											2010-05-21 14:07:59 -07:00
										 |  |  |         if (common_config('queue', 'stomp_enqueue_on')) { | 
					
						
							|  |  |  |             // We're trying to force all writes to a single server.
 | 
					
						
							|  |  |  |             // WARNING: this might do odd things if that server connection dies.
 | 
					
						
							|  |  |  |             $idx = array_search(common_config('queue', 'stomp_enqueue_on'), | 
					
						
							|  |  |  |                                 $this->servers); | 
					
						
							|  |  |  |             if ($idx === false) { | 
					
						
							|  |  |  |                 common_log(LOG_ERR, 'queue stomp_enqueue_on setting does not match our server list.'); | 
					
						
							|  |  |  |                 $idx = $this->defaultIdx; | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             $idx = $this->defaultIdx; | 
					
						
							|  |  |  |         } | 
					
						
							| 
									
										
										
										
											2010-06-03 17:49:20 -07:00
										 |  |  |         return $this->_doEnqueue($object, $queue, $idx, $siteNickname); | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |     } | 
					
						
							| 
									
										
										
										
											2010-01-30 13:15:17 -05:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Saves a notice object reference into the queue item table | 
					
						
							|  |  |  |      * on the given connection. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @return boolean true on success | 
					
						
							|  |  |  |      * @throws StompException on connection or send error | 
					
						
							|  |  |  |      */ | 
					
						
							| 
									
										
										
										
											2010-06-03 17:49:20 -07:00
										 |  |  |     protected function _doEnqueue($object, $queue, $idx, $siteNickname=null) | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |         $rep = $this->logrep($object); | 
					
						
							| 
									
										
										
										
											2010-06-03 17:49:20 -07:00
										 |  |  |         $envelope = array('site' => $siteNickname ? $siteNickname : common_config('site', 'nickname'), | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |                           'handler' => $queue, | 
					
						
							|  |  |  |                           'payload' => $this->encode($object)); | 
					
						
							|  |  |  |         $msg = serialize($envelope); | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-28 09:52:35 -08:00
										 |  |  |         $props = array('created' => common_sql_now()); | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |         if ($this->isPersistent($queue)) { | 
					
						
							| 
									
										
										
										
											2010-01-28 09:52:35 -08:00
										 |  |  |             $props['persistent'] = 'true'; | 
					
						
							|  |  |  |         } | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  |         $con = $this->cons[$idx]; | 
					
						
							|  |  |  |         $host = $con->getServer(); | 
					
						
							| 
									
										
										
										
											2010-02-17 16:49:00 -08:00
										 |  |  |         $target = $this->queueName($queue); | 
					
						
							|  |  |  |         $result = $con->send($target, $msg, $props); | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |         if (!$result) { | 
					
						
							| 
									
										
										
										
											2010-02-17 16:49:00 -08:00
										 |  |  |             $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host $target"); | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |             return false; | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-02-17 16:49:00 -08:00
										 |  |  |         $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host $target"); | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         $this->stats('enqueued', $queue); | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |         return true; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Determine whether messages to this queue should be marked as persistent. | 
					
						
							|  |  |  |      * Actual persistent storage depends on the queue server's configuration. | 
					
						
							|  |  |  |      * @param string $queue | 
					
						
							|  |  |  |      * @return bool | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function isPersistent($queue) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $mode = common_config('queue', 'stomp_persistent'); | 
					
						
							|  |  |  |         if (is_array($mode)) { | 
					
						
							|  |  |  |             return in_array($queue, $mode); | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             return (bool)$mode; | 
					
						
							|  |  |  |         } | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Send any sockets we're listening on to the IO manager | 
					
						
							|  |  |  |      * to wait for input. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @return array of resources | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     public function getSockets() | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |         $sockets = array(); | 
					
						
							|  |  |  |         foreach ($this->cons as $con) { | 
					
						
							|  |  |  |             if ($con) { | 
					
						
							|  |  |  |                 $sockets[] = $con->getSocket(); | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |         return $sockets; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Get the Stomp connection object associated with the given socket. | 
					
						
							|  |  |  |      * @param resource $socket | 
					
						
							|  |  |  |      * @return int index into connections list | 
					
						
							|  |  |  |      * @throws Exception | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function connectionFromSocket($socket) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         foreach ($this->cons as $i => $con) { | 
					
						
							|  |  |  |             if ($con && $con->getSocket() === $socket) { | 
					
						
							|  |  |  |                 return $i; | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |         throw new Exception(__CLASS__ . " asked to read from unrecognized socket"); | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * We've got input to handle on our socket! | 
					
						
							|  |  |  |      * Read any waiting Stomp frame(s) and process them. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param resource $socket | 
					
						
							|  |  |  |      * @return boolean ok on success | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     public function handleInput($socket) | 
					
						
							|  |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |         $idx = $this->connectionFromSocket($socket); | 
					
						
							|  |  |  |         $con = $this->cons[$idx]; | 
					
						
							|  |  |  |         $host = $con->getServer(); | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |         $this->defaultIdx = $idx; | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         $ok = true; | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |         try { | 
					
						
							|  |  |  |             $frames = $con->readFrames(); | 
					
						
							|  |  |  |         } catch (StompException $e) { | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |             $this->_log(LOG_ERR, "Lost connection to $host: " . $e->getMessage()); | 
					
						
							|  |  |  |             fclose($socket); // ???
 | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |             $this->cons[$idx] = null; | 
					
						
							|  |  |  |             $this->transaction[$idx] = null; | 
					
						
							|  |  |  |             $this->disconnect[$idx] = time(); | 
					
						
							|  |  |  |             return false; | 
					
						
							|  |  |  |         } | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         foreach ($frames as $frame) { | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  |             $dest = $frame->headers['destination']; | 
					
						
							|  |  |  |             if ($dest == $this->control) { | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |                 if (!$this->handleControlSignal($frame)) { | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  |                     // We got a control event that requests a shutdown;
 | 
					
						
							|  |  |  |                     // close out and stop handling anything else!
 | 
					
						
							|  |  |  |                     break; | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |             } else { | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |                 $ok = $this->handleItem($frame) && $ok; | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  |             } | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |             $this->ack($idx, $frame); | 
					
						
							|  |  |  |             $this->commit($idx); | 
					
						
							|  |  |  |             $this->begin($idx); | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         } | 
					
						
							|  |  |  |         return $ok; | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Attempt to reconnect in background if we lost a connection. | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     function idle() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $now = time(); | 
					
						
							|  |  |  |         foreach ($this->cons as $idx => $con) { | 
					
						
							|  |  |  |             if (empty($con)) { | 
					
						
							|  |  |  |                 $age = $now - $this->disconnect[$idx]; | 
					
						
							|  |  |  |                 if ($age >= 60) { | 
					
						
							|  |  |  |                     $this->_reconnect($idx); | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |         return true; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Initialize our connection and subscribe to all the queues | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |      * we're going to need to handle... If multiple queue servers | 
					
						
							|  |  |  |      * are configured for failover, we'll listen to all of them. | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |      * | 
					
						
							|  |  |  |      * Side effects: in multi-site mode, may reset site configuration. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param IoMaster $master process/event controller | 
					
						
							|  |  |  |      * @return bool return false on failure | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     public function start($master) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         parent::start($master); | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |         $this->_connectAll(); | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |         foreach ($this->cons as $i => $con) { | 
					
						
							|  |  |  |             if ($con) { | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |                 $this->doSubscribe($con); | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |                 $this->begin($i); | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         return true; | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2010-01-30 13:15:17 -05:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |      * Close out any active connections. | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |      * | 
					
						
							|  |  |  |      * @return bool return false on failure | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     public function finish() | 
					
						
							|  |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-01-22 12:35:05 -08:00
										 |  |  |         // If there are any outstanding delivered messages we haven't processed,
 | 
					
						
							|  |  |  |         // free them for another thread to take.
 | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |         foreach ($this->cons as $i => $con) { | 
					
						
							|  |  |  |             if ($con) { | 
					
						
							|  |  |  |                 $this->rollback($i); | 
					
						
							| 
									
										
										
										
											2010-02-10 10:59:30 -08:00
										 |  |  |                 $con->disconnect(); | 
					
						
							|  |  |  |                 $this->cons[$i] = null; | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         return true; | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |      * Lazy open a single connection to Stomp queue server. | 
					
						
							|  |  |  |      * If multiple servers are configured, we let the Stomp client library | 
					
						
							|  |  |  |      * worry about finding a working connection among them. | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |      */ | 
					
						
							|  |  |  |     protected function _connect() | 
					
						
							|  |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |         if (empty($this->cons)) { | 
					
						
							|  |  |  |             $list = $this->servers; | 
					
						
							|  |  |  |             if (count($list) > 1) { | 
					
						
							|  |  |  |                 shuffle($list); // Randomize to spread load
 | 
					
						
							|  |  |  |                 $url = 'failover://(' . implode(',', $list) . ')'; | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |             } else { | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |                 $url = $list[0]; | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |             } | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |             $con = $this->_doConnect($url); | 
					
						
							|  |  |  |             $this->cons = array($con); | 
					
						
							|  |  |  |             $this->transactionCount = array(0); | 
					
						
							|  |  |  |             $this->transaction = array(null); | 
					
						
							|  |  |  |             $this->disconnect = array(null); | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-07-01 11:10:23 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Lazy open connections to all Stomp servers, if in manual failover | 
					
						
							|  |  |  |      * mode. This means the queue servers don't speak to each other, so | 
					
						
							|  |  |  |      * we have to listen to all of them to make sure we get all events. | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function _connectAll() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         if (!common_config('queue', 'stomp_manual_failover')) { | 
					
						
							|  |  |  |             return $this->_connect(); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |         if (empty($this->cons)) { | 
					
						
							|  |  |  |             $this->cons = array(); | 
					
						
							|  |  |  |             $this->transactionCount = array(); | 
					
						
							|  |  |  |             $this->transaction = array(); | 
					
						
							|  |  |  |             foreach ($this->servers as $idx => $server) { | 
					
						
							|  |  |  |                 try { | 
					
						
							|  |  |  |                     $this->cons[] = $this->_doConnect($server); | 
					
						
							|  |  |  |                     $this->disconnect[] = null; | 
					
						
							|  |  |  |                 } catch (Exception $e) { | 
					
						
							|  |  |  |                     // s'okay, we'll live
 | 
					
						
							|  |  |  |                     $this->cons[] = null; | 
					
						
							|  |  |  |                     $this->disconnect[] = time(); | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |                 $this->transactionCount[] = 0; | 
					
						
							|  |  |  |                 $this->transaction[] = null; | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |             if (empty($this->cons)) { | 
					
						
							|  |  |  |                 throw new ServerException("No queue servers reachable..."); | 
					
						
							|  |  |  |                 return false; | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Attempt to manually reconnect to the Stomp server for the given | 
					
						
							|  |  |  |      * slot. If successful, set up our subscriptions on it. | 
					
						
							|  |  |  |      */ | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |     protected function _reconnect($idx) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         try { | 
					
						
							|  |  |  |             $con = $this->_doConnect($this->servers[$idx]); | 
					
						
							|  |  |  |         } catch (Exception $e) { | 
					
						
							|  |  |  |             $this->_log(LOG_ERR, $e->getMessage()); | 
					
						
							|  |  |  |             $con = null; | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |         if ($con) { | 
					
						
							|  |  |  |             $this->cons[$idx] = $con; | 
					
						
							|  |  |  |             $this->disconnect[$idx] = null; | 
					
						
							| 
									
										
										
										
											2010-01-30 13:15:17 -05:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |             $this->doSubscribe($con); | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |             $this->begin($idx); | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             // Try again later...
 | 
					
						
							|  |  |  |             $this->disconnect[$idx] = time(); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     protected function _doConnect($server) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $this->_log(LOG_INFO, "Connecting to '$server' as '$this->username'..."); | 
					
						
							|  |  |  |         $con = new LiberalStomp($server); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if ($con->connect($this->username, $this->password)) { | 
					
						
							|  |  |  |             $this->_log(LOG_INFO, "Connected."); | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             $this->_log(LOG_ERR, 'Failed to connect to queue server'); | 
					
						
							|  |  |  |             throw new ServerException('Failed to connect to queue server'); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return $con; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |      * Set up all our raw queue subscriptions on the given connection | 
					
						
							|  |  |  |      * @param LiberalStomp $con | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |      */ | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |     protected function doSubscribe(LiberalStomp $con) | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |         $host = $con->getServer(); | 
					
						
							| 
									
										
										
										
											2010-02-17 16:49:00 -08:00
										 |  |  |         foreach ($this->subscriptions() as $sub) { | 
					
						
							|  |  |  |             $this->_log(LOG_INFO, "Subscribing to $sub on $host"); | 
					
						
							|  |  |  |             $con->subscribe($sub); | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2010-02-17 16:49:00 -08:00
										 |  |  |      | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Grab a full list of stomp-side queue subscriptions. | 
					
						
							|  |  |  |      * Will include: | 
					
						
							|  |  |  |      *  - control broadcast channel | 
					
						
							|  |  |  |      *  - shared group queues for active groups | 
					
						
							|  |  |  |      *  - per-handler and per-site breakouts from $config['queue']['breakout'] | 
					
						
							|  |  |  |      *    that are rooted in the active groups. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @return array of strings | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function subscriptions() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $subs = array(); | 
					
						
							|  |  |  |         $subs[] = $this->control; | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         foreach ($this->activeGroups as $group) { | 
					
						
							|  |  |  |             $subs[] = $this->base . $group; | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         foreach ($this->breakout as $spec) { | 
					
						
							|  |  |  |             $parts = explode('/', $spec); | 
					
						
							|  |  |  |             if (count($parts) < 2 || count($parts) > 3) { | 
					
						
							|  |  |  |                 common_log(LOG_ERR, "Bad queue breakout specifier $spec"); | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |             if (in_array($parts[0], $this->activeGroups)) { | 
					
						
							|  |  |  |                 $subs[] = $this->base . $spec; | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |         return array_unique($subs); | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |      * Handle and acknowledge an event that's come in through a queue. | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |      * | 
					
						
							|  |  |  |      * If the queue handler reports failure, the message is requeued for later. | 
					
						
							|  |  |  |      * Missing notices or handler classes will drop the message. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * Side effects: in multi-site mode, may reset site configuration to | 
					
						
							|  |  |  |      * match the site that queued the event. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param StompFrame $frame | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |      * @return bool success | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |      */ | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |     protected function handleItem($frame) | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |         $host = $this->cons[$this->defaultIdx]->getServer(); | 
					
						
							|  |  |  |         $message = unserialize($frame->body); | 
					
						
							|  |  |  |         $site = $message['site']; | 
					
						
							|  |  |  |         $queue = $message['handler']; | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |         if ($this->isDeadletter($frame, $message)) { | 
					
						
							|  |  |  |             $this->stats('deadletter', $queue); | 
					
						
							|  |  |  | 	        return false; | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         } | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |         // @fixme detect failing site switches
 | 
					
						
							|  |  |  |         $this->switchSite($site); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         $item = $this->decode($message['payload']); | 
					
						
							| 
									
										
										
										
											2010-02-16 09:15:29 -08:00
										 |  |  |         if (empty($item)) { | 
					
						
							|  |  |  |             $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host"); | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |             $this->stats('baditem', $queue); | 
					
						
							|  |  |  |             return false; | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         } | 
					
						
							| 
									
										
										
										
											2010-02-16 09:15:29 -08:00
										 |  |  |         $info = $this->logrep($item) . " posted at " . | 
					
						
							|  |  |  |                 $frame->headers['created'] . " in queue $queue from $host"; | 
					
						
							|  |  |  |         $this->_log(LOG_DEBUG, "Dequeued $info"); | 
					
						
							| 
									
										
										
										
											2009-07-09 12:09:20 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         $handler = $this->getHandler($queue); | 
					
						
							|  |  |  |         if (!$handler) { | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  |             $this->_log(LOG_ERR, "Missing handler class; skipping $info"); | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |             $this->stats('badhandler', $queue); | 
					
						
							|  |  |  |             return false; | 
					
						
							|  |  |  |         } | 
					
						
							| 
									
										
										
										
											2009-07-09 12:09:20 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-30 13:15:17 -05:00
										 |  |  |         try { | 
					
						
							|  |  |  |             $ok = $handler->handle($item); | 
					
						
							|  |  |  |         } catch (Exception $e) { | 
					
						
							|  |  |  |             $this->_log(LOG_ERR, "Exception on queue $queue: " . $e->getMessage()); | 
					
						
							|  |  |  |             $ok = false; | 
					
						
							|  |  |  |         } | 
					
						
							| 
									
										
										
										
											2009-07-09 12:09:20 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |         if ($ok) { | 
					
						
							|  |  |  |             $this->_log(LOG_INFO, "Successfully handled $info"); | 
					
						
							|  |  |  |             $this->stats('handled', $queue); | 
					
						
							|  |  |  |         } else { | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |             $this->_log(LOG_WARNING, "Failed handling $info"); | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |             // Requeing moves the item to the end of the line for its next try.
 | 
					
						
							|  |  |  |             // @fixme add a manual retry count
 | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |             $this->enqueue($item, $queue); | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |             $this->stats('requeued', $queue); | 
					
						
							| 
									
										
										
										
											2009-07-09 12:33:38 -04:00
										 |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |         return $ok; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Check if a redelivered message has been run through enough | 
					
						
							|  |  |  |      * that we're going to give up on it. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param StompFrame $frame | 
					
						
							|  |  |  |      * @param array $message unserialized message body | 
					
						
							|  |  |  |      * @return boolean true if we should discard | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function isDeadLetter($frame, $message) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         if (isset($frame->headers['redelivered']) && $frame->headers['redelivered'] == 'true') { | 
					
						
							|  |  |  | 	        // Message was redelivered, possibly indicating a previous failure.
 | 
					
						
							|  |  |  |             $msgId = $frame->headers['message-id']; | 
					
						
							|  |  |  |             $site = $message['site']; | 
					
						
							|  |  |  |             $queue = $message['handler']; | 
					
						
							|  |  |  | 	        $msgInfo = "message $msgId for $site in queue $queue"; | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	        $deliveries = $this->incDeliveryCount($msgId); | 
					
						
							|  |  |  | 	        if ($deliveries > common_config('queue', 'max_retries')) { | 
					
						
							|  |  |  | 		        $info = "DEAD-LETTER FILE: Gave up after retry $deliveries on $msgInfo"; | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		        $outdir = common_config('queue', 'dead_letter_dir'); | 
					
						
							|  |  |  | 		        if ($outdir) { | 
					
						
							|  |  |  |     		        $filename = $outdir . "/$site-$queue-" . rawurlencode($msgId); | 
					
						
							|  |  |  |     		        $info .= ": dumping to $filename"; | 
					
						
							|  |  |  |     		        file_put_contents($filename, $message['payload']); | 
					
						
							|  |  |  | 		        } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		        common_log(LOG_ERR, $info); | 
					
						
							|  |  |  | 		        return true; | 
					
						
							|  |  |  | 	        } else { | 
					
						
							|  |  |  | 	            common_log(LOG_INFO, "retry $deliveries on $msgInfo"); | 
					
						
							|  |  |  | 	        } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |         return false; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Update count of times we've re-encountered this message recently, | 
					
						
							|  |  |  |      * triggered when we get a message marked as 'redelivered'. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * Requires a CLI-friendly cache configuration. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param string $msgId message-id header from message | 
					
						
							|  |  |  |      * @return int number of retries recorded | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     function incDeliveryCount($msgId) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  | 	    $count = 0; | 
					
						
							| 
									
										
										
										
											2010-09-06 09:56:45 -04:00
										 |  |  | 	    $cache = Cache::instance(); | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  | 	    if ($cache) { | 
					
						
							|  |  |  | 		    $key = 'statusnet:stomp:message-retries:' . $msgId; | 
					
						
							|  |  |  | 		    $count = $cache->increment($key); | 
					
						
							|  |  |  | 		    if (!$count) { | 
					
						
							|  |  |  | 			    $count = 1; | 
					
						
							|  |  |  | 			    $cache->set($key, $count, null, 3600); | 
					
						
							|  |  |  | 			    $got = $cache->get($key); | 
					
						
							|  |  |  | 		    } | 
					
						
							|  |  |  | 	    } | 
					
						
							|  |  |  | 	    return $count; | 
					
						
							| 
									
										
										
										
											2009-07-09 12:33:38 -04:00
										 |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-07-09 11:40:01 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Process a control signal broadcast. | 
					
						
							|  |  |  |      * | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |      * @param int $idx connection index | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  |      * @param array $frame Stomp frame | 
					
						
							|  |  |  |      * @return bool true to continue; false to stop further processing. | 
					
						
							|  |  |  |      */ | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |     protected function handleControlSignal($idx, $frame) | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  |     { | 
					
						
							|  |  |  |         $message = trim($frame->body); | 
					
						
							|  |  |  |         if (strpos($message, ':') !== false) { | 
					
						
							|  |  |  |             list($event, $param) = explode(':', $message, 2); | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             $event = $message; | 
					
						
							|  |  |  |             $param = ''; | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         $shutdown = false; | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if ($event == 'shutdown') { | 
					
						
							|  |  |  |             $this->master->requestShutdown(); | 
					
						
							|  |  |  |             $shutdown = true; | 
					
						
							|  |  |  |         } else if ($event == 'restart') { | 
					
						
							|  |  |  |             $this->master->requestRestart(); | 
					
						
							|  |  |  |             $shutdown = true; | 
					
						
							|  |  |  |         } else if ($event == 'update') { | 
					
						
							|  |  |  |             $this->updateSiteConfig($param); | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message"); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |         return $shutdown; | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2010-01-30 13:15:17 -05:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Switch site, if necessary, and reset current handler assignments | 
					
						
							|  |  |  |      * @param string $site | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     function switchSite($site) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         if ($site != StatusNet::currentSite()) { | 
					
						
							|  |  |  |             $this->stats('switch'); | 
					
						
							|  |  |  |             StatusNet::switchSite($site); | 
					
						
							|  |  |  |             $this->initialize(); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  |     /** | 
					
						
							| 
									
										
										
										
											2010-02-17 16:49:00 -08:00
										 |  |  |      * (Re)load runtime configuration for a given site by nickname, | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  |      * triggered by a broadcast to the 'statusnet-control' topic. | 
					
						
							|  |  |  |      * | 
					
						
							| 
									
										
										
										
											2010-02-17 16:49:00 -08:00
										 |  |  |      * Configuration changes in database should update, but config | 
					
						
							|  |  |  |      * files might not. | 
					
						
							|  |  |  |      * | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  |      * @param array $frame Stomp frame | 
					
						
							|  |  |  |      * @return bool true to continue; false to stop further processing. | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function updateSiteConfig($nickname) | 
					
						
							|  |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-08-10 17:22:26 -07:00
										 |  |  |         $sn = Status_network::staticGet('nickname', $nickname); | 
					
						
							| 
									
										
										
										
											2010-02-17 16:49:00 -08:00
										 |  |  |         if ($sn) { | 
					
						
							|  |  |  |             $this->switchSite($nickname); | 
					
						
							|  |  |  |             if (!in_array($nickname, $this->sites)) { | 
					
						
							|  |  |  |                 $this->addSite(); | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  |             } | 
					
						
							| 
									
										
										
										
											2010-02-17 16:49:00 -08:00
										 |  |  |             $this->stats('siteupdate'); | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  |         } else { | 
					
						
							| 
									
										
										
										
											2010-02-17 16:49:00 -08:00
										 |  |  |             $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname"); | 
					
						
							| 
									
										
										
										
											2010-01-26 11:49:49 -08:00
										 |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Combines the queue_basename from configuration with the | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |      * group name for this queue to give eg: | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |      * | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |      * /queue/statusnet/main | 
					
						
							| 
									
										
										
										
											2010-02-17 16:49:00 -08:00
										 |  |  |      * /queue/statusnet/main/distrib | 
					
						
							|  |  |  |      * /queue/statusnet/xmpp/xmppout/site01 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |      * | 
					
						
							|  |  |  |      * @param string $queue | 
					
						
							|  |  |  |      * @return string | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function queueName($queue) | 
					
						
							| 
									
										
										
										
											2009-07-09 12:33:38 -04:00
										 |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |         $group = $this->queueGroup($queue); | 
					
						
							| 
									
										
										
										
											2010-02-17 16:49:00 -08:00
										 |  |  |         $site = StatusNet::currentSite(); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         $specs = array("$group/$queue/$site", | 
					
						
							|  |  |  |                        "$group/$queue"); | 
					
						
							|  |  |  |         foreach ($specs as $spec) { | 
					
						
							|  |  |  |             if (in_array($spec, $this->breakout)) { | 
					
						
							|  |  |  |                 return $this->base . $spec; | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |         return $this->base . $group; | 
					
						
							| 
									
										
										
										
											2009-07-01 11:10:23 -04:00
										 |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-07-02 12:43:09 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |      * Get the breakout mode for the given queue on the current site. | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |      * | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |      * @param string $queue | 
					
						
							|  |  |  |      * @return string one of 'shared', 'handler', 'site' | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |      */ | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |     protected function breakoutMode($queue) | 
					
						
							| 
									
										
										
										
											2009-07-03 10:05:07 -04:00
										 |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |         $breakout = common_config('queue', 'breakout'); | 
					
						
							|  |  |  |         if (isset($breakout[$queue])) { | 
					
						
							|  |  |  |             return $breakout[$queue]; | 
					
						
							|  |  |  |         } else if (isset($breakout['*'])) { | 
					
						
							|  |  |  |             return $breakout['*']; | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         } else { | 
					
						
							| 
									
										
										
										
											2010-02-16 09:01:59 -08:00
										 |  |  |             return 'shared'; | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         } | 
					
						
							| 
									
										
										
										
											2009-07-03 10:05:07 -04:00
										 |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |     protected function begin($idx) | 
					
						
							| 
									
										
										
										
											2010-01-22 12:35:05 -08:00
										 |  |  |     { | 
					
						
							|  |  |  |         if ($this->useTransactions) { | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |             if (!empty($this->transaction[$idx])) { | 
					
						
							| 
									
										
										
										
											2010-01-22 12:35:05 -08:00
										 |  |  |                 throw new Exception("Tried to start transaction in the middle of a transaction"); | 
					
						
							|  |  |  |             } | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |             $this->transactionCount[$idx]++; | 
					
						
							|  |  |  |             $this->transaction[$idx] = $this->master->id . '-' . $this->transactionCount[$idx] . '-' . time(); | 
					
						
							|  |  |  |             $this->cons[$idx]->begin($this->transaction[$idx]); | 
					
						
							| 
									
										
										
										
											2010-01-22 12:35:05 -08:00
										 |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |     protected function ack($idx, $frame) | 
					
						
							| 
									
										
										
										
											2010-01-22 12:35:05 -08:00
										 |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-03-29 15:25:57 +04:00
										 |  |  |         if ($this->useAcks) { | 
					
						
							|  |  |  |             if ($this->useTransactions) { | 
					
						
							|  |  |  |                 if (empty($this->transaction[$idx])) { | 
					
						
							|  |  |  |                     throw new Exception("Tried to ack but not in a transaction"); | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |                 $this->cons[$idx]->ack($frame, $this->transaction[$idx]); | 
					
						
							|  |  |  |             } else { | 
					
						
							|  |  |  |                 $this->cons[$idx]->ack($frame); | 
					
						
							| 
									
										
										
										
											2010-01-22 12:35:05 -08:00
										 |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |     protected function commit($idx) | 
					
						
							| 
									
										
										
										
											2010-01-22 12:35:05 -08:00
										 |  |  |     { | 
					
						
							|  |  |  |         if ($this->useTransactions) { | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |             if (empty($this->transaction[$idx])) { | 
					
						
							| 
									
										
										
										
											2010-01-22 12:35:05 -08:00
										 |  |  |                 throw new Exception("Tried to commit but not in a transaction"); | 
					
						
							|  |  |  |             } | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |             $this->cons[$idx]->commit($this->transaction[$idx]); | 
					
						
							|  |  |  |             $this->transaction[$idx] = null; | 
					
						
							| 
									
										
										
										
											2010-01-22 12:35:05 -08:00
										 |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |     protected function rollback($idx) | 
					
						
							| 
									
										
										
										
											2010-01-22 12:35:05 -08:00
										 |  |  |     { | 
					
						
							|  |  |  |         if ($this->useTransactions) { | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |             if (empty($this->transaction[$idx])) { | 
					
						
							| 
									
										
										
										
											2010-01-22 12:35:05 -08:00
										 |  |  |                 throw new Exception("Tried to rollback but not in a transaction"); | 
					
						
							|  |  |  |             } | 
					
						
							| 
									
										
										
										
											2010-01-28 16:49:32 -08:00
										 |  |  |             $this->cons[$idx]->commit($this->transaction[$idx]); | 
					
						
							|  |  |  |             $this->transaction[$idx] = null; | 
					
						
							| 
									
										
										
										
											2010-01-22 12:35:05 -08:00
										 |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  | 
 |