First steps on converting FeedSub into the pub/sub basis for OStatus communications:

* renamed FeedSub plugin to OStatus
* now setting avatar on subscriptions
* general fixes for subscription
* integrated PuSH hub to handle only user timelines on canonical ID url; sends updates directly
* set $config['feedsub']['nohub'] = true to test w/ foreign feeds that don't have hubs (won't actually receive updates though)
* a few bits of code documentation
* HMAC support for verified distributions (safest if sub setup is on HTTPS)

And a couple core changes:
* minimizing HTML output for exceptions in API requests to aid in debugging
* fix for rel=self link in apitimelineuser when id given

This does not not yet include any of the individual subscription management (Salmon notifications for sub/unsub, etc) nor a nice UI for user subscriptions.
Needs some further cleanup to treat posts as status updates instead of link references.
This commit is contained in:
Brion Vibber 2010-02-08 11:06:03 -08:00
parent 5fdcd88176
commit dc09453a77
63 changed files with 866 additions and 44 deletions

View File

@ -145,10 +145,11 @@ class ApiTimelineUserAction extends ApiBareAuthAction
); );
break; break;
case 'atom': case 'atom':
if (isset($apidata['api_arg'])) { $id = $this->arg('id');
if ($id) {
$selfuri = common_root_url() . $selfuri = common_root_url() .
'api/statuses/user_timeline/' . 'api/statuses/user_timeline/' .
$apidata['api_arg'] . '.atom'; rawurlencode($id) . '.atom';
} else { } else {
$selfuri = common_root_url() . $selfuri = common_root_url() .
'api/statuses/user_timeline.atom'; 'api/statuses/user_timeline.atom';

View File

@ -77,6 +77,7 @@ class ApiAction extends Action
function prepare($args) function prepare($args)
{ {
StatusNet::setApi(true); // reduce exception reports to aid in debugging
parent::prepare($args); parent::prepare($args);
$this->format = $this->arg('format'); $this->format = $this->arg('format');

View File

@ -56,6 +56,7 @@ class ErrorAction extends Action
$this->code = $code; $this->code = $code;
$this->message = $message; $this->message = $message;
$this->minimal = StatusNet::isApi();
// XXX: hack alert: usually we aren't going to // XXX: hack alert: usually we aren't going to
// call this page directly, but because it's // call this page directly, but because it's
@ -102,7 +103,14 @@ class ErrorAction extends Action
function showPage() function showPage()
{ {
parent::showPage(); if ($this->minimal) {
// Even more minimal -- we're in a machine API
// and don't want to flood the output.
$this->extraHeaders();
$this->showContent();
} else {
parent::showPage();
}
// We don't want to have any more output after this // We don't want to have any more output after this
exit(); exit();

View File

@ -81,12 +81,13 @@ class HTTPResponse extends HTTP_Request2_Response
} }
/** /**
* Check if the response is OK, generally a 200 status code. * Check if the response is OK, generally a 200 or other 2xx status code.
* @return bool * @return bool
*/ */
function isOk() function isOk()
{ {
return ($this->getStatus() == 200); $status = $this->getStatus();
return ($status >= 200 && $status < 300);
} }
} }

View File

@ -213,6 +213,7 @@ class MysqlSchema extends Schema
$sql .= "); "; $sql .= "); ";
common_log(LOG_INFO, $sql);
$res = $this->conn->query($sql); $res = $this->conn->query($sql);
if (PEAR::isError($res)) { if (PEAR::isError($res)) {

View File

@ -30,6 +30,7 @@ global $config, $_server, $_path;
class StatusNet class StatusNet
{ {
protected static $have_config; protected static $have_config;
protected static $is_api;
/** /**
* Configure and instantiate a plugin into the current configuration. * Configure and instantiate a plugin into the current configuration.
@ -63,7 +64,7 @@ class StatusNet
} }
} }
if (!class_exists($pluginclass)) { if (!class_exists($pluginclass)) {
throw new ServerException(500, "Plugin $name not found."); throw new ServerException("Plugin $name not found.", 500);
} }
} }
@ -147,6 +148,16 @@ class StatusNet
return self::$have_config; return self::$have_config;
} }
public function isApi()
{
return self::$is_api;
}
public function setApi($mode)
{
self::$is_api = $mode;
}
/** /**
* Build default configuration array * Build default configuration array
* @return array * @return array

View File

@ -1,14 +0,0 @@
CREATE TABLE `feedinfo` (
`id` int(11) NOT NULL auto_increment,
`profile_id` int(11) NOT NULL,
`feeduri` varchar(255) NOT NULL,
`homeuri` varchar(255) NOT NULL,
`huburi` varchar(255) NOT NULL,
`verify_token` varchar(32) default NULL,
`sub_start` datetime default NULL,
`sub_end` datetime default NULL,
`created` datetime NOT NULL,
`lastupdate` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `feedinfo_feeduri_idx` (`feeduri`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

View File

@ -43,7 +43,7 @@ class FeedSubException extends Exception
{ {
} }
class FeedSubPlugin extends Plugin class OStatusPlugin extends Plugin
{ {
/** /**
* Hook for RouterInitialized event. * Hook for RouterInitialized event.
@ -53,6 +53,8 @@ class FeedSubPlugin extends Plugin
*/ */
function onRouterInitialized($m) function onRouterInitialized($m)
{ {
$m->connect('push/hub', array('action' => 'hub'));
$m->connect('feedsub/callback/:feed', $m->connect('feedsub/callback/:feed',
array('action' => 'feedsubcallback'), array('action' => 'feedsubcallback'),
array('feed' => '[0-9]+')); array('feed' => '[0-9]+'));
@ -61,6 +63,46 @@ class FeedSubPlugin extends Plugin
return true; return true;
} }
/**
* Set up queue handlers for outgoing hub pushes
* @param QueueManager $qm
* @return boolean hook return
*/
function onEndInitializeQueueManager(QueueManager $qm)
{
$qm->connect('hubverify', 'HubVerifyQueueHandler');
$qm->connect('hubdistrib', 'HubDistribQueueHandler');
$qm->connect('hubout', 'HubOutQueueHandler');
return true;
}
/**
* Put saved notices into the queue for pubsub distribution.
*/
function onStartEnqueueNotice($notice, &$transports)
{
$transports[] = 'hubdistrib';
return true;
}
/**
* Set up a PuSH hub link to our internal link for canonical timeline
* Atom feeds for users.
*/
function onStartApiAtom(Action $action)
{
if ($action instanceof ApiTimelineUserAction) {
$id = $action->arg('id');
if (strval(intval($id)) === strval($id)) {
// Canonical form of id in URL?
// Updates will be handled for our internal PuSH hub.
$action->element('link', array('rel' => 'hub',
'href' => common_local_url('hub')));
}
}
return true;
}
/** /**
* Add the feed settings page to the Connect Settings menu * Add the feed settings page to the Connect Settings menu
* *
@ -92,7 +134,8 @@ class FeedSubPlugin extends Plugin
{ {
$base = dirname(__FILE__); $base = dirname(__FILE__);
$lower = strtolower($cls); $lower = strtolower($cls);
$files = array("$base/$lower.php"); $files = array("$base/classes/$cls.php",
"$base/lib/$lower.php");
if (substr($lower, -6) == 'action') { if (substr($lower, -6) == 'action') {
$files[] = "$base/actions/" . substr($lower, 0, -6) . ".php"; $files[] = "$base/actions/" . substr($lower, 0, -6) . ".php";
} }
@ -110,6 +153,7 @@ class FeedSubPlugin extends Plugin
// alter table feedinfo change column id id int(11) not null auto_increment; // alter table feedinfo change column id id int(11) not null auto_increment;
$schema = Schema::get(); $schema = Schema::get();
$schema->ensureTable('feedinfo', Feedinfo::schemaDef()); $schema->ensureTable('feedinfo', Feedinfo::schemaDef());
$schema->ensureTable('hubsub', HubSub::schemaDef());
return true; return true;
} }
} }

View File

@ -52,9 +52,14 @@ class FeedSubCallbackAction extends Action
if (!$feedinfo) { if (!$feedinfo) {
throw new ServerException('Unknown feed id ' . $feedid, 400); throw new ServerException('Unknown feed id ' . $feedid, 400);
} }
$hmac = '';
if (isset($_SERVER['HTTP_X_HUB_SIGNATURE'])) {
$hmac = $_SERVER['HTTP_X_HUB_SIGNATURE'];
}
$post = file_get_contents('php://input'); $post = file_get_contents('php://input');
$feedinfo->postUpdates($post); $feedinfo->postUpdates($post, $hmac);
} }
/** /**

View File

@ -184,7 +184,7 @@ class FeedSubSettingsAction extends ConnectSettingsAction
$this->munger = $discover->feedMunger(); $this->munger = $discover->feedMunger();
$this->feedinfo = $this->munger->feedInfo(); $this->feedinfo = $this->munger->feedInfo();
if ($this->feedinfo->huburi == '') { if ($this->feedinfo->huburi == '' && !common_config('feedsub', 'nohub')) {
$this->showForm(_m('Feed is not PuSH-enabled; cannot subscribe.')); $this->showForm(_m('Feed is not PuSH-enabled; cannot subscribe.'));
return false; return false;
} }
@ -213,7 +213,10 @@ class FeedSubSettingsAction extends ConnectSettingsAction
// And subscribe the current user to the local profile // And subscribe the current user to the local profile
$user = common_current_user(); $user = common_current_user();
$profile = $this->feedinfo->getProfile(); $profile = $this->feedinfo->getProfile();
if (!$profile) {
throw new ServerException("Feed profile was not saved properly.");
}
if ($user->isSubscribed($profile)) { if ($user->isSubscribed($profile)) {
$this->showForm(_m('Already subscribed!')); $this->showForm(_m('Already subscribed!'));
} elseif ($user->subscribeTo($profile)) { } elseif ($user->subscribeTo($profile)) {

View File

@ -0,0 +1,176 @@
<?php
/*
* StatusNet - the distributed open-source microblogging tool
* Copyright (C) 2010, StatusNet, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* Integrated PuSH hub; lets us only ping them what need it.
* @package Hub
* @maintainer Brion Vibber <brion@status.net>
*/
/**
Things to consider...
* should we purge incomplete subscriptions that never get a verification pingback?
* when can we send subscription renewal checks?
- at next send time probably ok
* when can we handle trimming of subscriptions?
- at next send time probably ok
* should we keep a fail count?
*/
class HubAction extends Action
{
function arg($arg, $def=null)
{
// PHP converts '.'s in incoming var names to '_'s.
// It also merges multiple values, which'll break hub.verify and hub.topic for publishing
// @fixme handle multiple args
$arg = str_replace('.', '_', $arg);
return parent::arg($arg, $def);
}
function prepare($args)
{
StatusNet::setApi(true); // reduce exception reports to aid in debugging
return parent::prepare($args);
}
function handle()
{
$mode = $this->trimmed('hub.mode');
switch ($mode) {
case "subscribe":
$this->subscribe();
break;
case "unsubscribe":
$this->unsubscribe();
break;
case "publish":
throw new ServerException("Publishing outside feeds not supported.", 400);
default:
throw new ServerException("Unrecognized mode '$mode'.", 400);
}
}
/**
* Process a PuSH feed subscription request.
*
* HTTP return codes:
* 202 Accepted - request saved and awaiting verification
* 204 No Content - already subscribed
* 403 Forbidden - rejecting this (not specifically spec'd)
*/
function subscribe()
{
$feed = $this->argUrl('hub.topic');
$callback = $this->argUrl('hub.callback');
common_log(LOG_DEBUG, __METHOD__ . ": checking sub'd to $feed $callback");
if ($this->getSub($feed, $callback)) {
// Already subscribed; return 204 per spec.
header('HTTP/1.1 204 No Content');
common_log(LOG_DEBUG, __METHOD__ . ': already subscribed');
return;
}
common_log(LOG_DEBUG, __METHOD__ . ': setting up');
$sub = new HubSub();
$sub->topic = $feed;
$sub->callback = $callback;
$sub->secret = $this->arg('hub.secret', null);
$sub->setLease(intval($this->arg('hub.lease_seconds')));
// @fixme check for feeds we don't manage
// @fixme check the verification mode, might want a return immediately?
common_log(LOG_DEBUG, __METHOD__ . ': inserting');
$ok = $sub->insert();
if (!$ok) {
throw new ServerException("Failed to save subscription record", 500);
}
// @fixme check errors ;)
$data = array('sub' => $sub, 'mode' => 'subscribe');
$qm = QueueManager::get();
$qm->enqueue($data, 'hubverify');
header('HTTP/1.1 202 Accepted');
common_log(LOG_DEBUG, __METHOD__ . ': done');
}
/**
* Process a PuSH feed unsubscription request.
*
* HTTP return codes:
* 202 Accepted - request saved and awaiting verification
* 204 No Content - already subscribed
* 400 Bad Request - invalid params or rejected feed
*/
function unsubscribe()
{
$feed = $this->argUrl('hub.topic');
$callback = $this->argUrl('hub.callback');
$sub = $this->getSub($feed, $callback);
if ($sub) {
if ($sub->verify('unsubscribe')) {
$sub->delete();
common_log(LOG_INFO, "PuSH unsubscribed $feed for $callback");
} else {
throw new ServerException("Failed PuSH unsubscription: verification failed! $feed for $callback");
}
} else {
throw new ServerException("Failed PuSH unsubscription: not subscribed! $feed for $callback");
}
}
/**
* Grab and validate a URL from POST parameters.
* @throws ServerException for malformed or non-http/https URLs
*/
protected function argUrl($arg)
{
$url = $this->arg($arg);
$params = array('domain_check' => false, // otherwise breaks my local tests :P
'allowed_schemes' => array('http', 'https'));
if (Validate::uri($url, $params)) {
return $url;
} else {
throw new ServerException("Invalid URL passed for $arg: '$url'", 400);
}
}
/**
* Get HubSub subscription record for a given feed & subscriber.
*
* @param string $feed
* @param string $callback
* @return mixed HubSub or false
*/
protected function getSub($feed, $callback)
{
return HubSub::staticGet($feed, $callback);
}
}

View File

@ -1,8 +1,29 @@
<?php <?php
/*
* StatusNet - the distributed open-source microblogging tool
* Copyright (C) 2009-2010, StatusNet, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* @package FeedSubPlugin
* @maintainer Brion Vibber <brion@status.net>
*/
/* /*
PuSH subscription flow:
Subscription flow:
$feedinfo->subscribe() $feedinfo->subscribe()
generate random verification token generate random verification token
@ -16,7 +37,6 @@ Subscription flow:
feedsub/callback feedsub/callback
hub sends us updates via POST hub sends us updates via POST
?
*/ */
@ -43,6 +63,7 @@ class Feedinfo extends Memcached_DataObject
public $huburi; public $huburi;
// PuSH subscription data // PuSH subscription data
public $secret;
public $verify_token; public $verify_token;
public $sub_start; public $sub_start;
public $sub_end; public $sub_end;
@ -72,6 +93,7 @@ class Feedinfo extends Memcached_DataObject
'feeduri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'feeduri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
'homeuri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'homeuri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
'huburi' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'huburi' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
'secret' => DB_DATAOBJECT_STR,
'verify_token' => DB_DATAOBJECT_STR, 'verify_token' => DB_DATAOBJECT_STR,
'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, 'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, 'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
@ -98,6 +120,8 @@ class Feedinfo extends Memcached_DataObject
255, false), 255, false),
new ColumnDef('verify_token', 'varchar', new ColumnDef('verify_token', 'varchar',
32, true), 32, true),
new ColumnDef('secret', 'varchar',
64, true),
new ColumnDef('sub_start', 'datetime', new ColumnDef('sub_start', 'datetime',
null, true), null, true),
new ColumnDef('sub_end', 'datetime', new ColumnDef('sub_end', 'datetime',
@ -119,7 +143,7 @@ class Feedinfo extends Memcached_DataObject
function keys() function keys()
{ {
return array('id' => 'P'); //? return array_keys($this->keyTypes());
} }
/** /**
@ -133,7 +157,12 @@ class Feedinfo extends Memcached_DataObject
function keyTypes() function keyTypes()
{ {
return $this->keys(); return array('id' => 'K'); // @fixme we'll need a profile_id key at least
}
function sequenceKey()
{
return array('id', true, false);
} }
/** /**
@ -161,6 +190,10 @@ class Feedinfo extends Memcached_DataObject
$feedinfo->query('BEGIN'); $feedinfo->query('BEGIN');
// Awful hack! Awful hack!
$feedinfo->verify = common_good_rand(16);
$feedinfo->secret = common_good_rand(32);
try { try {
$profile = $munger->profile(); $profile = $munger->profile();
$result = $profile->insert(); $result = $profile->insert();
@ -168,6 +201,21 @@ class Feedinfo extends Memcached_DataObject
throw new FeedDBException($profile); throw new FeedDBException($profile);
} }
$avatar = $munger->getAvatar();
if ($avatar) {
// @fixme this should be better encapsulated
// ripped from oauthstore.php (for old OMB client)
$temp_filename = tempnam(sys_get_temp_dir(), 'listener_avatar');
copy($avatar, $temp_filename);
$imagefile = new ImageFile($profile->id, $temp_filename);
$filename = Avatar::filename($profile->id,
image_type_to_extension($imagefile->type),
null,
common_timestamp());
rename($temp_filename, Avatar::path($filename));
$profile->setOriginal($filename);
}
$feedinfo->profile_id = $profile->id; $feedinfo->profile_id = $profile->id;
$result = $feedinfo->insert(); $result = $feedinfo->insert();
if (empty($result)) { if (empty($result)) {
@ -191,27 +239,38 @@ class Feedinfo extends Memcached_DataObject
*/ */
public function subscribe() public function subscribe()
{ {
if (common_config('feedsub', 'nohub')) {
// Fake it! We're just testing remote feeds w/o hubs.
return true;
}
// @fixme use the verification token // @fixme use the verification token
#$token = md5(mt_rand() . ':' . $this->feeduri); #$token = md5(mt_rand() . ':' . $this->feeduri);
#$this->verify_token = $token; #$this->verify_token = $token;
#$this->update(); // @fixme #$this->update(); // @fixme
try { try {
$callback = common_local_url('feedsubcallback', array('feed' => $this->id)); $callback = common_local_url('feedsubcallback', array('feed' => $this->id));
$headers = array('Content-Type: application/x-www-form-urlencoded'); $headers = array('Content-Type: application/x-www-form-urlencoded');
$post = array('hub.mode' => 'subscribe', $post = array('hub.mode' => 'subscribe',
'hub.callback' => $callback, 'hub.callback' => $callback,
'hub.verify' => 'async', 'hub.verify' => 'async',
//'hub.verify_token' => $token, 'hub.verify_token' => $this->verify_token,
'hub.secret' => $this->secret,
//'hub.lease_seconds' => 0, //'hub.lease_seconds' => 0,
'hub.topic' => $this->feeduri); 'hub.topic' => $this->feeduri);
$client = new HTTPClient(); $client = new HTTPClient();
$response = $client->post($this->huburi, $headers, $post); $response = $client->post($this->huburi, $headers, $post);
if ($response->getStatus() >= 200 && $response->getStatus() < 300) { $status = $response->getStatus();
common_log(LOG_INFO, __METHOD__ . ': sub req ok'); if ($status == 202) {
common_log(LOG_INFO, __METHOD__ . ': sub req ok, awaiting verification callback');
return true; return true;
} else if ($status == 204) {
common_log(LOG_INFO, __METHOD__ . ': sub req ok and verified');
return true;
} else if ($status >= 200 && $status < 300) {
common_log(LOG_ERR, __METHOD__ . ": sub req returned unexpected HTTP $status: " . $response->getBody());
return false;
} else { } else {
common_log(LOG_INFO, __METHOD__ . ': sub req failed'); common_log(LOG_ERR, __METHOD__ . ": sub req failed with HTTP $status: " . $response->getBody());
return false; return false;
} }
} catch (Exception $e) { } catch (Exception $e) {
@ -227,10 +286,29 @@ class Feedinfo extends Memcached_DataObject
* coming from a PuSH hub. * coming from a PuSH hub.
* *
* @param string $xml source of Atom or RSS feed * @param string $xml source of Atom or RSS feed
* @param string $hmac X-Hub-Signature header, if present
*/ */
public function postUpdates($xml) public function postUpdates($xml, $hmac)
{ {
common_log(LOG_INFO, __METHOD__ . ": packet for \"$this->feeduri\"! $xml"); common_log(LOG_INFO, __METHOD__ . ": packet for \"$this->feeduri\"! $hmac $xml");
if ($this->secret) {
if (preg_match('/^sha1=([0-9a-fA-F]{40})$/', $hmac, $matches)) {
$their_hmac = strtolower($matches[1]);
$our_hmac = sha1($xml . $this->secret);
if ($their_hmac !== $our_hmac) {
common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bad SHA-1 HMAC: got $their_hmac, expected $our_hmac");
return;
}
} else {
common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bogus HMAC '$hmac'");
return;
}
} else if ($hmac) {
common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with unexpected HMAC '$hmac'");
return;
}
require_once "XML/Feed/Parser.php"; require_once "XML/Feed/Parser.php";
$feed = new XML_Feed_Parser($xml, false, false, true); $feed = new XML_Feed_Parser($xml, false, false, true);
$munger = new FeedMunger($feed); $munger = new FeedMunger($feed);
@ -246,8 +324,7 @@ class Feedinfo extends Memcached_DataObject
// @fixme this could explode horribly for multiple feeds on a blog. sigh // @fixme this could explode horribly for multiple feeds on a blog. sigh
$dupe = new Notice(); $dupe = new Notice();
$dupe->uri = $notice->uri; $dupe->uri = $notice->uri;
$dupe->find(); if ($dupe->find(true)) {
if ($dupe->fetch()) {
common_log(LOG_WARNING, __METHOD__ . ": tried to save dupe notice for entry {$notice->uri} of feed {$this->feeduri}"); common_log(LOG_WARNING, __METHOD__ . ": tried to save dupe notice for entry {$notice->uri} of feed {$this->feeduri}");
continue; continue;
} }

View File

@ -0,0 +1,272 @@
<?php
/*
* StatusNet - the distributed open-source microblogging tool
* Copyright (C) 2010, StatusNet, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* PuSH feed subscription record
* @package Hub
* @author Brion Vibber <brion@status.net>
*/
class HubSub extends Memcached_DataObject
{
public $__table = 'hubsub';
public $hashkey; // sha1(topic . '|' . $callback); (topic, callback) key is too long for myisam in utf8
public $topic;
public $callback;
public $secret;
public $verify_token;
public $challenge;
public $lease;
public $sub_start;
public $sub_end;
public $created;
public /*static*/ function staticGet($topic, $callback)
{
return parent::staticGet(__CLASS__, 'hashkey', self::hashkey($topic, $callback));
}
protected static function hashkey($topic, $callback)
{
return sha1($topic . '|' . $callback);
}
/**
* return table definition for DB_DataObject
*
* DB_DataObject needs to know something about the table to manipulate
* instances. This method provides all the DB_DataObject needs to know.
*
* @return array array of column definitions
*/
function table()
{
return array('hashkey' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
'topic' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
'callback' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
'secret' => DB_DATAOBJECT_STR,
'verify_token' => DB_DATAOBJECT_STR,
'challenge' => DB_DATAOBJECT_STR,
'lease' => DB_DATAOBJECT_INT,
'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL);
}
static function schemaDef()
{
return array(new ColumnDef('hashkey', 'char',
/*size*/40,
/*nullable*/false,
/*key*/'PRI'),
new ColumnDef('topic', 'varchar',
/*size*/255,
/*nullable*/false,
/*key*/'KEY'),
new ColumnDef('callback', 'varchar',
255, false),
new ColumnDef('secret', 'text',
null, true),
new ColumnDef('verify_token', 'text',
null, true),
new ColumnDef('challenge', 'varchar',
32, true),
new ColumnDef('lease', 'int',
null, true),
new ColumnDef('sub_start', 'datetime',
null, true),
new ColumnDef('sub_end', 'datetime',
null, true),
new ColumnDef('created', 'datetime',
null, false));
}
function keys()
{
return array_keys($this->keyTypes());
}
function sequenceKeys()
{
return array(false, false, false);
}
/**
* return key definitions for DB_DataObject
*
* DB_DataObject needs to know about keys that the table has; this function
* defines them.
*
* @return array key definitions
*/
function keyTypes()
{
return array('hashkey' => 'K');
}
/**
* Validates a requested lease length, sets length plus
* subscription start & end dates.
*
* Does not save to database -- use before insert() or update().
*
* @param int $length in seconds
*/
function setLease($length)
{
assert(is_int($length));
$min = 86400;
$max = 86400 * 30;
if ($length == 0) {
// We want to garbage collect dead subscriptions!
$length = $max;
} elseif( $length < $min) {
$length = $min;
} else if ($length > $max) {
$length = $max;
}
$this->lease = $length;
$this->start_sub = common_sql_now();
$this->end_sub = common_sql_date(time() + $length);
}
/**
* Send a verification ping to subscriber
* @param string $mode 'subscribe' or 'unsubscribe'
*/
function verify($mode)
{
assert($mode == 'subscribe' || $mode == 'unsubscribe');
// Is this needed? data object fun...
$clone = clone($this);
$clone->challenge = common_good_rand(16);
$clone->update($this);
$this->challenge = $clone->challenge;
unset($clone);
$params = array('hub.mode' => $mode,
'hub.topic' => $this->topic,
'hub.challenge' => $this->challenge);
if ($mode == 'subscribe') {
$params['hub.lease_seconds'] = $this->lease;
}
if ($this->verify_token) {
$params['hub.verify_token'] = $this->verify_token;
}
$url = $this->callback . '?' . http_build_query($params, '', '&'); // @fixme ugly urls
try {
$request = new HTTPClient();
$response = $request->get($url);
$status = $response->getStatus();
if ($status >= 200 && $status < 300) {
$fail = false;
} else {
// @fixme how can we schedule a second attempt?
// Or should we?
$fail = "Returned HTTP $status";
}
} catch (Exception $e) {
$fail = $e->getMessage();
}
if ($fail) {
// @fixme how can we schedule a second attempt?
// or save a fail count?
// Or should we?
common_log(LOG_ERR, "Failed to verify $mode for $this->topic at $this->callback: $fail");
return false;
} else {
if ($mode == 'subscribe') {
// Establish or renew the subscription!
// This seems unnecessary... dataobject fun!
$clone = clone($this);
$clone->challenge = null;
$clone->setLease($this->lease);
$clone->update($this);
unset($clone);
$this->challenge = null;
$this->setLease($this->lease);
common_log(LOG_ERR, "Verified $mode of $this->callback:$this->topic for $this->lease seconds");
} else if ($mode == 'unsubscribe') {
common_log(LOG_ERR, "Verified $mode of $this->callback:$this->topic");
$this->delete();
}
return true;
}
}
/**
* Insert wrapper; transparently set the hash key from topic and callback columns.
* @return boolean success
*/
function insert()
{
$this->hashkey = self::hashkey($this->topic, $this->callback);
return parent::insert();
}
/**
* Send a 'fat ping' to the subscriber's callback endpoint
* containing the given Atom feed chunk.
*
* Determination of which items to send should be done at
* a higher level; don't just shove in a complete feed!
*
* @param string $atom well-formed Atom feed
*/
function push($atom)
{
$headers = array('Content-Type: application/atom+xml');
if ($this->secret) {
$hmac = sha1($atom . $this->secret);
$headers[] = "X-Hub-Signature: sha1=$hmac";
} else {
$hmac = '(none)';
}
common_log(LOG_INFO, "About to push feed to $this->callback for $this->topic, HMAC $hmac");
try {
$request = new HTTPClient();
$request->setBody($atom);
$response = $request->post($this->callback, $headers);
if ($response->isOk()) {
return true;
}
common_log(LOG_ERR, "Error sending PuSH content " .
"to $this->callback for $this->topic: " .
$response->getStatus());
return false;
} catch (Exception $e) {
common_log(LOG_ERR, "Error sending PuSH content " .
"to $this->callback for $this->topic: " .
$e->getMessage());
return false;
}
}
}

View File

Before

Width:  |  Height:  |  Size: 1.2 KiB

After

Width:  |  Height:  |  Size: 1.2 KiB

View File

Before

Width:  |  Height:  |  Size: 2.4 KiB

After

Width:  |  Height:  |  Size: 2.4 KiB

View File

Before

Width:  |  Height:  |  Size: 5.3 KiB

After

Width:  |  Height:  |  Size: 5.3 KiB

View File

@ -48,6 +48,18 @@ class FeedSubNoFeedException extends FeedSubException
{ {
} }
/**
* Given a web page or feed URL, discover the final location of the feed
* and return its current contents.
*
* @example
* $feed = new FeedDiscovery();
* if ($feed->discoverFromURL($url)) {
* print $feed->uri;
* print $feed->type;
* processFeed($feed->body);
* }
*/
class FeedDiscovery class FeedDiscovery
{ {
public $uri; public $uri;
@ -64,7 +76,7 @@ class FeedDiscovery
/** /**
* @param string $url * @param string $url
* @param bool $htmlOk * @param bool $htmlOk pass false here if you don't want to follow web pages.
* @return string with validated URL * @return string with validated URL
* @throws FeedSubBadURLException * @throws FeedSubBadURLException
* @throws FeedSubBadHtmlException * @throws FeedSubBadHtmlException

View File

@ -30,8 +30,8 @@ class FeedSubPreviewNotice extends Notice
function __construct($profile) function __construct($profile)
{ {
//parent::__construct(); // uhhh?
$this->profile = $profile; $this->profile = $profile;
$this->profile_id = 0;
} }
function getProfile() function getProfile()
@ -56,14 +56,19 @@ class FeedSubPreviewProfile extends Profile
{ {
function getAvatar($width, $height=null) function getAvatar($width, $height=null)
{ {
return new FeedSubPreviewAvatar($width, $height); return new FeedSubPreviewAvatar($width, $height, $this->avatar);
} }
} }
class FeedSubPreviewAvatar extends Avatar class FeedSubPreviewAvatar extends Avatar
{ {
function __construct($width, $height, $remote)
{
$this->remoteImage = $remote;
}
function displayUrl() { function displayUrl() {
return common_path('plugins/FeedSub/images/48px-Feed-icon.svg.png'); return $this->remoteImage;
} }
} }
@ -150,6 +155,23 @@ class FeedMunger
return $this->getAtomLink($this->feed, array('rel' => 'hub')); return $this->getAtomLink($this->feed, array('rel' => 'hub'));
} }
/**
* Get an appropriate avatar image source URL, if available.
* @return mixed string or false
*/
function getAvatar()
{
$logo = $this->feed->logo;
if ($logo) {
return $logo;
}
$icon = $this->feed->icon;
if ($icon) {
return $icon;
}
return common_path('plugins/OStatus/images/48px-Feed-icon.svg.png');
}
function profile($preview=false) function profile($preview=false)
{ {
if ($preview) { if ($preview) {
@ -164,6 +186,10 @@ class FeedMunger
$profile->homepage = $this->getAltLink($this->feed); $profile->homepage = $this->getAltLink($this->feed);
$profile->bio = $this->feed->description; $profile->bio = $this->feed->description;
$profile->profileurl = $this->getAltLink($this->feed); $profile->profileurl = $this->getAltLink($this->feed);
if ($preview) {
$profile->avatar = $this->getAvatar();
}
// @todo tags from categories // @todo tags from categories
// @todo lat/lon/location? // @todo lat/lon/location?
@ -186,6 +212,12 @@ class FeedMunger
} }
$link = $this->getAltLink($entry); $link = $this->getAltLink($entry);
if (empty($link)) {
if (preg_match('!^https?://!', $entry->id)) {
$link = $entry->id;
common_log(LOG_DEBUG, "No link on entry, using URL from id: $link");
}
}
$notice->uri = $link; $notice->uri = $link;
$notice->url = $link; $notice->url = $link;
$notice->content = $this->noticeFromEntry($entry); $notice->content = $this->noticeFromEntry($entry);

View File

@ -0,0 +1,87 @@
<?php
/*
* StatusNet - the distributed open-source microblogging tool
* Copyright (C) 2010, StatusNet, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* Send a PuSH subscription verification from our internal hub.
* Queue up final distribution for
* @package Hub
* @author Brion Vibber <brion@status.net>
*/
class HubDistribQueueHandler extends QueueHandler
{
function transport()
{
return 'hubdistrib';
}
function handle($notice)
{
assert($notice instanceof Notice);
// See if there's any PuSH subscriptions, including OStatus clients.
// @fixme handle group subscriptions as well
// http://identi.ca/api/statuses/user_timeline/1.atom
$feed = common_local_url('ApiTimelineUser',
array('id' => $notice->profile_id,
'format' => 'atom'));
$sub = new HubSub();
$sub->topic = $feed;
if ($sub->find()) {
common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $feed");
$qm = QueueManager::get();
$atom = $this->userFeedForNotice($notice);
while ($sub->fetch()) {
common_log(LOG_INFO, "Prepping PuSH distribution to $sub->callback for $feed");
$data = array('sub' => clone($sub),
'atom' => $atom);
$qm->enqueue($data, 'hubout');
}
} else {
common_log(LOG_INFO, "No PuSH subscribers for $feed");
}
}
/**
* Build a single-item version of the sending user's Atom feed.
* @param Notice $notice
* @return string
*/
function userFeedForNotice($notice)
{
// @fixme this feels VERY hacky...
// should probably be a cleaner way to do it
ob_start();
$api = new ApiTimelineUserAction();
$api->prepare(array('id' => $notice->profile_id,
'format' => 'atom',
'max_id' => $notice->id,
'since_id' => $notice->id - 1));
$api->showTimeline();
$feed = ob_get_clean();
// ...and override the content-type back to something normal... eww!
// hope there's no other headers that got set while we weren't looking.
header('Content-Type: text/html; charset=utf-8');
common_log(LOG_DEBUG, $feed);
return $feed;
}
}

View File

@ -0,0 +1,52 @@
<?php
/*
* StatusNet - the distributed open-source microblogging tool
* Copyright (C) 2010, StatusNet, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* Send a raw PuSH atom update from our internal hub.
* @package Hub
* @author Brion Vibber <brion@status.net>
*/
class HubOutQueueHandler extends QueueHandler
{
function transport()
{
return 'hubout';
}
function handle($data)
{
$sub = $data['sub'];
$atom = $data['atom'];
assert($sub instanceof HubSub);
assert(is_string($atom));
try {
$sub->push($atom);
} catch (Exception $e) {
common_log(LOG_ERR, "Failed PuSH to $sub->callback for $sub->topic: " .
$e->getMessage());
// @fixme Reschedule a later delivery?
// Currently we have no way to do this other than 'send NOW'
}
return true;
}
}

View File

@ -0,0 +1,53 @@
<?php
/*
* StatusNet - the distributed open-source microblogging tool
* Copyright (C) 2010, StatusNet, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* Send a PuSH subscription verification from our internal hub.
* @package Hub
* @author Brion Vibber <brion@status.net>
*/
class HubVerifyQueueHandler extends QueueHandler
{
function transport()
{
return 'hubverify';
}
function handle($data)
{
$sub = $data['sub'];
$mode = $data['mode'];
assert($sub instanceof HubSub);
assert($mode === 'subscribe' || $mode === 'unsubscribe');
common_log(LOG_INFO, __METHOD__ . ": $mode $sub->callback $sub->topic");
try {
$sub->verify($mode);
} catch (Exception $e) {
common_log(LOG_ERR, "Failed PuSH $mode verify to $sub->callback for $sub->topic: " .
$e->getMessage());
// @fixme schedule retry?
// @fixme just kill it?
}
return true;
}
}