From cb412d228c9b1a858776287b05833d7d846fac82 Mon Sep 17 00:00:00 2001 From: Miguel Dantas Date: Mon, 2 Sep 2019 21:39:55 +0100 Subject: [PATCH] [OpportunisticQM] Revamped plugin to be able to use other poll based queuemanagers, no just the DB --- .../OpportunisticQM/OpportunisticQMPlugin.php | 27 ++++---- plugins/OpportunisticQM/actions/runqueue.php | 10 ++- .../lib/opportunisticqueuemanager.php | 61 ++++++++++--------- 3 files changed, 49 insertions(+), 49 deletions(-) diff --git a/plugins/OpportunisticQM/OpportunisticQMPlugin.php b/plugins/OpportunisticQM/OpportunisticQMPlugin.php index 8f42849d9a..4a67a37602 100644 --- a/plugins/OpportunisticQM/OpportunisticQMPlugin.php +++ b/plugins/OpportunisticQM/OpportunisticQMPlugin.php @@ -1,17 +1,16 @@ connect('main/runqueue', - ['action' => 'runqueue']); + $m->connect('main/runqueue', ['action' => 'runqueue']); } /** @@ -26,23 +25,21 @@ class OpportunisticQMPlugin extends Plugin { global $_startTime; - $args = array( - 'qmkey' => common_config('opportunisticqm', 'qmkey'), - 'max_execution_time' => $this->secs_per_action, - 'started_at' => $this->rel_to_pageload ? $_startTime : null, - 'verbosity' => $this->verbosity, - ); - $qm = new OpportunisticQueueManager($args); + $args = ['qmkey' => common_config('opportunisticqm', 'qmkey'), + 'max_execution_time' => $this->secs_per_action, + 'started_at' => $this->rel_to_pageload ? $_startTime : null, + 'verbosity' => $this->verbosity]; + $qm = new OpportunisticQueueManager($args); $qm->runQueue(); return true; } public function onPluginVersion(array &$versions): bool { - $versions[] = array('name' => 'OpportunisticQM', - 'version' => self::PLUGIN_VERSION, - 'author' => 'Mikael Nordfeldth', - 'homepage' => 'http://www.gnu.org/software/social/', + $versions[] = array('name' => 'OpportunisticQM', + 'version' => self::PLUGIN_VERSION, + 'author' => 'Mikael Nordfeldth', + 'homepage' => 'http://www.gnu.org/software/social/', 'description' => // TRANS: Plugin description. _m('Opportunistic queue manager plugin for background processing.')); diff --git a/plugins/OpportunisticQM/actions/runqueue.php b/plugins/OpportunisticQM/actions/runqueue.php index e6ce5b84c0..9b028c723e 100644 --- a/plugins/OpportunisticQM/actions/runqueue.php +++ b/plugins/OpportunisticQM/actions/runqueue.php @@ -23,16 +23,14 @@ class RunqueueAction extends Action { protected $qm = null; - protected function prepare(array $args=array()) + protected function prepare(array $args = []) { parent::prepare($args); - $args = array(); + $args = []; - foreach (array('qmkey') as $key) { - if ($this->arg($key) !== null) { - $args[$key] = $this->arg($key); - } + if ($this->arg('qmkey') !== null) { + $args['qmkey'] = $this->arg('qmkey'); } try { diff --git a/plugins/OpportunisticQM/lib/opportunisticqueuemanager.php b/plugins/OpportunisticQM/lib/opportunisticqueuemanager.php index c0d9125f10..520cc52af2 100644 --- a/plugins/OpportunisticQM/lib/opportunisticqueuemanager.php +++ b/plugins/OpportunisticQM/lib/opportunisticqueuemanager.php @@ -14,7 +14,7 @@ * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0 * @link http://status.net/ */ -class OpportunisticQueueManager extends DBQueueManager +class OpportunisticQueueManager extends QueueManager { protected $qmkey = false; protected $max_execution_time = null; @@ -26,9 +26,10 @@ class OpportunisticQueueManager extends DBQueueManager protected $verbosity = null; - const MAXEXECTIME = 20; // typically just used for the /main/cron action, only used if php.ini max_execution_time is 0 + // typically just used for the /main/cron action, only used if php.ini max_execution_time is 0 + const MAXEXECTIME = 20; - public function __construct(array $args=array()) { + public function __construct(array $args = []) { foreach (get_class_vars(get_class($this)) as $key=>$val) { if (array_key_exists($key, $args)) { $this->$key = $args[$key]; @@ -45,12 +46,14 @@ class OpportunisticQueueManager extends DBQueueManager } if ($this->max_execution_margin === null) { - $this->max_execution_margin = common_config('http', 'connect_timeout') + 1; // think PHP's max exec time, minus this value to have time for timeouts etc. + // think PHP's max exec time, minus this value to have time for timeouts etc. + $this->max_execution_margin = common_config('http', 'connect_timeout') + 1; } return parent::__construct(); } + protected function verifyKey() { if ($this->qmkey !== common_config('opportunisticqm', 'qmkey')) { @@ -58,6 +61,12 @@ class OpportunisticQueueManager extends DBQueueManager } } + public function enqueue($object, $key) + { + // Nothing to do, should never get called + throw new ServerException('OpportunisticQueueManager::enqueue should never be called'); + } + public function canContinue() { $time_passed = time() - $this->started_at; @@ -67,7 +76,8 @@ class OpportunisticQueueManager extends DBQueueManager return false; } // If too much time has passed, stop - if ($time_passed >= $this->max_execution_time || $time_passed > ini_get('max_execution_time') - $this->max_execution_margin) { + if ($time_passed >= $this->max_execution_time || + $time_passed > ini_get('max_execution_time') - $this->max_execution_margin) { return false; } // If we have a max-item-limit, check if it has been passed @@ -80,25 +90,15 @@ class OpportunisticQueueManager extends DBQueueManager public function poll() { - $this->handled_items++; - if (!parent::poll()) { - throw new RunQueueOutOfWorkException(); + $qm = $this->get(); + if ($qm->pollInterval() <= 0 && ! $qm instanceof UnQueueManager) { + // Special case for UnQueueManager as it is a default plugin + // and does not require queues, since it does everything immediately + throw new ServerException('OpportunisticQM cannot work together' . + 'with queues that do not implement polling'); } - return true; - } - - // OpportunisticQM shouldn't discard items it can't handle, we're - // only here to take care of what we _can_ handle! - protected function noHandlerFound(Queue_item $qi, $rep=null) { - $this->_log(LOG_WARNING, "[{$qi->transport}:item {$qi->id}] Releasing claim for queue item without a handler"); - $this->_fail($qi, true); // true here means "releaseOnly", so no error statistics since it's not an _error_ - } - - protected function _fail(Queue_item $qi, $releaseOnly=false) - { - parent::_fail($qi, $releaseOnly); - $this->_log(LOG_DEBUG, "[{$qi->transport}:item {$qi->id}] Ignoring this transport for the rest of this execution"); - $this->ignoreTransport($qi->transport); + ++$this->handled_items; + return $qm->poll(); } /** @@ -110,17 +110,22 @@ class OpportunisticQueueManager extends DBQueueManager public function runQueue() { while ($this->canContinue()) { - try { - $this->poll(); - } catch (RunQueueOutOfWorkException $e) { + if (!$this->poll()) { + // Out of work return true; } } + if ($this->handled_items > 0) { - common_debug('Opportunistic queue manager passed execution time/item handling limit without being out of work.'); + common_debug('Opportunistic queue manager passed execution time/item ' . + 'handling limit without being out of work.'); + return true; } elseif ($this->verbosity > 1) { - common_debug('Opportunistic queue manager did not have time to start on this action (max: '.$this->max_execution_time.' exceeded: '.abs(time()-$this->started_at).').'); + common_debug('Opportunistic queue manager did not have time to start ' . + 'on this action (max: ' . $this->max_execution_time . + ' exceeded: ' . abs(time() - $this->started_at) . ').'); } + return false; } }