Merge branch 'testing' of git@gitorious.org:statusnet/mainline into testing

This commit is contained in:
Evan Prodromou 2010-02-21 21:39:50 -05:00
commit 273c0e0363
5 changed files with 51 additions and 25 deletions

View File

@ -64,6 +64,11 @@ class AtomNoticeFeed extends Atom10Feed
'http://activitystrea.ms/spec/1.0/' 'http://activitystrea.ms/spec/1.0/'
); );
$this->addNamespace(
'poco',
'http://portablecontacts.net/spec/1.0'
);
// XXX: What should the uri be? // XXX: What should the uri be?
$this->addNamespace( $this->addNamespace(
'ostatus', 'ostatus',

View File

@ -29,6 +29,7 @@ class PushCallbackAction extends Action
{ {
function handle() function handle()
{ {
StatusNet::setApi(true); // Minimize error messages to aid in debugging
parent::handle(); parent::handle();
if ($_SERVER['REQUEST_METHOD'] == 'POST') { if ($_SERVER['REQUEST_METHOD'] == 'POST') {
$this->handlePost(); $this->handlePost();

View File

@ -226,6 +226,26 @@ class HubSub extends Memcached_DataObject
return parent::insert(); return parent::insert();
} }
/**
* Schedule delivery of a 'fat ping' to the subscriber's callback
* endpoint. If queues are disabled, this will run immediately.
*
* @param string $atom well-formed Atom feed
* @param int $retries optional count of retries if POST fails; defaults to hub_retries from config or 0 if unset
*/
function distribute($atom, $retries=null)
{
if ($retries === null) {
$retries = intval(common_config('ostatus', 'hub_retries'));
}
$data = array('sub' => clone($this),
'atom' => $atom,
'retries' => $retries);
$qm = QueueManager::get();
$qm->enqueue($data, 'hubout');
}
/** /**
* Send a 'fat ping' to the subscriber's callback endpoint * Send a 'fat ping' to the subscriber's callback endpoint
* containing the given Atom feed chunk. * containing the given Atom feed chunk.
@ -234,6 +254,7 @@ class HubSub extends Memcached_DataObject
* a higher level; don't just shove in a complete feed! * a higher level; don't just shove in a complete feed!
* *
* @param string $atom well-formed Atom feed * @param string $atom well-formed Atom feed
* @throws Exception (HTTP or general)
*/ */
function push($atom) function push($atom)
{ {
@ -245,24 +266,18 @@ class HubSub extends Memcached_DataObject
$hmac = '(none)'; $hmac = '(none)';
} }
common_log(LOG_INFO, "About to push feed to $this->callback for $this->topic, HMAC $hmac"); common_log(LOG_INFO, "About to push feed to $this->callback for $this->topic, HMAC $hmac");
try {
$request = new HTTPClient();
$request->setBody($atom);
$response = $request->post($this->callback, $headers);
if ($response->isOk()) { $request = new HTTPClient();
return true; $request->setBody($atom);
} $response = $request->post($this->callback, $headers);
common_log(LOG_ERR, "Error sending PuSH content " .
"to $this->callback for $this->topic: " .
$response->getStatus());
return false;
} catch (Exception $e) { if ($response->isOk()) {
common_log(LOG_ERR, "Error sending PuSH content " . return true;
"to $this->callback for $this->topic: " . } else {
$e->getMessage()); throw new Exception("Callback returned status: " .
return false; $response->getStatus() .
"; body: " .
trim($response->getBody()));
} }
} }
} }

View File

@ -124,10 +124,7 @@ class HubDistribQueueHandler extends QueueHandler
common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic"); common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic");
$qm = QueueManager::get(); $qm = QueueManager::get();
while ($sub->fetch()) { while ($sub->fetch()) {
common_log(LOG_INFO, "Prepping PuSH distribution to $sub->callback for $sub->topic"); $sub->distribute($atom);
$data = array('sub' => clone($sub),
'atom' => $atom);
$qm->enqueue($data, 'hubout');
} }
} }

View File

@ -33,6 +33,7 @@ class HubOutQueueHandler extends QueueHandler
{ {
$sub = $data['sub']; $sub = $data['sub'];
$atom = $data['atom']; $atom = $data['atom'];
$retries = $data['retries'];
assert($sub instanceof HubSub); assert($sub instanceof HubSub);
assert(is_string($atom)); assert(is_string($atom));
@ -40,13 +41,20 @@ class HubOutQueueHandler extends QueueHandler
try { try {
$sub->push($atom); $sub->push($atom);
} catch (Exception $e) { } catch (Exception $e) {
common_log(LOG_ERR, "Failed PuSH to $sub->callback for $sub->topic: " . $retries--;
$e->getMessage()); $msg = "Failed PuSH to $sub->callback for $sub->topic: " .
// @fixme Reschedule a later delivery? $e->getMessage();
return true; if ($retries > 0) {
common_log(LOG_ERR, "$msg; scheduling for $retries more tries");
// @fixme when we have infrastructure to schedule a retry
// after a delay, use it.
$sub->distribute($atom, $retries);
} else {
common_log(LOG_ERR, "$msg; discarding");
}
} }
return true; return true;
} }
} }