From 74a60ab963b5ce1ed95bd81f935a44c573cd0264 Mon Sep 17 00:00:00 2001 From: Mikael Nordfeldth Date: Mon, 10 Jul 2017 14:43:28 +0200 Subject: [PATCH] Rework the push mechanism a bit to a less DB dependant queue --- plugins/OStatus/classes/HubSub.php | 138 +++++++++++++-------- plugins/OStatus/lib/huboutqueuehandler.php | 20 ++- 2 files changed, 100 insertions(+), 58 deletions(-) diff --git a/plugins/OStatus/classes/HubSub.php b/plugins/OStatus/classes/HubSub.php index d4c29c3ce1..6239ff3070 100644 --- a/plugins/OStatus/classes/HubSub.php +++ b/plugins/OStatus/classes/HubSub.php @@ -234,15 +234,11 @@ class HubSub extends Managed_DataObject $retries = intval(common_config('ostatus', 'hub_retries')); } - // We dare not clone() as when the clone is discarded it'll - // destroy the result data for the parent query. - // @fixme use clone() again when it's safe to copy an - // individual item from a multi-item query again. - $sub = HubSub::getByHashkey($this->getTopic(), $this->callback); - $data = array('sub' => $sub, + $data = array('topic' => $this->getTopic(), + 'callback' => $this->callback, 'atom' => $atom, 'retries' => $retries); - common_log(LOG_INFO, "Queuing WebSub: {$this->getTopic()} to {$this->callback}"); + common_log(LOG_INFO, sprintf('Queuing WebSub: %s to %s', _ve($data['topic']), _ve($data['callback']))); $qm = QueueManager::get(); $qm->enqueue($data, 'hubout'); } @@ -272,41 +268,72 @@ class HubSub extends Managed_DataObject } /** - * Send a 'fat ping' to the subscriber's callback endpoint - * containing the given Atom feed chunk. - * - * Determination of which items to send should be done at - * a higher level; don't just shove in a complete feed! - * - * @param string $atom well-formed Atom feed - * @throws Exception (HTTP or general) + * @return boolean true/false for HTTP response + * @throws Exception for lower-than-HTTP errors (such as NS lookup failure, connection refused...) */ - function push($atom) + public static function pushAtom($topic, $callback, $atom, $secret=null, $hashalg='sha1') { $headers = array('Content-Type: application/atom+xml'); - if ($this->secret) { - $hmac = hash_hmac('sha1', $atom, $this->secret); - $headers[] = "X-Hub-Signature: sha1=$hmac"; + if ($secret) { + $hmac = hash_hmac($hashalg, $atom, $secret); + $headers[] = "X-Hub-Signature: {$hashalg}={$hmac}"; } else { $hmac = '(none)'; } - common_log(LOG_INFO, "About to push feed to $this->callback for {$this->getTopic()}, HMAC $hmac"); + common_log(LOG_INFO, sprintf('About to WebSub-push feed to %s for %s, HMAC %s', _ve($callback), _ve($topic), _ve($hmac))); $request = new HTTPClient(); $request->setConfig(array('follow_redirects' => false)); $request->setBody($atom); + + // This will throw exception on non-HTTP failures try { - $response = $request->post($this->callback, $headers); - - if ($response->isOk()) { - return true; - } + $response = $request->post($callback, $headers); } catch (Exception $e) { - $response = null; - - common_debug('WebSub callback to '._ve($this->callback).' for '._ve($this->getTopic()).' failed with exception: '._ve($e->getMessage())); + common_debug(sprintf('WebSub callback to %s for %s failed with exception %s: %s', _ve($callback), _ve($topic), _ve(get_class($e)), _ve($e->getMessage()))); + throw $e; } + return $response->isOk(); + } + + /** + * Send a 'fat ping' to the subscriber's callback endpoint + * containing the given Atom feed chunk. + * + * Determination of which feed items to send should be done at + * a higher level; don't just shove in a complete feed! + * + * FIXME: Add 'failed' incremental count. + * + * @param string $atom well-formed Atom feed + * @return boolean Whether the PuSH was accepted or not. + * @throws Exception (HTTP or general) + */ + function push($atom) + { + try { + $success = self::pushAtom($this->getTopic(), $this->callback, $atom, $this->secret); + if ($success) { + return true; + } elseif ('https' === parse_url($this->callback, PHP_URL_SCHEME)) { + // Already HTTPS, no need to check remote http/https migration issues + return false; + } + // if pushAtom returned false and we didn't try an HTTPS endpoint, + // let's try HTTPS too (assuming only http:// and https:// are used ;)) + + } catch (Exception $e) { + if ('https' === parse_url($this->callback, PHP_URL_SCHEME)) { + // Already HTTPS, no need to check remote http/https migration issues + throw $e; + } + } + + + // We failed the WebSub push, but it might be that the remote site has changed their configuration to HTTPS + common_debug('WebSub HTTPSFIX: push failed, so we need to see if it can be the remote http->https migration issue.'); + // XXX: DO NOT trust a Location header here, _especially_ from 'http' protocols, // but not 'https' either at least if we don't do proper CA verification. Trust that // the most common change here is simply switching 'http' to 'https' and we will @@ -314,37 +341,38 @@ class HubSub extends Managed_DataObject // if we want to change the callback URLs, preferrably just manual resubscriptions // from the remote side, combined with implemented WebSub subscription timeouts. - // We failed the WebSub, but it might be that the remote site has changed their configuration to HTTPS - if ('http' === parse_url($this->callback, PHP_URL_SCHEME)) { - // Test if the feed callback for this node has migrated to HTTPS - $httpscallback = preg_replace('/^http/', 'https', $this->callback, 1); - $alreadyreplaced = self::getByHashKey($this->getTopic(), $httpscallback); - if ($alreadyreplaced instanceof HubSub) { - $this->delete(); - throw new AlreadyFulfilledException('The remote side has already established an HTTPS callback, deleting the legacy HTTP entry.'); - } + // Test if the feed callback for this node has already been migrated to HTTPS in our database + // (otherwise we'd get collisions when inserting it further down) + $httpscallback = preg_replace('/^http/', 'https', $this->callback, 1); + $alreadyreplaced = self::getByHashKey($this->getTopic(), $httpscallback); + if ($alreadyreplaced instanceof HubSub) { + // Let's remove the old HTTP callback object. + $this->delete(); - common_debug('WebSub callback to '._ve($this->callback).' for '._ve($this->getTopic()).' trying HTTPS callback: '._ve($httpscallback)); - $response = $request->post($httpscallback, $headers); - if ($response->isOk()) { - $orig = clone($this); - $this->callback = $httpscallback; - // NOTE: hashkey will be set in $this->onUpdateKeys($orig) through updateWithKeys - $this->updateWithKeys($orig); - return true; - } + // XXX: I think this means we might lose a message or two when + // remote side migrates to HTTPS because we only try _once_ + // for _one_ WebSub push. The rest of the posts already + // stored in our queue (if any) will not find a HubSub + // object. This could maybe be fixed by handling migration + // in HubOutQueueHandler while handling the item there. + common_debug('WebSub HTTPSFIX: Pushing Atom to HTTPS callback instead of HTTP, because of switch to HTTPS since enrolled in queue.'); + return self::pushAtom($this->getTopic(), $httpscallback, $atom, $this->secret); } - // FIXME: Add 'failed' incremental count for this callback. - - if (is_null($response)) { - // This means we got a lower-than-HTTP level error, like domain not found or maybe connection refused - // This should be using a more distinguishable exception class, but for now this will do. - throw new Exception(sprintf(_m('HTTP request failed without response to URL: %s'), _ve(isset($httpscallback) ? $httpscallback : $this->callback))); + common_debug('WebSub HTTPSFIX: callback to '._ve($this->callback).' for '._ve($this->getTopic()).' trying HTTPS callback: '._ve($httpscallback)); + $success = self::pushAtom($this->getTopic(), $httpscallback, $atom, $this->secret); + if ($success) { + // Yay, we made a successful push to https://, let's remember this in the future! + $orig = clone($this); + $this->callback = $httpscallback; + // NOTE: hashkey will be set in $this->onUpdateKeys($orig) through updateWithKeys + $this->updateWithKeys($orig); + return true; } - // TRANS: Exception. %1$s is a response status code, %2$s is the body of the response. - throw new Exception(sprintf(_m('Callback returned status: %1$s. Body: %2$s'), - $response->getStatus(),trim($response->getBody()))); + // If there have been any exceptions thrown before, they're handled + // higher up. This function's return value is just whether the WebSub + // push was accepted or not. + return $success; } } diff --git a/plugins/OStatus/lib/huboutqueuehandler.php b/plugins/OStatus/lib/huboutqueuehandler.php index c35d90319e..bf96070b4d 100644 --- a/plugins/OStatus/lib/huboutqueuehandler.php +++ b/plugins/OStatus/lib/huboutqueuehandler.php @@ -35,12 +35,26 @@ class HubOutQueueHandler extends QueueHandler function handle($data) { - $sub = $data['sub']; + assert(array_key_exists('atom', $data)); + assert(is_string($data['atom'])); $atom = $data['atom']; - $retries = $data['retries']; + assert(array_key_exists('retries', $data)); + $retries = intval($data['retries']); + + if (array_key_exists('topic', $data) && array_key_exists('callback', $data)) { + assert(is_string($data['topic'])); + assert(is_string($data['callback'])); + + $sub = HubSub::getByHashkey($data['topic'], $data['callback']); + } elseif (array_key_exists('sub', $data)) { + // queue behaviour changed 2017-07-09 to store topic/callback instead of sub object + common_debug('Legacy behaviour of storing HubSub objects found, this should go away when all objects are handled...'); + $sub = $data['sub']; + } else { + throw new ServerException('No HubSub object available with queue item data.'); + } assert($sub instanceof HubSub); - assert(is_string($atom)); try { $sub->push($atom);