ec145b73fc
Key changes: * Initialization code moved from common.php to StatusNet class; can now switch configurations during runtime. * As a consequence, configuration files must now be idempotent... Be careful with constant, function or class definitions. * Control structure for daemons/QueueManager/QueueHandler has been refactored; the run loop is now managed by IoMaster, run via scripts/queuedaemon.php. IoManager subclasses are woken to handle socket input or polling, and may cover multiple sites. * Plugins can implement notice queue handlers more easily by registering a QueueHandler class; no more need to add a daemon. The new QueueDaemon runs from scripts/queuedaemon.php: * This replaces most of the old *handler.php scripts; they've been refactored to the bare handler classes. * Spawns multiple child processes to spread load; defaults to CPU count on Linux and Mac OS X systems, or override with --threads=N. * When multithreaded, child processes are automatically respawned on failure. * Threads gracefully shut down and restart when passing a soft memory limit (defaults to 90% of memory_limit), limiting damage from memory leaks. * Support for UDP-based monitoring: http://www.gitorious.org/snqmon Rough control flow diagram: QueueDaemon -> IoMaster -> IoManager subclasses: QueueManager [listen or poll] -> QueueHandler; XmppManager [ping & keepalive]; XmppConfirmManager [poll updates]. Todo: * Respawning features not currently available running single-threaded. * When running single-site, configuration changes aren't picked up. * New sites or config changes affecting queue subscriptions are not yet handled without a daemon restart. * SNMP monitoring output to integrate with general tools (nagios, ganglia) * Convert XMPP confirmation message sends to use stomp queue instead of polling * Convert xmppdaemon.php to IoManager? 
* Convert Twitter status, friends import polling daemons to IoManager * Clean up some error reporting and failure modes * May need to adjust queue priorities for best perf in backlog/flood cases Detailed code history available in my daemon-work branch: http://www.gitorious.org/~brion/statusnet/brion-fixes/commits/daemon-work
362 lines
11 KiB
PHP
362 lines
11 KiB
PHP
<?php
|
|
/**
|
|
* StatusNet, the distributed open-source microblogging tool
|
|
*
|
|
* I/O manager to wrap around socket-reading and polling queue & connection managers.
|
|
*
|
|
* PHP version 5
|
|
*
|
|
* LICENCE: This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU Affero General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU Affero General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*
|
|
* @category QueueManager
|
|
* @package StatusNet
|
|
* @author Brion Vibber <brion@status.net>
|
|
* @copyright 2009 StatusNet, Inc.
|
|
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
|
|
* @link http://status.net/
|
|
*/
|
|
|
|
/**
 * Master run loop for a set of I/O managers.
 *
 * Collects IoManager instances for one or more sites, waits on their
 * sockets with stream_select(), drives their poll()/idle() hooks, and
 * leaves the loop once a configurable soft memory limit is exceeded.
 */
class IoMaster
{
    /** @var string process ID used in logging/monitoring */
    public $id;

    /** @var bool whether init() should cover every site in status_network */
    protected $multiSite = false;

    /** @var array list of manager instances serviced by the run loop */
    protected $managers = array();

    /** @var array class name => manager instance shared between sites */
    protected $singletons = array();

    /** @var array manager index => seconds to wait before the next poll */
    protected $pollTimeouts = array();

    /** @var array manager index => unix timestamp of the last poll */
    protected $lastPoll = array();

    /**
     * Monitoring sink; set in the constructor.
     * Declared public because it used to be created as a dynamic
     * (and therefore public) property.
     *
     * @var QueueMonitor
     */
    public $monitor;

    /**
     * Hostnames handled by this master; filled in by init().
     * Declared public because it used to be created as a dynamic
     * (and therefore public) property.
     *
     * @var array
     */
    public $sites = array();

    /**
     * @param string $id process ID to use in logging/monitoring
     */
    public function __construct($id)
    {
        $this->id = $id;
        $this->monitor = new QueueMonitor();
    }

    /**
     * Build the list of sites and instantiate the I/O managers for each.
     *
     * @param mixed $multiSite if not null, overrides the multi-site flag
     *
     * @throws Exception if no sites could be found, or if a manager
     *                   cannot run in multi-site mode (see instantiate())
     */
    public function init($multiSite=null)
    {
        if ($multiSite !== null) {
            $this->multiSite = $multiSite;
        }

        if ($this->multiSite) {
            $this->sites = $this->findAllSites();
        } else {
            $this->sites = array(common_config('site', 'server'));
        }

        if (empty($this->sites)) {
            throw new Exception("Empty status_network table, cannot init");
        }

        foreach ($this->sites as $site) {
            // Only re-initialize when the site differs from the one that
            // is currently configured.
            if ($site != common_config('site', 'server')) {
                StatusNet::init($site);
            }

            // Event handlers may add to, or completely replace, the
            // default list of manager classes.
            $classes = array();
            if (Event::handle('StartIoManagerClasses', array(&$classes))) {
                $classes[] = 'QueueManager';
                if (common_config('xmpp', 'enabled')) {
                    $classes[] = 'XmppManager'; // handles pings/reconnects
                    $classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations
                }
            }
            Event::handle('EndIoManagerClasses', array(&$classes));

            foreach ($classes as $class) {
                $this->instantiate($class);
            }
        }
    }

