From d5f83d92521dbf838b1fc1fad2716efc3122c6b7 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 30 Aug 2008 20:32:10 -0400 Subject: [PATCH] switch around how XMLStream does processing darcs-hash:20080831003210-84dde-92ccffd5b2e1d50963b18babd93c70fb1d20cdba.gz --- extlib/XMPPHP/XMLStream.php | 77 ++++++++++++++++++++++------------ lib/queuehandler.php | 15 +++---- scripts/publicqueuehandler.php | 12 +----- scripts/xmppconfirmhandler.php | 22 ++-------- scripts/xmppqueuehandler.php | 11 +---- 5 files changed, 64 insertions(+), 73 deletions(-) diff --git a/extlib/XMPPHP/XMLStream.php b/extlib/XMPPHP/XMLStream.php index 1190167681..3f85ed0f86 100644 --- a/extlib/XMPPHP/XMLStream.php +++ b/extlib/XMPPHP/XMLStream.php @@ -310,24 +310,50 @@ class XMPPHP_XMLStream { return $this->disconnected; } - private function __process() { - $read = array($this->socket); - $write = null; - $except = null; - $updated = @stream_select($read, $write, $except, 1); - if ($updated > 0) { - $buff = @fread($this->socket, 1024); - if(!$buff) { - if($this->reconnect) { - $this->doReconnect(); - } else { - fclose($this->socket); - return false; - } + /** + * Core reading tool + * 0 -> only read if data is immediately ready + * NULL -> wait forever and ever + * integer -> process for this amount of time + */ + + private function __process($maximum=0) { + + $remaining = $maximum; + + do { + $starttime = microtime(); + $read = array($this->socket); + $write = array(); + $except = array(); + if (is_null($maximum)) { + $secs = NULL; + $usecs = NULL; + } else if ($maximum == 0) { + $secs = 0; + $usecs = 0; + } else { + $secs = $remaining / 1000000; + $usecs = $remaining % 1000000; } - $this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE); - xml_parse($this->parser, $buff, false); - } + $updated = @stream_select($read, $write, $except, $secs, $usecs); + if ($updated > 0) { + # XXX: Is this big enough? + $buff = @fread($this->socket, 4096); + if(!$buff) { + if($this->reconnect) { + $this->doReconnect(); + } else { + fclose($this->socket); + return false; + } + } + $this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE); + xml_parse($this->parser, $buff, false); + } + $remaining -= (microtime() - $starttime); + } while (is_null($maximum) || $remaining > 0); + return true; } /** @@ -336,10 +362,7 @@ class XMPPHP_XMLStream { * @return string */ public function process() { - $updated = ''; - while(!$this->disconnect) { - $this->__process(); - } + $this->__process(NULL); } /** @@ -348,11 +371,11 @@ class XMPPHP_XMLStream { * @param integer $timeout * @return string */ - public function processTime($timeout = -1) { - $start = time(); - $updated = ''; - while(!$this->disconnected and ($timeout == -1 or time() - $start < $timeout)) { - $this->__process(); + public function processTime($timeout=NULL) { + if (is_null($timeout)) { + return $this->__process(NULL); + } else { + return $this->__process($timeout * 1000000); } } @@ -372,7 +395,7 @@ class XMPPHP_XMLStream { reset($this->until); $updated = ''; while(!$this->disconnected and $this->until[$event_key] and (time() - $start < $timeout or $timeout == -1)) { - $this->__process(); + $this->__process(0); } if(array_key_exists($event_key, $this->until_payload)) { $payload = $this->until_payload[$event_key]; diff --git a/lib/queuehandler.php b/lib/queuehandler.php index ba7a93ab29..3115ea38d2 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -81,21 +81,18 @@ class QueueHandler { $this->log(LOG_WARNING, 'queue item for notice that does not exist'); } $qi->delete(); - $this->idle(); + $this->idle(0); } else { $this->clear_old_claims(); - $start = microtime(); - $this->idle(); - $used = microtime() - $start; - if ($used < 1000000) { - usleep(1000000 - $used); - } + $this->idle(5); } } while (true); } - function idle() { - return true; + function idle($timeout=0) { + if ($timeout>0) { + sleep($timeout); + } } function clear_old_claims() { diff --git a/scripts/publicqueuehandler.php b/scripts/publicqueuehandler.php index 9a7b6df5fa..555298f6a9 100755 --- a/scripts/publicqueuehandler.php +++ b/scripts/publicqueuehandler.php @@ -54,16 +54,8 @@ class PublicQueueHandler extends QueueHandler { return jabber_public_notice($notice); } - function idle() { - $this->log(LOG_DEBUG, 'Checking the incoming message queue.'); - # Process the queue for a second - if ($this->conn->readyToProcess()) { - $this->log(LOG_DEBUG, 'Something in the incoming message queue; processing it.'); - $this->conn->processTime(1); - $this->log(LOG_DEBUG, 'Done processing incoming message queue.'); - } else { - $this->log(LOG_DEBUG, 'Nothing in the incoming message queue; skipping it.'); - } + function idle($timeout=0) { + $this->conn->processTime($timeout); } function forward_message(&$pl) { diff --git a/scripts/xmppconfirmhandler.php b/scripts/xmppconfirmhandler.php index 08a397fc43..7971198b1e 100755 --- a/scripts/xmppconfirmhandler.php +++ b/scripts/xmppconfirmhandler.php @@ -86,16 +86,10 @@ class XmppConfirmHandler { continue; } } - $this->idle(); + $this->idle(0); } else { # $this->clear_old_confirm_claims(); - $start = microtime(); - $this->idle(); - $used = microtime() - $start; - if ($used < 10000000) { - usleep(10000000 - $used); - } - sleep(10); + $this->idle(10); } } while (true); } @@ -137,16 +131,8 @@ class XmppConfirmHandler { common_log($level, 'XmppConfirmHandler ('. $this->_id .'): '.$msg); } - function idle() { - $this->log(LOG_DEBUG, 'Checking the incoming message queue.'); - # Process the queue for a second - if ($this->conn->readyToProcess()) { - $this->log(LOG_DEBUG, 'Something in the incoming message queue; processing it.'); - $this->conn->processTime(1); - $this->log(LOG_DEBUG, 'Done processing incoming message queue.'); - } else { - $this->log(LOG_DEBUG, 'Nothing in the incoming message queue; skipping it.'); - } + function idle($timeout=0) { + $this->conn->processTime($timeout); } function forward_message(&$pl) { diff --git a/scripts/xmppqueuehandler.php b/scripts/xmppqueuehandler.php index c6f5c3f121..7f1e6c28f4 100755 --- a/scripts/xmppqueuehandler.php +++ b/scripts/xmppqueuehandler.php @@ -56,16 +56,9 @@ class XmppQueueHandler extends QueueHandler { return jabber_broadcast_notice($notice); } - function idle() { - $this->log(LOG_DEBUG, 'Checking the incoming message queue.'); + function idle($timeout=0) { # Process the queue for a second - if ($this->conn->readyToProcess()) { - $this->log(LOG_DEBUG, 'Something in the incoming message queue; processing it.'); - $this->conn->processTime(1); - $this->log(LOG_DEBUG, 'Done processing incoming message queue.'); - } else { - $this->log(LOG_DEBUG, 'Nothing in the incoming message queue; skipping it.'); - } + $this->conn->processTime($timeout); } function finish() {