Control channel for queue daemons to request graceful shutdown, restart, or update to listen to a newly added or reconfigured site.

queuectl.php --update -s<site>
  queuectl.php --stop
  queuectl.php --restart

Default control channel is /topic/statusnet-control. For external utilities to send a site update ping direct to the queue server, connect via Stomp and send a message formatted thus:

  update:<nickname>

(Nickname here, *not* server hostname! The rest of the queues will be updated to use nicknames later.)

Note that all currently-connected queue daemons will get these notifications, including both queuedaemon.php and xmppdaemon.php. (XMPP will ignore site update requests for sites that it's not handling.)

Limitations:
* only implemented for stomp queue manager so far
* --update may not yet handle a changed server name properly
* --restart won't reload PHP code files that were already loaded at startup. Still need to stop and restart the daemons from 'outside' when updating code base.
This commit is contained in:
Brion Vibber
2010-01-26 11:49:49 -08:00
parent ad6f0501ff
commit 58be61b641
9 changed files with 347 additions and 41 deletions

View File

@@ -36,6 +36,11 @@ abstract class SpawningDaemon extends Daemon
{
protected $threads=1;
const EXIT_OK = 0;
const EXIT_ERR = 1;
const EXIT_SHUTDOWN = 100;
const EXIT_RESTART = 101;
function __construct($id=null, $daemonize=true, $threads=1)
{
parent::__construct($daemonize);
@@ -49,7 +54,7 @@ abstract class SpawningDaemon extends Daemon
/**
* Perform some actual work!
*
* @return boolean true on success, false on failure
* @return int exit code; use self::EXIT_SHUTDOWN to request not to respawn.
*/
public abstract function runThread();
@@ -84,23 +89,30 @@ abstract class SpawningDaemon extends Daemon
while (count($children) > 0) {
$status = null;
$pid = pcntl_wait($status);
if ($pid > 0) {
if ($pid > 0 && pcntl_wifexited($status)) {
$exitCode = pcntl_wexitstatus($status);
$i = array_search($pid, $children);
if ($i === false) {
$this->log(LOG_ERR, "Unrecognized child pid $pid exited!");
$this->log(LOG_ERR, "Unrecognized child pid $pid exited with status $exitCode");
continue;
}
unset($children[$i]);
$this->log(LOG_INFO, "Thread $i pid $pid exited.");
$pid = pcntl_fork();
if ($pid < 0) {
$this->log(LOG_ERROR, "Couldn't fork to respawn thread $i; aborting thread.\n");
} else if ($pid == 0) {
$this->initAndRunChild($i);
if ($this->shouldRespawn($exitCode)) {
$this->log(LOG_INFO, "Thread $i pid $pid exited with status $exitCode; respawing.");
$pid = pcntl_fork();
if ($pid < 0) {
$this->log(LOG_ERROR, "Couldn't fork to respawn thread $i; aborting thread.\n");
} else if ($pid == 0) {
$this->initAndRunChild($i);
} else {
$this->log(LOG_INFO, "Respawned thread $i as pid $pid");
$children[$i] = $pid;
}
} else {
$this->log(LOG_INFO, "Respawned thread $i as pid $pid");
$children[$i] = $pid;
$this->log(LOG_INFO, "Thread $i pid $pid exited with status $exitCode; closing out thread.");
}
}
}
@@ -108,6 +120,24 @@ abstract class SpawningDaemon extends Daemon
return true;
}
/**
* Determine whether to respawn an exited subprocess based on its exit code.
* Otherwise we'll respawn all exits by default.
*
* @param int $exitCode
* @return boolean true to respawn
*/
protected function shouldRespawn($exitCode)
{
if ($exitCode == self::EXIT_SHUTDOWN) {
// Thread requested a clean shutdown.
return false;
} else {
// Otherwise we should always respawn!
return true;
}
}
/**
* Initialize things for a fresh thread, call runThread(), and
* exit at completion with appropriate return value.
@@ -116,8 +146,8 @@ abstract class SpawningDaemon extends Daemon
{
$this->set_id($this->get_id() . "." . $thread);
$this->resetDb();
$ok = $this->runThread();
exit($ok ? 0 : 1);
$exitCode = $this->runThread();
exit($exitCode);
}
/**