    /**
     * Pull all local sites from status_network table.
     *
     * @return array of hostnames
     */
    protected function findAllSites()
    {
        $hosts = array();

        $sn = new Status_network();
        $sn->find();
        while ($sn->fetch()) {
            $hosts[] = $sn->hostname;
        }

        return $hosts;
    }

    /**
     * Instantiate an i/o manager class for the current site.
     * If a multi-site capable handler is already present,
     * we don't need to build a new one.
     *
     * @param string $class
     *
     * @throws Exception in multi-site mode, if the manager reports
     *                   IoManager::SINGLE_ONLY
     */
    protected function instantiate($class)
    {
        if (isset($this->singletons[$class])) {
            // Already instantiated a multi-site-capable handler.
            // Just let it know it should listen to this site too!
            $this->singletons[$class]->addSite(common_config('site', 'server'));
            return;
        }

        $manager = $this->getManager($class);

        if ($this->multiSite) {
            $caps = $manager->multiSite();
            if ($caps == IoManager::SINGLE_ONLY) {
                throw new Exception("$class can't run with --all; aborting.");
            }
            if ($caps == IoManager::INSTANCE_PER_PROCESS) {
                // Save this guy for later!
                // We'll only need the one to cover multiple sites.
                $this->singletons[$class] = $manager;
                $manager->addSite(common_config('site', 'server'));
            }
        }

        $this->managers[] = $manager;
    }

    /**
     * Obtain a manager by calling the static get() method of its class.
     *
     * @param string $class
     *
     * @return mixed whatever $class::get() returns
     */
    protected function getManager($class)
    {
        return call_user_func(array($class, 'get'));
    }

    /**
     * Basic run loop...
     *
     * Initialize all io managers, then sit around waiting for input.
     * Between events or timeouts, pass control back to idle() method
     * to allow for any additional background processing.
     *
     * The loop ends, and finish() is called, once memory usage exceeds
     * the soft memory limit.
     */
    public function service()
    {
        $this->logState('init');
        $this->start();

        while (true) {
            $timeouts = array_values($this->pollTimeouts);
            $timeouts[] = 60; // default max timeout

            // Wait for something on one of our sockets.
            // $sockets and $managers are parallel arrays: $managers[$i]
            // owns $sockets[$i].
            $sockets = array();
            $managers = array();
            foreach ($this->managers as $manager) {
                foreach ($manager->getSockets() as $socket) {
                    $sockets[] = $socket;
                    $managers[] = $manager;
                }
                $timeouts[] = intval($manager->timeout());
            }

            // Wake up as soon as the most impatient manager wants us to.
            $timeout = min($timeouts);
            if ($sockets) {
                // stream_select() rewrites its array arguments in place,
                // so hand it a copy and keep $sockets for the lookup below.
                $read = $sockets;
                $write = array();
                $except = array();
                $this->logState('listening');
                common_log(LOG_INFO, "Waiting up to $timeout seconds for socket data...");
                $ready = stream_select($read, $write, $except, $timeout, 0);

                if ($ready === false) {
                    common_log(LOG_ERR, "Error selecting on sockets");
                } elseif ($ready > 0) {
                    foreach ($read as $socket) {
                        $index = array_search($socket, $sockets, true);
                        if ($index !== false) {
                            $this->logState('queue');
                            $managers[$index]->handleInput($socket);
                        } else {
                            common_log(LOG_ERR, "Saw input on a socket we didn't listen to");
                        }
                    }
                }
            }

            if ($timeout > 0 && empty($sockets)) {
                // If we had no listeners, sleep until the pollers' next requested wakeup.
                common_log(LOG_INFO, "Sleeping $timeout seconds until next poll cycle...");
                $this->logState('sleep');
                sleep($timeout);
            }

            $this->logState('poll');
            $this->poll();

            $this->logState('idle');
            $this->idle();

            // A limit of 0 or -1 means "not set": never restart.
            $memoryLimit = $this->softMemoryLimit();
            if ($memoryLimit > 0) {
                $usage = memory_get_usage();
                if ($usage > $memoryLimit) {
                    common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
                    break;
                }
            }
        }

        $this->logState('shutdown');
        $this->finish();
    }

