Merge branch 'testing' into 0.9.x
This commit is contained in:
commit
c8bc598cfd
130
actions/doc.php
130
actions/doc.php
@ -45,11 +45,23 @@ if (!defined('STATUSNET') && !defined('LACONICA')) {
|
|||||||
*/
|
*/
|
||||||
class DocAction extends Action
|
class DocAction extends Action
|
||||||
{
|
{
|
||||||
var $filename;
|
var $output = null;
|
||||||
var $title;
|
var $filename = null;
|
||||||
|
var $title = null;
|
||||||
|
|
||||||
|
function prepare($args)
|
||||||
|
{
|
||||||
|
parent::prepare($args);
|
||||||
|
|
||||||
|
$this->title = $this->trimmed('title');
|
||||||
|
$this->output = null;
|
||||||
|
|
||||||
|
$this->loadDoc();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class handler.
|
* Handle a request
|
||||||
*
|
*
|
||||||
* @param array $args array of arguments
|
* @param array $args array of arguments
|
||||||
*
|
*
|
||||||
@ -58,33 +70,30 @@ class DocAction extends Action
|
|||||||
function handle($args)
|
function handle($args)
|
||||||
{
|
{
|
||||||
parent::handle($args);
|
parent::handle($args);
|
||||||
|
|
||||||
$this->title = $this->trimmed('title');
|
|
||||||
$this->output = null;
|
|
||||||
|
|
||||||
if (Event::handle('StartLoadDoc', array(&$this->title, &$this->output))) {
|
|
||||||
|
|
||||||
$this->filename = INSTALLDIR.'/doc-src/'.$this->title;
|
|
||||||
if (!file_exists($this->filename)) {
|
|
||||||
$this->clientError(_('No such document.'));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$c = file_get_contents($this->filename);
|
|
||||||
$this->output = common_markup_to_html($c);
|
|
||||||
|
|
||||||
Event::handle('EndLoadDoc', array($this->title, &$this->output));
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->showPage();
|
$this->showPage();
|
||||||
}
|
}
|
||||||
|
|
||||||
// overrrided to add entry-title class
|
/**
|
||||||
function showPageTitle() {
|
* Page title
|
||||||
|
*
|
||||||
|
* Gives the page title of the document. Override default for hAtom entry.
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
|
||||||
|
function showPageTitle()
|
||||||
|
{
|
||||||
$this->element('h1', array('class' => 'entry-title'), $this->title());
|
$this->element('h1', array('class' => 'entry-title'), $this->title());
|
||||||
}
|
}
|
||||||
|
|
||||||
// overrided to add hentry, and content-inner classes
|
/**
|
||||||
|
* Block for content.
|
||||||
|
*
|
||||||
|
* Overrides default from Action to wrap everything in an hAtom entry.
|
||||||
|
*
|
||||||
|
* @return void.
|
||||||
|
*/
|
||||||
|
|
||||||
function showContentBlock()
|
function showContentBlock()
|
||||||
{
|
{
|
||||||
$this->elementStart('div', array('id' => 'content', 'class' => 'hentry'));
|
$this->elementStart('div', array('id' => 'content', 'class' => 'hentry'));
|
||||||
@ -101,8 +110,11 @@ class DocAction extends Action
|
|||||||
/**
|
/**
|
||||||
* Display content.
|
* Display content.
|
||||||
*
|
*
|
||||||
* @return nothing
|
* Shows the content of the document.
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
*/
|
*/
|
||||||
|
|
||||||
function showContent()
|
function showContent()
|
||||||
{
|
{
|
||||||
$this->raw($this->output);
|
$this->raw($this->output);
|
||||||
@ -111,6 +123,8 @@ class DocAction extends Action
|
|||||||
/**
|
/**
|
||||||
* Page title.
|
* Page title.
|
||||||
*
|
*
|
||||||
|
* Uses the title of the document.
|
||||||
|
*
|
||||||
* @return page title
|
* @return page title
|
||||||
*/
|
*/
|
||||||
function title()
|
function title()
|
||||||
@ -118,8 +132,74 @@ class DocAction extends Action
|
|||||||
return ucfirst($this->title);
|
return ucfirst($this->title);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* These pages are read-only.
|
||||||
|
*
|
||||||
|
* @param array $args unused.
|
||||||
|
*
|
||||||
|
* @return boolean read-only flag (false)
|
||||||
|
*/
|
||||||
|
|
||||||
function isReadOnly($args)
|
function isReadOnly($args)
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function loadDoc()
|
||||||
|
{
|
||||||
|
if (Event::handle('StartLoadDoc', array(&$this->title, &$this->output))) {
|
||||||
|
|
||||||
|
$this->filename = $this->getFilename();
|
||||||
|
|
||||||
|
if (empty($this->filename)) {
|
||||||
|
throw new ClientException(sprintf(_('No such document "%s"'), $this->title), 404);
|
||||||
|
}
|
||||||
|
|
||||||
|
$c = file_get_contents($this->filename);
|
||||||
|
|
||||||
|
$this->output = common_markup_to_html($c);
|
||||||
|
|
||||||
|
Event::handle('EndLoadDoc', array($this->title, &$this->output));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function getFilename()
|
||||||
|
{
|
||||||
|
if (file_exists(INSTALLDIR.'/local/doc-src/'.$this->title)) {
|
||||||
|
$localDef = INSTALLDIR.'/local/doc-src/'.$this->title;
|
||||||
|
}
|
||||||
|
|
||||||
|
$local = glob(INSTALLDIR.'/local/doc-src/'.$this->title.'.*');
|
||||||
|
|
||||||
|
if (count($local) || isset($localDef)) {
|
||||||
|
return $this->negotiateLanguage($local, $localDef);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (file_exists(INSTALLDIR.'/doc-src/'.$this->title)) {
|
||||||
|
$distDef = INSTALLDIR.'/doc-src/'.$this->title;
|
||||||
|
}
|
||||||
|
|
||||||
|
$dist = glob(INSTALLDIR.'/doc-src/'.$this->title.'.*');
|
||||||
|
|
||||||
|
if (count($dist) || isset($distDef)) {
|
||||||
|
return $this->negotiateLanguage($dist, $distDef);
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
function negotiateLanguage($filenames, $defaultFilename=null)
|
||||||
|
{
|
||||||
|
// XXX: do this better
|
||||||
|
|
||||||
|
$langcode = common_language();
|
||||||
|
|
||||||
|
foreach ($filenames as $filename) {
|
||||||
|
if (preg_match('/\.'.$langcode.'$/', $filename)) {
|
||||||
|
return $filename;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return $defaultFilename;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -309,6 +309,8 @@ class ImsettingsAction extends ConnectSettingsAction
|
|||||||
$confirm->address_type = 'jabber';
|
$confirm->address_type = 'jabber';
|
||||||
$confirm->user_id = $user->id;
|
$confirm->user_id = $user->id;
|
||||||
$confirm->code = common_confirmation_code(64);
|
$confirm->code = common_confirmation_code(64);
|
||||||
|
$confirm->sent = common_sql_now();
|
||||||
|
$confirm->claimed = common_sql_now();
|
||||||
|
|
||||||
$result = $confirm->insert();
|
$result = $confirm->insert();
|
||||||
|
|
||||||
@ -318,11 +320,9 @@ class ImsettingsAction extends ConnectSettingsAction
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!common_config('queue', 'enabled')) {
|
|
||||||
jabber_confirm_address($confirm->code,
|
jabber_confirm_address($confirm->code,
|
||||||
$user->nickname,
|
$user->nickname,
|
||||||
$jabber);
|
$jabber);
|
||||||
}
|
|
||||||
|
|
||||||
$msg = sprintf(_('A confirmation code was sent '.
|
$msg = sprintf(_('A confirmation code was sent '.
|
||||||
'to the IM address you added. '.
|
'to the IM address you added. '.
|
||||||
|
@ -313,6 +313,39 @@ class Memcached_DataObject extends DB_DataObject
|
|||||||
return new ArrayWrapper($cached);
|
return new ArrayWrapper($cached);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* sends query to database - this is the private one that must work
|
||||||
|
* - internal functions use this rather than $this->query()
|
||||||
|
*
|
||||||
|
* Overridden to do logging.
|
||||||
|
*
|
||||||
|
* @param string $string
|
||||||
|
* @access private
|
||||||
|
* @return mixed none or PEAR_Error
|
||||||
|
*/
|
||||||
|
function _query($string)
|
||||||
|
{
|
||||||
|
$start = microtime(true);
|
||||||
|
$result = parent::_query($string);
|
||||||
|
$delta = microtime(true) - $start;
|
||||||
|
|
||||||
|
$limit = common_config('db', 'log_slow_queries');
|
||||||
|
if (($limit > 0 && $delta >= $limit) || common_config('db', 'log_queries')) {
|
||||||
|
$clean = $this->sanitizeQuery($string);
|
||||||
|
common_log(LOG_DEBUG, sprintf("DB query (%0.3fs): %s", $delta, $clean));
|
||||||
|
}
|
||||||
|
return $result;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sanitize a query for logging
|
||||||
|
// @fixme don't trim spaces in string literals
|
||||||
|
function sanitizeQuery($string)
|
||||||
|
{
|
||||||
|
$string = preg_replace('/\s+/', ' ', $string);
|
||||||
|
$string = trim($string);
|
||||||
|
return $string;
|
||||||
|
}
|
||||||
|
|
||||||
// We overload so that 'SET NAMES "utf8"' is called for
|
// We overload so that 'SET NAMES "utf8"' is called for
|
||||||
// each connection
|
// each connection
|
||||||
|
|
||||||
|
@ -10,8 +10,8 @@ class Queue_item extends Memcached_DataObject
|
|||||||
/* the code below is auto generated do not remove the above tag */
|
/* the code below is auto generated do not remove the above tag */
|
||||||
|
|
||||||
public $__table = 'queue_item'; // table name
|
public $__table = 'queue_item'; // table name
|
||||||
public $notice_id; // int(4) primary_key not_null
|
public $id; // int(4) primary_key not_null
|
||||||
public $transport; // varchar(8) primary_key not_null
|
public $frame; // blob not_null
|
||||||
public $created; // datetime() not_null
|
public $created; // datetime() not_null
|
||||||
public $claimed; // datetime()
|
public $claimed; // datetime()
|
||||||
|
|
||||||
@ -22,14 +22,21 @@ class Queue_item extends Memcached_DataObject
|
|||||||
/* the code above is auto generated do not remove the tag below */
|
/* the code above is auto generated do not remove the tag below */
|
||||||
###END_AUTOCODE
|
###END_AUTOCODE
|
||||||
|
|
||||||
function sequenceKey()
|
/**
|
||||||
{ return array(false, false); }
|
* @param mixed $transports name of a single queue or array of queues to pull from
|
||||||
|
* If not specified, checks all queues in the system.
|
||||||
static function top($transport=null) {
|
*/
|
||||||
|
static function top($transports=null) {
|
||||||
|
|
||||||
$qi = new Queue_item();
|
$qi = new Queue_item();
|
||||||
if ($transport) {
|
if ($transports) {
|
||||||
$qi->transport = $transport;
|
if (is_array($transports)) {
|
||||||
|
// @fixme use safer escaping
|
||||||
|
$list = implode("','", array_map('addslashes', $transports));
|
||||||
|
$qi->whereAdd("transport in ('$list')");
|
||||||
|
} else {
|
||||||
|
$qi->transport = $transports;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
$qi->orderBy('created');
|
$qi->orderBy('created');
|
||||||
$qi->whereAdd('claimed is null');
|
$qi->whereAdd('claimed is null');
|
||||||
@ -42,7 +49,7 @@ class Queue_item extends Memcached_DataObject
|
|||||||
# XXX: potential race condition
|
# XXX: potential race condition
|
||||||
# can we force it to only update if claimed is still null
|
# can we force it to only update if claimed is still null
|
||||||
# (or old)?
|
# (or old)?
|
||||||
common_log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id .
|
common_log(LOG_INFO, 'claiming queue item id = ' . $qi->id .
|
||||||
' for transport ' . $qi->transport);
|
' for transport ' . $qi->transport);
|
||||||
$orig = clone($qi);
|
$orig = clone($qi);
|
||||||
$qi->claimed = common_sql_now();
|
$qi->claimed = common_sql_now();
|
||||||
@ -57,9 +64,4 @@ class Queue_item extends Memcached_DataObject
|
|||||||
$qi = null;
|
$qi = null;
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
function pkeyGet($kv)
|
|
||||||
{
|
|
||||||
return Memcached_DataObject::pkeyGet('Queue_item', $kv);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -428,14 +428,14 @@ tagged = K
|
|||||||
tag = K
|
tag = K
|
||||||
|
|
||||||
[queue_item]
|
[queue_item]
|
||||||
notice_id = 129
|
id = 129
|
||||||
|
frame = 66
|
||||||
transport = 130
|
transport = 130
|
||||||
created = 142
|
created = 142
|
||||||
claimed = 14
|
claimed = 14
|
||||||
|
|
||||||
[queue_item__keys]
|
[queue_item__keys]
|
||||||
notice_id = K
|
id = K
|
||||||
transport = K
|
|
||||||
|
|
||||||
[related_group]
|
[related_group]
|
||||||
group_id = 129
|
group_id = 129
|
||||||
|
@ -94,3 +94,19 @@ create table user_location_prefs (
|
|||||||
constraint primary key (user_id)
|
constraint primary key (user_id)
|
||||||
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
|
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
|
||||||
|
|
||||||
|
create table queue_item_new (
|
||||||
|
id integer auto_increment primary key comment 'unique identifier',
|
||||||
|
frame blob not null comment 'data: object reference or opaque string',
|
||||||
|
transport varchar(8) not null comment 'queue for what? "email", "jabber", "sms", "irc", ...',
|
||||||
|
created datetime not null comment 'date this record was created',
|
||||||
|
claimed datetime comment 'date this item was claimed',
|
||||||
|
|
||||||
|
index queue_item_created_idx (created)
|
||||||
|
|
||||||
|
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
|
||||||
|
|
||||||
|
insert into queue_item_new (frame,transport,created,claimed)
|
||||||
|
select notice_id,transport,created,claimed from queue_item;
|
||||||
|
alter table queue_item rename to queue_item_old;
|
||||||
|
alter table queue_item_new rename to queue_item;
|
||||||
|
|
||||||
|
16
db/rc3to09.sql
Normal file
16
db/rc3to09.sql
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
create table queue_item_new (
|
||||||
|
id integer auto_increment primary key comment 'unique identifier',
|
||||||
|
frame blob not null comment 'data: object reference or opaque string',
|
||||||
|
transport varchar(8) not null comment 'queue for what? "email", "jabber", "sms", "irc", ...',
|
||||||
|
created datetime not null comment 'date this record was created',
|
||||||
|
claimed datetime comment 'date this item was claimed',
|
||||||
|
|
||||||
|
index queue_item_created_idx (created)
|
||||||
|
|
||||||
|
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
|
||||||
|
|
||||||
|
insert into queue_item_new (frame,transport,created,claimed)
|
||||||
|
select notice_id,transport,created,claimed from queue_item;
|
||||||
|
alter table queue_item rename to queue_item_old;
|
||||||
|
alter table queue_item_new rename to queue_item;
|
||||||
|
|
@ -274,13 +274,12 @@ create table remember_me (
|
|||||||
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
|
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
|
||||||
|
|
||||||
create table queue_item (
|
create table queue_item (
|
||||||
|
id integer auto_increment primary key comment 'unique identifier',
|
||||||
notice_id integer not null comment 'notice queued' references notice (id),
|
frame blob not null comment 'data: object reference or opaque string',
|
||||||
transport varchar(8) not null comment 'queue for what? "email", "jabber", "sms", "irc", ...',
|
transport varchar(8) not null comment 'queue for what? "email", "jabber", "sms", "irc", ...',
|
||||||
created datetime not null comment 'date this record was created',
|
created datetime not null comment 'date this record was created',
|
||||||
claimed datetime comment 'date this item was claimed',
|
claimed datetime comment 'date this item was claimed',
|
||||||
|
|
||||||
constraint primary key (notice_id, transport),
|
|
||||||
index queue_item_created_idx (created)
|
index queue_item_created_idx (created)
|
||||||
|
|
||||||
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
|
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
|
||||||
|
@ -205,9 +205,11 @@ var SN = { // StatusNet
|
|||||||
cookieValue = JSON.parse(cookieValue);
|
cookieValue = JSON.parse(cookieValue);
|
||||||
NLat = $('#'+SN.C.S.NoticeLat).val(cookieValue.NLat).val();
|
NLat = $('#'+SN.C.S.NoticeLat).val(cookieValue.NLat).val();
|
||||||
NLon = $('#'+SN.C.S.NoticeLon).val(cookieValue.NLon).val();
|
NLon = $('#'+SN.C.S.NoticeLon).val(cookieValue.NLon).val();
|
||||||
|
if ($('#'+SN.C.S.NoticeLocationNs).val(cookieValue.NLNS)) {
|
||||||
NLNS = $('#'+SN.C.S.NoticeLocationNs).val(cookieValue.NLNS).val();
|
NLNS = $('#'+SN.C.S.NoticeLocationNs).val(cookieValue.NLNS).val();
|
||||||
NLID = $('#'+SN.C.S.NoticeLocationId).val(cookieValue.NLID).val();
|
NLID = $('#'+SN.C.S.NoticeLocationId).val(cookieValue.NLID).val();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
if (cookieValue == 'disabled') {
|
if (cookieValue == 'disabled') {
|
||||||
NDG = $('#'+SN.C.S.NoticeDataGeo).attr('checked', false).attr('checked');
|
NDG = $('#'+SN.C.S.NoticeDataGeo).attr('checked', false).attr('checked');
|
||||||
}
|
}
|
||||||
@ -301,8 +303,10 @@ var SN = { // StatusNet
|
|||||||
|
|
||||||
$('#'+SN.C.S.NoticeLat).val(NLat);
|
$('#'+SN.C.S.NoticeLat).val(NLat);
|
||||||
$('#'+SN.C.S.NoticeLon).val(NLon);
|
$('#'+SN.C.S.NoticeLon).val(NLon);
|
||||||
|
if ($('#'+SN.C.S.NoticeLocationNs)) {
|
||||||
$('#'+SN.C.S.NoticeLocationNs).val(NLNS);
|
$('#'+SN.C.S.NoticeLocationNs).val(NLNS);
|
||||||
$('#'+SN.C.S.NoticeLocationId).val(NLID);
|
$('#'+SN.C.S.NoticeLocationId).val(NLID);
|
||||||
|
}
|
||||||
$('#'+SN.C.S.NoticeDataGeo).attr('checked', NDG);
|
$('#'+SN.C.S.NoticeDataGeo).attr('checked', NDG);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -31,19 +31,17 @@
|
|||||||
class DBQueueManager extends QueueManager
|
class DBQueueManager extends QueueManager
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Saves a notice object reference into the queue item table.
|
* Saves an object reference into the queue item table.
|
||||||
* @return boolean true on success
|
* @return boolean true on success
|
||||||
* @throws ServerException on failure
|
* @throws ServerException on failure
|
||||||
*/
|
*/
|
||||||
public function enqueue($object, $queue)
|
public function enqueue($object, $queue)
|
||||||
{
|
{
|
||||||
$notice = $object;
|
|
||||||
|
|
||||||
$qi = new Queue_item();
|
$qi = new Queue_item();
|
||||||
|
|
||||||
$qi->notice_id = $notice->id;
|
$qi->frame = $this->encode($object);
|
||||||
$qi->transport = $queue;
|
$qi->transport = $queue;
|
||||||
$qi->created = $notice->created;
|
$qi->created = common_sql_now();
|
||||||
$result = $qi->insert();
|
$result = $qi->insert();
|
||||||
|
|
||||||
if (!$result) {
|
if (!$result) {
|
||||||
@ -57,146 +55,92 @@ class DBQueueManager extends QueueManager
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Poll every minute for new events during idle periods.
|
* Poll every 10 seconds for new events during idle periods.
|
||||||
* We'll look in more often when there's data available.
|
* We'll look in more often when there's data available.
|
||||||
*
|
*
|
||||||
* @return int seconds
|
* @return int seconds
|
||||||
*/
|
*/
|
||||||
public function pollInterval()
|
public function pollInterval()
|
||||||
{
|
{
|
||||||
return 60;
|
return 10;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Run a polling cycle during idle processing in the input loop.
|
* Run a polling cycle during idle processing in the input loop.
|
||||||
* @return boolean true if we had a hit
|
* @return boolean true if we should poll again for more data immediately
|
||||||
*/
|
*/
|
||||||
public function poll()
|
public function poll()
|
||||||
{
|
{
|
||||||
$this->_log(LOG_DEBUG, 'Checking for notices...');
|
$this->_log(LOG_DEBUG, 'Checking for notices...');
|
||||||
$item = $this->_nextItem();
|
$qi = Queue_item::top($this->getQueues());
|
||||||
if ($item === false) {
|
|
||||||
$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if ($item === true) {
|
|
||||||
// We dequeued an entry for a deleted or invalid notice.
|
|
||||||
// Consider it a hit for poll rate purposes.
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
list($queue, $notice) = $item;
|
|
||||||
$this->_log(LOG_INFO, 'Got notice '. $notice->id . ' for transport ' . $queue);
|
|
||||||
|
|
||||||
// Yay! Got one!
|
|
||||||
$handler = $this->getHandler($queue);
|
|
||||||
if ($handler) {
|
|
||||||
if ($handler->handle_notice($notice)) {
|
|
||||||
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Successfully handled notice");
|
|
||||||
$this->_done($notice, $queue);
|
|
||||||
} else {
|
|
||||||
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Failed to handle notice");
|
|
||||||
$this->_fail($notice, $queue);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
$this->_log(LOG_INFO, "[$queue:notice $notice->id] No handler for queue $queue; discarding.");
|
|
||||||
$this->_done($notice, $queue);
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Pop the oldest unclaimed item off the queue set and claim it.
|
|
||||||
*
|
|
||||||
* @return mixed false if no items; true if bogus hit; otherwise array(string, Notice)
|
|
||||||
* giving the queue transport name.
|
|
||||||
*/
|
|
||||||
protected function _nextItem()
|
|
||||||
{
|
|
||||||
$start = time();
|
|
||||||
$result = null;
|
|
||||||
|
|
||||||
$qi = Queue_item::top();
|
|
||||||
if (empty($qi)) {
|
if (empty($qi)) {
|
||||||
|
$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
$queue = $qi->transport;
|
$queue = $qi->transport;
|
||||||
$notice = Notice::staticGet('id', $qi->notice_id);
|
$item = $this->decode($qi->frame);
|
||||||
if (empty($notice)) {
|
|
||||||
$this->_log(LOG_INFO, "[$queue:notice $notice->id] dequeued non-existent notice");
|
|
||||||
$qi->delete();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
$result = $notice;
|
if ($item) {
|
||||||
return array($queue, $notice);
|
$rep = $this->logrep($item);
|
||||||
|
$this->_log(LOG_INFO, "Got $rep for transport $queue");
|
||||||
|
|
||||||
|
$handler = $this->getHandler($queue);
|
||||||
|
if ($handler) {
|
||||||
|
if ($handler->handle($item)) {
|
||||||
|
$this->_log(LOG_INFO, "[$queue:$rep] Successfully handled item");
|
||||||
|
$this->_done($qi);
|
||||||
|
} else {
|
||||||
|
$this->_log(LOG_INFO, "[$queue:$rep] Failed to handle item");
|
||||||
|
$this->_fail($qi);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
$this->_log(LOG_INFO, "[$queue:$rep] No handler for queue $queue; discarding.");
|
||||||
|
$this->_done($qi);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
$this->_log(LOG_INFO, "[$queue] Got empty/deleted item, discarding");
|
||||||
|
$this->_fail($qi);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete our claimed item from the queue after successful processing.
|
* Delete our claimed item from the queue after successful processing.
|
||||||
*
|
*
|
||||||
* @param Notice $object
|
* @param QueueItem $qi
|
||||||
* @param string $queue
|
|
||||||
*/
|
*/
|
||||||
protected function _done($object, $queue)
|
protected function _done($qi)
|
||||||
{
|
{
|
||||||
// XXX: right now, we only handle notices
|
$queue = $qi->transport;
|
||||||
|
|
||||||
$notice = $object;
|
|
||||||
|
|
||||||
$qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
|
|
||||||
'transport' => $queue));
|
|
||||||
|
|
||||||
if (empty($qi)) {
|
|
||||||
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
|
|
||||||
} else {
|
|
||||||
if (empty($qi->claimed)) {
|
if (empty($qi->claimed)) {
|
||||||
$this->_log(LOG_WARNING, "[$queue:notice $notice->id] Reluctantly releasing unclaimed queue item");
|
$this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item $qi->id from $qi->queue");
|
||||||
}
|
}
|
||||||
$qi->delete();
|
$qi->delete();
|
||||||
$qi->free();
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->_log(LOG_INFO, "[$queue:notice $notice->id] done with item");
|
|
||||||
$this->stats('handled', $queue);
|
$this->stats('handled', $queue);
|
||||||
|
|
||||||
$notice->free();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Free our claimed queue item for later reprocessing in case of
|
* Free our claimed queue item for later reprocessing in case of
|
||||||
* temporary failure.
|
* temporary failure.
|
||||||
*
|
*
|
||||||
* @param Notice $object
|
* @param QueueItem $qi
|
||||||
* @param string $queue
|
|
||||||
*/
|
*/
|
||||||
protected function _fail($object, $queue)
|
protected function _fail($qi)
|
||||||
{
|
{
|
||||||
// XXX: right now, we only handle notices
|
$queue = $qi->transport;
|
||||||
|
|
||||||
$notice = $object;
|
|
||||||
|
|
||||||
$qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
|
|
||||||
'transport' => $queue));
|
|
||||||
|
|
||||||
if (empty($qi)) {
|
|
||||||
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
|
|
||||||
} else {
|
|
||||||
if (empty($qi->claimed)) {
|
if (empty($qi->claimed)) {
|
||||||
$this->_log(LOG_WARNING, "[$queue:notice $notice->id] Ignoring failure for unclaimed queue item");
|
$this->_log(LOG_WARNING, "[$queue:item $qi->id] Ignoring failure for unclaimed queue item");
|
||||||
} else {
|
} else {
|
||||||
$orig = clone($qi);
|
$orig = clone($qi);
|
||||||
$qi->claimed = null;
|
$qi->claimed = null;
|
||||||
$qi->update($orig);
|
$qi->update($orig);
|
||||||
$qi = null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->_log(LOG_INFO, "[$queue:notice $notice->id] done with queue item");
|
|
||||||
$this->stats('error', $queue);
|
$this->stats('error', $queue);
|
||||||
|
|
||||||
$notice->free();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function _log($level, $msg)
|
protected function _log($level, $msg)
|
||||||
|
@ -67,7 +67,9 @@ $default =
|
|||||||
'db_driver' => 'DB', # XXX: JanRain libs only work with DB
|
'db_driver' => 'DB', # XXX: JanRain libs only work with DB
|
||||||
'quote_identifiers' => false,
|
'quote_identifiers' => false,
|
||||||
'type' => 'mysql',
|
'type' => 'mysql',
|
||||||
'schemacheck' => 'runtime'), // 'runtime' or 'script'
|
'schemacheck' => 'runtime', // 'runtime' or 'script'
|
||||||
|
'log_queries' => false, // true to log all DB queries
|
||||||
|
'log_slow_queries' => 0), // if set, log queries taking over N seconds
|
||||||
'syslog' =>
|
'syslog' =>
|
||||||
array('appname' => 'statusnet', # for syslog
|
array('appname' => 'statusnet', # for syslog
|
||||||
'priority' => 'debug', # XXX: currently ignored
|
'priority' => 'debug', # XXX: currently ignored
|
||||||
@ -81,6 +83,7 @@ $default =
|
|||||||
'stomp_password' => null,
|
'stomp_password' => null,
|
||||||
'monitor' => null, // URL to monitor ping endpoint (work in progress)
|
'monitor' => null, // URL to monitor ping endpoint (work in progress)
|
||||||
'softlimit' => '90%', // total size or % of memory_limit at which to restart queue threads gracefully
|
'softlimit' => '90%', // total size or % of memory_limit at which to restart queue threads gracefully
|
||||||
|
'debug_memory' => false, // true to spit memory usage to log
|
||||||
),
|
),
|
||||||
'license' =>
|
'license' =>
|
||||||
array('type' => 'cc', # can be 'cc', 'allrightsreserved', 'private'
|
array('type' => 'cc', # can be 'cc', 'allrightsreserved', 'private'
|
||||||
|
@ -27,7 +27,7 @@
|
|||||||
* @link http://status.net/
|
* @link http://status.net/
|
||||||
*/
|
*/
|
||||||
|
|
||||||
class IoMaster
|
abstract class IoMaster
|
||||||
{
|
{
|
||||||
public $id;
|
public $id;
|
||||||
|
|
||||||
@ -66,22 +66,17 @@ class IoMaster
|
|||||||
if ($site != common_config('site', 'server')) {
|
if ($site != common_config('site', 'server')) {
|
||||||
StatusNet::init($site);
|
StatusNet::init($site);
|
||||||
}
|
}
|
||||||
|
$this->initManagers();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
$classes = array();
|
/**
|
||||||
if (Event::handle('StartIoManagerClasses', array(&$classes))) {
|
* Initialize IoManagers for the currently configured site
|
||||||
$classes[] = 'QueueManager';
|
* which are appropriate to this instance.
|
||||||
if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) {
|
*
|
||||||
$classes[] = 'XmppManager'; // handles pings/reconnects
|
* Pass class names into $this->instantiate()
|
||||||
$classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations
|
*/
|
||||||
}
|
abstract function initManagers();
|
||||||
}
|
|
||||||
Event::handle('EndIoManagerClasses', array(&$classes));
|
|
||||||
|
|
||||||
foreach ($classes as $class) {
|
|
||||||
$this->instantiate($class);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pull all local sites from status_network table.
|
* Pull all local sites from status_network table.
|
||||||
@ -170,7 +165,7 @@ class IoMaster
|
|||||||
$write = array();
|
$write = array();
|
||||||
$except = array();
|
$except = array();
|
||||||
$this->logState('listening');
|
$this->logState('listening');
|
||||||
common_log(LOG_INFO, "Waiting up to $timeout seconds for socket data...");
|
common_log(LOG_DEBUG, "Waiting up to $timeout seconds for socket data...");
|
||||||
$ready = stream_select($read, $write, $except, $timeout, 0);
|
$ready = stream_select($read, $write, $except, $timeout, 0);
|
||||||
|
|
||||||
if ($ready === false) {
|
if ($ready === false) {
|
||||||
@ -190,7 +185,7 @@ class IoMaster
|
|||||||
|
|
||||||
if ($timeout > 0 && empty($sockets)) {
|
if ($timeout > 0 && empty($sockets)) {
|
||||||
// If we had no listeners, sleep until the pollers' next requested wakeup.
|
// If we had no listeners, sleep until the pollers' next requested wakeup.
|
||||||
common_log(LOG_INFO, "Sleeping $timeout seconds until next poll cycle...");
|
common_log(LOG_DEBUG, "Sleeping $timeout seconds until next poll cycle...");
|
||||||
$this->logState('sleep');
|
$this->logState('sleep');
|
||||||
sleep($timeout);
|
sleep($timeout);
|
||||||
}
|
}
|
||||||
@ -207,6 +202,8 @@ class IoMaster
|
|||||||
if ($usage > $memoryLimit) {
|
if ($usage > $memoryLimit) {
|
||||||
common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
|
common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
|
||||||
break;
|
break;
|
||||||
|
} else if (common_config('queue', 'debug_memory')) {
|
||||||
|
common_log(LOG_DEBUG, "Memory usage $usage");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -223,8 +220,7 @@ class IoMaster
|
|||||||
{
|
{
|
||||||
$softLimit = trim(common_config('queue', 'softlimit'));
|
$softLimit = trim(common_config('queue', 'softlimit'));
|
||||||
if (substr($softLimit, -1) == '%') {
|
if (substr($softLimit, -1) == '%') {
|
||||||
$limit = trim(ini_get('memory_limit'));
|
$limit = $this->parseMemoryLimit(ini_get('memory_limit'));
|
||||||
$limit = $this->parseMemoryLimit($limit);
|
|
||||||
if ($limit > 0) {
|
if ($limit > 0) {
|
||||||
return intval(substr($softLimit, 0, -1) * $limit / 100);
|
return intval(substr($softLimit, 0, -1) * $limit / 100);
|
||||||
} else {
|
} else {
|
||||||
@ -242,9 +238,10 @@ class IoMaster
|
|||||||
* @param string $mem
|
* @param string $mem
|
||||||
* @return int
|
* @return int
|
||||||
*/
|
*/
|
||||||
protected function parseMemoryLimit($mem)
|
public function parseMemoryLimit($mem)
|
||||||
{
|
{
|
||||||
// http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
|
// http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
|
||||||
|
$mem = strtolower(trim($mem));
|
||||||
$size = array('k' => 1024,
|
$size = array('k' => 1024,
|
||||||
'm' => 1024*1024,
|
'm' => 1024*1024,
|
||||||
'g' => 1024*1024*1024);
|
'g' => 1024*1024*1024);
|
||||||
@ -253,7 +250,7 @@ class IoMaster
|
|||||||
} else if (is_numeric($mem)) {
|
} else if (is_numeric($mem)) {
|
||||||
return intval($mem);
|
return intval($mem);
|
||||||
} else {
|
} else {
|
||||||
$mult = strtolower(substr($mem, -1));
|
$mult = substr($mem, -1);
|
||||||
if (isset($size[$mult])) {
|
if (isset($size[$mult])) {
|
||||||
return substr($mem, 0, -1) * $size[$mult];
|
return substr($mem, 0, -1) * $size[$mult];
|
||||||
} else {
|
} else {
|
||||||
|
@ -85,6 +85,27 @@ class Sharing_XMPP extends XMPPHP_XMPP
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build an XMPP proxy connection that'll save outgoing messages
|
||||||
|
* to the 'xmppout' queue to be picked up by xmppdaemon later.
|
||||||
|
*/
|
||||||
|
function jabber_proxy()
|
||||||
|
{
|
||||||
|
$proxy = new Queued_XMPP(common_config('xmpp', 'host') ?
|
||||||
|
common_config('xmpp', 'host') :
|
||||||
|
common_config('xmpp', 'server'),
|
||||||
|
common_config('xmpp', 'port'),
|
||||||
|
common_config('xmpp', 'user'),
|
||||||
|
common_config('xmpp', 'password'),
|
||||||
|
common_config('xmpp', 'resource') . 'daemon',
|
||||||
|
common_config('xmpp', 'server'),
|
||||||
|
common_config('xmpp', 'debug') ?
|
||||||
|
true : false,
|
||||||
|
common_config('xmpp', 'debug') ?
|
||||||
|
XMPPHP_Log::LEVEL_VERBOSE : null);
|
||||||
|
return $proxy;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Lazy-connect the configured Jabber account to the configured server;
|
* Lazy-connect the configured Jabber account to the configured server;
|
||||||
* if already opened, the same connection will be returned.
|
* if already opened, the same connection will be returned.
|
||||||
@ -143,7 +164,7 @@ function jabber_connect($resource=null)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* send a single notice to a given Jabber address
|
* Queue send for a single notice to a given Jabber address
|
||||||
*
|
*
|
||||||
* @param string $to JID to send the notice to
|
* @param string $to JID to send the notice to
|
||||||
* @param Notice $notice notice to send
|
* @param Notice $notice notice to send
|
||||||
@ -153,10 +174,7 @@ function jabber_connect($resource=null)
|
|||||||
|
|
||||||
function jabber_send_notice($to, $notice)
|
function jabber_send_notice($to, $notice)
|
||||||
{
|
{
|
||||||
$conn = jabber_connect();
|
$conn = jabber_proxy();
|
||||||
if (!$conn) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
$profile = Profile::staticGet($notice->profile_id);
|
$profile = Profile::staticGet($notice->profile_id);
|
||||||
if (!$profile) {
|
if (!$profile) {
|
||||||
common_log(LOG_WARNING, 'Refusing to send notice with ' .
|
common_log(LOG_WARNING, 'Refusing to send notice with ' .
|
||||||
@ -221,10 +239,7 @@ function jabber_format_entry($profile, $notice)
|
|||||||
|
|
||||||
function jabber_send_message($to, $body, $type='chat', $subject=null)
|
function jabber_send_message($to, $body, $type='chat', $subject=null)
|
||||||
{
|
{
|
||||||
$conn = jabber_connect();
|
$conn = jabber_proxy();
|
||||||
if (!$conn) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
$conn->message($to, $body, $type, $subject);
|
$conn->message($to, $body, $type, $subject);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -319,7 +334,7 @@ function jabber_special_presence($type, $to=null, $show=null, $status=null)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* broadcast a notice to all subscribers and reply recipients
|
* Queue broadcast of a notice to all subscribers and reply recipients
|
||||||
*
|
*
|
||||||
* This function will send a notice to all subscribers on the local server
|
* This function will send a notice to all subscribers on the local server
|
||||||
* who have Jabber addresses, and have Jabber notification enabled, and
|
* who have Jabber addresses, and have Jabber notification enabled, and
|
||||||
@ -354,7 +369,7 @@ function jabber_broadcast_notice($notice)
|
|||||||
|
|
||||||
$sent_to = array();
|
$sent_to = array();
|
||||||
|
|
||||||
$conn = jabber_connect();
|
$conn = jabber_proxy();
|
||||||
|
|
||||||
$ni = $notice->whoGets();
|
$ni = $notice->whoGets();
|
||||||
|
|
||||||
@ -389,14 +404,13 @@ function jabber_broadcast_notice($notice)
|
|||||||
'Sending notice ' . $notice->id . ' to ' . $user->jabber,
|
'Sending notice ' . $notice->id . ' to ' . $user->jabber,
|
||||||
__FILE__);
|
__FILE__);
|
||||||
$conn->message($user->jabber, $msg, 'chat', null, $entry);
|
$conn->message($user->jabber, $msg, 'chat', null, $entry);
|
||||||
$conn->processTime(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* send a notice to all public listeners
|
* Queue send of a notice to all public listeners
|
||||||
*
|
*
|
||||||
* For notices that are generated on the local system (by users), we can optionally
|
* For notices that are generated on the local system (by users), we can optionally
|
||||||
* forward them to remote listeners by XMPP.
|
* forward them to remote listeners by XMPP.
|
||||||
@ -429,7 +443,7 @@ function jabber_public_notice($notice)
|
|||||||
$msg = jabber_format_notice($profile, $notice);
|
$msg = jabber_format_notice($profile, $notice);
|
||||||
$entry = jabber_format_entry($profile, $notice);
|
$entry = jabber_format_entry($profile, $notice);
|
||||||
|
|
||||||
$conn = jabber_connect();
|
$conn = jabber_proxy();
|
||||||
|
|
||||||
foreach ($public as $address) {
|
foreach ($public as $address) {
|
||||||
common_log(LOG_INFO,
|
common_log(LOG_INFO,
|
||||||
@ -437,7 +451,6 @@ function jabber_public_notice($notice)
|
|||||||
' to public listener ' . $address,
|
' to public listener ' . $address,
|
||||||
__FILE__);
|
__FILE__);
|
||||||
$conn->message($address, $msg, 'chat', null, $entry);
|
$conn->message($address, $msg, 'chat', null, $entry);
|
||||||
$conn->processTime(0);
|
|
||||||
}
|
}
|
||||||
$profile->free();
|
$profile->free();
|
||||||
}
|
}
|
||||||
|
@ -34,14 +34,14 @@ class JabberQueueHandler extends QueueHandler
|
|||||||
return 'jabber';
|
return 'jabber';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle_notice($notice)
|
function handle($notice)
|
||||||
{
|
{
|
||||||
require_once(INSTALLDIR.'/lib/jabber.php');
|
require_once(INSTALLDIR.'/lib/jabber.php');
|
||||||
try {
|
try {
|
||||||
return jabber_broadcast_notice($notice);
|
return jabber_broadcast_notice($notice);
|
||||||
} catch (XMPPHP_Exception $e) {
|
} catch (XMPPHP_Exception $e) {
|
||||||
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
|
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
|
||||||
exit(1);
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,7 +36,7 @@ class OmbQueueHandler extends QueueHandler
|
|||||||
* @fixme doesn't currently report failure back to the queue manager
|
* @fixme doesn't currently report failure back to the queue manager
|
||||||
* because omb_broadcast_notice() doesn't report it to us
|
* because omb_broadcast_notice() doesn't report it to us
|
||||||
*/
|
*/
|
||||||
function handle_notice($notice)
|
function handle($notice)
|
||||||
{
|
{
|
||||||
if ($this->is_remote($notice)) {
|
if ($this->is_remote($notice)) {
|
||||||
$this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id);
|
$this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id);
|
||||||
|
@ -30,7 +30,7 @@ class PingQueueHandler extends QueueHandler {
|
|||||||
return 'ping';
|
return 'ping';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle_notice($notice) {
|
function handle($notice) {
|
||||||
require_once INSTALLDIR . '/lib/ping.php';
|
require_once INSTALLDIR . '/lib/ping.php';
|
||||||
return ping_broadcast_notice($notice);
|
return ping_broadcast_notice($notice);
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ class PluginQueueHandler extends QueueHandler
|
|||||||
return 'plugin';
|
return 'plugin';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle_notice($notice)
|
function handle($notice)
|
||||||
{
|
{
|
||||||
Event::handle('HandleQueuedNotice', array(&$notice));
|
Event::handle('HandleQueuedNotice', array(&$notice));
|
||||||
return true;
|
return true;
|
||||||
|
@ -23,7 +23,6 @@ if (!defined('STATUSNET') && !defined('LACONICA')) {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Queue handler for pushing new notices to public XMPP subscribers.
|
* Queue handler for pushing new notices to public XMPP subscribers.
|
||||||
* @fixme correct this exception handling
|
|
||||||
*/
|
*/
|
||||||
class PublicQueueHandler extends QueueHandler
|
class PublicQueueHandler extends QueueHandler
|
||||||
{
|
{
|
||||||
@ -33,15 +32,14 @@ class PublicQueueHandler extends QueueHandler
|
|||||||
return 'public';
|
return 'public';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle_notice($notice)
|
function handle($notice)
|
||||||
{
|
{
|
||||||
require_once(INSTALLDIR.'/lib/jabber.php');
|
require_once(INSTALLDIR.'/lib/jabber.php');
|
||||||
try {
|
try {
|
||||||
return jabber_public_notice($notice);
|
return jabber_public_notice($notice);
|
||||||
} catch (XMPPHP_Exception $e) {
|
} catch (XMPPHP_Exception $e) {
|
||||||
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
|
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
|
||||||
die($e->getMessage());
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
117
lib/queued_xmpp.php
Normal file
117
lib/queued_xmpp.php
Normal file
@ -0,0 +1,117 @@
|
|||||||
|
<?php
|
||||||
|
/**
|
||||||
|
* StatusNet, the distributed open-source microblogging tool
|
||||||
|
*
|
||||||
|
* Queue-mediated proxy class for outgoing XMPP messages.
|
||||||
|
*
|
||||||
|
* PHP version 5
|
||||||
|
*
|
||||||
|
* LICENCE: This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*
|
||||||
|
* @category Network
|
||||||
|
* @package StatusNet
|
||||||
|
* @author Brion Vibber <brion@status.net>
|
||||||
|
* @copyright 2010 StatusNet, Inc.
|
||||||
|
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
|
||||||
|
* @link http://status.net/
|
||||||
|
*/
|
||||||
|
|
||||||
|
if (!defined('STATUSNET') && !defined('LACONICA')) {
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
require_once INSTALLDIR . '/lib/jabber.php';
|
||||||
|
|
||||||
|
class Queued_XMPP extends XMPPHP_XMPP
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*
|
||||||
|
* @param string $host
|
||||||
|
* @param integer $port
|
||||||
|
* @param string $user
|
||||||
|
* @param string $password
|
||||||
|
* @param string $resource
|
||||||
|
* @param string $server
|
||||||
|
* @param boolean $printlog
|
||||||
|
* @param string $loglevel
|
||||||
|
*/
|
||||||
|
public function __construct($host, $port, $user, $password, $resource, $server = null, $printlog = false, $loglevel = null)
|
||||||
|
{
|
||||||
|
parent::__construct($host, $port, $user, $password, $resource, $server, $printlog, $loglevel);
|
||||||
|
// Normally the fulljid isn't filled out until resource binding time;
|
||||||
|
// we need to save it here since we're not talking to a real server.
|
||||||
|
$this->fulljid = "{$this->basejid}/{$this->resource}";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a formatted message to the outgoing queue for later forwarding
|
||||||
|
* to a real XMPP connection.
|
||||||
|
*
|
||||||
|
* @param string $msg
|
||||||
|
*/
|
||||||
|
public function send($msg, $timeout=NULL)
|
||||||
|
{
|
||||||
|
$qm = QueueManager::get();
|
||||||
|
$qm->enqueue(strval($msg), 'xmppout');
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Since we'll be getting input through a queue system's run loop,
|
||||||
|
* we'll process one standalone message at a time rather than our
|
||||||
|
* own XMPP message pump.
|
||||||
|
*
|
||||||
|
* @param string $message
|
||||||
|
*/
|
||||||
|
public function processMessage($message) {
|
||||||
|
$frame = array_shift($this->frames);
|
||||||
|
xml_parse($this->parser, $frame->body, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
//@{
|
||||||
|
/**
|
||||||
|
* Stream i/o functions disabled; push input through processMessage()
|
||||||
|
*/
|
||||||
|
public function connect($timeout = 30, $persistent = false, $sendinit = true)
|
||||||
|
{
|
||||||
|
throw new Exception("Can't connect to server from XMPP queue proxy.");
|
||||||
|
}
|
||||||
|
|
||||||
|
public function disconnect()
|
||||||
|
{
|
||||||
|
throw new Exception("Can't connect to server from XMPP queue proxy.");
|
||||||
|
}
|
||||||
|
|
||||||
|
public function process()
|
||||||
|
{
|
||||||
|
throw new Exception("Can't read stream from XMPP queue proxy.");
|
||||||
|
}
|
||||||
|
|
||||||
|
public function processUntil($event, $timeout=-1)
|
||||||
|
{
|
||||||
|
throw new Exception("Can't read stream from XMPP queue proxy.");
|
||||||
|
}
|
||||||
|
|
||||||
|
public function read()
|
||||||
|
{
|
||||||
|
throw new Exception("Can't read stream from XMPP queue proxy.");
|
||||||
|
}
|
||||||
|
|
||||||
|
public function readyToProcess()
|
||||||
|
{
|
||||||
|
throw new Exception("Can't read stream from XMPP queue proxy.");
|
||||||
|
}
|
||||||
|
//@}
|
||||||
|
}
|
||||||
|
|
@ -22,51 +22,20 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); }
|
|||||||
/**
|
/**
|
||||||
* Base class for queue handlers.
|
* Base class for queue handlers.
|
||||||
*
|
*
|
||||||
* As extensions of the Daemon class, each queue handler has the ability
|
* As of 0.9, queue handlers are short-lived for items as they are
|
||||||
* to launch itself in the background, at which point it'll pass control
|
* dequeued by a QueueManager running in an IoMaster in a daemon
|
||||||
* to the configured QueueManager class to poll for updates.
|
* such as queuedaemon.php.
|
||||||
|
*
|
||||||
|
* Extensions requiring long-running maintenance or polling should
|
||||||
|
* register an IoManager.
|
||||||
*
|
*
|
||||||
* Subclasses must override at least the following methods:
|
* Subclasses must override at least the following methods:
|
||||||
* - transport
|
* - transport
|
||||||
* - handle_notice
|
* - handle
|
||||||
*/
|
*/
|
||||||
#class QueueHandler extends Daemon
|
|
||||||
class QueueHandler
|
class QueueHandler
|
||||||
{
|
{
|
||||||
|
|
||||||
# function __construct($id=null, $daemonize=true)
|
|
||||||
# {
|
|
||||||
# parent::__construct($daemonize);
|
|
||||||
#
|
|
||||||
# if ($id) {
|
|
||||||
# $this->set_id($id);
|
|
||||||
# }
|
|
||||||
# }
|
|
||||||
|
|
||||||
/**
|
|
||||||
* How many seconds a polling-based queue manager should wait between
|
|
||||||
* checks for new items to handle.
|
|
||||||
*
|
|
||||||
* Defaults to 60 seconds; override to speed up or slow down.
|
|
||||||
*
|
|
||||||
* @fixme not really compatible with global queue manager
|
|
||||||
* @return int timeout in seconds
|
|
||||||
*/
|
|
||||||
# function timeout()
|
|
||||||
# {
|
|
||||||
# return 60;
|
|
||||||
# }
|
|
||||||
|
|
||||||
# function class_name()
|
|
||||||
# {
|
|
||||||
# return ucfirst($this->transport()) . 'Handler';
|
|
||||||
# }
|
|
||||||
|
|
||||||
# function name()
|
|
||||||
# {
|
|
||||||
# return strtolower($this->class_name().'.'.$this->get_id());
|
|
||||||
# }
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return transport keyword which identifies items this queue handler
|
* Return transport keyword which identifies items this queue handler
|
||||||
* services; must be defined for all subclasses.
|
* services; must be defined for all subclasses.
|
||||||
@ -83,61 +52,17 @@ class QueueHandler
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Here's the meat of your queue handler -- you're handed a Notice
|
* Here's the meat of your queue handler -- you're handed a Notice
|
||||||
* object, which you may do as you will with.
|
* or other object, which you may do as you will with.
|
||||||
*
|
*
|
||||||
* If this function indicates failure, a warning will be logged
|
* If this function indicates failure, a warning will be logged
|
||||||
* and the item is placed back in the queue to be re-run.
|
* and the item is placed back in the queue to be re-run.
|
||||||
*
|
*
|
||||||
* @param Notice $notice
|
* @param mixed $object
|
||||||
* @return boolean true on success, false on failure
|
* @return boolean true on success, false on failure
|
||||||
*/
|
*/
|
||||||
function handle_notice($notice)
|
function handle($object)
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Setup and start of run loop for this queue handler as a daemon.
|
|
||||||
* Most of the heavy lifting is passed on to the QueueManager's service()
|
|
||||||
* method, which passes control back to our handle_notice() method for
|
|
||||||
* each notice that comes in on the queue.
|
|
||||||
*
|
|
||||||
* Most of the time this won't need to be overridden in a subclass.
|
|
||||||
*
|
|
||||||
* @return boolean true on success, false on failure
|
|
||||||
*/
|
|
||||||
function run()
|
|
||||||
{
|
|
||||||
if (!$this->start()) {
|
|
||||||
$this->log(LOG_WARNING, 'failed to start');
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->log(LOG_INFO, 'checking for queued notices');
|
|
||||||
|
|
||||||
$queue = $this->transport();
|
|
||||||
$timeout = $this->timeout();
|
|
||||||
|
|
||||||
$qm = QueueManager::get();
|
|
||||||
|
|
||||||
$qm->service($queue, $this);
|
|
||||||
|
|
||||||
$this->log(LOG_INFO, 'finished servicing the queue');
|
|
||||||
|
|
||||||
if (!$this->finish()) {
|
|
||||||
$this->log(LOG_WARNING, 'failed to clean up');
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->log(LOG_INFO, 'terminating normally');
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
function log($level, $msg)
|
|
||||||
{
|
|
||||||
common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,6 +39,10 @@ abstract class QueueManager extends IoManager
|
|||||||
{
|
{
|
||||||
static $qm = null;
|
static $qm = null;
|
||||||
|
|
||||||
|
public $master = null;
|
||||||
|
public $handlers = array();
|
||||||
|
public $groups = array();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Factory function to pull the appropriate QueueManager object
|
* Factory function to pull the appropriate QueueManager object
|
||||||
* for this site's configuration. It can then be used to queue
|
* for this site's configuration. It can then be used to queue
|
||||||
@ -109,6 +113,64 @@ abstract class QueueManager extends IoManager
|
|||||||
*/
|
*/
|
||||||
abstract function enqueue($object, $queue);
|
abstract function enqueue($object, $queue);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build a representation for an object for logging
|
||||||
|
* @param mixed
|
||||||
|
* @return string
|
||||||
|
*/
|
||||||
|
function logrep($object) {
|
||||||
|
if (is_object($object)) {
|
||||||
|
$class = get_class($object);
|
||||||
|
if (isset($object->id)) {
|
||||||
|
return "$class $object->id";
|
||||||
|
}
|
||||||
|
return $class;
|
||||||
|
}
|
||||||
|
if (is_string($object)) {
|
||||||
|
$len = strlen($object);
|
||||||
|
$fragment = mb_substr($object, 0, 32);
|
||||||
|
if (mb_strlen($object) > 32) {
|
||||||
|
$fragment .= '...';
|
||||||
|
}
|
||||||
|
return "string '$fragment' ($len bytes)";
|
||||||
|
}
|
||||||
|
return strval($object);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encode an object for queued storage.
|
||||||
|
* Next gen may use serialization.
|
||||||
|
*
|
||||||
|
* @param mixed $object
|
||||||
|
* @return string
|
||||||
|
*/
|
||||||
|
protected function encode($object)
|
||||||
|
{
|
||||||
|
if ($object instanceof Notice) {
|
||||||
|
return $object->id;
|
||||||
|
} else if (is_string($object)) {
|
||||||
|
return $object;
|
||||||
|
} else {
|
||||||
|
throw new ServerException("Can't queue this type", 500);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decode an object from queued storage.
|
||||||
|
* Accepts back-compat notice reference entries and strings for now.
|
||||||
|
*
|
||||||
|
* @param string
|
||||||
|
* @return mixed
|
||||||
|
*/
|
||||||
|
protected function decode($frame)
|
||||||
|
{
|
||||||
|
if (is_numeric($frame)) {
|
||||||
|
return Notice::staticGet(intval($frame));
|
||||||
|
} else {
|
||||||
|
return $frame;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Instantiate the appropriate QueueHandler class for the given queue.
|
* Instantiate the appropriate QueueHandler class for the given queue.
|
||||||
*
|
*
|
||||||
@ -131,13 +193,15 @@ abstract class QueueManager extends IoManager
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a list of all registered queue transport names.
|
* Get a list of registered queue transport names to be used
|
||||||
|
* for this daemon.
|
||||||
*
|
*
|
||||||
* @return array of strings
|
* @return array of strings
|
||||||
*/
|
*/
|
||||||
function getQueues()
|
function getQueues()
|
||||||
{
|
{
|
||||||
return array_keys($this->handlers);
|
$group = $this->activeGroup();
|
||||||
|
return array_keys($this->groups[$group]);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -148,34 +212,30 @@ abstract class QueueManager extends IoManager
|
|||||||
*/
|
*/
|
||||||
function initialize()
|
function initialize()
|
||||||
{
|
{
|
||||||
|
// @fixme we'll want to be able to listen to particular queues...
|
||||||
if (Event::handle('StartInitializeQueueManager', array($this))) {
|
if (Event::handle('StartInitializeQueueManager', array($this))) {
|
||||||
if (!defined('XMPP_ONLY_FLAG')) { // hack!
|
|
||||||
$this->connect('plugin', 'PluginQueueHandler');
|
$this->connect('plugin', 'PluginQueueHandler');
|
||||||
$this->connect('omb', 'OmbQueueHandler');
|
$this->connect('omb', 'OmbQueueHandler');
|
||||||
$this->connect('ping', 'PingQueueHandler');
|
$this->connect('ping', 'PingQueueHandler');
|
||||||
if (common_config('sms', 'enabled')) {
|
if (common_config('sms', 'enabled')) {
|
||||||
$this->connect('sms', 'SmsQueueHandler');
|
$this->connect('sms', 'SmsQueueHandler');
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// XMPP output handlers...
|
// XMPP output handlers...
|
||||||
if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) {
|
|
||||||
$this->connect('jabber', 'JabberQueueHandler');
|
$this->connect('jabber', 'JabberQueueHandler');
|
||||||
$this->connect('public', 'PublicQueueHandler');
|
$this->connect('public', 'PublicQueueHandler');
|
||||||
|
|
||||||
// @fixme this should move up a level or should get an actual queue
|
// @fixme this should get an actual queue
|
||||||
$this->connect('confirm', 'XmppConfirmHandler');
|
//$this->connect('confirm', 'XmppConfirmHandler');
|
||||||
}
|
|
||||||
|
|
||||||
if (!defined('XMPP_ONLY_FLAG')) { // hack!
|
|
||||||
// For compat with old plugins not registering their own handlers.
|
// For compat with old plugins not registering their own handlers.
|
||||||
$this->connect('plugin', 'PluginQueueHandler');
|
$this->connect('plugin', 'PluginQueueHandler');
|
||||||
|
|
||||||
|
$this->connect('xmppout', 'XmppOutQueueHandler', 'xmppdaemon');
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if (!defined('XMPP_ONLY_FLAG')) { // hack!
|
|
||||||
Event::handle('EndInitializeQueueManager', array($this));
|
Event::handle('EndInitializeQueueManager', array($this));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register a queue transport name and handler class for your plugin.
|
* Register a queue transport name and handler class for your plugin.
|
||||||
@ -183,10 +243,27 @@ abstract class QueueManager extends IoManager
|
|||||||
*
|
*
|
||||||
* @param string $transport
|
* @param string $transport
|
||||||
* @param string $class
|
* @param string $class
|
||||||
|
* @param string $group
|
||||||
*/
|
*/
|
||||||
public function connect($transport, $class)
|
public function connect($transport, $class, $group='queuedaemon')
|
||||||
{
|
{
|
||||||
$this->handlers[$transport] = $class;
|
$this->handlers[$transport] = $class;
|
||||||
|
$this->groups[$group][$transport] = $class;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return string queue group to use for this request
|
||||||
|
*/
|
||||||
|
function activeGroup()
|
||||||
|
{
|
||||||
|
$group = 'queuedaemon';
|
||||||
|
if ($this->master) {
|
||||||
|
// hack hack
|
||||||
|
if ($this->master instanceof XmppMaster) {
|
||||||
|
return 'xmppdaemon';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return $group;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -31,7 +31,7 @@ class SmsQueueHandler extends QueueHandler
|
|||||||
return 'sms';
|
return 'sms';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle_notice($notice)
|
function handle($notice)
|
||||||
{
|
{
|
||||||
require_once(INSTALLDIR.'/lib/mail.php');
|
require_once(INSTALLDIR.'/lib/mail.php');
|
||||||
return mail_broadcast_notice_sms($notice);
|
return mail_broadcast_notice_sms($notice);
|
||||||
|
159
lib/spawningdaemon.php
Normal file
159
lib/spawningdaemon.php
Normal file
@ -0,0 +1,159 @@
|
|||||||
|
<?php
|
||||||
|
/*
|
||||||
|
* StatusNet - the distributed open-source microblogging tool
|
||||||
|
* Copyright (C) 2010, StatusNet, Inc.
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base class for daemon that can launch one or more processing threads,
|
||||||
|
* respawning them if they exit.
|
||||||
|
*
|
||||||
|
* This is mainly intended for indefinite workloads such as monitoring
|
||||||
|
* a queue or maintaining an IM channel.
|
||||||
|
*
|
||||||
|
* Child classes should implement the
|
||||||
|
*
|
||||||
|
* We can then pass individual items through the QueueHandler subclasses
|
||||||
|
* they belong to. We additionally can handle queues for multiple sites.
|
||||||
|
*
|
||||||
|
* @package QueueHandler
|
||||||
|
* @author Brion Vibber <brion@status.net>
|
||||||
|
*/
|
||||||
|
abstract class SpawningDaemon extends Daemon
|
||||||
|
{
|
||||||
|
protected $threads=1;
|
||||||
|
|
||||||
|
function __construct($id=null, $daemonize=true, $threads=1)
|
||||||
|
{
|
||||||
|
parent::__construct($daemonize);
|
||||||
|
|
||||||
|
if ($id) {
|
||||||
|
$this->set_id($id);
|
||||||
|
}
|
||||||
|
$this->threads = $threads;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Perform some actual work!
|
||||||
|
*
|
||||||
|
* @return boolean true on success, false on failure
|
||||||
|
*/
|
||||||
|
public abstract function runThread();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Spawn one or more background processes and let them start running.
|
||||||
|
* Each individual process will execute whatever's in the runThread()
|
||||||
|
* method, which should be overridden.
|
||||||
|
*
|
||||||
|
* Child processes will be automatically respawned when they exit.
|
||||||
|
*
|
||||||
|
* @todo possibly allow for not respawning on "normal" exits...
|
||||||
|
* though ParallelizingDaemon is probably better for workloads
|
||||||
|
* that have forseeable endpoints.
|
||||||
|
*/
|
||||||
|
function run()
|
||||||
|
{
|
||||||
|
$children = array();
|
||||||
|
for ($i = 1; $i <= $this->threads; $i++) {
|
||||||
|
$pid = pcntl_fork();
|
||||||
|
if ($pid < 0) {
|
||||||
|
$this->log(LOG_ERROR, "Couldn't fork for thread $i; aborting\n");
|
||||||
|
exit(1);
|
||||||
|
} else if ($pid == 0) {
|
||||||
|
$this->initAndRunChild($i);
|
||||||
|
} else {
|
||||||
|
$this->log(LOG_INFO, "Spawned thread $i as pid $pid");
|
||||||
|
$children[$i] = $pid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->log(LOG_INFO, "Waiting for children to complete.");
|
||||||
|
while (count($children) > 0) {
|
||||||
|
$status = null;
|
||||||
|
$pid = pcntl_wait($status);
|
||||||
|
if ($pid > 0) {
|
||||||
|
$i = array_search($pid, $children);
|
||||||
|
if ($i === false) {
|
||||||
|
$this->log(LOG_ERR, "Unrecognized child pid $pid exited!");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
unset($children[$i]);
|
||||||
|
$this->log(LOG_INFO, "Thread $i pid $pid exited.");
|
||||||
|
|
||||||
|
$pid = pcntl_fork();
|
||||||
|
if ($pid < 0) {
|
||||||
|
$this->log(LOG_ERROR, "Couldn't fork to respawn thread $i; aborting thread.\n");
|
||||||
|
} else if ($pid == 0) {
|
||||||
|
$this->initAndRunChild($i);
|
||||||
|
} else {
|
||||||
|
$this->log(LOG_INFO, "Respawned thread $i as pid $pid");
|
||||||
|
$children[$i] = $pid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
$this->log(LOG_INFO, "All child processes complete.");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize things for a fresh thread, call runThread(), and
|
||||||
|
* exit at completion with appropriate return value.
|
||||||
|
*/
|
||||||
|
protected function initAndRunChild($thread)
|
||||||
|
{
|
||||||
|
$this->set_id($this->get_id() . "." . $thread);
|
||||||
|
$this->resetDb();
|
||||||
|
$ok = $this->runThread();
|
||||||
|
exit($ok ? 0 : 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reconnect to the database for each child process,
|
||||||
|
* or they'll get very confused trying to use the
|
||||||
|
* same socket.
|
||||||
|
*/
|
||||||
|
protected function resetDb()
|
||||||
|
{
|
||||||
|
// @fixme do we need to explicitly open the db too
|
||||||
|
// or is this implied?
|
||||||
|
global $_DB_DATAOBJECT;
|
||||||
|
unset($_DB_DATAOBJECT['CONNECTIONS']);
|
||||||
|
|
||||||
|
// Reconnect main memcached, or threads will stomp on
|
||||||
|
// each other and corrupt their requests.
|
||||||
|
$cache = common_memcache();
|
||||||
|
if ($cache) {
|
||||||
|
$cache->reconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Also reconnect memcached for status_network table.
|
||||||
|
if (!empty(Status_network::$cache)) {
|
||||||
|
Status_network::$cache->close();
|
||||||
|
Status_network::$cache = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function log($level, $msg)
|
||||||
|
{
|
||||||
|
common_log($level, get_class($this) . ' ('. $this->get_id() .'): '.$msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
function name()
|
||||||
|
{
|
||||||
|
return strtolower(get_class($this).'.'.$this->get_id());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -39,7 +39,6 @@ class StompQueueManager extends QueueManager
|
|||||||
var $base = null;
|
var $base = null;
|
||||||
var $con = null;
|
var $con = null;
|
||||||
|
|
||||||
protected $master = null;
|
|
||||||
protected $sites = array();
|
protected $sites = array();
|
||||||
|
|
||||||
function __construct()
|
function __construct()
|
||||||
@ -104,11 +103,12 @@ class StompQueueManager extends QueueManager
|
|||||||
*/
|
*/
|
||||||
function getQueues()
|
function getQueues()
|
||||||
{
|
{
|
||||||
|
$group = $this->activeGroup();
|
||||||
$site = common_config('site', 'server');
|
$site = common_config('site', 'server');
|
||||||
if (empty($this->handlers[$site])) {
|
if (empty($this->groups[$site][$group])) {
|
||||||
return array();
|
return array();
|
||||||
} else {
|
} else {
|
||||||
return array_keys($this->handlers[$site]);
|
return array_keys($this->groups[$site][$group]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -118,10 +118,12 @@ class StompQueueManager extends QueueManager
|
|||||||
*
|
*
|
||||||
* @param string $transport
|
* @param string $transport
|
||||||
* @param string $class
|
* @param string $class
|
||||||
|
* @param string $group
|
||||||
*/
|
*/
|
||||||
public function connect($transport, $class)
|
public function connect($transport, $class, $group='queuedaemon')
|
||||||
{
|
{
|
||||||
$this->handlers[common_config('site', 'server')][$transport] = $class;
|
$this->handlers[common_config('site', 'server')][$transport] = $class;
|
||||||
|
$this->groups[common_config('site', 'server')][$group][$transport] = $class;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -130,23 +132,23 @@ class StompQueueManager extends QueueManager
|
|||||||
*/
|
*/
|
||||||
public function enqueue($object, $queue)
|
public function enqueue($object, $queue)
|
||||||
{
|
{
|
||||||
$notice = $object;
|
$msg = $this->encode($object);
|
||||||
|
$rep = $this->logrep($object);
|
||||||
|
|
||||||
$this->_connect();
|
$this->_connect();
|
||||||
|
|
||||||
// XXX: serialize and send entire notice
|
// XXX: serialize and send entire notice
|
||||||
|
|
||||||
$result = $this->con->send($this->queueName($queue),
|
$result = $this->con->send($this->queueName($queue),
|
||||||
$notice->id, // BODY of the message
|
$msg, // BODY of the message
|
||||||
array ('created' => $notice->created));
|
array ('created' => common_sql_now()));
|
||||||
|
|
||||||
if (!$result) {
|
if (!$result) {
|
||||||
common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
|
common_log(LOG_ERR, "Error sending $rep to $queue queue");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
|
common_log(LOG_DEBUG, "complete remote queueing $rep for $queue");
|
||||||
. $notice->id . ' for ' . $queue);
|
|
||||||
$this->stats('enqueued', $queue);
|
$this->stats('enqueued', $queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -174,7 +176,7 @@ class StompQueueManager extends QueueManager
|
|||||||
$ok = true;
|
$ok = true;
|
||||||
$frames = $this->con->readFrames();
|
$frames = $this->con->readFrames();
|
||||||
foreach ($frames as $frame) {
|
foreach ($frames as $frame) {
|
||||||
$ok = $ok && $this->_handleNotice($frame);
|
$ok = $ok && $this->_handleItem($frame);
|
||||||
}
|
}
|
||||||
return $ok;
|
return $ok;
|
||||||
}
|
}
|
||||||
@ -265,7 +267,7 @@ class StompQueueManager extends QueueManager
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handle and acknowledge a notice event that's come in through a queue.
|
* Handle and acknowledge an event that's come in through a queue.
|
||||||
*
|
*
|
||||||
* If the queue handler reports failure, the message is requeued for later.
|
* If the queue handler reports failure, the message is requeued for later.
|
||||||
* Missing notices or handler classes will drop the message.
|
* Missing notices or handler classes will drop the message.
|
||||||
@ -276,7 +278,7 @@ class StompQueueManager extends QueueManager
|
|||||||
* @param StompFrame $frame
|
* @param StompFrame $frame
|
||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
protected function _handleNotice($frame)
|
protected function _handleItem($frame)
|
||||||
{
|
{
|
||||||
list($site, $queue) = $this->parseDestination($frame->headers['destination']);
|
list($site, $queue) = $this->parseDestination($frame->headers['destination']);
|
||||||
if ($site != common_config('site', 'server')) {
|
if ($site != common_config('site', 'server')) {
|
||||||
@ -284,6 +286,7 @@ class StompQueueManager extends QueueManager
|
|||||||
StatusNet::init($site);
|
StatusNet::init($site);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (is_numeric($frame->body)) {
|
||||||
$id = intval($frame->body);
|
$id = intval($frame->body);
|
||||||
$info = "notice $id posted at {$frame->headers['created']} in queue $queue";
|
$info = "notice $id posted at {$frame->headers['created']} in queue $queue";
|
||||||
|
|
||||||
@ -295,6 +298,13 @@ class StompQueueManager extends QueueManager
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$item = $notice;
|
||||||
|
} else {
|
||||||
|
// @fixme should we serialize, or json, or what here?
|
||||||
|
$info = "string posted at {$frame->headers['created']} in queue $queue";
|
||||||
|
$item = $frame->body;
|
||||||
|
}
|
||||||
|
|
||||||
$handler = $this->getHandler($queue);
|
$handler = $this->getHandler($queue);
|
||||||
if (!$handler) {
|
if (!$handler) {
|
||||||
$this->_log(LOG_ERROR, "Missing handler class; skipping $info");
|
$this->_log(LOG_ERROR, "Missing handler class; skipping $info");
|
||||||
@ -303,7 +313,7 @@ class StompQueueManager extends QueueManager
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
$ok = $handler->handle_notice($notice);
|
$ok = $handler->handle($item);
|
||||||
|
|
||||||
if (!$ok) {
|
if (!$ok) {
|
||||||
$this->_log(LOG_WARNING, "Failed handling $info");
|
$this->_log(LOG_WARNING, "Failed handling $info");
|
||||||
@ -311,7 +321,7 @@ class StompQueueManager extends QueueManager
|
|||||||
// this kind of queue management ourselves;
|
// this kind of queue management ourselves;
|
||||||
// if we don't ack, it should resend...
|
// if we don't ack, it should resend...
|
||||||
$this->con->ack($frame);
|
$this->con->ack($frame);
|
||||||
$this->enqueue($notice, $queue);
|
$this->enqueue($item, $queue);
|
||||||
$this->stats('requeued', $queue);
|
$this->stats('requeued', $queue);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -1130,7 +1130,8 @@ function common_request_id()
|
|||||||
$pid = getmypid();
|
$pid = getmypid();
|
||||||
$server = common_config('site', 'server');
|
$server = common_config('site', 'server');
|
||||||
if (php_sapi_name() == 'cli') {
|
if (php_sapi_name() == 'cli') {
|
||||||
return "$server:$pid";
|
$script = basename($_SERVER['PHP_SELF']);
|
||||||
|
return "$server:$script:$pid";
|
||||||
} else {
|
} else {
|
||||||
static $req_id = null;
|
static $req_id = null;
|
||||||
if (!isset($req_id)) {
|
if (!isset($req_id)) {
|
||||||
|
@ -1,168 +0,0 @@
|
|||||||
<?php
|
|
||||||
/*
|
|
||||||
* StatusNet - the distributed open-source microblogging tool
|
|
||||||
* Copyright (C) 2008-2010 StatusNet, Inc.
|
|
||||||
*
|
|
||||||
* This program is free software: you can redistribute it and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License as published by
|
|
||||||
* the Free Software Foundation, either version 3 of the License, or
|
|
||||||
* (at your option) any later version.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU Affero General Public License for more details.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
if (!defined('STATUSNET') && !defined('LACONICA')) {
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Event handler for pushing new confirmations to Jabber users.
|
|
||||||
* @fixme recommend redoing this on a queue-trigger model
|
|
||||||
* @fixme expiration of old items got dropped in the past, put it back?
|
|
||||||
*/
|
|
||||||
class XmppConfirmManager extends IoManager
|
|
||||||
{
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return mixed XmppConfirmManager, or false if unneeded
|
|
||||||
*/
|
|
||||||
public static function get()
|
|
||||||
{
|
|
||||||
if (common_config('xmpp', 'enabled')) {
|
|
||||||
$site = common_config('site', 'server');
|
|
||||||
return new XmppConfirmManager();
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tell the i/o master we need one instance for each supporting site
|
|
||||||
* being handled in this process.
|
|
||||||
*/
|
|
||||||
public static function multiSite()
|
|
||||||
{
|
|
||||||
return IoManager::INSTANCE_PER_SITE;
|
|
||||||
}
|
|
||||||
|
|
||||||
function __construct()
|
|
||||||
{
|
|
||||||
$this->site = common_config('site', 'server');
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 10 seconds? Really? That seems a bit frequent.
|
|
||||||
*/
|
|
||||||
function pollInterval()
|
|
||||||
{
|
|
||||||
return 10;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Ping!
|
|
||||||
* @return boolean true if we found something
|
|
||||||
*/
|
|
||||||
function poll()
|
|
||||||
{
|
|
||||||
$this->switchSite();
|
|
||||||
$confirm = $this->next_confirm();
|
|
||||||
if ($confirm) {
|
|
||||||
$this->handle_confirm($confirm);
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected function handle_confirm($confirm)
|
|
||||||
{
|
|
||||||
require_once INSTALLDIR . '/lib/jabber.php';
|
|
||||||
|
|
||||||
common_log(LOG_INFO, 'Sending confirmation for ' . $confirm->address);
|
|
||||||
$user = User::staticGet($confirm->user_id);
|
|
||||||
if (!$user) {
|
|
||||||
common_log(LOG_WARNING, 'Confirmation for unknown user ' . $confirm->user_id);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
$success = jabber_confirm_address($confirm->code,
|
|
||||||
$user->nickname,
|
|
||||||
$confirm->address);
|
|
||||||
if (!$success) {
|
|
||||||
common_log(LOG_ERR, 'Confirmation failed for ' . $confirm->address);
|
|
||||||
# Just let the claim age out; hopefully things work then
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
common_log(LOG_INFO, 'Confirmation sent for ' . $confirm->address);
|
|
||||||
# Mark confirmation sent; need a dupe so we don't have the WHERE clause
|
|
||||||
$dupe = Confirm_address::staticGet('code', $confirm->code);
|
|
||||||
if (!$dupe) {
|
|
||||||
common_log(LOG_WARNING, 'Could not refetch confirm', __FILE__);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
$orig = clone($dupe);
|
|
||||||
$dupe->sent = $dupe->claimed;
|
|
||||||
$result = $dupe->update($orig);
|
|
||||||
if (!$result) {
|
|
||||||
common_log_db_error($dupe, 'UPDATE', __FILE__);
|
|
||||||
# Just let the claim age out; hopefully things work then
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected function next_confirm()
|
|
||||||
{
|
|
||||||
$confirm = new Confirm_address();
|
|
||||||
$confirm->whereAdd('claimed IS null');
|
|
||||||
$confirm->whereAdd('sent IS null');
|
|
||||||
# XXX: eventually we could do other confirmations in the queue, too
|
|
||||||
$confirm->address_type = 'jabber';
|
|
||||||
$confirm->orderBy('modified DESC');
|
|
||||||
$confirm->limit(1);
|
|
||||||
if ($confirm->find(true)) {
|
|
||||||
common_log(LOG_INFO, 'Claiming confirmation for ' . $confirm->address);
|
|
||||||
# working around some weird DB_DataObject behaviour
|
|
||||||
$confirm->whereAdd(''); # clears where stuff
|
|
||||||
$original = clone($confirm);
|
|
||||||
$confirm->claimed = common_sql_now();
|
|
||||||
$result = $confirm->update($original);
|
|
||||||
if ($result) {
|
|
||||||
common_log(LOG_INFO, 'Succeeded in claim! '. $result);
|
|
||||||
return $confirm;
|
|
||||||
} else {
|
|
||||||
common_log(LOG_INFO, 'Failed in claim!');
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected function clear_old_confirm_claims()
|
|
||||||
{
|
|
||||||
$confirm = new Confirm();
|
|
||||||
$confirm->claimed = null;
|
|
||||||
$confirm->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
|
|
||||||
$confirm->update(DB_DATAOBJECT_WHEREADD_ONLY);
|
|
||||||
$confirm->free();
|
|
||||||
unset($confirm);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Make sure we're on the right site configuration
|
|
||||||
*/
|
|
||||||
protected function switchSite()
|
|
||||||
{
|
|
||||||
if ($this->site != common_config('site', 'server')) {
|
|
||||||
common_log(LOG_DEBUG, __METHOD__ . ": switching to site $this->site");
|
|
||||||
$this->stats('switch');
|
|
||||||
StatusNet::init($this->site);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -70,6 +70,7 @@ class XmppManager extends IoManager
|
|||||||
function __construct()
|
function __construct()
|
||||||
{
|
{
|
||||||
$this->site = common_config('site', 'server');
|
$this->site = common_config('site', 'server');
|
||||||
|
$this->resource = common_config('xmpp', 'resource') . 'daemon';
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -86,15 +87,19 @@ class XmppManager extends IoManager
|
|||||||
# Low priority; we don't want to receive messages
|
# Low priority; we don't want to receive messages
|
||||||
|
|
||||||
common_log(LOG_INFO, "INITIALIZE");
|
common_log(LOG_INFO, "INITIALIZE");
|
||||||
$this->conn = jabber_connect($this->resource());
|
$this->conn = jabber_connect($this->resource);
|
||||||
|
|
||||||
if (empty($this->conn)) {
|
if (empty($this->conn)) {
|
||||||
common_log(LOG_ERR, "Couldn't connect to server.");
|
common_log(LOG_ERR, "Couldn't connect to server.");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->conn->addEventHandler('message', 'forward_message', $this);
|
$this->log(LOG_DEBUG, "Initializing stanza handlers.");
|
||||||
|
|
||||||
|
$this->conn->addEventHandler('message', 'handle_message', $this);
|
||||||
|
$this->conn->addEventHandler('presence', 'handle_presence', $this);
|
||||||
$this->conn->addEventHandler('reconnect', 'handle_reconnect', $this);
|
$this->conn->addEventHandler('reconnect', 'handle_reconnect', $this);
|
||||||
|
|
||||||
$this->conn->setReconnectTimeout(600);
|
$this->conn->setReconnectTimeout(600);
|
||||||
jabber_send_presence("Send me a message to post a notice", 'available', null, 'available', -1);
|
jabber_send_presence("Send me a message to post a notice", 'available', null, 'available', -1);
|
||||||
|
|
||||||
@ -175,12 +180,37 @@ class XmppManager extends IoManager
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For queue handlers to pass us a message to push out,
|
||||||
|
* if we're active.
|
||||||
|
*
|
||||||
|
* @fixme should this be blocking etc?
|
||||||
|
*
|
||||||
|
* @param string $msg XML stanza to send
|
||||||
|
* @return boolean success
|
||||||
|
*/
|
||||||
|
public function send($msg)
|
||||||
|
{
|
||||||
|
if ($this->conn && !$this->conn->isDisconnected()) {
|
||||||
|
$bytes = $this->conn->send($msg);
|
||||||
|
if ($bytes > 0) {
|
||||||
|
$this->conn->processTime(0);
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Can't send right now...
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a keepalive ping to the XMPP server.
|
* Send a keepalive ping to the XMPP server.
|
||||||
*/
|
*/
|
||||||
protected function sendPing()
|
protected function sendPing()
|
||||||
{
|
{
|
||||||
$jid = jabber_daemon_address().'/'.$this->resource();
|
$jid = jabber_daemon_address().'/'.$this->resource;
|
||||||
$server = common_config('xmpp', 'server');
|
$server = common_config('xmpp', 'server');
|
||||||
|
|
||||||
if (!isset($this->pingid)) {
|
if (!isset($this->pingid)) {
|
||||||
@ -206,61 +236,239 @@ class XmppManager extends IoManager
|
|||||||
$this->conn->presence(null, 'available', null, 'available', -1);
|
$this->conn->presence(null, 'available', null, 'available', -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Callback for Jabber message event.
|
function get_user($from)
|
||||||
*
|
|
||||||
* This connection handles output; if we get a message straight to us,
|
|
||||||
* forward it on to our XmppDaemon listener for processing.
|
|
||||||
*
|
|
||||||
* @param $pl
|
|
||||||
*/
|
|
||||||
function forward_message(&$pl)
|
|
||||||
{
|
{
|
||||||
|
$user = User::staticGet('jabber', jabber_normalize_jid($from));
|
||||||
|
return $user;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* XMPP callback for handling message input...
|
||||||
|
* @param array $pl XMPP payload
|
||||||
|
*/
|
||||||
|
function handle_message(&$pl)
|
||||||
|
{
|
||||||
|
$from = jabber_normalize_jid($pl['from']);
|
||||||
|
|
||||||
if ($pl['type'] != 'chat') {
|
if ($pl['type'] != 'chat') {
|
||||||
common_log(LOG_DEBUG, 'Ignoring message of type ' . $pl['type'] . ' from ' . $pl['from']);
|
$this->log(LOG_WARNING, "Ignoring message of type ".$pl['type']." from $from.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
$listener = $this->listener();
|
|
||||||
if (strtolower($listener) == strtolower($pl['from'])) {
|
if (mb_strlen($pl['body']) == 0) {
|
||||||
common_log(LOG_WARNING, 'Ignoring loop message.');
|
$this->log(LOG_WARNING, "Ignoring message with empty body from $from.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Forwarded from another daemon for us to handle; this shouldn't
|
||||||
|
// happen any more but we might get some legacy items.
|
||||||
|
if ($this->is_self($from)) {
|
||||||
|
$this->log(LOG_INFO, "Got forwarded notice from self ($from).");
|
||||||
|
$from = $this->get_ofrom($pl);
|
||||||
|
$this->log(LOG_INFO, "Originally sent by $from.");
|
||||||
|
if (is_null($from) || $this->is_self($from)) {
|
||||||
|
$this->log(LOG_INFO, "Ignoring notice originally sent by $from.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
common_log(LOG_INFO, 'Forwarding message from ' . $pl['from'] . ' to ' . $listener);
|
|
||||||
$this->conn->message($this->listener(), $pl['body'], 'chat', null, $this->ofrom($pl['from']));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
$user = $this->get_user($from);
|
||||||
* Build an <addresses> block with an ofrom entry for forwarded messages
|
|
||||||
*
|
|
||||||
* @param string $from Jabber ID of original sender
|
|
||||||
* @return string XML fragment
|
|
||||||
*/
|
|
||||||
protected function ofrom($from)
|
|
||||||
{
|
|
||||||
$address = "<addresses xmlns='http://jabber.org/protocol/address'>\n";
|
|
||||||
$address .= "<address type='ofrom' jid='$from' />\n";
|
|
||||||
$address .= "</addresses>\n";
|
|
||||||
return $address;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
// For common_current_user to work
|
||||||
* Build the complete JID of the XmppDaemon process which
|
global $_cur;
|
||||||
* handles primary XMPP input for this site.
|
$_cur = $user;
|
||||||
*
|
|
||||||
* @return string Jabber ID
|
if (!$user) {
|
||||||
*/
|
$this->from_site($from, 'Unknown user; go to ' .
|
||||||
protected function listener()
|
common_local_url('imsettings') .
|
||||||
{
|
' to add your address to your account');
|
||||||
if (common_config('xmpp', 'listener')) {
|
$this->log(LOG_WARNING, 'Message from unknown user ' . $from);
|
||||||
return common_config('xmpp', 'listener');
|
return;
|
||||||
|
}
|
||||||
|
if ($this->handle_command($user, $pl['body'])) {
|
||||||
|
$this->log(LOG_INFO, "Command message by $from handled.");
|
||||||
|
return;
|
||||||
|
} else if ($this->is_autoreply($pl['body'])) {
|
||||||
|
$this->log(LOG_INFO, 'Ignoring auto reply from ' . $from);
|
||||||
|
return;
|
||||||
|
} else if ($this->is_otr($pl['body'])) {
|
||||||
|
$this->log(LOG_INFO, 'Ignoring OTR from ' . $from);
|
||||||
|
return;
|
||||||
} else {
|
} else {
|
||||||
return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon';
|
|
||||||
|
$this->log(LOG_INFO, 'Posting a notice from ' . $user->nickname);
|
||||||
|
|
||||||
|
$this->add_notice($user, $pl);
|
||||||
|
}
|
||||||
|
|
||||||
|
$user->free();
|
||||||
|
unset($user);
|
||||||
|
unset($_cur);
|
||||||
|
|
||||||
|
unset($pl['xml']);
|
||||||
|
$pl['xml'] = null;
|
||||||
|
|
||||||
|
$pl = null;
|
||||||
|
unset($pl);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
function is_self($from)
|
||||||
|
{
|
||||||
|
return preg_match('/^'.strtolower(jabber_daemon_address()).'/', strtolower($from));
|
||||||
|
}
|
||||||
|
|
||||||
|
function get_ofrom($pl)
|
||||||
|
{
|
||||||
|
$xml = $pl['xml'];
|
||||||
|
$addresses = $xml->sub('addresses');
|
||||||
|
if (!$addresses) {
|
||||||
|
$this->log(LOG_WARNING, 'Forwarded message without addresses');
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
$address = $addresses->sub('address');
|
||||||
|
if (!$address) {
|
||||||
|
$this->log(LOG_WARNING, 'Forwarded message without address');
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (!array_key_exists('type', $address->attrs)) {
|
||||||
|
$this->log(LOG_WARNING, 'No type for forwarded message');
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
$type = $address->attrs['type'];
|
||||||
|
if ($type != 'ofrom') {
|
||||||
|
$this->log(LOG_WARNING, 'Type of forwarded message is not ofrom');
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (!array_key_exists('jid', $address->attrs)) {
|
||||||
|
$this->log(LOG_WARNING, 'No jid for forwarded message');
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
$jid = $address->attrs['jid'];
|
||||||
|
if (!$jid) {
|
||||||
|
$this->log(LOG_WARNING, 'Could not get jid from address');
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
$this->log(LOG_DEBUG, 'Got message forwarded from jid ' . $jid);
|
||||||
|
return $jid;
|
||||||
|
}
|
||||||
|
|
||||||
|
function is_autoreply($txt)
|
||||||
|
{
|
||||||
|
if (preg_match('/[\[\(]?[Aa]uto[-\s]?[Rr]e(ply|sponse)[\]\)]/', $txt)) {
|
||||||
|
return true;
|
||||||
|
} else if (preg_match('/^System: Message wasn\'t delivered. Offline storage size was exceeded.$/', $txt)) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function resource()
|
function is_otr($txt)
|
||||||
{
|
{
|
||||||
return 'queue' . posix_getpid(); // @fixme PIDs won't be host-unique
|
if (preg_match('/^\?OTR/', $txt)) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function from_site($address, $msg)
|
||||||
|
{
|
||||||
|
$text = '['.common_config('site', 'name') . '] ' . $msg;
|
||||||
|
jabber_send_message($address, $text);
|
||||||
|
}
|
||||||
|
|
||||||
|
function handle_command($user, $body)
|
||||||
|
{
|
||||||
|
$inter = new CommandInterpreter();
|
||||||
|
$cmd = $inter->handle_command($user, $body);
|
||||||
|
if ($cmd) {
|
||||||
|
$chan = new XMPPChannel($this->conn);
|
||||||
|
$cmd->execute($chan);
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function add_notice(&$user, &$pl)
|
||||||
|
{
|
||||||
|
$body = trim($pl['body']);
|
||||||
|
$content_shortened = common_shorten_links($body);
|
||||||
|
if (Notice::contentTooLong($content_shortened)) {
|
||||||
|
$from = jabber_normalize_jid($pl['from']);
|
||||||
|
$this->from_site($from, sprintf(_('Message too long - maximum is %1$d characters, you sent %2$d.'),
|
||||||
|
Notice::maxContent(),
|
||||||
|
mb_strlen($content_shortened)));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
$notice = Notice::saveNew($user->id, $content_shortened, 'xmpp');
|
||||||
|
} catch (Exception $e) {
|
||||||
|
$this->log(LOG_ERR, $e->getMessage());
|
||||||
|
$this->from_site($user->jabber, $e->getMessage());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
common_broadcast_notice($notice);
|
||||||
|
$this->log(LOG_INFO,
|
||||||
|
'Added notice ' . $notice->id . ' from user ' . $user->nickname);
|
||||||
|
$notice->free();
|
||||||
|
unset($notice);
|
||||||
|
}
|
||||||
|
|
||||||
|
function handle_presence(&$pl)
|
||||||
|
{
|
||||||
|
$from = jabber_normalize_jid($pl['from']);
|
||||||
|
switch ($pl['type']) {
|
||||||
|
case 'subscribe':
|
||||||
|
# We let anyone subscribe
|
||||||
|
$this->subscribed($from);
|
||||||
|
$this->log(LOG_INFO,
|
||||||
|
'Accepted subscription from ' . $from);
|
||||||
|
break;
|
||||||
|
case 'subscribed':
|
||||||
|
case 'unsubscribed':
|
||||||
|
case 'unsubscribe':
|
||||||
|
$this->log(LOG_INFO,
|
||||||
|
'Ignoring "' . $pl['type'] . '" from ' . $from);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
if (!$pl['type']) {
|
||||||
|
$user = User::staticGet('jabber', $from);
|
||||||
|
if (!$user) {
|
||||||
|
$this->log(LOG_WARNING, 'Presence from unknown user ' . $from);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if ($user->updatefrompresence) {
|
||||||
|
$this->log(LOG_INFO, 'Updating ' . $user->nickname .
|
||||||
|
' status from presence.');
|
||||||
|
$this->add_notice($user, $pl);
|
||||||
|
}
|
||||||
|
$user->free();
|
||||||
|
unset($user);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
unset($pl['xml']);
|
||||||
|
$pl['xml'] = null;
|
||||||
|
|
||||||
|
$pl = null;
|
||||||
|
unset($pl);
|
||||||
|
}
|
||||||
|
|
||||||
|
function log($level, $msg)
|
||||||
|
{
|
||||||
|
$text = 'XMPPDaemon('.$this->resource.'): '.$msg;
|
||||||
|
common_log($level, $text);
|
||||||
|
}
|
||||||
|
|
||||||
|
function subscribed($to)
|
||||||
|
{
|
||||||
|
jabber_special_presence('subscribed', $to);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
55
lib/xmppoutqueuehandler.php
Normal file
55
lib/xmppoutqueuehandler.php
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
<?php
|
||||||
|
/*
|
||||||
|
* StatusNet - the distributed open-source microblogging tool
|
||||||
|
* Copyright (C) 2010, StatusNet, Inc.
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Queue handler for pre-processed outgoing XMPP messages.
|
||||||
|
* Formatted XML stanzas will have been pushed into the queue
|
||||||
|
* via the Queued_XMPP connection proxy, probably from some
|
||||||
|
* other queue processor.
|
||||||
|
*
|
||||||
|
* Here, the XML stanzas are simply pulled out of the queue and
|
||||||
|
* pushed out over the wire; an XmppManager is needed to set up
|
||||||
|
* and maintain the actual server connection.
|
||||||
|
*
|
||||||
|
* This queue will be run via XmppDaemon rather than QueueDaemon.
|
||||||
|
*
|
||||||
|
* @author Brion Vibber <brion@status.net>
|
||||||
|
*/
|
||||||
|
class XmppOutQueueHandler extends QueueHandler
|
||||||
|
{
|
||||||
|
function transport() {
|
||||||
|
return 'xmppout';
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Take a previously-queued XMPP stanza and send it out ot the server.
|
||||||
|
* @param string $msg
|
||||||
|
* @return boolean true on success
|
||||||
|
*/
|
||||||
|
function handle($msg)
|
||||||
|
{
|
||||||
|
assert(is_string($msg));
|
||||||
|
|
||||||
|
$xmpp = XmppManager::get();
|
||||||
|
$ok = $xmpp->send($msg);
|
||||||
|
|
||||||
|
return $ok;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -32,14 +32,7 @@ class EnjitQueueHandler extends QueueHandler
|
|||||||
return 'enjit';
|
return 'enjit';
|
||||||
}
|
}
|
||||||
|
|
||||||
function start()
|
function handle($notice)
|
||||||
{
|
|
||||||
$this->log(LOG_INFO, "Starting EnjitQueueHandler");
|
|
||||||
$this->log(LOG_INFO, "Broadcasting to ".common_config('enjit', 'apiurl'));
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
function handle_notice($notice)
|
|
||||||
{
|
{
|
||||||
|
|
||||||
$profile = Profile::staticGet($notice->profile_id);
|
$profile = Profile::staticGet($notice->profile_id);
|
||||||
|
@ -28,7 +28,7 @@ class FacebookQueueHandler extends QueueHandler
|
|||||||
return 'facebook';
|
return 'facebook';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle_notice($notice)
|
function handle($notice)
|
||||||
{
|
{
|
||||||
if ($this->_isLocal($notice)) {
|
if ($this->_isLocal($notice)) {
|
||||||
return facebookBroadcastNotice($notice);
|
return facebookBroadcastNotice($notice);
|
||||||
|
@ -138,6 +138,9 @@ class RSSCloudPlugin extends Plugin
|
|||||||
case 'RSSCloudNotifier':
|
case 'RSSCloudNotifier':
|
||||||
include_once INSTALLDIR . '/plugins/RSSCloud/RSSCloudNotifier.php';
|
include_once INSTALLDIR . '/plugins/RSSCloud/RSSCloudNotifier.php';
|
||||||
return false;
|
return false;
|
||||||
|
case 'RSSCloudQueueHandler':
|
||||||
|
include_once INSTALLDIR . '/plugins/RSSCloud/RSSCloudQueueHandler.php';
|
||||||
|
return false;
|
||||||
case 'RSSCloudRequestNotifyAction':
|
case 'RSSCloudRequestNotifyAction':
|
||||||
case 'LoggingAggregatorAction':
|
case 'LoggingAggregatorAction':
|
||||||
include_once INSTALLDIR . '/plugins/RSSCloud/' .
|
include_once INSTALLDIR . '/plugins/RSSCloud/' .
|
||||||
@ -193,32 +196,6 @@ class RSSCloudPlugin extends Plugin
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* broadcast the message when not using queuehandler
|
|
||||||
*
|
|
||||||
* @param Notice &$notice the notice
|
|
||||||
* @param array $queue destination queue
|
|
||||||
*
|
|
||||||
* @return boolean hook return
|
|
||||||
*/
|
|
||||||
|
|
||||||
function onUnqueueHandleNotice(&$notice, $queue)
|
|
||||||
{
|
|
||||||
if (($queue == 'rsscloud') && ($this->_isLocal($notice))) {
|
|
||||||
|
|
||||||
common_debug('broadcasting rssCloud bound notice ' . $notice->id);
|
|
||||||
|
|
||||||
$profile = $notice->getProfile();
|
|
||||||
|
|
||||||
$notifier = new RSSCloudNotifier();
|
|
||||||
$notifier->notify($profile);
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Determine whether the notice was locally created
|
* Determine whether the notice was locally created
|
||||||
*
|
*
|
||||||
@ -261,19 +238,15 @@ class RSSCloudPlugin extends Plugin
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add RSSCloudQueueHandler to the list of valid daemons to
|
* Register RSSCloud notice queue handler
|
||||||
* start
|
|
||||||
*
|
*
|
||||||
* @param array $daemons the list of daemons to run
|
* @param QueueManager $manager
|
||||||
*
|
*
|
||||||
* @return boolean hook return
|
* @return boolean hook return
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
|
function onEndInitializeQueueManager($manager)
|
||||||
function onGetValidDaemons($daemons)
|
|
||||||
{
|
{
|
||||||
array_push($daemons, INSTALLDIR .
|
$manager->connect('rsscloud', 'RSSCloudQueueHandler');
|
||||||
'/plugins/RSSCloud/RSSCloudQueueHandler.php');
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
52
plugins/RSSCloud/RSSCloudQueueHandler.php
Executable file → Normal file
52
plugins/RSSCloud/RSSCloudQueueHandler.php
Executable file → Normal file
@ -1,4 +1,3 @@
|
|||||||
#!/usr/bin/env php
|
|
||||||
<?php
|
<?php
|
||||||
/*
|
/*
|
||||||
* StatusNet - the distributed open-source microblogging tool
|
* StatusNet - the distributed open-source microblogging tool
|
||||||
@ -18,61 +17,20 @@
|
|||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
define('INSTALLDIR', realpath(dirname(__FILE__) . '/../..'));
|
if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); }
|
||||||
|
|
||||||
$shortoptions = 'i::';
|
|
||||||
$longoptions = array('id::');
|
|
||||||
|
|
||||||
$helptext = <<<END_OF_ENJIT_HELP
|
|
||||||
Daemon script for pushing new notices to RSSCloud subscribers.
|
|
||||||
|
|
||||||
-i --id Identity (default none)
|
|
||||||
|
|
||||||
END_OF_ENJIT_HELP;
|
|
||||||
|
|
||||||
require_once INSTALLDIR . '/scripts/commandline.inc';
|
|
||||||
require_once INSTALLDIR . '/lib/queuehandler.php';
|
|
||||||
require_once INSTALLDIR . '/plugins/RSSCloud/RSSCloudNotifier.php';
|
|
||||||
require_once INSTALLDIR . '/plugins/RSSCloud/RSSCloudSubscription.php';
|
|
||||||
|
|
||||||
class RSSCloudQueueHandler extends QueueHandler
|
class RSSCloudQueueHandler extends QueueHandler
|
||||||
{
|
{
|
||||||
var $notifier = null;
|
|
||||||
|
|
||||||
function transport()
|
function transport()
|
||||||
{
|
{
|
||||||
return 'rsscloud';
|
return 'rsscloud';
|
||||||
}
|
}
|
||||||
|
|
||||||
function start()
|
function handle($notice)
|
||||||
{
|
|
||||||
$this->log(LOG_INFO, "INITIALIZE");
|
|
||||||
$this->notifier = new RSSCloudNotifier();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
function handle_notice($notice)
|
|
||||||
{
|
{
|
||||||
$profile = $notice->getProfile();
|
$profile = $notice->getProfile();
|
||||||
return $this->notifier->notify($profile);
|
$notifier = new RSSCloudNotifier();
|
||||||
|
return $notifier->notify($profile);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function finish()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
if (have_option('i')) {
|
|
||||||
$id = get_option_value('i');
|
|
||||||
} else if (have_option('--id')) {
|
|
||||||
$id = get_option_value('--id');
|
|
||||||
} else if (count($args) > 0) {
|
|
||||||
$id = $args[0];
|
|
||||||
} else {
|
|
||||||
$id = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
$handler = new RSSCloudQueueHandler($id);
|
|
||||||
|
|
||||||
$handler->runOnce();
|
|
||||||
|
@ -28,7 +28,7 @@ class TwitterQueueHandler extends QueueHandler
|
|||||||
return 'twitter';
|
return 'twitter';
|
||||||
}
|
}
|
||||||
|
|
||||||
function handle_notice($notice)
|
function handle($notice)
|
||||||
{
|
{
|
||||||
return broadcast_twitter($notice);
|
return broadcast_twitter($notice);
|
||||||
}
|
}
|
||||||
|
@ -50,7 +50,7 @@ if (empty($notice)) {
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!$handler->handle_notice($notice)) {
|
if (!$handler->handle($notice)) {
|
||||||
print "Failed to handle notice id $noticeId on queue '$queue'.\n";
|
print "Failed to handle notice id $noticeId on queue '$queue'.\n";
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
@ -29,6 +29,8 @@ $longoptions = array('id=', 'foreground', 'all', 'threads=', 'skip-xmpp', 'xmpp-
|
|||||||
*
|
*
|
||||||
* Recognizes Linux and Mac OS X; others will return default of 1.
|
* Recognizes Linux and Mac OS X; others will return default of 1.
|
||||||
*
|
*
|
||||||
|
* @fixme move this to SpawningDaemon, but to get the default val for help
|
||||||
|
* text we seem to need it before loading infrastructure
|
||||||
* @return intval
|
* @return intval
|
||||||
*/
|
*/
|
||||||
function getProcessorCount()
|
function getProcessorCount()
|
||||||
@ -83,143 +85,29 @@ define('CLAIM_TIMEOUT', 1200);
|
|||||||
* We can then pass individual items through the QueueHandler subclasses
|
* We can then pass individual items through the QueueHandler subclasses
|
||||||
* they belong to.
|
* they belong to.
|
||||||
*/
|
*/
|
||||||
class QueueDaemon extends Daemon
|
class QueueDaemon extends SpawningDaemon
|
||||||
{
|
{
|
||||||
protected $allsites;
|
protected $allsites = false;
|
||||||
protected $threads=1;
|
|
||||||
|
|
||||||
function __construct($id=null, $daemonize=true, $threads=1, $allsites=false)
|
function __construct($id=null, $daemonize=true, $threads=1, $allsites=false)
|
||||||
{
|
{
|
||||||
parent::__construct($daemonize);
|
parent::__construct($id, $daemonize, $threads);
|
||||||
|
|
||||||
if ($id) {
|
|
||||||
$this->set_id($id);
|
|
||||||
}
|
|
||||||
$this->all = $allsites;
|
$this->all = $allsites;
|
||||||
$this->threads = $threads;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* How many seconds a polling-based queue manager should wait between
|
|
||||||
* checks for new items to handle.
|
|
||||||
*
|
|
||||||
* Defaults to 60 seconds; override to speed up or slow down.
|
|
||||||
*
|
|
||||||
* @return int timeout in seconds
|
|
||||||
*/
|
|
||||||
function timeout()
|
|
||||||
{
|
|
||||||
return 60;
|
|
||||||
}
|
|
||||||
|
|
||||||
function name()
|
|
||||||
{
|
|
||||||
return strtolower(get_class($this).'.'.$this->get_id());
|
|
||||||
}
|
|
||||||
|
|
||||||
function run()
|
|
||||||
{
|
|
||||||
if ($this->threads > 1) {
|
|
||||||
return $this->runThreads();
|
|
||||||
} else {
|
|
||||||
return $this->runLoop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function runThreads()
|
|
||||||
{
|
|
||||||
$children = array();
|
|
||||||
for ($i = 1; $i <= $this->threads; $i++) {
|
|
||||||
$pid = pcntl_fork();
|
|
||||||
if ($pid < 0) {
|
|
||||||
print "Couldn't fork for thread $i; aborting\n";
|
|
||||||
exit(1);
|
|
||||||
} else if ($pid == 0) {
|
|
||||||
$this->runChild($i);
|
|
||||||
exit(0);
|
|
||||||
} else {
|
|
||||||
$this->log(LOG_INFO, "Spawned thread $i as pid $pid");
|
|
||||||
$children[$i] = $pid;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->log(LOG_INFO, "Waiting for children to complete.");
|
|
||||||
while (count($children) > 0) {
|
|
||||||
$status = null;
|
|
||||||
$pid = pcntl_wait($status);
|
|
||||||
if ($pid > 0) {
|
|
||||||
$i = array_search($pid, $children);
|
|
||||||
if ($i === false) {
|
|
||||||
$this->log(LOG_ERR, "Unrecognized child pid $pid exited!");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
unset($children[$i]);
|
|
||||||
$this->log(LOG_INFO, "Thread $i pid $pid exited.");
|
|
||||||
|
|
||||||
$pid = pcntl_fork();
|
|
||||||
if ($pid < 0) {
|
|
||||||
print "Couldn't fork to respawn thread $i; aborting thread.\n";
|
|
||||||
} else if ($pid == 0) {
|
|
||||||
$this->runChild($i);
|
|
||||||
exit(0);
|
|
||||||
} else {
|
|
||||||
$this->log(LOG_INFO, "Respawned thread $i as pid $pid");
|
|
||||||
$children[$i] = $pid;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$this->log(LOG_INFO, "All child processes complete.");
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
function runChild($thread)
|
|
||||||
{
|
|
||||||
$this->set_id($this->get_id() . "." . $thread);
|
|
||||||
$this->resetDb();
|
|
||||||
$this->runLoop();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Reconnect to the database for each child process,
|
|
||||||
* or they'll get very confused trying to use the
|
|
||||||
* same socket.
|
|
||||||
*/
|
|
||||||
function resetDb()
|
|
||||||
{
|
|
||||||
// @fixme do we need to explicitly open the db too
|
|
||||||
// or is this implied?
|
|
||||||
global $_DB_DATAOBJECT;
|
|
||||||
unset($_DB_DATAOBJECT['CONNECTIONS']);
|
|
||||||
|
|
||||||
// Reconnect main memcached, or threads will stomp on
|
|
||||||
// each other and corrupt their requests.
|
|
||||||
$cache = common_memcache();
|
|
||||||
if ($cache) {
|
|
||||||
$cache->reconnect();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Also reconnect memcached for status_network table.
|
|
||||||
if (!empty(Status_network::$cache)) {
|
|
||||||
Status_network::$cache->close();
|
|
||||||
Status_network::$cache = null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Setup and start of run loop for this queue handler as a daemon.
|
* Setup and start of run loop for this queue handler as a daemon.
|
||||||
* Most of the heavy lifting is passed on to the QueueManager's service()
|
* Most of the heavy lifting is passed on to the QueueManager's service()
|
||||||
* method, which passes control on to the QueueHandler's handle_notice()
|
* method, which passes control on to the QueueHandler's handle()
|
||||||
* method for each notice that comes in on the queue.
|
* method for each item that comes in on the queue.
|
||||||
*
|
|
||||||
* Most of the time this won't need to be overridden in a subclass.
|
|
||||||
*
|
*
|
||||||
* @return boolean true on success, false on failure
|
* @return boolean true on success, false on failure
|
||||||
*/
|
*/
|
||||||
function runLoop()
|
function runThread()
|
||||||
{
|
{
|
||||||
$this->log(LOG_INFO, 'checking for queued notices');
|
$this->log(LOG_INFO, 'checking for queued notices');
|
||||||
|
|
||||||
$master = new IoMaster($this->get_id());
|
$master = new QueueMaster($this->get_id());
|
||||||
$master->init($this->all);
|
$master->init($this->all);
|
||||||
$master->service();
|
$master->service();
|
||||||
|
|
||||||
@ -229,10 +117,25 @@ class QueueDaemon extends Daemon
|
|||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function log($level, $msg)
|
class QueueMaster extends IoMaster
|
||||||
{
|
{
|
||||||
common_log($level, get_class($this) . ' ('. $this->get_id() .'): '.$msg);
|
/**
|
||||||
|
* Initialize IoManagers for the currently configured site
|
||||||
|
* which are appropriate to this instance.
|
||||||
|
*/
|
||||||
|
function initManagers()
|
||||||
|
{
|
||||||
|
$classes = array();
|
||||||
|
if (Event::handle('StartQueueDaemonIoManagers', array(&$classes))) {
|
||||||
|
$classes[] = 'QueueManager';
|
||||||
|
}
|
||||||
|
Event::handle('EndQueueDaemonIoManagers', array(&$classes));
|
||||||
|
|
||||||
|
foreach ($classes as $class) {
|
||||||
|
$this->instantiate($class);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,348 +33,46 @@ END_OF_XMPP_HELP;
|
|||||||
|
|
||||||
require_once INSTALLDIR.'/scripts/commandline.inc';
|
require_once INSTALLDIR.'/scripts/commandline.inc';
|
||||||
|
|
||||||
require_once INSTALLDIR . '/lib/common.php';
|
|
||||||
require_once INSTALLDIR . '/lib/jabber.php';
|
require_once INSTALLDIR . '/lib/jabber.php';
|
||||||
require_once INSTALLDIR . '/lib/daemon.php';
|
|
||||||
|
|
||||||
# This is kind of clunky; we create a class to call the global functions
|
class XMPPDaemon extends SpawningDaemon
|
||||||
# in jabber.php, which create a new XMPP class. A more elegant (?) solution
|
|
||||||
# might be to use make this a subclass of XMPP.
|
|
||||||
|
|
||||||
class XMPPDaemon extends Daemon
|
|
||||||
{
|
{
|
||||||
function __construct($resource=null, $daemonize=true)
|
function __construct($id=null, $daemonize=true, $threads=1)
|
||||||
{
|
{
|
||||||
parent::__construct($daemonize);
|
if ($threads != 1) {
|
||||||
|
// This should never happen. :)
|
||||||
|
throw new Exception("XMPPDaemon can must run single-threaded");
|
||||||
|
}
|
||||||
|
parent::__construct($id, $daemonize, $threads);
|
||||||
|
}
|
||||||
|
|
||||||
static $attrs = array('server', 'port', 'user', 'password', 'host');
|
function runThread()
|
||||||
|
|
||||||
foreach ($attrs as $attr)
|
|
||||||
{
|
{
|
||||||
$this->$attr = common_config('xmpp', $attr);
|
common_log(LOG_INFO, 'Waiting to listen to XMPP and queues');
|
||||||
}
|
|
||||||
|
|
||||||
if ($resource) {
|
$master = new XmppMaster($this->get_id());
|
||||||
$this->resource = $resource . 'daemon';
|
$master->init();
|
||||||
} else {
|
$master->service();
|
||||||
$this->resource = common_config('xmpp', 'resource') . 'daemon';
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->jid = $this->user.'@'.$this->server.'/'.$this->resource;
|
common_log(LOG_INFO, 'terminating normally');
|
||||||
|
|
||||||
$this->log(LOG_INFO, "INITIALIZE XMPPDaemon {$this->jid}");
|
|
||||||
}
|
|
||||||
|
|
||||||
function connect()
|
|
||||||
{
|
|
||||||
$connect_to = ($this->host) ? $this->host : $this->server;
|
|
||||||
|
|
||||||
$this->log(LOG_INFO, "Connecting to $connect_to on port $this->port");
|
|
||||||
|
|
||||||
$this->conn = jabber_connect($this->resource);
|
|
||||||
|
|
||||||
if (!$this->conn) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->log(LOG_INFO, "Connected");
|
|
||||||
|
|
||||||
$this->conn->setReconnectTimeout(600);
|
|
||||||
|
|
||||||
$this->log(LOG_INFO, "Sending initial presence.");
|
|
||||||
|
|
||||||
jabber_send_presence("Send me a message to post a notice", 'available',
|
|
||||||
null, 'available', 100);
|
|
||||||
|
|
||||||
$this->log(LOG_INFO, "Done connecting.");
|
|
||||||
|
|
||||||
return !$this->conn->isDisconnected();
|
|
||||||
}
|
|
||||||
|
|
||||||
function name()
|
|
||||||
{
|
|
||||||
return strtolower('xmppdaemon.'.$this->resource);
|
|
||||||
}
|
|
||||||
|
|
||||||
function run()
|
|
||||||
{
|
|
||||||
if ($this->connect()) {
|
|
||||||
|
|
||||||
$this->log(LOG_DEBUG, "Initializing stanza handlers.");
|
|
||||||
|
|
||||||
$this->conn->addEventHandler('message', 'handle_message', $this);
|
|
||||||
$this->conn->addEventHandler('presence', 'handle_presence', $this);
|
|
||||||
$this->conn->addEventHandler('reconnect', 'handle_reconnect', $this);
|
|
||||||
|
|
||||||
$this->log(LOG_DEBUG, "Beginning processing loop.");
|
|
||||||
|
|
||||||
while ($this->conn->processTime(60)) {
|
|
||||||
$this->sendPing();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function sendPing()
|
|
||||||
{
|
|
||||||
if (!isset($this->pingid)) {
|
|
||||||
$this->pingid = 0;
|
|
||||||
} else {
|
|
||||||
$this->pingid++;
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->log(LOG_DEBUG, "Sending ping #{$this->pingid}");
|
|
||||||
|
|
||||||
$this->conn->send("<iq from='{$this->jid}' to='{$this->server}' id='ping_{$this->pingid}' type='get'><ping xmlns='urn:xmpp:ping'/></iq>");
|
|
||||||
}
|
|
||||||
|
|
||||||
function handle_reconnect(&$pl)
|
|
||||||
{
|
|
||||||
$this->log(LOG_DEBUG, "Got reconnection callback.");
|
|
||||||
$this->conn->processUntil('session_start');
|
|
||||||
$this->log(LOG_DEBUG, "Sending reconnection presence.");
|
|
||||||
$this->conn->presence('Send me a message to post a notice', 'available', null, 'available', 100);
|
|
||||||
unset($pl['xml']);
|
|
||||||
$pl['xml'] = null;
|
|
||||||
|
|
||||||
$pl = null;
|
|
||||||
unset($pl);
|
|
||||||
}
|
|
||||||
|
|
||||||
function get_user($from)
|
|
||||||
{
|
|
||||||
$user = User::staticGet('jabber', jabber_normalize_jid($from));
|
|
||||||
return $user;
|
|
||||||
}
|
|
||||||
|
|
||||||
function handle_message(&$pl)
|
|
||||||
{
|
|
||||||
$this->log(LOG_DEBUG, "Received message: " . str_replace("\n", " ", var_export($pl, true)));
|
|
||||||
$from = jabber_normalize_jid($pl['from']);
|
|
||||||
|
|
||||||
if ($pl['type'] != 'chat') {
|
|
||||||
$this->log(LOG_WARNING, "Ignoring message of type ".$pl['type']." from $from.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mb_strlen($pl['body']) == 0) {
|
|
||||||
$this->log(LOG_WARNING, "Ignoring message with empty body from $from.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
# Forwarded from another daemon (probably a broadcaster) for
|
|
||||||
# us to handle
|
|
||||||
|
|
||||||
if ($this->is_self($from)) {
|
|
||||||
$this->log(LOG_INFO, "Got forwarded notice from self ($from).");
|
|
||||||
$from = $this->get_ofrom($pl);
|
|
||||||
$this->log(LOG_INFO, "Originally sent by $from.");
|
|
||||||
if (is_null($from) || $this->is_self($from)) {
|
|
||||||
$this->log(LOG_INFO, "Ignoring notice originally sent by $from.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
$user = $this->get_user($from);
|
|
||||||
|
|
||||||
// For common_current_user to work
|
|
||||||
global $_cur;
|
|
||||||
$_cur = $user;
|
|
||||||
|
|
||||||
if (!$user) {
|
|
||||||
$this->from_site($from, 'Unknown user; go to ' .
|
|
||||||
common_local_url('imsettings') .
|
|
||||||
' to add your address to your account');
|
|
||||||
$this->log(LOG_WARNING, 'Message from unknown user ' . $from);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if ($this->handle_command($user, $pl['body'])) {
|
|
||||||
$this->log(LOG_INFO, "Command message by $from handled.");
|
|
||||||
return;
|
|
||||||
} else if ($this->is_autoreply($pl['body'])) {
|
|
||||||
$this->log(LOG_INFO, 'Ignoring auto reply from ' . $from);
|
|
||||||
return;
|
|
||||||
} else if ($this->is_otr($pl['body'])) {
|
|
||||||
$this->log(LOG_INFO, 'Ignoring OTR from ' . $from);
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
|
|
||||||
$this->log(LOG_INFO, 'Posting a notice from ' . $user->nickname);
|
|
||||||
|
|
||||||
$this->add_notice($user, $pl);
|
|
||||||
}
|
|
||||||
|
|
||||||
$user->free();
|
|
||||||
unset($user);
|
|
||||||
unset($_cur);
|
|
||||||
|
|
||||||
unset($pl['xml']);
|
|
||||||
$pl['xml'] = null;
|
|
||||||
|
|
||||||
$pl = null;
|
|
||||||
unset($pl);
|
|
||||||
}
|
|
||||||
|
|
||||||
function is_self($from)
|
|
||||||
{
|
|
||||||
return preg_match('/^'.strtolower(jabber_daemon_address()).'/', strtolower($from));
|
|
||||||
}
|
|
||||||
|
|
||||||
function get_ofrom($pl)
|
|
||||||
{
|
|
||||||
$xml = $pl['xml'];
|
|
||||||
$addresses = $xml->sub('addresses');
|
|
||||||
if (!$addresses) {
|
|
||||||
$this->log(LOG_WARNING, 'Forwarded message without addresses');
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
$address = $addresses->sub('address');
|
|
||||||
if (!$address) {
|
|
||||||
$this->log(LOG_WARNING, 'Forwarded message without address');
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
if (!array_key_exists('type', $address->attrs)) {
|
|
||||||
$this->log(LOG_WARNING, 'No type for forwarded message');
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
$type = $address->attrs['type'];
|
|
||||||
if ($type != 'ofrom') {
|
|
||||||
$this->log(LOG_WARNING, 'Type of forwarded message is not ofrom');
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
if (!array_key_exists('jid', $address->attrs)) {
|
|
||||||
$this->log(LOG_WARNING, 'No jid for forwarded message');
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
$jid = $address->attrs['jid'];
|
|
||||||
if (!$jid) {
|
|
||||||
$this->log(LOG_WARNING, 'Could not get jid from address');
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
$this->log(LOG_DEBUG, 'Got message forwarded from jid ' . $jid);
|
|
||||||
return $jid;
|
|
||||||
}
|
|
||||||
|
|
||||||
function is_autoreply($txt)
|
|
||||||
{
|
|
||||||
if (preg_match('/[\[\(]?[Aa]uto[-\s]?[Rr]e(ply|sponse)[\]\)]/', $txt)) {
|
|
||||||
return true;
|
return true;
|
||||||
} else if (preg_match('/^System: Message wasn\'t delivered. Offline storage size was exceeded.$/', $txt)) {
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function is_otr($txt)
|
}
|
||||||
|
|
||||||
|
class XmppMaster extends IoMaster
|
||||||
{
|
{
|
||||||
if (preg_match('/^\?OTR/', $txt)) {
|
/**
|
||||||
return true;
|
* Initialize IoManagers for the currently configured site
|
||||||
} else {
|
* which are appropriate to this instance.
|
||||||
return false;
|
*/
|
||||||
}
|
function initManagers()
|
||||||
}
|
|
||||||
|
|
||||||
function from_site($address, $msg)
|
|
||||||
{
|
{
|
||||||
$text = '['.common_config('site', 'name') . '] ' . $msg;
|
// @fixme right now there's a hack in QueueManager to determine
|
||||||
jabber_send_message($address, $text);
|
// which queues to subscribe to based on the master class.
|
||||||
}
|
$this->instantiate('QueueManager');
|
||||||
|
$this->instantiate('XmppManager');
|
||||||
function handle_command($user, $body)
|
|
||||||
{
|
|
||||||
$inter = new CommandInterpreter();
|
|
||||||
$cmd = $inter->handle_command($user, $body);
|
|
||||||
if ($cmd) {
|
|
||||||
$chan = new XMPPChannel($this->conn);
|
|
||||||
$cmd->execute($chan);
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function add_notice(&$user, &$pl)
|
|
||||||
{
|
|
||||||
$body = trim($pl['body']);
|
|
||||||
$content_shortened = common_shorten_links($body);
|
|
||||||
if (Notice::contentTooLong($content_shortened)) {
|
|
||||||
$from = jabber_normalize_jid($pl['from']);
|
|
||||||
$this->from_site($from, sprintf(_('Message too long - maximum is %1$d characters, you sent %2$d.'),
|
|
||||||
Notice::maxContent(),
|
|
||||||
mb_strlen($content_shortened)));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
$notice = Notice::saveNew($user->id, $content_shortened, 'xmpp');
|
|
||||||
} catch (Exception $e) {
|
|
||||||
$this->log(LOG_ERR, $e->getMessage());
|
|
||||||
$this->from_site($user->jabber, $e->getMessage());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
common_broadcast_notice($notice);
|
|
||||||
$this->log(LOG_INFO,
|
|
||||||
'Added notice ' . $notice->id . ' from user ' . $user->nickname);
|
|
||||||
$notice->free();
|
|
||||||
unset($notice);
|
|
||||||
}
|
|
||||||
|
|
||||||
function handle_presence(&$pl)
|
|
||||||
{
|
|
||||||
$from = jabber_normalize_jid($pl['from']);
|
|
||||||
switch ($pl['type']) {
|
|
||||||
case 'subscribe':
|
|
||||||
# We let anyone subscribe
|
|
||||||
$this->subscribed($from);
|
|
||||||
$this->log(LOG_INFO,
|
|
||||||
'Accepted subscription from ' . $from);
|
|
||||||
break;
|
|
||||||
case 'subscribed':
|
|
||||||
case 'unsubscribed':
|
|
||||||
case 'unsubscribe':
|
|
||||||
$this->log(LOG_INFO,
|
|
||||||
'Ignoring "' . $pl['type'] . '" from ' . $from);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
if (!$pl['type']) {
|
|
||||||
$user = User::staticGet('jabber', $from);
|
|
||||||
if (!$user) {
|
|
||||||
$this->log(LOG_WARNING, 'Presence from unknown user ' . $from);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if ($user->updatefrompresence) {
|
|
||||||
$this->log(LOG_INFO, 'Updating ' . $user->nickname .
|
|
||||||
' status from presence.');
|
|
||||||
$this->add_notice($user, $pl);
|
|
||||||
}
|
|
||||||
$user->free();
|
|
||||||
unset($user);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
unset($pl['xml']);
|
|
||||||
$pl['xml'] = null;
|
|
||||||
|
|
||||||
$pl = null;
|
|
||||||
unset($pl);
|
|
||||||
}
|
|
||||||
|
|
||||||
function log($level, $msg)
|
|
||||||
{
|
|
||||||
$text = 'XMPPDaemon('.$this->resource.'): '.$msg;
|
|
||||||
common_log($level, $text);
|
|
||||||
if (!$this->daemonize)
|
|
||||||
{
|
|
||||||
$line = common_log_line($level, $text);
|
|
||||||
echo $line;
|
|
||||||
echo "\n";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function subscribed($to)
|
|
||||||
{
|
|
||||||
jabber_special_presence('subscribed', $to);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user