Don't retry unhandled transports in OpportunisticQM
It'd continue trying xmpp transports forever, for example...
This commit is contained in:
parent
e10d081a56
commit
beba2a25d0
@ -40,7 +40,7 @@ class Queue_item extends Managed_DataObject
|
||||
* @param mixed $transports name of a single queue or array of queues to pull from
|
||||
* If not specified, checks all queues in the system.
|
||||
*/
|
||||
static function top($transports=null) {
|
||||
static function top($transports=null, array $ignored_transports=array()) {
|
||||
|
||||
$qi = new Queue_item();
|
||||
if ($transports) {
|
||||
@ -52,6 +52,11 @@ class Queue_item extends Managed_DataObject
|
||||
$qi->transport = $transports;
|
||||
}
|
||||
}
|
||||
if (!empty($ignored_transports)) {
|
||||
// @fixme use safer escaping
|
||||
$list = implode("','", array_map(array($qi, 'escape'), $ignored_transports));
|
||||
$qi->whereAdd("transport NOT IN ('$list')");
|
||||
}
|
||||
$qi->orderBy('created');
|
||||
$qi->whereAdd('claimed is null');
|
||||
|
||||
|
@ -72,7 +72,7 @@ class DBQueueManager extends QueueManager
|
||||
public function poll()
|
||||
{
|
||||
//$this->_log(LOG_DEBUG, 'Checking for notices...');
|
||||
$qi = Queue_item::top($this->activeQueues());
|
||||
$qi = Queue_item::top($this->activeQueues(), $this->getIgnoredTransports());
|
||||
if (!$qi instanceof Queue_item) {
|
||||
//$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
|
||||
return false;
|
||||
|
@ -43,6 +43,7 @@ abstract class QueueManager extends IoManager
|
||||
protected $handlers = array();
|
||||
protected $groups = array();
|
||||
protected $activeGroups = array();
|
||||
protected $ignoredTransports = array();
|
||||
|
||||
/**
|
||||
* Factory function to pull the appropriate QueueManager object
|
||||
@ -255,6 +256,17 @@ abstract class QueueManager extends IoManager
|
||||
return array_keys($queues);
|
||||
}
|
||||
|
||||
function getIgnoredTransports()
|
||||
{
|
||||
return array_keys($this->ignoredTransports);
|
||||
}
|
||||
|
||||
function ignoreTransport($transport)
|
||||
{
|
||||
// key is used for uniqueness, value doesn't mean anything
|
||||
$this->ignoredTransports[$transport] = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the list of queue handlers for the current site.
|
||||
*
|
||||
|
@ -83,10 +83,17 @@ class OpportunisticQueueManager extends DBQueueManager
|
||||
// OpportunisticQM shouldn't discard items it can't handle, we're
|
||||
// only here to take care of what we _can_ handle!
|
||||
protected function noHandlerFound(Queue_item $qi, $rep=null) {
|
||||
$this->_log(LOG_WARNING, "[{$qi->transport}:item {$qi->id}] Releasing claim for queue item without a handler");
|
||||
$this->_log(LOG_WARNING, "[{$qi->transport}:item {$qi->id}] Releasing claim for queue item without a handler");
|
||||
$this->_fail($qi, true); // true here means "releaseOnly", so no error statistics since it's not an _error_
|
||||
}
|
||||
|
||||
protected function _fail(Queue_item $qi, $releaseOnly=false)
|
||||
{
|
||||
parent::_fail($qi, $releaseOnly);
|
||||
$this->_log(LOG_DEBUG, "[{$qi->transport}:item {$qi->id}] Ignoring this transport for the rest of this execution");
|
||||
$this->ignoreTransport($qi->transport);
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes care of running through the queue items, returning when
|
||||
* the limits setup in __construct are met.
|
||||
|
Loading…
Reference in New Issue
Block a user