    /**
     * Return fully-parsed soft memory limit in bytes.
     *
     * The 'queue'/'softlimit' setting is either a percentage of PHP's
     * memory_limit (e.g. "90%") or an absolute size in PHP shorthand
     * notation (e.g. "100M").
     *
     * @return int limit in bytes; 0 or -1 if not set
     */
    public function softMemoryLimit()
    {
        // Cast first: trim() on a null setting is deprecated.
        $softLimit = trim((string) common_config('queue', 'softlimit'));

        if (substr($softLimit, -1) == '%') {
            $limit = $this->parseMemoryLimit(trim((string) ini_get('memory_limit')));
            if ($limit > 0) {
                return intval(floatval(substr($softLimit, 0, -1)) * $limit / 100);
            }
            // No usable hard limit to take a percentage of.
            return -1;
        }

        // Absolute value: parse the setting itself. This used to parse an
        // undefined variable, so absolute limits always came out as 0.
        return $this->parseMemoryLimit($softLimit);
    }

    /**
     * Interpret PHP shorthand for memory_limit and friends.
     * Why don't they just expose the actual numeric value? :P
     *
     * @param string $mem e.g. "128M", "64k", "1G" or a plain number
     *
     * @return int number of bytes; 0 for an empty value
     */
    protected function parseMemoryLimit($mem)
    {
        // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
        $size = array('k' => 1024,
                      'm' => 1024*1024,
                      'g' => 1024*1024*1024);

        if (empty($mem)) {
            return 0;
        }
        if (is_numeric($mem)) {
            return intval($mem);
        }

        $mult = strtolower(substr($mem, -1));
        if (isset($size[$mult])) {
            // floatval() tolerates a malformed numeric part (yielding 0)
            // and intval() keeps the documented int return type.
            return intval(floatval(substr($mem, 0, -1)) * $size[$mult]);
        }

        return intval($mem);
    }

    /**
     * Start every manager and prime the polling schedule.
     */
    public function start()
    {
        foreach ($this->managers as $index => $manager) {
            $manager->start($this);
            // @fixme error check
            if ($manager->pollInterval()) {
                // We'll want to check for input on the first pass
                $this->pollTimeouts[$index] = 0;
                $this->lastPoll[$index] = 0;
            }
        }
    }

    /**
     * Tell every manager to shut down.
     */
    public function finish()
    {
        foreach ($this->managers as $manager) {
            $manager->finish();
            // @fixme error check
        }
    }

    /**
     * Called on each pass of the run loop to poll those managers whose
     * poll timeout has elapsed.
     *
     * After a poll that reports a hit, the manager is polled again on
     * the next pass. After an empty poll, its timeout starts at one
     * second and doubles each time, capped at the manager's own
     * pollInterval().
     */
    public function poll()
    {
        foreach ($this->managers as $index => $manager) {
            $interval = $manager->pollInterval();
            if ($interval <= 0) {
                // Not a polling manager.
                continue;
            }

            if (isset($this->pollTimeouts[$index])) {
                $timeout = $this->pollTimeouts[$index];
                if (time() - $this->lastPoll[$index] < $timeout) {
                    // Not time to poll yet.
                    continue;
                }
            } else {
                $timeout = 0;
            }

            $hit = $manager->poll();
            $this->lastPoll[$index] = time();

            if ($hit) {
                // Do the next poll quickly, there may be more input!
                $this->pollTimeouts[$index] = 0;
            } else {
                // Empty queue. Exponential backoff up to the maximum poll interval.
                if ($timeout > 0) {
                    $timeout = min($timeout * 2, $interval);
                } else {
                    $timeout = 1;
                }
                $this->pollTimeouts[$index] = $timeout;
            }
        }
    }

    /**
     * Called after each handled item or empty polling cycle.
     * This is a good time to e.g. service your XMPP connection.
     */
    public function idle()
    {
        foreach ($this->managers as $manager) {
            $manager->idle();
        }
    }

    /**
     * Send thread state update to the monitoring server, if configured.
     *
     * @param string $state ('init', 'queue', 'shutdown' etc)
     * @param string $substate (optional, eg queue name 'omb' 'sms' etc)
     */
    protected function logState($state, $substate='')
    {
        $this->monitor->logState($this->id, $state, $substate);
    }

    /**
     * Send thread stats.
     * Thread ID will be implicit; other owners can be listed as well
     * for per-queue and per-site records.
     *
     * @param string $key counter name
     * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01'
     */
    public function stats($key, $owners=array())
    {
        $owners[] = "thread:" . $this->id;
        $this->monitor->stats($key, $owners);
    }
}
|
|
|