forked from GNUsocial/gnu-social
		
	
		
			
				
	
	
		
			361 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
			
		
		
	
	
			361 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
| <?php
 | |
| /**
 | |
|  * StatusNet, the distributed open-source microblogging tool
 | |
|  *
 | |
|  * I/O manager to wrap around socket-reading and polling queue & connection managers.
 | |
|  *
 | |
|  * PHP version 5
 | |
|  *
 | |
|  * LICENCE: This program is free software: you can redistribute it and/or modify
 | |
|  * it under the terms of the GNU Affero General Public License as published by
 | |
|  * the Free Software Foundation, either version 3 of the License, or
 | |
|  * (at your option) any later version.
 | |
|  *
 | |
|  * This program is distributed in the hope that it will be useful,
 | |
|  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|  * GNU Affero General Public License for more details.
 | |
|  *
 | |
|  * You should have received a copy of the GNU Affero General Public License
 | |
|  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | |
|  *
 | |
|  * @category  QueueManager
 | |
|  * @package   StatusNet
 | |
|  * @author    Brion Vibber <brion@status.net>
 | |
|  * @copyright 2009 StatusNet, Inc.
 | |
|  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
 | |
|  * @link      http://status.net/
 | |
|  */
 | |
| 
 | |
| abstract class IoMaster
 | |
