Offload inbox updates to a queue handler to speed up posting online
Moved much of the writing that happens when posting a notice to a new queuehandler, distribqueuehandler. This updates tags, groups, replies and inboxes at queue time (or at Web time, if queues are disabled). To make this work well, I had to break up the monolithic Notice::blowCaches() and make cache blowing happen closer to where data is updated. Squashed commit of the following: commit 5257626c62750ac4ac1db0ce2b71410c5711cfa3 Author: Evan Prodromou <evan@status.net> Date: Mon Jan 25 14:56:41 2010 -0500 slightly better handling of blowing tag memory cache commit 8a22a3cdf6ec28685da129a0313e7b2a0837c9ef Author: Evan Prodromou <evan@status.net> Date: Mon Jan 25 01:42:56 2010 -0500 change 'distribute' to 'distrib' so not too long for dbqueue commit 7a063315b0f7fad27cb6fbd2bdd74e253af83e4f Author: Evan Prodromou <evan@status.net> Date: Mon Jan 25 01:39:15 2010 -0500 change handle_notice() to handle() in distributqueuehandler commit 1a39ccd28b9994137d7bfd21bb4f230546938e77 Author: Evan Prodromou <evan@status.net> Date: Mon Jan 25 16:05:25 2010 -0500 error with queuemanager commit e6b3bb93f305cfd2de71a6340b8aa6fb890049b7 Author: Evan Prodromou <evan@status.net> Date: Mon Jan 25 01:11:34 2010 -0500 Blow memcache at different point rather than one big function for Notice class commit 94d557cdc016187d1d0647ae1794cd94d6fb8ac8 Author: Evan Prodromou <evan@status.net> Date: Mon Jan 25 00:48:44 2010 -0500 Blow memcache at different point rather than one big function for Notice class commit 1c781dd08c88a35dafc5c01230b4872fd6b95182 Author: Evan Prodromou <evan@status.net> Date: Wed Jan 20 08:54:18 2010 -0500 move broadcasting and distributing to new queuehandler commit da3e46d26b84e4f028f34a13fd2ee373e4c1b954 Author: Evan Prodromou <evan@status.net> Date: Wed Jan 20 08:53:12 2010 -0500 Move distribution of notices to new distribute queue handler
This commit is contained in:
parent
f3beed6889
commit
e26a843caf
@ -112,7 +112,7 @@ class ApiStatusesRetweetAction extends ApiAuthAction
|
|||||||
|
|
||||||
$repeat = $this->original->repeat($this->user->id, $this->source);
|
$repeat = $this->original->repeat($this->user->id, $this->source);
|
||||||
|
|
||||||
common_broadcast_notice($repeat);
|
|
||||||
|
|
||||||
$this->showNotice($repeat);
|
$this->showNotice($repeat);
|
||||||
}
|
}
|
||||||
|
@ -250,7 +250,7 @@ class ApiStatusesUpdateAction extends ApiAuthAction
|
|||||||
$upload->attachToNotice($this->notice);
|
$upload->attachToNotice($this->notice);
|
||||||
}
|
}
|
||||||
|
|
||||||
common_broadcast_notice($this->notice);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->showNotice();
|
$this->showNotice();
|
||||||
|
@ -201,7 +201,7 @@ class NewnoticeAction extends Action
|
|||||||
$upload->attachToNotice($notice);
|
$upload->attachToNotice($notice);
|
||||||
}
|
}
|
||||||
|
|
||||||
common_broadcast_notice($notice);
|
|
||||||
|
|
||||||
if ($this->boolean('ajax')) {
|
if ($this->boolean('ajax')) {
|
||||||
header('Content-Type: text/xml;charset=utf-8');
|
header('Content-Type: text/xml;charset=utf-8');
|
||||||
|
@ -106,7 +106,7 @@ class RepeatAction extends Action
|
|||||||
{
|
{
|
||||||
$repeat = $this->notice->repeat($this->user->id, 'web');
|
$repeat = $this->notice->repeat($this->user->id, 'web');
|
||||||
|
|
||||||
common_broadcast_notice($repeat);
|
|
||||||
|
|
||||||
if ($this->boolean('ajax')) {
|
if ($this->boolean('ajax')) {
|
||||||
$this->startHTML('text/xml;charset=utf-8');
|
$this->startHTML('text/xml;charset=utf-8');
|
||||||
|
@ -120,11 +120,7 @@ class Inbox extends Memcached_DataObject
|
|||||||
$notice_id, $user_id));
|
$notice_id, $user_id));
|
||||||
|
|
||||||
if ($result) {
|
if ($result) {
|
||||||
$c = self::memcache();
|
self::blow('inbox:user_id:%d', $user_id);
|
||||||
|
|
||||||
if (!empty($c)) {
|
|
||||||
$c->delete(self::cacheKey('inbox', 'user_id', $user_id));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return $result;
|
return $result;
|
||||||
|
@ -425,4 +425,23 @@ class Memcached_DataObject extends DB_DataObject
|
|||||||
|
|
||||||
return $dsn;
|
return $dsn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static function blow()
|
||||||
|
{
|
||||||
|
$c = self::memcache();
|
||||||
|
|
||||||
|
if (empty($c)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
$args = func_get_args();
|
||||||
|
|
||||||
|
$format = array_shift($args);
|
||||||
|
|
||||||
|
$keyPart = vsprintf($format, $args);
|
||||||
|
|
||||||
|
$cacheKey = common_cache_key($keyPart);
|
||||||
|
|
||||||
|
return $c->delete($cacheKey);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -94,10 +94,6 @@ class Notice extends Memcached_DataObject
|
|||||||
|
|
||||||
function delete()
|
function delete()
|
||||||
{
|
{
|
||||||
$this->blowCaches(true);
|
|
||||||
$this->blowFavesCache(true);
|
|
||||||
$this->blowSubsCache(true);
|
|
||||||
|
|
||||||
// For auditing purposes, save a record that the notice
|
// For auditing purposes, save a record that the notice
|
||||||
// was deleted.
|
// was deleted.
|
||||||
|
|
||||||
@ -109,31 +105,20 @@ class Notice extends Memcached_DataObject
|
|||||||
$deleted->created = $this->created;
|
$deleted->created = $this->created;
|
||||||
$deleted->deleted = common_sql_now();
|
$deleted->deleted = common_sql_now();
|
||||||
|
|
||||||
$this->query('BEGIN');
|
|
||||||
|
|
||||||
$deleted->insert();
|
$deleted->insert();
|
||||||
|
|
||||||
//Null any notices that are replies to this notice
|
// Clear related records
|
||||||
$this->query(sprintf("UPDATE notice set reply_to = null WHERE reply_to = %d", $this->id));
|
|
||||||
|
|
||||||
//Null any notices that are repeats of this notice
|
$this->clearReplies();
|
||||||
//XXX: probably need to uncache these, too
|
$this->clearRepeats();
|
||||||
|
$this->clearFaves();
|
||||||
|
$this->clearTags();
|
||||||
|
$this->clearGroupInboxes();
|
||||||
|
|
||||||
$this->query(sprintf("UPDATE notice set repeat_of = null WHERE repeat_of = %d", $this->id));
|
// NOTE: we don't clear inboxes
|
||||||
|
// NOTE: we don't clear queue items
|
||||||
|
|
||||||
$related = array('Reply',
|
|
||||||
'Fave',
|
|
||||||
'Notice_tag',
|
|
||||||
'Group_inbox',
|
|
||||||
'Queue_item');
|
|
||||||
|
|
||||||
foreach ($related as $cls) {
|
|
||||||
$inst = new $cls();
|
|
||||||
$inst->notice_id = $this->id;
|
|
||||||
$inst->delete();
|
|
||||||
}
|
|
||||||
$result = parent::delete();
|
$result = parent::delete();
|
||||||
$this->query('COMMIT');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function saveTags()
|
function saveTags()
|
||||||
@ -155,6 +140,7 @@ class Notice extends Memcached_DataObject
|
|||||||
foreach(array_unique($hashtags) as $hashtag) {
|
foreach(array_unique($hashtags) as $hashtag) {
|
||||||
/* elide characters we don't want in the tag */
|
/* elide characters we don't want in the tag */
|
||||||
$this->saveTag($hashtag);
|
$this->saveTag($hashtag);
|
||||||
|
self::blow('profile:notice_ids_tagged:%d:%s', $this->profile_id, $tag->tag);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -172,6 +158,9 @@ class Notice extends Memcached_DataObject
|
|||||||
$last_error->message));
|
$last_error->message));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if it's saved, blow its cache
|
||||||
|
$tag->blowCache(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -331,29 +320,45 @@ class Notice extends Memcached_DataObject
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// XXX: do we need to change this for remote users?
|
|
||||||
|
|
||||||
$notice->saveTags();
|
|
||||||
|
|
||||||
$groups = $notice->saveGroups();
|
|
||||||
|
|
||||||
$recipients = $notice->saveReplies();
|
|
||||||
|
|
||||||
$notice->addToInboxes($groups, $recipients);
|
|
||||||
|
|
||||||
$notice->saveUrls();
|
|
||||||
|
|
||||||
Event::handle('EndNoticeSave', array($notice));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# Clear the cache for subscribed users, so they'll update at next request
|
# Clear the cache for subscribed users, so they'll update at next request
|
||||||
# XXX: someone clever could prepend instead of clearing the cache
|
# XXX: someone clever could prepend instead of clearing the cache
|
||||||
|
$notice->blowOnInsert();
|
||||||
|
|
||||||
$notice->blowCaches();
|
$qm = QueueManager::get();
|
||||||
|
|
||||||
|
$qm->enqueue($notice, 'distrib');
|
||||||
|
|
||||||
return $notice;
|
return $notice;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function blowOnInsert()
|
||||||
|
{
|
||||||
|
self::blow('profile:notice_ids:%d', $this->profile_id);
|
||||||
|
self::blow('public');
|
||||||
|
|
||||||
|
if ($this->conversation != $this->id) {
|
||||||
|
self::blow('notice:conversation_ids:%d', $this->conversation);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!empty($this->repeat_of)) {
|
||||||
|
self::blow('notice:repeats:%d', $this->repeat_of);
|
||||||
|
}
|
||||||
|
|
||||||
|
$original = Notice::staticGet('id', $this->repeat_of);
|
||||||
|
|
||||||
|
if (!empty($original)) {
|
||||||
|
$originalUser = User::staticGet('id', $original->profile_id);
|
||||||
|
if (!empty($originalUser)) {
|
||||||
|
self::blow('user:repeats_of_me:%d', $originalUser->id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$profile = Profile::staticGet($this->profile_id);
|
||||||
|
$profile->blowNoticeCount();
|
||||||
|
}
|
||||||
|
|
||||||
/** save all urls in the notice to the db
|
/** save all urls in the notice to the db
|
||||||
*
|
*
|
||||||
* follow redirects and save all available file information
|
* follow redirects and save all available file information
|
||||||
@ -456,227 +461,6 @@ class Notice extends Memcached_DataObject
|
|||||||
return $att;
|
return $att;
|
||||||
}
|
}
|
||||||
|
|
||||||
function blowCaches($blowLast=false)
|
|
||||||
{
|
|
||||||
$this->blowSubsCache($blowLast);
|
|
||||||
$this->blowNoticeCache($blowLast);
|
|
||||||
$this->blowRepliesCache($blowLast);
|
|
||||||
$this->blowPublicCache($blowLast);
|
|
||||||
$this->blowTagCache($blowLast);
|
|
||||||
$this->blowGroupCache($blowLast);
|
|
||||||
$this->blowConversationCache($blowLast);
|
|
||||||
$this->blowRepeatCache();
|
|
||||||
$profile = Profile::staticGet($this->profile_id);
|
|
||||||
$profile->blowNoticeCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
function blowRepeatCache()
|
|
||||||
{
|
|
||||||
if (!empty($this->repeat_of)) {
|
|
||||||
$cache = common_memcache();
|
|
||||||
if (!empty($cache)) {
|
|
||||||
// XXX: only blow if <100 in cache
|
|
||||||
$ck = common_cache_key('notice:repeats:'.$this->repeat_of);
|
|
||||||
$result = $cache->delete($ck);
|
|
||||||
|
|
||||||
$user = User::staticGet('id', $this->profile_id);
|
|
||||||
|
|
||||||
if (!empty($user)) {
|
|
||||||
$uk = common_cache_key('user:repeated_by_me:'.$user->id);
|
|
||||||
$cache->delete($uk);
|
|
||||||
$user->free();
|
|
||||||
unset($user);
|
|
||||||
}
|
|
||||||
|
|
||||||
$original = Notice::staticGet('id', $this->repeat_of);
|
|
||||||
|
|
||||||
if (!empty($original)) {
|
|
||||||
$originalUser = User::staticGet('id', $original->profile_id);
|
|
||||||
if (!empty($originalUser)) {
|
|
||||||
$ouk = common_cache_key('user:repeats_of_me:'.$originalUser->id);
|
|
||||||
$cache->delete($ouk);
|
|
||||||
$originalUser->free();
|
|
||||||
unset($originalUser);
|
|
||||||
}
|
|
||||||
$original->free();
|
|
||||||
unset($original);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function blowConversationCache($blowLast=false)
|
|
||||||
{
|
|
||||||
$cache = common_memcache();
|
|
||||||
if ($cache) {
|
|
||||||
$ck = common_cache_key('notice:conversation_ids:'.$this->conversation);
|
|
||||||
$cache->delete($ck);
|
|
||||||
if ($blowLast) {
|
|
||||||
$cache->delete($ck.';last');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function blowGroupCache($blowLast=false)
|
|
||||||
{
|
|
||||||
$cache = common_memcache();
|
|
||||||
if ($cache) {
|
|
||||||
$group_inbox = new Group_inbox();
|
|
||||||
$group_inbox->notice_id = $this->id;
|
|
||||||
if ($group_inbox->find()) {
|
|
||||||
while ($group_inbox->fetch()) {
|
|
||||||
$cache->delete(common_cache_key('user_group:notice_ids:' . $group_inbox->group_id));
|
|
||||||
if ($blowLast) {
|
|
||||||
$cache->delete(common_cache_key('user_group:notice_ids:' . $group_inbox->group_id.';last'));
|
|
||||||
}
|
|
||||||
$member = new Group_member();
|
|
||||||
$member->group_id = $group_inbox->group_id;
|
|
||||||
if ($member->find()) {
|
|
||||||
while ($member->fetch()) {
|
|
||||||
$cache->delete(common_cache_key('notice_inbox:by_user:' . $member->profile_id));
|
|
||||||
$cache->delete(common_cache_key('notice_inbox:by_user_own:' . $member->profile_id));
|
|
||||||
if (empty($this->repeat_of)) {
|
|
||||||
$cache->delete(common_cache_key('user:friends_timeline:' . $member->profile_id));
|
|
||||||
$cache->delete(common_cache_key('user:friends_timeline_own:' . $member->profile_id));
|
|
||||||
}
|
|
||||||
if ($blowLast) {
|
|
||||||
$cache->delete(common_cache_key('notice_inbox:by_user:' . $member->profile_id . ';last'));
|
|
||||||
$cache->delete(common_cache_key('notice_inbox:by_user_own:' . $member->profile_id . ';last'));
|
|
||||||
if (empty($this->repeat_of)) {
|
|
||||||
$cache->delete(common_cache_key('user:friends_timeline:' . $member->profile_id . ';last'));
|
|
||||||
$cache->delete(common_cache_key('user:friends_timeline_own:' . $member->profile_id . ';last'));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$group_inbox->free();
|
|
||||||
unset($group_inbox);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function blowTagCache($blowLast=false)
|
|
||||||
{
|
|
||||||
$cache = common_memcache();
|
|
||||||
if ($cache) {
|
|
||||||
$tag = new Notice_tag();
|
|
||||||
$tag->notice_id = $this->id;
|
|
||||||
if ($tag->find()) {
|
|
||||||
while ($tag->fetch()) {
|
|
||||||
$tag->blowCache($blowLast);
|
|
||||||
$ck = 'profile:notice_ids_tagged:' . $this->profile_id . ':' . $tag->tag;
|
|
||||||
|
|
||||||
$cache->delete($ck);
|
|
||||||
if ($blowLast) {
|
|
||||||
$cache->delete($ck . ';last');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$tag->free();
|
|
||||||
unset($tag);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function blowSubsCache($blowLast=false)
|
|
||||||
{
|
|
||||||
$cache = common_memcache();
|
|
||||||
if ($cache) {
|
|
||||||
$user = new User();
|
|
||||||
|
|
||||||
$UT = common_config('db','type')=='pgsql'?'"user"':'user';
|
|
||||||
$user->query('SELECT id ' .
|
|
||||||
|
|
||||||
"FROM $UT JOIN subscription ON $UT.id = subscription.subscriber " .
|
|
||||||
'WHERE subscription.subscribed = ' . $this->profile_id);
|
|
||||||
|
|
||||||
while ($user->fetch()) {
|
|
||||||
$cache->delete(common_cache_key('notice_inbox:by_user:'.$user->id));
|
|
||||||
$cache->delete(common_cache_key('notice_inbox:by_user_own:'.$user->id));
|
|
||||||
if (empty($this->repeat_of)) {
|
|
||||||
$cache->delete(common_cache_key('user:friends_timeline:'.$user->id));
|
|
||||||
$cache->delete(common_cache_key('user:friends_timeline_own:'.$user->id));
|
|
||||||
}
|
|
||||||
if ($blowLast) {
|
|
||||||
$cache->delete(common_cache_key('notice_inbox:by_user:'.$user->id.';last'));
|
|
||||||
$cache->delete(common_cache_key('notice_inbox:by_user_own:'.$user->id.';last'));
|
|
||||||
if (empty($this->repeat_of)) {
|
|
||||||
$cache->delete(common_cache_key('user:friends_timeline:'.$user->id.';last'));
|
|
||||||
$cache->delete(common_cache_key('user:friends_timeline_own:'.$user->id.';last'));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$user->free();
|
|
||||||
unset($user);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function blowNoticeCache($blowLast=false)
|
|
||||||
{
|
|
||||||
if ($this->is_local) {
|
|
||||||
$cache = common_memcache();
|
|
||||||
if (!empty($cache)) {
|
|
||||||
$cache->delete(common_cache_key('profile:notice_ids:'.$this->profile_id));
|
|
||||||
if ($blowLast) {
|
|
||||||
$cache->delete(common_cache_key('profile:notice_ids:'.$this->profile_id.';last'));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function blowRepliesCache($blowLast=false)
|
|
||||||
{
|
|
||||||
$cache = common_memcache();
|
|
||||||
if ($cache) {
|
|
||||||
$reply = new Reply();
|
|
||||||
$reply->notice_id = $this->id;
|
|
||||||
if ($reply->find()) {
|
|
||||||
while ($reply->fetch()) {
|
|
||||||
$cache->delete(common_cache_key('reply:stream:'.$reply->profile_id));
|
|
||||||
if ($blowLast) {
|
|
||||||
$cache->delete(common_cache_key('reply:stream:'.$reply->profile_id.';last'));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$reply->free();
|
|
||||||
unset($reply);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function blowPublicCache($blowLast=false)
|
|
||||||
{
|
|
||||||
if ($this->is_local == Notice::LOCAL_PUBLIC) {
|
|
||||||
$cache = common_memcache();
|
|
||||||
if ($cache) {
|
|
||||||
$cache->delete(common_cache_key('public'));
|
|
||||||
if ($blowLast) {
|
|
||||||
$cache->delete(common_cache_key('public').';last');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function blowFavesCache($blowLast=false)
|
|
||||||
{
|
|
||||||
$cache = common_memcache();
|
|
||||||
if ($cache) {
|
|
||||||
$fave = new Fave();
|
|
||||||
$fave->notice_id = $this->id;
|
|
||||||
if ($fave->find()) {
|
|
||||||
while ($fave->fetch()) {
|
|
||||||
$cache->delete(common_cache_key('fave:ids_by_user:'.$fave->user_id));
|
|
||||||
$cache->delete(common_cache_key('fave:by_user_own:'.$fave->user_id));
|
|
||||||
if ($blowLast) {
|
|
||||||
$cache->delete(common_cache_key('fave:ids_by_user:'.$fave->user_id.';last'));
|
|
||||||
$cache->delete(common_cache_key('fave:by_user_own:'.$fave->user_id.';last'));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$fave->free();
|
|
||||||
unset($fave);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function getStreamByIds($ids)
|
function getStreamByIds($ids)
|
||||||
{
|
{
|
||||||
$cache = common_memcache();
|
$cache = common_memcache();
|
||||||
@ -999,7 +783,14 @@ class Notice extends Memcached_DataObject
|
|||||||
$gi->notice_id = $this->id;
|
$gi->notice_id = $this->id;
|
||||||
$gi->created = $this->created;
|
$gi->created = $this->created;
|
||||||
|
|
||||||
return $gi->insert();
|
$result = $gi->insert();
|
||||||
|
|
||||||
|
if (!result) {
|
||||||
|
common_log_db_error($gi, 'INSERT', __FILE__);
|
||||||
|
throw new ServerException(_('Problem saving group inbox.'));
|
||||||
|
}
|
||||||
|
|
||||||
|
self::blow('user_group:notice_ids:%d', $gi->group_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
@ -1094,7 +885,8 @@ class Notice extends Memcached_DataObject
|
|||||||
|
|
||||||
foreach ($recipientIds as $recipientId) {
|
foreach ($recipientIds as $recipientId) {
|
||||||
$user = User::staticGet('id', $recipientId);
|
$user = User::staticGet('id', $recipientId);
|
||||||
if ($user) {
|
if (!empty($user)) {
|
||||||
|
self::blow('reply:stream:%d', $reply->profile_id);
|
||||||
mail_notify_attn($user, $this);
|
mail_notify_attn($user, $this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1553,4 +1345,104 @@ class Notice extends Memcached_DataObject
|
|||||||
|
|
||||||
return $options;
|
return $options;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function clearReplies()
|
||||||
|
{
|
||||||
|
$replyNotice = new Notice();
|
||||||
|
$replyNotice->reply_to = $this->id;
|
||||||
|
|
||||||
|
//Null any notices that are replies to this notice
|
||||||
|
|
||||||
|
if ($replyNotice->find()) {
|
||||||
|
while ($replyNotice->fetch()) {
|
||||||
|
$orig = clone($replyNotice);
|
||||||
|
$replyNotice->reply_to = null;
|
||||||
|
$replyNotice->update($orig);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reply records
|
||||||
|
|
||||||
|
$reply = new Reply();
|
||||||
|
$reply->notice_id = $this->id;
|
||||||
|
|
||||||
|
if ($reply->find()) {
|
||||||
|
while($reply->fetch()) {
|
||||||
|
self::blow('reply:stream:%d', $reply->profile_id);
|
||||||
|
$reply->delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$reply->free();
|
||||||
|
|
||||||
|
return $ids;
|
||||||
|
}
|
||||||
|
|
||||||
|
function clearRepeats()
|
||||||
|
{
|
||||||
|
$repeatNotice = new Notice();
|
||||||
|
$repeatNotice->repeat_of = $this->id;
|
||||||
|
|
||||||
|
//Null any notices that are repeats of this notice
|
||||||
|
|
||||||
|
if ($repeatNotice->find()) {
|
||||||
|
while ($repeatNotice->fetch()) {
|
||||||
|
$orig = clone($repeatNotice);
|
||||||
|
$repeatNotice->repeat_of = null;
|
||||||
|
$repeatNotice->update($orig);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function clearFaves()
|
||||||
|
{
|
||||||
|
$fave = new Fave();
|
||||||
|
$fave->notice_id = $this->id;
|
||||||
|
|
||||||
|
if ($fave->find()) {
|
||||||
|
while ($fave->fetch()) {
|
||||||
|
self::blow('fave:ids_by_user_own:%d', $fave->user_id);
|
||||||
|
self::blow('fave:ids_by_user_own:%d;last', $fave->user_id);
|
||||||
|
self::blow('fave:ids_by_user:%d', $fave->user_id);
|
||||||
|
self::blow('fave:ids_by_user:%d;last', $fave->user_id);
|
||||||
|
$fave->delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$fave->free();
|
||||||
|
}
|
||||||
|
|
||||||
|
function clearTags()
|
||||||
|
{
|
||||||
|
$tag = new Notice_tag();
|
||||||
|
$tag->notice_id = $this->id;
|
||||||
|
|
||||||
|
if ($tag->find()) {
|
||||||
|
while ($tag->fetch()) {
|
||||||
|
self::blow('profile:notice_ids_tagged:%d:%s', $this->profile_id, common_keyize($tag->tag));
|
||||||
|
self::blow('profile:notice_ids_tagged:%d:%s;last', $this->profile_id, common_keyize($tag->tag));
|
||||||
|
self::blow('notice_tag:notice_ids:%s', common_keyize($tag->tag));
|
||||||
|
self::blow('notice_tag:notice_ids:%s;last', common_keyize($tag->tag));
|
||||||
|
$tag->delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$tag->free();
|
||||||
|
}
|
||||||
|
|
||||||
|
function clearGroupInboxes()
|
||||||
|
{
|
||||||
|
$gi = new Group_inbox();
|
||||||
|
|
||||||
|
$gi->notice_id = $this->id;
|
||||||
|
|
||||||
|
if ($gi->find()) {
|
||||||
|
while ($gi->fetch()) {
|
||||||
|
self::blow('user_group:notice_ids:%d', $gi->group_id);
|
||||||
|
$gi->delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$gi->free();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -86,13 +86,9 @@ class Notice_tag extends Memcached_DataObject
|
|||||||
|
|
||||||
function blowCache($blowLast=false)
|
function blowCache($blowLast=false)
|
||||||
{
|
{
|
||||||
$cache = common_memcache();
|
self::blow('notice_tag:notice_ids:%s', common_keyize($this->tag));
|
||||||
if ($cache) {
|
if ($blowLast) {
|
||||||
$idkey = common_cache_key('notice_tag:notice_ids:' . common_keyize($this->tag));
|
self::blow('notice_tag:notice_ids:%s;last', common_keyize($this->tag));
|
||||||
$cache->delete($idkey);
|
|
||||||
if ($blowLast) {
|
|
||||||
$cache->delete($idkey.';last');
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -383,7 +383,7 @@ class User extends Memcached_DataObject
|
|||||||
common_config('site', 'name'),
|
common_config('site', 'name'),
|
||||||
$user->nickname),
|
$user->nickname),
|
||||||
'system');
|
'system');
|
||||||
common_broadcast_notice($notice);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -422,7 +422,7 @@ class RepeatCommand extends Command
|
|||||||
$repeat = $notice->repeat($this->user->id, $channel->source);
|
$repeat = $notice->repeat($this->user->id, $channel->source);
|
||||||
|
|
||||||
if ($repeat) {
|
if ($repeat) {
|
||||||
common_broadcast_notice($repeat);
|
|
||||||
$channel->output($this->user, sprintf(_('Notice from %s repeated'), $recipient->nickname));
|
$channel->output($this->user, sprintf(_('Notice from %s repeated'), $recipient->nickname));
|
||||||
} else {
|
} else {
|
||||||
$channel->error($this->user, _('Error repeating notice.'));
|
$channel->error($this->user, _('Error repeating notice.'));
|
||||||
@ -492,7 +492,7 @@ class ReplyCommand extends Command
|
|||||||
} else {
|
} else {
|
||||||
$channel->error($this->user, _('Error saving notice.'));
|
$channel->error($this->user, _('Error saving notice.'));
|
||||||
}
|
}
|
||||||
common_broadcast_notice($notice);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
84
lib/distribqueuehandler.php
Normal file
84
lib/distribqueuehandler.php
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
<?php
|
||||||
|
/*
|
||||||
|
* StatusNet - the distributed open-source microblogging tool
|
||||||
|
* Copyright (C) 2008, 2009, StatusNet, Inc.
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base class for queue handlers.
|
||||||
|
*
|
||||||
|
* As extensions of the Daemon class, each queue handler has the ability
|
||||||
|
* to launch itself in the background, at which point it'll pass control
|
||||||
|
* to the configured QueueManager class to poll for updates.
|
||||||
|
*
|
||||||
|
* Subclasses must override at least the following methods:
|
||||||
|
* - transport
|
||||||
|
* - handle_notice
|
||||||
|
*/
|
||||||
|
|
||||||
|
class DistribQueueHandler
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Return transport keyword which identifies items this queue handler
|
||||||
|
* services; must be defined for all subclasses.
|
||||||
|
*
|
||||||
|
* Must be 8 characters or less to fit in the queue_item database.
|
||||||
|
* ex "email", "jabber", "sms", "irc", ...
|
||||||
|
*
|
||||||
|
* @return string
|
||||||
|
*/
|
||||||
|
|
||||||
|
function transport()
|
||||||
|
{
|
||||||
|
return 'distrib';
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Here's the meat of your queue handler -- you're handed a Notice
|
||||||
|
* object, which you may do as you will with.
|
||||||
|
*
|
||||||
|
* If this function indicates failure, a warning will be logged
|
||||||
|
* and the item is placed back in the queue to be re-run.
|
||||||
|
*
|
||||||
|
* @param Notice $notice
|
||||||
|
* @return boolean true on success, false on failure
|
||||||
|
*/
|
||||||
|
function handle($notice)
|
||||||
|
{
|
||||||
|
// XXX: do we need to change this for remote users?
|
||||||
|
|
||||||
|
$notice->saveTags();
|
||||||
|
|
||||||
|
$groups = $notice->saveGroups();
|
||||||
|
|
||||||
|
$recipients = $notice->saveReplies();
|
||||||
|
|
||||||
|
$notice->addToInboxes($groups, $recipients);
|
||||||
|
|
||||||
|
$notice->saveUrls();
|
||||||
|
|
||||||
|
Event::handle('EndNoticeSave', array($notice));
|
||||||
|
|
||||||
|
// Enqueue for other handlers
|
||||||
|
|
||||||
|
common_enqueue_notice($notice);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -160,7 +160,7 @@ class MailHandler
|
|||||||
foreach($mediafiles as $mf){
|
foreach($mediafiles as $mf){
|
||||||
$mf->attachToNotice($notice);
|
$mf->attachToNotice($notice);
|
||||||
}
|
}
|
||||||
common_broadcast_notice($notice);
|
|
||||||
$this->log(LOG_INFO,
|
$this->log(LOG_INFO,
|
||||||
'Added notice ' . $notice->id . ' from user ' . $user->nickname);
|
'Added notice ' . $notice->id . ' from user ' . $user->nickname);
|
||||||
return true;
|
return true;
|
||||||
|
@ -362,7 +362,7 @@ class StatusNetOAuthDataStore extends OAuthDataStore
|
|||||||
array('is_local' => Notice::REMOTE_OMB,
|
array('is_local' => Notice::REMOTE_OMB,
|
||||||
'uri' => $omb_notice->getIdentifierURI()));
|
'uri' => $omb_notice->getIdentifierURI()));
|
||||||
|
|
||||||
common_broadcast_notice($notice, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -217,6 +217,7 @@ abstract class QueueManager extends IoManager
|
|||||||
$this->connect('plugin', 'PluginQueueHandler');
|
$this->connect('plugin', 'PluginQueueHandler');
|
||||||
$this->connect('omb', 'OmbQueueHandler');
|
$this->connect('omb', 'OmbQueueHandler');
|
||||||
$this->connect('ping', 'PingQueueHandler');
|
$this->connect('ping', 'PingQueueHandler');
|
||||||
|
$this->connect('distrib', 'DistribQueueHandler');
|
||||||
if (common_config('sms', 'enabled')) {
|
if (common_config('sms', 'enabled')) {
|
||||||
$this->connect('sms', 'SmsQueueHandler');
|
$this->connect('sms', 'SmsQueueHandler');
|
||||||
}
|
}
|
||||||
|
@ -987,7 +987,7 @@ function common_redirect($url, $code=307)
|
|||||||
|
|
||||||
function common_broadcast_notice($notice, $remote=false)
|
function common_broadcast_notice($notice, $remote=false)
|
||||||
{
|
{
|
||||||
return common_enqueue_notice($notice);
|
// DO NOTHING!
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stick the notice on the queue
|
// Stick the notice on the queue
|
||||||
|
@ -397,7 +397,7 @@ class FacebookAction extends Action
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
common_broadcast_notice($notice);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user