<?php
/**
 * StatusNet, the distributed open-source microblogging tool
 *
 * I/O manager to wrap around socket-reading and polling queue & connection managers.
 *
 * PHP version 5
 *
 * LICENCE: This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * @category  QueueManager
 * @package   StatusNet
 * @author    Brion Vibber <brion@status.net>
 * @copyright 2009 StatusNet, Inc.
 * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
 * @link      http://status.net/
 */

/**
 * Master run loop for a set of I/O managers.
 *
 * Collects the sockets exposed by each registered manager, waits on them
 * with stream_select(), dispatches readable sockets back to the manager
 * that owns them, and periodically calls poll() / idle() on the managers.
 */
class IoMaster
{
    /** @var string process ID used in logging/monitoring */
    public $id;

    /**
     * State monitor created in the constructor.
     * Declared explicitly (it used to be created as a dynamic property);
     * kept public so any existing outside access keeps working.
     */
    public $monitor;

    /**
     * Hostnames of the sites being serviced; filled in by init().
     * Declared explicitly (it used to be created as a dynamic property);
     * kept public so any existing outside access keeps working.
     */
    public $sites = array();

    protected $multiSite = false;
    protected $managers = array();
    protected $singletons = array();

    // Both keyed by the manager's index in $this->managers.
    protected $pollTimeouts = array();
    protected $lastPoll = array();

    /**
     * @param string $id process ID to use in logging/monitoring
     */
    public function __construct($id)
    {
        $this->id = $id;
        $this->monitor = new QueueMonitor();
    }

    /**
     * Build the list of sites to service and instantiate the i/o manager
     * classes for each of them.
     *
     * @param mixed $multiSite if not null, overrides the multi-site flag;
     *                         when the flag is set, all sites from the
     *                         status_network table are serviced, otherwise
     *                         only the currently configured site.
     *
     * @throws Exception if no sites were found
     */
    public function init($multiSite=null)
    {
        if ($multiSite !== null) {
            $this->multiSite = $multiSite;
        }
        if ($this->multiSite) {
            $this->sites = $this->findAllSites();
        } else {
            $this->sites = array(common_config('site', 'server'));
        }

        if (empty($this->sites)) {
            throw new Exception("Empty status_network table, cannot init");
        }

        foreach ($this->sites as $site) {
            // Switch configuration over to the target site unless it's
            // the one that is already active.
            if ($site != common_config('site', 'server')) {
                StatusNet::init($site);
            }

            $classes = array();
            if (Event::handle('StartIoManagerClasses', array(&$classes))) {
                $classes[] = 'QueueManager';
                if (common_config('xmpp', 'enabled')) {
                    $classes[] = 'XmppManager'; // handles pings/reconnects
                    $classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations
                }
            }
            Event::handle('EndIoManagerClasses', array(&$classes));

            foreach ($classes as $class) {
                $this->instantiate($class);
            }
        }
    }

    /**
     * Pull all local sites from status_network table.
     * @return array of hostnames
     */
    protected function findAllSites()
    {
        $hosts = array();
        $sn = new Status_network();
        $sn->find();
        while ($sn->fetch()) {
            $hosts[] = $sn->hostname;
        }
        return $hosts;
    }

    /**
     * Instantiate an i/o manager class for the current site.
     * If a multi-site capable handler is already present,
     * we don't need to build a new one.
     *
     * @param string $class
     *
     * @throws Exception in multi-site mode, if the manager reports
     *                   IoManager::SINGLE_ONLY
     */
    protected function instantiate($class)
    {
        if (isset($this->singletons[$class])) {
            // Already instantiated a multi-site-capable handler.
            // Just let it know it should listen to this site too!
            $this->singletons[$class]->addSite(common_config('site', 'server'));
            return;
        }

        $manager = $this->getManager($class);

        if ($this->multiSite) {
            $caps = $manager->multiSite();
            if ($caps == IoManager::SINGLE_ONLY) {
                throw new Exception("$class can't run with --all; aborting.");
            }
            if ($caps == IoManager::INSTANCE_PER_PROCESS) {
                // Save this guy for later!
                // We'll only need the one to cover multiple sites.
                $this->singletons[$class] = $manager;
                $manager->addSite(common_config('site', 'server'));
            }
        }

        $this->managers[] = $manager;
    }

    /**
     * Obtain a manager by calling the static get() method of the class.
     *
     * @param string $class
     * @return mixed whatever $class::get() returns
     */
    protected function getManager($class)
    {
        return call_user_func(array($class, 'get'));
    }

