Queues: redid the breakout control model so we can start up and subscribe to queues without running through the complete site list, which is ok at 1k sites but too slow at 10k.

All breakout queues that we're going to need to listen to now need to be explicitly listed in $config['queue']['breakout'].

Until XMPP is moved to component model, this setting will let the individual processes work with their own queues:
$config['queue']['breakout'][] = 'xmpp/xmppout/' . $config['site']['nickname'];
This commit is contained in:
Brion Vibber 2010-02-17 16:49:00 -08:00
parent 73ba26efe3
commit ce6be4f836
4 changed files with 76 additions and 78 deletions

View File

@ -91,10 +91,13 @@ $default =
'spawndelay' => 1, // Wait at least N seconds between (re)spawns of child processes to avoid slamming the queue server with subscription startup 'spawndelay' => 1, // Wait at least N seconds between (re)spawns of child processes to avoid slamming the queue server with subscription startup
'debug_memory' => false, // true to spit memory usage to log 'debug_memory' => false, // true to spit memory usage to log
'inboxes' => true, // true to do inbox distribution & output queueing from in background via 'distrib' queue 'inboxes' => true, // true to do inbox distribution & output queueing from in background via 'distrib' queue
'breakout' => array('*' => 'shared'), // set global or per-handler queue breakout 'breakout' => array(), // List queue specifiers to break out when using Stomp queue.
// 'shared': use a shared queue for all sites // Default will share all queues for all sites within each group.
// 'handler': share each/this handler over multiple sites // Specify as <group>/<queue> or <group>/<queue>/<site>,
// 'site': break out for each/this handler on this site // using nickname identifier as site.
//
// 'main/distrib' separate "distrib" queue covering all sites
// 'xmpp/xmppout/mysite' separate "xmppout" queue covering just 'mysite'
'max_retries' => 10, // drop messages after N failed attempts to process (Stomp) 'max_retries' => 10, // drop messages after N failed attempts to process (Stomp)
'dead_letter_dir' => false, // set to directory to save dropped messages into (Stomp) 'dead_letter_dir' => false, // set to directory to save dropped messages into (Stomp)
), ),

View File

@ -55,27 +55,18 @@ abstract class IoMaster
if ($multiSite !== null) { if ($multiSite !== null) {
$this->multiSite = $multiSite; $this->multiSite = $multiSite;
} }
if ($this->multiSite) {
$this->sites = StatusNet::findAllSites();
} else {
$this->sites = array(StatusNet::currentSite());
}
if (empty($this->sites)) { $this->initManagers();
throw new Exception("Empty status_network table, cannot init");
}
foreach ($this->sites as $site) {
StatusNet::switchSite($site);
$this->initManagers();
}
} }
/** /**
* Initialize IoManagers for the currently configured site * Initialize IoManagers which are appropriate to this instance;
* which are appropriate to this instance. * pass class names or instances into $this->instantiate().
* *
* Pass class names into $this->instantiate() * If setup and configuration may vary between sites in multi-site
* mode, it's the subclass's responsibility to set them up here.
*
* Switching site configurations is an acceptable side effect.
*/ */
abstract function initManagers(); abstract function initManagers();

View File

