gnu-social/lib/liberalstomp.php

152 lines
4.7 KiB
PHP
Raw Normal View History

Major refactoring of queue handlers to support running multiple sites in one daemon. Key changes: * Initialization code moved from common.php to StatusNet class; can now switch configurations during runtime. * As a consequence, configuration files must now be idempotent... Be careful with constant, function or class definitions. * Control structure for daemons/QueueManager/QueueHandler has been refactored; the run loop is now managed by IoMaster run via scripts/queuedaemon.php IoManager subclasses are woken to handle socket input or polling, and may cover multiple sites. * Plugins can implement notice queue handlers more easily by registering a QueueHandler class; no more need to add a daemon. The new QueueDaemon runs from scripts/queuedaemon.php: * This replaces most of the old *handler.php scripts; they've been refactored to the bare handler classes. * Spawns multiple child processes to spread load; defaults to CPU count on Linux and Mac OS X systems, or override with --threads=N * When multithreaded, child processes are automatically respawned on failure. * Threads gracefully shut down and restart when passing a soft memory limit (defaults to 90% of memory_limit), limiting damage from memory leaks. * Support for UDP-based monitoring: http://www.gitorious.org/snqmon Rough control flow diagram: QueueDaemon -> IoMaster -> IoManager QueueManager [listen or poll] -> QueueHandler XmppManager [ping & keepalive] XmppConfirmManager [poll updates] Todo: * Respawning features not currently available running single-threaded. * When running single-site, configuration changes aren't picked up. * New sites or config changes affecting queue subscriptions are not yet handled without a daemon restart. * SNMP monitoring output to integrate with general tools (nagios, ganglia) * Convert XMPP confirmation message sends to use stomp queue instead of polling * Convert xmppdaemon.php to IoManager? * Convert Twitter status, friends import polling daemons to IoManager * Clean up some error reporting and failure modes * May need to adjust queue priorities for best perf in backlog/flood cases Detailed code history available in my daemon-work branch: http://www.gitorious.org/~brion/statusnet/brion-fixes/commits/daemon-work
2010-01-13 03:57:15 +00:00
<?php
/**
* Based on code from Stomp PHP library, working around bugs in the base class.
*
* Original code is copyright 2005-2006 The Apache Software Foundation
* Modifications copyright 2009 StatusNet Inc by Brion Vibber <brion@status.net>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
class LiberalStomp extends Stomp
{
/**
* We need to be able to get the socket so advanced daemons can
* do a select() waiting for input both from the queue and from
* other sources such as an XMPP connection.
*
* @return resource
*/
function getSocket()
{
return $this->_socket;
}
/**
* Return the host we're currently connected to.
*
* @return string
*/
function getServer()
{
$idx = $this->_currentHost;
if ($idx >= 0) {
$host = $this->_hosts[$idx];
return "$host[0]:$host[1]";
} else {
return '[unconnected]';
}
}
Major refactoring of queue handlers to support running multiple sites in one daemon. Key changes: * Initialization code moved from common.php to StatusNet class; can now switch configurations during runtime. * As a consequence, configuration files must now be idempotent... Be careful with constant, function or class definitions. * Control structure for daemons/QueueManager/QueueHandler has been refactored; the run loop is now managed by IoMaster run via scripts/queuedaemon.php IoManager subclasses are woken to handle socket input or polling, and may cover multiple sites. * Plugins can implement notice queue handlers more easily by registering a QueueHandler class; no more need to add a daemon. The new QueueDaemon runs from scripts/queuedaemon.php: * This replaces most of the old *handler.php scripts; they've been refactored to the bare handler classes. * Spawns multiple child processes to spread load; defaults to CPU count on Linux and Mac OS X systems, or override with --threads=N * When multithreaded, child processes are automatically respawned on failure. * Threads gracefully shut down and restart when passing a soft memory limit (defaults to 90% of memory_limit), limiting damage from memory leaks. * Support for UDP-based monitoring: http://www.gitorious.org/snqmon Rough control flow diagram: QueueDaemon -> IoMaster -> IoManager QueueManager [listen or poll] -> QueueHandler XmppManager [ping & keepalive] XmppConfirmManager [poll updates] Todo: * Respawning features not currently available running single-threaded. * When running single-site, configuration changes aren't picked up. * New sites or config changes affecting queue subscriptions are not yet handled without a daemon restart. * SNMP monitoring output to integrate with general tools (nagios, ganglia) * Convert XMPP confirmation message sends to use stomp queue instead of polling * Convert xmppdaemon.php to IoManager? * Convert Twitter status, friends import polling daemons to IoManager * Clean up some error reporting and failure modes * May need to adjust queue priorities for best perf in backlog/flood cases Detailed code history available in my daemon-work branch: http://www.gitorious.org/~brion/statusnet/brion-fixes/commits/daemon-work
2010-01-13 03:57:15 +00:00
/**
* Make socket connection to the server
* We also set the stream to non-blocking mode, since we'll be
* select'ing to wait for updates. In blocking mode it seems
* to get confused sometimes.
*
* @throws StompException
*/
protected function _makeConnection ()
{
parent::_makeConnection();
stream_set_blocking($this->_socket, 0);
}
/**
* Version 1.0.0 of the Stomp library gets confused if messages
* come in too fast over the connection. This version will read
* out as many frames as are ready to be read from the socket.
*
* Modified from Stomp::readFrame()
*
* @return StompFrame False when no frame to read
*/
public function readFrames ()
{
if (!$this->hasFrameToRead()) {
return false;
}
$rb = 1024;
$data = '';
$end = false;
$frames = array();
do {
// @fixme this sometimes hangs in blocking mode...
// shouldn't we have been idle until we found there's more data?
$read = fread($this->_socket, $rb);
if ($read === false || ($read === '' && feof($this->_socket))) {
// @fixme possibly attempt an auto reconnect as old code?
throw new StompException("Error reading");
//$this->_reconnect();
Major refactoring of queue handlers to support running multiple sites in one daemon. Key changes: * Initialization code moved from common.php to StatusNet class; can now switch configurations during runtime. * As a consequence, configuration files must now be idempotent... Be careful with constant, function or class definitions. * Control structure for daemons/QueueManager/QueueHandler has been refactored; the run loop is now managed by IoMaster run via scripts/queuedaemon.php IoManager subclasses are woken to handle socket input or polling, and may cover multiple sites. * Plugins can implement notice queue handlers more easily by registering a QueueHandler class; no more need to add a daemon. The new QueueDaemon runs from scripts/queuedaemon.php: * This replaces most of the old *handler.php scripts; they've been refactored to the bare handler classes. * Spawns multiple child processes to spread load; defaults to CPU count on Linux and Mac OS X systems, or override with --threads=N * When multithreaded, child processes are automatically respawned on failure. * Threads gracefully shut down and restart when passing a soft memory limit (defaults to 90% of memory_limit), limiting damage from memory leaks. * Support for UDP-based monitoring: http://www.gitorious.org/snqmon Rough control flow diagram: QueueDaemon -> IoMaster -> IoManager QueueManager [listen or poll] -> QueueHandler XmppManager [ping & keepalive] XmppConfirmManager [poll updates] Todo: * Respawning features not currently available running single-threaded. * When running single-site, configuration changes aren't picked up. * New sites or config changes affecting queue subscriptions are not yet handled without a daemon restart. * SNMP monitoring output to integrate with general tools (nagios, ganglia) * Convert XMPP confirmation message sends to use stomp queue instead of polling * Convert xmppdaemon.php to IoManager? * Convert Twitter status, friends import polling daemons to IoManager * Clean up some error reporting and failure modes * May need to adjust queue priorities for best perf in backlog/flood cases Detailed code history available in my daemon-work branch: http://www.gitorious.org/~brion/statusnet/brion-fixes/commits/daemon-work
2010-01-13 03:57:15 +00:00
// @fixme this will lose prior items
//return $this->readFrames();
Major refactoring of queue handlers to support running multiple sites in one daemon. Key changes: * Initialization code moved from common.php to StatusNet class; can now switch configurations during runtime. * As a consequence, configuration files must now be idempotent... Be careful with constant, function or class definitions. * Control structure for daemons/QueueManager/QueueHandler has been refactored; the run loop is now managed by IoMaster run via scripts/queuedaemon.php IoManager subclasses are woken to handle socket input or polling, and may cover multiple sites. * Plugins can implement notice queue handlers more easily by registering a QueueHandler class; no more need to add a daemon. The new QueueDaemon runs from scripts/queuedaemon.php: * This replaces most of the old *handler.php scripts; they've been refactored to the bare handler classes. * Spawns multiple child processes to spread load; defaults to CPU count on Linux and Mac OS X systems, or override with --threads=N * When multithreaded, child processes are automatically respawned on failure. * Threads gracefully shut down and restart when passing a soft memory limit (defaults to 90% of memory_limit), limiting damage from memory leaks. * Support for UDP-based monitoring: http://www.gitorious.org/snqmon Rough control flow diagram: QueueDaemon -> IoMaster -> IoManager QueueManager [listen or poll] -> QueueHandler XmppManager [ping & keepalive] XmppConfirmManager [poll updates] Todo: * Respawning features not currently available running single-threaded. * When running single-site, configuration changes aren't picked up. * New sites or config changes affecting queue subscriptions are not yet handled without a daemon restart. * SNMP monitoring output to integrate with general tools (nagios, ganglia) * Convert XMPP confirmation message sends to use stomp queue instead of polling * Convert xmppdaemon.php to IoManager? * Convert Twitter status, friends import polling daemons to IoManager * Clean up some error reporting and failure modes * May need to adjust queue priorities for best perf in backlog/flood cases Detailed code history available in my daemon-work branch: http://www.gitorious.org/~brion/statusnet/brion-fixes/commits/daemon-work
2010-01-13 03:57:15 +00:00
}
$data .= $read;
if (strpos($data, "\x00") !== false) {
// Frames are null-delimited, but some servers
// may append an extra \n according to old bug reports.
$data = str_replace("\x00\n", "\x00", $data);
$chunks = explode("\x00", $data);
$data = array_pop($chunks);
$frames = array_merge($frames, $chunks);
if ($data == '') {
// We're at the end of a frame; stop reading.
break;
} else {
// In the middle of a frame; keep going.
}
}
// @fixme find out why this len < 2 check was there
//$len = strlen($data);
} while (true);//$len < 2 || $end == false);
return array_map(array($this, 'parseFrame'), $frames);
}
/**
* Parse a raw Stomp frame into an object.
* Extracted from Stomp::readFrame()
*
* @param string $data
* @return StompFrame
*/
function parseFrame($data)
{
list ($header, $body) = explode("\n\n", $data, 2);
$header = explode("\n", $header);
$headers = array();
$command = null;
foreach ($header as $v) {
if (isset($command)) {
list ($name, $value) = explode(':', $v, 2);
$headers[$name] = $value;
} else {
$command = $v;
}
}
$frame = new StompFrame($command, $headers, trim($body));
if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') {
require_once 'Stomp/Message/Map.php';
return new StompMessageMap($frame);
} else {
return $frame;
}
return $frame;
}
}