use select() to bring down xmpp latency

This commit is contained in:
Evan Prodromou 2009-07-09 12:09:20 -04:00
parent 1daad01f36
commit 03200235b1
4 changed files with 73 additions and 24 deletions

View File

@ -77,6 +77,14 @@ function jabber_daemon_address()
return common_config('xmpp', 'user') . '@' . common_config('xmpp', 'server');
}
class Sharing_XMPP extends XMPPHP_XMPP
{
function getSocket()
{
return $this->socket;
}
}
/**
* connect the configured Jabber account to the configured server
*
@ -89,7 +97,7 @@ function jabber_connect($resource=null)
{
static $conn = null;
if (!$conn) {
$conn = new XMPPHP_XMPP(common_config('xmpp', 'host') ?
$conn = new Sharing_XMPP(common_config('xmpp', 'host') ?
common_config('xmpp', 'host') :
common_config('xmpp', 'server'),
common_config('xmpp', 'port'),

View File

@ -115,5 +115,10 @@ class QueueHandler extends Daemon
{
common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
}
function getSockets()
{
return array();
}
}

View File

@ -30,6 +30,14 @@
require_once 'Stomp.php';
class LiberalStomp extends Stomp
{
function getSocket()
{
return $this->_socket;
}
}
class StompQueueManager
{
var $server = null;
@ -50,7 +58,7 @@ class StompQueueManager
{
if (empty($this->con)) {
$this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'...");
$this->con = new Stomp($this->server);
$this->con = new LiberalStomp($this->server);
if ($this->con->connect($this->username, $this->password)) {
$this->_log(LOG_INFO, "Connected.");
@ -94,34 +102,57 @@ class StompQueueManager
while (true) {
$frame = $this->con->readFrame();
// Wait for something on one of our sockets
if (!empty($frame)) {
$notice = Notice::staticGet('id', $frame->body);
$stompsock = $this->con->getSocket();
if (empty($notice)) {
$this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue);
$this->con->ack($frame);
} else {
if ($handler->handle_notice($notice)) {
$this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
$handsocks = $handler->getSockets();
$this->_log(LOG_DEBUG, "Got ".count($handsocks)." sockets from handler.");
$this->_log(LOG_DEBUG, print_r($handsocks, true));
$socks = array_merge(array($stompsock), $handsocks);
$read = $socks;
$write = array();
$except = array();
$this->_log(LOG_DEBUG, "Starting select");
$ready = stream_select($read, $write, $except, $handler->timeout(), 0);
$this->_log(LOG_DEBUG, "Finished select with value '$ready'");
if (!$ready || $read[0] !== $stompsock) {
$handler->idle(QUEUE_HANDLER_MISS_IDLE);
} else {
$frame = $this->con->readFrame();
if (!empty($frame)) {
$notice = Notice::staticGet('id', $frame->body);
if (empty($notice)) {
$this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue);
$this->con->ack($frame);
} else {
$this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
// FIXME we probably shouldn't have to do
// this kind of queue management ourselves
$this->con->ack($frame);
$this->enqueue($notice, $queue);
if ($handler->handle_notice($notice)) {
$this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
$this->con->ack($frame);
} else {
$this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
// FIXME we probably shouldn't have to do
// this kind of queue management ourselves
$this->con->ack($frame);
$this->enqueue($notice, $queue);
}
unset($notice);
}
unset($notice);
unset($frame);
$handler->idle(QUEUE_HANDLER_HIT_IDLE);
} else {
$handler->idle(QUEUE_HANDLER_MISS_IDLE);
}
unset($frame);
$handler->idle(QUEUE_HANDLER_HIT_IDLE);
} else {
$handler->idle(QUEUE_HANDLER_MISS_IDLE);
}
}

View File

@ -126,4 +126,9 @@ class XmppQueueHandler extends QueueHandler
return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon';
}
}
function getSockets()
{
return array($this->conn->getSocket());
}
}