| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | <?php | 
					
						
							|  |  |  | /** | 
					
						
							| 
									
										
										
										
											2009-08-25 18:12:20 -04:00
										 |  |  |  * StatusNet, the distributed open-source microblogging tool | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |  * | 
					
						
							|  |  |  |  * Stomp-based queue manager | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * PHP version 5 | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * LICENCE: This program is free software: you can redistribute it and/or modify | 
					
						
							|  |  |  |  * it under the terms of the GNU Affero General Public License as published by | 
					
						
							|  |  |  |  * the Free Software Foundation, either version 3 of the License, or | 
					
						
							|  |  |  |  * (at your option) any later version. | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * This program is distributed in the hope that it will be useful, | 
					
						
							|  |  |  |  * but WITHOUT ANY WARRANTY; without even the implied warranty of | 
					
						
							|  |  |  |  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
					
						
							|  |  |  |  * GNU Affero General Public License for more details. | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * You should have received a copy of the GNU Affero General Public License | 
					
						
							|  |  |  |  * along with this program.  If not, see <http://www.gnu.org/licenses/>. | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * @category  QueueManager | 
					
						
							| 
									
										
										
										
											2009-08-25 18:12:20 -04:00
										 |  |  |  * @package   StatusNet | 
					
						
							| 
									
										
										
										
											2009-08-25 18:19:04 -04:00
										 |  |  |  * @author    Evan Prodromou <evan@status.net> | 
					
						
							|  |  |  |  * @author    Sarven Capadisli <csarven@status.net> | 
					
						
							| 
									
										
										
										
											2009-08-25 18:12:20 -04:00
										 |  |  |  * @copyright 2009 StatusNet, Inc. | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 | 
					
						
							| 
									
										
										
										
											2009-08-25 18:16:46 -04:00
										 |  |  |  * @link      http://status.net/ | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |  */ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | require_once 'Stomp.php'; | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2009-07-09 12:09:20 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  | class StompQueueManager extends QueueManager | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | { | 
					
						
							|  |  |  |     var $server = null; | 
					
						
							|  |  |  |     var $username = null; | 
					
						
							|  |  |  |     var $password = null; | 
					
						
							|  |  |  |     var $base = null; | 
					
						
							|  |  |  |     var $con = null; | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |      | 
					
						
							|  |  |  |     protected $sites = array(); | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |     function __construct() | 
					
						
							|  |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         parent::__construct(); | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |         $this->server   = common_config('queue', 'stomp_server'); | 
					
						
							|  |  |  |         $this->username = common_config('queue', 'stomp_username'); | 
					
						
							|  |  |  |         $this->password = common_config('queue', 'stomp_password'); | 
					
						
							|  |  |  |         $this->base     = common_config('queue', 'queue_basename'); | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Tell the i/o master we only need a single instance to cover | 
					
						
							|  |  |  |      * all sites running in this process. | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     public static function multiSite() | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         return IoManager::INSTANCE_PER_PROCESS; | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Record each site we'll be handling input for in this process, | 
					
						
							|  |  |  |      * so we can listen to the necessary queues for it. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @fixme possibly actually do subscription here to save another | 
					
						
							|  |  |  |      *        loop over all sites later? | 
					
						
							| 
									
										
										
										
											2010-01-14 00:19:25 -08:00
										 |  |  |      * @fixme possibly don't assume it's the current site | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |      */ | 
					
						
							|  |  |  |     public function addSite($server) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $this->sites[] = $server; | 
					
						
							| 
									
										
										
										
											2010-01-14 00:19:25 -08:00
										 |  |  |         $this->initialize(); | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Instantiate the appropriate QueueHandler class for the given queue. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param string $queue | 
					
						
							|  |  |  |      * @return mixed QueueHandler or null | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     function getHandler($queue) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $handlers = $this->handlers[common_config('site', 'server')]; | 
					
						
							|  |  |  |         if (isset($handlers[$queue])) { | 
					
						
							|  |  |  |             $class = $handlers[$queue]; | 
					
						
							|  |  |  |             if (class_exists($class)) { | 
					
						
							|  |  |  |                 return new $class(); | 
					
						
							|  |  |  |             } else { | 
					
						
							|  |  |  |                 common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             common_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |         return null; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Get a list of all registered queue transport names. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @return array of strings | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     function getQueues() | 
					
						
							|  |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |         $group = $this->activeGroup(); | 
					
						
							| 
									
										
										
										
											2010-01-15 11:13:06 -08:00
										 |  |  |         $site = common_config('site', 'server'); | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |         if (empty($this->groups[$site][$group])) { | 
					
						
							| 
									
										
										
										
											2010-01-15 11:13:06 -08:00
										 |  |  |             return array(); | 
					
						
							|  |  |  |         } else { | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |             return array_keys($this->groups[$site][$group]); | 
					
						
							| 
									
										
										
										
											2010-01-15 11:13:06 -08:00
										 |  |  |         } | 
					
						
							| 
									
										
										
										
											2010-01-14 00:19:25 -08:00
										 |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Register a queue transport name and handler class for your plugin. | 
					
						
							|  |  |  |      * Only registered transports will be reliably picked up! | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param string $transport | 
					
						
							|  |  |  |      * @param string $class | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |      * @param string $group | 
					
						
							| 
									
										
										
										
											2010-01-14 00:19:25 -08:00
										 |  |  |      */ | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |     public function connect($transport, $class, $group='queuedaemon') | 
					
						
							| 
									
										
										
										
											2010-01-14 00:19:25 -08:00
										 |  |  |     { | 
					
						
							|  |  |  |         $this->handlers[common_config('site', 'server')][$transport] = $class; | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |         $this->groups[common_config('site', 'server')][$group][$transport] = $class; | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Saves a notice object reference into the queue item table. | 
					
						
							|  |  |  |      * @return boolean true on success | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     public function enqueue($object, $queue) | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |         $msg = $this->encode($object); | 
					
						
							|  |  |  |         $rep = $this->logrep($object); | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |         $this->_connect(); | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2009-07-01 11:10:23 -04:00
										 |  |  |         // XXX: serialize and send entire notice
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         $result = $this->con->send($this->queueName($queue), | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |                                    $msg, 		// BODY of the message
 | 
					
						
							|  |  |  |                                    array ('created' => common_sql_now())); | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |         if (!$result) { | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |             common_log(LOG_ERR, "Error sending $rep to $queue queue"); | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |             return false; | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |         common_log(LOG_DEBUG, "complete remote queueing $rep for $queue"); | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         $this->stats('enqueued', $queue); | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Send any sockets we're listening on to the IO manager | 
					
						
							|  |  |  |      * to wait for input. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @return array of resources | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     public function getSockets() | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         return array($this->con->getSocket()); | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * We've got input to handle on our socket! | 
					
						
							|  |  |  |      * Read any waiting Stomp frame(s) and process them. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param resource $socket | 
					
						
							|  |  |  |      * @return boolean ok on success | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     public function handleInput($socket) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         assert($socket === $this->con->getSocket()); | 
					
						
							|  |  |  |         $ok = true; | 
					
						
							|  |  |  |         $frames = $this->con->readFrames(); | 
					
						
							|  |  |  |         foreach ($frames as $frame) { | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |             $ok = $ok && $this->_handleItem($frame); | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         } | 
					
						
							|  |  |  |         return $ok; | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Initialize our connection and subscribe to all the queues | 
					
						
							|  |  |  |      * we're going to need to handle... | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * Side effects: in multi-site mode, may reset site configuration. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param IoMaster $master process/event controller | 
					
						
							|  |  |  |      * @return bool return false on failure | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     public function start($master) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         parent::start($master); | 
					
						
							|  |  |  |         if ($this->sites) { | 
					
						
							|  |  |  |             foreach ($this->sites as $server) { | 
					
						
							|  |  |  |                 StatusNet::init($server); | 
					
						
							|  |  |  |                 $this->doSubscribe(); | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             $this->doSubscribe(); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |         return true; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  |      | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Subscribe to all the queues we're going to need to handle... | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * Side effects: in multi-site mode, may reset site configuration. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @return bool return false on failure | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     public function finish() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         if ($this->sites) { | 
					
						
							|  |  |  |             foreach ($this->sites as $server) { | 
					
						
							|  |  |  |                 StatusNet::init($server); | 
					
						
							|  |  |  |                 $this->doUnsubscribe(); | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             $this->doUnsubscribe(); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |         return true; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  |      | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Lazy open connection to Stomp queue server. | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function _connect() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         if (empty($this->con)) { | 
					
						
							|  |  |  |             $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); | 
					
						
							|  |  |  |             $this->con = new LiberalStomp($this->server); | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |             if ($this->con->connect($this->username, $this->password)) { | 
					
						
							|  |  |  |                 $this->_log(LOG_INFO, "Connected."); | 
					
						
							|  |  |  |             } else { | 
					
						
							|  |  |  |                 $this->_log(LOG_ERR, 'Failed to connect to queue server'); | 
					
						
							|  |  |  |                 throw new ServerException('Failed to connect to queue server'); | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-07-01 11:10:23 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Subscribe to all enabled notice queues for the current site. | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function doSubscribe() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $this->_connect(); | 
					
						
							|  |  |  |         foreach ($this->getQueues() as $queue) { | 
					
						
							|  |  |  |             $rawqueue = $this->queueName($queue); | 
					
						
							|  |  |  |             $this->_log(LOG_INFO, "Subscribing to $rawqueue"); | 
					
						
							|  |  |  |             $this->con->subscribe($rawqueue); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  |      | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Subscribe from all enabled notice queues for the current site. | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function doUnsubscribe() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $this->_connect(); | 
					
						
							|  |  |  |         foreach ($this->getQueues() as $queue) { | 
					
						
							|  |  |  |             $this->con->unsubscribe($this->queueName($queue)); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |      * Handle and acknowledge an event that's come in through a queue. | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |      * | 
					
						
							|  |  |  |      * If the queue handler reports failure, the message is requeued for later. | 
					
						
							|  |  |  |      * Missing notices or handler classes will drop the message. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * Side effects: in multi-site mode, may reset site configuration to | 
					
						
							|  |  |  |      * match the site that queued the event. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param StompFrame $frame | 
					
						
							|  |  |  |      * @return bool | 
					
						
							|  |  |  |      */ | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |     protected function _handleItem($frame) | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     { | 
					
						
							|  |  |  |         list($site, $queue) = $this->parseDestination($frame->headers['destination']); | 
					
						
							|  |  |  |         if ($site != common_config('site', 'server')) { | 
					
						
							|  |  |  |             $this->stats('switch'); | 
					
						
							|  |  |  |             StatusNet::init($site); | 
					
						
							|  |  |  |         } | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |         if (is_numeric($frame->body)) { | 
					
						
							|  |  |  |             $id = intval($frame->body); | 
					
						
							|  |  |  |             $info = "notice $id posted at {$frame->headers['created']} in queue $queue"; | 
					
						
							| 
									
										
										
										
											2009-07-09 12:09:20 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |             $notice = Notice::staticGet('id', $id); | 
					
						
							|  |  |  |             if (empty($notice)) { | 
					
						
							|  |  |  |                 $this->_log(LOG_WARNING, "Skipping missing $info"); | 
					
						
							|  |  |  |                 $this->con->ack($frame); | 
					
						
							|  |  |  |                 $this->stats('badnotice', $queue); | 
					
						
							|  |  |  |                 return false; | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             $item = $notice; | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             // @fixme should we serialize, or json, or what here?
 | 
					
						
							|  |  |  |             $info = "string posted at {$frame->headers['created']} in queue $queue"; | 
					
						
							|  |  |  |             $item = $frame->body; | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         } | 
					
						
							| 
									
										
										
										
											2009-07-09 12:09:20 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         $handler = $this->getHandler($queue); | 
					
						
							|  |  |  |         if (!$handler) { | 
					
						
							|  |  |  |             $this->_log(LOG_ERROR, "Missing handler class; skipping $info"); | 
					
						
							|  |  |  |             $this->con->ack($frame); | 
					
						
							|  |  |  |             $this->stats('badhandler', $queue); | 
					
						
							|  |  |  |             return false; | 
					
						
							|  |  |  |         } | 
					
						
							| 
									
										
										
										
											2009-07-09 12:09:20 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |         $ok = $handler->handle($item); | 
					
						
							| 
									
										
										
										
											2009-07-09 12:09:20 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         if (!$ok) { | 
					
						
							|  |  |  |             $this->_log(LOG_WARNING, "Failed handling $info"); | 
					
						
							|  |  |  |             // FIXME we probably shouldn't have to do
 | 
					
						
							|  |  |  |             // this kind of queue management ourselves;
 | 
					
						
							|  |  |  |             // if we don't ack, it should resend...
 | 
					
						
							|  |  |  |             $this->con->ack($frame); | 
					
						
							| 
									
										
										
										
											2010-01-21 16:42:50 -08:00
										 |  |  |             $this->enqueue($item, $queue); | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |             $this->stats('requeued', $queue); | 
					
						
							|  |  |  |             return false; | 
					
						
							| 
									
										
										
										
											2009-07-09 12:33:38 -04:00
										 |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         $this->_log(LOG_INFO, "Successfully handled $info"); | 
					
						
							|  |  |  |         $this->con->ack($frame); | 
					
						
							|  |  |  |         $this->stats('handled', $queue); | 
					
						
							|  |  |  |         return true; | 
					
						
							| 
									
										
										
										
											2009-07-09 12:33:38 -04:00
										 |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-07-09 11:40:01 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Combines the queue_basename from configuration with the | 
					
						
							|  |  |  |      * site server name and queue name to give eg: | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * /queue/statusnet/identi.ca/sms | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param string $queue | 
					
						
							|  |  |  |      * @return string | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function queueName($queue) | 
					
						
							| 
									
										
										
										
											2009-07-09 12:33:38 -04:00
										 |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         return common_config('queue', 'queue_basename') . | 
					
						
							|  |  |  |             common_config('site', 'server') . '/' . $queue; | 
					
						
							| 
									
										
										
										
											2009-07-01 11:10:23 -04:00
										 |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-07-02 12:43:09 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |     /** | 
					
						
							|  |  |  |      * Returns the site and queue name from the server-side queue. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param string queue destination (eg '/queue/statusnet/identi.ca/sms') | 
					
						
							|  |  |  |      * @return array of site and queue: ('identi.ca','sms') or false if unrecognized | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function parseDestination($dest) | 
					
						
							| 
									
										
										
										
											2009-07-03 10:05:07 -04:00
										 |  |  |     { | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         $prefix = common_config('queue', 'queue_basename'); | 
					
						
							|  |  |  |         if (substr($dest, 0, strlen($prefix)) == $prefix) { | 
					
						
							|  |  |  |             $rest = substr($dest, strlen($prefix)); | 
					
						
							|  |  |  |             return explode("/", $rest, 2); | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             common_log(LOG_ERR, "Got a message from unrecognized stomp queue: $dest"); | 
					
						
							|  |  |  |             return array(false, false); | 
					
						
							|  |  |  |         } | 
					
						
							| 
									
										
										
										
											2009-07-03 10:05:07 -04:00
										 |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2009-07-02 12:43:09 -04:00
										 |  |  |     function _log($level, $msg) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         common_log($level, 'StompQueueManager: '.$msg); | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  | 
 |