forked from GNUsocial/gnu-social
Will re-enable anything queueing after 0.9.x merge
Revert "Any object (not just Notice's) can be queued"
This reverts commit 77ea02cac3
.
This commit is contained in:
parent
ef7db60fed
commit
78eb9c78a7
@ -10,8 +10,7 @@ class Queue_item extends Memcached_DataObject
|
|||||||
/* the code below is auto generated do not remove the above tag */
|
/* the code below is auto generated do not remove the above tag */
|
||||||
|
|
||||||
public $__table = 'queue_item'; // table name
|
public $__table = 'queue_item'; // table name
|
||||||
public $id; // int(4) primary_key not_null
|
public $notice_id; // int(4) primary_key not_null
|
||||||
public $frame; // blob not_null
|
|
||||||
public $transport; // varchar(8) primary_key not_null
|
public $transport; // varchar(8) primary_key not_null
|
||||||
public $created; // datetime() not_null
|
public $created; // datetime() not_null
|
||||||
public $claimed; // datetime()
|
public $claimed; // datetime()
|
||||||
@ -23,6 +22,9 @@ class Queue_item extends Memcached_DataObject
|
|||||||
/* the code above is auto generated do not remove the tag below */
|
/* the code above is auto generated do not remove the tag below */
|
||||||
###END_AUTOCODE
|
###END_AUTOCODE
|
||||||
|
|
||||||
|
function sequenceKey()
|
||||||
|
{ return array(false, false); }
|
||||||
|
|
||||||
static function top($transport=null) {
|
static function top($transport=null) {
|
||||||
|
|
||||||
$qi = new Queue_item();
|
$qi = new Queue_item();
|
||||||
@ -40,7 +42,7 @@ class Queue_item extends Memcached_DataObject
|
|||||||
# XXX: potential race condition
|
# XXX: potential race condition
|
||||||
# can we force it to only update if claimed is still null
|
# can we force it to only update if claimed is still null
|
||||||
# (or old)?
|
# (or old)?
|
||||||
common_log(LOG_INFO, 'claiming queue item id=' . $qi->id .
|
common_log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id .
|
||||||
' for transport ' . $qi->transport);
|
' for transport ' . $qi->transport);
|
||||||
$orig = clone($qi);
|
$orig = clone($qi);
|
||||||
$qi->claimed = common_sql_now();
|
$qi->claimed = common_sql_now();
|
||||||
@ -55,4 +57,9 @@ class Queue_item extends Memcached_DataObject
|
|||||||
$qi = null;
|
$qi = null;
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function pkeyGet($kv)
|
||||||
|
{
|
||||||
|
return Memcached_DataObject::pkeyGet('Queue_item', $kv);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -428,14 +428,14 @@ tagged = K
|
|||||||
tag = K
|
tag = K
|
||||||
|
|
||||||
[queue_item]
|
[queue_item]
|
||||||
id = 129
|
notice_id = 129
|
||||||
frame = 66
|
|
||||||
transport = 130
|
transport = 130
|
||||||
created = 142
|
created = 142
|
||||||
claimed = 14
|
claimed = 14
|
||||||
|
|
||||||
[queue_item__keys]
|
[queue_item__keys]
|
||||||
id = K
|
notice_id = K
|
||||||
|
transport = K
|
||||||
|
|
||||||
[related_group]
|
[related_group]
|
||||||
group_id = 129
|
group_id = 129
|
||||||
|
@ -274,12 +274,13 @@ create table remember_me (
|
|||||||
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
|
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
|
||||||
|
|
||||||
create table queue_item (
|
create table queue_item (
|
||||||
id integer auto_increment primary key comment 'unique identifier',
|
|
||||||
frame blob not null comment 'serialized object',
|
notice_id integer not null comment 'notice queued' references notice (id),
|
||||||
transport varchar(8) not null comment 'queue for what? "email", "jabber", "sms", "irc", ...',
|
transport varchar(8) not null comment 'queue for what? "email", "jabber", "sms", "irc", ...',
|
||||||
created datetime not null comment 'date this record was created',
|
created datetime not null comment 'date this record was created',
|
||||||
claimed datetime comment 'date this item was claimed',
|
claimed datetime comment 'date this item was claimed',
|
||||||
|
|
||||||
|
constraint primary key (notice_id, transport),
|
||||||
index queue_item_created_idx (created)
|
index queue_item_created_idx (created)
|
||||||
|
|
||||||
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
|
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
|
||||||
|
@ -31,17 +31,19 @@
|
|||||||
class DBQueueManager extends QueueManager
|
class DBQueueManager extends QueueManager
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Saves an object into the queue item table.
|
* Saves a notice object reference into the queue item table.
|
||||||
* @return boolean true on success
|
* @return boolean true on success
|
||||||
* @throws ServerException on failure
|
* @throws ServerException on failure
|
||||||
*/
|
*/
|
||||||
public function enqueue($object, $queue)
|
public function enqueue($object, $queue)
|
||||||
{
|
{
|
||||||
|
$notice = $object;
|
||||||
|
|
||||||
$qi = new Queue_item();
|
$qi = new Queue_item();
|
||||||
|
|
||||||
$qi->frame = serialize($object);
|
$qi->notice_id = $notice->id;
|
||||||
$qi->transport = $queue;
|
$qi->transport = $queue;
|
||||||
$qi->created = common_sql_now();
|
$qi->created = $notice->created;
|
||||||
$result = $qi->insert();
|
$result = $qi->insert();
|
||||||
|
|
||||||
if (!$result) {
|
if (!$result) {
|
||||||
@ -71,35 +73,34 @@ class DBQueueManager extends QueueManager
|
|||||||
*/
|
*/
|
||||||
public function poll()
|
public function poll()
|
||||||
{
|
{
|
||||||
$this->_log(LOG_DEBUG, 'Checking for queued objects...');
|
$this->_log(LOG_DEBUG, 'Checking for notices...');
|
||||||
$qi = $this->_nextItem();
|
$item = $this->_nextItem();
|
||||||
if ($qi === false) {
|
if ($item === false) {
|
||||||
$this->_log(LOG_DEBUG, 'No queue items waiting; idling.');
|
$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if ($qi === true) {
|
if ($item === true) {
|
||||||
// We dequeued an entry for a deleted or invalid object.
|
// We dequeued an entry for a deleted or invalid notice.
|
||||||
// Consider it a hit for poll rate purposes.
|
// Consider it a hit for poll rate purposes.
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
$queue = $qi->transport;
|
list($queue, $notice) = $item;
|
||||||
$object = unserialize($qi->frame);
|
$this->_log(LOG_INFO, 'Got notice '. $notice->id . ' for transport ' . $queue);
|
||||||
$this->_log(LOG_INFO, 'Got item id=' . $qi->id . ' for transport ' . $queue);
|
|
||||||
|
|
||||||
// Yay! Got one!
|
// Yay! Got one!
|
||||||
$handler = $this->getHandler($queue);
|
$handler = $this->getHandler($queue);
|
||||||
if ($handler) {
|
if ($handler) {
|
||||||
if ($handler->handle($object)) {
|
if ($handler->handle_notice($notice)) {
|
||||||
$this->_log(LOG_INFO, "[$queue] Successfully handled object");
|
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Successfully handled notice");
|
||||||
$this->_done($qi);
|
$this->_done($notice, $queue);
|
||||||
} else {
|
} else {
|
||||||
$this->_log(LOG_INFO, "[$queue] Failed to handle object");
|
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Failed to handle notice");
|
||||||
$this->_fail($qi);
|
$this->_fail($notice, $queue);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
$this->_log(LOG_INFO, "[$queue] No handler for queue $queue; discarding.");
|
$this->_log(LOG_INFO, "[$queue:notice $notice->id] No handler for queue $queue; discarding.");
|
||||||
$this->_done($qi);
|
$this->_done($notice, $queue);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -107,7 +108,8 @@ class DBQueueManager extends QueueManager
|
|||||||
/**
|
/**
|
||||||
* Pop the oldest unclaimed item off the queue set and claim it.
|
* Pop the oldest unclaimed item off the queue set and claim it.
|
||||||
*
|
*
|
||||||
* @return mixed false if no items; true if bogus hit; otherwise Queue_item
|
* @return mixed false if no items; true if bogus hit; otherwise array(string, Notice)
|
||||||
|
* giving the queue transport name.
|
||||||
*/
|
*/
|
||||||
protected function _nextItem()
|
protected function _nextItem()
|
||||||
{
|
{
|
||||||
@ -119,42 +121,70 @@ class DBQueueManager extends QueueManager
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return $qi;
|
$queue = $qi->transport;
|
||||||
|
$notice = Notice::staticGet('id', $qi->notice_id);
|
||||||
|
if (empty($notice)) {
|
||||||
|
$this->_log(LOG_INFO, "[$queue:notice $notice->id] dequeued non-existent notice");
|
||||||
|
$qi->delete();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
$result = $notice;
|
||||||
|
return array($queue, $notice);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete our claimed item from the queue after successful processing.
|
* Delete our claimed item from the queue after successful processing.
|
||||||
*
|
*
|
||||||
* @param QueueItem $qi
|
* @param Notice $object
|
||||||
|
* @param string $queue
|
||||||
*/
|
*/
|
||||||
protected function _done($qi)
|
protected function _done($object, $queue)
|
||||||
{
|
{
|
||||||
|
// XXX: right now, we only handle notices
|
||||||
|
|
||||||
|
$notice = $object;
|
||||||
|
|
||||||
|
$qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
|
||||||
|
'transport' => $queue));
|
||||||
|
|
||||||
if (empty($qi)) {
|
if (empty($qi)) {
|
||||||
$this->_log(LOG_INFO, "_done passed an empty queue item");
|
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
|
||||||
} else {
|
} else {
|
||||||
if (empty($qi->claimed)) {
|
if (empty($qi->claimed)) {
|
||||||
$this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item");
|
$this->_log(LOG_WARNING, "[$queue:notice $notice->id] Reluctantly releasing unclaimed queue item");
|
||||||
}
|
}
|
||||||
$qi->delete();
|
$qi->delete();
|
||||||
$qi->free();
|
$qi->free();
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->_log(LOG_INFO, "done with item");
|
$this->_log(LOG_INFO, "[$queue:notice $notice->id] done with item");
|
||||||
|
$this->stats('handled', $queue);
|
||||||
|
|
||||||
|
$notice->free();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Free our claimed queue item for later reprocessing in case of
|
* Free our claimed queue item for later reprocessing in case of
|
||||||
* temporary failure.
|
* temporary failure.
|
||||||
*
|
*
|
||||||
* @param QueueItem $qi
|
* @param Notice $object
|
||||||
|
* @param string $queue
|
||||||
*/
|
*/
|
||||||
protected function _fail($qi)
|
protected function _fail($object, $queue)
|
||||||
{
|
{
|
||||||
|
// XXX: right now, we only handle notices
|
||||||
|
|
||||||
|
$notice = $object;
|
||||||
|
|
||||||
|
$qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
|
||||||
|
'transport' => $queue));
|
||||||
|
|
||||||
if (empty($qi)) {
|
if (empty($qi)) {
|
||||||
$this->_log(LOG_INFO, "_fail passed an empty queue item");
|
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
|
||||||
} else {
|
} else {
|
||||||
if (empty($qi->claimed)) {
|
if (empty($qi->claimed)) {
|
||||||
$this->_log(LOG_WARNING, "Ignoring failure for unclaimed queue item");
|
$this->_log(LOG_WARNING, "[$queue:notice $notice->id] Ignoring failure for unclaimed queue item");
|
||||||
} else {
|
} else {
|
||||||
$orig = clone($qi);
|
$orig = clone($qi);
|
||||||
$qi->claimed = null;
|
$qi->claimed = null;
|
||||||
@ -163,7 +193,10 @@ class DBQueueManager extends QueueManager
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->_log(LOG_INFO, "done with queue item");
|
$this->_log(LOG_INFO, "[$queue:notice $notice->id] done with queue item");
|
||||||
|
$this->stats('error', $queue);
|
||||||
|
|
||||||
|
$notice->free();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function _log($level, $msg)
|
protected function _log($level, $msg)
|
||||||
|
@ -34,14 +34,14 @@ class JabberQueueHandler extends QueueHandler
|
|||||||
return 'jabber';
|
return 'jabber';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle($notice)
|
function handle_notice($notice)
|
||||||
{
|
{
|
||||||
require_once(INSTALLDIR.'/lib/jabber.php');
|
require_once(INSTALLDIR.'/lib/jabber.php');
|
||||||
try {
|
try {
|
||||||
return jabber_broadcast_notice($notice);
|
return jabber_broadcast_notice($notice);
|
||||||
} catch (XMPPHP_Exception $e) {
|
} catch (XMPPHP_Exception $e) {
|
||||||
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
|
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
|
||||||
return false;
|
exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,7 +36,7 @@ class OmbQueueHandler extends QueueHandler
|
|||||||
* @fixme doesn't currently report failure back to the queue manager
|
* @fixme doesn't currently report failure back to the queue manager
|
||||||
* because omb_broadcast_notice() doesn't report it to us
|
* because omb_broadcast_notice() doesn't report it to us
|
||||||
*/
|
*/
|
||||||
function handle($notice)
|
function handle_notice($notice)
|
||||||
{
|
{
|
||||||
if ($this->is_remote($notice)) {
|
if ($this->is_remote($notice)) {
|
||||||
$this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id);
|
$this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id);
|
||||||
|
@ -30,7 +30,7 @@ class PingQueueHandler extends QueueHandler {
|
|||||||
return 'ping';
|
return 'ping';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle($notice) {
|
function handle_notice($notice) {
|
||||||
require_once INSTALLDIR . '/lib/ping.php';
|
require_once INSTALLDIR . '/lib/ping.php';
|
||||||
return ping_broadcast_notice($notice);
|
return ping_broadcast_notice($notice);
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ class PluginQueueHandler extends QueueHandler
|
|||||||
return 'plugin';
|
return 'plugin';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle($notice)
|
function handle_notice($notice)
|
||||||
{
|
{
|
||||||
Event::handle('HandleQueuedNotice', array(&$notice));
|
Event::handle('HandleQueuedNotice', array(&$notice));
|
||||||
return true;
|
return true;
|
||||||
|
@ -23,6 +23,7 @@ if (!defined('STATUSNET') && !defined('LACONICA')) {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Queue handler for pushing new notices to public XMPP subscribers.
|
* Queue handler for pushing new notices to public XMPP subscribers.
|
||||||
|
* @fixme correct this exception handling
|
||||||
*/
|
*/
|
||||||
class PublicQueueHandler extends QueueHandler
|
class PublicQueueHandler extends QueueHandler
|
||||||
{
|
{
|
||||||
@ -32,14 +33,15 @@ class PublicQueueHandler extends QueueHandler
|
|||||||
return 'public';
|
return 'public';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle($notice)
|
function handle_notice($notice)
|
||||||
{
|
{
|
||||||
require_once(INSTALLDIR.'/lib/jabber.php');
|
require_once(INSTALLDIR.'/lib/jabber.php');
|
||||||
try {
|
try {
|
||||||
return jabber_public_notice($notice);
|
return jabber_public_notice($notice);
|
||||||
} catch (XMPPHP_Exception $e) {
|
} catch (XMPPHP_Exception $e) {
|
||||||
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
|
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
|
||||||
return false;
|
die($e->getMessage());
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,20 +22,51 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); }
|
|||||||
/**
|
/**
|
||||||
* Base class for queue handlers.
|
* Base class for queue handlers.
|
||||||
*
|
*
|
||||||
* As of 0.9, queue handlers are short-lived for items as they are
|
* As extensions of the Daemon class, each queue handler has the ability
|
||||||
* dequeued by a QueueManager running in an IoMaster in a daemon
|
* to launch itself in the background, at which point it'll pass control
|
||||||
* such as queuedaemon.php.
|
* to the configured QueueManager class to poll for updates.
|
||||||
*
|
|
||||||
* Extensions requiring long-running maintenance or polling should
|
|
||||||
* register an IoManager.
|
|
||||||
*
|
*
|
||||||
* Subclasses must override at least the following methods:
|
* Subclasses must override at least the following methods:
|
||||||
* - transport
|
* - transport
|
||||||
* - handle
|
* - handle_notice
|
||||||
*/
|
*/
|
||||||
|
#class QueueHandler extends Daemon
|
||||||
class QueueHandler
|
class QueueHandler
|
||||||
{
|
{
|
||||||
|
|
||||||
|
# function __construct($id=null, $daemonize=true)
|
||||||
|
# {
|
||||||
|
# parent::__construct($daemonize);
|
||||||
|
#
|
||||||
|
# if ($id) {
|
||||||
|
# $this->set_id($id);
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How many seconds a polling-based queue manager should wait between
|
||||||
|
* checks for new items to handle.
|
||||||
|
*
|
||||||
|
* Defaults to 60 seconds; override to speed up or slow down.
|
||||||
|
*
|
||||||
|
* @fixme not really compatible with global queue manager
|
||||||
|
* @return int timeout in seconds
|
||||||
|
*/
|
||||||
|
# function timeout()
|
||||||
|
# {
|
||||||
|
# return 60;
|
||||||
|
# }
|
||||||
|
|
||||||
|
# function class_name()
|
||||||
|
# {
|
||||||
|
# return ucfirst($this->transport()) . 'Handler';
|
||||||
|
# }
|
||||||
|
|
||||||
|
# function name()
|
||||||
|
# {
|
||||||
|
# return strtolower($this->class_name().'.'.$this->get_id());
|
||||||
|
# }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return transport keyword which identifies items this queue handler
|
* Return transport keyword which identifies items this queue handler
|
||||||
* services; must be defined for all subclasses.
|
* services; must be defined for all subclasses.
|
||||||
@ -52,17 +83,61 @@ class QueueHandler
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Here's the meat of your queue handler -- you're handed a Notice
|
* Here's the meat of your queue handler -- you're handed a Notice
|
||||||
* or other object, which you may do as you will with.
|
* object, which you may do as you will with.
|
||||||
*
|
*
|
||||||
* If this function indicates failure, a warning will be logged
|
* If this function indicates failure, a warning will be logged
|
||||||
* and the item is placed back in the queue to be re-run.
|
* and the item is placed back in the queue to be re-run.
|
||||||
*
|
*
|
||||||
* @param mixed $object
|
* @param Notice $notice
|
||||||
* @return boolean true on success, false on failure
|
* @return boolean true on success, false on failure
|
||||||
*/
|
*/
|
||||||
function handle($object)
|
function handle_notice($notice)
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Setup and start of run loop for this queue handler as a daemon.
|
||||||
|
* Most of the heavy lifting is passed on to the QueueManager's service()
|
||||||
|
* method, which passes control back to our handle_notice() method for
|
||||||
|
* each notice that comes in on the queue.
|
||||||
|
*
|
||||||
|
* Most of the time this won't need to be overridden in a subclass.
|
||||||
|
*
|
||||||
|
* @return boolean true on success, false on failure
|
||||||
|
*/
|
||||||
|
function run()
|
||||||
|
{
|
||||||
|
if (!$this->start()) {
|
||||||
|
$this->log(LOG_WARNING, 'failed to start');
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->log(LOG_INFO, 'checking for queued notices');
|
||||||
|
|
||||||
|
$queue = $this->transport();
|
||||||
|
$timeout = $this->timeout();
|
||||||
|
|
||||||
|
$qm = QueueManager::get();
|
||||||
|
|
||||||
|
$qm->service($queue, $this);
|
||||||
|
|
||||||
|
$this->log(LOG_INFO, 'finished servicing the queue');
|
||||||
|
|
||||||
|
if (!$this->finish()) {
|
||||||
|
$this->log(LOG_WARNING, 'failed to clean up');
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->log(LOG_INFO, 'terminating normally');
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
function log($level, $msg)
|
||||||
|
{
|
||||||
|
common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,7 +31,7 @@ class SmsQueueHandler extends QueueHandler
|
|||||||
return 'sms';
|
return 'sms';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle($notice)
|
function handle_notice($notice)
|
||||||
{
|
{
|
||||||
require_once(INSTALLDIR.'/lib/mail.php');
|
require_once(INSTALLDIR.'/lib/mail.php');
|
||||||
return mail_broadcast_notice_sms($notice);
|
return mail_broadcast_notice_sms($notice);
|
||||||
|
@ -125,25 +125,28 @@ class StompQueueManager extends QueueManager
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Saves an object into the queue item table.
|
* Saves a notice object reference into the queue item table.
|
||||||
* @return boolean true on success
|
* @return boolean true on success
|
||||||
*/
|
*/
|
||||||
public function enqueue($object, $queue)
|
public function enqueue($object, $queue)
|
||||||
{
|
{
|
||||||
$msg = serialize($object);
|
$notice = $object;
|
||||||
|
|
||||||
$this->_connect();
|
$this->_connect();
|
||||||
|
|
||||||
|
// XXX: serialize and send entire notice
|
||||||
|
|
||||||
$result = $this->con->send($this->queueName($queue),
|
$result = $this->con->send($this->queueName($queue),
|
||||||
$msg, // BODY of the message
|
$notice->id, // BODY of the message
|
||||||
array ('created' => $timestamp));
|
array ('created' => $notice->created));
|
||||||
|
|
||||||
if (!$result) {
|
if (!$result) {
|
||||||
common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
|
common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
common_log(LOG_DEBUG, "complete remote queueing $log for $queue");
|
common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
|
||||||
|
. $notice->id . ' for ' . $queue);
|
||||||
$this->stats('enqueued', $queue);
|
$this->stats('enqueued', $queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -171,7 +174,7 @@ class StompQueueManager extends QueueManager
|
|||||||
$ok = true;
|
$ok = true;
|
||||||
$frames = $this->con->readFrames();
|
$frames = $this->con->readFrames();
|
||||||
foreach ($frames as $frame) {
|
foreach ($frames as $frame) {
|
||||||
$ok = $ok && $this->_handleItem($frame);
|
$ok = $ok && $this->_handleNotice($frame);
|
||||||
}
|
}
|
||||||
return $ok;
|
return $ok;
|
||||||
}
|
}
|
||||||
@ -262,10 +265,10 @@ class StompQueueManager extends QueueManager
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handle and acknowledge an event that's come in through a queue.
|
* Handle and acknowledge a notice event that's come in through a queue.
|
||||||
*
|
*
|
||||||
* If the queue handler reports failure, the message is requeued for later.
|
* If the queue handler reports failure, the message is requeued for later.
|
||||||
* Missing objects or handler classes will drop the message.
|
* Missing notices or handler classes will drop the message.
|
||||||
*
|
*
|
||||||
* Side effects: in multi-site mode, may reset site configuration to
|
* Side effects: in multi-site mode, may reset site configuration to
|
||||||
* match the site that queued the event.
|
* match the site that queued the event.
|
||||||
@ -273,15 +276,24 @@ class StompQueueManager extends QueueManager
|
|||||||
* @param StompFrame $frame
|
* @param StompFrame $frame
|
||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
protected function _handleItem($frame)
|
protected function _handleNotice($frame)
|
||||||
{
|
{
|
||||||
list($site, $queue) = $this->parseDestination($frame->headers['destination']);
|
list($site, $queue) = $this->parseDestination($frame->headers['destination']);
|
||||||
if ($site != common_config('site', 'server')) {
|
if ($site != common_config('site', 'server')) {
|
||||||
$this->stats('switch');
|
$this->stats('switch');
|
||||||
StatusNet::init($site);
|
StatusNet::init($site);
|
||||||
}
|
}
|
||||||
$info = "object posted at {$frame->headers['created']} in queue $queue";
|
|
||||||
$item = unserialize($frame->body);
|
$id = intval($frame->body);
|
||||||
|
$info = "notice $id posted at {$frame->headers['created']} in queue $queue";
|
||||||
|
|
||||||
|
$notice = Notice::staticGet('id', $id);
|
||||||
|
if (empty($notice)) {
|
||||||
|
$this->_log(LOG_WARNING, "Skipping missing $info");
|
||||||
|
$this->con->ack($frame);
|
||||||
|
$this->stats('badnotice', $queue);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
$handler = $this->getHandler($queue);
|
$handler = $this->getHandler($queue);
|
||||||
if (!$handler) {
|
if (!$handler) {
|
||||||
@ -291,7 +303,7 @@ class StompQueueManager extends QueueManager
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
$ok = $handler->handle($item);
|
$ok = $handler->handle_notice($notice);
|
||||||
|
|
||||||
if (!$ok) {
|
if (!$ok) {
|
||||||
$this->_log(LOG_WARNING, "Failed handling $info");
|
$this->_log(LOG_WARNING, "Failed handling $info");
|
||||||
@ -299,7 +311,7 @@ class StompQueueManager extends QueueManager
|
|||||||
// this kind of queue management ourselves;
|
// this kind of queue management ourselves;
|
||||||
// if we don't ack, it should resend...
|
// if we don't ack, it should resend...
|
||||||
$this->con->ack($frame);
|
$this->con->ack($frame);
|
||||||
$this->enqueue($item, $queue);
|
$this->enqueue($notice, $queue);
|
||||||
$this->stats('requeued', $queue);
|
$this->stats('requeued', $queue);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -175,30 +175,6 @@ class XmppManager extends IoManager
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* For queue handlers to pass us a message to push out,
|
|
||||||
* if we're active.
|
|
||||||
*
|
|
||||||
* @fixme should this be blocking etc?
|
|
||||||
*
|
|
||||||
* @param string $msg XML stanza to send
|
|
||||||
* @return boolean success
|
|
||||||
*/
|
|
||||||
public function send($msg)
|
|
||||||
{
|
|
||||||
if ($this->conn && !$this->conn->isDisconnected()) {
|
|
||||||
$bytes = $this->conn->send($msg);
|
|
||||||
if ($bytes > 0) {
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Can't send right now...
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a keepalive ping to the XMPP server.
|
* Send a keepalive ping to the XMPP server.
|
||||||
*/
|
*/
|
||||||
|
@ -32,7 +32,14 @@ class EnjitQueueHandler extends QueueHandler
|
|||||||
return 'enjit';
|
return 'enjit';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle($notice)
|
function start()
|
||||||
|
{
|
||||||
|
$this->log(LOG_INFO, "Starting EnjitQueueHandler");
|
||||||
|
$this->log(LOG_INFO, "Broadcasting to ".common_config('enjit', 'apiurl'));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
function handle_notice($notice)
|
||||||
{
|
{
|
||||||
|
|
||||||
$profile = Profile::staticGet($notice->profile_id);
|
$profile = Profile::staticGet($notice->profile_id);
|
||||||
|
@ -28,7 +28,7 @@ class FacebookQueueHandler extends QueueHandler
|
|||||||
return 'facebook';
|
return 'facebook';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle($notice)
|
function handle_notice($notice)
|
||||||
{
|
{
|
||||||
if ($this->_isLocal($notice)) {
|
if ($this->_isLocal($notice)) {
|
||||||
return facebookBroadcastNotice($notice);
|
return facebookBroadcastNotice($notice);
|
||||||
|
@ -138,9 +138,6 @@ class RSSCloudPlugin extends Plugin
|
|||||||
case 'RSSCloudNotifier':
|
case 'RSSCloudNotifier':
|
||||||
include_once INSTALLDIR . '/plugins/RSSCloud/RSSCloudNotifier.php';
|
include_once INSTALLDIR . '/plugins/RSSCloud/RSSCloudNotifier.php';
|
||||||
return false;
|
return false;
|
||||||
case 'RSSCloudQueueHandler':
|
|
||||||
include_once INSTALLDIR . '/plugins/RSSCloud/RSSCloudQueueHandler.php';
|
|
||||||
return false;
|
|
||||||
case 'RSSCloudRequestNotifyAction':
|
case 'RSSCloudRequestNotifyAction':
|
||||||
case 'LoggingAggregatorAction':
|
case 'LoggingAggregatorAction':
|
||||||
include_once INSTALLDIR . '/plugins/RSSCloud/' .
|
include_once INSTALLDIR . '/plugins/RSSCloud/' .
|
||||||
@ -196,6 +193,32 @@ class RSSCloudPlugin extends Plugin
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* broadcast the message when not using queuehandler
|
||||||
|
*
|
||||||
|
* @param Notice &$notice the notice
|
||||||
|
* @param array $queue destination queue
|
||||||
|
*
|
||||||
|
* @return boolean hook return
|
||||||
|
*/
|
||||||
|
|
||||||
|
function onUnqueueHandleNotice(&$notice, $queue)
|
||||||
|
{
|
||||||
|
if (($queue == 'rsscloud') && ($this->_isLocal($notice))) {
|
||||||
|
|
||||||
|
common_debug('broadcasting rssCloud bound notice ' . $notice->id);
|
||||||
|
|
||||||
|
$profile = $notice->getProfile();
|
||||||
|
|
||||||
|
$notifier = new RSSCloudNotifier();
|
||||||
|
$notifier->notify($profile);
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Determine whether the notice was locally created
|
* Determine whether the notice was locally created
|
||||||
*
|
*
|
||||||
@ -238,15 +261,19 @@ class RSSCloudPlugin extends Plugin
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register RSSCloud notice queue handler
|
* Add RSSCloudQueueHandler to the list of valid daemons to
|
||||||
|
* start
|
||||||
*
|
*
|
||||||
* @param QueueManager $manager
|
* @param array $daemons the list of daemons to run
|
||||||
*
|
*
|
||||||
* @return boolean hook return
|
* @return boolean hook return
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
function onEndInitializeQueueManager($manager)
|
|
||||||
|
function onGetValidDaemons($daemons)
|
||||||
{
|
{
|
||||||
$manager->connect('rsscloud', 'RSSCloudQueueHandler');
|
array_push($daemons, INSTALLDIR .
|
||||||
|
'/plugins/RSSCloud/RSSCloudQueueHandler.php');
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
50
plugins/RSSCloud/RSSCloudQueueHandler.php
Normal file → Executable file
50
plugins/RSSCloud/RSSCloudQueueHandler.php
Normal file → Executable file
@ -1,3 +1,4 @@
|
|||||||
|
#!/usr/bin/env php
|
||||||
<?php
|
<?php
|
||||||
/*
|
/*
|
||||||
* StatusNet - the distributed open-source microblogging tool
|
* StatusNet - the distributed open-source microblogging tool
|
||||||
@ -17,20 +18,61 @@
|
|||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); }
|
define('INSTALLDIR', realpath(dirname(__FILE__) . '/../..'));
|
||||||
|
|
||||||
|
$shortoptions = 'i::';
|
||||||
|
$longoptions = array('id::');
|
||||||
|
|
||||||
|
$helptext = <<<END_OF_ENJIT_HELP
|
||||||
|
Daemon script for pushing new notices to RSSCloud subscribers.
|
||||||
|
|
||||||
|
-i --id Identity (default none)
|
||||||
|
|
||||||
|
END_OF_ENJIT_HELP;
|
||||||
|
|
||||||
|
require_once INSTALLDIR . '/scripts/commandline.inc';
|
||||||
|
require_once INSTALLDIR . '/lib/queuehandler.php';
|
||||||
|
require_once INSTALLDIR . '/plugins/RSSCloud/RSSCloudNotifier.php';
|
||||||
|
require_once INSTALLDIR . '/plugins/RSSCloud/RSSCloudSubscription.php';
|
||||||
|
|
||||||
class RSSCloudQueueHandler extends QueueHandler
|
class RSSCloudQueueHandler extends QueueHandler
|
||||||
{
|
{
|
||||||
|
var $notifier = null;
|
||||||
|
|
||||||
function transport()
|
function transport()
|
||||||
{
|
{
|
||||||
return 'rsscloud';
|
return 'rsscloud';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle($notice)
|
function start()
|
||||||
|
{
|
||||||
|
$this->log(LOG_INFO, "INITIALIZE");
|
||||||
|
$this->notifier = new RSSCloudNotifier();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
function handle_notice($notice)
|
||||||
{
|
{
|
||||||
$profile = $notice->getProfile();
|
$profile = $notice->getProfile();
|
||||||
$notifier = new RSSCloudNotifier();
|
return $this->notifier->notify($profile);
|
||||||
return $notifier->notify($profile);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function finish()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (have_option('i')) {
|
||||||
|
$id = get_option_value('i');
|
||||||
|
} else if (have_option('--id')) {
|
||||||
|
$id = get_option_value('--id');
|
||||||
|
} else if (count($args) > 0) {
|
||||||
|
$id = $args[0];
|
||||||
|
} else {
|
||||||
|
$id = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
$handler = new RSSCloudQueueHandler($id);
|
||||||
|
|
||||||
|
$handler->runOnce();
|
||||||
|
@ -28,7 +28,7 @@ class TwitterQueueHandler extends QueueHandler
|
|||||||
return 'twitter';
|
return 'twitter';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle($notice)
|
function handle_notice($notice)
|
||||||
{
|
{
|
||||||
return broadcast_twitter($notice);
|
return broadcast_twitter($notice);
|
||||||
}
|
}
|
||||||
|
@ -50,7 +50,7 @@ if (empty($notice)) {
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!$handler->handle($notice)) {
|
if (!$handler->handle_notice($notice)) {
|
||||||
print "Failed to handle notice id $noticeId on queue '$queue'.\n";
|
print "Failed to handle notice id $noticeId on queue '$queue'.\n";
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user