forked from GNUsocial/gnu-social
		
	Merge branch 'master' of git@gitorious.org:statusnet/mainline
This commit is contained in:
		| @@ -91,10 +91,13 @@ $default = | ||||
|               'spawndelay' => 1, // Wait at least N seconds between (re)spawns of child processes to avoid slamming the queue server with subscription startup | ||||
|               'debug_memory' => false, // true to spit memory usage to log | ||||
|               'inboxes' => true, // true to do inbox distribution & output queueing from in background via 'distrib' queue | ||||
|               'breakout' => array('*' => 'shared'), // set global or per-handler queue breakout | ||||
|                                       // 'shared': use a shared queue for all sites | ||||
|                                       // 'handler': share each/this handler over multiple sites | ||||
|                                       // 'site': break out for each/this handler on this site | ||||
|               'breakout' => array(), // List queue specifiers to break out when using Stomp queue. | ||||
|                                      // Default will share all queues for all sites within each group. | ||||
|                                      // Specify as <group>/<queue> or <group>/<queue>/<site>, | ||||
|                                      // using nickname identifier as site. | ||||
|                                      // | ||||
|                                      // 'main/distrib' separate "distrib" queue covering all sites | ||||
|                                      // 'xmpp/xmppout/mysite' separate "xmppout" queue covering just 'mysite' | ||||
|               'max_retries' => 10, // drop messages after N failed attempts to process (Stomp) | ||||
|               'dead_letter_dir' => false, // set to directory to save dropped messages into (Stomp) | ||||
|               ), | ||||
|   | ||||
| @@ -55,27 +55,18 @@ abstract class IoMaster | ||||
|         if ($multiSite !== null) { | ||||
|             $this->multiSite = $multiSite; | ||||
|         } | ||||
|         if ($this->multiSite) { | ||||
|             $this->sites = StatusNet::findAllSites(); | ||||
|         } else { | ||||
|             $this->sites = array(StatusNet::currentSite()); | ||||
|         } | ||||
|  | ||||
|         if (empty($this->sites)) { | ||||
|             throw new Exception("Empty status_network table, cannot init"); | ||||
|         } | ||||
|  | ||||
|         foreach ($this->sites as $site) { | ||||
|             StatusNet::switchSite($site); | ||||
|             $this->initManagers(); | ||||
|         } | ||||
|         $this->initManagers(); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Initialize IoManagers for the currently configured site | ||||
|      * which are appropriate to this instance. | ||||
|      * Initialize IoManagers which are appropriate to this instance; | ||||
|      * pass class names or instances into $this->instantiate(). | ||||
|      * | ||||
|      * Pass class names into $this->instantiate() | ||||
|      * If setup and configuration may vary between sites in multi-site | ||||
|      * mode, it's the subclass's responsibility to set them up here. | ||||
|      * | ||||
|      * Switching site configurations is an acceptable side effect. | ||||
|      */ | ||||
|     abstract function initManagers(); | ||||
|  | ||||
|   | ||||
| @@ -63,7 +63,7 @@ class StompQueueManager extends QueueManager | ||||
|         $this->password = common_config('queue', 'stomp_password'); | ||||
|         $this->base     = common_config('queue', 'queue_basename'); | ||||
|         $this->control  = common_config('queue', 'control_channel'); | ||||
|         $this->subscriptions = array($this->control => $this->control); | ||||
|         $this->breakout = common_config('queue', 'breakout'); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
| @@ -75,28 +75,6 @@ class StompQueueManager extends QueueManager | ||||
|         return IoManager::INSTANCE_PER_PROCESS; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Record queue subscriptions we'll need to handle the current site. | ||||
|      */ | ||||
|     public function addSite() | ||||
|     { | ||||
|         $this->sites[] = StatusNet::currentSite(); | ||||
|  | ||||
|         // Set up handlers active for this site... | ||||
|         $this->initialize(); | ||||
|  | ||||
|         foreach ($this->activeGroups as $group) { | ||||
|             if (isset($this->groups[$group])) { | ||||
|                 // Actual queues may be broken out or consolidated... | ||||
|                 // Subscribe to all the target queues we'll need. | ||||
|                 foreach ($this->groups[$group] as $transport => $class) { | ||||
|                     $target = $this->queueName($transport); | ||||
|                     $this->subscriptions[$target] = $target; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Optional; ping any running queue handler daemons with a notification | ||||
|      * such as announcing a new site to handle or requesting clean shutdown. | ||||
| @@ -166,14 +144,15 @@ class StompQueueManager extends QueueManager | ||||
|  | ||||
|         $con = $this->cons[$idx]; | ||||
|         $host = $con->getServer(); | ||||
|         $result = $con->send($this->queueName($queue), $msg, $props); | ||||
|         $target = $this->queueName($queue); | ||||
|         $result = $con->send($target, $msg, $props); | ||||
|  | ||||
|         if (!$result) { | ||||
|             $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host"); | ||||
|             $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host $target"); | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host"); | ||||
|         $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host $target"); | ||||
|         $this->stats('enqueued', $queue); | ||||
|         return true; | ||||
|     } | ||||
| @@ -432,11 +411,42 @@ class StompQueueManager extends QueueManager | ||||
|     protected function doSubscribe(LiberalStomp $con) | ||||
|     { | ||||
|         $host = $con->getServer(); | ||||
|         foreach ($this->subscriptions as $queue) { | ||||
|             $this->_log(LOG_INFO, "Subscribing to $queue on $host"); | ||||
|             $con->subscribe($queue); | ||||
|         foreach ($this->subscriptions() as $sub) { | ||||
|             $this->_log(LOG_INFO, "Subscribing to $sub on $host"); | ||||
|             $con->subscribe($sub); | ||||
|         } | ||||
|     } | ||||
|      | ||||
|     /** | ||||
|      * Grab a full list of stomp-side queue subscriptions. | ||||
|      * Will include: | ||||
|      *  - control broadcast channel | ||||
|      *  - shared group queues for active groups | ||||
|      *  - per-handler and per-site breakouts from $config['queue']['breakout'] | ||||
|      *    that are rooted in the active groups. | ||||
|      * | ||||
|      * @return array of strings | ||||
|      */ | ||||
|     protected function subscriptions() | ||||
|     { | ||||
|         $subs = array(); | ||||
|         $subs[] = $this->control; | ||||
|  | ||||
|         foreach ($this->activeGroups as $group) { | ||||
|             $subs[] = $this->base . $group; | ||||
|         } | ||||
|  | ||||
|         foreach ($this->breakout as $spec) { | ||||
|             $parts = explode('/', $spec); | ||||
|             if (count($parts) < 2 || count($parts) > 3) { | ||||
|                 common_log(LOG_ERR, "Bad queue breakout specifier $spec"); | ||||
|             } | ||||
|             if (in_array($parts[0], $this->activeGroups)) { | ||||
|                 $subs[] = $this->base . $spec; | ||||
|             } | ||||
|         } | ||||
|         return array_unique($subs); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Handle and acknowledge an event that's come in through a queue. | ||||
| @@ -612,32 +622,26 @@ class StompQueueManager extends QueueManager | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * Set us up with queue subscriptions for a new site added at runtime, | ||||
|      * (Re)load runtime configuration for a given site by nickname, | ||||
|      * triggered by a broadcast to the 'statusnet-control' topic. | ||||
|      * | ||||
|      * Configuration changes in database should update, but config | ||||
|      * files might not. | ||||
|      * | ||||
|      * @param array $frame Stomp frame | ||||
|      * @return bool true to continue; false to stop further processing. | ||||
|      */ | ||||
|     protected function updateSiteConfig($nickname) | ||||
|     { | ||||
|         if (empty($this->sites)) { | ||||
|             if ($nickname == common_config('site', 'nickname')) { | ||||
|                 StatusNet::init(common_config('site', 'server')); | ||||
|             } else { | ||||
|                 $this->_log(LOG_INFO, "Ignoring update ping for other site $nickname"); | ||||
|         $sn = Status_network::staticGet($nickname); | ||||
|         if ($sn) { | ||||
|             $this->switchSite($nickname); | ||||
|             if (!in_array($nickname, $this->sites)) { | ||||
|                 $this->addSite(); | ||||
|             } | ||||
|             $this->stats('siteupdate'); | ||||
|         } else { | ||||
|             $sn = Status_network::staticGet($nickname); | ||||
|             if ($sn) { | ||||
|                 $this->switchSite($nickname); | ||||
|                 if (!in_array($nickname, $this->sites)) { | ||||
|                     $this->addSite(); | ||||
|                 } | ||||
|                 // @fixme update subscriptions, if applicable | ||||
|                 $this->stats('siteupdate'); | ||||
|             } else { | ||||
|                 $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname"); | ||||
|             } | ||||
|             $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname"); | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -646,24 +650,25 @@ class StompQueueManager extends QueueManager | ||||
|      * group name for this queue to give eg: | ||||
|      * | ||||
|      * /queue/statusnet/main | ||||
|      * /queue/statusnet/main/distrib | ||||
|      * /queue/statusnet/xmpp/xmppout/site01 | ||||
|      * | ||||
|      * @param string $queue | ||||
|      * @return string | ||||
|      */ | ||||
|     protected function queueName($queue) | ||||
|     { | ||||
|         $base = common_config('queue', 'queue_basename'); | ||||
|         $group = $this->queueGroup($queue); | ||||
|         $breakout = $this->breakoutMode($queue); | ||||
|         if ($breakout == 'shared') { | ||||
|             return $base . "$group"; | ||||
|         } else if ($breakout == 'handler') { | ||||
|             return $base . "$group/$queue"; | ||||
|         } else if ($breakout == 'site') { | ||||
|             $site = StatusNet::currentSite(); | ||||
|             return $base . "$group/$queue/$site"; | ||||
|         $site = StatusNet::currentSite(); | ||||
|  | ||||
|         $specs = array("$group/$queue/$site", | ||||
|                        "$group/$queue"); | ||||
|         foreach ($specs as $spec) { | ||||
|             if (in_array($spec, $this->breakout)) { | ||||
|                 return $this->base . $spec; | ||||
|             } | ||||
|         } | ||||
|         throw Exception("Unrecognized queue breakout mode '$breakout' for '$queue'"); | ||||
|         return $this->base . $group; | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|   | ||||
| @@ -126,8 +126,7 @@ class QueueDaemon extends SpawningDaemon | ||||
| class QueueMaster extends IoMaster | ||||
| { | ||||
|     /** | ||||
|      * Initialize IoManagers for the currently configured site | ||||
|      * which are appropriate to this instance. | ||||
|      * Initialize IoManagers which are appropriate to this instance. | ||||
|      */ | ||||
|     function initManagers() | ||||
|     { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user