* applied Evan's XMLStream select() patches
* made roster presence tracking optional git-svn-id: svn://netflint.net/xmpphp@69 ef36c318-a008-4979-b6e8-6b496270793b
This commit is contained in:
parent
1a3c6cf010
commit
6cd2e281c7
@ -163,6 +163,10 @@ class XMPPHP_XMLStream {
|
|||||||
* @var boolean
|
* @var boolean
|
||||||
*/
|
*/
|
||||||
protected $use_ssl = false;
|
protected $use_ssl = false;
|
||||||
|
/**
|
||||||
|
* @var integer
|
||||||
|
*/
|
||||||
|
protected $reconnectTimeout = 30;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
@ -289,6 +293,10 @@ class XMPPHP_XMLStream {
|
|||||||
* @param boolean $sendinit
|
* @param boolean $sendinit
|
||||||
*/
|
*/
|
||||||
public function connect($timeout = 30, $persistent = false, $sendinit = true) {
|
public function connect($timeout = 30, $persistent = false, $sendinit = true) {
|
||||||
|
$this->sent_disconnect = false;
|
||||||
|
$starttime = time();
|
||||||
|
|
||||||
|
do {
|
||||||
$this->disconnected = false;
|
$this->disconnected = false;
|
||||||
$this->sent_disconnect = false;
|
$this->sent_disconnect = false;
|
||||||
if($persistent) {
|
if($persistent) {
|
||||||
@ -307,11 +315,17 @@ class XMPPHP_XMLStream {
|
|||||||
if(!$this->socket) {
|
if(!$this->socket) {
|
||||||
$this->log->log("Could not connect.", XMPPHP_Log::LEVEL_ERROR);
|
$this->log->log("Could not connect.", XMPPHP_Log::LEVEL_ERROR);
|
||||||
$this->disconnected = true;
|
$this->disconnected = true;
|
||||||
|
# Take it easy for a few seconds
|
||||||
throw new XMPPHP_Exception('Could not connect.');
|
sleep(min($timeout, 5));
|
||||||
}
|
}
|
||||||
|
} while (!$this->socket && (time() - $starttime) < $timeout);
|
||||||
|
|
||||||
|
if ($this->socket) {
|
||||||
stream_set_blocking($this->socket, 1);
|
stream_set_blocking($this->socket, 1);
|
||||||
if($sendinit) $this->send($this->stream_start);
|
if($sendinit) $this->send($this->stream_start);
|
||||||
|
} else {
|
||||||
|
throw new XMPPHP_Exception("Could not connect before timeout.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -319,12 +333,17 @@ class XMPPHP_XMLStream {
|
|||||||
*/
|
*/
|
||||||
public function doReconnect() {
|
public function doReconnect() {
|
||||||
if(!$this->is_server) {
|
if(!$this->is_server) {
|
||||||
$this->log->log("Reconnecting...", XMPPHP_Log::LEVEL_WARNING);
|
$this->log->log("Reconnecting ($this->reconnectTimeout)...", XMPPHP_Log::LEVEL_WARNING);
|
||||||
$this->connect(30, false, false);
|
$this->connect($this->reconnectTimeout, false, false);
|
||||||
$this->reset();
|
$this->reset();
|
||||||
|
$this->event('reconnect');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function setReconnectTimeout($timeout) {
|
||||||
|
$this->reconnectTimeout = $timeout;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Disconnect from XMPP Host
|
* Disconnect from XMPP Host
|
||||||
*/
|
*/
|
||||||
@ -349,24 +368,64 @@ class XMPPHP_XMLStream {
|
|||||||
return $this->disconnected;
|
return $this->disconnected;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function __process() {
|
/**
|
||||||
|
* Core reading tool
|
||||||
|
* 0 -> only read if data is immediately ready
|
||||||
|
* NULL -> wait forever and ever
|
||||||
|
* integer -> process for this amount of time
|
||||||
|
*/
|
||||||
|
|
||||||
|
private function __process($maximum=0) {
|
||||||
|
|
||||||
|
$remaining = $maximum;
|
||||||
|
|
||||||
|
do {
|
||||||
|
$starttime = (microtime(true) * 1000000);
|
||||||
$read = array($this->socket);
|
$read = array($this->socket);
|
||||||
$write = null;
|
$write = array();
|
||||||
$except = null;
|
$except = array();
|
||||||
$updated = @stream_select($read, $write, $except, 1);
|
if (is_null($maximum)) {
|
||||||
if ($updated > 0) {
|
$secs = NULL;
|
||||||
$buff = @fread($this->socket, 1024);
|
$usecs = NULL;
|
||||||
|
} else if ($maximum == 0) {
|
||||||
|
$secs = 0;
|
||||||
|
$usecs = 0;
|
||||||
|
} else {
|
||||||
|
$usecs = $remaining % 1000000;
|
||||||
|
$secs = floor(($remaining - $usecs) / 1000000);
|
||||||
|
}
|
||||||
|
$updated = @stream_select($read, $write, $except, $secs, $usecs);
|
||||||
|
if ($updated === false) {
|
||||||
|
$this->log->log("Error on stream_select()", XMPPHP_Log::LEVEL_VERBOSE);
|
||||||
|
if ($this->reconnect) {
|
||||||
|
$this->doReconnect();
|
||||||
|
} else {
|
||||||
|
fclose($this->socket);
|
||||||
|
$this->socket = NULL;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else if ($updated > 0) {
|
||||||
|
# XXX: Is this big enough?
|
||||||
|
$buff = @fread($this->socket, 4096);
|
||||||
if(!$buff) {
|
if(!$buff) {
|
||||||
if($this->reconnect) {
|
if($this->reconnect) {
|
||||||
$this->doReconnect();
|
$this->doReconnect();
|
||||||
} else {
|
} else {
|
||||||
fclose($this->socket);
|
fclose($this->socket);
|
||||||
|
$this->socket = NULL;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
$this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE);
|
$this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE);
|
||||||
xml_parse($this->parser, $buff, false);
|
xml_parse($this->parser, $buff, false);
|
||||||
|
} else {
|
||||||
|
# $updated == 0 means no changes during timeout.
|
||||||
}
|
}
|
||||||
|
$endtime = (microtime(true)*1000000);
|
||||||
|
$time_past = $endtime - $starttime;
|
||||||
|
$remaining = $remaining - $time_past;
|
||||||
|
} while (is_null($maximum) || $remaining > 0);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -375,10 +434,7 @@ class XMPPHP_XMLStream {
|
|||||||
* @return string
|
* @return string
|
||||||
*/
|
*/
|
||||||
public function process() {
|
public function process() {
|
||||||
$updated = '';
|
$this->__process(NULL);
|
||||||
while(!$this->disconnect) {
|
|
||||||
$this->__process();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -387,11 +443,11 @@ class XMPPHP_XMLStream {
|
|||||||
* @param integer $timeout
|
* @param integer $timeout
|
||||||
* @return string
|
* @return string
|
||||||
*/
|
*/
|
||||||
public function processTime($timeout = -1) {
|
public function processTime($timeout=NULL) {
|
||||||
$start = time();
|
if (is_null($timeout)) {
|
||||||
$updated = '';
|
return $this->__process(NULL);
|
||||||
while(!$this->disconnected and ($timeout == -1 or time() - $start < $timeout)) {
|
} else {
|
||||||
$this->__process();
|
return $this->__process($timeout * 1000000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -620,34 +676,47 @@ class XMPPHP_XMLStream {
|
|||||||
*
|
*
|
||||||
* @param string $msg
|
* @param string $msg
|
||||||
*/
|
*/
|
||||||
public function send($msg, $rec=false) {
|
public function send($msg, $timeout=NULL) {
|
||||||
if($this->time() - $this->last_send < .1) {
|
|
||||||
usleep(100000);
|
if (is_null($timeout)) {
|
||||||
|
$secs = NULL;
|
||||||
|
$usecs = NULL;
|
||||||
|
} else if ($timeout == 0) {
|
||||||
|
$secs = 0;
|
||||||
|
$usecs = 0;
|
||||||
|
} else {
|
||||||
|
$maximum = $timeout * 1000000;
|
||||||
|
$usecs = $maximum % 1000000;
|
||||||
|
$secs = floor(($maximum - $usecs) / 1000000);
|
||||||
}
|
}
|
||||||
$wait = true;
|
|
||||||
while($wait) {
|
$read = array();
|
||||||
$read = null;
|
|
||||||
$write = array($this->socket);
|
$write = array($this->socket);
|
||||||
$except = null;
|
$except = array();
|
||||||
$select = @stream_select($read, $write, $except, 0, 0);
|
|
||||||
|
$select = @stream_select($read, $write, $except, $secs, $usecs);
|
||||||
|
|
||||||
if($select === False) {
|
if($select === False) {
|
||||||
|
$this->log->log("ERROR sending message; reconnecting.");
|
||||||
$this->doReconnect();
|
$this->doReconnect();
|
||||||
|
# TODO: retry send here
|
||||||
return false;
|
return false;
|
||||||
} elseif ($select > 0) {
|
} elseif ($select > 0) {
|
||||||
$wait = false;
|
$this->log->log("Socket is ready; send it.", XMPPHP_Log::LEVEL_VERBOSE);
|
||||||
} else {
|
} else {
|
||||||
usleep(100000);
|
$this->log->log("Socket is not ready; break.", XMPPHP_Log::LEVEL_ERROR);
|
||||||
//$this->processTime(.25);
|
return false;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
$sentbytes = @fwrite($this->socket, $msg, 1024);
|
$sentbytes = @fwrite($this->socket, $msg);
|
||||||
$this->last_send = $this->time();
|
|
||||||
$this->log->log("SENT: " . mb_substr($msg, 0, $sentbytes, '8bit'), XMPPHP_Log::LEVEL_VERBOSE);
|
$this->log->log("SENT: " . mb_substr($msg, 0, $sentbytes, '8bit'), XMPPHP_Log::LEVEL_VERBOSE);
|
||||||
if($sentbytes === FALSE) {
|
if($sentbytes === FALSE) {
|
||||||
|
$this->log->log("ERROR sending message; reconnecting.", XMPPHP_Log::LEVEL_ERROR);
|
||||||
$this->doReconnect();
|
$this->doReconnect();
|
||||||
} elseif ($sentbytes != mb_strlen($msg, '8bit')) {
|
return false;
|
||||||
$this->send(mb_substr($msg, $sentbytes, mb_strlen($msg, '8bit'), '8bit'), true);
|
|
||||||
}
|
}
|
||||||
|
$this->log->log("Successfully sent $sentbytes bytes.", XMPPHP_Log::LEVEL_VERBOSE);
|
||||||
|
return $sentbytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function time() {
|
public function time() {
|
||||||
@ -680,4 +749,12 @@ class XMPPHP_XMLStream {
|
|||||||
xml_set_element_handler($this->parser, 'startXML', 'endXML');
|
xml_set_element_handler($this->parser, 'startXML', 'endXML');
|
||||||
xml_set_character_data_handler($this->parser, 'charXML');
|
xml_set_character_data_handler($this->parser, 'charXML');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function readyToProcess() {
|
||||||
|
$read = array($this->socket);
|
||||||
|
$write = array();
|
||||||
|
$except = array();
|
||||||
|
$updated = @stream_select($read, $write, $except, 0);
|
||||||
|
return (($updated !== false) && ($updated > 0));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -88,6 +88,11 @@ class XMPPHP_XMPP extends XMPPHP_XMLStream {
|
|||||||
*/
|
*/
|
||||||
protected $use_encryption = true;
|
protected $use_encryption = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @var boolean
|
||||||
|
*/
|
||||||
|
public $track_presence = true;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @var object
|
* @var object
|
||||||
*/
|
*/
|
||||||
@ -115,6 +120,7 @@ class XMPPHP_XMPP extends XMPPHP_XMLStream {
|
|||||||
$this->basejid = $this->user . '@' . $this->host;
|
$this->basejid = $this->user . '@' . $this->host;
|
||||||
|
|
||||||
$this->roster = new Roster();
|
$this->roster = new Roster();
|
||||||
|
$this->track_presence = true;
|
||||||
|
|
||||||
$this->stream_start = '<stream:stream to="' . $server . '" xmlns:stream="http://etherx.jabber.org/streams" xmlns="jabber:client" version="1.0">';
|
$this->stream_start = '<stream:stream to="' . $server . '" xmlns:stream="http://etherx.jabber.org/streams" xmlns="jabber:client" version="1.0">';
|
||||||
$this->stream_end = '</stream:stream>';
|
$this->stream_end = '</stream:stream>';
|
||||||
@ -230,7 +236,9 @@ class XMPPHP_XMPP extends XMPPHP_XMLStream {
|
|||||||
$payload['from'] = $xml->attrs['from'];
|
$payload['from'] = $xml->attrs['from'];
|
||||||
$payload['status'] = (isset($xml->sub('status')->data)) ? $xml->sub('status')->data : '';
|
$payload['status'] = (isset($xml->sub('status')->data)) ? $xml->sub('status')->data : '';
|
||||||
$payload['priority'] = (isset($xml->sub('priority')->data)) ? intval($xml->sub('priority')->data) : 0;
|
$payload['priority'] = (isset($xml->sub('priority')->data)) ? intval($xml->sub('priority')->data) : 0;
|
||||||
|
if($this->track_presence) {
|
||||||
$this->roster->setPresence($payload['from'], $payload['priority'], $payload['show'], $payload['status']);
|
$this->roster->setPresence($payload['from'], $payload['priority'], $payload['show'], $payload['status']);
|
||||||
|
}
|
||||||
$this->log->log("Presence: {$payload['from']} [{$payload['show']}] {$payload['status']}", XMPPHP_Log::LEVEL_DEBUG);
|
$this->log->log("Presence: {$payload['from']} [{$payload['show']}] {$payload['status']}", XMPPHP_Log::LEVEL_DEBUG);
|
||||||
if(array_key_exists('type', $xml->attrs) and $xml->attrs['type'] == 'subscribe') {
|
if(array_key_exists('type', $xml->attrs) and $xml->attrs['type'] == 'subscribe') {
|
||||||
if($this->auto_subscribe) {
|
if($this->auto_subscribe) {
|
||||||
|
Loading…
Reference in New Issue
Block a user