. * * @category QueueManager * @package Laconica * @author Evan Prodromou * @author Sarven Capadisli * @copyright 2009 Control Yourself, Inc. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 * @link http://laconi.ca/ */

require_once 'Stomp.php';

/**
 * Queue manager that hands notices to a STOMP message server.
 *
 * Only the notice ID is sent as the message body; the consuming side loads
 * the notice again through Notice::staticGet(). Frames read from the server
 * are remembered in $frames, keyed by notice ID and queue, until the item is
 * acknowledged with done() or dropped with fail().
 */
class StompQueueManager
{
    // Connection settings, loaded from the 'queue' config section.
    var $server = null;
    var $username = null;
    var $password = null;
    // Prefix prepended to every queue name (config 'queue_basename').
    var $base = null;
    // Stomp connection object; null until _connect() succeeds.
    var $con = null;
    // Frames received but not yet acknowledged, keyed by _frameKey().
    var $frames = array();

    /**
     * Load the server address, credentials and queue base name from config.
     */
    function __construct()
    {
        $this->server   = common_config('queue', 'stomp_server');
        $this->username = common_config('queue', 'stomp_username');
        $this->password = common_config('queue', 'stomp_password');
        $this->base     = common_config('queue', 'queue_basename');
    }

    /**
     * Open the connection to the queue server if it is not open yet.
     *
     * @return void
     *
     * @throws ServerException if the server refuses the connection
     */
    function _connect()
    {
        $this->_log(LOG_DEBUG, "Connecting to $this->server...");
        if (empty($this->con)) {
            $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'...");
            $this->con = new Stomp($this->server);

            if ($this->con->connect($this->username, $this->password)) {
                $this->_log(LOG_INFO, "Connected.");
            } else {
                // Drop the unusable handle; otherwise the empty() check above
                // would treat the manager as connected on the next call.
                $this->con = null;
                $this->_log(LOG_ERR, 'Failed to connect to queue server');
                throw new ServerException('Failed to connect to queue server');
            }
        }
    }

    /**
     * Send a notice to the named queue.
     *
     * @param object $object notice to enqueue; its id and created fields are sent
     * @param string $queue  queue name, without the configured base prefix
     *
     * @return boolean true if the message was handed to the server, false otherwise
     */
    function enqueue($object, $queue)
    {
        $notice = $object;

        $this->_connect();

        // XXX: serialize and send entire notice

        $result = $this->con->send($this->_queueName($queue),
                                   $notice->id, // BODY of the message
                                   array ('created' => $notice->created));

        if (!$result) {
            common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
            return false;
        }

        common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
                   . $notice->id . ' for ' . $queue);

        return true;
    }

    /**
     * Read the next frame from the connection and resolve it to a notice.
     *
     * The frame is remembered so that done() can acknowledge it later.
     *
     * NOTE(review): this method does not subscribe to $queue and does not use
     * $timeout; confirm that the subscription is set up elsewhere.
     *
     * @param string $queue   queue name used to key the remembered frame
     * @param mixed  $timeout currently unused
     *
     * @return mixed the notice, or null if no frame was read or the notice
     *               no longer exists
     */
    function nextItem($queue, $timeout=null)
    {
        $result = null;

        $this->_connect();

        $frame = $this->con->readFrame();

        if ($frame) {
            $this->_log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created']));

            // XXX: Now the queue handler receives only the ID of the
            // notice, and it has to get it from the DB
            // A massive improvement would be avoid DB query by transmitting
            // all the notice details via queue server...

            $notice = Notice::staticGet($frame->body);

            if ($notice) {
                $this->_saveFrame($notice, $queue, $frame);
                $result = $notice;
            } else {
                $this->_log(LOG_WARNING, 'queue item for notice that does not exist');
            }
        }

        return $result;
    }

    /**
     * Acknowledge a successfully handled notice and forget its frame.
     *
     * @param object $object notice that was handled
     * @param string $queue  queue the notice was read from
     *
     * @return void
     */
    function done($object, $queue)
    {
        $notice = $object;

        $this->_connect();

        $frame = $this->_getFrame($notice, $queue);

        if (empty($frame)) {
            $this->_log(LOG_ERR, 'Cannot find frame for notice '.$notice->id.' in queue '.$queue);
        } else {
            // if the msg has been handled positively, ack it
            // and the queue server will remove it from the queue
            $this->con->ack($frame);
            $this->_clearFrame($notice, $queue);

            $this->_log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
        }
    }

    /**
     * Forget the frame of a notice whose handling failed, without acking it.
     *
     * @param object $object notice that could not be handled
     * @param string $queue  queue the notice was read from
     *
     * @return void
     */
    function fail($object, $queue)
    {
        $notice = $object;

        // STOMP server will requeue it after a while anyways,
        // so no need to notify. Just get it out of our little
        // array

        $this->_clearFrame($notice, $queue);
    }

    /**
     * Build the server-side queue name: configured base name plus $queue.
     *
     * @param string $queue short queue name
     *
     * @return string full queue name
     */
    function _queueName($queue)
    {
        return $this->base . $queue;
    }

    /**
     * Build the key under which a frame is remembered: "<notice id>-<queue>".
     *
     * @param object $notice notice the frame belongs to
     * @param string $queue  queue the frame was read from
     *
     * @return string cache key
     */
    function _frameKey($notice, $queue)
    {
        return ((string)$notice->id) . '-' . $queue;
    }

    /**
     * Remember the frame received for a notice on a queue.
     *
     * @return boolean always true
     */
    function _saveFrame($notice, $queue, $frame)
    {
        $k = $this->_frameKey($notice, $queue);
        $this->frames[$k] = $frame;
        return true;
    }

    /**
     * Look up the remembered frame for a notice on a queue.
     *
     * @return mixed the frame, or null if none is remembered
     */
    function _getFrame($notice, $queue)
    {
        $k = $this->_frameKey($notice, $queue);
        // Guard the lookup: done() may be called for a notice whose frame was
        // never saved or has already been cleared.
        return isset($this->frames[$k]) ? $this->frames[$k] : null;
    }

    /**
     * Forget the remembered frame for a notice on a queue, if any.
     *
     * @return void
     */
    function _clearFrame($notice, $queue)
    {
        $k = $this->_frameKey($notice, $queue);
        unset($this->frames[$k]);
    }

    /**
     * Write a log entry prefixed with the class name.
     *
     * @param int    $level syslog-style level constant (LOG_DEBUG, LOG_ERR, ...)
     * @param string $msg   message text
     *
     * @return void
     */
    function _log($level, $msg)
    {
        common_log($level, 'StompQueueManager: '.$msg);
    }
}