diff --git a/classes/Inbox.php b/classes/Inbox.php index 312b4586b4..086dba1c9d 100644 --- a/classes/Inbox.php +++ b/classes/Inbox.php @@ -154,17 +154,23 @@ class Inbox extends Memcached_DataObject $ids = unpack('N*', $inbox->notice_ids); if (!empty($since_id)) { - $i = array_search($since_id, $ids); - if ($i !== false) { - $ids = array_slice($ids, 0, $i - 1); + $newids = array(); + foreach ($ids as $id) { + if ($id > $since_id) { + $newids[] = $id; + } } + $ids = $newids; } if (!empty($max_id)) { - $i = array_search($max_id, $ids); - if ($i !== false) { - $ids = array_slice($ids, $i - 1); + $newids = array(); + foreach ($ids as $id) { + if ($id <= $max_id) { + $newids[] = $id; + } } + $ids = $newids; } $ids = array_slice($ids, $offset, $limit); diff --git a/lib/iomaster.php b/lib/iomaster.php index 5d1071a392..ce77b53b2e 100644 --- a/lib/iomaster.php +++ b/lib/iomaster.php @@ -70,7 +70,7 @@ class IoMaster $classes = array(); if (Event::handle('StartIoManagerClasses', array(&$classes))) { $classes[] = 'QueueManager'; - if (common_config('xmpp', 'enabled')) { + if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) { $classes[] = 'XmppManager'; // handles pings/reconnects $classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations } diff --git a/lib/queuemanager.php b/lib/queuemanager.php index a98c0efffb..291174d3c4 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -149,15 +149,17 @@ abstract class QueueManager extends IoManager function initialize() { if (Event::handle('StartInitializeQueueManager', array($this))) { - $this->connect('plugin', 'PluginQueueHandler'); - $this->connect('omb', 'OmbQueueHandler'); - $this->connect('ping', 'PingQueueHandler'); - if (common_config('sms', 'enabled')) { - $this->connect('sms', 'SmsQueueHandler'); + if (!defined('XMPP_ONLY_FLAG')) { // hack! + $this->connect('plugin', 'PluginQueueHandler'); + $this->connect('omb', 'OmbQueueHandler'); + $this->connect('ping', 'PingQueueHandler'); + if (common_config('sms', 'enabled')) { + $this->connect('sms', 'SmsQueueHandler'); + } } // XMPP output handlers... - if (common_config('xmpp', 'enabled')) { + if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) { $this->connect('jabber', 'JabberQueueHandler'); $this->connect('public', 'PublicQueueHandler'); @@ -165,10 +167,14 @@ abstract class QueueManager extends IoManager $this->connect('confirm', 'XmppConfirmHandler'); } - // For compat with old plugins not registering their own handlers. - $this->connect('plugin', 'PluginQueueHandler'); + if (!defined('XMPP_ONLY_FLAG')) { // hack! + // For compat with old plugins not registering their own handlers. + $this->connect('plugin', 'PluginQueueHandler'); + } + } + if (!defined('XMPP_ONLY_FLAG')) { // hack! + Event::handle('EndInitializeQueueManager', array($this)); } - Event::handle('EndInitializeQueueManager', array($this)); } /** diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 3090e0bfb6..00590fdb69 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -66,10 +66,62 @@ class StompQueueManager extends QueueManager * * @fixme possibly actually do subscription here to save another * loop over all sites later? + * @fixme possibly don't assume it's the current site */ public function addSite($server) { $this->sites[] = $server; + $this->initialize(); + } + + + /** + * Instantiate the appropriate QueueHandler class for the given queue. + * + * @param string $queue + * @return mixed QueueHandler or null + */ + function getHandler($queue) + { + $handlers = $this->handlers[common_config('site', 'server')]; + if (isset($handlers[$queue])) { + $class = $handlers[$queue]; + if (class_exists($class)) { + return new $class(); + } else { + common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); + } + } else { + common_log(LOG_ERR, "Requested handler for unkown queue '$queue'"); + } + return null; + } + + /** + * Get a list of all registered queue transport names. + * + * @return array of strings + */ + function getQueues() + { + $site = common_config('site', 'server'); + if (empty($this->handlers[$site])) { + return array(); + } else { + return array_keys($this->handlers[$site]); + } + } + + /** + * Register a queue transport name and handler class for your plugin. + * Only registered transports will be reliably picked up! + * + * @param string $transport + * @param string $class + */ + public function connect($transport, $class) + { + $this->handlers[common_config('site', 'server')][$transport] = $class; } /** diff --git a/lib/xmppmanager.php b/lib/xmppmanager.php index 0839cda57a..dfff63a30c 100644 --- a/lib/xmppmanager.php +++ b/lib/xmppmanager.php @@ -118,7 +118,11 @@ class XmppManager extends IoManager */ public function getSockets() { - return array($this->conn->getSocket()); + if ($this->conn) { + return array($this->conn->getSocket()); + } else { + return array(); + } } /** diff --git a/plugins/MemcachePlugin.php b/plugins/MemcachePlugin.php index bc7fd90765..5214ab9c89 100644 --- a/plugins/MemcachePlugin.php +++ b/plugins/MemcachePlugin.php @@ -165,20 +165,18 @@ class MemcachePlugin extends Plugin $this->_conn = new Memcache(); if (is_array($this->servers)) { - foreach ($this->servers as $server) { - list($host, $port) = @explode(';', $server); - if (empty($port)) { - $port = 11211; - } - - $this->_conn->addServer($host, $port, $this->persistent); - } + $servers = $this->servers; } else { - $this->_conn->addServer($this->servers, $this->persistent); - list($host, $port) = explode(';', $this->servers); - if (empty($port)) { + $servers = array($this->servers); + } + foreach ($servers as $server) { + if (strpos($server, ':') !== false) { + list($host, $port) = explode(':', $server); + } else { + $host = $server; $port = 11211; } + $this->_conn->addServer($host, $port, $this->persistent); } diff --git a/scripts/queuedaemon.php b/scripts/queuedaemon.php index 8ef364fe7b..162f617e0d 100755 --- a/scripts/queuedaemon.php +++ b/scripts/queuedaemon.php @@ -21,7 +21,7 @@ define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); $shortoptions = 'fi:at:'; -$longoptions = array('id=', 'foreground', 'all', 'threads='); +$longoptions = array('id=', 'foreground', 'all', 'threads=', 'skip-xmpp', 'xmpp-only'); /** * Attempts to get a count of the processors available on the current system @@ -260,6 +260,13 @@ if (!$threads) { $daemonize = !(have_option('f') || have_option('--foreground')); $all = have_option('a') || have_option('--all'); +if (have_option('--skip-xmpp')) { + define('XMPP_EMERGENCY_FLAG', true); +} +if (have_option('--xmpp-only')) { + define('XMPP_ONLY_FLAG', true); +} + $daemon = new QueueDaemon($id, $daemonize, $threads, $all); $daemon->runOnce();