move queuedaemon into xmppdaemon

darcs-hash:20080705172837-84dde-89fa8f7e844417f8157a2ecc9b24efad267258ff.gz
This commit is contained in:
Evan Prodromou 2008-07-05 13:28:37 -04:00
parent c414746e74
commit 9942b4416b
2 changed files with 121 additions and 169 deletions

View File

@ -1,127 +0,0 @@
#!/usr/bin/env php
<?php
/*
* Laconica - a distributed open-source microblogging tool
* Copyright (C) 2008, Controlez-Vous, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
# Abort if called from a web server
if (isset($_SERVER) && array_key_exists('REQUEST_METHOD', $_SERVER)) {
print "This script must be run from the command line\n";
exit();
}
define('INSTALLDIR', dirname(__FILE__));
define('LACONICA', true);
require_once(INSTALLDIR . '/lib/common.php');
# Notices should be broadcast in 30 minutes or less
define('CLAIM_TIMEOUT', 30 * 60);
function qd_log($priority, $msg) {
common_log($level, 'queuedaemon: '.$msg);
}
function qd_top_item() {
$qi = new Queue_item();
$qi->orderBy('created');
$qi->whereAdd('claimed is NULL');
$qi->limit(1);
$cnt = $qi->find(TRUE);
if ($cnt) {
# XXX: potential race condition
# can we force it to only update if claimed is still NULL
# (or old)?
qd_log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id);
$orig = clone($qi);
$qi->claimed = DB_DataObject_Cast::dateTime();
$result = $qi->update($orig);
if ($result) {
qd_log(LOG_INFO, 'claim succeeded.');
return $qi;
} else {
qd_log(LOG_INFO, 'claim failed.');
}
}
$qi = NULL;
return NULL;
}
function qd_clear_old_claims() {
$qi = new Queue_item();
$qi->orderBy('created');
$qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
if ($qi->find()) {
while ($qi->fetch()) {
$orig = clone($qi);
$qi->claimed = NULL;
$qi->update($orig);
}
}
}
function qd_is_remote($notice) {
$user = User::staticGet($notice->profile_id);
return !$user;
}
$in_a_row = 0;
do {
qd_log(LOG_INFO, 'checking for queued notices');
$qi = qd_top_item();
if ($qi) {
$in_a_row++;
qd_log(LOG_INFO, 'Got queue item #'.$in_a_row.' enqueued '.common_exact_date($qi->created));
$notice = Notice::staticGet($qi->notice_id);
if ($notice) {
qd_log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
# XXX: what to do if broadcast fails?
$result = common_real_broadcast($notice, qd_is_remote($notice));
if (!$result) {
qd_log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
$orig = $qi;
$qi->claimed = NULL;
$qi->update($orig);
qd_log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id);
continue;
}
qd_log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
$notice = NULL;
} else {
qd_log(LOG_WARNING, 'queue item for notice that does not exist');
}
$qi->delete();
$qi = NULL;
} else {
qd_clear_old_claims();
# In busy times, sleep less
$sleeptime = 30000000/($in_a_row+1);
qd_log(LOG_INFO, 'sleeping ' . $sleeptime . ' microseconds');
usleep($sleeptime);
$in_a_row = 0;
}
# Clear the DB_DataObject cache so we get fresh data
$GLOBALS['_DB_DATAOBJECT']['CACHE'] = array();
} while (true);
?>

View File

@ -67,24 +67,32 @@ class XMPPDaemon {
} }
function handle() { function handle() {
static $parts = array('message', 'presence',
'end_stream', 'session_start');
while(!$this->conn->disconnected) { while(!$this->conn->disconnected) {
$payloads = $this->conn->processUntil(array('message', 'presence',
'end_stream', 'session_start')); $payloads = $this->conn->processUntil($parts, 10);
foreach($payloads as $event) {
$pl = $event[1]; if ($payloads) {
$this->log(LOG_DEBUG, "Received '$event[0]': " . print_r($pl, TRUE)); foreach($payloads as $event) {
switch($event[0]) { $pl = $event[1];
case 'message': switch($event[0]) {
$this->handle_message($pl); case 'message':
break; $this->handle_message($pl);
case 'presence': break;
$this->handle_presence($pl); case 'presence':
break; $this->handle_presence($pl);
case 'session_start': break;
$this->handle_session($pl); case 'session_start':
break; $this->handle_session($pl);
break;
}
} }
} }
$this->broadcast_queue();
} }
} }
@ -116,7 +124,7 @@ class XMPPDaemon {
$user = $this->get_user($from); $user = $this->get_user($from);
if (!$user) { if (!$user) {
$this->from_site($from, 'Unknown user; go to ' . $this->from_site($from, 'Unknown user; go to ' .
common_local_url('imsettings') . common_local_url('imsettings') .
' to add your address to your account'); ' to add your address to your account');
$this->log(LOG_WARNING, 'Message from unknown user ' . $from); $this->log(LOG_WARNING, 'Message from unknown user ' . $from);
@ -133,7 +141,7 @@ class XMPPDaemon {
$text = '['.common_config('site', 'name') . '] ' . $msg; $text = '['.common_config('site', 'name') . '] ' . $msg;
jabber_send_message($address, $text); jabber_send_message($address, $text);
} }
function handle_command($user, $body) { function handle_command($user, $body) {
# XXX: localise # XXX: localise
switch(trim($body)) { switch(trim($body)) {
@ -201,32 +209,32 @@ class XMPPDaemon {
function handle_presence(&$pl) { function handle_presence(&$pl) {
$from = jabber_normalize_jid($pl['from']); $from = jabber_normalize_jid($pl['from']);
switch ($pl['type']) { switch ($pl['type']) {
case 'subscribe': case 'subscribe':
# We let anyone subscribe # We let anyone subscribe
$this->subscribed($from); $this->subscribed($from);
$this->log(LOG_INFO, $this->log(LOG_INFO,
'Accepted subscription from ' . $from); 'Accepted subscription from ' . $from);
break; break;
case 'subscribed': case 'subscribed':
case 'unsubscribed': case 'unsubscribed':
case 'unsubscribe': case 'unsubscribe':
$this->log(LOG_INFO, $this->log(LOG_INFO,
'Ignoring "' . $pl['type'] . '" from ' . $from); 'Ignoring "' . $pl['type'] . '" from ' . $from);
break; break;
default: default:
if (!$pl['type']) { if (!$pl['type']) {
$user = User::staticGet('jabber', $from); $user = User::staticGet('jabber', $from);
if (!$user) { if (!$user) {
$this->log(LOG_WARNING, 'Message from unknown user ' . $from); $this->log(LOG_WARNING, 'Message from unknown user ' . $from);
return; return;
}
if ($user->updatefrompresence) {
$this->log(LOG_INFO, 'Updating ' . $user->nickname .
' status from presence.');
$this->add_notice($user, $pl);
}
} }
break; if ($user->updatefrompresence) {
$this->log(LOG_INFO, 'Updating ' . $user->nickname .
' status from presence.');
$this->add_notice($user, $pl);
}
}
break;
} }
} }
@ -242,6 +250,77 @@ class XMPPDaemon {
$this->log(LOG_INFO, 'Setting status to "' . $status . '"'); $this->log(LOG_INFO, 'Setting status to "' . $status . '"');
jabber_send_presence($status); jabber_send_presence($status);
} }
function top_queue_item() {
$qi = new Queue_item();
$qi->orderBy('created');
$qi->whereAdd('claimed is NULL');
$qi->limit(1);
$cnt = $qi->find(TRUE);
if ($cnt) {
# XXX: potential race condition
# can we force it to only update if claimed is still NULL
# (or old)?
$this->log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id);
$orig = clone($qi);
$qi->claimed = DB_DataObject_Cast::dateTime();
$result = $qi->update($orig);
if ($result) {
$this->log(LOG_INFO, 'claim succeeded.');
return $qi;
} else {
$this->log(LOG_INFO, 'claim failed.');
}
}
$qi = NULL;
return NULL;
}
function broadcast_queue() {
$this->clear_old_claims();
$this->log(LOG_INFO, 'checking for queued notices');
do {
$qi = $this->top_queue_item();
if ($qi) {
$this->log(LOG_INFO, 'Got queue item #'.$in_a_row.' enqueued '.common_exact_date($qi->created));
$notice = Notice::staticGet($qi->notice_id);
if ($notice) {
$this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
# XXX: what to do if broadcast fails?
$result = common_real_broadcast($notice, $this->is_remote($notice));
if (!$result) {
$this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
$orig = $qi;
$qi->claimed = NULL;
$qi->update($orig);
$this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id);
continue;
}
$this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
$notice = NULL;
} else {
$this->log(LOG_WARNING, 'queue item for notice that does not exist');
}
$qi->delete();
}
} while ($qi);
}
function clear_old_claims() {
$qi = new Queue_item();
$qi->claimed = NULL;
$qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
$qi->update(DB_DATAOBJECT_WHEREADD_ONLY);
}
function is_remote($notice) {
$user = User::staticGet($notice->profile_id);
return !$user;
}
} }
$resource = ($argc > 1) ? $argv[1] : NULL; $resource = ($argc > 1) ? $argv[1] : NULL;