diff --git a/queuedaemon.php b/queuedaemon.php deleted file mode 100755 index 8da69b9da8..0000000000 --- a/queuedaemon.php +++ /dev/null @@ -1,127 +0,0 @@ -#!/usr/bin/env php -. - */ - -# Abort if called from a web server -if (isset($_SERVER) && array_key_exists('REQUEST_METHOD', $_SERVER)) { - print "This script must be run from the command line\n"; - exit(); -} - -define('INSTALLDIR', dirname(__FILE__)); -define('LACONICA', true); - -require_once(INSTALLDIR . '/lib/common.php'); - -# Notices should be broadcast in 30 minutes or less - -define('CLAIM_TIMEOUT', 30 * 60); - -function qd_log($priority, $msg) { - common_log($level, 'queuedaemon: '.$msg); -} - -function qd_top_item() { - - $qi = new Queue_item(); - $qi->orderBy('created'); - $qi->whereAdd('claimed is NULL'); - - $qi->limit(1); - - $cnt = $qi->find(TRUE); - - if ($cnt) { - # XXX: potential race condition - # can we force it to only update if claimed is still NULL - # (or old)? - qd_log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id); - $orig = clone($qi); - $qi->claimed = DB_DataObject_Cast::dateTime(); - $result = $qi->update($orig); - if ($result) { - qd_log(LOG_INFO, 'claim succeeded.'); - return $qi; - } else { - qd_log(LOG_INFO, 'claim failed.'); - } - } - $qi = NULL; - return NULL; -} - -function qd_clear_old_claims() { - $qi = new Queue_item(); - $qi->orderBy('created'); - $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT); - if ($qi->find()) { - while ($qi->fetch()) { - $orig = clone($qi); - $qi->claimed = NULL; - $qi->update($orig); - } - } -} - -function qd_is_remote($notice) { - $user = User::staticGet($notice->profile_id); - return !$user; -} - -$in_a_row = 0; - -do { - qd_log(LOG_INFO, 'checking for queued notices'); - $qi = qd_top_item(); - if ($qi) { - $in_a_row++; - qd_log(LOG_INFO, 'Got queue item #'.$in_a_row.' enqueued '.common_exact_date($qi->created)); - $notice = Notice::staticGet($qi->notice_id); - if ($notice) { - qd_log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); - # XXX: what to do if broadcast fails? - $result = common_real_broadcast($notice, qd_is_remote($notice)); - if (!$result) { - qd_log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); - $orig = $qi; - $qi->claimed = NULL; - $qi->update($orig); - qd_log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id); - continue; - } - qd_log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); - $notice = NULL; - } else { - qd_log(LOG_WARNING, 'queue item for notice that does not exist'); - } - $qi->delete(); - $qi = NULL; - } else { - qd_clear_old_claims(); - # In busy times, sleep less - $sleeptime = 30000000/($in_a_row+1); - qd_log(LOG_INFO, 'sleeping ' . $sleeptime . ' microseconds'); - usleep($sleeptime); - $in_a_row = 0; - } - # Clear the DB_DataObject cache so we get fresh data - $GLOBALS['_DB_DATAOBJECT']['CACHE'] = array(); -} while (true); - -?> diff --git a/xmppdaemon.php b/xmppdaemon.php index a6e0465794..26c395303a 100755 --- a/xmppdaemon.php +++ b/xmppdaemon.php @@ -67,24 +67,32 @@ class XMPPDaemon { } function handle() { + + static $parts = array('message', 'presence', + 'end_stream', 'session_start'); + while(!$this->conn->disconnected) { - $payloads = $this->conn->processUntil(array('message', 'presence', - 'end_stream', 'session_start')); - foreach($payloads as $event) { - $pl = $event[1]; - $this->log(LOG_DEBUG, "Received '$event[0]': " . print_r($pl, TRUE)); - switch($event[0]) { - case 'message': - $this->handle_message($pl); - break; - case 'presence': - $this->handle_presence($pl); - break; - case 'session_start': - $this->handle_session($pl); - break; + + $payloads = $this->conn->processUntil($parts, 10); + + if ($payloads) { + foreach($payloads as $event) { + $pl = $event[1]; + switch($event[0]) { + case 'message': + $this->handle_message($pl); + break; + case 'presence': + $this->handle_presence($pl); + break; + case 'session_start': + $this->handle_session($pl); + break; + } } } + + $this->broadcast_queue(); } } @@ -116,7 +124,7 @@ class XMPPDaemon { $user = $this->get_user($from); if (!$user) { - $this->from_site($from, 'Unknown user; go to ' . + $this->from_site($from, 'Unknown user; go to ' . common_local_url('imsettings') . ' to add your address to your account'); $this->log(LOG_WARNING, 'Message from unknown user ' . $from); @@ -133,7 +141,7 @@ class XMPPDaemon { $text = '['.common_config('site', 'name') . '] ' . $msg; jabber_send_message($address, $text); } - + function handle_command($user, $body) { # XXX: localise switch(trim($body)) { @@ -201,32 +209,32 @@ class XMPPDaemon { function handle_presence(&$pl) { $from = jabber_normalize_jid($pl['from']); switch ($pl['type']) { - case 'subscribe': - # We let anyone subscribe - $this->subscribed($from); - $this->log(LOG_INFO, - 'Accepted subscription from ' . $from); - break; - case 'subscribed': - case 'unsubscribed': - case 'unsubscribe': - $this->log(LOG_INFO, - 'Ignoring "' . $pl['type'] . '" from ' . $from); - break; - default: - if (!$pl['type']) { - $user = User::staticGet('jabber', $from); - if (!$user) { - $this->log(LOG_WARNING, 'Message from unknown user ' . $from); - return; - } - if ($user->updatefrompresence) { - $this->log(LOG_INFO, 'Updating ' . $user->nickname . - ' status from presence.'); - $this->add_notice($user, $pl); - } + case 'subscribe': + # We let anyone subscribe + $this->subscribed($from); + $this->log(LOG_INFO, + 'Accepted subscription from ' . $from); + break; + case 'subscribed': + case 'unsubscribed': + case 'unsubscribe': + $this->log(LOG_INFO, + 'Ignoring "' . $pl['type'] . '" from ' . $from); + break; + default: + if (!$pl['type']) { + $user = User::staticGet('jabber', $from); + if (!$user) { + $this->log(LOG_WARNING, 'Message from unknown user ' . $from); + return; } - break; + if ($user->updatefrompresence) { + $this->log(LOG_INFO, 'Updating ' . $user->nickname . + ' status from presence.'); + $this->add_notice($user, $pl); + } + } + break; } } @@ -242,6 +250,77 @@ class XMPPDaemon { $this->log(LOG_INFO, 'Setting status to "' . $status . '"'); jabber_send_presence($status); } + + function top_queue_item() { + + $qi = new Queue_item(); + $qi->orderBy('created'); + $qi->whereAdd('claimed is NULL'); + + $qi->limit(1); + + $cnt = $qi->find(TRUE); + + if ($cnt) { + # XXX: potential race condition + # can we force it to only update if claimed is still NULL + # (or old)? + $this->log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id); + $orig = clone($qi); + $qi->claimed = DB_DataObject_Cast::dateTime(); + $result = $qi->update($orig); + if ($result) { + $this->log(LOG_INFO, 'claim succeeded.'); + return $qi; + } else { + $this->log(LOG_INFO, 'claim failed.'); + } + } + $qi = NULL; + return NULL; + } + + function broadcast_queue() { + $this->clear_old_claims(); + $this->log(LOG_INFO, 'checking for queued notices'); + do { + $qi = $this->top_queue_item(); + if ($qi) { + $this->log(LOG_INFO, 'Got queue item #'.$in_a_row.' enqueued '.common_exact_date($qi->created)); + $notice = Notice::staticGet($qi->notice_id); + if ($notice) { + $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); + # XXX: what to do if broadcast fails? + $result = common_real_broadcast($notice, $this->is_remote($notice)); + if (!$result) { + $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); + $orig = $qi; + $qi->claimed = NULL; + $qi->update($orig); + $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id); + continue; + } + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + $notice = NULL; + } else { + $this->log(LOG_WARNING, 'queue item for notice that does not exist'); + } + $qi->delete(); + } + } while ($qi); + } + + function clear_old_claims() { + $qi = new Queue_item(); + $qi->claimed = NULL; + $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT); + $qi->update(DB_DATAOBJECT_WHEREADD_ONLY); + } + + function is_remote($notice) { + $user = User::staticGet($notice->profile_id); + return !$user; + } } $resource = ($argc > 1) ? $argv[1] : NULL;