switch around how XMLStream does processing

darcs-hash:20080831003210-84dde-92ccffd5b2e1d50963b18babd93c70fb1d20cdba.gz
This commit is contained in:
Evan Prodromou 2008-08-30 20:32:10 -04:00
parent ea5b129163
commit d5f83d9252
5 changed files with 64 additions and 73 deletions

View File

@ -310,24 +310,50 @@ class XMPPHP_XMLStream {
return $this->disconnected; return $this->disconnected;
} }
private function __process() { /**
$read = array($this->socket); * Core reading tool
$write = null; * 0 -> only read if data is immediately ready
$except = null; * NULL -> wait forever and ever
$updated = @stream_select($read, $write, $except, 1); * integer -> process for this amount of time
if ($updated > 0) { */
$buff = @fread($this->socket, 1024);
if(!$buff) { private function __process($maximum=0) {
if($this->reconnect) {
$this->doReconnect(); $remaining = $maximum;
} else {
fclose($this->socket); do {
return false; $starttime = microtime();
} $read = array($this->socket);
$write = array();
$except = array();
if (is_null($maximum)) {
$secs = NULL;
$usecs = NULL;
} else if ($maximum == 0) {
$secs = 0;
$usecs = 0;
} else {
$secs = $remaining / 1000000;
$usecs = $remaining % 1000000;
} }
$this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE); $updated = @stream_select($read, $write, $except, $secs, $usecs);
xml_parse($this->parser, $buff, false); if ($updated > 0) {
} # XXX: Is this big enough?
$buff = @fread($this->socket, 4096);
if(!$buff) {
if($this->reconnect) {
$this->doReconnect();
} else {
fclose($this->socket);
return false;
}
}
$this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE);
xml_parse($this->parser, $buff, false);
}
$remaining -= (microtime() - $starttime);
} while (is_null($maximum) || $remaining > 0);
return true;
} }
/** /**
@ -336,10 +362,7 @@ class XMPPHP_XMLStream {
* @return string * @return string
*/ */
public function process() { public function process() {
$updated = ''; $this->__process(NULL);
while(!$this->disconnect) {
$this->__process();
}
} }
/** /**
@ -348,11 +371,11 @@ class XMPPHP_XMLStream {
* @param integer $timeout * @param integer $timeout
* @return string * @return string
*/ */
public function processTime($timeout = -1) { public function processTime($timeout=NULL) {
$start = time(); if (is_null($timeout)) {
$updated = ''; return $this->__process(NULL);
while(!$this->disconnected and ($timeout == -1 or time() - $start < $timeout)) { } else {
$this->__process(); return $this->__process($timeout * 1000000);
} }
} }
@ -372,7 +395,7 @@ class XMPPHP_XMLStream {
reset($this->until); reset($this->until);
$updated = ''; $updated = '';
while(!$this->disconnected and $this->until[$event_key] and (time() - $start < $timeout or $timeout == -1)) { while(!$this->disconnected and $this->until[$event_key] and (time() - $start < $timeout or $timeout == -1)) {
$this->__process(); $this->__process(0);
} }
if(array_key_exists($event_key, $this->until_payload)) { if(array_key_exists($event_key, $this->until_payload)) {
$payload = $this->until_payload[$event_key]; $payload = $this->until_payload[$event_key];

View File

@ -81,21 +81,18 @@ class QueueHandler {
$this->log(LOG_WARNING, 'queue item for notice that does not exist'); $this->log(LOG_WARNING, 'queue item for notice that does not exist');
} }
$qi->delete(); $qi->delete();
$this->idle(); $this->idle(0);
} else { } else {
$this->clear_old_claims(); $this->clear_old_claims();
$start = microtime(); $this->idle(5);
$this->idle();
$used = microtime() - $start;
if ($used < 1000000) {
usleep(1000000 - $used);
}
} }
} while (true); } while (true);
} }
function idle() { function idle($timeout=0) {
return true; if ($timeout>0) {
sleep($timeout);
}
} }
function clear_old_claims() { function clear_old_claims() {

View File

@ -54,16 +54,8 @@ class PublicQueueHandler extends QueueHandler {
return jabber_public_notice($notice); return jabber_public_notice($notice);
} }
function idle() { function idle($timeout=0) {
$this->log(LOG_DEBUG, 'Checking the incoming message queue.'); $this->conn->processTime($timeout);
# Process the queue for a second
if ($this->conn->readyToProcess()) {
$this->log(LOG_DEBUG, 'Something in the incoming message queue; processing it.');
$this->conn->processTime(1);
$this->log(LOG_DEBUG, 'Done processing incoming message queue.');
} else {
$this->log(LOG_DEBUG, 'Nothing in the incoming message queue; skipping it.');
}
} }
function forward_message(&$pl) { function forward_message(&$pl) {

View File

@ -86,16 +86,10 @@ class XmppConfirmHandler {
continue; continue;
} }
} }
$this->idle(); $this->idle(0);
} else { } else {
# $this->clear_old_confirm_claims(); # $this->clear_old_confirm_claims();
$start = microtime(); $this->idle(10);
$this->idle();
$used = microtime() - $start;
if ($used < 10000000) {
usleep(10000000 - $used);
}
sleep(10);
} }
} while (true); } while (true);
} }
@ -137,16 +131,8 @@ class XmppConfirmHandler {
common_log($level, 'XmppConfirmHandler ('. $this->_id .'): '.$msg); common_log($level, 'XmppConfirmHandler ('. $this->_id .'): '.$msg);
} }
function idle() { function idle($timeout=0) {
$this->log(LOG_DEBUG, 'Checking the incoming message queue.'); $this->conn->processTime($timeout);
# Process the queue for a second
if ($this->conn->readyToProcess()) {
$this->log(LOG_DEBUG, 'Something in the incoming message queue; processing it.');
$this->conn->processTime(1);
$this->log(LOG_DEBUG, 'Done processing incoming message queue.');
} else {
$this->log(LOG_DEBUG, 'Nothing in the incoming message queue; skipping it.');
}
} }
function forward_message(&$pl) { function forward_message(&$pl) {

View File

@ -56,16 +56,9 @@ class XmppQueueHandler extends QueueHandler {
return jabber_broadcast_notice($notice); return jabber_broadcast_notice($notice);
} }
function idle() { function idle($timeout=0) {
$this->log(LOG_DEBUG, 'Checking the incoming message queue.');
# Process the queue for a second # Process the queue for a second
if ($this->conn->readyToProcess()) { $this->conn->processTime($timeout);
$this->log(LOG_DEBUG, 'Something in the incoming message queue; processing it.');
$this->conn->processTime(1);
$this->log(LOG_DEBUG, 'Done processing incoming message queue.');
} else {
$this->log(LOG_DEBUG, 'Nothing in the incoming message queue; skipping it.');
}
} }
function finish() { function finish() {