diff --git a/classes/Fave.php b/classes/Fave.php index efbceee6a8..a61f35d190 100644 --- a/classes/Fave.php +++ b/classes/Fave.php @@ -79,12 +79,12 @@ class Fave extends Memcached_DataObject function stream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $own=false, $since_id=0, $max_id=0) { - $ids = Notice::stream(array('Fave', '_streamDirect'), - array($user_id, $own), - ($own) ? 'fave:ids_by_user_own:'.$user_id : - 'fave:ids_by_user:'.$user_id, - $offset, $limit, $since_id, $max_id); - return $ids; + $stream = new NoticeStream(array('Fave', '_streamDirect'), + array($user_id, $own), + ($own) ? 'fave:ids_by_user_own:'.$user_id : + 'fave:ids_by_user:'.$user_id); + + return $stream->getNotices($offset, $limit, $since_id, $max_id); } /** diff --git a/classes/File.php b/classes/File.php index e9a0131c4e..681c33f9cd 100644 --- a/classes/File.php +++ b/classes/File.php @@ -449,12 +449,11 @@ class File extends Memcached_DataObject function stream($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) { - $ids = Notice::stream(array($this, '_streamDirect'), - array(), - 'file:notice-ids:'.$this->url, - $offset, $limit, $since_id, $max_id); + $stream = new NoticeStream(array($this, '_streamDirect'), + array(), + 'file:notice-ids:'.$this->url); - return Notice::getStreamByIds($ids); + return $stream->getNotices($offset, $limit, $since_id, $max_id); } /** diff --git a/classes/Notice.php b/classes/Notice.php index b228a49c7c..8200e4554f 100644 --- a/classes/Notice.php +++ b/classes/Notice.php @@ -629,54 +629,14 @@ class Notice extends Memcached_DataObject return $att; } - function getStreamByIds($ids) - { - $cache = Cache::instance(); - - if (!empty($cache)) { - $notices = array(); - foreach ($ids as $id) { - $n = Notice::staticGet('id', $id); - if (!empty($n)) { - $notices[] = $n; - } - } - return new ArrayWrapper($notices); - } else { - $notice = new Notice(); - if (empty($ids)) { - //if no IDs requested, just return the notice object - return $notice; - } - $notice->whereAdd('id in (' . implode(', ', $ids) . ')'); - - $notice->find(); - - $temp = array(); - - while ($notice->fetch()) { - $temp[$notice->id] = clone($notice); - } - - $wrapped = array(); - - foreach ($ids as $id) { - if (array_key_exists($id, $temp)) { - $wrapped[] = $temp[$id]; - } - } - - return new ArrayWrapper($wrapped); - } - } function publicStream($offset=0, $limit=20, $since_id=0, $max_id=0) { - $ids = Notice::stream(array('Notice', '_publicStreamDirect'), - array(), - 'public', - $offset, $limit, $since_id, $max_id); - return Notice::getStreamByIds($ids); + $stream = new NoticeStream(array('Notice', '_publicStreamDirect'), + array(), + 'public'); + + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function _publicStreamDirect($offset=0, $limit=20, $since_id=0, $max_id=0) @@ -719,12 +679,11 @@ class Notice extends Memcached_DataObject function conversationStream($id, $offset=0, $limit=20, $since_id=0, $max_id=0) { - $ids = Notice::stream(array('Notice', '_conversationStreamDirect'), - array($id), - 'notice:conversation_ids:'.$id, - $offset, $limit, $since_id, $max_id); + $stream = new NoticeStream(array('Notice', '_conversationStreamDirect'), + array($id), + 'notice:conversation_ids:'.$id); - return Notice::getStreamByIds($ids); + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function _conversationStreamDirect($id, $offset=0, $limit=20, $since_id=0, $max_id=0) @@ -1540,61 +1499,6 @@ class Notice extends Memcached_DataObject } } - function stream($fn, $args, $cachekey, $offset=0, $limit=20, $since_id=0, $max_id=0) - { - $cache = Cache::instance(); - - if (empty($cache) || - $since_id != 0 || $max_id != 0 || - is_null($limit) || - ($offset + $limit) > NOTICE_CACHE_WINDOW) { - return call_user_func_array($fn, array_merge($args, array($offset, $limit, $since_id, - $max_id))); - } - - $idkey = Cache::key($cachekey); - - $idstr = $cache->get($idkey); - - if ($idstr !== false) { - // Cache hit! Woohoo! - $window = explode(',', $idstr); - $ids = array_slice($window, $offset, $limit); - return $ids; - } - - $laststr = $cache->get($idkey.';last'); - - if ($laststr !== false) { - $window = explode(',', $laststr); - $last_id = $window[0]; - $new_ids = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW, - $last_id, 0, null))); - - $new_window = array_merge($new_ids, $window); - - $new_windowstr = implode(',', $new_window); - - $result = $cache->set($idkey, $new_windowstr); - $result = $cache->set($idkey . ';last', $new_windowstr); - - $ids = array_slice($new_window, $offset, $limit); - - return $ids; - } - - $window = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW, - 0, 0, null))); - - $windowstr = implode(',', $window); - - $result = $cache->set($idkey, $windowstr); - $result = $cache->set($idkey . ';last', $windowstr); - - $ids = array_slice($window, $offset, $limit); - - return $ids; - } /** * Determine which notice, if any, a new notice is in reply to. diff --git a/classes/Notice_tag.php b/classes/Notice_tag.php index 81d346c5d3..813242253d 100644 --- a/classes/Notice_tag.php +++ b/classes/Notice_tag.php @@ -36,14 +36,13 @@ class Notice_tag extends Memcached_DataObject /* the code above is auto generated do not remove the tag below */ ###END_AUTOCODE - static function getStream($tag, $offset=0, $limit=20) { - - $ids = Notice::stream(array('Notice_tag', '_streamDirect'), - array($tag), - 'notice_tag:notice_ids:' . Cache::keyize($tag), - $offset, $limit); - - return Notice::getStreamByIds($ids); + static function getStream($tag, $offset=0, $limit=20, $sinceId=0, $maxId=0) + { + $stream = new NoticeStream(array('Notice_tag', '_streamDirect'), + array($tag), + 'notice_tag:notice_ids:' . Cache::keyize($tag)); + + return $stream->getNotices($offset, $limit, $sinceId, $maxId); } function _streamDirect($tag, $offset, $limit, $since_id, $max_id) diff --git a/classes/Profile.php b/classes/Profile.php index d84d5da290..209e5ef84a 100644 --- a/classes/Profile.php +++ b/classes/Profile.php @@ -198,22 +198,20 @@ class Profile extends Memcached_DataObject function getTaggedNotices($tag, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) { - $ids = Notice::stream(array($this, '_streamTaggedDirect'), - array($tag), - 'profile:notice_ids_tagged:' . $this->id . ':' . $tag, - $offset, $limit, $since_id, $max_id); - return Notice::getStreamByIds($ids); + $stream = new NoticeStream(array($this, '_streamTaggedDirect'), + array($tag), + 'profile:notice_ids_tagged:'.$this->id.':'.$tag); + + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function getNotices($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) { - // XXX: I'm not sure this is going to be any faster. It probably isn't. - $ids = Notice::stream(array($this, '_streamDirect'), - array(), - 'profile:notice_ids:' . $this->id, - $offset, $limit, $since_id, $max_id); + $stream = new NoticeStream(array($this, '_streamDirect'), + array(), + 'profile:notice_ids:' . $this->id); - return Notice::getStreamByIds($ids); + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function _streamTaggedDirect($tag, $offset, $limit, $since_id, $max_id) diff --git a/classes/Reply.php b/classes/Reply.php index 371c16cf48..d5341b9a05 100644 --- a/classes/Reply.php +++ b/classes/Reply.php @@ -38,11 +38,11 @@ class Reply extends Memcached_DataObject function stream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) { - $ids = Notice::stream(array('Reply', '_streamDirect'), - array($user_id), - 'reply:stream:' . $user_id, - $offset, $limit, $since_id, $max_id); - return $ids; + $stream = new NoticeStream(array('Reply', '_streamDirect'), + array($user_id), + 'reply:stream:' . $user_id); + + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function _streamDirect($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) diff --git a/classes/User.php b/classes/User.php index 31b132d0f3..4bd7b039df 100644 --- a/classes/User.php +++ b/classes/User.php @@ -448,8 +448,7 @@ class User extends Memcached_DataObject function getReplies($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $before_id=0) { - $ids = Reply::stream($this->id, $offset, $limit, $since_id, $before_id); - return Notice::getStreamByIds($ids); + return Reply::stream($this->id, $offset, $limit, $since_id, $before_id); } function getTaggedNotices($tag, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $before_id=0) { @@ -465,8 +464,7 @@ class User extends Memcached_DataObject function favoriteNotices($own=false, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) { - $ids = Fave::stream($this->id, $offset, $limit, $own, $since_id, $max_id); - return Notice::getStreamByIds($ids); + return Fave::stream($this->id, $offset, $limit, $own, $since_id, $max_id); } function noticesWithFriends($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $before_id=0) @@ -769,12 +767,11 @@ class User extends Memcached_DataObject function repeatedByMe($offset=0, $limit=20, $since_id=null, $max_id=null) { - $ids = Notice::stream(array($this, '_repeatedByMeDirect'), - array(), - 'user:repeated_by_me:'.$this->id, - $offset, $limit, $since_id, $max_id, null); + $stream = new NoticeStream(array($this, '_repeatedByMeDirect'), + array(), + 'user:repeated_by_me:'.$this->id); - return Notice::getStreamByIds($ids); + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function _repeatedByMeDirect($offset, $limit, $since_id, $max_id) @@ -812,12 +809,11 @@ class User extends Memcached_DataObject function repeatsOfMe($offset=0, $limit=20, $since_id=null, $max_id=null) { - $ids = Notice::stream(array($this, '_repeatsOfMeDirect'), - array(), - 'user:repeats_of_me:'.$this->id, - $offset, $limit, $since_id, $max_id); + $stream = new NoticeStream(array($this, '_repeatsOfMeDirect'), + array(), + 'user:repeats_of_me:'.$this->id); - return Notice::getStreamByIds($ids); + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function _repeatsOfMeDirect($offset, $limit, $since_id, $max_id) diff --git a/classes/User_group.php b/classes/User_group.php index 707acbd13c..4d6dcfab68 100644 --- a/classes/User_group.php +++ b/classes/User_group.php @@ -87,12 +87,11 @@ class User_group extends Memcached_DataObject function getNotices($offset, $limit, $since_id=null, $max_id=null) { - $ids = Notice::stream(array($this, '_streamDirect'), - array(), - 'user_group:notice_ids:' . $this->id, - $offset, $limit, $since_id, $max_id); + $stream = new NoticeStream(array($this, '_streamDirect'), + array(), + 'user_group:notice_ids:' . $this->id); - return Notice::getStreamByIds($ids); + return $stream->getNotices($offset, $limit, $since_id, $max_id); } function _streamDirect($offset, $limit, $since_id, $max_id) diff --git a/lib/noticestream.php b/lib/noticestream.php new file mode 100644 index 0000000000..2b6e10f7b9 --- /dev/null +++ b/lib/noticestream.php @@ -0,0 +1,189 @@ +. + * + * @category Stream + * @package StatusNet + * @author Evan Prodromou + * @copyright 2011 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0 + * @link http://status.net/ + */ + +if (!defined('STATUSNET')) { + // This check helps protect against security problems; + // your code file can't be executed directly from the web. + exit(1); +} + +/** + * Class for notice streams + * + * @category Stream + * @package StatusNet + * @author Evan Prodromou + * @copyright 2011 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0 + * @link http://status.net/ + */ + +class NoticeStream +{ + public $generator = null; + public $args = null; + public $cachekey = null; + + function __construct($generator, $args, $cachekey) + { + $this->generator = $generator; + $this->args = $args; + $this->cachekey = $cachekey; + } + + function getNotices($offset=0, $limit=20, $sinceId=0, $maxId=0) + { + $ids = $this->getNoticeIds($offset, $limit, $sinceId, $maxId); + + $notices = $this->getStreamByIds($ids); + + return $notices; + } + + function getNoticeIds($offset=0, $limit=20, $sinceId=0, $maxId=0) + { + $cache = Cache::instance(); + + // We cache NOTICE_CACHE_WINDOW elements at the tip of the stream. + // If the cache won't be hit, just generate directly. + + if (empty($cache) || + $sinceId != 0 || $maxId != 0 || + is_null($limit) || + ($offset + $limit) > NOTICE_CACHE_WINDOW) { + return $this->generate($offset, $limit, $sinceId, $maxId); + } + + // Check the cache to see if we have the stream. + + $idkey = Cache::key($this->cachekey); + + $idstr = $cache->get($idkey); + + if ($idstr !== false) { + // Cache hit! Woohoo! + $window = explode(',', $idstr); + $ids = array_slice($window, $offset, $limit); + return $ids; + } + + // Check the cache to see if we have a "last-known-good" version. + // The actual cache gets blown away when new notices are added, but + // the "last" value holds a lot of info. We might need to only generate + // a few at the "tip", which can bound our queries and save lots + // of time. + + $laststr = $cache->get($idkey.';last'); + + if ($laststr !== false) { + $window = explode(',', $laststr); + $last_id = $window[0]; + $new_ids = $this->generate(0, NOTICE_CACHE_WINDOW, $last_id, 0); + + $new_window = array_merge($new_ids, $window); + + $new_windowstr = implode(',', $new_window); + + $result = $cache->set($idkey, $new_windowstr); + $result = $cache->set($idkey . ';last', $new_windowstr); + + $ids = array_slice($new_window, $offset, $limit); + + return $ids; + } + + // No cache hits :( Generate directly and stick the results + // into the cache. Note we generate the full cache window. + + $window = $this->generate(0, NOTICE_CACHE_WINDOW, 0, 0); + + $windowstr = implode(',', $window); + + $result = $cache->set($idkey, $windowstr); + $result = $cache->set($idkey . ';last', $windowstr); + + // Return just the slice that was requested + + $ids = array_slice($window, $offset, $limit); + + return $ids; + } + + function getStreamByIds($ids) + { + $cache = Cache::instance(); + + if (!empty($cache)) { + $notices = array(); + foreach ($ids as $id) { + $n = Notice::staticGet('id', $id); + if (!empty($n)) { + $notices[] = $n; + } + } + return new ArrayWrapper($notices); + } else { + $notice = new Notice(); + if (empty($ids)) { + //if no IDs requested, just return the notice object + return $notice; + } + $notice->whereAdd('id in (' . implode(', ', $ids) . ')'); + + $notice->find(); + + $temp = array(); + + while ($notice->fetch()) { + $temp[$notice->id] = clone($notice); + } + + $wrapped = array(); + + foreach ($ids as $id) { + if (array_key_exists($id, $temp)) { + $wrapped[] = $temp[$id]; + } + } + + return new ArrayWrapper($wrapped); + } + } + + function generate($offset, $limit, $sinceId, $maxId) + { + $args = array_merge($this->args, array($offset, + $limit, + $sinceId, + $maxId)); + + return call_user_func_array($this->generator, $args); + } +}