From 7b66a129139d8c2f03677f6a5b71412a111f655d Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 11:10:23 -0400 Subject: [PATCH] save frames for StompQueueManager --- lib/stompqueuemanager.php | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 20c6e7a341..1b4a26f2ea 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -30,7 +30,7 @@ require_once 'Stomp.php'; -class QueueManager +class StompQueueManager { var $server = null; var $username = null; @@ -61,10 +61,12 @@ class QueueManager function enqueue($object, $queue) { - $notice = (Notice)$object; + $notice = $object; $this->_connect(); + // XXX: serialize and send entire notice + $result = $this->con->send($this->_queueName($queue), $notice->id, // BODY of the message array ('created' => $notice->created)); @@ -93,9 +95,11 @@ class QueueManager // notice, and it has to get it from the DB // A massive improvement would be avoid DB query by transmitting // all the notice details via queue server... + $notice = Notice::staticGet($frame->body); if ($notice) { + $this->_saveFrame($notice, $queue, $frame); } else { $this->log(LOG_WARNING, 'queue item for notice that does not exist'); } @@ -104,7 +108,7 @@ class QueueManager function done($object, $queue) { - $notice = (Notice)$object; + $notice = $object; $this->_connect(); @@ -116,7 +120,33 @@ class QueueManager // if the msg has been handled positively, ack it // and the queue server will remove it from the queue $this->con->ack($frame); + $this->_clearFrame($notice, $queue); + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); } } + + function _frameKey($notice, $queue) + { + return ((string)$notice->id) . '-' . $queue; + } + + function _saveFrame($notice, $queue, $frame) + { + $k = $this->_frameKey($notice, $queue); + $this->_frames[$k] = $frame; + return true; + } + + function _getFrame($notice, $queue) + { + $k = $this->_frameKey($notice, $queue); + return $this->_frames[$k]; + } + + function _clearFrame($notice, $queue) + { + $k = $this->_frameKey($notice, $queue); + unset($this->_frames[$k]); + } }