Changes to make plugin use database instead of polling session readiness

This commit is contained in:
Luke Fitzgerald 2010-07-30 17:12:35 -07:00
parent e911480587
commit 0f7fdd4804
4 changed files with 169 additions and 13 deletions

View File

@ -106,6 +106,7 @@ class MsnPlugin extends ImPlugin {
require_once(INSTALLDIR.'/plugins/Msn/extlib/phpmsnclass/msn.class.php');
return false;
case 'MsnManager':
case 'Msn_waiting_message':
include_once $dir . '/'.strtolower($cls).'.php';
return false;
default:
@ -124,6 +125,25 @@ class MsnPlugin extends ImPlugin {
return true;
}
/**
* Ensure the database table is present
*
*/
public function onCheckSchema() {
$schema = Schema::get();
// For storing messages while sessions become ready
$schema->ensureTable('msn_waiting_message',
array(new ColumnDef('id', 'integer', null,
false, 'PRI', null, null, true),
new ColumnDef('screenname', 'integer', null, false),
new ColumnDef('message', 'text', null, false),
new ColumnDef('created', 'datetime', null, false),
new ColumnDef('claimed', 'datetime')));
return true;
}
/**
* Get a microid URI for the given screenname
*

View File

@ -949,7 +949,10 @@ class MSN {
// SB: <<< IRO {id} {rooster} {roostercount} {email} {alias} {clientid}
@list(/* IRO */, /* id */, $cur_num, $total, $email, $alias, $clientid) = @explode(' ', $data);
$this->debug_message("*** $email joined session");
$session['joined'] = true;
if ($email == $session['to']) {
$session['joined'] = true;
$this->callHandler('SessionReady', array('to' => $email));
}
break;
case 'BYE':
$this->debug_message("*** Quit for BYE");
@ -970,9 +973,11 @@ class MSN {
case 'JOI':
// SB: <<< JOI {user} {alias} {clientid?}
// someone join us
// we don't need the data, just ignore it
// no more user here
$session['joined'] = true;
@list(/* JOI */, $email) = @explode(' ', $data);
if ($email == $session['to']) {
$session['joined'] = true;
$this->callHandler('SessionReady', array('to' => $email));
}
break;
case 'MSG':
// SB: <<< MSG {email} {alias} {len}
@ -1478,6 +1483,7 @@ class MSN {
if ($this->sb_writeln($socket, $id, "MSG $id N $len") === false ||
$this->sb_writedata($socket, $SendString) === false) {
$this->endSBSession($socket);
return false;
}
}
@ -1485,6 +1491,7 @@ class MSN {
if ($this->sb_writeln($socket, $id, "MSG $id N $len") === false ||
$this->sb_writedata($socket, $aMessage) === false) {
$this->endSBSession($socket);
return false;
}
@ -1518,8 +1525,14 @@ class MSN {
* where network is 1 for MSN, 32 for Yahoo
* and 'Offline' for offline messages
* @param string $message Message
* @param boolean &$waitForSession Boolean passed by reference,
* if set to true on return, message
* did not fail to send but is
* waiting for a valid session
*
* @return boolean true on success
*/
public function sendMessage($to, $message) {
public function sendMessage($to, $message, &$waitForSession) {
if ($message != '') {
$toParts = explode('@', $to);
if(count($toParts) < 3) {
@ -1537,6 +1550,8 @@ class MSN {
$this->debug_message("*** No existing SB session or request has timed out");
$this->reqSBSession($recipient);
}
$waitForSession = true;
return false;
} else {
$socket = $this->switchBoardSessionLookup[$recipient];
@ -1544,10 +1559,12 @@ class MSN {
if ($this->switchBoardSessions[$intsocket]['offline']) {
$this->debug_message("*** Contact ($recipient) offline, sending OIM");
$this->endSBSession($socket);
$waitForSession = false;
return $this->sendMessage($recipient.'@Offline', $message);
} else {
if ($this->switchBoardSessions[$intsocket]['joined'] !== true) {
$this->debug_message("*** Recipient has not joined session, returning false");
$waitForSession = true;
return false;
}
@ -1558,6 +1575,7 @@ class MSN {
return true;
}
$waitForSession = false;
return false;
}
}
@ -3100,7 +3118,7 @@ X-OIM-Sequence-Num: 1
* Registers a user handler
*
* Handler List
* IMIn, Pong, ConnectFailed, Reconnect,
* IMIn, SessionReady, Pong, ConnectFailed, Reconnect,
* AddedToList, RemovedFromList, StatusChange
*
* @param string $event Event name

View File

@ -0,0 +1,73 @@
<?php
/**
* Table Definition for msn_plugin_message
*/
require_once INSTALLDIR.'/classes/Memcached_DataObject.php';
class Msn_waiting_message extends Memcached_DataObject {
public $__table = 'msn_waiting_message'; // table name
public $id; // int primary_key not_null auto_increment
public $screenname; // varchar(255) not_null
public $message; // text not_null
public $created; // datetime() not_null
public $claimed; // datetime()
/* Static get */
public function staticGet($k, $v = null) {
return Memcached_DataObject::staticGet('Msn_waiting_message', $k, $v);
}
/**
* @param mixed $screenname screenname or array of screennames to pull from
* If not specified, checks all queues in the system.
*/
public static function top($screenname = null) {
$wm = new Msn_waiting_message();
if ($screenname) {
if (is_array($screenname)) {
// @fixme use safer escaping
$list = implode("','", array_map('addslashes', $transports));
$wm->whereAdd("screename in ('$list')");
} else {
$wm->screenname = $screenname;
}
}
$wm->orderBy('created');
$wm->whereAdd('claimed is null');
$wm->limit(1);
$cnt = $wm->find(true);
if ($cnt) {
# XXX: potential race condition
# can we force it to only update if claimed is still null
# (or old)?
common_log(LOG_INFO, 'claiming msn waiting message id = ' . $wm->id);
$orig = clone($wm);
$wm->claimed = common_sql_now();
$result = $wm->update($orig);
if ($result) {
common_log(LOG_INFO, 'claim succeeded.');
return $wm;
} else {
common_log(LOG_INFO, 'claim failed.');
}
}
$wm = null;
return null;
}
/**
* Release a claimed item.
*/
public function releaseClaim() {
// DB_DataObject doesn't let us save nulls right now
$sql = sprintf("UPDATE msn_waiting_message SET claimed=NULL WHERE id=%d", $this->id);
$this->query($sql);
$this->claimed = null;
$this->encache();
}
}

View File

@ -113,6 +113,7 @@ class MsnManager extends ImManager {
)
);
$this->conn->registerHandler('IMin', array($this, 'handle_msn_message'));
$this->conn->registerHandler('SessionReady', array($this, 'handle_session_ready'));
$this->conn->registerHandler('Pong', array($this, 'update_ping_time'));
$this->conn->registerHandler('ConnectFailed', array($this, 'handle_connect_failed'));
$this->conn->registerHandler('Reconnect', array($this, 'handle_reconnect'));
@ -142,7 +143,7 @@ class MsnManager extends ImManager {
/**
* Update the time till the next ping
*
*
* @param $data Time till next ping
* @return void
*/
@ -163,6 +164,22 @@ class MsnManager extends ImManager {
return true;
}
/**
* Called via a callback when a session becomes ready
*
* @param array $data Data
*/
public function handle_session_ready($data) {
while (($wm = Msn_waiting_message::top($data['to']) != NULL)) {
if ($this->conn->sendMessage($wm->screenname, $wm->message, $ignore)) {
$wm->delete();
} else {
// Requeue the message in the regular queue
$this->plugin->send_message($wm->screenname, $wm->message);
}
}
}
/**
* Called by callback to log failure during connect
*
@ -182,11 +199,34 @@ class MsnManager extends ImManager {
public function handle_reconnect($data) {
common_log(LOG_NOTICE, 'MSN reconnecting');
}
/**
* Enters a message into the database for sending via a callback
* when the session is established
*
* @param string $to Intended recipient
* @param string $message Message
*/
private function enqueue_waiting_message($to, $message) {
$wm = new Msn_waiting_message();
$wm->screenname = $to;
$wm->message = $message;
$wm->created = common_sql_now();
$result = $wm->insert();
if (!$result) {
common_log_db_error($wm, 'INSERT', __FILE__);
throw new ServerException('DB error inserting queue item');
}
return true;
}
/**
* Send a message using the daemon
*
* @param $data Message
*
* @param $data Message data
* @return boolean true on success
*/
public function send_raw_message($data) {
@ -195,10 +235,15 @@ class MsnManager extends ImManager {
return false;
}
if (!$this->conn->sendMessage($data['to'], $data['message'])) {
return false;
$waitForSession = false;
if (!$this->conn->sendMessage($data['to'], $data['message'], $waitForSession)) {
if ($waitForSession) {
$this->enqueue_waiting_message($data['to'], $data['message']);
} else {
return false;
}
}
// Sending a command updates the time till next ping
$this->lastping = time();
$this->pingInterval = 50;