[OpportunisticQM] Revamped plugin to be able to use other poll based queuemanagers, no just the DB
This commit is contained in:
parent
78506d5249
commit
a4e6db8d76
@ -1,17 +1,16 @@
|
|||||||
<?php
|
<?php
|
||||||
|
|
||||||
class OpportunisticQMPlugin extends Plugin {
|
class OpportunisticQMPlugin extends Plugin {
|
||||||
const PLUGIN_VERSION = '2.0.0';
|
const PLUGIN_VERSION = '3.0.0';
|
||||||
|
|
||||||
public $qmkey = false;
|
public $qmkey = false;
|
||||||
public $secs_per_action = 1; // total seconds to run script per action
|
public $secs_per_action = 1; // total seconds to run script per action
|
||||||
public $rel_to_pageload = true; // relative to pageload or queue start
|
public $rel_to_pageload = true; // relative to pageload or queue start
|
||||||
public $verbosity = 1;
|
public $verbosity = 1;
|
||||||
|
|
||||||
public function onRouterInitialized($m)
|
public function onRouterInitialized($m)
|
||||||
{
|
{
|
||||||
$m->connect('main/runqueue',
|
$m->connect('main/runqueue', ['action' => 'runqueue']);
|
||||||
['action' => 'runqueue']);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -26,23 +25,21 @@ class OpportunisticQMPlugin extends Plugin {
|
|||||||
|
|
||||||
global $_startTime;
|
global $_startTime;
|
||||||
|
|
||||||
$args = array(
|
$args = ['qmkey' => common_config('opportunisticqm', 'qmkey'),
|
||||||
'qmkey' => common_config('opportunisticqm', 'qmkey'),
|
'max_execution_time' => $this->secs_per_action,
|
||||||
'max_execution_time' => $this->secs_per_action,
|
'started_at' => $this->rel_to_pageload ? $_startTime : null,
|
||||||
'started_at' => $this->rel_to_pageload ? $_startTime : null,
|
'verbosity' => $this->verbosity];
|
||||||
'verbosity' => $this->verbosity,
|
$qm = new OpportunisticQueueManager($args);
|
||||||
);
|
|
||||||
$qm = new OpportunisticQueueManager($args);
|
|
||||||
$qm->runQueue();
|
$qm->runQueue();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function onPluginVersion(array &$versions): bool
|
public function onPluginVersion(array &$versions): bool
|
||||||
{
|
{
|
||||||
$versions[] = array('name' => 'OpportunisticQM',
|
$versions[] = array('name' => 'OpportunisticQM',
|
||||||
'version' => self::PLUGIN_VERSION,
|
'version' => self::PLUGIN_VERSION,
|
||||||
'author' => 'Mikael Nordfeldth',
|
'author' => 'Mikael Nordfeldth',
|
||||||
'homepage' => 'http://www.gnu.org/software/social/',
|
'homepage' => 'http://www.gnu.org/software/social/',
|
||||||
'description' =>
|
'description' =>
|
||||||
// TRANS: Plugin description.
|
// TRANS: Plugin description.
|
||||||
_m('Opportunistic queue manager plugin for background processing.'));
|
_m('Opportunistic queue manager plugin for background processing.'));
|
||||||
|
@ -23,16 +23,14 @@ class RunqueueAction extends Action
|
|||||||
{
|
{
|
||||||
protected $qm = null;
|
protected $qm = null;
|
||||||
|
|
||||||
protected function prepare(array $args=array())
|
protected function prepare(array $args = [])
|
||||||
{
|
{
|
||||||
parent::prepare($args);
|
parent::prepare($args);
|
||||||
|
|
||||||
$args = array();
|
$args = [];
|
||||||
|
|
||||||
foreach (array('qmkey') as $key) {
|
if ($this->arg('qmkey') !== null) {
|
||||||
if ($this->arg($key) !== null) {
|
$args['qmkey'] = $this->arg('qmkey');
|
||||||
$args[$key] = $this->arg($key);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -14,7 +14,7 @@
|
|||||||
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0
|
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0
|
||||||
* @link http://status.net/
|
* @link http://status.net/
|
||||||
*/
|
*/
|
||||||
class OpportunisticQueueManager extends DBQueueManager
|
class OpportunisticQueueManager extends QueueManager
|
||||||
{
|
{
|
||||||
protected $qmkey = false;
|
protected $qmkey = false;
|
||||||
protected $max_execution_time = null;
|
protected $max_execution_time = null;
|
||||||
@ -26,9 +26,10 @@ class OpportunisticQueueManager extends DBQueueManager
|
|||||||
|
|
||||||
protected $verbosity = null;
|
protected $verbosity = null;
|
||||||
|
|
||||||
const MAXEXECTIME = 20; // typically just used for the /main/cron action, only used if php.ini max_execution_time is 0
|
// typically just used for the /main/cron action, only used if php.ini max_execution_time is 0
|
||||||
|
const MAXEXECTIME = 20;
|
||||||
|
|
||||||
public function __construct(array $args=array()) {
|
public function __construct(array $args = []) {
|
||||||
foreach (get_class_vars(get_class($this)) as $key=>$val) {
|
foreach (get_class_vars(get_class($this)) as $key=>$val) {
|
||||||
if (array_key_exists($key, $args)) {
|
if (array_key_exists($key, $args)) {
|
||||||
$this->$key = $args[$key];
|
$this->$key = $args[$key];
|
||||||
@ -45,12 +46,14 @@ class OpportunisticQueueManager extends DBQueueManager
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ($this->max_execution_margin === null) {
|
if ($this->max_execution_margin === null) {
|
||||||
$this->max_execution_margin = common_config('http', 'connect_timeout') + 1; // think PHP's max exec time, minus this value to have time for timeouts etc.
|
// think PHP's max exec time, minus this value to have time for timeouts etc.
|
||||||
|
$this->max_execution_margin = common_config('http', 'connect_timeout') + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return parent::__construct();
|
return parent::__construct();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
protected function verifyKey()
|
protected function verifyKey()
|
||||||
{
|
{
|
||||||
if ($this->qmkey !== common_config('opportunisticqm', 'qmkey')) {
|
if ($this->qmkey !== common_config('opportunisticqm', 'qmkey')) {
|
||||||
@ -58,6 +61,12 @@ class OpportunisticQueueManager extends DBQueueManager
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function enqueue($object, $key)
|
||||||
|
{
|
||||||
|
// Nothing to do, should never get called
|
||||||
|
throw new ServerException('OpportunisticQueueManager::enqueue should never be called');
|
||||||
|
}
|
||||||
|
|
||||||
public function canContinue()
|
public function canContinue()
|
||||||
{
|
{
|
||||||
$time_passed = time() - $this->started_at;
|
$time_passed = time() - $this->started_at;
|
||||||
@ -67,7 +76,8 @@ class OpportunisticQueueManager extends DBQueueManager
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
// If too much time has passed, stop
|
// If too much time has passed, stop
|
||||||
if ($time_passed >= $this->max_execution_time || $time_passed > ini_get('max_execution_time') - $this->max_execution_margin) {
|
if ($time_passed >= $this->max_execution_time ||
|
||||||
|
$time_passed > ini_get('max_execution_time') - $this->max_execution_margin) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
// If we have a max-item-limit, check if it has been passed
|
// If we have a max-item-limit, check if it has been passed
|
||||||
@ -80,25 +90,15 @@ class OpportunisticQueueManager extends DBQueueManager
|
|||||||
|
|
||||||
public function poll()
|
public function poll()
|
||||||
{
|
{
|
||||||
$this->handled_items++;
|
$qm = $this->get();
|
||||||
if (!parent::poll()) {
|
if ($qm->pollInterval() <= 0 && ! $qm instanceof UnQueueManager) {
|
||||||
throw new RunQueueOutOfWorkException();
|
// Special case for UnQueueManager as it is a default plugin
|
||||||
|
// and does not require queues, since it does everything immediately
|
||||||
|
throw new ServerException('OpportunisticQM cannot work together' .
|
||||||
|
'with queues that do not implement polling');
|
||||||
}
|
}
|
||||||
return true;
|
++$this->handled_items;
|
||||||
}
|
return $qm->poll();
|
||||||
|
|
||||||
// OpportunisticQM shouldn't discard items it can't handle, we're
|
|
||||||
// only here to take care of what we _can_ handle!
|
|
||||||
protected function noHandlerFound(Queue_item $qi, $rep=null) {
|
|
||||||
$this->_log(LOG_WARNING, "[{$qi->transport}:item {$qi->id}] Releasing claim for queue item without a handler");
|
|
||||||
$this->_fail($qi, true); // true here means "releaseOnly", so no error statistics since it's not an _error_
|
|
||||||
}
|
|
||||||
|
|
||||||
protected function _fail(Queue_item $qi, $releaseOnly=false)
|
|
||||||
{
|
|
||||||
parent::_fail($qi, $releaseOnly);
|
|
||||||
$this->_log(LOG_DEBUG, "[{$qi->transport}:item {$qi->id}] Ignoring this transport for the rest of this execution");
|
|
||||||
$this->ignoreTransport($qi->transport);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -110,17 +110,22 @@ class OpportunisticQueueManager extends DBQueueManager
|
|||||||
public function runQueue()
|
public function runQueue()
|
||||||
{
|
{
|
||||||
while ($this->canContinue()) {
|
while ($this->canContinue()) {
|
||||||
try {
|
if (!$this->poll()) {
|
||||||
$this->poll();
|
// Out of work
|
||||||
} catch (RunQueueOutOfWorkException $e) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($this->handled_items > 0) {
|
if ($this->handled_items > 0) {
|
||||||
common_debug('Opportunistic queue manager passed execution time/item handling limit without being out of work.');
|
common_debug('Opportunistic queue manager passed execution time/item ' .
|
||||||
|
'handling limit without being out of work.');
|
||||||
|
return true;
|
||||||
} elseif ($this->verbosity > 1) {
|
} elseif ($this->verbosity > 1) {
|
||||||
common_debug('Opportunistic queue manager did not have time to start on this action (max: '.$this->max_execution_time.' exceeded: '.abs(time()-$this->started_at).').');
|
common_debug('Opportunistic queue manager did not have time to start ' .
|
||||||
|
'on this action (max: ' . $this->max_execution_time .
|
||||||
|
' exceeded: ' . abs(time() - $this->started_at) . ').');
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user