@ -63,7 +63,7 @@ class StompQueueManager extends QueueManager
$this->password = common_config('queue', 'stomp_password'); $this->password = common_config('queue', 'stomp_password');
$this->base = common_config('queue', 'queue_basename'); $this->base = common_config('queue', 'queue_basename');
$this->control = common_config('queue', 'control_channel'); $this->control = common_config('queue', 'control_channel');
$this->subscriptions = array($this->control => $this->control); $this->breakout = common_config('queue', 'breakout');
} }
/** /**
@ -75,28 +75,6 @@ class StompQueueManager extends QueueManager
return IoManager::INSTANCE_PER_PROCESS; return IoManager::INSTANCE_PER_PROCESS;
} }
/**
* Record queue subscriptions we'll need to handle the current site.
*/
public function addSite()
{
$this->sites[] = StatusNet::currentSite();
// Set up handlers active for this site...
$this->initialize();
foreach ($this->activeGroups as $group) {
if (isset($this->groups[$group])) {
// Actual queues may be broken out or consolidated...
// Subscribe to all the target queues we'll need.
foreach ($this->groups[$group] as $transport => $class) {
$target = $this->queueName($transport);
$this->subscriptions[$target] = $target;
}
}
}
}
/** /**
* Optional; ping any running queue handler daemons with a notification * Optional; ping any running queue handler daemons with a notification
* such as announcing a new site to handle or requesting clean shutdown. * such as announcing a new site to handle or requesting clean shutdown.
@ -166,14 +144,15 @@ class StompQueueManager extends QueueManager
$con = $this->cons[$idx]; $con = $this->cons[$idx];
$host = $con->getServer(); $host = $con->getServer();
$result = $con->send($this->queueName($queue), $msg, $props); $target = $this->queueName($queue);
$result = $con->send($target, $msg, $props);
if (!$result) { if (!$result) {
$this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host"); $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host $target");
return false; return false;
} }
$this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host"); $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host $target");
$this->stats('enqueued', $queue); $this->stats('enqueued', $queue);
return true; return true;
} }
@ -432,11 +411,42 @@ class StompQueueManager extends QueueManager
protected function doSubscribe(LiberalStomp $con) protected function doSubscribe(LiberalStomp $con)
{ {
$host = $con->getServer(); $host = $con->getServer();
foreach ($this->subscriptions as $queue) { foreach ($this->subscriptions() as $sub) {
$this->_log(LOG_INFO, "Subscribing to $queue on $host"); $this->_log(LOG_INFO, "Subscribing to $sub on $host");
$con->subscribe($queue); $con->subscribe($sub);
} }
} }
/**
* Grab a full list of stomp-side queue subscriptions.
* Will include:
* - control broadcast channel
* - shared group queues for active groups
* - per-handler and per-site breakouts from $config['queue']['breakout']
* that are rooted in the active groups.
*
* @return array of strings
*/
protected function subscriptions()
{
$subs = array();
$subs[] = $this->control;
foreach ($this->activeGroups as $group) {
$subs[] = $this->base . $group;
}
foreach ($this->breakout as $spec) {
$parts = explode('/', $spec);
if (count($parts) < 2 || count($parts) > 3) {
common_log(LOG_ERR, "Bad queue breakout specifier $spec");
}
if (in_array($parts[0], $this->activeGroups)) {
$subs[] = $this->base . $spec;
}
}
return array_unique($subs);
}
/** /**
* Handle and acknowledge an event that's come in through a queue. * Handle and acknowledge an event that's come in through a queue.
@ -612,32 +622,26 @@ class StompQueueManager extends QueueManager
} }
/** /**
* Set us up with queue subscriptions for a new site added at runtime, * (Re)load runtime configuration for a given site by nickname,
* triggered by a broadcast to the 'statusnet-control' topic. * triggered by a broadcast to the 'statusnet-control' topic.
* *
* Configuration changes in database should update, but config
* files might not.
*
* @param array $frame Stomp frame * @param array $frame Stomp frame
* @return bool true to continue; false to stop further processing. * @return bool true to continue; false to stop further processing.
*/ */
protected function updateSiteConfig($nickname) protected function updateSiteConfig($nickname)
{ {
if (empty($this->sites)) { $sn = Status_network::staticGet($nickname);
if ($nickname == common_config('site', 'nickname')) { if ($sn) {
StatusNet::init(common_config('site', 'server')); $this->switchSite($nickname);
} else { if (!in_array($nickname, $this->sites)) {
$this->_log(LOG_INFO, "Ignoring update ping for other site $nickname"); $this->addSite();
} }
$this->stats('siteupdate');
} else { } else {
$sn = Status_network::staticGet($nickname); $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
if ($sn) {
$this->switchSite($nickname);
if (!in_array($nickname, $this->sites)) {
$this->addSite();
}
// @fixme update subscriptions, if applicable
$this->stats('siteupdate');
} else {
$this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
}
} }
} }
@ -646,24 +650,25 @@ class StompQueueManager extends QueueManager
* group name for this queue to give eg: * group name for this queue to give eg:
* *
* /queue/statusnet/main * /queue/statusnet/main
* /queue/statusnet/main/distrib
* /queue/statusnet/xmpp/xmppout/site01
* *
* @param string $queue * @param string $queue
* @return string * @return string
*/ */
protected function queueName($queue) protected function queueName($queue)
{ {
$base = common_config('queue', 'queue_basename');
$group = $this->queueGroup($queue); $group = $this->queueGroup($queue);
$breakout = $this->breakoutMode($queue); $site = StatusNet::currentSite();
if ($breakout == 'shared') {
return $base . "$group"; $specs = array("$group/$queue/$site",
} else if ($breakout == 'handler') { "$group/$queue");
return $base . "$group/$queue"; foreach ($specs as $spec) {
} else if ($breakout == 'site') { if (in_array($spec, $this->breakout)) {
$site = StatusNet::currentSite(); return $this->base . $spec;
return $base . "$group/$queue/$site"; }
} }
throw Exception("Unrecognized queue breakout mode '$breakout' for '$queue'"); return $this->base . $group;
} }
/** /**

View File

@ -126,8 +126,7 @@ class QueueDaemon extends SpawningDaemon
class QueueMaster extends IoMaster class QueueMaster extends IoMaster
{ {
/** /**
* Initialize IoManagers for the currently configured site * Initialize IoManagers which are appropriate to this instance.
* which are appropriate to this instance.
*/ */
function initManagers() function initManagers()
{ {