OStatus refactoring to clean up profile vs feed and fix up subscription issues.

PuSH subscription maintenance broken back out to FeedSub, letting Ostatus_profile deal with the profile level (user or group, with unique id URI)
This commit is contained in:
Brion Vibber
2010-02-18 21:22:21 +00:00
parent 2a97901f70
commit 0dac13d197
8 changed files with 749 additions and 860 deletions

View File

@@ -18,62 +18,24 @@
*/
/**
* @package FeedSubPlugin
* @package OStatusPlugin
* @maintainer Brion Vibber <brion@status.net>
*/
/*
PuSH subscription flow:
$profile->subscribe()
generate random verification token
save to verify_token
sends a sub request to the hub...
main/push/callback
hub sends confirmation back to us via GET
We verify the request, then echo back the challenge.
On our end, we save the time we subscribed and the lease expiration
main/push/callback
hub sends us updates via POST
*/
class FeedDBException extends FeedSubException
{
public $obj;
function __construct($obj)
{
parent::__construct('Database insert failure');
$this->obj = $obj;
}
}
class Ostatus_profile extends Memcached_DataObject
{
public $__table = 'ostatus_profile';
public $id;
public $uri;
public $profile_id;
public $group_id;
public $feeduri;
public $homeuri;
// PuSH subscription data
public $huburi;
public $secret;
public $verify_token;
public $sub_state; // subscribe, active, unsubscribe
public $sub_start;
public $sub_end;
public $salmonuri;
public $created;
public $lastupdate;
public $modified;
public /*static*/ function staticGet($k, $v=null)
{
@@ -91,56 +53,30 @@ class Ostatus_profile extends Memcached_DataObject
function table()
{
return array('id' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL,
return array('uri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
'profile_id' => DB_DATAOBJECT_INT,
'group_id' => DB_DATAOBJECT_INT,
'feeduri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
'homeuri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
'huburi' => DB_DATAOBJECT_STR,
'secret' => DB_DATAOBJECT_STR,
'verify_token' => DB_DATAOBJECT_STR,
'sub_state' => DB_DATAOBJECT_STR,
'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
'salmonuri' => DB_DATAOBJECT_STR,
'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL,
'lastupdate' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL);
'modified' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL);
}
static function schemaDef()
{
return array(new ColumnDef('id', 'integer',
/*size*/ null,
/*nullable*/ false,
/*key*/ 'PRI',
/*default*/ '0',
/*extra*/ null,
/*auto_increment*/ true),
return array(new ColumnDef('uri', 'varchar',
255, false, 'PRI'),
new ColumnDef('profile_id', 'integer',
null, true, 'UNI'),
new ColumnDef('group_id', 'integer',
null, true, 'UNI'),
new ColumnDef('feeduri', 'varchar',
255, false, 'UNI'),
new ColumnDef('homeuri', 'varchar',
255, false),
new ColumnDef('huburi', 'text',
null, true),
new ColumnDef('verify_token', 'varchar',
32, true),
new ColumnDef('secret', 'varchar',
64, true),
new ColumnDef('sub_state', "enum('subscribe','active','unsubscribe')",
null, true),
new ColumnDef('sub_start', 'datetime',
null, true),
new ColumnDef('sub_end', 'datetime',
null, true),
new ColumnDef('salmonuri', 'text',
null, true),
new ColumnDef('created', 'datetime',
null, false),
new ColumnDef('lastupdate', 'datetime',
new ColumnDef('modified', 'datetime',
null, false));
}
@@ -169,12 +105,12 @@ class Ostatus_profile extends Memcached_DataObject
function keyTypes()
{
return array('id' => 'K', 'profile_id' => 'U', 'group_id' => 'U', 'feeduri' => 'U');
return array('uri' => 'K', 'profile_id' => 'U', 'group_id' => 'U', 'feeduri' => 'U');
}
function sequenceKey()
{
return array('id', true, false);
return array(false, false, false);
}
/**
@@ -201,101 +137,6 @@ class Ostatus_profile extends Memcached_DataObject
return null;
}
/**
* @param FeedMunger $munger
* @param boolean $isGroup is this a group record?
* @return Ostatus_profile
*/
public static function ensureProfile($munger)
{
$profile = $munger->ostatusProfile();
$current = self::staticGet('feeduri', $profile->feeduri);
if ($current) {
// @fixme we should probably update info as necessary
return $current;
}
$profile->query('BEGIN');
try {
$local = $munger->profile();
if ($profile->isGroup()) {
$group = new User_group();
$group->nickname = $local->nickname . '@remote'; // @fixme
$group->fullname = $local->fullname;
$group->homepage = $local->homepage;
$group->location = $local->location;
$group->created = $local->created;
$group->insert();
if (empty($result)) {
throw new FeedDBException($group);
}
$profile->group_id = $group->id;
} else {
$result = $local->insert();
if (empty($result)) {
throw new FeedDBException($local);
}
$profile->profile_id = $local->id;
}
$profile->created = common_sql_now();
$profile->lastupdate = common_sql_now();
$result = $profile->insert();
if (empty($result)) {
throw new FeedDBException($profile);
}
$profile->query('COMMIT');
} catch (FeedDBException $e) {
common_log_db_error($e->obj, 'INSERT', __FILE__);
$profile->query('ROLLBACK');
return false;
}
$avatar = $munger->getAvatar();
if ($avatar) {
try {
$profile->updateAvatar($avatar);
} catch (Exception $e) {
common_log(LOG_ERR, "Exception setting OStatus avatar: " .
$e->getMessage());
}
}
return $profile;
}
/**
* Download and update given avatar image
* @param string $url
* @throws Exception in various failure cases
*/
public function updateAvatar($url)
{
// @fixme this should be better encapsulated
// ripped from oauthstore.php (for old OMB client)
$temp_filename = tempnam(sys_get_temp_dir(), 'listener_avatar');
copy($url, $temp_filename);
// @fixme should we be using different ids?
$imagefile = new ImageFile($this->id, $temp_filename);
$filename = Avatar::filename($this->id,
image_type_to_extension($imagefile->type),
null,
common_timestamp());
rename($temp_filename, Avatar::path($filename));
if ($this->isGroup()) {
$group = $this->localGroup();
$group->setOriginal($filename);
} else {
$profile = $this->localProfile();
$profile->setOriginal($filename);
}
}
/**
* Returns an XML string fragment with profile information as an
* Activity Streams noun object with the given element type.
@@ -345,7 +186,7 @@ class Ostatus_profile extends Memcached_DataObject
$xs->element(
'id',
null,
$this->homeuri); // ?
$this->uri); // ?
$xs->element('title', null, $self->getBestName());
$xs->element(
@@ -369,6 +210,61 @@ class Ostatus_profile extends Memcached_DataObject
return (strpos($this->feeduri, '/groups/') !== false);
}
/**
* Subscribe a local user to this remote user.
* PuSH subscription will be started if necessary, and we'll
* send a Salmon notification to the remote server if available
* notifying them of the sub.
*
* @param User $user
* @return boolean success
* @throws FeedException
*/
public function subscribeLocalToRemote(User $user)
{
if ($this->isGroup()) {
throw new ServerException("Can't subscribe to a remote group");
}
if ($this->subscribe()) {
if ($user->subscribeTo($this->localProfile())) {
$this->notify($user->getProfile(), ActivityVerb::FOLLOW, $this);
return true;
}
}
return false;
}
/**
* Mark this remote profile as subscribing to the given local user,
* and send appropriate notifications to the user.
*
* This will generally be in response to a subscription notification
* from a foreign site to our local Salmon response channel.
*
* @param User $user
* @return boolean success
*/
public function subscribeRemoteToLocal(User $user)
{
if ($this->isGroup()) {
throw new ServerException("Remote groups can't subscribe to local users");
}
// @fixme use regular channels for subbing, once they accept remote profiles
$sub = new Subscription();
$sub->subscriber = $this->profile_id;
$sub->subscribed = $user->id;
$sub->created = common_sql_now(); // current time
if ($sub->insert()) {
// @fixme use subs_notify() if refactored to take profiles?
mail_subscribe_notify_profile($user, $this->localProfile());
return true;
}
return false;
}
/**
* Send a subscription request to the hub for this feed.
* The hub will later send us a confirmation POST to /main/push/callback.
@@ -378,19 +274,14 @@ class Ostatus_profile extends Memcached_DataObject
*/
public function subscribe($mode='subscribe')
{
if ($this->sub_state != '') {
throw new ServerException("Attempting to start PuSH subscription to feed in state $this->sub_state");
$feedsub = FeedSub::ensureFeed($this->feeduri);
if ($feedsub->sub_state == 'active' || $feedsub->sub_state == 'subscribe') {
return true;
} else if ($feedsub->sub_state == '' || $feedsub->sub_state == 'inactive') {
return $feedsub->subscribe();
} else if ('unsubscribe') {
throw new FeedSubException("Unsub is pending, can't subscribe...");
}
if (empty($this->huburi)) {
if (common_config('feedsub', 'nohub')) {
// Fake it! We're just testing remote feeds w/o hubs.
return true;
} else {
throw new ServerException("Attempting to start PuSH subscription for feed with no hub");
}
}
return $this->doSubscribe('subscribe');
}
/**
@@ -401,111 +292,14 @@ class Ostatus_profile extends Memcached_DataObject
* @throws ServerException if feed state is not valid
*/
public function unsubscribe() {
if ($this->sub_state != 'active') {
throw new ServerException("Attempting to end PuSH subscription to feed in state $this->sub_state");
$feedsub = FeedSub::staticGet('uri', $this->feeduri);
if ($feedsub->sub_state == 'active') {
return $feedsub->unsubscribe();
} else if ($feedsub->sub_state == '' || $feedsub->sub_state == 'inactive' || $feedsub->sub_state == 'unsubscribe') {
return true;
} else if ($feedsub->sub_state == 'subscribe') {
throw new FeedSubException("Feed is awaiting subscription, can't unsub...");
}
if (empty($this->huburi)) {
if (common_config('feedsub', 'nohub')) {
// Fake it! We're just testing remote feeds w/o hubs.
return true;
} else {
throw new ServerException("Attempting to end PuSH subscription for feed with no hub");
}
}
return $this->doSubscribe('unsubscribe');
}
protected function doSubscribe($mode)
{
$orig = clone($this);
$this->verify_token = common_good_rand(16);
if ($mode == 'subscribe') {
$this->secret = common_good_rand(32);
}
$this->sub_state = $mode;
$this->update($orig);
unset($orig);
try {
$callback = common_local_url('pushcallback', array('feed' => $this->id));
$headers = array('Content-Type: application/x-www-form-urlencoded');
$post = array('hub.mode' => $mode,
'hub.callback' => $callback,
'hub.verify' => 'async',
'hub.verify_token' => $this->verify_token,
'hub.secret' => $this->secret,
//'hub.lease_seconds' => 0,
'hub.topic' => $this->feeduri);
$client = new HTTPClient();
$response = $client->post($this->huburi, $headers, $post);
$status = $response->getStatus();
if ($status == 202) {
common_log(LOG_INFO, __METHOD__ . ': sub req ok, awaiting verification callback');
return true;
} else if ($status == 204) {
common_log(LOG_INFO, __METHOD__ . ': sub req ok and verified');
return true;
} else if ($status >= 200 && $status < 300) {
common_log(LOG_ERR, __METHOD__ . ": sub req returned unexpected HTTP $status: " . $response->getBody());
return false;
} else {
common_log(LOG_ERR, __METHOD__ . ": sub req failed with HTTP $status: " . $response->getBody());
return false;
}
} catch (Exception $e) {
// wtf!
common_log(LOG_ERR, __METHOD__ . ": error \"{$e->getMessage()}\" hitting hub $this->huburi subscribing to $this->feeduri");
$orig = clone($this);
$this->verify_token = null;
$this->sub_state = null;
$this->update($orig);
unset($orig);
return false;
}
}
/**
* Save PuSH subscription confirmation.
* Sets approximate lease start and end times and finalizes state.
*
* @param int $lease_seconds provided hub.lease_seconds parameter, if given
*/
public function confirmSubscribe($lease_seconds=0)
{
$original = clone($this);
$this->sub_state = 'active';
$this->sub_start = common_sql_date(time());
if ($lease_seconds > 0) {
$this->sub_end = common_sql_date(time() + $lease_seconds);
} else {
$this->sub_end = null;
}
$this->lastupdate = common_sql_now();
return $this->update($original);
}
/**
* Save PuSH unsubscription confirmation.
* Wipes active PuSH sub info and resets state.
*/
public function confirmUnsubscribe()
{
$original = clone($this);
// @fixme these should all be null, but DB_DataObject doesn't save null values...?????
$this->verify_token = '';
$this->secret = '';
$this->sub_state = '';
$this->sub_start = '';
$this->sub_end = '';
$this->lastupdate = common_sql_now();
return $this->update($original);
}
/**
@@ -543,10 +337,7 @@ class Ostatus_profile extends Memcached_DataObject
$entry->elementEnd('entry');
$feed = $this->atomFeed($actor);
#$feed->initFeed();
$feed->addEntry($entry);
#$feed->renderEntries();
#$feed->endFeed();
$xml = $feed->getString();
common_log(LOG_INFO, "Posting to Salmon endpoint $salmon: $xml");
@@ -600,36 +391,10 @@ class Ostatus_profile extends Memcached_DataObject
* Currently assumes that all items in the feed are new,
* coming from a PuSH hub.
*
* @param string $post source of Atom or RSS feed
* @param string $hmac X-Hub-Signature header, if present
* @param DOMDocument $feed
*/
public function postUpdates($post, $hmac)
public function processFeed($feed)
{
common_log(LOG_INFO, __METHOD__ . ": packet for \"$this->feeduri\"! $hmac $post");
if ($this->sub_state != 'active') {
common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH for inactive feed $this->feeduri (in state '$this->sub_state')");
return;
}
if ($post === '') {
common_log(LOG_ERR, __METHOD__ . ": ignoring empty post");
return;
}
if (!$this->validatePushSig($post, $hmac)) {
// Per spec we silently drop input with a bad sig,
// while reporting receipt to the server.
return;
}
$feed = new DOMDocument();
if (!$feed->loadXML($post)) {
// @fixme might help to include the err message
common_log(LOG_ERR, __METHOD__ . ": ignoring invalid XML");
return;
}
$entries = $feed->getElementsByTagNameNS(Activity::ATOM, 'entry');
if ($entries->length == 0) {
common_log(LOG_ERR, __METHOD__ . ": no entries in feed update, ignoring");
@@ -642,40 +407,6 @@ class Ostatus_profile extends Memcached_DataObject
}
}
/**
* Validate the given Atom chunk and HMAC signature against our
* shared secret that was set up at subscription time.
*
* If we don't have a shared secret, there should be no signature.
* If we we do, our the calculated HMAC should match theirs.
*
* @param string $post raw XML source as POSTed to us
* @param string $hmac X-Hub-Signature HTTP header value, or empty
* @return boolean true for a match
*/
protected function validatePushSig($post, $hmac)
{
if ($this->secret) {
if (preg_match('/^sha1=([0-9a-fA-F]{40})$/', $hmac, $matches)) {
$their_hmac = strtolower($matches[1]);
$our_hmac = hash_hmac('sha1', $post, $this->secret);
if ($their_hmac === $our_hmac) {
return true;
}
common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bad SHA-1 HMAC: got $their_hmac, expected $our_hmac");
} else {
common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bogus HMAC '$hmac'");
}
} else {
if (empty($hmac)) {
return true;
} else {
common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with unexpected HMAC '$hmac'");
}
}
return false;
}
/**
* Process a posted entry from this feed source.
*
@@ -704,14 +435,14 @@ class Ostatus_profile extends Memcached_DataObject
{
if ($this->isGroup()) {
// @fixme validate these profiles in some way!
$oprofile = $this->ensureActorProfile($activity);
$oprofile = self::ensureActorProfile($activity);
} else {
$actorUri = $this->getActorProfileURI($activity);
if ($actorUri == $this->homeuri) {
$actorUri = self::getActorProfileURI($activity);
if ($actorUri == $this->uri) {
// @fixme check if profile info has changed and update it
} else {
// @fixme drop or reject the messages once we've got the canonical profile URI recorded sanely
common_log(LOG_INFO, "OStatus: Warning: non-group post with unexpected author: $actorUri expected $this->homeuri");
common_log(LOG_INFO, "OStatus: Warning: non-group post with unexpected author: $actorUri expected $this->uri");
//return;
}
$oprofile = $this;
@@ -787,6 +518,65 @@ class Ostatus_profile extends Memcached_DataObject
return false;
}
/**
* @param string $profile_url
* @return Ostatus_profile
* @throws FeedSubException
*/
public static function ensureProfile($profile_uri)
{
// Get the canonical feed URI and check it
$discover = new FeedDiscovery();
$feeduri = $discover->discoverFromURL($profile_uri);
$feedsub = FeedSub::ensureFeed($feeduri, $discover->feed);
$huburi = $discover->getAtomLink('hub');
$salmonuri = $discover->getAtomLink('salmon');
if (!$huburi) {
// We can only deal with folks with a PuSH hub
throw new FeedSubNoHubException();
}
// Ok this is going to be a terrible hack!
// Won't be suitable for groups, empty feeds, or getting
// info that's only available on the profile page.
$entries = $discover->feed->getElementsByTagNameNS(Activity::ATOM, 'entry');
if (!$entries || $entries->length == 0) {
throw new FeedSubException('empty feed');
}
$first = new Activity($entries->item(0), $discover->feed);
return self::ensureActorProfile($first, $feeduri);
}
/**
* Download and update given avatar image
* @param string $url
* @throws Exception in various failure cases
*/
protected function updateAvatar($url)
{
// @fixme this should be better encapsulated
// ripped from oauthstore.php (for old OMB client)
$temp_filename = tempnam(sys_get_temp_dir(), 'listener_avatar');
copy($url, $temp_filename);
// @fixme should we be using different ids?
$imagefile = new ImageFile($this->id, $temp_filename);
$filename = Avatar::filename($this->id,
image_type_to_extension($imagefile->type),
null,
common_timestamp());
rename($temp_filename, Avatar::path($filename));
if ($this->isGroup()) {
$group = $this->localGroup();
$group->setOriginal($filename);
} else {
$profile = $this->localProfile();
$profile->setOriginal($filename);
}
}
/**
* Get an appropriate avatar image source URL, if available.
*
@@ -794,7 +584,7 @@ class Ostatus_profile extends Memcached_DataObject
* @param DOMElement $feed
* @return string
*/
function getAvatar($actor, $feed)
protected static function getAvatar($actor, $feed)
{
$url = '';
$icon = '';
@@ -833,13 +623,18 @@ class Ostatus_profile extends Memcached_DataObject
}
/**
* @fixme move off of ostatus_profile or static?
* Fetch, or build if necessary, an Ostatus_profile for the actor
* in a given Activity Streams activity.
*
* @param Activity $activity
* @param string $feeduri if we already know the canonical feed URI!
* @return Ostatus_profile
*/
function ensureActorProfile($activity)
public static function ensureActorProfile($activity, $feeduri=null)
{
$profile = $this->getActorProfile($activity);
$profile = self::getActorProfile($activity);
if (!$profile) {
$profile = $this->createActorProfile($activity);
$profile = self::createActorProfile($activity, $feeduri);
}
return $profile;
}
@@ -848,10 +643,10 @@ class Ostatus_profile extends Memcached_DataObject
* @param Activity $activity
* @return mixed matching Ostatus_profile or false if none known
*/
function getActorProfile($activity)
protected static function getActorProfile($activity)
{
$homeuri = $this->getActorProfileURI($activity);
return Ostatus_profile::staticGet('homeuri', $homeuri);
$homeuri = self::getActorProfileURI($activity);
return self::staticGet('uri', $homeuri);
}
/**
@@ -859,7 +654,7 @@ class Ostatus_profile extends Memcached_DataObject
* @return string
* @throws ServerException
*/
function getActorProfileURI($activity)
protected static function getActorProfileURI($activity)
{
$opts = array('allowed_schemes' => array('http', 'https'));
$actor = $activity->actor;
@@ -873,14 +668,19 @@ class Ostatus_profile extends Memcached_DataObject
}
/**
*
* @fixme validate stuff somewhere
*/
function createActorProfile($activity)
protected static function createActorProfile($activity, $feeduri=null)
{
$actor = $activity->actor();
$homeuri = $this->getActivityProfileURI($activity);
$nickname = $this->getAuthorNick($activity);
$avatar = $this->getAvatar($actor, $feed);
$actor = $activity->actor;
$homeuri = self::getActorProfileURI($activity);
$nickname = self::getAuthorNick($activity);
$avatar = self::getAvatar($actor, $feed);
if (!$homeuri) {
common_log(LOG_DEBUG, __METHOD__ . " empty actor profile URI: " . var_export($activity, true));
throw new ServerException("No profile URI");
}
$profile = new Profile();
$profile->nickname = $nickname;
@@ -894,9 +694,7 @@ class Ostatus_profile extends Memcached_DataObject
// @todo lat/lon/location?
$ok = $profile->insert();
if ($ok) {
$this->updateAvatar($profile, $avatar);
} else {
if (!$ok) {
throw new ServerException("Can't save local profile");
}
@@ -904,11 +702,15 @@ class Ostatus_profile extends Memcached_DataObject
// or need to split out some of the feed stuff
// so we can leave it empty until later.
$oprofile = new Ostatus_profile();
$oprofile->homeuri = $homeuri;
$oprofile->uri = $homeuri;
if ($feeduri) {
$oprofile->feeduri = $feeduri;
}
$oprofile->profile_id = $profile->id;
$ok = $oprofile->insert();
if ($ok) {
$oprofile->updateAvatar($avatar);
return $oprofile;
} else {
throw new ServerException("Can't save OStatus profile");
@@ -920,13 +722,13 @@ class Ostatus_profile extends Memcached_DataObject
* @param Activity $activity
* @return string
*/
function getAuthorNick($activity)
protected static function getAuthorNick($activity)
{
// @fixme not technically part of the actor?
foreach (array($activity->entry, $activity->feed) as $source) {
$author = ActivityUtil::child($source, 'author', Activity::ATOM);
$author = ActivityUtils::child($source, 'author', Activity::ATOM);
if ($author) {
$name = ActivityUtil::child($author, 'name', Activity::ATOM);
$name = ActivityUtils::child($author, 'name', Activity::ATOM);
if ($name) {
return trim($name->textContent);
}