forked from GNUsocial/gnu-social
First commit of message throttling code
This commit is contained in:
parent
322e7c4e9f
commit
8e925927c6
@ -126,6 +126,7 @@ class IrcPlugin extends ImPlugin {
|
|||||||
include_once $dir . '/'.strtolower($cls).'.php';
|
include_once $dir . '/'.strtolower($cls).'.php';
|
||||||
return false;
|
return false;
|
||||||
case 'Fake_Irc':
|
case 'Fake_Irc':
|
||||||
|
case 'Irc_waiting_message':
|
||||||
case 'ChannelResponseChannel':
|
case 'ChannelResponseChannel':
|
||||||
include_once $dir . '/'. $cls .'.php';
|
include_once $dir . '/'. $cls .'.php';
|
||||||
return false;
|
return false;
|
||||||
@ -150,6 +151,25 @@ class IrcPlugin extends ImPlugin {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure the database table is present
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public function onCheckSchema() {
|
||||||
|
$schema = Schema::get();
|
||||||
|
|
||||||
|
// For storing messages while sessions become ready
|
||||||
|
$schema->ensureTable('irc_waiting_message',
|
||||||
|
array(new ColumnDef('id', 'integer', null,
|
||||||
|
false, 'PRI', null, null, true),
|
||||||
|
new ColumnDef('data', 'blob', null, false),
|
||||||
|
new ColumnDef('prioritise', 'tinyint', 1, false),
|
||||||
|
new ColumnDef('created', 'datetime', null, false),
|
||||||
|
new ColumnDef('claimed', 'datetime')));
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a microid URI for the given screenname
|
* Get a microid URI for the given screenname
|
||||||
*
|
*
|
||||||
@ -171,7 +191,7 @@ class IrcPlugin extends ImPlugin {
|
|||||||
$lines = explode("\n", $body);
|
$lines = explode("\n", $body);
|
||||||
foreach ($lines as $line) {
|
foreach ($lines as $line) {
|
||||||
$this->fake_irc->doPrivmsg($screenname, $line);
|
$this->fake_irc->doPrivmsg($screenname, $line);
|
||||||
$this->enqueue_outgoing_raw(array('type' => 'message', 'data' => $this->fake_irc->would_be_sent));
|
$this->enqueue_outgoing_raw(array('type' => 'message', 'prioritise' => 0, 'data' => $this->fake_irc->would_be_sent));
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -297,6 +317,7 @@ class IrcPlugin extends ImPlugin {
|
|||||||
$this->enqueue_outgoing_raw(
|
$this->enqueue_outgoing_raw(
|
||||||
array(
|
array(
|
||||||
'type' => 'nickcheck',
|
'type' => 'nickcheck',
|
||||||
|
'prioritise' => 1,
|
||||||
'data' => $this->fake_irc->would_be_sent,
|
'data' => $this->fake_irc->would_be_sent,
|
||||||
'nickdata' =>
|
'nickdata' =>
|
||||||
array(
|
array(
|
||||||
|
125
plugins/Irc/Irc_waiting_message.php
Normal file
125
plugins/Irc/Irc_waiting_message.php
Normal file
@ -0,0 +1,125 @@
|
|||||||
|
<?php
|
||||||
|
/**
|
||||||
|
* Table Definition for irc_waiting_message
|
||||||
|
*/
|
||||||
|
require_once INSTALLDIR.'/classes/Memcached_DataObject.php';
|
||||||
|
|
||||||
|
class Irc_waiting_message extends Memcached_DataObject {
|
||||||
|
|
||||||
|
public $__table = 'irc_waiting_message'; // table name
|
||||||
|
public $id; // int primary_key not_null auto_increment
|
||||||
|
public $data; // blob not_null
|
||||||
|
public $prioritise; // tinyint(1) not_null
|
||||||
|
public $created; // datetime() not_null
|
||||||
|
public $claimed; // datetime()
|
||||||
|
|
||||||
|
/* Static get */
|
||||||
|
public function staticGet($k, $v = null) {
|
||||||
|
return Memcached_DataObject::staticGet('Irc_waiting_message', $k, $v);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* return table definition for DB_DataObject
|
||||||
|
*
|
||||||
|
* DB_DataObject needs to know something about the table to manipulate
|
||||||
|
* instances. This method provides all the DB_DataObject needs to know.
|
||||||
|
*
|
||||||
|
* @return array array of column definitions
|
||||||
|
*/
|
||||||
|
public function table() {
|
||||||
|
return array('id' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL,
|
||||||
|
'data' => DB_DATAOBJECT_BLOB + DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
|
||||||
|
'prioritise' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL,
|
||||||
|
'created' => DB_DATAOBJECT_TIME + DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
|
||||||
|
'claimed' => DB_DATAOBJECT_TIME + DB_DATAOBJECT_STR);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* return key definitions for DB_DataObject
|
||||||
|
*
|
||||||
|
* DB_DataObject needs to know about keys that the table has, since it
|
||||||
|
* won't appear in StatusNet's own keys list. In most cases, this will
|
||||||
|
* simply reference your keyTypes() function.
|
||||||
|
*
|
||||||
|
* @return array list of key field names
|
||||||
|
*/
|
||||||
|
public function keys() {
|
||||||
|
return array_keys($this->keyTypes());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* return key definitions for Memcached_DataObject
|
||||||
|
*
|
||||||
|
* Our caching system uses the same key definitions, but uses a different
|
||||||
|
* method to get them. This key information is used to store and clear
|
||||||
|
* cached data, so be sure to list any key that will be used for static
|
||||||
|
* lookups.
|
||||||
|
*
|
||||||
|
* @return array associative array of key definitions, field name to type:
|
||||||
|
* 'K' for primary key: for compound keys, add an entry for each component;
|
||||||
|
* 'U' for unique keys: compound keys are not well supported here.
|
||||||
|
*/
|
||||||
|
public function keyTypes() {
|
||||||
|
return array('id' => 'K');
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Magic formula for non-autoincrementing integer primary keys
|
||||||
|
*
|
||||||
|
* If a table has a single integer column as its primary key, DB_DataObject
|
||||||
|
* assumes that the column is auto-incrementing and makes a sequence table
|
||||||
|
* to do this incrementation. Since we don't need this for our class, we
|
||||||
|
* overload this method and return the magic formula that DB_DataObject needs.
|
||||||
|
*
|
||||||
|
* @return array magic three-false array that stops auto-incrementing.
|
||||||
|
*/
|
||||||
|
public function sequenceKey() {
|
||||||
|
return array(false, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the next item in the queue
|
||||||
|
*
|
||||||
|
* @return Irc_waiting_message Next message if there is one
|
||||||
|
*/
|
||||||
|
public static function top() {
|
||||||
|
$wm = new Irc_waiting_message();
|
||||||
|
|
||||||
|
$wm->orderBy('prioritise DESC, created');
|
||||||
|
$wm->whereAdd('claimed is null');
|
||||||
|
|
||||||
|
$wm->limit(1);
|
||||||
|
|
||||||
|
$cnt = $wm->find(true);
|
||||||
|
|
||||||
|
if ($cnt) {
|
||||||
|
# XXX: potential race condition
|
||||||
|
# can we force it to only update if claimed is still null
|
||||||
|
# (or old)?
|
||||||
|
common_log(LOG_INFO, 'claiming IRC waiting message id = ' . $wm->id);
|
||||||
|
$orig = clone($wm);
|
||||||
|
$wm->claimed = common_sql_now();
|
||||||
|
$result = $wm->update($orig);
|
||||||
|
if ($result) {
|
||||||
|
common_log(LOG_INFO, 'claim succeeded.');
|
||||||
|
return $wm;
|
||||||
|
} else {
|
||||||
|
common_log(LOG_INFO, 'claim failed.');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
$wm = null;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Release a claimed item.
|
||||||
|
*/
|
||||||
|
public function releaseClaim() {
|
||||||
|
// DB_DataObject doesn't let us save nulls right now
|
||||||
|
$sql = sprintf("UPDATE irc_waiting_message SET claimed=NULL WHERE id=%d", $this->id);
|
||||||
|
$this->query($sql);
|
||||||
|
|
||||||
|
$this->claimed = null;
|
||||||
|
$this->encache();
|
||||||
|
}
|
||||||
|
}
|
@ -41,6 +41,13 @@ class Phergie_Plugin_Statusnet extends Phergie_Plugin_Abstract {
|
|||||||
*/
|
*/
|
||||||
protected $regCallback;
|
protected $regCallback;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connection established callback details
|
||||||
|
*
|
||||||
|
* @var array
|
||||||
|
*/
|
||||||
|
protected $connectedCallback;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load callback from config
|
* Load callback from config
|
||||||
*/
|
*/
|
||||||
@ -59,6 +66,13 @@ class Phergie_Plugin_Statusnet extends Phergie_Plugin_Abstract {
|
|||||||
$this->regCallback = NULL;
|
$this->regCallback = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$connectedCallback = $this->config['statusnet.connectedcallback'];
|
||||||
|
if (is_callable($connectedCallback)) {
|
||||||
|
$this->connectedCallback = $connectedCallback;
|
||||||
|
} else {
|
||||||
|
$this->connectedCallback = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
$this->unregRegexp = $this->getConfig('statusnet.unregregexp', '/\x02(.*?)\x02 (?:isn\'t|is not) registered/i');
|
$this->unregRegexp = $this->getConfig('statusnet.unregregexp', '/\x02(.*?)\x02 (?:isn\'t|is not) registered/i');
|
||||||
$this->regRegexp = $this->getConfig('statusnet.regregexp', '/(?:\A|\x02)(\w+?)\x02? (?:\(account|is \w+?\z)/i');
|
$this->regRegexp = $this->getConfig('statusnet.regregexp', '/(?:\A|\x02)(\w+?)\x02? (?:\(account|is \w+?\z)/i');
|
||||||
}
|
}
|
||||||
@ -110,4 +124,20 @@ class Phergie_Plugin_Statusnet extends Phergie_Plugin_Abstract {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Intercepts the end of the "message of the day" response and tells
|
||||||
|
* StatusNet we're connected
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function onResponse() {
|
||||||
|
switch ($this->getEvent()->getCode()) {
|
||||||
|
case Phergie_Event_Response::RPL_ENDOFMOTD:
|
||||||
|
case Phergie_Event_Response::ERR_NOMOTD:
|
||||||
|
if ($this->connectedCallback !== NULL) {
|
||||||
|
call_user_func($this->connectedCallback);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -32,10 +32,14 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); }
|
|||||||
class IrcManager extends ImManager {
|
class IrcManager extends ImManager {
|
||||||
protected $conn = null;
|
protected $conn = null;
|
||||||
protected $lastPing = null;
|
protected $lastPing = null;
|
||||||
|
protected $messageWaiting = true;
|
||||||
|
protected $lastMessage = null;
|
||||||
|
|
||||||
protected $regChecks = array();
|
protected $regChecks = array();
|
||||||
protected $regChecksLookup = array();
|
protected $regChecksLookup = array();
|
||||||
|
|
||||||
|
protected $connected = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize connection to server.
|
* Initialize connection to server.
|
||||||
*
|
*
|
||||||
@ -65,16 +69,38 @@ class IrcManager extends ImManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function timeout() {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Idle processing for io manager's execution loop.
|
* Idle processing for io manager's execution loop.
|
||||||
* Send keepalive pings to server.
|
|
||||||
*
|
*
|
||||||
* @return void
|
* @return void
|
||||||
*/
|
*/
|
||||||
public function idle() {
|
public function idle() {
|
||||||
|
// Send a ping if necessary
|
||||||
if (empty($this->lastPing) || time() - $this->lastPing > 120) {
|
if (empty($this->lastPing) || time() - $this->lastPing > 120) {
|
||||||
$this->sendPing();
|
$this->sendPing();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ($this->connected) {
|
||||||
|
// Send a waiting message if appropriate
|
||||||
|
if ($this->messageWaiting && time() - $this->lastMessage > 1) {
|
||||||
|
$wm = Irc_waiting_message::top();
|
||||||
|
if ($wm === NULL) {
|
||||||
|
$this->messageWaiting = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
$data = unserialize($wm->data);
|
||||||
|
|
||||||
|
if (!$this->send_raw_message($data)) {
|
||||||
|
$this->plugin->enqueue_outgoing_raw($data);
|
||||||
|
}
|
||||||
|
|
||||||
|
$wm->delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -90,6 +116,7 @@ class IrcManager extends ImManager {
|
|||||||
try {
|
try {
|
||||||
$this->conn->handleEvents();
|
$this->conn->handleEvents();
|
||||||
} catch (Phergie_Driver_Exception $e) {
|
} catch (Phergie_Driver_Exception $e) {
|
||||||
|
$this->connected = false;
|
||||||
$this->conn->reconnect();
|
$this->conn->reconnect();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -142,6 +169,7 @@ class IrcManager extends ImManager {
|
|||||||
|
|
||||||
'statusnet.messagecallback' => array($this, 'handle_irc_message'),
|
'statusnet.messagecallback' => array($this, 'handle_irc_message'),
|
||||||
'statusnet.regcallback' => array($this, 'handle_reg_response'),
|
'statusnet.regcallback' => array($this, 'handle_reg_response'),
|
||||||
|
'statusnet.connectedcallback' => array($this, 'handle_connected'),
|
||||||
'statusnet.unregregexp' => $this->plugin->unregregexp,
|
'statusnet.unregregexp' => $this->plugin->unregregexp,
|
||||||
'statusnet.regregexp' => $this->plugin->regregexp
|
'statusnet.regregexp' => $this->plugin->regregexp
|
||||||
)
|
)
|
||||||
@ -150,6 +178,7 @@ class IrcManager extends ImManager {
|
|||||||
$this->conn->setConfig($config);
|
$this->conn->setConfig($config);
|
||||||
$this->conn->connect();
|
$this->conn->connect();
|
||||||
$this->lastPing = time();
|
$this->lastPing = time();
|
||||||
|
$this->lastMessage = time();
|
||||||
}
|
}
|
||||||
return $this->conn;
|
return $this->conn;
|
||||||
}
|
}
|
||||||
@ -211,6 +240,38 @@ class IrcManager extends ImManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when the connection is established
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function handle_connected() {
|
||||||
|
$this->connected = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enters a message into the database for sending when ready
|
||||||
|
*
|
||||||
|
* @param string $command Command
|
||||||
|
* @param array $args Arguments
|
||||||
|
* @return boolean
|
||||||
|
*/
|
||||||
|
protected function enqueue_waiting_message($data) {
|
||||||
|
$wm = new Irc_waiting_message();
|
||||||
|
|
||||||
|
$wm->data = serialize($data);
|
||||||
|
$wm->prioritise = $data['prioritise'];
|
||||||
|
$wm->created = common_sql_now();
|
||||||
|
$result = $wm->insert();
|
||||||
|
|
||||||
|
if (!$result) {
|
||||||
|
common_log_db_error($wm, 'INSERT', __FILE__);
|
||||||
|
throw new ServerException('DB error inserting IRC waiting queue item');
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a message using the daemon
|
* Send a message using the daemon
|
||||||
*
|
*
|
||||||
@ -223,6 +284,7 @@ class IrcManager extends ImManager {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ($data['type'] != 'delayedmessage') {
|
||||||
if ($data['type'] != 'message') {
|
if ($data['type'] != 'message') {
|
||||||
// Nick checking
|
// Nick checking
|
||||||
$nickdata = $data['nickdata'];
|
$nickdata = $data['nickdata'];
|
||||||
@ -238,13 +300,29 @@ class IrcManager extends ImManager {
|
|||||||
$this->regChecksLookup[$usernick] = $screenname;
|
$this->regChecksLookup[$usernick] = $screenname;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If there is a backlog or we need to wait, queue the message
|
||||||
|
if ($this->messageWaiting || time() - $this->lastMessage < 1) {
|
||||||
|
$this->enqueue_waiting_message(
|
||||||
|
array(
|
||||||
|
'type' => 'delayedmessage',
|
||||||
|
'prioritise' => $data['prioritise'],
|
||||||
|
'data' => $data['data']
|
||||||
|
)
|
||||||
|
);
|
||||||
|
$this->messageWaiting = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$this->conn->send($data['data']['command'], $data['data']['args']);
|
$this->conn->send($data['data']['command'], $data['data']['args']);
|
||||||
} catch (Phergie_Driver_Exception $e) {
|
} catch (Phergie_Driver_Exception $e) {
|
||||||
|
$this->connected = false;
|
||||||
$this->conn->reconnect();
|
$this->conn->reconnect();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$this->lastMessage = time();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user