forked from GNUsocial/gnu-social
26fdf0c9d2
Queue handlers for XMPP individual & firehose output now send their XML stanzas to another output queue instead of connecting directly to the chat server. This lets us have as many general processing threads as we need, while all actual XMPP input and output go through a single daemon with a single connection open. This avoids problems with multiple connected resources: * multiple windows shown in some chat clients (psi, gajim, kopete) * extra load on server * incoming message delivery forwarding issues Database changes: * queue_item drops 'notice_id' in favor of a 'frame' blob. This is based on Craig Andrews' work branch to generalize queues to take any object, but conservatively leaving out the serialization for now. Table updater (preserves any existing queued items) in db/rc3to09.sql Code changes to watch out for: * Queue handlers should now define a handle() method instead of handle_notice() * QueueDaemon and XmppDaemon now share common i/o (IoMaster) and respawning thread management (RespawningDaemon) infrastructure. * The polling XmppConfirmManager has been dropped, as the message is queued directly when saving IM settings. * Enable $config['queue']['debug_memory'] to output current memory usage at each run through the event loop to watch for memory leaks To do: * Adapt XMPP i/o to component connection mode for multi-site support. * XMPP input can also be broken out to a queue, which would allow the actual notice save etc to be handled by general queue threads. * Make sure there are no problems with simply pushing serialized Notice objects to queues. * Find a way to improve interactive performance of the database-backed queue handler; polling is pretty painful to XMPP. * Possibly redo the way QueueHandlers are injected into a QueueManager. The grouping used to split out the XMPP output queue is a bit awkward.
68 lines
2.2 KiB
PHP
68 lines
2.2 KiB
PHP
<?php
|
|
/**
|
|
* Table Definition for queue_item
|
|
*/
|
|
require_once INSTALLDIR.'/classes/Memcached_DataObject.php';
|
|
|
|
class Queue_item extends Memcached_DataObject
|
|
{
|
|
###START_AUTOCODE
|
|
/* the code below is auto generated do not remove the above tag */
|
|
|
|
public $__table = 'queue_item'; // table name
|
|
public $id; // int(4) primary_key not_null
|
|
public $frame; // blob not_null
|
|
public $created; // datetime() not_null
|
|
public $claimed; // datetime()
|
|
|
|
/* Static get */
|
|
function staticGet($k,$v=null)
|
|
{ return Memcached_DataObject::staticGet('Queue_item',$k,$v); }
|
|
|
|
/* the code above is auto generated do not remove the tag below */
|
|
###END_AUTOCODE
|
|
|
|
/**
|
|
* @param mixed $transports name of a single queue or array of queues to pull from
|
|
* If not specified, checks all queues in the system.
|
|
*/
|
|
static function top($transports=null) {
|
|
|
|
$qi = new Queue_item();
|
|
if ($transports) {
|
|
if (is_array($transports)) {
|
|
// @fixme use safer escaping
|
|
$list = implode("','", array_map('addslashes', $transports));
|
|
$qi->whereAdd("transport in ('$list')");
|
|
} else {
|
|
$qi->transport = $transports;
|
|
}
|
|
}
|
|
$qi->orderBy('created');
|
|
$qi->whereAdd('claimed is null');
|
|
|
|
$qi->limit(1);
|
|
|
|
$cnt = $qi->find(true);
|
|
|
|
if ($cnt) {
|
|
# XXX: potential race condition
|
|
# can we force it to only update if claimed is still null
|
|
# (or old)?
|
|
common_log(LOG_INFO, 'claiming queue item id = ' . $qi->id .
|
|
' for transport ' . $qi->transport);
|
|
$orig = clone($qi);
|
|
$qi->claimed = common_sql_now();
|
|
$result = $qi->update($orig);
|
|
if ($result) {
|
|
common_log(LOG_INFO, 'claim succeeded.');
|
|
return $qi;
|
|
} else {
|
|
common_log(LOG_INFO, 'claim failed.');
|
|
}
|
|
}
|
|
$qi = null;
|
|
return null;
|
|
}
|
|
}
|