362 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
		
		
			
		
	
	
			362 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
| 
								 | 
							
								<?php
							 | 
						||
| 
								 | 
							
								/**
							 | 
						||
| 
								 | 
							
								 * StatusNet, the distributed open-source microblogging tool
							 | 
						||
| 
								 | 
							
								 *
							 | 
						||
| 
								 | 
							
								 * I/O manager to wrap around socket-reading and polling queue & connection managers.
							 | 
						||
| 
								 | 
							
								 *
							 | 
						||
| 
								 | 
							
								 * PHP version 5
							 | 
						||
| 
								 | 
							
								 *
							 | 
						||
| 
								 | 
							
								 * LICENCE: This program is free software: you can redistribute it and/or modify
							 | 
						||
| 
								 | 
							
								 * it under the terms of the GNU Affero General Public License as published by
							 | 
						||
| 
								 | 
							
								 * the Free Software Foundation, either version 3 of the License, or
							 | 
						||
| 
								 | 
							
								 * (at your option) any later version.
							 | 
						||
| 
								 | 
							
								 *
							 | 
						||
| 
								 | 
							
								 * This program is distributed in the hope that it will be useful,
							 | 
						||
| 
								 | 
							
								 * but WITHOUT ANY WARRANTY; without even the implied warranty of
							 | 
						||
| 
								 | 
							
								 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
							 | 
						||
| 
								 | 
							
								 * GNU Affero General Public License for more details.
							 | 
						||
| 
								 | 
							
								 *
							 | 
						||
| 
								 | 
							
								 * You should have received a copy of the GNU Affero General Public License
							 | 
						||
| 
								 | 
							
								 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
							 | 
						||
| 
								 | 
							
								 *
							 | 
						||
| 
								 | 
							
								 * @category  QueueManager
							 | 
						||
| 
								 | 
							
								 * @package   StatusNet
							 | 
						||
| 
								 | 
							
								 * @author    Brion Vibber <brion@status.net>
							 | 
						||
| 
								 | 
							
								 * @copyright 2009 StatusNet, Inc.
							 | 
						||
| 
								 | 
							
								 * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
							 | 
						||
| 
								 | 
							
								 * @link      http://status.net/
							 | 
						||
| 
								 | 
							
								 */
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/**
								 * Master run loop that drives a set of I/O managers.
								 *
								 * For every configured site it instantiates the registered manager classes,
								 * then loops: wait for readable sockets (or sleep), dispatch input to the
								 * owning manager, run interval-based polling, call idle hooks, and stop once
								 * the configured soft memory limit is exceeded.
								 */
								class IoMaster
								{
								    /** @var string process ID used in logging/monitoring */
								    public $id;

								    /**
								     * Monitor object created in the constructor; receives state and stats
								     * updates. Declared explicitly (previously a dynamic property) and kept
								     * public so existing external access keeps working.
								     *
								     * @var QueueMonitor
								     */
								    public $monitor;

								    /**
								     * Site server names handled by this master; filled in by init().
								     * Declared explicitly (previously a dynamic property).
								     *
								     * @var array
								     */
								    public $sites = array();

								    /** @var bool whether init() should cover every site in status_network */
								    protected $multiSite = false;

								    /** @var array every manager instance driven by the run loop */
								    protected $managers = array();

								    /** @var array class name => shared manager covering multiple sites */
								    protected $singletons = array();

								    /** @var array manager index => seconds to wait before the next poll */
								    protected $pollTimeouts = array();

								    /** @var array manager index => unix timestamp of the last poll */
								    protected $lastPoll = array();

								    /**
								     * @param string $id process ID to use in logging/monitoring
								     */
								    public function __construct($id)
								    {
								        $this->id = $id;
								        $this->monitor = new QueueMonitor();
								    }

								    /**
								     * Work out which sites to serve and instantiate their I/O managers.
								     *
								     * Manager classes are collected through the StartIoManagerClasses /
								     * EndIoManagerClasses events; unless a StartIoManagerClasses handler
								     * vetoes it, QueueManager (and the XMPP managers when xmpp is enabled)
								     * are added by default.
								     *
								     * @param bool|null $multiSite override for multi-site mode; null keeps
								     *                             the current setting
								     *
								     * @throws Exception if no sites were found
								     */
								    public function init($multiSite=null)
								    {
								        if ($multiSite !== null) {
								            $this->multiSite = $multiSite;
								        }
								        if ($this->multiSite) {
								            $this->sites = $this->findAllSites();
								        } else {
								            $this->sites = array(common_config('site', 'server'));
								        }

								        if (empty($this->sites)) {
								            throw new Exception("Empty status_network table, cannot init");
								        }

								        foreach ($this->sites as $site) {
								            // Switch configuration over to this site unless it is already active.
								            if ($site != common_config('site', 'server')) {
								                StatusNet::init($site);
								            }

								            $classes = array();
								            if (Event::handle('StartIoManagerClasses', array(&$classes))) {
								                $classes[] = 'QueueManager';
								                if (common_config('xmpp', 'enabled')) {
								                    $classes[] = 'XmppManager'; // handles pings/reconnects
								                    $classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations
								                }
								            }
								            Event::handle('EndIoManagerClasses', array(&$classes));

								            foreach ($classes as $class) {
								                $this->instantiate($class);
								            }
								        }
								    }

								    /**
								     * Pull all local sites from status_network table.
								     *
								     * @return array of hostnames
								     */
								    protected function findAllSites()
								    {
								        $hosts = array();
								        $sn = new Status_network();
								        $sn->find();
								        while ($sn->fetch()) {
								            $hosts[] = $sn->hostname;
								        }
								        return $hosts;
								    }

								    /**
								     * Instantiate an i/o manager class for the current site.
								     * If a multi-site capable handler is already present,
								     * we don't need to build a new one.
								     *
								     * @param string $class
								     *
								     * @throws Exception in multi-site mode if the manager reports
								     *                   IoManager::SINGLE_ONLY
								     */
								    protected function instantiate($class)
								    {
								        if (isset($this->singletons[$class])) {
								            // Already instantiated a multi-site-capable handler.
								            // Just let it know it should listen to this site too!
								            $this->singletons[$class]->addSite(common_config('site', 'server'));
								            return;
								        }

								        $manager = $this->getManager($class);

								        if ($this->multiSite) {
								            $caps = $manager->multiSite();
								            if ($caps == IoManager::SINGLE_ONLY) {
								                throw new Exception("$class can't run with --all; aborting.");
								            }
								            if ($caps == IoManager::INSTANCE_PER_PROCESS) {
								                // Save this guy for later!
								                // We'll only need the one to cover multiple sites.
								                $this->singletons[$class] = $manager;
								                $manager->addSite(common_config('site', 'server'));
								            }
								        }

								        $this->managers[] = $manager;
								    }

								    /**
								     * Obtain a manager by calling the static get() method of the class.
								     *
								     * @param string $class manager class name
								     *
								     * @return mixed whatever $class::get() returns
								     */
								    protected function getManager($class)
								    {
								        return call_user_func(array($class, 'get'));
								    }

								    /**
								     * Basic run loop...
								     *
								     * Initialize all io managers, then sit around waiting for input.
								     * Between events or timeouts, pass control back to idle() method
								     * to allow for any additional background processing.
								     *
								     * The loop ends when memory usage exceeds the soft memory limit, after
								     * which all managers are finished.
								     */
								    public function service()
								    {
								        $this->logState('init');
								        $this->start();

								        while (true) {
								            $timeouts = array_values($this->pollTimeouts);
								            $timeouts[] = 60; // default max timeout

								            // Wait for something on one of our sockets.
								            // $sockets and $managers are parallel lists: $managers[$i] owns $sockets[$i].
								            $sockets = array();
								            $managers = array();
								            foreach ($this->managers as $manager) {
								                foreach ($manager->getSockets() as $socket) {
								                    $sockets[] = $socket;
								                    $managers[] = $manager;
								                }
								                $timeouts[] = intval($manager->timeout());
								            }

								            // Wake up as soon as the most impatient poller/manager wants us to.
								            $timeout = min($timeouts);
								            if ($sockets) {
								                // stream_select() modifies its array arguments, so hand it copies.
								                $read = $sockets;
								                $write = array();
								                $except = array();
								                $this->logState('listening');
								                common_log(LOG_INFO, "Waiting up to $timeout seconds for socket data...");
								                $ready = stream_select($read, $write, $except, $timeout, 0);

								                if ($ready === false) {
								                    common_log(LOG_ERR, "Error selecting on sockets");
								                } else if ($ready > 0) {
								                    foreach ($read as $socket) {
								                        $index = array_search($socket, $sockets, true);
								                        if ($index !== false) {
								                            $this->logState('queue');
								                            $managers[$index]->handleInput($socket);
								                        } else {
								                            common_log(LOG_ERR, "Saw input on a socket we didn't listen to");
								                        }
								                    }
								                }
								            }

								            if ($timeout > 0 && empty($sockets)) {
								                // If we had no listeners, sleep until the pollers' next requested wakeup.
								                common_log(LOG_INFO, "Sleeping $timeout seconds until next poll cycle...");
								                $this->logState('sleep');
								                sleep($timeout);
								            }

								            $this->logState('poll');
								            $this->poll();

								            $this->logState('idle');
								            $this->idle();

								            $memoryLimit = $this->softMemoryLimit();
								            if ($memoryLimit > 0) {
								                $usage = memory_get_usage();
								                if ($usage > $memoryLimit) {
								                    common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
								                    break;
								                }
								            }
								        }

								        $this->logState('shutdown');
								        $this->finish();
								    }

								    /**
								     * Return fully-parsed soft memory limit in bytes.
								     *
								     * The 'queue'/'softlimit' setting is either a percentage of PHP's
								     * memory_limit (e.g. "90%") or an absolute size in PHP shorthand
								     * notation (e.g. "64M").
								     *
								     * @return int limit in bytes; 0 if not set, -1 if a percentage was
								     *             requested but memory_limit is not a positive value
								     */
								    public function softMemoryLimit()
								    {
								        $softLimit = trim(common_config('queue', 'softlimit'));
								        if (substr($softLimit, -1) == '%') {
								            $limit = trim(ini_get('memory_limit'));
								            $limit = $this->parseMemoryLimit($limit);
								            if ($limit > 0) {
								                return intval(floatval(substr($softLimit, 0, -1)) * $limit / 100);
								            }
								            return -1;
								        }

								        // Absolute value: parse the configured setting itself. (This used to
								        // pass an undefined variable, which always produced 0 and thereby
								        // disabled the soft limit.)
								        return $this->parseMemoryLimit($softLimit);
								    }

								    /**
								     * Interpret PHP shorthand for memory_limit and friends.
								     * Why don't they just expose the actual numeric value? :P
								     *
								     * @param string $mem e.g. "128M", "1g", "65536"
								     *
								     * @return int size in bytes; 0 for an empty value
								     */
								    protected function parseMemoryLimit($mem)
								    {
								        // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
								        $size = array('k' => 1024,
								                      'm' => 1024*1024,
								                      'g' => 1024*1024*1024);
								        if (empty($mem)) {
								            return 0;
								        } else if (is_numeric($mem)) {
								            return intval($mem);
								        } else {
								            $mult = strtolower(substr($mem, -1));
								            if (isset($size[$mult])) {
								                // Convert explicitly so the result is always an int and a
								                // malformed numeric part yields 0 instead of a TypeError.
								                return intval(floatval(substr($mem, 0, -1)) * $size[$mult]);
								            } else {
								                return intval($mem);
								            }
								        }
								    }

								    /**
								     * Start every manager and arm the poll bookkeeping for those that
								     * report a poll interval.
								     */
								    public function start()
								    {
								        foreach ($this->managers as $index => $manager) {
								            $manager->start($this);
								            // @fixme error check
								            if ($manager->pollInterval()) {
								                // We'll want to check for input on the first pass
								                $this->pollTimeouts[$index] = 0;
								                $this->lastPoll[$index] = 0;
								            }
								        }
								    }

								    /**
								     * Tell every manager to finish.
								     */
								    public function finish()
								    {
								        foreach ($this->managers as $manager) {
								            $manager->finish();
								            // @fixme error check
								        }
								    }

								    /**
								     * Called on each pass of the run loop to poll the managers that use
								     * interval polling and whose current wait time has elapsed.
								     *
								     * A poll that reports a hit resets the wait to 0; an empty poll backs
								     * off exponentially (1, 2, 4, ... seconds) up to the manager's interval.
								     */
								    public function poll()
								    {
								        foreach ($this->managers as $index => $manager) {
								            $interval = $manager->pollInterval();
								            if ($interval <= 0) {
								                // Not a polling manager.
								                continue;
								            }

								            if (isset($this->pollTimeouts[$index])) {
								                $timeout = $this->pollTimeouts[$index];
								                if (time() - $this->lastPoll[$index] < $timeout) {
								                    // Not time to poll yet.
								                    continue;
								                }
								            } else {
								                $timeout = 0;
								            }
								            $hit = $manager->poll();

								            $this->lastPoll[$index] = time();
								            if ($hit) {
								                // Do the next poll quickly, there may be more input!
								                $this->pollTimeouts[$index] = 0;
								            } else {
								                // Empty queue. Exponential backoff up to the maximum poll interval.
								                if ($timeout > 0) {
								                    $timeout = min($timeout * 2, $interval);
								                } else {
								                    $timeout = 1;
								                }
								                $this->pollTimeouts[$index] = $timeout;
								            }
								        }
								    }

								    /**
								     * Called after each handled item or empty polling cycle.
								     * This is a good time to e.g. service your XMPP connection.
								     */
								    public function idle()
								    {
								        foreach ($this->managers as $manager) {
								            $manager->idle();
								        }
								    }

								    /**
								     * Send thread state update to the monitoring server, if configured.
								     *
								     * @param string $state ('init', 'queue', 'shutdown' etc)
								     * @param string $substate (optional, eg queue name 'omb' 'sms' etc)
								     */
								    protected function logState($state, $substate='')
								    {
								        $this->monitor->logState($this->id, $state, $substate);
								    }

								    /**
								     * Send thread stats.
								     * Thread ID will be implicit; other owners can be listed as well
								     * for per-queue and per-site records.
								     *
								     * @param string $key counter name
								     * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01'
								     */
								    public function stats($key, $owners=array())
								    {
								        $owners[] = "thread:" . $this->id;
								        $this->monitor->stats($key, $owners);
								    }
								}
							 | 
						||
| 
								 | 
							
								
							 |