[OpportunisticQM] Revamped plugin to be able to use other poll based queuemanagers, no just the DB

This commit is contained in:
Miguel Dantas 2019-09-02 21:39:55 +01:00 committed by Diogo Peralta Cordeiro
parent b0e10f01cb
commit cb412d228c
3 changed files with 49 additions and 49 deletions

View File

@ -1,17 +1,16 @@
<?php <?php
class OpportunisticQMPlugin extends Plugin { class OpportunisticQMPlugin extends Plugin {
const PLUGIN_VERSION = '2.0.0'; const PLUGIN_VERSION = '3.0.0';
public $qmkey = false; public $qmkey = false;
public $secs_per_action = 1; // total seconds to run script per action public $secs_per_action = 1; // total seconds to run script per action
public $rel_to_pageload = true; // relative to pageload or queue start public $rel_to_pageload = true; // relative to pageload or queue start
public $verbosity = 1; public $verbosity = 1;
public function onRouterInitialized($m) public function onRouterInitialized($m)
{ {
$m->connect('main/runqueue', $m->connect('main/runqueue', ['action' => 'runqueue']);
['action' => 'runqueue']);
} }
/** /**
@ -26,23 +25,21 @@ class OpportunisticQMPlugin extends Plugin {
global $_startTime; global $_startTime;
$args = array( $args = ['qmkey' => common_config('opportunisticqm', 'qmkey'),
'qmkey' => common_config('opportunisticqm', 'qmkey'), 'max_execution_time' => $this->secs_per_action,
'max_execution_time' => $this->secs_per_action, 'started_at' => $this->rel_to_pageload ? $_startTime : null,
'started_at' => $this->rel_to_pageload ? $_startTime : null, 'verbosity' => $this->verbosity];
'verbosity' => $this->verbosity, $qm = new OpportunisticQueueManager($args);
);
$qm = new OpportunisticQueueManager($args);
$qm->runQueue(); $qm->runQueue();
return true; return true;
} }
public function onPluginVersion(array &$versions): bool public function onPluginVersion(array &$versions): bool
{ {
$versions[] = array('name' => 'OpportunisticQM', $versions[] = array('name' => 'OpportunisticQM',
'version' => self::PLUGIN_VERSION, 'version' => self::PLUGIN_VERSION,
'author' => 'Mikael Nordfeldth', 'author' => 'Mikael Nordfeldth',
'homepage' => 'http://www.gnu.org/software/social/', 'homepage' => 'http://www.gnu.org/software/social/',
'description' => 'description' =>
// TRANS: Plugin description. // TRANS: Plugin description.
_m('Opportunistic queue manager plugin for background processing.')); _m('Opportunistic queue manager plugin for background processing.'));

View File

@ -23,16 +23,14 @@ class RunqueueAction extends Action
{ {
protected $qm = null; protected $qm = null;
protected function prepare(array $args=array()) protected function prepare(array $args = [])
{ {
parent::prepare($args); parent::prepare($args);
$args = array(); $args = [];
foreach (array('qmkey') as $key) { if ($this->arg('qmkey') !== null) {
if ($this->arg($key) !== null) { $args['qmkey'] = $this->arg('qmkey');
$args[$key] = $this->arg($key);
}
} }
try { try {

View File

@ -14,7 +14,7 @@
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0 * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0
* @link http://status.net/ * @link http://status.net/
*/ */
class OpportunisticQueueManager extends DBQueueManager class OpportunisticQueueManager extends QueueManager
{ {
protected $qmkey = false; protected $qmkey = false;
protected $max_execution_time = null; protected $max_execution_time = null;
@ -26,9 +26,10 @@ class OpportunisticQueueManager extends DBQueueManager
protected $verbosity = null; protected $verbosity = null;
const MAXEXECTIME = 20; // typically just used for the /main/cron action, only used if php.ini max_execution_time is 0 // typically just used for the /main/cron action, only used if php.ini max_execution_time is 0
const MAXEXECTIME = 20;
public function __construct(array $args=array()) { public function __construct(array $args = []) {
foreach (get_class_vars(get_class($this)) as $key=>$val) { foreach (get_class_vars(get_class($this)) as $key=>$val) {
if (array_key_exists($key, $args)) { if (array_key_exists($key, $args)) {
$this->$key = $args[$key]; $this->$key = $args[$key];
@ -45,12 +46,14 @@ class OpportunisticQueueManager extends DBQueueManager
} }
if ($this->max_execution_margin === null) { if ($this->max_execution_margin === null) {
$this->max_execution_margin = common_config('http', 'connect_timeout') + 1; // think PHP's max exec time, minus this value to have time for timeouts etc. // think PHP's max exec time, minus this value to have time for timeouts etc.
$this->max_execution_margin = common_config('http', 'connect_timeout') + 1;
} }
return parent::__construct(); return parent::__construct();
} }
protected function verifyKey() protected function verifyKey()
{ {
if ($this->qmkey !== common_config('opportunisticqm', 'qmkey')) { if ($this->qmkey !== common_config('opportunisticqm', 'qmkey')) {
@ -58,6 +61,12 @@ class OpportunisticQueueManager extends DBQueueManager
} }
} }
public function enqueue($object, $key)
{
// Nothing to do, should never get called
throw new ServerException('OpportunisticQueueManager::enqueue should never be called');
}
public function canContinue() public function canContinue()
{ {
$time_passed = time() - $this->started_at; $time_passed = time() - $this->started_at;
@ -67,7 +76,8 @@ class OpportunisticQueueManager extends DBQueueManager
return false; return false;
} }
// If too much time has passed, stop // If too much time has passed, stop
if ($time_passed >= $this->max_execution_time || $time_passed > ini_get('max_execution_time') - $this->max_execution_margin) { if ($time_passed >= $this->max_execution_time ||
$time_passed > ini_get('max_execution_time') - $this->max_execution_margin) {
return false; return false;
} }
// If we have a max-item-limit, check if it has been passed // If we have a max-item-limit, check if it has been passed
@ -80,25 +90,15 @@ class OpportunisticQueueManager extends DBQueueManager
public function poll() public function poll()
{ {
$this->handled_items++; $qm = $this->get();
if (!parent::poll()) { if ($qm->pollInterval() <= 0 && ! $qm instanceof UnQueueManager) {
throw new RunQueueOutOfWorkException(); // Special case for UnQueueManager as it is a default plugin
// and does not require queues, since it does everything immediately
throw new ServerException('OpportunisticQM cannot work together' .
'with queues that do not implement polling');
} }
return true; ++$this->handled_items;
} return $qm->poll();
// OpportunisticQM shouldn't discard items it can't handle, we're
// only here to take care of what we _can_ handle!
protected function noHandlerFound(Queue_item $qi, $rep=null) {
$this->_log(LOG_WARNING, "[{$qi->transport}:item {$qi->id}] Releasing claim for queue item without a handler");
$this->_fail($qi, true); // true here means "releaseOnly", so no error statistics since it's not an _error_
}
protected function _fail(Queue_item $qi, $releaseOnly=false)
{
parent::_fail($qi, $releaseOnly);
$this->_log(LOG_DEBUG, "[{$qi->transport}:item {$qi->id}] Ignoring this transport for the rest of this execution");
$this->ignoreTransport($qi->transport);
} }
/** /**
@ -110,17 +110,22 @@ class OpportunisticQueueManager extends DBQueueManager
public function runQueue() public function runQueue()
{ {
while ($this->canContinue()) { while ($this->canContinue()) {
try { if (!$this->poll()) {
$this->poll(); // Out of work
} catch (RunQueueOutOfWorkException $e) {
return true; return true;
} }
} }
if ($this->handled_items > 0) { if ($this->handled_items > 0) {
common_debug('Opportunistic queue manager passed execution time/item handling limit without being out of work.'); common_debug('Opportunistic queue manager passed execution time/item ' .
'handling limit without being out of work.');
return true;
} elseif ($this->verbosity > 1) { } elseif ($this->verbosity > 1) {
common_debug('Opportunistic queue manager did not have time to start on this action (max: '.$this->max_execution_time.' exceeded: '.abs(time()-$this->started_at).').'); common_debug('Opportunistic queue manager did not have time to start ' .
'on this action (max: ' . $this->max_execution_time .
' exceeded: ' . abs(time() - $this->started_at) . ').');
} }
return false; return false;
} }
} }