From 8e925927c6fe1bb485030bdc0151f0b90e8ceab4 Mon Sep 17 00:00:00 2001 From: Luke Fitzgerald Date: Tue, 10 Aug 2010 17:27:02 -0700 Subject: [PATCH] First commit of message throttling code --- plugins/Irc/IrcPlugin.php | 23 +++- plugins/Irc/Irc_waiting_message.php | 125 ++++++++++++++++++ .../phergie/Phergie/Plugin/Statusnet.php | 30 +++++ plugins/Irc/ircmanager.php | 100 ++++++++++++-- 4 files changed, 266 insertions(+), 12 deletions(-) create mode 100644 plugins/Irc/Irc_waiting_message.php diff --git a/plugins/Irc/IrcPlugin.php b/plugins/Irc/IrcPlugin.php index 85c348d9b3..54b7585211 100644 --- a/plugins/Irc/IrcPlugin.php +++ b/plugins/Irc/IrcPlugin.php @@ -126,6 +126,7 @@ class IrcPlugin extends ImPlugin { include_once $dir . '/'.strtolower($cls).'.php'; return false; case 'Fake_Irc': + case 'Irc_waiting_message': case 'ChannelResponseChannel': include_once $dir . '/'. $cls .'.php'; return false; @@ -150,6 +151,25 @@ class IrcPlugin extends ImPlugin { return true; } + /** + * Ensure the database table is present + * + */ + public function onCheckSchema() { + $schema = Schema::get(); + + // For storing messages while sessions become ready + $schema->ensureTable('irc_waiting_message', + array(new ColumnDef('id', 'integer', null, + false, 'PRI', null, null, true), + new ColumnDef('data', 'blob', null, false), + new ColumnDef('prioritise', 'tinyint', 1, false), + new ColumnDef('created', 'datetime', null, false), + new ColumnDef('claimed', 'datetime'))); + + return true; + } + /** * Get a microid URI for the given screenname * @@ -171,7 +191,7 @@ class IrcPlugin extends ImPlugin { $lines = explode("\n", $body); foreach ($lines as $line) { $this->fake_irc->doPrivmsg($screenname, $line); - $this->enqueue_outgoing_raw(array('type' => 'message', 'data' => $this->fake_irc->would_be_sent)); + $this->enqueue_outgoing_raw(array('type' => 'message', 'prioritise' => 0, 'data' => $this->fake_irc->would_be_sent)); } return true; } @@ -297,6 +317,7 @@ class IrcPlugin extends ImPlugin { $this->enqueue_outgoing_raw( array( 'type' => 'nickcheck', + 'prioritise' => 1, 'data' => $this->fake_irc->would_be_sent, 'nickdata' => array( diff --git a/plugins/Irc/Irc_waiting_message.php b/plugins/Irc/Irc_waiting_message.php new file mode 100644 index 0000000000..05f72754c7 --- /dev/null +++ b/plugins/Irc/Irc_waiting_message.php @@ -0,0 +1,125 @@ + DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL, + 'data' => DB_DATAOBJECT_BLOB + DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, + 'prioritise' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL, + 'created' => DB_DATAOBJECT_TIME + DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, + 'claimed' => DB_DATAOBJECT_TIME + DB_DATAOBJECT_STR); + } + + /** + * return key definitions for DB_DataObject + * + * DB_DataObject needs to know about keys that the table has, since it + * won't appear in StatusNet's own keys list. In most cases, this will + * simply reference your keyTypes() function. + * + * @return array list of key field names + */ + public function keys() { + return array_keys($this->keyTypes()); + } + + /** + * return key definitions for Memcached_DataObject + * + * Our caching system uses the same key definitions, but uses a different + * method to get them. This key information is used to store and clear + * cached data, so be sure to list any key that will be used for static + * lookups. + * + * @return array associative array of key definitions, field name to type: + * 'K' for primary key: for compound keys, add an entry for each component; + * 'U' for unique keys: compound keys are not well supported here. + */ + public function keyTypes() { + return array('id' => 'K'); + } + + /** + * Magic formula for non-autoincrementing integer primary keys + * + * If a table has a single integer column as its primary key, DB_DataObject + * assumes that the column is auto-incrementing and makes a sequence table + * to do this incrementation. Since we don't need this for our class, we + * overload this method and return the magic formula that DB_DataObject needs. + * + * @return array magic three-false array that stops auto-incrementing. + */ + public function sequenceKey() { + return array(false, false, false); + } + + /** + * Get the next item in the queue + * + * @return Irc_waiting_message Next message if there is one + */ + public static function top() { + $wm = new Irc_waiting_message(); + + $wm->orderBy('prioritise DESC, created'); + $wm->whereAdd('claimed is null'); + + $wm->limit(1); + + $cnt = $wm->find(true); + + if ($cnt) { + # XXX: potential race condition + # can we force it to only update if claimed is still null + # (or old)? + common_log(LOG_INFO, 'claiming IRC waiting message id = ' . $wm->id); + $orig = clone($wm); + $wm->claimed = common_sql_now(); + $result = $wm->update($orig); + if ($result) { + common_log(LOG_INFO, 'claim succeeded.'); + return $wm; + } else { + common_log(LOG_INFO, 'claim failed.'); + } + } + $wm = null; + return null; + } + + /** + * Release a claimed item. + */ + public function releaseClaim() { + // DB_DataObject doesn't let us save nulls right now + $sql = sprintf("UPDATE irc_waiting_message SET claimed=NULL WHERE id=%d", $this->id); + $this->query($sql); + + $this->claimed = null; + $this->encache(); + } +} diff --git a/plugins/Irc/extlib/phergie/Phergie/Plugin/Statusnet.php b/plugins/Irc/extlib/phergie/Phergie/Plugin/Statusnet.php index fb75f1d798..dc2680a6df 100644 --- a/plugins/Irc/extlib/phergie/Phergie/Plugin/Statusnet.php +++ b/plugins/Irc/extlib/phergie/Phergie/Plugin/Statusnet.php @@ -41,6 +41,13 @@ class Phergie_Plugin_Statusnet extends Phergie_Plugin_Abstract { */ protected $regCallback; + /** + * Connection established callback details + * + * @var array + */ + protected $connectedCallback; + /** * Load callback from config */ @@ -59,6 +66,13 @@ class Phergie_Plugin_Statusnet extends Phergie_Plugin_Abstract { $this->regCallback = NULL; } + $connectedCallback = $this->config['statusnet.connectedcallback']; + if (is_callable($connectedCallback)) { + $this->connectedCallback = $connectedCallback; + } else { + $this->connectedCallback = NULL; + } + $this->unregRegexp = $this->getConfig('statusnet.unregregexp', '/\x02(.*?)\x02 (?:isn\'t|is not) registered/i'); $this->regRegexp = $this->getConfig('statusnet.regregexp', '/(?:\A|\x02)(\w+?)\x02? (?:\(account|is \w+?\z)/i'); } @@ -110,4 +124,20 @@ class Phergie_Plugin_Statusnet extends Phergie_Plugin_Abstract { } } } + + /** + * Intercepts the end of the "message of the day" response and tells + * StatusNet we're connected + * + * @return void + */ + public function onResponse() { + switch ($this->getEvent()->getCode()) { + case Phergie_Event_Response::RPL_ENDOFMOTD: + case Phergie_Event_Response::ERR_NOMOTD: + if ($this->connectedCallback !== NULL) { + call_user_func($this->connectedCallback); + } + } + } } diff --git a/plugins/Irc/ircmanager.php b/plugins/Irc/ircmanager.php index 96c019b7fc..7e2b32b23e 100644 --- a/plugins/Irc/ircmanager.php +++ b/plugins/Irc/ircmanager.php @@ -32,10 +32,14 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } class IrcManager extends ImManager { protected $conn = null; protected $lastPing = null; + protected $messageWaiting = true; + protected $lastMessage = null; protected $regChecks = array(); protected $regChecksLookup = array(); + protected $connected = false; + /** * Initialize connection to server. * @@ -65,16 +69,38 @@ class IrcManager extends ImManager { } } + public function timeout() { + return 1; + } + /** * Idle processing for io manager's execution loop. - * Send keepalive pings to server. * * @return void */ public function idle() { + // Send a ping if necessary if (empty($this->lastPing) || time() - $this->lastPing > 120) { $this->sendPing(); } + + if ($this->connected) { + // Send a waiting message if appropriate + if ($this->messageWaiting && time() - $this->lastMessage > 1) { + $wm = Irc_waiting_message::top(); + if ($wm === NULL) { + $this->messageWaiting = false; + return; + } + $data = unserialize($wm->data); + + if (!$this->send_raw_message($data)) { + $this->plugin->enqueue_outgoing_raw($data); + } + + $wm->delete(); + } + } } /** @@ -90,6 +116,7 @@ class IrcManager extends ImManager { try { $this->conn->handleEvents(); } catch (Phergie_Driver_Exception $e) { + $this->connected = false; $this->conn->reconnect(); } } @@ -142,6 +169,7 @@ class IrcManager extends ImManager { 'statusnet.messagecallback' => array($this, 'handle_irc_message'), 'statusnet.regcallback' => array($this, 'handle_reg_response'), + 'statusnet.connectedcallback' => array($this, 'handle_connected'), 'statusnet.unregregexp' => $this->plugin->unregregexp, 'statusnet.regregexp' => $this->plugin->regregexp ) @@ -150,6 +178,7 @@ class IrcManager extends ImManager { $this->conn->setConfig($config); $this->conn->connect(); $this->lastPing = time(); + $this->lastMessage = time(); } return $this->conn; } @@ -211,6 +240,38 @@ class IrcManager extends ImManager { } } + /** + * Called when the connection is established + * + * @return void + */ + public function handle_connected() { + $this->connected = true; + } + + /** + * Enters a message into the database for sending when ready + * + * @param string $command Command + * @param array $args Arguments + * @return boolean + */ + protected function enqueue_waiting_message($data) { + $wm = new Irc_waiting_message(); + + $wm->data = serialize($data); + $wm->prioritise = $data['prioritise']; + $wm->created = common_sql_now(); + $result = $wm->insert(); + + if (!$result) { + common_log_db_error($wm, 'INSERT', __FILE__); + throw new ServerException('DB error inserting IRC waiting queue item'); + } + + return true; + } + /** * Send a message using the daemon * @@ -223,28 +284,45 @@ class IrcManager extends ImManager { return false; } - if ($data['type'] != 'message') { - // Nick checking - $nickdata = $data['nickdata']; - $usernick = $nickdata['user']->nickname; - $screenname = $nickdata['screenname']; + if ($data['type'] != 'delayedmessage') { + if ($data['type'] != 'message') { + // Nick checking + $nickdata = $data['nickdata']; + $usernick = $nickdata['user']->nickname; + $screenname = $nickdata['screenname']; - // Cancel any existing checks for this user - if (isset($this->regChecksLookup[$usernick])) { - unset($this->regChecks[$this->regChecksLookup[$usernick]]); + // Cancel any existing checks for this user + if (isset($this->regChecksLookup[$usernick])) { + unset($this->regChecks[$this->regChecksLookup[$usernick]]); + } + + $this->regChecks[$screenname] = $nickdata; + $this->regChecksLookup[$usernick] = $screenname; } - $this->regChecks[$screenname] = $nickdata; - $this->regChecksLookup[$usernick] = $screenname; + // If there is a backlog or we need to wait, queue the message + if ($this->messageWaiting || time() - $this->lastMessage < 1) { + $this->enqueue_waiting_message( + array( + 'type' => 'delayedmessage', + 'prioritise' => $data['prioritise'], + 'data' => $data['data'] + ) + ); + $this->messageWaiting = true; + return true; + } } try { $this->conn->send($data['data']['command'], $data['data']['args']); } catch (Phergie_Driver_Exception $e) { + $this->connected = false; $this->conn->reconnect(); return false; } + $this->lastMessage = time(); return true; }