From c4d67892751b17856b235182874c3304890dc2c3 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Fri, 29 Aug 2008 14:17:02 -0400 Subject: [PATCH] split public stream to its own queue handler Add another queue handler for the public stream. Should further parallelize the work of sending out messages. darcs-hash:20080829181702-84dde-594505aa73d2380b13bd98917b70b02bac597d12.gz --- lib/jabber.php | 39 +++++++++++--------- lib/util.php | 8 ++++- scripts/publicqueuehandler.php | 66 ++++++++++++++++++++++++++++++++++ scripts/startdaemons.sh | 1 + scripts/stopdaemons.sh | 1 + scripts/xmppdaemon.php | 2 +- scripts/xmppqueuehandler.php | 4 +-- 7 files changed, 101 insertions(+), 20 deletions(-) create mode 100755 scripts/publicqueuehandler.php diff --git a/lib/jabber.php b/lib/jabber.php index 215cd55374..171dff4df4 100644 --- a/lib/jabber.php +++ b/lib/jabber.php @@ -24,32 +24,32 @@ require_once('XMPPHP/XMPP.php'); # XXX: something of a hack to work around problems with the XMPPHP lib class Laconica_XMPP extends XMPPHP_XMPP { - + function messageplus($to, $body, $type = 'chat', $subject = null, $payload = null) { $to = htmlspecialchars($to); $body = htmlspecialchars($body); $subject = htmlspecialchars($subject); - + $jid = jabber_daemon_address(); - + $out = ""; if($subject) $out .= "$subject"; $out .= "$body"; if($payload) $out .= $payload; $out .= ""; - + $cnt = strlen($out); common_log(LOG_DEBUG, "Sending $cnt chars to $to"); $this->send($out); common_log(LOG_DEBUG, 'Done.'); } - + public function presence($status = null, $show = 'available', $to = null, $type='available', $priority=NULL) { if($type == 'available') $type = ''; $to = htmlspecialchars($to); $status = htmlspecialchars($status); if($show == 'unavailable') $type = 'unavailable'; - + $out = "send($out); } } @@ -105,7 +105,7 @@ function jabber_connect($resource=NULL, $status=NULL, $priority=NULL) { ); $conn->autoSubscribe(); $conn->useEncryption(common_config('xmpp', 'encryption')); - + if (!$conn) { return false; } @@ -141,7 +141,7 @@ function jabber_send_notice($to, $notice) { # Extra stuff defined by Twitter, needed by twitter clients function jabber_format_entry($profile, $notice) { - + $noticeurl = common_local_url('shownotice', array('notice' => $notice->id)); $msg = jabber_format_notice($profile, $notice); @@ -167,7 +167,7 @@ function jabber_format_entry($profile, $notice) { $html .= ($notice->rendered) ? $notice->rendered : common_render_content($notice->content, $notice); $html .= "\n\n"; $html .= "\n\n"; - + $event = "\n"; $event .= "\n"; @@ -229,6 +229,7 @@ function jabber_special_presence($type, $to=NULL, $show=NULL, $status=NULL) { } function jabber_broadcast_notice($notice) { + if (!common_config('xmpp', 'enabled')) { return true; } @@ -268,7 +269,7 @@ function jabber_broadcast_notice($notice) { # XXX: use a join here rather than looping through results $sub = new Subscription(); $sub->subscribed = $notice->profile_id; - + if ($sub->find()) { while ($sub->fetch()) { $user = User::staticGet($sub->subscriber); @@ -289,14 +290,20 @@ function jabber_broadcast_notice($notice) { } } } + + return true; +} + +function jabber_public_notice($notice) { # Now, users who want everything - + $public = common_config('xmpp', 'public'); - + # FIXME PRIV don't send out private messages here - # XXX: should we send out non-local messages if public,localonly = false? I think not - + # XXX: should we send out non-local messages if public,localonly + # = false? I think not + if ($public && $notice->is_local) { foreach ($public as $address) { common_log(LOG_INFO, @@ -305,7 +312,7 @@ function jabber_broadcast_notice($notice) { jabber_send_notice($address, $notice); } } - + return true; } diff --git a/lib/util.php b/lib/util.php index 496c6f3d2c..c6cdfbcb94 100644 --- a/lib/util.php +++ b/lib/util.php @@ -1070,7 +1070,7 @@ function common_broadcast_notice($notice, $remote=false) { # Stick the notice on the queue function common_enqueue_notice($notice) { - foreach (array('jabber', 'omb', 'sms') as $transport) { + foreach (array('jabber', 'omb', 'sms', 'public') as $transport) { $qi = new Queue_item(); $qi->notice_id = $notice->id; $qi->transport = $transport; @@ -1126,6 +1126,12 @@ function common_real_broadcast($notice, $remote=false) { common_log(LOG_ERR, 'Error in sms broadcast for notice ' . $notice->id); } } + if ($success) { + $success = jabber_public_notice($notice); + if (!$success) { + common_log(LOG_ERR, 'Error in public broadcast for notice ' . $notice->id); + } + } // XXX: broadcast notices to other IM return $success; } diff --git a/scripts/publicqueuehandler.php b/scripts/publicqueuehandler.php new file mode 100755 index 0000000000..0d95a489f8 --- /dev/null +++ b/scripts/publicqueuehandler.php @@ -0,0 +1,66 @@ +#!/usr/bin/env php +. + */ + +# Abort if called from a web server +if (isset($_SERVER) && array_key_exists('REQUEST_METHOD', $_SERVER)) { + print "This script must be run from the command line\n"; + exit(); +} + +define('INSTALLDIR', realpath(dirname(__FILE__) . '/..')); +define('LACONICA', true); + +require_once(INSTALLDIR . '/lib/common.php'); +require_once(INSTALLDIR . '/lib/jabber.php'); +require_once(INSTALLDIR . '/lib/queuehandler.php'); + +set_error_handler('common_error_handler'); + +class PublicQueueHandler extends QueueHandler { + + function transport() { + return 'public'; + } + + function start() { + # Low priority; we don't want to receive messages + $this->conn = jabber_connect($this->_id, NULL, -1); + return !is_null($this->conn); + } + + function handle_notice($notice) { + return jabber_public_notice($notice); + } + + function finish() { + } +} + +mb_internal_encoding('UTF-8'); + +$resource = ($argc > 1) ? $argv[1] : (common_config('xmpp','resource') . '-public'); + +$handler = new XmppQueueHandler($resource); + +if ($handler->start()) { + $handler->handle_queue(); +} + +$handler->finish(); diff --git a/scripts/startdaemons.sh b/scripts/startdaemons.sh index 37d30cdb40..8c054417de 100755 --- a/scripts/startdaemons.sh +++ b/scripts/startdaemons.sh @@ -22,6 +22,7 @@ export INSTALLDIR=$1 /sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/xmppdaemon.php -b -m --pidfile=/var/run/xmppdaemon.pid /sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/xmppqueuehandler.php -b -m --pidfile=/var/run/xmppqueuehandler.pid +/sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/publicqueuehandler.php -b -m --pidfile=/var/run/publicqueuehandler.pid /sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/xmppconfirmhandler.php -b -m --pidfile=/var/run/xmppconfirmhandler.pid /sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/smsqueuehandler.php -b -m --pidfile=/var/run/smsqueuehandler.pid /sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/ombqueuehandler.php -b -m --pidfile=/var/run/ombqueuehandler.pid diff --git a/scripts/stopdaemons.sh b/scripts/stopdaemons.sh index d620472f05..6814101cb2 100755 --- a/scripts/stopdaemons.sh +++ b/scripts/stopdaemons.sh @@ -22,6 +22,7 @@ export INSTALLDIR=$1 /sbin/start-stop-daemon -K -m --pidfile=/var/run/xmppdaemon.pid /sbin/start-stop-daemon -K -m --pidfile=/var/run/xmppqueuehandler.pid +/sbin/start-stop-daemon -K -m --pidfile=/var/run/publicqueuehandler.pid /sbin/start-stop-daemon -K -m --pidfile=/var/run/xmppconfirmhandler.pid /sbin/start-stop-daemon -K -m --pidfile=/var/run/smsqueuehandler.pid /sbin/start-stop-daemon -K -m --pidfile=/var/run/ombqueuehandler.pid diff --git a/scripts/xmppdaemon.php b/scripts/xmppdaemon.php index b013a34c9d..153ab5149e 100755 --- a/scripts/xmppdaemon.php +++ b/scripts/xmppdaemon.php @@ -259,7 +259,7 @@ class XMPPDaemon { mb_internal_encoding('UTF-8'); -$resource = ($argc > 1) ? $argv[1] : NULL; +$resource = ($argc > 1) ? $argv[1] : (common_config('xmpp','resource') . '-listen'); $daemon = new XMPPDaemon($resource); diff --git a/scripts/xmppqueuehandler.php b/scripts/xmppqueuehandler.php index 8fe8b1360a..3f632d5a87 100755 --- a/scripts/xmppqueuehandler.php +++ b/scripts/xmppqueuehandler.php @@ -41,7 +41,7 @@ class XmppQueueHandler extends QueueHandler { function start() { # Low priority; we don't want to receive messages - $this->conn = jabber_connect($this->resource, NULL, -100); + $this->conn = jabber_connect($this->_id, NULL, -1); return !is_null($this->conn); } @@ -55,7 +55,7 @@ class XmppQueueHandler extends QueueHandler { mb_internal_encoding('UTF-8'); -$resource = ($argc > 1) ? $argv[1] : NULL; +$resource = ($argc > 1) ? $argv[1] : (common_config('xmpp','resource') . '-queuehandler'); $handler = new XmppQueueHandler($resource);