Any object (not just Notice's) can be queued

This commit is contained in:
Craig Andrews
2010-01-20 16:31:48 -05:00
parent 6a93cb2985
commit 77ea02cac3
19 changed files with 109 additions and 291 deletions

View File

@@ -125,28 +125,25 @@ class StompQueueManager extends QueueManager
}
/**
* Saves a notice object reference into the queue item table.
* Saves an object into the queue item table.
* @return boolean true on success
*/
public function enqueue($object, $queue)
{
$notice = $object;
$msg = serialize($object);
$this->_connect();
// XXX: serialize and send entire notice
$result = $this->con->send($this->queueName($queue),
$notice->id, // BODY of the message
array ('created' => $notice->created));
$msg, // BODY of the message
array ('created' => $timestamp));
if (!$result) {
common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
return false;
}
common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
. $notice->id . ' for ' . $queue);
common_log(LOG_DEBUG, "complete remote queueing $log for $queue");
$this->stats('enqueued', $queue);
}
@@ -174,7 +171,7 @@ class StompQueueManager extends QueueManager
$ok = true;
$frames = $this->con->readFrames();
foreach ($frames as $frame) {
$ok = $ok && $this->_handleNotice($frame);
$ok = $ok && $this->_handleItem($frame);
}
return $ok;
}
@@ -265,10 +262,10 @@ class StompQueueManager extends QueueManager
}
/**
* Handle and acknowledge a notice event that's come in through a queue.
* Handle and acknowledge an event that's come in through a queue.
*
* If the queue handler reports failure, the message is requeued for later.
* Missing notices or handler classes will drop the message.
* Missing objects or handler classes will drop the message.
*
* Side effects: in multi-site mode, may reset site configuration to
* match the site that queued the event.
@@ -276,24 +273,15 @@ class StompQueueManager extends QueueManager
* @param StompFrame $frame
* @return bool
*/
protected function _handleNotice($frame)
protected function _handleItem($frame)
{
list($site, $queue) = $this->parseDestination($frame->headers['destination']);
if ($site != common_config('site', 'server')) {
$this->stats('switch');
StatusNet::init($site);
}
$id = intval($frame->body);
$info = "notice $id posted at {$frame->headers['created']} in queue $queue";
$notice = Notice::staticGet('id', $id);
if (empty($notice)) {
$this->_log(LOG_WARNING, "Skipping missing $info");
$this->con->ack($frame);
$this->stats('badnotice', $queue);
return false;
}
$info = "object posted at {$frame->headers['created']} in queue $queue";
$item = unserialize($frame->body);
$handler = $this->getHandler($queue);
if (!$handler) {
@@ -303,7 +291,7 @@ class StompQueueManager extends QueueManager
return false;
}
$ok = $handler->handle_notice($notice);
$ok = $handler->handle($item);
if (!$ok) {
$this->_log(LOG_WARNING, "Failed handling $info");
@@ -311,7 +299,7 @@ class StompQueueManager extends QueueManager
// this kind of queue management ourselves;
// if we don't ack, it should resend...
$this->con->ack($frame);
$this->enqueue($notice, $queue);
$this->enqueue($item, $queue);
$this->stats('requeued', $queue);
return false;
}