New NoticeStream class to reify streams of notices

We've been muddling through with 6- or 8-argument functions for managing streams. I'd
like to start thinking of streams as their own thing, and give them some more value.

So, the new NoticeStream class takes over the Notice::stream() function and Notice::getStreamByIds().

There's probably some fine-tuning to do on the object interface.
This commit is contained in:
Evan Prodromou 2011-03-23 11:29:55 -04:00
parent 14a6ab2b04
commit 0b35ce7c37
9 changed files with 243 additions and 159 deletions

View File

@ -79,12 +79,12 @@ class Fave extends Memcached_DataObject
function stream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $own=false, $since_id=0, $max_id=0) function stream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $own=false, $since_id=0, $max_id=0)
{ {
$ids = Notice::stream(array('Fave', '_streamDirect'), $stream = new NoticeStream(array('Fave', '_streamDirect'),
array($user_id, $own), array($user_id, $own),
($own) ? 'fave:ids_by_user_own:'.$user_id : ($own) ? 'fave:ids_by_user_own:'.$user_id :
'fave:ids_by_user:'.$user_id, 'fave:ids_by_user:'.$user_id);
$offset, $limit, $since_id, $max_id);
return $ids; return $stream->getNotices($offset, $limit, $since_id, $max_id);
} }
/** /**

View File

@ -449,12 +449,11 @@ class File extends Memcached_DataObject
function stream($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) function stream($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
{ {
$ids = Notice::stream(array($this, '_streamDirect'), $stream = new NoticeStream(array($this, '_streamDirect'),
array(), array(),
'file:notice-ids:'.$this->url, 'file:notice-ids:'.$this->url);
$offset, $limit, $since_id, $max_id);
return Notice::getStreamByIds($ids); return $stream->getNotices($offset, $limit, $since_id, $max_id);
} }
/** /**

View File

@ -629,54 +629,14 @@ class Notice extends Memcached_DataObject
return $att; return $att;
} }
function getStreamByIds($ids)
{
$cache = Cache::instance();
if (!empty($cache)) {
$notices = array();
foreach ($ids as $id) {
$n = Notice::staticGet('id', $id);
if (!empty($n)) {
$notices[] = $n;
}
}
return new ArrayWrapper($notices);
} else {
$notice = new Notice();
if (empty($ids)) {
//if no IDs requested, just return the notice object
return $notice;
}
$notice->whereAdd('id in (' . implode(', ', $ids) . ')');
$notice->find();
$temp = array();
while ($notice->fetch()) {
$temp[$notice->id] = clone($notice);
}
$wrapped = array();
foreach ($ids as $id) {
if (array_key_exists($id, $temp)) {
$wrapped[] = $temp[$id];
}
}
return new ArrayWrapper($wrapped);
}
}
function publicStream($offset=0, $limit=20, $since_id=0, $max_id=0) function publicStream($offset=0, $limit=20, $since_id=0, $max_id=0)
{ {
$ids = Notice::stream(array('Notice', '_publicStreamDirect'), $stream = new NoticeStream(array('Notice', '_publicStreamDirect'),
array(), array(),
'public', 'public');
$offset, $limit, $since_id, $max_id);
return Notice::getStreamByIds($ids); return $stream->getNotices($offset, $limit, $since_id, $max_id);
} }
function _publicStreamDirect($offset=0, $limit=20, $since_id=0, $max_id=0) function _publicStreamDirect($offset=0, $limit=20, $since_id=0, $max_id=0)
@ -719,12 +679,11 @@ class Notice extends Memcached_DataObject
function conversationStream($id, $offset=0, $limit=20, $since_id=0, $max_id=0) function conversationStream($id, $offset=0, $limit=20, $since_id=0, $max_id=0)
{ {
$ids = Notice::stream(array('Notice', '_conversationStreamDirect'), $stream = new NoticeStream(array('Notice', '_conversationStreamDirect'),
array($id), array($id),
'notice:conversation_ids:'.$id, 'notice:conversation_ids:'.$id);
$offset, $limit, $since_id, $max_id);
return Notice::getStreamByIds($ids); return $stream->getNotices($offset, $limit, $since_id, $max_id);
} }
function _conversationStreamDirect($id, $offset=0, $limit=20, $since_id=0, $max_id=0) function _conversationStreamDirect($id, $offset=0, $limit=20, $since_id=0, $max_id=0)
@ -1540,61 +1499,6 @@ class Notice extends Memcached_DataObject
} }
} }
function stream($fn, $args, $cachekey, $offset=0, $limit=20, $since_id=0, $max_id=0)
{
$cache = Cache::instance();
if (empty($cache) ||
$since_id != 0 || $max_id != 0 ||
is_null($limit) ||
($offset + $limit) > NOTICE_CACHE_WINDOW) {
return call_user_func_array($fn, array_merge($args, array($offset, $limit, $since_id,
$max_id)));
}
$idkey = Cache::key($cachekey);
$idstr = $cache->get($idkey);
if ($idstr !== false) {
// Cache hit! Woohoo!
$window = explode(',', $idstr);
$ids = array_slice($window, $offset, $limit);
return $ids;
}
$laststr = $cache->get($idkey.';last');
if ($laststr !== false) {
$window = explode(',', $laststr);
$last_id = $window[0];
$new_ids = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW,
$last_id, 0, null)));
$new_window = array_merge($new_ids, $window);
$new_windowstr = implode(',', $new_window);
$result = $cache->set($idkey, $new_windowstr);
$result = $cache->set($idkey . ';last', $new_windowstr);
$ids = array_slice($new_window, $offset, $limit);
return $ids;
}
$window = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW,
0, 0, null)));
$windowstr = implode(',', $window);
$result = $cache->set($idkey, $windowstr);
$result = $cache->set($idkey . ';last', $windowstr);
$ids = array_slice($window, $offset, $limit);
return $ids;
}
/** /**
* Determine which notice, if any, a new notice is in reply to. * Determine which notice, if any, a new notice is in reply to.

View File

@ -36,14 +36,13 @@ class Notice_tag extends Memcached_DataObject
/* the code above is auto generated do not remove the tag below */ /* the code above is auto generated do not remove the tag below */
###END_AUTOCODE ###END_AUTOCODE
static function getStream($tag, $offset=0, $limit=20) { static function getStream($tag, $offset=0, $limit=20, $sinceId=0, $maxId=0)
{
$stream = new NoticeStream(array('Notice_tag', '_streamDirect'),
array($tag),
'notice_tag:notice_ids:' . Cache::keyize($tag));
$ids = Notice::stream(array('Notice_tag', '_streamDirect'), return $stream->getNotices($offset, $limit, $sinceId, $maxId);
array($tag),
'notice_tag:notice_ids:' . Cache::keyize($tag),
$offset, $limit);
return Notice::getStreamByIds($ids);
} }
function _streamDirect($tag, $offset, $limit, $since_id, $max_id) function _streamDirect($tag, $offset, $limit, $since_id, $max_id)

