[StompQueue] Changed StompQueueManager to use polling rather than sockets

This commit is contained in:
Miguel Dantas 2019-09-02 21:13:26 +01:00 committed by Hugo Sales
parent 6747b18b75
commit 78506d5249
3 changed files with 65 additions and 37 deletions

42
plugins/StompQueue/README Normal file
View File

@ -0,0 +1,42 @@
StompQueuePlugin wraps the StompQueueManager class which is a queue manager
that uses STOMP as a communication method to some form of backing storage.
Installation
============
This plugin is replaces other queue manager plugins, such as UnQueue,
which enabled by default and which should, but is not required to be
disabled.
addPlugin('StompQueue', ['servers' => ['your-redis-instance-and-port'],
'vhost' => 'your-vhost',
'username' => 'your-username',
'password' => 'your-password']);
Options
=======
servers (default: null) - array of server addresses to use
vhost (default: '') - configured vhost -- required
username (default: 'guest') -- configured username -- don't use the default
password (default: 'guest') -- configured password -- don't use the default
basename (default: "queue:gnusocial-{$site_name}") -- prefix for all queue names,
useful to avoid collisions. Cannot contain `/`
control (default: 'gnusocial:control') -- control channel name. Cannot contain `/`
breakout (default: null) -- array of queue names which should be broken out into a previously unused server
useTransactions (default: false) -- whether to use transactions, allowing rollbacks in case of failure
useAcks (default: false) -- whether to explicitly use acknowledgements when receiving a message.
Usefull to avoid timeouts and possibly reduce load on the STOMP server
manualFailover (default: false) -- whether to coordinate failover in PHP or to let all servers act
as one coordinated unit
defaultIdx (default: 0) -- index in the servers array which is used by default. Will be updated in case of an error
persistent (default: []) -- list of queues which should be persistent
Example
=======
In config.php
addPlugin('StompQueue', ['servers' => 'tcp://localhost:61613', 'vhost' => '/',
// Please don't actually use the default credentials
'username' => 'guest', 'password' => 'guest']);

View File

@ -36,7 +36,7 @@ class StompQueuePlugin extends Plugin
public $password = 'guest'; public $password = 'guest';
public $basename = ''; public $basename = '';
public $control = 'gnusocial:control'; public $control = 'gnusocial:control';
public $breakout; public $breakout = [];
public $useTransactions = false; public $useTransactions = false;
public $useAcks = false; public $useAcks = false;
public $manualFailover = false; public $manualFailover = false;
@ -62,7 +62,7 @@ class StompQueuePlugin extends Plugin
public function onPluginVersion(array &$versions): bool public function onPluginVersion(array &$versions): bool
{ {
$versions[] = array('name' => 'StompQueue', $versions[] = array('name' => 'StompQueue',
'version' => self::VERSION, 'version' => self::PLUGIN_VERSION,
'author' => 'Miguel Dantas', 'author' => 'Miguel Dantas',
'description' => 'description' =>
// TRANS: Plugin description. // TRANS: Plugin description.

View File

@ -35,7 +35,6 @@ class StompQueueManager extends QueueManager
public function start($master) public function start($master)
{ {
parent::start($master); parent::start($master);
common_debug("Starting STOMP queue manager");
return $this->_ensureConn(); return $this->_ensureConn();
} }
@ -172,6 +171,7 @@ class StompQueueManager extends QueueManager
*/ */
protected function subscribe(StatefulStomp $st) protected function subscribe(StatefulStomp $st)
{ {
$this->_ensureConn();
$host = $st->getClient()->getConnection()->getHost(); $host = $st->getClient()->getConnection()->getHost();
foreach ($this->subscriptions() as $sub) { foreach ($this->subscriptions() as $sub) {
if (!in_array($sub, $this->subscriptions)) { if (!in_array($sub, $this->subscriptions)) {
@ -293,40 +293,19 @@ class StompQueueManager extends QueueManager
} }
/** /**
* Send any sockets we're listening on to the IO manager * Poll every 10 seconds for new events during idle periods.
* to wait for input. * We'll look in more often when there's data available.
* Must be greater than 0 for the poll method to be called
* *
* @return array of resources * @return int seconds
*/ */
public function getSockets() public function pollInterval()
{ {
$sockets = []; return 10;
foreach ($this->stomps as $st) {
if ($st) {
$sockets[] = $st->getClient()->getConnection();
}
}
return $sockets;
} }
/** /**
* Get the Stomp connection object associated with the given socket. * Poll a queue and Handle an event
* @param resource $socket
* @return int index into connections list
* @throws Exception
*/
protected function connectionFromSocket($socket)
{
foreach ($this->stomps as $i => $st) {
if ($st && $st->getConnection() === $socket) {
return $i;
}
}
throw new Exception(__CLASS__ . " asked to read from unrecognized socket");
}
/**
* Handle and acknowledge an event that's come in through a queue.
* *
* If the queue handler reports failure, the message is requeued for later. * If the queue handler reports failure, the message is requeued for later.
* Missing notices or handler classes will drop the message. * Missing notices or handler classes will drop the message.
@ -334,15 +313,22 @@ class StompQueueManager extends QueueManager
* Side effects: in multi-site mode, may reset site configuration to * Side effects: in multi-site mode, may reset site configuration to
* match the site that queued the event. * match the site that queued the event.
* *
* @param Frame $frame
* @return bool success * @return bool success
* @throws ConfigException * @throws ConfigException
* @throws NoConfigException * @throws NoConfigException
* @throws ServerException * @throws ServerException
* @throws StompException * @throws StompException
*/ */
protected function handleItem($frame): bool public function poll(): bool
{ {
$this->_ensureConn();
$frame = $this->stomps[$this->pl->defaultIdx]->getClient()->readFrame();
if (empty($frame)) {
return false;
}
$host = $this->stomps[$this->pl->defaultIdx]->getHost(); $host = $this->stomps[$this->pl->defaultIdx]->getHost();
$message = unserialize(base64_decode($frame->body)); $message = unserialize(base64_decode($frame->body));
@ -396,7 +382,7 @@ class StompQueueManager extends QueueManager
$this->stats('requeued', $queue); $this->stats('requeued', $queue);
} }
return $ok; return false;
} }
/** /**
@ -584,7 +570,7 @@ class StompQueueManager extends QueueManager
* @throws NoConfigException * @throws NoConfigException
* @throws ServerException * @throws ServerException
*/ */
function switchSite($site) private function switchSite(string $site): void
{ {
if ($site != GNUsocial::currentSite()) { if ($site != GNUsocial::currentSite()) {
$this->stats('switch'); $this->stats('switch');
@ -601,12 +587,12 @@ class StompQueueManager extends QueueManager
* files might not. * files might not.
* *
* @param $nickname * @param $nickname
* @return void true to continue; false to stop further processing. * @return bool true to continue; false to stop further processing.
* @throws ConfigException * @throws ConfigException
* @throws NoConfigException * @throws NoConfigException
* @throws ServerException * @throws ServerException
*/ */
protected function updateSiteConfig($nickname) protected function updateSiteConfig($nickname): bool
{ {
$sn = Status_network::getKV('nickname', $nickname); $sn = Status_network::getKV('nickname', $nickname);
if ($sn) { if ($sn) {