From 2b6a39f70f1c1917ff53b7908ac2a1819c876166 Mon Sep 17 00:00:00 2001 From: Zach Copley Date: Mon, 15 Feb 2010 21:10:45 +0000 Subject: [PATCH 1/7] Fix for regression introduced with my last update to the TwitterStatusFetcher: the Twitter bridge was not saving a foreign user record when making a foreign link. --- plugins/TwitterBridge/twitter.php | 7 ++++--- plugins/TwitterBridge/twitterauthorization.php | 14 +++++++++++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/plugins/TwitterBridge/twitter.php b/plugins/TwitterBridge/twitter.php index de30d9ebf1..bd7f19e71c 100644 --- a/plugins/TwitterBridge/twitter.php +++ b/plugins/TwitterBridge/twitter.php @@ -28,6 +28,7 @@ require_once INSTALLDIR . '/plugins/TwitterBridge/twitteroauthclient.php'; function add_twitter_user($twitter_id, $screen_name) { + common_debug("Add Twitter user"); $new_uri = 'http://twitter.com/' . $screen_name; @@ -40,7 +41,7 @@ function add_twitter_user($twitter_id, $screen_name) $luser->service = TWITTER_SERVICE; $result = $luser->delete(); - if (empty($result)) { + if ($result != false) { common_log(LOG_WARNING, "Twitter bridge - removed invalid Twitter user squatting on uri: $new_uri"); } @@ -93,9 +94,9 @@ function save_twitter_user($twitter_id, $screen_name) $screen_name, $oldname)); } - - return add_twitter_user($twitter_id, $screen_name); } + + return add_twitter_user($twitter_id, $screen_name); } function is_twitter_bound($notice, $flink) { diff --git a/plugins/TwitterBridge/twitterauthorization.php b/plugins/TwitterBridge/twitterauthorization.php index dbef438a4b..6822d33dd1 100644 --- a/plugins/TwitterBridge/twitterauthorization.php +++ b/plugins/TwitterBridge/twitterauthorization.php @@ -89,11 +89,15 @@ class TwitterauthorizationAction extends Action $user = common_current_user(); $flink = Foreign_link::getByUserID($user->id, TWITTER_SERVICE); - // If there's already a foreign link record, it means we already - // have an access token, and this is unecessary. So go back. + // If there's already a foreign link record and a foreign user + // it means the accounts are already linked, and this is unecessary. + // So go back. if (isset($flink)) { - common_redirect(common_local_url('twittersettings')); + $fuser = $flink->getForeignUser(); + if (!empty($fuser)) { + common_redirect(common_local_url('twittersettings')); + } } } @@ -254,6 +258,10 @@ class TwitterauthorizationAction extends Action { $flink = new Foreign_link(); + $flink->user_id = $user_id; + $flink->service = TWITTER_SERVICE; + $flink->delete(); // delete stale flink, if any + $flink->user_id = $user_id; $flink->foreign_id = $twuid; $flink->service = TWITTER_SERVICE; From fdf6ed7b1ade7dc4cf2f1f71063025719f3e722a Mon Sep 17 00:00:00 2001 From: Zach Copley Date: Mon, 15 Feb 2010 21:23:26 +0000 Subject: [PATCH 2/7] Better log msgs. Removed debugging statement. --- plugins/TwitterBridge/twitter.php | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/plugins/TwitterBridge/twitter.php b/plugins/TwitterBridge/twitter.php index bd7f19e71c..2a075300f2 100644 --- a/plugins/TwitterBridge/twitter.php +++ b/plugins/TwitterBridge/twitter.php @@ -28,8 +28,6 @@ require_once INSTALLDIR . '/plugins/TwitterBridge/twitteroauthclient.php'; function add_twitter_user($twitter_id, $screen_name) { - common_debug("Add Twitter user"); - $new_uri = 'http://twitter.com/' . $screen_name; // Clear out any bad old foreign_users with the new user's legit URL @@ -42,8 +40,8 @@ function add_twitter_user($twitter_id, $screen_name) $result = $luser->delete(); if ($result != false) { - common_log(LOG_WARNING, - "Twitter bridge - removed invalid Twitter user squatting on uri: $new_uri"); + common_log(LOG_INFO, + "Twitter bridge - removed old Twitter user: $screen_name ($twitter_id)."); } $luser->free(); @@ -65,7 +63,8 @@ function add_twitter_user($twitter_id, $screen_name) "Twitter bridge - failed to add new Twitter user: $twitter_id - $screen_name."); common_log_db_error($fuser, 'INSERT', __FILE__); } else { - common_debug("Twitter bridge - Added new Twitter user: $screen_name ($twitter_id)."); + common_log(LOG_INFO, + "Twitter bridge - Added new Twitter user: $screen_name ($twitter_id)."); } return $result; From 01c428796f7986fddc0e602656a57ea63055bffd Mon Sep 17 00:00:00 2001 From: Zach Copley Date: Mon, 15 Feb 2010 21:53:49 +0000 Subject: [PATCH 3/7] Twitter-bridge: lookup old foreign_user by primary key not url --- plugins/TwitterBridge/twitter.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/TwitterBridge/twitter.php b/plugins/TwitterBridge/twitter.php index 2a075300f2..ce989768e6 100644 --- a/plugins/TwitterBridge/twitter.php +++ b/plugins/TwitterBridge/twitter.php @@ -35,7 +35,7 @@ function add_twitter_user($twitter_id, $screen_name) // repoed, and things like that. $luser = new Foreign_user(); - $luser->uri = $new_uri; + $luser->id = $twitter_id; $luser->service = TWITTER_SERVICE; $result = $luser->delete(); From 9f8e25bfe70134b7e19bb0067062e55bbb5b6a23 Mon Sep 17 00:00:00 2001 From: Zach Copley Date: Mon, 15 Feb 2010 22:13:10 +0000 Subject: [PATCH 4/7] Use static class method for looking up Twitter user --- plugins/TwitterBridge/twitter.php | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/plugins/TwitterBridge/twitter.php b/plugins/TwitterBridge/twitter.php index ce989768e6..e5afde62ca 100644 --- a/plugins/TwitterBridge/twitter.php +++ b/plugins/TwitterBridge/twitter.php @@ -28,15 +28,11 @@ require_once INSTALLDIR . '/plugins/TwitterBridge/twitteroauthclient.php'; function add_twitter_user($twitter_id, $screen_name) { - $new_uri = 'http://twitter.com/' . $screen_name; - // Clear out any bad old foreign_users with the new user's legit URL // This can happen when users move around or fakester accounts get // repoed, and things like that. - $luser = new Foreign_user(); - $luser->id = $twitter_id; - $luser->service = TWITTER_SERVICE; + $luser = Foreign_user::getForeignUser($twitter_id, TWITTER_SERVICE); $result = $luser->delete(); if ($result != false) { @@ -44,11 +40,6 @@ function add_twitter_user($twitter_id, $screen_name) "Twitter bridge - removed old Twitter user: $screen_name ($twitter_id)."); } - $luser->free(); - unset($luser); - - // Otherwise, create a new Twitter user - $fuser = new Foreign_user(); $fuser->nickname = $screen_name; From 8869ccc94e57a50c729aca97aaf878751b39fb3b Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Mon, 15 Feb 2010 15:19:16 -0800 Subject: [PATCH 5/7] Temporary debug hack tracking down 'revoked accesstoken' issue with OMB posts --- extlib/libomb/service_provider.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/extlib/libomb/service_provider.php b/extlib/libomb/service_provider.php index 753152713b..a1c69e86ff 100755 --- a/extlib/libomb/service_provider.php +++ b/extlib/libomb/service_provider.php @@ -285,6 +285,10 @@ class OMB_Service_Provider { list($consumer, $token) = $this->getOAuthServer()->verify_request($req); } catch (OAuthException $e) { header('HTTP/1.1 403 Forbidden'); + // @debug hack + throw OMB_RemoteServiceException::forRequest($uri, + 'Revoked accesstoken for ' . $listenee . ': ' . $e->getMessage()); + // @end debug throw OMB_RemoteServiceException::forRequest($uri, 'Revoked accesstoken for ' . $listenee); } From 3d0c3f0577fb1b0a83bb65ae6439f018932c5c38 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Tue, 16 Feb 2010 09:15:29 -0800 Subject: [PATCH 6/7] Pull fix from testing branch: use new encoding funcs w/ stomp queues --- lib/queuemanager.php | 36 +++++++++++++++++++++++++----------- lib/stompqueuemanager.php | 26 +++++++------------------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/lib/queuemanager.php b/lib/queuemanager.php index afe710e884..149617eb50 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -155,26 +155,26 @@ abstract class QueueManager extends IoManager } /** - * Encode an object for queued storage. - * Next gen may use serialization. + * Encode an object or variable for queued storage. + * Notice objects are currently stored as an id reference; + * other items are serialized. * - * @param mixed $object + * @param mixed $item * @return string */ - protected function encode($object) + protected function encode($item) { - if ($object instanceof Notice) { - return $object->id; - } else if (is_string($object)) { - return $object; + if ($item instanceof Notice) { + // Backwards compat + return $item->id; } else { - throw new ServerException("Can't queue this type", 500); + return serialize($item); } } /** * Decode an object from queued storage. - * Accepts back-compat notice reference entries and strings for now. + * Accepts notice reference entries and serialized items. * * @param string * @return mixed @@ -182,9 +182,23 @@ abstract class QueueManager extends IoManager protected function decode($frame) { if (is_numeric($frame)) { + // Back-compat for notices... return Notice::staticGet(intval($frame)); - } else { + } elseif (substr($frame, 0, 1) == '<') { + // Back-compat for XML source return $frame; + } else { + // Deserialize! + #$old = error_reporting(); + #error_reporting($old & ~E_NOTICE); + $out = unserialize($frame); + #error_reporting($old); + + if ($out === false && $frame !== 'b:0;') { + common_log(LOG_ERR, "Couldn't unserialize queued frame: $frame"); + return false; + } + return $out; } } diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index cc4c817d8f..cd62c25bd8 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -549,26 +549,14 @@ class StompQueueManager extends QueueManager } $host = $this->cons[$idx]->getServer(); - if (is_numeric($frame->body)) { - $id = intval($frame->body); - $info = "notice $id posted at {$frame->headers['created']} in queue $queue from $host"; - - $notice = Notice::staticGet('id', $id); - if (empty($notice)) { - $this->_log(LOG_WARNING, "Skipping missing $info"); - $this->ack($idx, $frame); - $this->commit($idx); - $this->begin($idx); - $this->stats('badnotice', $queue); - return false; - } - - $item = $notice; - } else { - // @fixme should we serialize, or json, or what here? - $info = "string posted at {$frame->headers['created']} in queue $queue from $host"; - $item = $frame->body; + $item = $this->decode($frame->body); + if (empty($item)) { + $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host"); + return true; } + $info = $this->logrep($item) . " posted at " . + $frame->headers['created'] . " in queue $queue from $host"; + $this->_log(LOG_DEBUG, "Dequeued $info"); $handler = $this->getHandler($queue); if (!$handler) { From c74aea589d5a79d7048470d44e457dffc8919ad3 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Tue, 16 Feb 2010 09:01:59 -0800 Subject: [PATCH 7/7] Stomp queue restructuring for mass scalability: - Multiplexing queues into groups and for multiple sites. - Sharing vs breakout configurable per site and per queue via $config['queue']['breakout'] - Detect how many times a message is redelivered, discard if it's killed too many daemons - count configurable with $config['queue']['max_retries'] - can dump the items to files in $config['queue']['dead_letter_dir'] Queue daemon memory & resource leak fixes: - avoid unnecessary reconnections to memcached server (switch persistent connections back in on second initialization, assuming it's child process) - monkey-patch for leaky .ini loads in DB_DataObject::databaseStructure() - was leaking 200k per active switch - applied leak fixes to Status_network as well, using intermediate base Safe_DataObject for both it and Memcache_DataObject Misc queue fixes: - correct handling of child processes exiting due to signal termination instead of regular exit - shutdown instead of infinite respawn loop if we're already past the soft memory limit at startup - Added --all option for xmppdaemon... still opens one xmpp connection per site that has xmpp active Cache updates: - add Cache::increment() method with native support for memcached atomic increment --- classes/Memcached_DataObject.php | 52 +---- classes/Safe_DataObject.php | 250 +++++++++++++++++++++ classes/Status_network.php | 16 +- lib/cache.php | 26 +++ lib/dbqueuemanager.php | 7 +- lib/default.php | 8 +- lib/iomanager.php | 5 +- lib/iomaster.php | 74 +++---- lib/queued_xmpp.php | 2 +- lib/queuemanager.php | 97 +++++--- lib/spawningdaemon.php | 16 +- lib/statusnet.php | 54 +++++ lib/stompqueuemanager.php | 369 ++++++++++++++----------------- lib/xmppmanager.php | 8 +- plugins/MemcachePlugin.php | 33 ++- scripts/queuedaemon.php | 20 +- scripts/xmppdaemon.php | 28 ++- 17 files changed, 697 insertions(+), 368 deletions(-) create mode 100644 classes/Safe_DataObject.php diff --git a/classes/Memcached_DataObject.php b/classes/Memcached_DataObject.php index ab65c30ce2..16c3d906ce 100644 --- a/classes/Memcached_DataObject.php +++ b/classes/Memcached_DataObject.php @@ -19,57 +19,8 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } -class Memcached_DataObject extends DB_DataObject +class Memcached_DataObject extends Safe_DataObject { - /** - * Destructor to free global memory resources associated with - * this data object when it's unset or goes out of scope. - * DB_DataObject doesn't do this yet by itself. - */ - - function __destruct() - { - $this->free(); - if (method_exists('DB_DataObject', '__destruct')) { - parent::__destruct(); - } - } - - /** - * Magic function called at serialize() time. - * - * We use this to drop a couple process-specific references - * from DB_DataObject which can cause trouble in future - * processes. - * - * @return array of variable names to include in serialization. - */ - function __sleep() - { - $vars = array_keys(get_object_vars($this)); - $skip = array('_DB_resultid', '_link_loaded'); - return array_diff($vars, $skip); - } - - /** - * Magic function called at unserialize() time. - * - * Clean out some process-specific variables which might - * be floating around from a previous process's cached - * objects. - * - * Old cached objects may still have them. - */ - function __wakeup() - { - // Refers to global state info from a previous process. - // Clear this out so we don't accidentally break global - // state in *this* process. - $this->_DB_resultid = null; - // We don't have any local DBO refs, so clear these out. - $this->_link_loaded = false; - } - /** * Wrapper for DB_DataObject's static lookup using memcached * as backing instead of an in-process cache array. @@ -579,3 +530,4 @@ class Memcached_DataObject extends DB_DataObject return $c->set($cacheKey, $value); } } + diff --git a/classes/Safe_DataObject.php b/classes/Safe_DataObject.php new file mode 100644 index 0000000000..021f7b5064 --- /dev/null +++ b/classes/Safe_DataObject.php @@ -0,0 +1,250 @@ +. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } + +/** + * Extended DB_DataObject to improve a few things: + * - free global resources from destructor + * - remove bogus global references from serialized objects + * - don't leak memory when loading already-used .ini files + * (eg when using the same schema on thousands of databases) + */ +class Safe_DataObject extends DB_DataObject +{ + /** + * Destructor to free global memory resources associated with + * this data object when it's unset or goes out of scope. + * DB_DataObject doesn't do this yet by itself. + */ + + function __destruct() + { + $this->free(); + if (method_exists('DB_DataObject', '__destruct')) { + parent::__destruct(); + } + } + + /** + * Magic function called at serialize() time. + * + * We use this to drop a couple process-specific references + * from DB_DataObject which can cause trouble in future + * processes. + * + * @return array of variable names to include in serialization. + */ + function __sleep() + { + $vars = array_keys(get_object_vars($this)); + $skip = array('_DB_resultid', '_link_loaded'); + return array_diff($vars, $skip); + } + + /** + * Magic function called at unserialize() time. + * + * Clean out some process-specific variables which might + * be floating around from a previous process's cached + * objects. + * + * Old cached objects may still have them. + */ + function __wakeup() + { + // Refers to global state info from a previous process. + // Clear this out so we don't accidentally break global + // state in *this* process. + $this->_DB_resultid = null; + // We don't have any local DBO refs, so clear these out. + $this->_link_loaded = false; + } + + + /** + * Work around memory-leak bugs... + * Had to copy-paste the whole function in order to patch a couple lines of it. + * Would be nice if this code was better factored. + * + * @param optional string name of database to assign / read + * @param optional array structure of database, and keys + * @param optional array table links + * + * @access public + * @return true or PEAR:error on wrong paramenters.. or false if no file exists.. + * or the array(tablename => array(column_name=>type)) if called with 1 argument.. (databasename) + */ + function databaseStructure() + { + + global $_DB_DATAOBJECT; + + // Assignment code + + if ($args = func_get_args()) { + + if (count($args) == 1) { + + // this returns all the tables and their structure.. + if (!empty($_DB_DATAOBJECT['CONFIG']['debug'])) { + $this->debug("Loading Generator as databaseStructure called with args",1); + } + + $x = new DB_DataObject; + $x->_database = $args[0]; + $this->_connect(); + $DB = &$_DB_DATAOBJECT['CONNECTIONS'][$this->_database_dsn_md5]; + + $tables = $DB->getListOf('tables'); + class_exists('DB_DataObject_Generator') ? '' : + require_once 'DB/DataObject/Generator.php'; + + foreach($tables as $table) { + $y = new DB_DataObject_Generator; + $y->fillTableSchema($x->_database,$table); + } + return $_DB_DATAOBJECT['INI'][$x->_database]; + } else { + + $_DB_DATAOBJECT['INI'][$args[0]] = isset($_DB_DATAOBJECT['INI'][$args[0]]) ? + $_DB_DATAOBJECT['INI'][$args[0]] + $args[1] : $args[1]; + + if (isset($args[1])) { + $_DB_DATAOBJECT['LINKS'][$args[0]] = isset($_DB_DATAOBJECT['LINKS'][$args[0]]) ? + $_DB_DATAOBJECT['LINKS'][$args[0]] + $args[2] : $args[2]; + } + return true; + } + + } + + + + if (!$this->_database) { + $this->_connect(); + } + + // loaded already? + if (!empty($_DB_DATAOBJECT['INI'][$this->_database])) { + + // database loaded - but this is table is not available.. + if ( + empty($_DB_DATAOBJECT['INI'][$this->_database][$this->__table]) + && !empty($_DB_DATAOBJECT['CONFIG']['proxy']) + ) { + if (!empty($_DB_DATAOBJECT['CONFIG']['debug'])) { + $this->debug("Loading Generator to fetch Schema",1); + } + class_exists('DB_DataObject_Generator') ? '' : + require_once 'DB/DataObject/Generator.php'; + + + $x = new DB_DataObject_Generator; + $x->fillTableSchema($this->_database,$this->__table); + } + return true; + } + + + if (empty($_DB_DATAOBJECT['CONFIG'])) { + DB_DataObject::_loadConfig(); + } + + // if you supply this with arguments, then it will take those + // as the database and links array... + + $schemas = isset($_DB_DATAOBJECT['CONFIG']['schema_location']) ? + array("{$_DB_DATAOBJECT['CONFIG']['schema_location']}/{$this->_database}.ini") : + array() ; + + if (isset($_DB_DATAOBJECT['CONFIG']["ini_{$this->_database}"])) { + $schemas = is_array($_DB_DATAOBJECT['CONFIG']["ini_{$this->_database}"]) ? + $_DB_DATAOBJECT['CONFIG']["ini_{$this->_database}"] : + explode(PATH_SEPARATOR,$_DB_DATAOBJECT['CONFIG']["ini_{$this->_database}"]); + } + + + /* BEGIN CHANGED FROM UPSTREAM */ + $_DB_DATAOBJECT['INI'][$this->_database] = $this->parseIniFiles($schemas); + /* END CHANGED FROM UPSTREAM */ + + // now have we loaded the structure.. + + if (!empty($_DB_DATAOBJECT['INI'][$this->_database][$this->__table])) { + return true; + } + // - if not try building it.. + if (!empty($_DB_DATAOBJECT['CONFIG']['proxy'])) { + class_exists('DB_DataObject_Generator') ? '' : + require_once 'DB/DataObject/Generator.php'; + + $x = new DB_DataObject_Generator; + $x->fillTableSchema($this->_database,$this->__table); + // should this fail!!!??? + return true; + } + $this->debug("Cant find database schema: {$this->_database}/{$this->__table} \n". + "in links file data: " . print_r($_DB_DATAOBJECT['INI'],true),"databaseStructure",5); + // we have to die here!! - it causes chaos if we dont (including looping forever!) + $this->raiseError( "Unable to load schema for database and table (turn debugging up to 5 for full error message)", DB_DATAOBJECT_ERROR_INVALIDARGS, PEAR_ERROR_DIE); + return false; + } + + /** For parseIniFiles */ + protected static $iniCache = array(); + + /** + * When switching site configurations, DB_DataObject was loading its + * .ini files over and over, leaking gobs of memory. + * This refactored helper function uses a local cache of .ini files + * to minimize the leaks. + * + * @param array of .ini file names $schemas + * @return array + */ + protected function parseIniFiles($schemas) + { + $key = implode("|", $schemas); + if (!isset(Safe_DataObject::$iniCache[$key])) { + $data = array(); + foreach ($schemas as $ini) { + if (file_exists($ini) && is_file($ini)) { + $data = array_merge($data, parse_ini_file($ini, true)); + + if (!empty($_DB_DATAOBJECT['CONFIG']['debug'])) { + if (!is_readable ($ini)) { + $this->debug("ini file is not readable: $ini","databaseStructure",1); + } else { + $this->debug("Loaded ini file: $ini","databaseStructure",1); + } + } + } else { + if (!empty($_DB_DATAOBJECT['CONFIG']['debug'])) { + $this->debug("Missing ini file: $ini","databaseStructure",1); + } + } + } + Safe_DataObject::$iniCache[$key] = $data; + } + + return Safe_DataObject::$iniCache[$key]; + } +} + diff --git a/classes/Status_network.php b/classes/Status_network.php index 4bda24b6a0..a452c32ce0 100644 --- a/classes/Status_network.php +++ b/classes/Status_network.php @@ -21,7 +21,7 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } -class Status_network extends DB_DataObject +class Status_network extends Safe_DataObject { ###START_AUTOCODE /* the code below is auto generated do not remove the above tag */ @@ -57,6 +57,7 @@ class Status_network extends DB_DataObject ###END_AUTOCODE static $cache = null; + static $cacheInitialized = false; static $base = null; static $wildcard = null; @@ -78,11 +79,15 @@ class Status_network extends DB_DataObject if (class_exists('Memcache')) { self::$cache = new Memcache(); - // Can't close persistent connections, making forking painful. + // If we're a parent command-line process we need + // to be able to close out the connection after + // forking, so disable persistence. // - // @fixme only do this in *parent* CLI processes. - // single-process and child-processes *should* use persistent. - $persist = php_sapi_name() != 'cli'; + // We'll turn it back on again the second time + // through which will either be in a child process, + // or a single-process script which is switching + // configurations. + $persist = php_sapi_name() != 'cli' || self::$cacheInitialized; if (is_array($servers)) { foreach($servers as $server) { self::$cache->addServer($server, 11211, $persist); @@ -90,6 +95,7 @@ class Status_network extends DB_DataObject } else { self::$cache->addServer($servers, 11211, $persist); } + self::$cacheInitialized = true; } self::$base = $dbname; diff --git a/lib/cache.php b/lib/cache.php index 635c96ad4c..b3ec7534f5 100644 --- a/lib/cache.php +++ b/lib/cache.php @@ -157,6 +157,32 @@ class Cache return $success; } + /** + * Atomically increment an existing numeric value. + * Existing expiration time should remain unchanged, if any. + * + * @param string $key The key to use for lookups + * @param int $step Amount to increment (default 1) + * + * @return mixed incremented value, or false if not set. + */ + function increment($key, $step=1) + { + $value = false; + if (Event::handle('StartCacheIncrement', array(&$key, &$step, &$value))) { + // Fallback is not guaranteed to be atomic, + // and may original expiry value. + $value = $this->get($key); + if ($value !== false) { + $value += $step; + $ok = $this->set($key, $value); + $got = $this->get($key); + } + Event::handle('EndCacheIncrement', array($key, $step, $value)); + } + return $value; + } + /** * Delete the value associated with a key * diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index c6350fc669..3032e4ec7a 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -72,7 +72,7 @@ class DBQueueManager extends QueueManager public function poll() { $this->_log(LOG_DEBUG, 'Checking for notices...'); - $qi = Queue_item::top($this->getQueues()); + $qi = Queue_item::top($this->activeQueues()); if (empty($qi)) { $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); return false; @@ -142,9 +142,4 @@ class DBQueueManager extends QueueManager $this->stats('error', $queue); } - - protected function _log($level, $msg) - { - common_log($level, 'DBQueueManager: '.$msg); - } } diff --git a/lib/default.php b/lib/default.php index bf4b83718d..a74cccae12 100644 --- a/lib/default.php +++ b/lib/default.php @@ -81,7 +81,7 @@ $default = 'subsystem' => 'db', # default to database, or 'stomp' 'stomp_server' => null, 'queue_basename' => '/queue/statusnet/', - 'control_channel' => '/topic/statusnet-control', // broadcasts to all queue daemons + 'control_channel' => '/topic/statusnet/control', // broadcasts to all queue daemons 'stomp_username' => null, 'stomp_password' => null, 'stomp_persistent' => true, // keep items across queue server restart, if persistence is enabled @@ -91,6 +91,12 @@ $default = 'spawndelay' => 1, // Wait at least N seconds between (re)spawns of child processes to avoid slamming the queue server with subscription startup 'debug_memory' => false, // true to spit memory usage to log 'inboxes' => true, // true to do inbox distribution & output queueing from in background via 'distrib' queue + 'breakout' => array('*' => 'shared'), // set global or per-handler queue breakout + // 'shared': use a shared queue for all sites + // 'handler': share each/this handler over multiple sites + // 'site': break out for each/this handler on this site + 'max_retries' => 10, // drop messages after N failed attempts to process (Stomp) + 'dead_letter_dir' => false, // set to directory to save dropped messages into (Stomp) ), 'license' => array('type' => 'cc', # can be 'cc', 'allrightsreserved', 'private' diff --git a/lib/iomanager.php b/lib/iomanager.php index ee2ff958b9..217599a6d2 100644 --- a/lib/iomanager.php +++ b/lib/iomanager.php @@ -59,9 +59,10 @@ abstract class IoManager * your manager about each site you'll have to handle so you * can do any necessary per-site setup. * - * @param string $site target site server name + * The new site will be the currently live configuration during + * this call. */ - public function addSite($site) + public function addSite() { /* no-op */ } diff --git a/lib/iomaster.php b/lib/iomaster.php index bcab3542be..54e2dfe841 100644 --- a/lib/iomaster.php +++ b/lib/iomaster.php @@ -56,9 +56,9 @@ abstract class IoMaster $this->multiSite = $multiSite; } if ($this->multiSite) { - $this->sites = $this->findAllSites(); + $this->sites = StatusNet::findAllSites(); } else { - $this->sites = array(common_config('site', 'server')); + $this->sites = array(StatusNet::currentSite()); } if (empty($this->sites)) { @@ -66,9 +66,7 @@ abstract class IoMaster } foreach ($this->sites as $site) { - if ($site != common_config('site', 'server')) { - StatusNet::init($site); - } + StatusNet::switchSite($site); $this->initManagers(); } } @@ -81,58 +79,32 @@ abstract class IoMaster */ abstract function initManagers(); - /** - * Pull all local sites from status_network table. - * @return array of hostnames - */ - protected function findAllSites() - { - $hosts = array(); - $sn = new Status_network(); - $sn->find(); - while ($sn->fetch()) { - $hosts[] = $sn->getServerName(); - } - return $hosts; - } - /** * Instantiate an i/o manager class for the current site. * If a multi-site capable handler is already present, * we don't need to build a new one. * - * @param string $class + * @param mixed $manager class name (to run $class::get()) or object */ - protected function instantiate($class) + protected function instantiate($manager) { - if (isset($this->singletons[$class])) { - // Already instantiated a multi-site-capable handler. - // Just let it know it should listen to this site too! - $this->singletons[$class]->addSite(common_config('site', 'server')); - return; + if (is_string($manager)) { + $manager = call_user_func(array($class, 'get')); } - $manager = $this->getManager($class); - - if ($this->multiSite) { - $caps = $manager->multiSite(); - if ($caps == IoManager::SINGLE_ONLY) { + $caps = $manager->multiSite(); + if ($caps == IoManager::SINGLE_ONLY) { + if ($this->multiSite) { throw new Exception("$class can't run with --all; aborting."); } - if ($caps == IoManager::INSTANCE_PER_PROCESS) { - // Save this guy for later! - // We'll only need the one to cover multiple sites. - $this->singletons[$class] = $manager; - $manager->addSite(common_config('site', 'server')); - } + } else if ($caps == IoManager::INSTANCE_PER_PROCESS) { + $manager->addSite(); } - $this->managers[] = $manager; - } - - protected function getManager($class) - { - return call_user_func(array($class, 'get')); + if (!in_array($manager, $this->managers, true)) { + // Only need to save singletons once + $this->managers[] = $manager; + } } /** @@ -146,6 +118,7 @@ abstract class IoMaster { $this->logState('init'); $this->start(); + $this->checkMemory(false); while (!$this->shutdown) { $timeouts = array_values($this->pollTimeouts); @@ -209,17 +182,24 @@ abstract class IoMaster /** * Check runtime memory usage, possibly triggering a graceful shutdown * and thread respawn if we've crossed the soft limit. + * + * @param boolean $respawn if false we'll shut down instead of respawning */ - protected function checkMemory() + protected function checkMemory($respawn=true) { $memoryLimit = $this->softMemoryLimit(); if ($memoryLimit > 0) { $usage = memory_get_usage(); if ($usage > $memoryLimit) { common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting."); - $this->requestRestart(); + if ($respawn) { + $this->requestRestart(); + } else { + $this->requestShutdown(); + } } else if (common_config('queue', 'debug_memory')) { - common_log(LOG_DEBUG, "Memory usage $usage"); + $fmt = number_format($usage); + common_log(LOG_DEBUG, "Memory usage $fmt"); } } } diff --git a/lib/queued_xmpp.php b/lib/queued_xmpp.php index 4b890c4ca4..fdd074db29 100644 --- a/lib/queued_xmpp.php +++ b/lib/queued_xmpp.php @@ -63,7 +63,7 @@ class Queued_XMPP extends XMPPHP_XMPP */ public function send($msg, $timeout=NULL) { - $qm = QueueManager::get(); + $qm = QueueManager::get('xmppout'); $qm->enqueue(strval($msg), 'xmppout'); } diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 149617eb50..8f8c8f133f 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -39,9 +39,10 @@ abstract class QueueManager extends IoManager { static $qm = null; - public $master = null; - public $handlers = array(); - public $groups = array(); + protected $master = null; + protected $handlers = array(); + protected $groups = array(); + protected $activeGroups = array(); /** * Factory function to pull the appropriate QueueManager object @@ -215,55 +216,64 @@ abstract class QueueManager extends IoManager if (class_exists($class)) { return new $class(); } else { - common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); + $this->_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); } } else { - common_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); + $this->_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); } return null; } /** * Get a list of registered queue transport names to be used - * for this daemon. + * for listening in this daemon. * * @return array of strings */ - function getQueues() + function activeQueues() { - $group = $this->activeGroup(); - return array_keys($this->groups[$group]); + $queues = array(); + foreach ($this->activeGroups as $group) { + if (isset($this->groups[$group])) { + $queues = array_merge($queues, $this->groups[$group]); + } + } + + return array_keys($queues); } /** - * Initialize the list of queue handlers + * Initialize the list of queue handlers for the current site. * * @event StartInitializeQueueManager * @event EndInitializeQueueManager */ function initialize() { - // @fixme we'll want to be able to listen to particular queues... + $this->handlers = array(); + $this->groups = array(); + $this->groupsByTransport = array(); + if (Event::handle('StartInitializeQueueManager', array($this))) { - $this->connect('plugin', 'PluginQueueHandler'); + $this->connect('distrib', 'DistribQueueHandler'); $this->connect('omb', 'OmbQueueHandler'); $this->connect('ping', 'PingQueueHandler'); - $this->connect('distrib', 'DistribQueueHandler'); if (common_config('sms', 'enabled')) { $this->connect('sms', 'SmsQueueHandler'); } // XMPP output handlers... - $this->connect('jabber', 'JabberQueueHandler'); - $this->connect('public', 'PublicQueueHandler'); - // @fixme this should get an actual queue - //$this->connect('confirm', 'XmppConfirmHandler'); + if (common_config('xmpp', 'enabled')) { + // Delivery prep, read by queuedaemon.php: + $this->connect('jabber', 'JabberQueueHandler'); + $this->connect('public', 'PublicQueueHandler'); + + // Raw output, read by xmppdaemon.php: + $this->connect('xmppout', 'XmppOutQueueHandler', 'xmpp'); + } // For compat with old plugins not registering their own handlers. $this->connect('plugin', 'PluginQueueHandler'); - - $this->connect('xmppout', 'XmppOutQueueHandler', 'xmppdaemon'); - } Event::handle('EndInitializeQueueManager', array($this)); } @@ -276,25 +286,41 @@ abstract class QueueManager extends IoManager * @param string $class * @param string $group */ - public function connect($transport, $class, $group='queuedaemon') + public function connect($transport, $class, $group='main') { $this->handlers[$transport] = $class; $this->groups[$group][$transport] = $class; + $this->groupsByTransport[$transport] = $group; } /** - * @return string queue group to use for this request + * Set the active group which will be used for listening. + * @param string $group */ - function activeGroup() + function setActiveGroup($group) { - $group = 'queuedaemon'; - if ($this->master) { - // hack hack - if ($this->master instanceof XmppMaster) { - return 'xmppdaemon'; - } + $this->activeGroups = array($group); + } + + /** + * Set the active group(s) which will be used for listening. + * @param array $groups + */ + function setActiveGroups($groups) + { + $this->activeGroups = $groups; + } + + /** + * @return string queue group for this queue + */ + function queueGroup($queue) + { + if (isset($this->groupsByTransport[$queue])) { + return $this->groupsByTransport[$queue]; + } else { + throw new Exception("Requested group for unregistered transport $queue"); } - return $group; } /** @@ -318,4 +344,15 @@ abstract class QueueManager extends IoManager $monitor->stats($key, $owners); } } + + protected function _log($level, $msg) + { + $class = get_class($this); + if ($this->activeGroups) { + $groups = ' (' . implode(',', $this->activeGroups) . ')'; + } else { + $groups = ''; + } + common_log($level, "$class$groups: $msg"); + } } diff --git a/lib/spawningdaemon.php b/lib/spawningdaemon.php index 862cbb4fa3..fd9ae43556 100644 --- a/lib/spawningdaemon.php +++ b/lib/spawningdaemon.php @@ -90,18 +90,24 @@ abstract class SpawningDaemon extends Daemon while (count($children) > 0) { $status = null; $pid = pcntl_wait($status); - if ($pid > 0 && pcntl_wifexited($status)) { - $exitCode = pcntl_wexitstatus($status); - + if ($pid > 0) { $i = array_search($pid, $children); if ($i === false) { - $this->log(LOG_ERR, "Unrecognized child pid $pid exited with status $exitCode"); + $this->log(LOG_ERR, "Ignoring exit of unrecognized child pid $pid"); continue; } + if (pcntl_wifexited($status)) { + $exitCode = pcntl_wexitstatus($status); + $info = "status $exitCode"; + } else if (pcntl_wifsignaled($status)) { + $exitCode = self::EXIT_ERR; + $signal = pcntl_wtermsig($status); + $info = "signal $signal"; + } unset($children[$i]); if ($this->shouldRespawn($exitCode)) { - $this->log(LOG_INFO, "Thread $i pid $pid exited with status $exitCode; respawing."); + $this->log(LOG_INFO, "Thread $i pid $pid exited with $info; respawing."); $pid = pcntl_fork(); if ($pid < 0) { diff --git a/lib/statusnet.php b/lib/statusnet.php index 29e9030267..9c7ede5a5d 100644 --- a/lib/statusnet.php +++ b/lib/statusnet.php @@ -101,6 +101,60 @@ class StatusNet self::initPlugins(); } + /** + * Get identifier of the currently active site configuration + * @return string + */ + public static function currentSite() + { + return common_config('site', 'nickname'); + } + + /** + * Change site configuration to site specified by nickname, + * if set up via Status_network. If not, sites other than + * the current will fail horribly. + * + * May throw exception or trigger a fatal error if the given + * site is missing or configured incorrectly. + * + * @param string $nickname + */ + public static function switchSite($nickname) + { + if ($nickname == StatusNet::currentSite()) { + return true; + } + + $sn = Status_network::staticGet($nickname); + if (empty($sn)) { + return false; + throw new Exception("No such site nickname '$nickname'"); + } + + $server = $sn->getServerName(); + StatusNet::init($server); + } + + /** + * Pull all local sites from status_network table. + * + * Behavior undefined if site is not configured via Status_network. + * + * @return array of nicknames + */ + public static function findAllSites() + { + $sites = array(); + $sn = new Status_network(); + $sn->find(); + while ($sn->fetch()) { + $sites[] = $sn->nickname; + } + return $sites; + } + + /** * Fire initialization events for all instantiated plugins. */ diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index cd62c25bd8..bfeeb23b7f 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -63,6 +63,7 @@ class StompQueueManager extends QueueManager $this->password = common_config('queue', 'stomp_password'); $this->base = common_config('queue', 'queue_basename'); $this->control = common_config('queue', 'control_channel'); + $this->subscriptions = array($this->control => $this->control); } /** @@ -75,17 +76,25 @@ class StompQueueManager extends QueueManager } /** - * Record each site we'll be handling input for in this process, - * so we can listen to the necessary queues for it. - * - * @fixme possibly actually do subscription here to save another - * loop over all sites later? - * @fixme possibly don't assume it's the current site + * Record queue subscriptions we'll need to handle the current site. */ - public function addSite($server) + public function addSite() { - $this->sites[] = $server; + $this->sites[] = StatusNet::currentSite(); + + // Set up handlers active for this site... $this->initialize(); + + foreach ($this->activeGroups as $group) { + if (isset($this->groups[$group])) { + // Actual queues may be broken out or consolidated... + // Subscribe to all the target queues we'll need. + foreach ($this->groups[$group] as $transport => $class) { + $target = $this->queueName($transport); + $this->subscriptions[$target] = $target; + } + } + } } /** @@ -121,59 +130,11 @@ class StompQueueManager extends QueueManager } /** - * Instantiate the appropriate QueueHandler class for the given queue. + * Saves an object into the queue item table. * + * @param mixed $object * @param string $queue - * @return mixed QueueHandler or null - */ - function getHandler($queue) - { - $handlers = $this->handlers[$this->currentSite()]; - if (isset($handlers[$queue])) { - $class = $handlers[$queue]; - if (class_exists($class)) { - return new $class(); - } else { - common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); - } - } else { - common_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); - } - return null; - } - - /** - * Get a list of all registered queue transport names. * - * @return array of strings - */ - function getQueues() - { - $group = $this->activeGroup(); - $site = $this->currentSite(); - if (empty($this->groups[$site][$group])) { - return array(); - } else { - return array_keys($this->groups[$site][$group]); - } - } - - /** - * Register a queue transport name and handler class for your plugin. - * Only registered transports will be reliably picked up! - * - * @param string $transport - * @param string $class - * @param string $group - */ - public function connect($transport, $class, $group='queuedaemon') - { - $this->handlers[$this->currentSite()][$transport] = $class; - $this->groups[$this->currentSite()][$group][$transport] = $class; - } - - /** - * Saves a notice object reference into the queue item table. * @return boolean true on success * @throws StompException on connection or send error */ @@ -192,8 +153,11 @@ class StompQueueManager extends QueueManager */ protected function _doEnqueue($object, $queue, $idx) { - $msg = $this->encode($object); $rep = $this->logrep($object); + $envelope = array('site' => common_config('site', 'nickname'), + 'handler' => $queue, + 'payload' => $this->encode($object)); + $msg = serialize($envelope); $props = array('created' => common_sql_now()); if ($this->isPersistent($queue)) { @@ -205,11 +169,11 @@ class StompQueueManager extends QueueManager $result = $con->send($this->queueName($queue), $msg, $props); if (!$result) { - common_log(LOG_ERR, "Error sending $rep to $queue queue on $host"); + $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host"); return false; } - common_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host"); + $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host"); $this->stats('enqueued', $queue); return true; } @@ -275,12 +239,14 @@ class StompQueueManager extends QueueManager $idx = $this->connectionFromSocket($socket); $con = $this->cons[$idx]; $host = $con->getServer(); + $this->defaultIdx = $idx; $ok = true; try { $frames = $con->readFrames(); } catch (StompException $e) { - common_log(LOG_ERR, "Lost connection to $host: " . $e->getMessage()); + $this->_log(LOG_ERR, "Lost connection to $host: " . $e->getMessage()); + fclose($socket); // ??? $this->cons[$idx] = null; $this->transaction[$idx] = null; $this->disconnect[$idx] = time(); @@ -289,14 +255,17 @@ class StompQueueManager extends QueueManager foreach ($frames as $frame) { $dest = $frame->headers['destination']; if ($dest == $this->control) { - if (!$this->handleControlSignal($idx, $frame)) { + if (!$this->handleControlSignal($frame)) { // We got a control event that requests a shutdown; // close out and stop handling anything else! break; } } else { - $ok = $ok && $this->handleItem($idx, $frame); + $ok = $this->handleItem($frame) && $ok; } + $this->ack($idx, $frame); + $this->commit($idx); + $this->begin($idx); } return $ok; } @@ -333,22 +302,9 @@ class StompQueueManager extends QueueManager parent::start($master); $this->_connectAll(); - common_log(LOG_INFO, "Subscribing to $this->control"); - foreach ($this->cons as $con) { - if ($con) { - $con->subscribe($this->control); - } - } - if ($this->sites) { - foreach ($this->sites as $server) { - StatusNet::init($server); - $this->doSubscribe(); - } - } else { - $this->doSubscribe(); - } foreach ($this->cons as $i => $con) { if ($con) { + $this->doSubscribe($con); $this->begin($i); } } @@ -356,9 +312,7 @@ class StompQueueManager extends QueueManager } /** - * Subscribe to all the queues we're going to need to handle... - * - * Side effects: in multi-site mode, may reset site configuration. + * Close out any active connections. * * @return bool return false on failure */ @@ -376,15 +330,6 @@ class StompQueueManager extends QueueManager return true; } - /** - * Get identifier of the currently active site configuration - * @return string - */ - protected function currentSite() - { - return common_config('site', 'server'); // @fixme switch to nickname - } - /** * Lazy open a single connection to Stomp queue server. * If multiple servers are configured, we let the Stomp client library @@ -441,6 +386,10 @@ class StompQueueManager extends QueueManager } } + /** + * Attempt to manually reconnect to the Stomp server for the given + * slot. If successful, set up our subscriptions on it. + */ protected function _reconnect($idx) { try { @@ -453,17 +402,7 @@ class StompQueueManager extends QueueManager $this->cons[$idx] = $con; $this->disconnect[$idx] = null; - // now we have to listen to everything... - // @fixme refactor this nicer. :P - $host = $con->getServer(); - $this->_log(LOG_INFO, "Resubscribing to $this->control on $host"); - $con->subscribe($this->control); - foreach ($this->subscriptions as $site => $queues) { - foreach ($queues as $queue) { - $this->_log(LOG_INFO, "Resubscribing to $queue on $host"); - $con->subscribe($queue); - } - } + $this->doSubscribe($con); $this->begin($idx); } else { // Try again later... @@ -487,41 +426,15 @@ class StompQueueManager extends QueueManager } /** - * Subscribe to all enabled notice queues for the current site. + * Set up all our raw queue subscriptions on the given connection + * @param LiberalStomp $con */ - protected function doSubscribe() + protected function doSubscribe(LiberalStomp $con) { - $site = $this->currentSite(); - $this->_connect(); - foreach ($this->getQueues() as $queue) { - $rawqueue = $this->queueName($queue); - $this->subscriptions[$site][$queue] = $rawqueue; - $this->_log(LOG_INFO, "Subscribing to $rawqueue"); - foreach ($this->cons as $con) { - if ($con) { - $con->subscribe($rawqueue); - } - } - } - } - - /** - * Subscribe from all enabled notice queues for the current site. - */ - protected function doUnsubscribe() - { - $site = $this->currentSite(); - $this->_connect(); - if (!empty($this->subscriptions[$site])) { - foreach ($this->subscriptions[$site] as $queue => $rawqueue) { - $this->_log(LOG_INFO, "Unsubscribing from $rawqueue"); - foreach ($this->cons as $con) { - if ($con) { - $con->unsubscribe($rawqueue); - } - } - unset($this->subscriptions[$site][$queue]); - } + $host = $con->getServer(); + foreach ($this->subscriptions as $queue) { + $this->_log(LOG_INFO, "Subscribing to $queue on $host"); + $con->subscribe($queue); } } @@ -534,25 +447,29 @@ class StompQueueManager extends QueueManager * Side effects: in multi-site mode, may reset site configuration to * match the site that queued the event. * - * @param int $idx connection index * @param StompFrame $frame - * @return bool + * @return bool success */ - protected function handleItem($idx, $frame) + protected function handleItem($frame) { - $this->defaultIdx = $idx; + $host = $this->cons[$this->defaultIdx]->getServer(); + $message = unserialize($frame->body); + $site = $message['site']; + $queue = $message['handler']; - list($site, $queue) = $this->parseDestination($frame->headers['destination']); - if ($site != $this->currentSite()) { - $this->stats('switch'); - StatusNet::init($site); + if ($this->isDeadletter($frame, $message)) { + $this->stats('deadletter', $queue); + return false; } - $host = $this->cons[$idx]->getServer(); - $item = $this->decode($frame->body); + // @fixme detect failing site switches + $this->switchSite($site); + + $item = $this->decode($message['payload']); if (empty($item)) { $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host"); - return true; + $this->stats('baditem', $queue); + return false; } $info = $this->logrep($item) . " posted at " . $frame->headers['created'] . " in queue $queue from $host"; @@ -561,16 +478,10 @@ class StompQueueManager extends QueueManager $handler = $this->getHandler($queue); if (!$handler) { $this->_log(LOG_ERR, "Missing handler class; skipping $info"); - $this->ack($idx, $frame); - $this->commit($idx); - $this->begin($idx); $this->stats('badhandler', $queue); return false; } - // If there's an exception when handling, - // log the error and let it get requeued. - try { $ok = $handler->handle($item); } catch (Exception $e) { @@ -578,25 +489,80 @@ class StompQueueManager extends QueueManager $ok = false; } - if (!$ok) { + if ($ok) { + $this->_log(LOG_INFO, "Successfully handled $info"); + $this->stats('handled', $queue); + } else { $this->_log(LOG_WARNING, "Failed handling $info"); - // FIXME we probably shouldn't have to do - // this kind of queue management ourselves; - // if we don't ack, it should resend... - $this->ack($idx, $frame); + // Requeing moves the item to the end of the line for its next try. + // @fixme add a manual retry count $this->enqueue($item, $queue); - $this->commit($idx); - $this->begin($idx); $this->stats('requeued', $queue); - return false; } - $this->_log(LOG_INFO, "Successfully handled $info"); - $this->ack($idx, $frame); - $this->commit($idx); - $this->begin($idx); - $this->stats('handled', $queue); - return true; + return $ok; + } + + /** + * Check if a redelivered message has been run through enough + * that we're going to give up on it. + * + * @param StompFrame $frame + * @param array $message unserialized message body + * @return boolean true if we should discard + */ + protected function isDeadLetter($frame, $message) + { + if (isset($frame->headers['redelivered']) && $frame->headers['redelivered'] == 'true') { + // Message was redelivered, possibly indicating a previous failure. + $msgId = $frame->headers['message-id']; + $site = $message['site']; + $queue = $message['handler']; + $msgInfo = "message $msgId for $site in queue $queue"; + + $deliveries = $this->incDeliveryCount($msgId); + if ($deliveries > common_config('queue', 'max_retries')) { + $info = "DEAD-LETTER FILE: Gave up after retry $deliveries on $msgInfo"; + + $outdir = common_config('queue', 'dead_letter_dir'); + if ($outdir) { + $filename = $outdir . "/$site-$queue-" . rawurlencode($msgId); + $info .= ": dumping to $filename"; + file_put_contents($filename, $message['payload']); + } + + common_log(LOG_ERR, $info); + return true; + } else { + common_log(LOG_INFO, "retry $deliveries on $msgInfo"); + } + } + return false; + } + + /** + * Update count of times we've re-encountered this message recently, + * triggered when we get a message marked as 'redelivered'. + * + * Requires a CLI-friendly cache configuration. + * + * @param string $msgId message-id header from message + * @return int number of retries recorded + */ + function incDeliveryCount($msgId) + { + $count = 0; + $cache = common_memcache(); + if ($cache) { + $key = 'statusnet:stomp:message-retries:' . $msgId; + $count = $cache->increment($key); + if (!$count) { + $count = 1; + $cache->set($key, $count, null, 3600); + $got = $cache->get($key); + } + } + return $count; } /** @@ -629,13 +595,22 @@ class StompQueueManager extends QueueManager } else { $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message"); } - - $this->ack($idx, $frame); - $this->commit($idx); - $this->begin($idx); return $shutdown; } + /** + * Switch site, if necessary, and reset current handler assignments + * @param string $site + */ + function switchSite($site) + { + if ($site != StatusNet::currentSite()) { + $this->stats('switch'); + StatusNet::switchSite($site); + $this->initialize(); + } + } + /** * Set us up with queue subscriptions for a new site added at runtime, * triggered by a broadcast to the 'statusnet-control' topic. @@ -648,22 +623,17 @@ class StompQueueManager extends QueueManager if (empty($this->sites)) { if ($nickname == common_config('site', 'nickname')) { StatusNet::init(common_config('site', 'server')); - $this->doUnsubscribe(); - $this->doSubscribe(); } else { $this->_log(LOG_INFO, "Ignoring update ping for other site $nickname"); } } else { $sn = Status_network::staticGet($nickname); if ($sn) { - $server = $sn->getServerName(); // @fixme do config-by-nick - StatusNet::init($server); - if (empty($this->sites[$server])) { - $this->addSite($server); + $this->switchSite($nickname); + if (!in_array($nickname, $this->sites)) { + $this->addSite(); } - $this->_log(LOG_INFO, "(Re)subscribing to queues for site $nickname / $server"); - $this->doUnsubscribe(); - $this->doSubscribe(); + // @fixme update subscriptions, if applicable $this->stats('siteupdate'); } else { $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname"); @@ -673,42 +643,47 @@ class StompQueueManager extends QueueManager /** * Combines the queue_basename from configuration with the - * site server name and queue name to give eg: + * group name for this queue to give eg: * - * /queue/statusnet/identi.ca/sms + * /queue/statusnet/main * * @param string $queue * @return string */ protected function queueName($queue) { - return common_config('queue', 'queue_basename') . - $this->currentSite() . '/' . $queue; + $base = common_config('queue', 'queue_basename'); + $group = $this->queueGroup($queue); + $breakout = $this->breakoutMode($queue); + if ($breakout == 'shared') { + return $base . "$group"; + } else if ($breakout == 'handler') { + return $base . "$group/$queue"; + } else if ($breakout == 'site') { + $site = StatusNet::currentSite(); + return $base . "$group/$queue/$site"; + } + throw Exception("Unrecognized queue breakout mode '$breakout' for '$queue'"); } /** - * Returns the site and queue name from the server-side queue. + * Get the breakout mode for the given queue on the current site. * - * @param string queue destination (eg '/queue/statusnet/identi.ca/sms') - * @return array of site and queue: ('identi.ca','sms') or false if unrecognized + * @param string $queue + * @return string one of 'shared', 'handler', 'site' */ - protected function parseDestination($dest) + protected function breakoutMode($queue) { - $prefix = common_config('queue', 'queue_basename'); - if (substr($dest, 0, strlen($prefix)) == $prefix) { - $rest = substr($dest, strlen($prefix)); - return explode("/", $rest, 2); + $breakout = common_config('queue', 'breakout'); + if (isset($breakout[$queue])) { + return $breakout[$queue]; + } else if (isset($breakout['*'])) { + return $breakout['*']; } else { - common_log(LOG_ERR, "Got a message from unrecognized stomp queue: $dest"); - return array(false, false); + return 'shared'; } } - function _log($level, $msg) - { - common_log($level, 'StompQueueManager: '.$msg); - } - protected function begin($idx) { if ($this->useTransactions) { diff --git a/lib/xmppmanager.php b/lib/xmppmanager.php index 985e7c32e4..f376358555 100644 --- a/lib/xmppmanager.php +++ b/lib/xmppmanager.php @@ -48,7 +48,7 @@ class XmppManager extends IoManager public static function get() { if (common_config('xmpp', 'enabled')) { - $site = common_config('site', 'server'); + $site = StatusNet::currentSite(); if (empty(self::$singletons[$site])) { self::$singletons[$site] = new XmppManager(); } @@ -69,7 +69,7 @@ class XmppManager extends IoManager function __construct() { - $this->site = common_config('site', 'server'); + $this->site = StatusNet::currentSite(); $this->resource = common_config('xmpp', 'resource') . 'daemon'; } @@ -476,10 +476,10 @@ class XmppManager extends IoManager */ protected function switchSite() { - if ($this->site != common_config('site', 'server')) { + if ($this->site != StatusNet::currentSite()) { common_log(LOG_DEBUG, __METHOD__ . ": switching to site $this->site"); $this->stats('switch'); - StatusNet::init($this->site); + StatusNet::switchSite($this->site); } } } diff --git a/plugins/MemcachePlugin.php b/plugins/MemcachePlugin.php index 2bc4b892bd..93a0583fe8 100644 --- a/plugins/MemcachePlugin.php +++ b/plugins/MemcachePlugin.php @@ -51,6 +51,8 @@ if (!defined('STATUSNET')) { class MemcachePlugin extends Plugin { + static $cacheInitialized = false; + private $_conn = null; public $servers = array('127.0.0.1;11211'); @@ -71,10 +73,21 @@ class MemcachePlugin extends Plugin function onInitializePlugin() { - if (is_null($this->persistent)) { + if (self::$cacheInitialized) { + $this->persistent = true; + } else { + // If we're a parent command-line process we need + // to be able to close out the connection after + // forking, so disable persistence. + // + // We'll turn it back on again the second time + // through which will either be in a child process, + // or a single-process script which is switching + // configurations. $this->persistent = (php_sapi_name() == 'cli') ? false : true; } $this->_ensureConn(); + self::$cacheInitialized = true; return true; } @@ -121,6 +134,24 @@ class MemcachePlugin extends Plugin return false; } + /** + * Atomically increment an existing numeric key value. + * Existing expiration time will not be changed. + * + * @param string &$key in; Key to use for lookups + * @param int &$step in; Amount to increment (default 1) + * @param mixed &$value out; Incremented value, or false if key not set. + * + * @return boolean hook success + */ + function onStartCacheIncrement(&$key, &$step, &$value) + { + $this->_ensureConn(); + $value = $this->_conn->increment($key, $step); + Event::handle('EndCacheIncrement', array($key, $step, $value)); + return false; + } + /** * Delete a value associated with a key * diff --git a/scripts/queuedaemon.php b/scripts/queuedaemon.php index 30a8a96026..d372d898fa 100755 --- a/scripts/queuedaemon.php +++ b/scripts/queuedaemon.php @@ -74,8 +74,6 @@ require_once(INSTALLDIR.'/lib/daemon.php'); require_once(INSTALLDIR.'/classes/Queue_item.php'); require_once(INSTALLDIR.'/classes/Notice.php'); -define('CLAIM_TIMEOUT', 1200); - /** * Queue handling daemon... * @@ -92,7 +90,7 @@ class QueueDaemon extends SpawningDaemon function __construct($id=null, $daemonize=true, $threads=1, $allsites=false) { parent::__construct($id, $daemonize, $threads); - $this->all = $allsites; + $this->allsites = $allsites; } /** @@ -108,7 +106,7 @@ class QueueDaemon extends SpawningDaemon $this->log(LOG_INFO, 'checking for queued notices'); $master = new QueueMaster($this->get_id()); - $master->init($this->all); + $master->init($this->allsites); try { $master->service(); } catch (Exception $e) { @@ -133,14 +131,16 @@ class QueueMaster extends IoMaster */ function initManagers() { - $classes = array(); - if (Event::handle('StartQueueDaemonIoManagers', array(&$classes))) { - $classes[] = 'QueueManager'; + $managers = array(); + if (Event::handle('StartQueueDaemonIoManagers', array(&$managers))) { + $qm = QueueManager::get(); + $qm->setActiveGroup('main'); + $managers[] = $qm; } - Event::handle('EndQueueDaemonIoManagers', array(&$classes)); + Event::handle('EndQueueDaemonIoManagers', array(&$managers)); - foreach ($classes as $class) { - $this->instantiate($class); + foreach ($managers as $manager) { + $this->instantiate($manager); } } } diff --git a/scripts/xmppdaemon.php b/scripts/xmppdaemon.php index 46dd9b90cc..9302f0c43c 100755 --- a/scripts/xmppdaemon.php +++ b/scripts/xmppdaemon.php @@ -20,13 +20,15 @@ define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); -$shortoptions = 'fi::'; -$longoptions = array('id::', 'foreground'); +$shortoptions = 'fi::a'; +$longoptions = array('id::', 'foreground', 'all'); $helptext = <<allsites = $allsites; } function runThread() @@ -51,7 +56,7 @@ class XMPPDaemon extends SpawningDaemon common_log(LOG_INFO, 'Waiting to listen to XMPP and queues'); $master = new XmppMaster($this->get_id()); - $master->init(); + $master->init($this->allsites); $master->service(); common_log(LOG_INFO, 'terminating normally'); @@ -69,15 +74,19 @@ class XmppMaster extends IoMaster */ function initManagers() { - // @fixme right now there's a hack in QueueManager to determine - // which queues to subscribe to based on the master class. - $this->instantiate('QueueManager'); - $this->instantiate('XmppManager'); + if (common_config('xmpp', 'enabled')) { + $qm = QueueManager::get(); + $qm->setActiveGroup('xmpp'); + $this->instantiate($qm); + $this->instantiate(XmppManager::get()); + } } } // Abort immediately if xmpp is not enabled, otherwise the daemon chews up // lots of CPU trying to connect to unconfigured servers +// @fixme do this check after we've run through the site list so we +// don't have to find an XMPP site to start up when using --all mode. if (common_config('xmpp','enabled')==false) { print "Aborting daemon - xmpp is disabled\n"; exit(); @@ -92,7 +101,8 @@ if (have_option('i', 'id')) { } $foreground = have_option('f', 'foreground'); +$all = have_option('a') || have_option('--all'); -$daemon = new XMPPDaemon($id, !$foreground); +$daemon = new XMPPDaemon($id, !$foreground, 1, $all); $daemon->runOnce();