View File

@ -198,22 +198,20 @@ class Profile extends Memcached_DataObject
function getTaggedNotices($tag, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) function getTaggedNotices($tag, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
{ {
$ids = Notice::stream(array($this, '_streamTaggedDirect'), $stream = new NoticeStream(array($this, '_streamTaggedDirect'),
array($tag), array($tag),
'profile:notice_ids_tagged:' . $this->id . ':' . $tag, 'profile:notice_ids_tagged:'.$this->id.':'.$tag);
$offset, $limit, $since_id, $max_id);
return Notice::getStreamByIds($ids); return $stream->getNotices($offset, $limit, $since_id, $max_id);
} }
function getNotices($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) function getNotices($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
{ {
// XXX: I'm not sure this is going to be any faster. It probably isn't. $stream = new NoticeStream(array($this, '_streamDirect'),
$ids = Notice::stream(array($this, '_streamDirect'), array(),
array(), 'profile:notice_ids:' . $this->id);
'profile:notice_ids:' . $this->id,
$offset, $limit, $since_id, $max_id);
return Notice::getStreamByIds($ids); return $stream->getNotices($offset, $limit, $since_id, $max_id);
} }
function _streamTaggedDirect($tag, $offset, $limit, $since_id, $max_id) function _streamTaggedDirect($tag, $offset, $limit, $since_id, $max_id)

View File

@ -38,11 +38,11 @@ class Reply extends Memcached_DataObject
function stream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) function stream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
{ {
$ids = Notice::stream(array('Reply', '_streamDirect'), $stream = new NoticeStream(array('Reply', '_streamDirect'),
array($user_id), array($user_id),
'reply:stream:' . $user_id, 'reply:stream:' . $user_id);
$offset, $limit, $since_id, $max_id);
return $ids; return $stream->getNotices($offset, $limit, $since_id, $max_id);
} }
function _streamDirect($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) function _streamDirect($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)

