forked from GNUsocial/gnu-social
save frames for StompQueueManager
This commit is contained in:
parent
4c256a6d7e
commit
7b66a12913
@ -30,7 +30,7 @@
|
|||||||
|
|
||||||
require_once 'Stomp.php';
|
require_once 'Stomp.php';
|
||||||
|
|
||||||
class QueueManager
|
class StompQueueManager
|
||||||
{
|
{
|
||||||
var $server = null;
|
var $server = null;
|
||||||
var $username = null;
|
var $username = null;
|
||||||
@ -61,10 +61,12 @@ class QueueManager
|
|||||||
|
|
||||||
function enqueue($object, $queue)
|
function enqueue($object, $queue)
|
||||||
{
|
{
|
||||||
$notice = (Notice)$object;
|
$notice = $object;
|
||||||
|
|
||||||
$this->_connect();
|
$this->_connect();
|
||||||
|
|
||||||
|
// XXX: serialize and send entire notice
|
||||||
|
|
||||||
$result = $this->con->send($this->_queueName($queue),
|
$result = $this->con->send($this->_queueName($queue),
|
||||||
$notice->id, // BODY of the message
|
$notice->id, // BODY of the message
|
||||||
array ('created' => $notice->created));
|
array ('created' => $notice->created));
|
||||||
@ -93,9 +95,11 @@ class QueueManager
|
|||||||
// notice, and it has to get it from the DB
|
// notice, and it has to get it from the DB
|
||||||
// A massive improvement would be avoid DB query by transmitting
|
// A massive improvement would be avoid DB query by transmitting
|
||||||
// all the notice details via queue server...
|
// all the notice details via queue server...
|
||||||
|
|
||||||
$notice = Notice::staticGet($frame->body);
|
$notice = Notice::staticGet($frame->body);
|
||||||
|
|
||||||
if ($notice) {
|
if ($notice) {
|
||||||
|
$this->_saveFrame($notice, $queue, $frame);
|
||||||
} else {
|
} else {
|
||||||
$this->log(LOG_WARNING, 'queue item for notice that does not exist');
|
$this->log(LOG_WARNING, 'queue item for notice that does not exist');
|
||||||
}
|
}
|
||||||
@ -104,7 +108,7 @@ class QueueManager
|
|||||||
|
|
||||||
function done($object, $queue)
|
function done($object, $queue)
|
||||||
{
|
{
|
||||||
$notice = (Notice)$object;
|
$notice = $object;
|
||||||
|
|
||||||
$this->_connect();
|
$this->_connect();
|
||||||
|
|
||||||
@ -116,7 +120,33 @@ class QueueManager
|
|||||||
// if the msg has been handled positively, ack it
|
// if the msg has been handled positively, ack it
|
||||||
// and the queue server will remove it from the queue
|
// and the queue server will remove it from the queue
|
||||||
$this->con->ack($frame);
|
$this->con->ack($frame);
|
||||||
|
$this->_clearFrame($notice, $queue);
|
||||||
|
|
||||||
$this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
|
$this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function _frameKey($notice, $queue)
|
||||||
|
{
|
||||||
|
return ((string)$notice->id) . '-' . $queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
function _saveFrame($notice, $queue, $frame)
|
||||||
|
{
|
||||||
|
$k = $this->_frameKey($notice, $queue);
|
||||||
|
$this->_frames[$k] = $frame;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
function _getFrame($notice, $queue)
|
||||||
|
{
|
||||||
|
$k = $this->_frameKey($notice, $queue);
|
||||||
|
return $this->_frames[$k];
|
||||||
|
}
|
||||||
|
|
||||||
|
function _clearFrame($notice, $queue)
|
||||||
|
{
|
||||||
|
$k = $this->_frameKey($notice, $queue);
|
||||||
|
unset($this->_frames[$k]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user