    /**
     * Basic run loop...
     *
     * Initialize all io managers, then sit around waiting for input.
     * Between events or timeouts, pass control back to idle() method
     * to allow for any additional background processing.
     *
     * The loop ends when memory usage exceeds the soft memory limit,
     * after which all managers are told to finish.
     */
    public function service()
    {
        $this->logState('init');
        $this->start();

        while (true) {
            $timeouts = array_values($this->pollTimeouts);
            $timeouts[] = 60; // default max timeout

            // Wait for something on one of our sockets.
            // $sockets and $managers are parallel arrays: $managers[$i]
            // is the manager that owns $sockets[$i].
            $sockets = array();
            $managers = array();
            foreach ($this->managers as $manager) {
                foreach ($manager->getSockets() as $socket) {
                    $sockets[] = $socket;
                    $managers[] = $manager;
                }
                $timeouts[] = intval($manager->timeout());
            }

            // Wake up as soon as the most impatient party wants us to.
            $timeout = min($timeouts);
            if ($sockets) {
                $read = $sockets;
                $write = array();
                $except = array();
                $this->logState('listening');
                common_log(LOG_INFO, "Waiting up to $timeout seconds for socket data...");
                $ready = stream_select($read, $write, $except, $timeout, 0);

                if ($ready === false) {
                    common_log(LOG_ERR, "Error selecting on sockets");
                } else if ($ready > 0) {
                    // stream_select() trimmed $read down to the readable sockets.
                    foreach ($read as $socket) {
                        $index = array_search($socket, $sockets, true);
                        if ($index !== false) {
                            $this->logState('queue');
                            $managers[$index]->handleInput($socket);
                        } else {
                            common_log(LOG_ERR, "Saw input on a socket we didn't listen to");
                        }
                    }
                }
            }

            if ($timeout > 0 && empty($sockets)) {
                // If we had no listeners, sleep until the pollers' next requested wakeup.
                common_log(LOG_INFO, "Sleeping $timeout seconds until next poll cycle...");
                $this->logState('sleep');
                sleep($timeout);
            }

            $this->logState('poll');
            $this->poll();

            $this->logState('idle');
            $this->idle();

            $memoryLimit = $this->softMemoryLimit();
            if ($memoryLimit > 0) {
                $usage = memory_get_usage();
                if ($usage > $memoryLimit) {
                    common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
                    break;
                }
            }
        }

        $this->logState('shutdown');
        $this->finish();
    }

    /**
     * Return fully-parsed soft memory limit in bytes.
     *
     * The 'queue'/'softlimit' setting is either a percentage of PHP's
     * memory_limit (e.g. "90%") or an absolute size in PHP shorthand
     * notation (e.g. "100M").
     *
     * @return int limit in bytes; 0 if the setting is empty, -1 if a
     *             percentage was requested but memory_limit is not positive
     */
    public function softMemoryLimit()
    {
        $softLimit = trim(common_config('queue', 'softlimit'));
        if (substr($softLimit, -1) == '%') {
            $limit = $this->parseMemoryLimit(trim(ini_get('memory_limit')));
            if ($limit > 0) {
                return intval(floatval(substr($softLimit, 0, -1)) * $limit / 100);
            }
            return -1;
        }
        // Absolute value: parse the configured setting itself.
        // (This used to pass an undefined variable, which always gave 0.)
        return $this->parseMemoryLimit($softLimit);
    }

    /**
     * Interpret PHP shorthand for memory_limit and friends.
     * Why don't they just expose the actual numeric value? :P
     * @param string $mem
     * @return int
     */
    protected function parseMemoryLimit($mem)
    {
        // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
        $size = array('k' => 1024,
                      'm' => 1024*1024,
                      'g' => 1024*1024*1024);
        if (empty($mem)) {
            return 0;
        } else if (is_numeric($mem)) {
            return intval($mem);
        } else {
            $mult = strtolower(substr($mem, -1));
            if (isset($size[$mult])) {
                // floatval() keeps a malformed prefix from blowing up the
                // multiplication; intval() honours the documented int return.
                return intval(floatval(substr($mem, 0, -1)) * $size[$mult]);
            } else {
                return intval($mem);
            }
        }
    }

    /**
     * Start every registered manager and set up the poll bookkeeping for
     * the managers that report a poll interval.
     */
    public function start()
    {
        foreach ($this->managers as $index => $manager) {
            $manager->start($this);
            // @fixme error check
            if ($manager->pollInterval()) {
                // We'll want to check for input on the first pass
                $this->pollTimeouts[$index] = 0;
                $this->lastPoll[$index] = 0;
            }
        }
    }

    /**
     * Tell every registered manager to finish.
     */
    public function finish()
    {
        foreach ($this->managers as $manager) {
            $manager->finish();
            // @fixme error check
        }
    }

    /**
     * Called during the idle portion of the runloop to see which handlers
     * are due for polling, and poll them.
     *
     * A manager whose poll() reports a hit is polled again on the next
     * pass; otherwise its wait is doubled each time, starting at one
     * second and capped at the manager's own poll interval.
     */
    public function poll()
    {
        foreach ($this->managers as $index => $manager) {
            $interval = $manager->pollInterval();
            if ($interval <= 0) {
                // Not a polling manager.
                continue;
            }

            if (isset($this->pollTimeouts[$index])) {
                $timeout = $this->pollTimeouts[$index];
                if (time() - $this->lastPoll[$index] < $timeout) {
                    // Not time to poll yet.
                    continue;
                }
            } else {
                $timeout = 0;
            }
            $hit = $manager->poll();

            $this->lastPoll[$index] = time();
            if ($hit) {
                // Do the next poll quickly, there may be more input!
                $this->pollTimeouts[$index] = 0;
            } else {
                // Empty queue. Exponential backoff up to the maximum poll interval.
                if ($timeout > 0) {
                    $timeout = min($timeout * 2, $interval);
                } else {
                    $timeout = 1;
                }
                $this->pollTimeouts[$index] = $timeout;
            }
        }
    }

    /**
     * Called after each handled item or empty polling cycle.
     * This is a good time to e.g. service your XMPP connection.
     */
    public function idle()
    {
        foreach ($this->managers as $manager) {
            $manager->idle();
        }
    }

    /**
     * Send thread state update to the monitoring server, if configured.
     *
     * @param string $state ('init', 'queue', 'shutdown' etc)
     * @param string $substate (optional, eg queue name 'omb' 'sms' etc)
     */
    protected function logState($state, $substate='')
    {
        $this->monitor->logState($this->id, $state, $substate);
    }

    /**
     * Send thread stats.
     * Thread ID will be implicit; other owners can be listed as well
     * for per-queue and per-site records.
     *
     * @param string $key counter name
     * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01'
     */
    public function stats($key, $owners=array())
    {
        $owners[] = "thread:" . $this->id;
        $this->monitor->stats($key, $owners);
    }
}