View File

@ -448,8 +448,7 @@ class User extends Memcached_DataObject
function getReplies($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $before_id=0) function getReplies($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $before_id=0)
{ {
$ids = Reply::stream($this->id, $offset, $limit, $since_id, $before_id); return Reply::stream($this->id, $offset, $limit, $since_id, $before_id);
return Notice::getStreamByIds($ids);
} }
function getTaggedNotices($tag, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $before_id=0) { function getTaggedNotices($tag, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $before_id=0) {
@ -465,8 +464,7 @@ class User extends Memcached_DataObject
function favoriteNotices($own=false, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0) function favoriteNotices($own=false, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
{ {
$ids = Fave::stream($this->id, $offset, $limit, $own, $since_id, $max_id); return Fave::stream($this->id, $offset, $limit, $own, $since_id, $max_id);
return Notice::getStreamByIds($ids);
} }
function noticesWithFriends($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $before_id=0) function noticesWithFriends($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $before_id=0)
@ -769,12 +767,11 @@ class User extends Memcached_DataObject
function repeatedByMe($offset=0, $limit=20, $since_id=null, $max_id=null) function repeatedByMe($offset=0, $limit=20, $since_id=null, $max_id=null)
{ {
$ids = Notice::stream(array($this, '_repeatedByMeDirect'), $stream = new NoticeStream(array($this, '_repeatedByMeDirect'),
array(), array(),
'user:repeated_by_me:'.$this->id, 'user:repeated_by_me:'.$this->id);
$offset, $limit, $since_id, $max_id, null);
return Notice::getStreamByIds($ids); return $stream->getNotices($offset, $limit, $since_id, $max_id);
} }
function _repeatedByMeDirect($offset, $limit, $since_id, $max_id) function _repeatedByMeDirect($offset, $limit, $since_id, $max_id)
@ -812,12 +809,11 @@ class User extends Memcached_DataObject
function repeatsOfMe($offset=0, $limit=20, $since_id=null, $max_id=null) function repeatsOfMe($offset=0, $limit=20, $since_id=null, $max_id=null)
{ {
$ids = Notice::stream(array($this, '_repeatsOfMeDirect'), $stream = new NoticeStream(array($this, '_repeatsOfMeDirect'),
array(), array(),
'user:repeats_of_me:'.$this->id, 'user:repeats_of_me:'.$this->id);
$offset, $limit, $since_id, $max_id);
return Notice::getStreamByIds($ids); return $stream->getNotices($offset, $limit, $since_id, $max_id);
} }
function _repeatsOfMeDirect($offset, $limit, $since_id, $max_id) function _repeatsOfMeDirect($offset, $limit, $since_id, $max_id)

View File

@ -87,12 +87,11 @@ class User_group extends Memcached_DataObject
function getNotices($offset, $limit, $since_id=null, $max_id=null) function getNotices($offset, $limit, $since_id=null, $max_id=null)
{ {
$ids = Notice::stream(array($this, '_streamDirect'), $stream = new NoticeStream(array($this, '_streamDirect'),
array(), array(),
'user_group:notice_ids:' . $this->id, 'user_group:notice_ids:' . $this->id);
$offset, $limit, $since_id, $max_id);
return Notice::getStreamByIds($ids); return $stream->getNotices($offset, $limit, $since_id, $max_id);
} }
function _streamDirect($offset, $limit, $since_id, $max_id) function _streamDirect($offset, $limit, $since_id, $max_id)

189
lib/noticestream.php Normal file
View File

@ -0,0 +1,189 @@
<?php
/**
* StatusNet - the distributed open-source microblogging tool
* Copyright (C) 2011, StatusNet, Inc.
*
* A stream of notices
*
* PHP version 5
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @category Stream
* @package StatusNet
* @author Evan Prodromou <evan@status.net>
* @copyright 2011 StatusNet, Inc.
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0
* @link http://status.net/
*/
if (!defined('STATUSNET')) {
// This check helps protect against security problems;
// your code file can't be executed directly from the web.
exit(1);
}
/**
* Class for notice streams
*
* @category Stream
* @package StatusNet
* @author Evan Prodromou <evan@status.net>
* @copyright 2011 StatusNet, Inc.
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0
* @link http://status.net/
*/
class NoticeStream
{
public $generator = null;
public $args = null;
public $cachekey = null;
function __construct($generator, $args, $cachekey)
{
$this->generator = $generator;
$this->args = $args;
$this->cachekey = $cachekey;
}
function getNotices($offset=0, $limit=20, $sinceId=0, $maxId=0)
{
$ids = $this->getNoticeIds($offset, $limit, $sinceId, $maxId);
$notices = $this->getStreamByIds($ids);
return $notices;
}
function getNoticeIds($offset=0, $limit=20, $sinceId=0, $maxId=0)
{
$cache = Cache::instance();
// We cache NOTICE_CACHE_WINDOW elements at the tip of the stream.
// If the cache won't be hit, just generate directly.
if (empty($cache) ||
$sinceId != 0 || $maxId != 0 ||
is_null($limit) ||
($offset + $limit) > NOTICE_CACHE_WINDOW) {
return $this->generate($offset, $limit, $sinceId, $maxId);
}
// Check the cache to see if we have the stream.
$idkey = Cache::key($this->cachekey);
$idstr = $cache->get($idkey);
if ($idstr !== false) {
// Cache hit! Woohoo!
$window = explode(',', $idstr);
$ids = array_slice($window, $offset, $limit);
return $ids;
}
// Check the cache to see if we have a "last-known-good" version.
// The actual cache gets blown away when new notices are added, but
// the "last" value holds a lot of info. We might need to only generate
// a few at the "tip", which can bound our queries and save lots
// of time.
$laststr = $cache->get($idkey.';last');
if ($laststr !== false) {
$window = explode(',', $laststr);
$last_id = $window[0];
$new_ids = $this->generate(0, NOTICE_CACHE_WINDOW, $last_id, 0);
$new_window = array_merge($new_ids, $window);
$new_windowstr = implode(',', $new_window);
$result = $cache->set($idkey, $new_windowstr);
$result = $cache->set($idkey . ';last', $new_windowstr);
$ids = array_slice($new_window, $offset, $limit);
return $ids;
}
// No cache hits :( Generate directly and stick the results
// into the cache. Note we generate the full cache window.
$window = $this->generate(0, NOTICE_CACHE_WINDOW, 0, 0);
$windowstr = implode(',', $window);
$result = $cache->set($idkey, $windowstr);
$result = $cache->set($idkey . ';last', $windowstr);
// Return just the slice that was requested
$ids = array_slice($window, $offset, $limit);
return $ids;
}
function getStreamByIds($ids)
{
$cache = Cache::instance();
if (!empty($cache)) {
$notices = array();
foreach ($ids as $id) {
$n = Notice::staticGet('id', $id);
if (!empty($n)) {
$notices[] = $n;
}
}
return new ArrayWrapper($notices);
} else {
$notice = new Notice();
if (empty($ids)) {
//if no IDs requested, just return the notice object
return $notice;
}
$notice->whereAdd('id in (' . implode(', ', $ids) . ')');
$notice->find();
$temp = array();
while ($notice->fetch()) {
$temp[$notice->id] = clone($notice);
}
$wrapped = array();
foreach ($ids as $id) {
if (array_key_exists($id, $temp)) {
$wrapped[] = $temp[$id];
}
}
return new ArrayWrapper($wrapped);
}
}
function generate($offset, $limit, $sinceId, $maxId)
{
$args = array_merge($this->args, array($offset,
$limit,
$sinceId,
$maxId));
return call_user_func_array($this->generator, $args);
}
}