Merge branch 'master' of git@gitorious.org:statusnet/mainline

This commit is contained in:
Evan Prodromou 2010-01-16 07:48:59 -08:00
commit 21c3e08804
7 changed files with 102 additions and 29 deletions

View File

@ -154,18 +154,24 @@ class Inbox extends Memcached_DataObject
$ids = unpack('N*', $inbox->notice_ids); $ids = unpack('N*', $inbox->notice_ids);
if (!empty($since_id)) { if (!empty($since_id)) {
$i = array_search($since_id, $ids); $newids = array();
if ($i !== false) { foreach ($ids as $id) {
$ids = array_slice($ids, 0, $i - 1); if ($id > $since_id) {
$newids[] = $id;
} }
} }
$ids = $newids;
}
if (!empty($max_id)) { if (!empty($max_id)) {
$i = array_search($max_id, $ids); $newids = array();
if ($i !== false) { foreach ($ids as $id) {
$ids = array_slice($ids, $i - 1); if ($id <= $max_id) {
$newids[] = $id;
} }
} }
$ids = $newids;
}
$ids = array_slice($ids, $offset, $limit); $ids = array_slice($ids, $offset, $limit);

View File

@ -70,7 +70,7 @@ class IoMaster
$classes = array(); $classes = array();
if (Event::handle('StartIoManagerClasses', array(&$classes))) { if (Event::handle('StartIoManagerClasses', array(&$classes))) {
$classes[] = 'QueueManager'; $classes[] = 'QueueManager';
if (common_config('xmpp', 'enabled')) { if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) {
$classes[] = 'XmppManager'; // handles pings/reconnects $classes[] = 'XmppManager'; // handles pings/reconnects
$classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations $classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations
} }

View File

@ -149,15 +149,17 @@ abstract class QueueManager extends IoManager
function initialize() function initialize()
{ {
if (Event::handle('StartInitializeQueueManager', array($this))) { if (Event::handle('StartInitializeQueueManager', array($this))) {
if (!defined('XMPP_ONLY_FLAG')) { // hack!
$this->connect('plugin', 'PluginQueueHandler'); $this->connect('plugin', 'PluginQueueHandler');
$this->connect('omb', 'OmbQueueHandler'); $this->connect('omb', 'OmbQueueHandler');
$this->connect('ping', 'PingQueueHandler'); $this->connect('ping', 'PingQueueHandler');
if (common_config('sms', 'enabled')) { if (common_config('sms', 'enabled')) {
$this->connect('sms', 'SmsQueueHandler'); $this->connect('sms', 'SmsQueueHandler');
} }
}
// XMPP output handlers... // XMPP output handlers...
if (common_config('xmpp', 'enabled')) { if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) {
$this->connect('jabber', 'JabberQueueHandler'); $this->connect('jabber', 'JabberQueueHandler');
$this->connect('public', 'PublicQueueHandler'); $this->connect('public', 'PublicQueueHandler');
@ -165,11 +167,15 @@ abstract class QueueManager extends IoManager
$this->connect('confirm', 'XmppConfirmHandler'); $this->connect('confirm', 'XmppConfirmHandler');
} }
if (!defined('XMPP_ONLY_FLAG')) { // hack!
// For compat with old plugins not registering their own handlers. // For compat with old plugins not registering their own handlers.
$this->connect('plugin', 'PluginQueueHandler'); $this->connect('plugin', 'PluginQueueHandler');
} }
}
if (!defined('XMPP_ONLY_FLAG')) { // hack!
Event::handle('EndInitializeQueueManager', array($this)); Event::handle('EndInitializeQueueManager', array($this));
} }
}
/** /**
* Register a queue transport name and handler class for your plugin. * Register a queue transport name and handler class for your plugin.

View File

@ -66,10 +66,62 @@ class StompQueueManager extends QueueManager
* *
* @fixme possibly actually do subscription here to save another * @fixme possibly actually do subscription here to save another
* loop over all sites later? * loop over all sites later?
* @fixme possibly don't assume it's the current site
*/ */
public function addSite($server) public function addSite($server)
{ {
$this->sites[] = $server; $this->sites[] = $server;
$this->initialize();
}
/**
* Instantiate the appropriate QueueHandler class for the given queue.
*
* @param string $queue
* @return mixed QueueHandler or null
*/
function getHandler($queue)
{
$handlers = $this->handlers[common_config('site', 'server')];
if (isset($handlers[$queue])) {
$class = $handlers[$queue];
if (class_exists($class)) {
return new $class();
} else {
common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'");
}
} else {
common_log(LOG_ERR, "Requested handler for unkown queue '$queue'");
}
return null;
}
/**
* Get a list of all registered queue transport names.
*
* @return array of strings
*/
function getQueues()
{
$site = common_config('site', 'server');
if (empty($this->handlers[$site])) {
return array();
} else {
return array_keys($this->handlers[$site]);
}
}
/**
* Register a queue transport name and handler class for your plugin.
* Only registered transports will be reliably picked up!
*
* @param string $transport
* @param string $class
*/
public function connect($transport, $class)
{
$this->handlers[common_config('site', 'server')][$transport] = $class;
} }
/** /**

View File

@ -118,7 +118,11 @@ class XmppManager extends IoManager
*/ */
public function getSockets() public function getSockets()
{ {
if ($this->conn) {
return array($this->conn->getSocket()); return array($this->conn->getSocket());
} else {
return array();
}
} }
/** /**

View File

@ -165,22 +165,20 @@ class MemcachePlugin extends Plugin
$this->_conn = new Memcache(); $this->_conn = new Memcache();
if (is_array($this->servers)) { if (is_array($this->servers)) {
foreach ($this->servers as $server) { $servers = $this->servers;
list($host, $port) = @explode(';', $server); } else {
if (empty($port)) { $servers = array($this->servers);
}
foreach ($servers as $server) {
if (strpos($server, ':') !== false) {
list($host, $port) = explode(':', $server);
} else {
$host = $server;
$port = 11211; $port = 11211;
} }
$this->_conn->addServer($host, $port, $this->persistent); $this->_conn->addServer($host, $port, $this->persistent);
} }
} else {
$this->_conn->addServer($this->servers, $this->persistent);
list($host, $port) = explode(';', $this->servers);
if (empty($port)) {
$port = 11211;
}
$this->_conn->addServer($host, $port, $this->persistent);
}
// Compress items stored in the cache if they're over threshold in size // Compress items stored in the cache if they're over threshold in size
// (default 2KiB) and the compression would save more than min savings // (default 2KiB) and the compression would save more than min savings

View File

@ -21,7 +21,7 @@
define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); define('INSTALLDIR', realpath(dirname(__FILE__) . '/..'));
$shortoptions = 'fi:at:'; $shortoptions = 'fi:at:';
$longoptions = array('id=', 'foreground', 'all', 'threads='); $longoptions = array('id=', 'foreground', 'all', 'threads=', 'skip-xmpp', 'xmpp-only');
/** /**
* Attempts to get a count of the processors available on the current system * Attempts to get a count of the processors available on the current system
@ -260,6 +260,13 @@ if (!$threads) {
$daemonize = !(have_option('f') || have_option('--foreground')); $daemonize = !(have_option('f') || have_option('--foreground'));
$all = have_option('a') || have_option('--all'); $all = have_option('a') || have_option('--all');
if (have_option('--skip-xmpp')) {
define('XMPP_EMERGENCY_FLAG', true);
}
if (have_option('--xmpp-only')) {
define('XMPP_ONLY_FLAG', true);
}
$daemon = new QueueDaemon($id, $daemonize, $threads, $all); $daemon = new QueueDaemon($id, $daemonize, $threads, $all);
$daemon->runOnce(); $daemon->runOnce();