diff --git a/classes/Queue_item.php b/classes/Queue_item.php index ce7762646d..0d6fd56af2 100644 --- a/classes/Queue_item.php +++ b/classes/Queue_item.php @@ -82,7 +82,7 @@ class Queue_item extends Managed_DataObject /** * Release a claimed item. */ - function releaseCLaim() + function releaseClaim() { // DB_DataObject doesn't let us save nulls right now $sql = sprintf("UPDATE queue_item SET claimed=NULL WHERE id=%d", $this->id); diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index f843d6d9e6..45c4b694d2 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -73,54 +73,57 @@ class DBQueueManager extends QueueManager { //$this->_log(LOG_DEBUG, 'Checking for notices...'); $qi = Queue_item::top($this->activeQueues()); - if (empty($qi)) { + if (!$qi instanceof Queue_item) { //$this->_log(LOG_DEBUG, 'No notices waiting; idling.'); return false; } - $queue = $qi->transport; try { $item = $this->decode($qi->frame); } catch (Exception $e) { - $this->_log(LOG_INFO, "[$queue] Discarding: ".$e->getMessage()); + $this->_log(LOG_INFO, "[{$qi->transport}] Discarding: ".$e->getMessage()); $this->_done($qi); return true; } $rep = $this->logrep($item); - $this->_log(LOG_DEBUG, "Got $rep for transport $queue"); + $this->_log(LOG_DEBUG, "Got {$rep} for transport {$qi->transport}"); - $handler = $this->getHandler($queue); + $handler = $this->getHandler($qi->transport); if ($handler) { if ($handler->handle($item)) { - $this->_log(LOG_INFO, "[$queue:$rep] Successfully handled item"); + $this->_log(LOG_INFO, "[{$qi->transport}:$rep] Successfully handled item"); $this->_done($qi); } else { - $this->_log(LOG_INFO, "[$queue:$rep] Failed to handle item"); + $this->_log(LOG_INFO, "[{$qi->transport}:$rep] Failed to handle item"); $this->_fail($qi); } } else { - $this->_log(LOG_INFO, "[$queue:$rep] No handler for queue $queue; discarding."); - $this->_done($qi); + $this->noHandlerFound($qi, $rep); } return true; } + // What to do if no handler was found. For example, the OpportunisticQM + // should avoid deleting items just because it can't reach XMPP queues etc. + protected function noHandlerFound(Queue_item $qi, $rep=null) { + $this->_log(LOG_INFO, "[{$qi->transport}:{$rep}] No handler for queue {$qi->transport}; discarding."); + $this->_done($qi); + } + /** * Delete our claimed item from the queue after successful processing. * * @param QueueItem $qi */ - protected function _done($qi) + protected function _done(Queue_item $qi) { - $queue = $qi->transport; - if (empty($qi->claimed)) { - $this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item $qi->id from $qi->queue"); + $this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item {$qi->id} from {$qi->transport}"); } $qi->delete(); - $this->stats('handled', $queue); + $this->stats('handled', $qi->transport); } /** @@ -129,16 +132,16 @@ class DBQueueManager extends QueueManager * * @param QueueItem $qi */ - protected function _fail($qi) + protected function _fail(Queue_item $qi, $releaseOnly=false) { - $queue = $qi->transport; - if (empty($qi->claimed)) { - $this->_log(LOG_WARNING, "[$queue:item $qi->id] Ignoring failure for unclaimed queue item"); + $this->_log(LOG_WARNING, "[{$qi->transport}:item {$qi->id}] Ignoring failure for unclaimed queue item"); } else { $qi->releaseClaim(); } - $this->stats('error', $queue); + if (!$releaseOnly) { + $this->stats('error', $qi->transport); + } } } diff --git a/plugins/OpportunisticQM/lib/opportunisticqueuemanager.php b/plugins/OpportunisticQM/lib/opportunisticqueuemanager.php index 6356afcd08..4b2b679b58 100644 --- a/plugins/OpportunisticQM/lib/opportunisticqueuemanager.php +++ b/plugins/OpportunisticQM/lib/opportunisticqueuemanager.php @@ -80,6 +80,13 @@ class OpportunisticQueueManager extends DBQueueManager return true; } + // OpportunisticQM shouldn't discard items it can't handle, we're + // only here to take care of what we _can_ handle! + protected function noHandlerFound(Queue_item $qi, $rep=null) { + $this->_log(LOG_WARNING, "[{$qi->transport}:item {$qi->id}] Releasing claim for queue item without a handler"); + $this->_fail($qi, true); // true here means "releaseOnly", so no error statistics since it's not an _error_ + } + /** * Takes care of running through the queue items, returning when * the limits setup in __construct are met.