<?php
/**
 * StatusNet, the distributed open-source microblogging tool
 *
 * I/O manager to wrap around socket-reading and polling queue & connection managers.
 *
 * PHP version 5
 *
 * LICENCE: This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * @category  QueueManager
 * @package   StatusNet
 * @author    Brion Vibber <brion@status.net>
 * @copyright 2009 StatusNet, Inc.
 * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
 * @link      http://status.net/
 */

					
						
							|  |  |  | class IoMaster | 
					
						
							|  |  |  | { | 
					
						
							|  |  |  |     public $id; | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     protected $multiSite = false; | 
					
						
							|  |  |  |     protected $managers = array(); | 
					
						
							|  |  |  |     protected $singletons = array(); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     protected $pollTimeouts = array(); | 
					
						
							|  |  |  |     protected $lastPoll = array(); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * @param string $id process ID to use in logging/monitoring | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     public function __construct($id) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $this->id = $id; | 
					
						
							|  |  |  |         $this->monitor = new QueueMonitor(); | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     public function init($multiSite=null) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         if ($multiSite !== null) { | 
					
						
							|  |  |  |             $this->multiSite = $multiSite; | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |         if ($this->multiSite) { | 
					
						
							|  |  |  |             $this->sites = $this->findAllSites(); | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             $this->sites = array(common_config('site', 'server')); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if (empty($this->sites)) { | 
					
						
							|  |  |  |             throw new Exception("Empty status_network table, cannot init"); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         foreach ($this->sites as $site) { | 
					
						
							|  |  |  |             if ($site != common_config('site', 'server')) { | 
					
						
							|  |  |  |                 StatusNet::init($site); | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             $classes = array(); | 
					
						
							|  |  |  |             if (Event::handle('StartIoManagerClasses', array(&$classes))) { | 
					
						
							|  |  |  |                 $classes[] = 'QueueManager'; | 
					
						
							| 
									
										
										
										
											2010-01-14 17:14:41 -08:00
										 |  |  |                 if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) { | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |                     $classes[] = 'XmppManager'; // handles pings/reconnects
 | 
					
						
							|  |  |  |                     $classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations
 | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |             Event::handle('EndIoManagerClasses', array(&$classes)); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             foreach ($classes as $class) { | 
					
						
							|  |  |  |                 $this->instantiate($class); | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Pull all local sites from status_network table. | 
					
						
							|  |  |  |      * @return array of hostnames | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function findAllSites() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $hosts = array(); | 
					
						
							|  |  |  |         $sn = new Status_network(); | 
					
						
							|  |  |  |         $sn->find(); | 
					
						
							|  |  |  |         while ($sn->fetch()) { | 
					
						
							|  |  |  |             $hosts[] = $sn->hostname; | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |         return $hosts; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Instantiate an i/o manager class for the current site. | 
					
						
							|  |  |  |      * If a multi-site capable handler is already present, | 
					
						
							|  |  |  |      * we don't need to build a new one. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param string $class | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function instantiate($class) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         if (isset($this->singletons[$class])) { | 
					
						
							|  |  |  |             // Already instantiated a multi-site-capable handler.
 | 
					
						
							|  |  |  |             // Just let it know it should listen to this site too!
 | 
					
						
							|  |  |  |             $this->singletons[$class]->addSite(common_config('site', 'server')); | 
					
						
							|  |  |  |             return; | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         $manager = $this->getManager($class); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if ($this->multiSite) { | 
					
						
							|  |  |  |             $caps = $manager->multiSite(); | 
					
						
							|  |  |  |             if ($caps == IoManager::SINGLE_ONLY) { | 
					
						
							|  |  |  |                 throw new Exception("$class can't run with --all; aborting."); | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |             if ($caps == IoManager::INSTANCE_PER_PROCESS) { | 
					
						
							|  |  |  |                 // Save this guy for later!
 | 
					
						
							|  |  |  |                 // We'll only need the one to cover multiple sites.
 | 
					
						
							|  |  |  |                 $this->singletons[$class] = $manager; | 
					
						
							|  |  |  |                 $manager->addSite(common_config('site', 'server')); | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         $this->managers[] = $manager; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  |      | 
					
						
							|  |  |  |     protected function getManager($class) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         return call_user_func(array($class, 'get')); | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Basic run loop... | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * Initialize all io managers, then sit around waiting for input. | 
					
						
							|  |  |  |      * Between events or timeouts, pass control back to idle() method | 
					
						
							|  |  |  |      * to allow for any additional background processing. | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     function service() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $this->logState('init'); | 
					
						
							|  |  |  |         $this->start(); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         while (true) { | 
					
						
							|  |  |  |             $timeouts = array_values($this->pollTimeouts); | 
					
						
							|  |  |  |             $timeouts[] = 60; // default max timeout
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             // Wait for something on one of our sockets
 | 
					
						
							|  |  |  |             $sockets = array(); | 
					
						
							|  |  |  |             $managers = array(); | 
					
						
							|  |  |  |             foreach ($this->managers as $manager) { | 
					
						
							|  |  |  |                 foreach ($manager->getSockets() as $socket) { | 
					
						
							|  |  |  |                     $sockets[] = $socket; | 
					
						
							|  |  |  |                     $managers[] = $manager; | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |                 $timeouts[] = intval($manager->timeout()); | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             $timeout = min($timeouts); | 
					
						
							|  |  |  |             if ($sockets) { | 
					
						
							|  |  |  |                 $read = $sockets; | 
					
						
							|  |  |  |                 $write = array(); | 
					
						
							|  |  |  |                 $except = array(); | 
					
						
							|  |  |  |                 $this->logState('listening'); | 
					
						
							|  |  |  |                 common_log(LOG_INFO, "Waiting up to $timeout seconds for socket data..."); | 
					
						
							|  |  |  |                 $ready = stream_select($read, $write, $except, $timeout, 0); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 if ($ready === false) { | 
					
						
							|  |  |  |                     common_log(LOG_ERR, "Error selecting on sockets"); | 
					
						
							|  |  |  |                 } else if ($ready > 0) { | 
					
						
							|  |  |  |                     foreach ($read as $socket) { | 
					
						
							|  |  |  |                         $index = array_search($socket, $sockets, true); | 
					
						
							|  |  |  |                         if ($index !== false) { | 
					
						
							|  |  |  |                             $this->logState('queue'); | 
					
						
							|  |  |  |                             $managers[$index]->handleInput($socket); | 
					
						
							|  |  |  |                         } else { | 
					
						
							|  |  |  |                             common_log(LOG_ERR, "Saw input on a socket we didn't listen to"); | 
					
						
							|  |  |  |                         } | 
					
						
							|  |  |  |                     } | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if ($timeout > 0 && empty($sockets)) { | 
					
						
							|  |  |  |                 // If we had no listeners, sleep until the pollers' next requested wakeup.
 | 
					
						
							|  |  |  |                 common_log(LOG_INFO, "Sleeping $timeout seconds until next poll cycle..."); | 
					
						
							|  |  |  |                 $this->logState('sleep'); | 
					
						
							|  |  |  |                 sleep($timeout); | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             $this->logState('poll'); | 
					
						
							|  |  |  |             $this->poll(); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             $this->logState('idle'); | 
					
						
							|  |  |  |             $this->idle(); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             $memoryLimit = $this->softMemoryLimit(); | 
					
						
							|  |  |  |             if ($memoryLimit > 0) { | 
					
						
							|  |  |  |                 $usage = memory_get_usage(); | 
					
						
							|  |  |  |                 if ($usage > $memoryLimit) { | 
					
						
							|  |  |  |                     common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting."); | 
					
						
							|  |  |  |                     break; | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         $this->logState('shutdown'); | 
					
						
							|  |  |  |         $this->finish(); | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Return fully-parsed soft memory limit in bytes. | 
					
						
							|  |  |  |      * @return intval 0 or -1 if not set | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     function softMemoryLimit() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $softLimit = trim(common_config('queue', 'softlimit')); | 
					
						
							|  |  |  |         if (substr($softLimit, -1) == '%') { | 
					
						
							|  |  |  |             $limit = trim(ini_get('memory_limit')); | 
					
						
							|  |  |  |             $limit = $this->parseMemoryLimit($limit); | 
					
						
							|  |  |  |             if ($limit > 0) { | 
					
						
							|  |  |  |                 return intval(substr($softLimit, 0, -1) * $limit / 100); | 
					
						
							|  |  |  |             } else { | 
					
						
							|  |  |  |                 return -1; | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } else { | 
					
						
							| 
									
										
										
										
											2010-01-13 21:35:47 -08:00
										 |  |  |             return $this->parseMemoryLimit($softLimit); | 
					
						
							| 
									
										
										
										
											2010-01-12 19:57:15 -08:00
										 |  |  |         } | 
					
						
							|  |  |  |         return $softLimit; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Interpret PHP shorthand for memory_limit and friends. | 
					
						
							|  |  |  |      * Why don't they just expose the actual numeric value? :P | 
					
						
							|  |  |  |      * @param string $mem | 
					
						
							|  |  |  |      * @return int | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function parseMemoryLimit($mem) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
 | 
					
						
							|  |  |  |         $size = array('k' => 1024, | 
					
						
							|  |  |  |                       'm' => 1024*1024, | 
					
						
							|  |  |  |                       'g' => 1024*1024*1024); | 
					
						
							|  |  |  |         if (empty($mem)) { | 
					
						
							|  |  |  |             return 0; | 
					
						
							|  |  |  |         } else if (is_numeric($mem)) { | 
					
						
							|  |  |  |             return intval($mem); | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             $mult = strtolower(substr($mem, -1)); | 
					
						
							|  |  |  |             if (isset($size[$mult])) { | 
					
						
							|  |  |  |                 return substr($mem, 0, -1) * $size[$mult]; | 
					
						
							|  |  |  |             } else { | 
					
						
							|  |  |  |                 return intval($mem); | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     function start() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         foreach ($this->managers as $index => $manager) { | 
					
						
							|  |  |  |             $manager->start($this); | 
					
						
							|  |  |  |             // @fixme error check
 | 
					
						
							|  |  |  |             if ($manager->pollInterval()) { | 
					
						
							|  |  |  |                 // We'll want to check for input on the first pass
 | 
					
						
							|  |  |  |                 $this->pollTimeouts[$index] = 0; | 
					
						
							|  |  |  |                 $this->lastPoll[$index] = 0; | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     function finish() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         foreach ($this->managers as $manager) { | 
					
						
							|  |  |  |             $manager->finish(); | 
					
						
							|  |  |  |             // @fixme error check
 | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Called during the idle portion of the runloop to see which handlers | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     function poll() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         foreach ($this->managers as $index => $manager) { | 
					
						
							|  |  |  |             $interval = $manager->pollInterval(); | 
					
						
							|  |  |  |             if ($interval <= 0) { | 
					
						
							|  |  |  |                 // Not a polling manager.
 | 
					
						
							|  |  |  |                 continue; | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if (isset($this->pollTimeouts[$index])) { | 
					
						
							|  |  |  |                 $timeout = $this->pollTimeouts[$index]; | 
					
						
							|  |  |  |                 if (time() - $this->lastPoll[$index] < $timeout) { | 
					
						
							|  |  |  |                     // Not time to poll yet.
 | 
					
						
							|  |  |  |                     continue; | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |             } else { | 
					
						
							|  |  |  |                 $timeout = 0; | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |             $hit = $manager->poll(); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             $this->lastPoll[$index] = time(); | 
					
						
							|  |  |  |             if ($hit) { | 
					
						
							|  |  |  |                 // Do the next poll quickly, there may be more input!
 | 
					
						
							|  |  |  |                 $this->pollTimeouts[$index] = 0; | 
					
						
							|  |  |  |             } else { | 
					
						
							|  |  |  |                 // Empty queue. Exponential backoff up to the maximum poll interval.
 | 
					
						
							|  |  |  |                 if ($timeout > 0) { | 
					
						
							|  |  |  |                     $timeout = min($timeout * 2, $interval); | 
					
						
							|  |  |  |                 } else { | 
					
						
							|  |  |  |                     $timeout = 1; | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |                 $this->pollTimeouts[$index] = $timeout; | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Called after each handled item or empty polling cycle. | 
					
						
							|  |  |  |      * This is a good time to e.g. service your XMPP connection. | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     function idle() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         foreach ($this->managers as $manager) { | 
					
						
							|  |  |  |             $manager->idle(); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Send thread state update to the monitoring server, if configured. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param string $state ('init', 'queue', 'shutdown' etc) | 
					
						
							|  |  |  |      * @param string $substate (optional, eg queue name 'omb' 'sms' etc) | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     protected function logState($state, $substate='') | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $this->monitor->logState($this->id, $state, $substate); | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     /** | 
					
						
							|  |  |  |      * Send thread stats. | 
					
						
							|  |  |  |      * Thread ID will be implicit; other owners can be listed as well | 
					
						
							|  |  |  |      * for per-queue and per-site records. | 
					
						
							|  |  |  |      * | 
					
						
							|  |  |  |      * @param string $key counter name | 
					
						
							|  |  |  |      * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01' | 
					
						
							|  |  |  |      */ | 
					
						
							|  |  |  |     public function stats($key, $owners=array()) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $owners[] = "thread:" . $this->id; | 
					
						
							|  |  |  |         $this->monitor->stats($key, $owners); | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 |