OpportunisticQM shouldn't try to handle what it can't find
This commit is contained in:
parent
07d92acfd0
commit
0bd16b521c
@ -82,7 +82,7 @@ class Queue_item extends Managed_DataObject
|
|||||||
/**
|
/**
|
||||||
* Release a claimed item.
|
* Release a claimed item.
|
||||||
*/
|
*/
|
||||||
function releaseCLaim()
|
function releaseClaim()
|
||||||
{
|
{
|
||||||
// DB_DataObject doesn't let us save nulls right now
|
// DB_DataObject doesn't let us save nulls right now
|
||||||
$sql = sprintf("UPDATE queue_item SET claimed=NULL WHERE id=%d", $this->id);
|
$sql = sprintf("UPDATE queue_item SET claimed=NULL WHERE id=%d", $this->id);
|
||||||
|
@ -73,54 +73,57 @@ class DBQueueManager extends QueueManager
|
|||||||
{
|
{
|
||||||
//$this->_log(LOG_DEBUG, 'Checking for notices...');
|
//$this->_log(LOG_DEBUG, 'Checking for notices...');
|
||||||
$qi = Queue_item::top($this->activeQueues());
|
$qi = Queue_item::top($this->activeQueues());
|
||||||
if (empty($qi)) {
|
if (!$qi instanceof Queue_item) {
|
||||||
//$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
|
//$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
$queue = $qi->transport;
|
|
||||||
try {
|
try {
|
||||||
$item = $this->decode($qi->frame);
|
$item = $this->decode($qi->frame);
|
||||||
} catch (Exception $e) {
|
} catch (Exception $e) {
|
||||||
$this->_log(LOG_INFO, "[$queue] Discarding: ".$e->getMessage());
|
$this->_log(LOG_INFO, "[{$qi->transport}] Discarding: ".$e->getMessage());
|
||||||
$this->_done($qi);
|
$this->_done($qi);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
$rep = $this->logrep($item);
|
$rep = $this->logrep($item);
|
||||||
$this->_log(LOG_DEBUG, "Got $rep for transport $queue");
|
$this->_log(LOG_DEBUG, "Got {$rep} for transport {$qi->transport}");
|
||||||
|
|
||||||
$handler = $this->getHandler($queue);
|
$handler = $this->getHandler($qi->transport);
|
||||||
if ($handler) {
|
if ($handler) {
|
||||||
if ($handler->handle($item)) {
|
if ($handler->handle($item)) {
|
||||||
$this->_log(LOG_INFO, "[$queue:$rep] Successfully handled item");
|
$this->_log(LOG_INFO, "[{$qi->transport}:$rep] Successfully handled item");
|
||||||
$this->_done($qi);
|
$this->_done($qi);
|
||||||
} else {
|
} else {
|
||||||
$this->_log(LOG_INFO, "[$queue:$rep] Failed to handle item");
|
$this->_log(LOG_INFO, "[{$qi->transport}:$rep] Failed to handle item");
|
||||||
$this->_fail($qi);
|
$this->_fail($qi);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
$this->_log(LOG_INFO, "[$queue:$rep] No handler for queue $queue; discarding.");
|
$this->noHandlerFound($qi, $rep);
|
||||||
$this->_done($qi);
|
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// What to do if no handler was found. For example, the OpportunisticQM
|
||||||
|
// should avoid deleting items just because it can't reach XMPP queues etc.
|
||||||
|
protected function noHandlerFound(Queue_item $qi, $rep=null) {
|
||||||
|
$this->_log(LOG_INFO, "[{$qi->transport}:{$rep}] No handler for queue {$qi->transport}; discarding.");
|
||||||
|
$this->_done($qi);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete our claimed item from the queue after successful processing.
|
* Delete our claimed item from the queue after successful processing.
|
||||||
*
|
*
|
||||||
* @param QueueItem $qi
|
* @param QueueItem $qi
|
||||||
*/
|
*/
|
||||||
protected function _done($qi)
|
protected function _done(Queue_item $qi)
|
||||||
{
|
{
|
||||||
$queue = $qi->transport;
|
|
||||||
|
|
||||||
if (empty($qi->claimed)) {
|
if (empty($qi->claimed)) {
|
||||||
$this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item $qi->id from $qi->queue");
|
$this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item {$qi->id} from {$qi->transport}");
|
||||||
}
|
}
|
||||||
$qi->delete();
|
$qi->delete();
|
||||||
|
|
||||||
$this->stats('handled', $queue);
|
$this->stats('handled', $qi->transport);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -129,16 +132,16 @@ class DBQueueManager extends QueueManager
|
|||||||
*
|
*
|
||||||
* @param QueueItem $qi
|
* @param QueueItem $qi
|
||||||
*/
|
*/
|
||||||
protected function _fail($qi)
|
protected function _fail(Queue_item $qi, $releaseOnly=false)
|
||||||
{
|
{
|
||||||
$queue = $qi->transport;
|
|
||||||
|
|
||||||
if (empty($qi->claimed)) {
|
if (empty($qi->claimed)) {
|
||||||
$this->_log(LOG_WARNING, "[$queue:item $qi->id] Ignoring failure for unclaimed queue item");
|
$this->_log(LOG_WARNING, "[{$qi->transport}:item {$qi->id}] Ignoring failure for unclaimed queue item");
|
||||||
} else {
|
} else {
|
||||||
$qi->releaseClaim();
|
$qi->releaseClaim();
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->stats('error', $queue);
|
if (!$releaseOnly) {
|
||||||
|
$this->stats('error', $qi->transport);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -80,6 +80,13 @@ class OpportunisticQueueManager extends DBQueueManager
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OpportunisticQM shouldn't discard items it can't handle, we're
|
||||||
|
// only here to take care of what we _can_ handle!
|
||||||
|
protected function noHandlerFound(Queue_item $qi, $rep=null) {
|
||||||
|
$this->_log(LOG_WARNING, "[{$qi->transport}:item {$qi->id}] Releasing claim for queue item without a handler");
|
||||||
|
$this->_fail($qi, true); // true here means "releaseOnly", so no error statistics since it's not an _error_
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Takes care of running through the queue items, returning when
|
* Takes care of running through the queue items, returning when
|
||||||
* the limits setup in __construct are met.
|
* the limits setup in __construct are met.
|
||||||
|
Loading…
Reference in New Issue
Block a user