| {
 | |
|     public $id;
 | |
| 
 | |
|     protected $multiSite = false;
 | |
|     protected $managers = array();
 | |
|     protected $singletons = array();
 | |
| 
 | |
|     protected $pollTimeouts = array();
 | |
|     protected $lastPoll = array();
 | |
| 
 | |
|     public $shutdown = false; // Did we do a graceful shutdown?
 | |
|     public $respawn = true; // Should we respawn after shutdown?
 | |
| 
 | |
|     /**
 | |
|      * @param string $id process ID to use in logging/monitoring
 | |
|      */
 | |
|     public function __construct($id)
 | |
|     {
 | |
|         $this->id = $id;
 | |
|         $this->monitor = new QueueMonitor();
 | |
|     }
 | |
| 
 | |
|     public function init($multiSite=null)
 | |
|     {
 | |
|         if ($multiSite !== null) {
 | |
|             $this->multiSite = $multiSite;
 | |
|         }
 | |
| 
 | |
|         $this->initManagers();
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Initialize IoManagers which are appropriate to this instance;
 | |
|      * pass class names or instances into $this->instantiate().
 | |
|      *
 | |
|      * If setup and configuration may vary between sites in multi-site
 | |
|      * mode, it's the subclass's responsibility to set them up here.
 | |
|      *
 | |
|      * Switching site configurations is an acceptable side effect.
 | |
|      */
 | |
|     abstract function initManagers();
 | |
| 
 | |
|     /**
 | |
|      * Instantiate an i/o manager class for the current site.
 | |
|      * If a multi-site capable handler is already present,
 | |
|      * we don't need to build a new one.
 | |
|      *
 | |
|      * @param mixed $manager class name (to run $class::get()) or object
 | |
|      */
 | |
|     protected function instantiate($manager)
 | |
|     {
 | |
|         if (is_string($manager)) {
 | |
|             $manager = call_user_func(array($class, 'get'));
 | |
|         }
 | |
| 
 | |
|         $caps = $manager->multiSite();
 | |
|         if ($caps == IoManager::SINGLE_ONLY) {
 | |
|             if ($this->multiSite) {
 | |
|                 throw new Exception("$class can't run with --all; aborting.");
 | |
|             }
 | |
|         } else if ($caps == IoManager::INSTANCE_PER_PROCESS) {
 | |
|             $manager->addSite();
 | |
|         }
 | |
| 
 | |
|         if (!in_array($manager, $this->managers, true)) {
 | |
|             // Only need to save singletons once
 | |
|             $this->managers[] = $manager;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Basic run loop...
 | |
|      *
 | |
|      * Initialize all io managers, then sit around waiting for input.
 | |
|      * Between events or timeouts, pass control back to idle() method
 | |
|      * to allow for any additional background processing.
 | |
|      */
 | |
|     function service()
 | |
|     {
 | |
|         $this->logState('init');
 | |
|         $this->start();
 | |
|         $this->checkMemory(false);
 | |
| 
 | |
|         while (!$this->shutdown) {
 | |
|             $timeouts = array_values($this->pollTimeouts);
 | |
|             $timeouts[] = 60; // default max timeout
 | |
| 
 | |
|             // Wait for something on one of our sockets
 | |
|             $sockets = array();
 | |
|             $managers = array();
 | |
|             foreach ($this->managers as $manager) {
 | |
|                 foreach ($manager->getSockets() as $socket) {
 | |
|                     $sockets[] = $socket;
 | |
|                     $managers[] = $manager;
 | |
|                 }
 | |
|                 $timeouts[] = intval($manager->timeout());
 | |
|             }
 | |
| 
 | |
|             $timeout = min($timeouts);
 | |
|             if ($sockets) {
 | |
|                 $read = $sockets;
 | |
|                 $write = array();
 | |
|                 $except = array();
 | |
|                 $this->logState('listening');
 | |
|                 common_log(LOG_DEBUG, "Waiting up to $timeout seconds for socket data...");
 | |
|                 $ready = stream_select($read, $write, $except, $timeout, 0);
 | |
| 
 | |
|                 if ($ready === false) {
 | |
|                     common_log(LOG_ERR, "Error selecting on sockets");
 | |
|                 } else if ($ready > 0) {
 | |
|                     foreach ($read as $socket) {
 | |
|                         $index = array_search($socket, $sockets, true);
 | |
|                         if ($index !== false) {
 | |
|                             $this->logState('queue');
 | |
|                             $managers[$index]->handleInput($socket);
 | |
|                         } else {
 | |
|                             common_log(LOG_ERR, "Saw input on a socket we didn't listen to");
 | |
|                         }
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             if ($timeout > 0 && empty($sockets)) {
 | |
|                 // If we had no listeners, sleep until the pollers' next requested wakeup.
 | |
|                 common_log(LOG_DEBUG, "Sleeping $timeout seconds until next poll cycle...");
 | |
|                 $this->logState('sleep');
 | |
|                 sleep($timeout);
 | |
|             }
 | |
| 
 | |
|             $this->logState('poll');
 | |
|             $this->poll();
 | |
| 
 | |
|             $this->logState('idle');
 | |
|             $this->idle();
 | |
| 
 | |
|             $this->checkMemory();
 | |
|         }
 | |
| 
 | |
|         $this->logState('shutdown');
 | |
|         $this->finish();
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Check runtime memory usage, possibly triggering a graceful shutdown
 | |
|      * and thread respawn if we've crossed the soft limit.
 | |
|      *
 | |
|      * @param boolean $respawn if false we'll shut down instead of respawning
 | |
|      */
 | |
|     protected function checkMemory($respawn=true)
 | |
|     {
 | |
|         $memoryLimit = $this->softMemoryLimit();
 | |
|         if ($memoryLimit > 0) {
 | |
|             $usage = memory_get_usage();
 | |
|             if ($usage > $memoryLimit) {
 | |
|                 common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
 | |
|                 if ($respawn) {
 | |
|                     $this->requestRestart();
 | |
|                 } else {
 | |
|                     $this->requestShutdown();
 | |
|                 }
 | |
|             } else if (common_config('queue', 'debug_memory')) {
 | |
|                 $fmt = number_format($usage);
 | |
|                 common_log(LOG_DEBUG, "Memory usage $fmt");
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Return fully-parsed soft memory limit in bytes.
 | |
|      * @return intval 0 or -1 if not set
 | |
|      */
 | |
|     function softMemoryLimit()
 | |
|     {
 | |
|         $softLimit = trim(common_config('queue', 'softlimit'));
 | |
|         if (substr($softLimit, -1) == '%') {
 | |
|             $limit = $this->parseMemoryLimit(ini_get('memory_limit'));
 | |
|             if ($limit > 0) {
 | |
|                 return intval(substr($softLimit, 0, -1) * $limit / 100);
 | |
|             } else {
 | |
|                 return -1;
 | |
|             }
 | |
|         } else {
 | |
|             return $this->parseMemoryLimit($softLimit);
 | |
|         }
 | |
|         return $softLimit;
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Interpret PHP shorthand for memory_limit and friends.
 | |
|      * Why don't they just expose the actual numeric value? :P
 | |
|      * @param string $mem
 | |
|      * @return int
 | |
|      */
 | |
|     public function parseMemoryLimit($mem)
 | |
|     {
 | |
|         // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
 | |
|         $mem = strtolower(trim($mem));
 | |
|         $size = array('k' => 1024,
 | |
|                       'm' => 1024*1024,
 | |
|                       'g' => 1024*1024*1024);
 | |
|         if (empty($mem)) {
 | |
|             return 0;
 | |
|         } else if (is_numeric($mem)) {
 | |
|             return intval($mem);
 | |
|         } else {
 | |
|             $mult = substr($mem, -1);
 | |
|             if (isset($size[$mult])) {
 | |
|                 return substr($mem, 0, -1) * $size[$mult];
 | |
|             } else {
 | |
|                 return intval($mem);
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     function start()
 | |
|     {
 | |
|         foreach ($this->managers as $index => $manager) {
 | |
|             $manager->start($this);
 | |
|             // @fixme error check
 | |
|             if ($manager->pollInterval()) {
 | |
|                 // We'll want to check for input on the first pass
 | |
|                 $this->pollTimeouts[$index] = 0;
 | |
|                 $this->lastPoll[$index] = 0;
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     function finish()
 | |
|     {
 | |
|         foreach ($this->managers as $manager) {
 | |
|             $manager->finish();
 | |
|             // @fixme error check
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Called during the idle portion of the runloop to see which handlers
 | |
|      */
 | |
|     function poll()
 | |
|     {
 | |
|         foreach ($this->managers as $index => $manager) {
 | |
|             $interval = $manager->pollInterval();
 | |
|             if ($interval <= 0) {
 | |
|                 // Not a polling manager.
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             if (isset($this->pollTimeouts[$index])) {
 | |
|                 $timeout = $this->pollTimeouts[$index];
 | |
|                 if (time() - $this->lastPoll[$index] < $timeout) {
 | |
|                     // Not time to poll yet.
 | |
|                     continue;
 | |
|                 }
 | |
|             } else {
 | |
|                 $timeout = 0;
 | |
|             }
 | |
|             $hit = $manager->poll();
 | |
| 
 | |
|             $this->lastPoll[$index] = time();
 | |
|             if ($hit) {
 | |
|                 // Do the next poll quickly, there may be more input!
 | |
|                 $this->pollTimeouts[$index] = 0;
 | |
|             } else {
 | |
|                 // Empty queue. Exponential backoff up to the maximum poll interval.
 | |
|                 if ($timeout > 0) {
 | |
|                     $timeout = min($timeout * 2, $interval);
 | |
|                 } else {
 | |
|                     $timeout = 1;
 | |
|                 }
 | |
|                 $this->pollTimeouts[$index] = $timeout;
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Called after each handled item or empty polling cycle.
 | |
|      * This is a good time to e.g. service your XMPP connection.
 | |
|      */
 | |
|     function idle()
 | |
|     {
 | |
|         foreach ($this->managers as $manager) {
 | |
|             $manager->idle();
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Send thread state update to the monitoring server, if configured.
 | |
|      *
 | |
|      * @param string $state ('init', 'queue', 'shutdown' etc)
 | |
|      * @param string $substate (optional, eg queue name 'omb' 'sms' etc)
 | |
|      */
 | |
|     protected function logState($state, $substate='')
 | |
|     {
 | |
|         $this->monitor->logState($this->id, $state, $substate);
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Send thread stats.
 | |
|      * Thread ID will be implicit; other owners can be listed as well
 | |
|      * for per-queue and per-site records.
 | |
|      *
 | |
|      * @param string $key counter name
 | |
|      * @param array $owners list of owner keys like 'queue:xmpp' or 'site:stat01'
 | |
|      */
 | |
|     public function stats($key, $owners=array())
 | |
|     {
 | |
|         $owners[] = "thread:" . $this->id;
 | |
|         $this->monitor->stats($key, $owners);
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * For IoManagers to request a graceful shutdown at end of event loop.
 | |
|      */
 | |
|     public function requestShutdown()
 | |
|     {
 | |
|         $this->shutdown = true;
 | |
|         $this->respawn = false;
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * For IoManagers to request a graceful restart at end of event loop.
 | |
|      */
 | |
|     public function requestRestart()
 | |
|     {
 | |
|         $this->shutdown = true;
 | |
|         $this->respawn = true;
 | |
|     }
 | |
| 
 | |
| }
 | |
| 
 |