diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php new file mode 100644 index 0000000000..c0d4dcd293 --- /dev/null +++ b/lib/dbqueuemanager.php @@ -0,0 +1,106 @@ +. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class DBQueueManager extends QueueManager +{ + var $qis = array(); + + function enqueue($object, $queue) + { + $notice = (Notice)$object; + + $qi = new Queue_item(); + + $qi->notice_id = $notice->id; + $qi->transport = $queue; + $qi->created = $notice->created; + $result = $qi->insert(); + + if (!$result) { + common_log_db_error($qi, 'INSERT', __FILE__); + throw new ServerException('DB error inserting queue item'); + } + + return true; + } + + function nextItem($queue, $timeout=null) + { + $start = time(); + $result = null; + + do { + $qi = Queue_item::top($queue); + if (!empty($qi)) { + $notice = Notice::staticGet('id', $qi->notice_id); + if (!empty($notice)) { + $result = $notice; + } else { + $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id); + $qi->delete(); + $qi->free(); + $qi = null; + } + } + } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout)); + + return $result; + } + + function done($object, $queue) + { + $notice = (Notice)$object; + + $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, + 'transport' => $queue)); + + if (empty($qi)) { + $this->log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + } else { + if (empty($qi->claimed)) { + $this->log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. + 'for '.$notice->id.', queue '.$queue); + } + $qi->delete(); + $qi->free(); + $qi = null; + } + + $this->log(LOG_INFO, 'done with notice ID = ' . $notice->id); + + $notice->free(); + $notice = null; + } + + function _log($level, $msg) + { + common_log($level, 'DBQueueManager: '.$msg); + } +} diff --git a/lib/queuemanager.php b/lib/queuemanager.php new file mode 100644 index 0000000000..64aca1bc12 --- /dev/null +++ b/lib/queuemanager.php @@ -0,0 +1,78 @@ +. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class QueueManager +{ + static $qm = null; + + static function get() + { + if (empty(self::$qm)) { + + if (Event::handle('StartNewQueueManager', self::$qm)) { + + $type = common_config('queue', 'sub'); + + switch ($type) { + case 'db': + self::$qm = new DBQueueManager(); + break; + case 'stomp': + self::$qm = new StompQueueManager(); + break; + default: + throw new ServerException("No queue manager class for type '$type'"); + } + } + + return self::$qm; + } + } + + function enqueue($object, $queue) + { + throw ServerException("Unimplemented function 'enqueue' called"); + } + + function peek($queue) + { + throw ServerException("Unimplemented function 'peek' called"); + } + + function nextItem($queue, $timeout=null) + { + throw ServerException("Unimplemented function 'nextItem' called"); + } + + function done($object, $queue) + { + throw ServerException("Unimplemented function 'done' called"); + } +} diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php new file mode 100644 index 0000000000..20c6e7a341 --- /dev/null +++ b/lib/stompqueuemanager.php @@ -0,0 +1,122 @@ +. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +require_once 'Stomp.php'; + +class QueueManager +{ + var $server = null; + var $username = null; + var $password = null; + var $base = null; + var $con = null; + var $frames = array(); + + function __construct() + { + $this->server = common_config('queue', 'stomp_server'); + $this->username = common_config('queue', 'stomp_username'); + $this->password = common_config('queue', 'stomp_password'); + $this->base = common_config('queue', 'queue_basename'); + } + + function _connect() + { + if (empty($this->con)) { + $this->con = new Stomp($this->server); + + if (!$this->con->connect($this->username, $this->password)) { + $this->_log(LOG_ERR, 'Failed to connect to queue server'); + throw new ServerException('Failed to connect to queue server'); + } + } + } + + function enqueue($object, $queue) + { + $notice = (Notice)$object; + + $this->_connect(); + + $result = $this->con->send($this->_queueName($queue), + $notice->id, // BODY of the message + array ('created' => $notice->created)); + + if (!$result) { + common_log(LOG_ERR, 'Error sending to '.$transport.' queue'); + return false; + } + + common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' + . $notice->id . ' for ' . $transport); + } + + function nextItem($queue, $timeout=null) + { + $result = null; + + $this->_connect(); + + $frame = $this->con->readFrame(); + + if ($frame) { + $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); + + // XXX: Now the queue handler receives only the ID of the + // notice, and it has to get it from the DB + // A massive improvement would be avoid DB query by transmitting + // all the notice details via queue server... + $notice = Notice::staticGet($frame->body); + + if ($notice) { + } else { + $this->log(LOG_WARNING, 'queue item for notice that does not exist'); + } + } + } + + function done($object, $queue) + { + $notice = (Notice)$object; + + $this->_connect(); + + $frame = $this->_getFrame($notice, $queue); + + if (empty($frame)) { + $this->log(LOG_ERR, 'Cannot find frame for notice '.$notice->id.' in queue '.$queue); + } else { + // if the msg has been handled positively, ack it + // and the queue server will remove it from the queue + $this->con->ack($frame); + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + } + } +}