fix up hub queueing to work w/ stomp queues

This commit is contained in:
Brion Vibber 2010-02-08 16:43:37 -08:00
parent c4557d4d07
commit 045797331c
4 changed files with 34 additions and 31 deletions

View File

@ -155,26 +155,26 @@ abstract class QueueManager extends IoManager
} }
/** /**
* Encode an object for queued storage. * Encode an object or variable for queued storage.
* Next gen may use serialization. * Notice objects are currently stored as an id reference;
* other items are serialized.
* *
* @param mixed $object * @param mixed $item
* @return string * @return string
*/ */
protected function encode($object) protected function encode($item)
{ {
if ($object instanceof Notice) { if ($item instanceof Notice) {
return $object->id; // Backwards compat
} else if (is_string($object)) { return $item->id;
return $object;
} else { } else {
throw new ServerException("Can't queue this type", 500); return serialize($item);
} }
} }
/** /**
* Decode an object from queued storage. * Decode an object from queued storage.
* Accepts back-compat notice reference entries and strings for now. * Accepts notice reference entries and serialized items.
* *
* @param string * @param string
* @return mixed * @return mixed
@ -182,9 +182,23 @@ abstract class QueueManager extends IoManager
protected function decode($frame) protected function decode($frame)
{ {
if (is_numeric($frame)) { if (is_numeric($frame)) {
// Back-compat for notices...
return Notice::staticGet(intval($frame)); return Notice::staticGet(intval($frame));
} else { } elseif (substr($frame, 0, 1) == '<') {
// Back-compat for XML source
return $frame; return $frame;
} else {
// Deserialize!
#$old = error_reporting();
#error_reporting($old & ~E_NOTICE);
$out = unserialize($frame);
#error_reporting($old);
if ($out === false && $frame !== 'b:0;') {
common_log(LOG_ERR, "Couldn't unserialize queued frame: $frame");
return false;
}
return $out;
} }
} }

View File

@ -549,26 +549,14 @@ class StompQueueManager extends QueueManager
} }
$host = $this->cons[$idx]->getServer(); $host = $this->cons[$idx]->getServer();
if (is_numeric($frame->body)) { $item = $this->decode($frame->body);
$id = intval($frame->body); if (empty($item)) {
$info = "notice $id posted at {$frame->headers['created']} in queue $queue from $host"; $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host");
return true;
$notice = Notice::staticGet('id', $id);
if (empty($notice)) {
$this->_log(LOG_WARNING, "Skipping missing $info");
$this->ack($idx, $frame);
$this->commit($idx);
$this->begin($idx);
$this->stats('badnotice', $queue);
return false;
}
$item = $notice;
} else {
// @fixme should we serialize, or json, or what here?
$info = "string posted at {$frame->headers['created']} in queue $queue from $host";
$item = $frame->body;
} }
$info = $this->logrep($item) . " posted at " .
$frame->headers['created'] . " in queue $queue from $host";
$this->_log(LOG_DEBUG, "Dequeued $info");
$handler = $this->getHandler($queue); $handler = $this->getHandler($queue);
if (!$handler) { if (!$handler) {

View File

@ -56,6 +56,7 @@ class HubDistribQueueHandler extends QueueHandler
} else { } else {
common_log(LOG_INFO, "No PuSH subscribers for $feed"); common_log(LOG_INFO, "No PuSH subscribers for $feed");
} }
return true;
} }
function pushGroup($notice, $group_id) function pushGroup($notice, $group_id)

View File

@ -43,7 +43,7 @@ class HubOutQueueHandler extends QueueHandler
common_log(LOG_ERR, "Failed PuSH to $sub->callback for $sub->topic: " . common_log(LOG_ERR, "Failed PuSH to $sub->callback for $sub->topic: " .
$e->getMessage()); $e->getMessage());
// @fixme Reschedule a later delivery? // @fixme Reschedule a later delivery?
// Currently we have no way to do this other than 'send NOW' return true;
} }
return true; return true;