diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index c9e5ef243f..6e7172de00 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -51,7 +51,36 @@ class DBQueueManager extends QueueManager return true; } - function nextItem($queue, $timeout=null) + function service($queue, $handler) + { + while (true) { + $this->_log(LOG_DEBUG, 'Checking for notices...'); + $notice = $this->_nextItem($queue, null); + if (empty($notice)) { + $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); + // Nothing in the queue. Do you + // have other tasks, like servicing your + // XMPP connection, to do? + $handler->idle(QUEUE_HANDLER_MISS_IDLE); + } else { + $this->_log(LOG_INFO, 'Got notice '. $notice->id); + // Yay! Got one! + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->_done($notice, $queue); + } else { + $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id); + $this->_fail($notice, $queue); + } + // Chance to e.g. service your XMPP connection + $this->_log(LOG_DEBUG, 'Idling after success.'); + $handler->idle(QUEUE_HANDLER_HIT_IDLE); + } + // XXX: when do we give up? + } + } + + function _nextItem($queue, $timeout=null) { $start = time(); $result = null; @@ -74,7 +103,7 @@ class DBQueueManager extends QueueManager return $result; } - function done($object, $queue) + function _done($object, $queue) { // XXX: right now, we only handle notices @@ -101,7 +130,7 @@ class DBQueueManager extends QueueManager $notice = null; } - function fail($object, $queue) + function _fail($object, $queue) { // XXX: right now, we only handle notices diff --git a/lib/queuehandler.php b/lib/queuehandler.php index ddb47a28e9..c0f38f4e35 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -25,7 +25,7 @@ require_once(INSTALLDIR.'/classes/Notice.php'); define('CLAIM_TIMEOUT', 1200); define('QUEUE_HANDLER_MISS_IDLE', 10); -define('QUEUE_HANDLER_HIT_IDLE', 10); +define('QUEUE_HANDLER_HIT_IDLE', 0); class QueueHandler extends Daemon { @@ -42,7 +42,7 @@ class QueueHandler extends Daemon function timeout() { - return null; + return 60; } function class_name() @@ -96,31 +96,7 @@ class QueueHandler extends Daemon $qm = QueueManager::get(); - while (true) { - $this->log(LOG_DEBUG, 'Checking for notices...'); - $notice = $qm->nextItem($queue, $timeout); - if (empty($notice)) { - $this->log(LOG_DEBUG, 'No notices waiting; idling.'); - // Nothing in the queue. Do you - // have other tasks, like servicing your - // XMPP connection, to do? - $this->idle(QUEUE_HANDLER_MISS_IDLE); - } else { - $this->log(LOG_INFO, 'Got notice '. $notice->id); - // Yay! Got one! - if ($this->handle_notice($notice)) { - $this->log(LOG_INFO, 'Successfully handled notice '. $notice->id); - $qm->done($notice, $queue); - } else { - $this->log(LOG_INFO, 'Failed to handle notice '. $notice->id); - $qm->fail($notice, $queue); - } - // Chance to e.g. service your XMPP connection - $this->log(LOG_DEBUG, 'Idling after success.'); - $this->idle(QUEUE_HANDLER_HIT_IDLE); - } - // XXX: when do we give up? - } + $qm->service($queue, $this); if (!$this->finish()) { return false; diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 1bf4d4decc..f36e99d16a 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -67,23 +67,8 @@ class QueueManager throw ServerException("Unimplemented function 'enqueue' called"); } - function peek($queue) + function service($queue, $handler) { - throw ServerException("Unimplemented function 'peek' called"); - } - - function nextItem($queue, $timeout=null) - { - throw ServerException("Unimplemented function 'nextItem' called"); - } - - function done($object, $queue) - { - throw ServerException("Unimplemented function 'done' called"); - } - - function fail($object, $queue) - { - throw ServerException("Unimplemented function 'fail' called"); + throw ServerException("Unimplemented function 'service' called"); } } diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 1ad6870363..b8731d5439 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -84,85 +84,33 @@ class StompQueueManager . $notice->id . ' for ' . $transport); } - function nextItem($queue, $timeout=null) + function service($queue, $handler) { $result = null; $this->_connect(); - $frame = $this->con->readFrame(); + $this->con->setReadTimeout($handler->timeout()); - if ($frame) { - $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); + $this->con->subscribe($this->_queueName($queue)); - // XXX: Now the queue handler receives only the ID of the - // notice, and it has to get it from the DB - // A massive improvement would be avoid DB query by transmitting - // all the notice details via queue server... + while (true) { - $notice = Notice::staticGet($frame->body); + $frame = $this->con->readFrame(); - if ($notice) { - $this->_saveFrame($notice, $queue, $frame); - } else { - $this->log(LOG_WARNING, 'queue item for notice that does not exist'); + if ($frame) { + $notice = Notice::staticGet($frame->body); + + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->con->ack($frame); + } } + + $handler->idle(0); } - } - function done($object, $queue) - { - $notice = $object; - - $this->_connect(); - - $frame = $this->_getFrame($notice, $queue); - - if (empty($frame)) { - $this->log(LOG_ERR, 'Cannot find frame for notice '.$notice->id.' in queue '.$queue); - } else { - // if the msg has been handled positively, ack it - // and the queue server will remove it from the queue - $this->con->ack($frame); - $this->_clearFrame($notice, $queue); - - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); - } - } - - function fail($object, $queue) - { - $notice = $object; - - // STOMP server will requeue it after a while anyways, - // so no need to notify. Just get it out of our little - // array - - $this->_clearFrame($notice, $queue); - } - - function _frameKey($notice, $queue) - { - return ((string)$notice->id) . '-' . $queue; - } - - function _saveFrame($notice, $queue, $frame) - { - $k = $this->_frameKey($notice, $queue); - $this->_frames[$k] = $frame; - return true; - } - - function _getFrame($notice, $queue) - { - $k = $this->_frameKey($notice, $queue); - return $this->_frames[$k]; - } - - function _clearFrame($notice, $queue) - { - $k = $this->_frameKey($notice, $queue); - unset($this->_frames[$k]); + $this->con->unsubscribe($this->_queueName($queue)); } function _queueName($queue)