Only enqueue inbox-dependent transports after inboxes have been filled

This commit is contained in:
Evan Prodromou 2009-05-28 18:19:22 -04:00
parent 047038959c
commit 3532cd0490
3 changed files with 46 additions and 20 deletions

View File

@ -195,12 +195,12 @@ class Notice extends Memcached_DataObject
$notice->saveReplies(); $notice->saveReplies();
$notice->saveTags(); $notice->saveTags();
$notice->saveGroups();
if (common_config('queue', 'enabled')) { if (common_config('queue', 'enabled')) {
$notice->addToAuthorInbox(); $notice->addToAuthorInbox();
} else { } else {
$notice->addToInboxes(); $notice->addToInboxes();
$notice->saveGroups();
} }
$notice->query('COMMIT'); $notice->query('COMMIT');

View File

@ -879,7 +879,27 @@ function common_broadcast_notice($notice, $remote=false)
function common_enqueue_notice($notice) function common_enqueue_notice($notice)
{ {
$transports = array('omb', 'sms', 'twitter', 'facebook', 'ping'); $transports = array('twitter', 'facebook', 'ping');
// If inboxes are enabled, wait till inboxes are filled
// before doing inbox-dependent broadcasts
if (common_config('inboxes', 'enabled') === true ||
common_config('inboxes', 'enabled') === 'transitional') {
$transports[] = 'inbox';
} else {
$transports = array_merge($transports, common_post_inbox_transports());
}
foreach ($transports as $transport) {
common_enqueue_notice_transport($notice, $transport);
}
return $result;
}
function common_post_inbox_transports()
{
$transports = array('omb', 'sms');
if (common_config('xmpp', 'enabled')) { if (common_config('xmpp', 'enabled')) {
$transports = array_merge($transports, array('jabber', 'public')); $transports = array_merge($transports, array('jabber', 'public'));
@ -890,25 +910,23 @@ function common_enqueue_notice($notice)
$transports[] = 'memcache'; $transports[] = 'memcache';
} }
if (common_config('inboxes', 'enabled') === true || return $transports;
common_config('inboxes', 'enabled') === 'transitional') { }
$transports[] = 'inbox';
}
foreach ($transports as $transport) { function common_enqueue_notice_transport($notice, $transport)
$qi = new Queue_item(); {
$qi->notice_id = $notice->id; $qi = new Queue_item();
$qi->transport = $transport; $qi->notice_id = $notice->id;
$qi->created = $notice->created; $qi->transport = $transport;
$result = $qi->insert(); $qi->created = $notice->created;
if (!$result) { $result = $qi->insert();
$last_error = &PEAR::getStaticProperty('DB_DataObject','lastError'); if (!$result) {
common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message); $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError');
return false; common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message);
} throw new ServerException('DB error inserting queue item: ' . $last_error->message);
common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport);
} }
return $result; common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport);
return true;
} }
function common_real_broadcast($notice, $remote=false) function common_real_broadcast($notice, $remote=false)

View File

@ -41,7 +41,7 @@ class InboxQueueHandler extends QueueHandler
} }
function start() { function start() {
$this->log(LOG_INFO, "INITIALIZE"); $this->log(LOG_INFO, "Initialize inbox queue handler");
return true; return true;
} }
@ -49,11 +49,19 @@ class InboxQueueHandler extends QueueHandler
{ {
$this->log(LOG_INFO, "Distributing notice to inboxes for $notice->id"); $this->log(LOG_INFO, "Distributing notice to inboxes for $notice->id");
$notice->addToInboxes(); $notice->addToInboxes();
$notice->saveGroups();
$notice->blowSubsCache(); $notice->blowSubsCache();
$transports = common_post_inbox_transports();
foreach ($transports as $transport) {
common_enqueue_notice_transport($notice, $transport);
}
return true; return true;
} }
function finish() { function finish() {
$this->log(LOG_INFO, "Terminating inbox queue handler");
} }
} }