From 6cd2e281c7632040758f5e2d813dd271223c546c Mon Sep 17 00:00:00 2001 From: fritzy Date: Wed, 26 Nov 2008 10:49:47 +0000 Subject: [PATCH] * applied Evan's XMLStream select() patches * made roster presence tracking optional git-svn-id: svn://netflint.net/xmpphp@69 ef36c318-a008-4979-b6e8-6b496270793b --- XMPPHP/XMLStream.php | 215 +++++++++++++++++++++++++++++-------------- XMPPHP/XMPP.php | 10 +- 2 files changed, 155 insertions(+), 70 deletions(-) diff --git a/XMPPHP/XMLStream.php b/XMPPHP/XMLStream.php index fb167d2..7b54d63 100644 --- a/XMPPHP/XMLStream.php +++ b/XMPPHP/XMLStream.php @@ -163,6 +163,10 @@ class XMPPHP_XMLStream { * @var boolean */ protected $use_ssl = false; + /** + * @var integer + */ + protected $reconnectTimeout = 30; /** * Constructor @@ -234,7 +238,7 @@ class XMPPHP_XMLStream { /** * Add Handler * - * @param string $name + * @param string $name * @param string $ns * @param string $pointer * @param string $obj @@ -289,29 +293,39 @@ class XMPPHP_XMLStream { * @param boolean $sendinit */ public function connect($timeout = 30, $persistent = false, $sendinit = true) { - $this->disconnected = false; $this->sent_disconnect = false; - if($persistent) { - $conflag = STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT; + $starttime = time(); + + do { + $this->disconnected = false; + $this->sent_disconnect = false; + if($persistent) { + $conflag = STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT; + } else { + $conflag = STREAM_CLIENT_CONNECT; + } + $conntype = 'tcp'; + if($this->use_ssl) $conntype = 'ssl'; + $this->log->log("Connecting to $conntype://{$this->host}:{$this->port}"); + try { + $this->socket = @stream_socket_client("$conntype://{$this->host}:{$this->port}", $errno, $errstr, $timeout, $conflag); + } catch (Exception $e) { + throw new XMPPHP_Exception($e->getMessage()); + } + if(!$this->socket) { + $this->log->log("Could not connect.", XMPPHP_Log::LEVEL_ERROR); + $this->disconnected = true; + # Take it easy for a few seconds + sleep(min($timeout, 5)); + } + } while (!$this->socket && (time() - $starttime) < $timeout); + + if ($this->socket) { + stream_set_blocking($this->socket, 1); + if($sendinit) $this->send($this->stream_start); } else { - $conflag = STREAM_CLIENT_CONNECT; + throw new XMPPHP_Exception("Could not connect before timeout."); } - $conntype = 'tcp'; - if($this->use_ssl) $conntype = 'ssl'; - $this->log->log("Connecting to $conntype://{$this->host}:{$this->port}"); - try { - $this->socket = @stream_socket_client("$conntype://{$this->host}:{$this->port}", $errno, $errstr, $timeout, $conflag); - } catch (Exception $e) { - throw new XMPPHP_Exception($e->getMessage()); - } - if(!$this->socket) { - $this->log->log("Could not connect.", XMPPHP_Log::LEVEL_ERROR); - $this->disconnected = true; - - throw new XMPPHP_Exception('Could not connect.'); - } - stream_set_blocking($this->socket, 1); - if($sendinit) $this->send($this->stream_start); } /** @@ -319,12 +333,17 @@ class XMPPHP_XMLStream { */ public function doReconnect() { if(!$this->is_server) { - $this->log->log("Reconnecting...", XMPPHP_Log::LEVEL_WARNING); - $this->connect(30, false, false); + $this->log->log("Reconnecting ($this->reconnectTimeout)...", XMPPHP_Log::LEVEL_WARNING); + $this->connect($this->reconnectTimeout, false, false); $this->reset(); + $this->event('reconnect'); } } + public function setReconnectTimeout($timeout) { + $this->reconnectTimeout = $timeout; + } + /** * Disconnect from XMPP Host */ @@ -349,24 +368,64 @@ class XMPPHP_XMLStream { return $this->disconnected; } - public function __process() { - $read = array($this->socket); - $write = null; - $except = null; - $updated = @stream_select($read, $write, $except, 1); - if ($updated > 0) { - $buff = @fread($this->socket, 1024); - if(!$buff) { - if($this->reconnect) { + /** + * Core reading tool + * 0 -> only read if data is immediately ready + * NULL -> wait forever and ever + * integer -> process for this amount of time + */ + + private function __process($maximum=0) { + + $remaining = $maximum; + + do { + $starttime = (microtime(true) * 1000000); + $read = array($this->socket); + $write = array(); + $except = array(); + if (is_null($maximum)) { + $secs = NULL; + $usecs = NULL; + } else if ($maximum == 0) { + $secs = 0; + $usecs = 0; + } else { + $usecs = $remaining % 1000000; + $secs = floor(($remaining - $usecs) / 1000000); + } + $updated = @stream_select($read, $write, $except, $secs, $usecs); + if ($updated === false) { + $this->log->log("Error on stream_select()", XMPPHP_Log::LEVEL_VERBOSE); + if ($this->reconnect) { $this->doReconnect(); } else { fclose($this->socket); + $this->socket = NULL; return false; } + } else if ($updated > 0) { + # XXX: Is this big enough? + $buff = @fread($this->socket, 4096); + if(!$buff) { + if($this->reconnect) { + $this->doReconnect(); + } else { + fclose($this->socket); + $this->socket = NULL; + return false; + } + } + $this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE); + xml_parse($this->parser, $buff, false); + } else { + # $updated == 0 means no changes during timeout. } - $this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE); - xml_parse($this->parser, $buff, false); - } + $endtime = (microtime(true)*1000000); + $time_past = $endtime - $starttime; + $remaining = $remaining - $time_past; + } while (is_null($maximum) || $remaining > 0); + return true; } /** @@ -375,10 +434,7 @@ class XMPPHP_XMLStream { * @return string */ public function process() { - $updated = ''; - while(!$this->disconnect) { - $this->__process(); - } + $this->__process(NULL); } /** @@ -387,11 +443,11 @@ class XMPPHP_XMLStream { * @param integer $timeout * @return string */ - public function processTime($timeout = -1) { - $start = time(); - $updated = ''; - while(!$this->disconnected and ($timeout == -1 or time() - $start < $timeout)) { - $this->__process(); + public function processTime($timeout=NULL) { + if (is_null($timeout)) { + return $this->__process(NULL); + } else { + return $this->__process($timeout * 1000000); } } @@ -620,34 +676,47 @@ class XMPPHP_XMLStream { * * @param string $msg */ - public function send($msg, $rec=false) { - if($this->time() - $this->last_send < .1) { - usleep(100000); + public function send($msg, $timeout=NULL) { + + if (is_null($timeout)) { + $secs = NULL; + $usecs = NULL; + } else if ($timeout == 0) { + $secs = 0; + $usecs = 0; + } else { + $maximum = $timeout * 1000000; + $usecs = $maximum % 1000000; + $secs = floor(($maximum - $usecs) / 1000000); } - $wait = true; - while($wait) { - $read = null; - $write = array($this->socket); - $except = null; - $select = @stream_select($read, $write, $except, 0, 0); - if($select === False) { - $this->doReconnect(); - return false; - } elseif ($select > 0) { - $wait = false; - } else { - usleep(100000); - //$this->processTime(.25); - } - } - $sentbytes = @fwrite($this->socket, $msg, 1024); - $this->last_send = $this->time(); - $this->log->log("SENT: " . mb_substr($msg, 0, $sentbytes, '8bit'), XMPPHP_Log::LEVEL_VERBOSE); - if($sentbytes === FALSE) { + + $read = array(); + $write = array($this->socket); + $except = array(); + + $select = @stream_select($read, $write, $except, $secs, $usecs); + + if($select === False) { + $this->log->log("ERROR sending message; reconnecting."); $this->doReconnect(); - } elseif ($sentbytes != mb_strlen($msg, '8bit')) { - $this->send(mb_substr($msg, $sentbytes, mb_strlen($msg, '8bit'), '8bit'), true); + # TODO: retry send here + return false; + } elseif ($select > 0) { + $this->log->log("Socket is ready; send it.", XMPPHP_Log::LEVEL_VERBOSE); + } else { + $this->log->log("Socket is not ready; break.", XMPPHP_Log::LEVEL_ERROR); + return false; } + + $sentbytes = @fwrite($this->socket, $msg); + $this->log->log("SENT: " . mb_substr($msg, 0, $sentbytes, '8bit'), XMPPHP_Log::LEVEL_VERBOSE); + if($sentbytes === FALSE) { + $this->log->log("ERROR sending message; reconnecting.", XMPPHP_Log::LEVEL_ERROR); + $this->doReconnect(); + return false; + } + $this->log->log("Successfully sent $sentbytes bytes.", XMPPHP_Log::LEVEL_VERBOSE); + return $sentbytes; } public function time() { @@ -680,4 +749,12 @@ class XMPPHP_XMLStream { xml_set_element_handler($this->parser, 'startXML', 'endXML'); xml_set_character_data_handler($this->parser, 'charXML'); } + + public function readyToProcess() { + $read = array($this->socket); + $write = array(); + $except = array(); + $updated = @stream_select($read, $write, $except, 0); + return (($updated !== false) && ($updated > 0)); + } } diff --git a/XMPPHP/XMPP.php b/XMPPHP/XMPP.php index ff4852c..e88441f 100644 --- a/XMPPHP/XMPP.php +++ b/XMPPHP/XMPP.php @@ -88,6 +88,11 @@ class XMPPHP_XMPP extends XMPPHP_XMLStream { */ protected $use_encryption = true; + /** + * @var boolean + */ + public $track_presence = true; + /** * @var object */ @@ -115,6 +120,7 @@ class XMPPHP_XMPP extends XMPPHP_XMLStream { $this->basejid = $this->user . '@' . $this->host; $this->roster = new Roster(); + $this->track_presence = true; $this->stream_start = ''; $this->stream_end = ''; @@ -230,7 +236,9 @@ class XMPPHP_XMPP extends XMPPHP_XMLStream { $payload['from'] = $xml->attrs['from']; $payload['status'] = (isset($xml->sub('status')->data)) ? $xml->sub('status')->data : ''; $payload['priority'] = (isset($xml->sub('priority')->data)) ? intval($xml->sub('priority')->data) : 0; - $this->roster->setPresence($payload['from'], $payload['priority'], $payload['show'], $payload['status']); + if($this->track_presence) { + $this->roster->setPresence($payload['from'], $payload['priority'], $payload['show'], $payload['status']); + } $this->log->log("Presence: {$payload['from']} [{$payload['show']}] {$payload['status']}", XMPPHP_Log::LEVEL_DEBUG); if(array_key_exists('type', $xml->attrs) and $xml->attrs['type'] == 'subscribe') { if($this->auto_subscribe) {