single function for important streams, with memcached support

I moved the 4 streams for a user (with friends, faves, replies,
personal) into functions on the User object. Added a helper function
in Notice for making notice streams. Also, will fetch notice streams
out of the memcached server, if possible. Made the API, RSS, and HTML
output all use the same streams (hopefully cached).

Added some code to Notice to blow the cache when a notice is posted.
Also, added code to favor and disfavor actions to blow the faves
cache, too.

darcs-hash:20080928120119-5ed1f-ead542348bcd3cf315be6f42934353154402eb16.gz
This commit is contained in:
Evan Prodromou 2008-09-28 08:01:19 -04:00
parent a8624b2b72
commit 02a3f24b92
11 changed files with 273 additions and 221 deletions

View File

@ -63,6 +63,8 @@ class DisfavorAction extends Action {
$this->server_error(_('Could not delete favorite.'));
return;
}
$user->blowFavesCache();
if ($this->boolean('ajax')) {
common_start_html('text/xml');

View File

@ -62,7 +62,8 @@ class FavorAction extends Action {
}
$this->notify($fave, $notice, $user);
$user->blowFavesCache();
if ($this->boolean('ajax')) {
common_start_html('text/xml');
common_element_start('head');

View File

@ -59,32 +59,17 @@ class PublicAction extends StreamAction {
function show_notices($page) {
$notice = new Notice();
# XXX: sub-optimal
if (common_config('public', 'localonly')) {
$notice->is_local = 1;
}
$notice->orderBy('created DESC, notice.id DESC');
# We fetch one extra, to see if we need an "older" link
$notice->limit((($page-1)*NOTICES_PER_PAGE), NOTICES_PER_PAGE + 1);
$cnt = $notice->find();
if ($cnt > 0) {
$cnt = 0;
$notice = Notice::publicStream($page);
if ($notice) {
common_element_start('ul', array('id' => 'notices'));
$iMax = min($cnt, NOTICES_PER_PAGE);
for ($i = 0; $i < $iMax; $i++) {
if ($notice->fetch()) {
$this->show_notice($notice);
} else {
// shouldn't happen!
while ($notice->fetch()) {
$cnt++;
if ($cnt > NOTICES_PER_PAGE) {
break;
}
$this->show_notice($notice);
}
common_element_end('ul');
}
@ -92,5 +77,4 @@ class PublicAction extends StreamAction {
common_pagination($page > 1, $cnt > NOTICES_PER_PAGE,
$page, 'public');
}
}
}

View File

@ -30,29 +30,15 @@ class PublicrssAction extends Rss10Action {
}
function get_notices($limit=0) {
$user = $this->user;
$notices = array();
$notice = new Notice();
# XXX: bad performance
if (common_config('public', 'localonly')) {
$notice->is_local = 1;
}
$notice->orderBy('created DESC, notice.id DESC');
if ($limit != 0) {
$notice->limit(0, $limit);
}
$notice->find();
$notice = Notice::publicStream(0, ($limit == 0) ? 48 : $limit);
while ($notice->fetch()) {
$notices[] = clone($notice);
}
return $notices;
}

View File

@ -48,7 +48,7 @@ class RepliesAction extends StreamAction {
array($this, 'show_header'), $user,
array($this, 'show_top'));
$this->show_replies($profile);
$this->show_replies($user);
common_show_footer();
}
@ -75,35 +75,22 @@ class RepliesAction extends StreamAction {
$this->views_menu();
}
function show_replies($profile) {
$reply = new Reply();
$reply->profile_id = $profile->id;
$reply->orderBy('modified DESC');
function show_replies($user) {
$page = ($this->arg('page')) ? ($this->arg('page')+0) : 1;
$reply->limit((($page-1)*NOTICES_PER_PAGE), NOTICES_PER_PAGE + 1);
$cnt = $reply->find();
if ($cnt > 0) {
$notice = $user->getReplies(($page-1) * NOTICES_PER_PAGE, NOTICES_PER_PAGE + 1);
$cnt = 0;
if ($notice) {
common_element_start('ul', array('id' => 'notices'));
for ($i = 0; $i < min($cnt, NOTICES_PER_PAGE); $i++) {
if ($reply->fetch()) {
$notice = new Notice();
$notice->id = $reply->notice_id;
$result = $notice->find(true);
if (!$result) {
continue;
}
$this->show_notice($notice);
} else {
// shouldn't happen!
while ($notice->fetch()) {
$cnt++;
if ($cnt > NOTICES_PER_PAGE) {
break;
}
$this->show_notice($notice);
}
common_element_end('ul');
}

View File

@ -42,27 +42,13 @@ class RepliesrssAction extends Rss10Action {
function get_notices($limit=0) {
$user = $this->user;
$notice = $user->getReplies(0, ($limit == 0) ? 48 : $limit);
$notices = array();
$reply = new Reply();
$reply->profile_id = $user->id;
$reply->orderBy('modified DESC');
if ($limit) {
$reply->limit(0, $limit);
}
$cnt = $reply->find();
if ($cnt) {
while ($reply->fetch()) {
$notice = new Notice();
$notice->id = $reply->notice_id;
$result = $notice->find(true);
if (!$result) {
continue;
}
$notices[] = clone($notice);
}
while ($notice->fetch()) {
$notices[] = clone($notice);
}
return $notices;

View File

@ -337,31 +337,27 @@ class ShowstreamAction extends StreamAction {
function show_notices($profile) {
$notice = DB_DataObject::factory('notice');
$notice->profile_id = $profile->id;
$notice->orderBy('created DESC, notice.id DESC');
$page = ($this->arg('page')) ? ($this->arg('page')+0) : 1;
$notice->limit((($page-1)*NOTICES_PER_PAGE), NOTICES_PER_PAGE + 1);
$notice = $user->getNotices(($page-1)*NOTICES_PER_PAGE, NOTICES_PER_PAGE + 1);
$cnt = 0;
$cnt = $notice->find();
if ($cnt > 0) {
if ($notice) {
common_element_start('ul', array('id' => 'notices'));
for ($i = 0; $i < min($cnt, NOTICES_PER_PAGE); $i++) {
if ($notice->fetch()) {
$this->show_notice($notice);
} else {
// shouldn't happen!
while ($notice->fetch()) {
$cnt++;
if ($cnt > NOTICES_PER_PAGE) {
break;
}
$this->show_notice($notice);
}
common_element_end('ul');
}
common_pagination($page>1, $cnt>NOTICES_PER_PAGE, $page,
'showstream', array('nickname' => $profile->nickname));
}

View File

@ -54,22 +54,12 @@ class TwitapistatusesAction extends TwitterapiAction {
// Number of public statuses to return by default -- Twitter sends 20
$MAX_PUBSTATUSES = 20;
$notice = new Notice();
// FIXME: To really live up to the spec we need to build a list
// of notices by users who have custom avatars, so fix this SQL -- Zach
# XXX: sub-optimal performance
if (common_config('public', 'localonly')) {
$notice->is_local = 1;
}
$notice->orderBy('created DESC, notice.id DESC');
$notice->limit($MAX_PUBSTATUSES);
$cnt = $notice->find();
if ($cnt > 0) {
$notice = Notice::publicStream(0, $MAX_PUBSTATUSES);
if ($notice) {
switch($apidata['content-type']) {
case 'xml':
@ -341,18 +331,10 @@ class TwitapistatusesAction extends TwitterapiAction {
$link = common_local_url('showstream', array('nickname' => $user->nickname));
$subtitle = sprintf(_('Updates from %1$s on %2$s!'), $user->nickname, $sitename);
$notice = new Notice();
$notice->profile_id = $user->id;
# XXX: since
# XXX: since_id
$notice->orderBy('created DESC, notice.id DESC');
$notice->limit((($page-1)*20), $count);
$cnt = $notice->find();
$notice = $user->getNotices((($page-1)*20), $count);
switch($apidata['content-type']) {
case 'xml':
@ -490,30 +472,11 @@ class TwitapistatusesAction extends TwitterapiAction {
$count = 20;
}
$reply = new Reply();
$reply->profile_id = $user->id;
$reply->orderBy('modified DESC');
$page = ($this->arg('page')) ? ($this->arg('page')+0) : 1;
$reply->limit((($page-1)*20), $count);
$cnt = $reply->find();
$notice = $user->getReplies((($page-1)*20), $count);
$notices = array();
if ($cnt) {
while ($reply->fetch()) {
$notice = new Notice();
$notice->id = $reply->notice_id;
$result = $notice->find(true);
if (!$result) {
continue;
}
$notices[] = clone($notice);
}
while ($notice->fetch()) {
$notices[] = clone($notice);
}
switch($apidata['content-type']) {

View File

@ -42,14 +42,13 @@ class UserrssAction extends Rss10Action {
function get_notices($limit=0) {
$user = $this->user;
$notices = array();
$notice = DB_DataObject::factory('notice');
$notice->profile_id = $user->id; # user id === profile id
$notice->orderBy('created DESC, notice.id DESC');
if ($limit != 0) {
$notice->limit(0, $limit);
if (is_null($user)) {
return NULL;
}
$notice = $user->getNotices(0, ($limit == 0) ? NOTICES_PER_PAGE : $limit);
$notice->find();
while ($notice->fetch()) {

View File

@ -24,6 +24,11 @@ if (!defined('LACONICA')) { exit(1); }
*/
require_once INSTALLDIR.'/classes/Memcached_DataObject.php';
/* We keep the first three 20-notice pages, plus one for pagination check,
* in the memcached cache. */
define('NOTICE_CACHE_WINDOW', 61);
class Notice extends Memcached_DataObject
{
###START_AUTOCODE
@ -52,6 +57,12 @@ class Notice extends Memcached_DataObject
return Profile::staticGet($this->profile_id);
}
function delete() {
$this->blowCaches();
$this->blowFavesCache();
parent::delete();
}
function saveTags() {
/* extract all #hastags */
$count = preg_match_all('/(?:^|\s)#([A-Za-z0-9_\-\.]{1,64})/', strtolower($this->content), $match);
@ -118,11 +129,18 @@ class Notice extends Memcached_DataObject
# XXX: someone clever could prepend instead of clearing the cache
if (common_config('memcached', 'enabled')) {
$notice->blowSubsCache();
$notice->blowCaches();
}
return $notice;
}
function blowCaches() {
$this->blowSubsCache();
$this->blowNoticeCache();
$this->blowRepliesCache();
$this->blowPublicCache();
}
function blowSubsCache() {
$cache = common_memcache();
@ -141,4 +159,148 @@ class Notice extends Memcached_DataObject
unset($user);
}
}
function blowNoticeCache() {
if ($this->is_local) {
$cache = common_memcache();
if ($cache) {
$cache->delete(common_cache_key('user:notices:'.$this->profile_id));
}
}
}
function blowRepliesCache() {
$cache = common_memcache();
if ($cache) {
$reply = new Reply();
$reply->notice_id = $this->id;
if ($reply->find()) {
while ($reply->fetch()) {
$cache->delete(common_cache_key('user:replies:'.$reply->profile_id));
}
}
$reply->free();
unset($reply);
}
}
function blowPublicCache() {
if ($this->is_local) {
$cache = common_memcache();
if ($cache) {
$cache->delete(common_cache_key('public'));
}
}
}
function blowFavesCache() {
$cache = common_memcache();
if ($cache) {
$fave = new Fave();
$fave->notice_id = $this->id;
if ($fave->find()) {
while ($fave->fetch()) {
$cache->delete(common_cache_key('user:faves:'.$fave->user_id));
}
}
$fave->free();
unset($fave);
}
}
static function getStream($qry, $cachekey, $offset=0, $limit=20) {
if (common_config('memcached', 'enabled')) {
return Notice::getCachedStream($qry, $cachekey, $offset, $limit);
} else {
return Notice::getStreamDirect($qry, $offset, $limit);
}
}
static function getStreamDirect($qry, $offset, $limit) {
$qry .= 'ORDER BY notice.created DESC, notice.id DESC ';
if(common_config('db','type')=='pgsql') {
$qry .= 'LIMIT ' . $limit . ' OFFSET ' . $offset;
} else {
$qry .= 'LIMIT ' . $offset . ', ' . $limit;
}
$notice = new Notice();
$notice->query($qry);
return $notice;
}
static function getCachedStream($qry, $cachekey, $offset, $limit) {
# If outside our cache window, just go to the DB
if ($offset + $limit > NOTICE_CACHE_WINDOW) {
return Notice::getStreamDirect($qry, $offset, $limit);
}
# Get the cache; if we can't, just go to the DB
$cache = common_memcache();
if (!$cache) {
return Notice::getStreamDirect($qry, $offset, $limit);
}
# Get the notices out of the cache
$notices = $cache->get(common_cache_key($cachekey));
# On a cache hit, return a DB-object-like wrapper
if ($notices) {
$wrapper = new NoticeWrapper(array_slice($notices, $offset, $limit));
return $wrapper;
}
# Otherwise, get the full cache window out of the DB
$notice = Notice::getStreamDirect($qry, 0, NOTICE_CACHE_WINDOW);
# If there are no hits, just return the value
if (!$notice) {
return $notice;
}
# Pack results into an array
$notices = array();
while ($notice->fetch()) {
$notices[] = clone($notice);
}
# Store the array in the cache for next time
$cache->set(common_cache_key($cachekey), $notices);
# return a wrapper of the array for use now
$wrapper = new NoticeWrapper(array_slice($notices, $offset, $limit));
return $wrapper;
}
function publicStream($page) {
$qry = 'SELECT * FROM notice ';
if (common_config('public', 'localonly')) {
$qry .= ' WHERE is_local = 1 ';
}
return Notice::getStream($qry,
'public',
($page-1)*NOTICES_PER_PAGE,
NOTICES_PER_PAGE + 1);
}
}

View File

@ -19,11 +19,6 @@
if (!defined('LACONICA')) { exit(1); }
/* We keep the first three 20-notice pages, plus one for pagination check,
* in the memcached cache. */
define('WITHFRIENDS_CACHE_WINDOW', 61);
/**
* Table Definition for user
*/
@ -153,60 +148,8 @@ class User extends Memcached_DataObject
return true;
}
function noticesWithFriends($offset=0, $limit=20) {
# We clearly need a more elegant way to make this work.
if (common_config('memcached', 'enabled')) {
if ($offset + $limit <= WITHFRIENDS_CACHE_WINDOW) {
$cached = $this->noticesWithFriendsWindow();
$wrapper = new NoticeWrapper(array_slice($cached, $offset, $limit));
return $wrapper;
}
}
$notice = new Notice();
$query='SELECT notice.* ' .
'FROM notice JOIN subscription on notice.profile_id = subscription.subscribed ' .
'WHERE subscription.subscriber = ' . $this->id . ' ' .
'ORDER BY created DESC, notice.id DESC ';
if(common_config('db','type')=='pgsql') {
$query=$query . 'LIMIT ' . $limit . ' OFFSET ' . $offset;
} else {
$query=$query . 'LIMIT ' . $offset . ', ' . $limit;
}
$notice->query($query);
return $notice;
}
function favoriteNotices($offset=0, $limit=20) {
$notice = new Notice();
$notice->query('SELECT notice.* ' .
'FROM notice JOIN fave on notice.id = fave.notice_id ' .
'WHERE fave.user_id = ' . $this->id . ' ' .
'ORDER BY notice.created DESC, notice.id DESC ' .
'LIMIT ' . $offset . ', ' . $limit);
return $notice;
}
function noticesWithFriendsWindow() {
$cache = common_memcache();
if (!$cache) {
return NULL;
}
$notices = $cache->get(common_cache_key('user:notices_with_friends:' . $this->id));
if ($notices) {
return $notices;
}
$notice = new Notice();
@ -216,14 +159,6 @@ class User extends Memcached_DataObject
'ORDER BY created DESC, notice.id DESC ' .
'LIMIT 0, ' . WITHFRIENDS_CACHE_WINDOW);
$notices = array();
while ($notice->fetch()) {
$notices[] = clone($notice);
}
$cache->set(common_cache_key('user:notices_with_friends:' . $this->id), $notices);
return $notices;
}
static function register($fields) {
@ -383,4 +318,55 @@ class User extends Memcached_DataObject
return $user;
}
function getReplies($offset=0, $limit=NOTICES_PER_PAGE) {
$qry =
'SELECT notice.* ' .
'FROM notice JOIN reply ON notice.id = reply.notice_id ' .
'WHERE reply.profile_id = %d ';
return Notice::getStream(sprintf($qry, $this->id),
'user:replies:'.$this->id,
$offset, $limit);
}
function getNotices($offset=0, $limit=NOTICES_PER_PAGE) {
$qry =
'SELECT * ' .
'FROM notice ' .
'WHERE profile_id = %d ';
return Notice::getStream(sprintf($qry, $this->id),
'user:notices:'.$this->id,
$offset, $limit);
}
function favoriteNotices($offset=0, $limit=NOTICES_PER_PAGE) {
$qry =
'SELECT notice.* ' .
'FROM notice JOIN fave ON notice.id = fave.notice_id ' .
'WHERE fave.profile_id = %d ';
return Notice::getStream(sprintf($qry, $this->id),
'user:faves:'.$this->id,
$offset, $limit);
}
function noticesWithFriends($offset=0, $limit=NOTICES_PER_PAGE) {
$qry =
'SELECT notice.* ' .
'FROM notice JOIN subscription ON notice.profile_id = subscription.subscribed ' .
'WHERE subscription.subscriber = %d';
return Notice::getStream(sprintf($qry, $this->id),
'user:notices_with_friends:' . $this->id,
$offset, $limit);
}
function blowFavesCache() {
$cache = common_memcache();
if ($cache) {
$cache->delete(common_cache_key('user:faves:'.$this->id));
}
}
}