From fb681990d9935715074b7d6755b52d7b091a06e7 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Mon, 17 Jan 2011 17:31:56 -0500 Subject: [PATCH] Account moving is a background activity --- lib/accountmover.php | 143 +++++++++-------------------------- lib/activitymover.php | 168 ++++++++++++++++++++++++++++++++++++++++++ lib/queuemanager.php | 2 + scripts/moveuser.php | 4 +- 4 files changed, 205 insertions(+), 112 deletions(-) create mode 100644 lib/activitymover.php diff --git a/lib/accountmover.php b/lib/accountmover.php index 31152fd118..85c95c1132 100644 --- a/lib/accountmover.php +++ b/lib/accountmover.php @@ -45,17 +45,16 @@ if (!defined('STATUSNET')) { * @link http://status.net/ */ -class AccountMover +class AccountMover extends QueueHandler { - private $_user = null; - private $_profile = null; - private $_remote = null; - private $_sink = null; - - function __construct($user, $remote, $password) + function transport() { - $this->_user = $user; - $this->_profile = $user->getProfile(); + return 'acctmove'; + } + + function handle($object) + { + list($user, $remote, $password) = $object; $remote = Discovery::normalize($remote); @@ -65,11 +64,33 @@ class AccountMover throw new Exception("Can't locate account {$remote}"); } - $this->_remote = $oprofile->localProfile(); - list($svcDocUrl, $username) = self::getServiceDocument($remote); - $this->_sink = new ActivitySink($svcDocUrl, $username, $password); + $sink = new ActivitySink($svcDocUrl, $username, $password); + + $this->log(LOG_INFO, + "Moving user {$user->nickname} ". + "to {$remote}."); + + $stream = new UserActivityStream($user); + + // Reverse activities to run in correct chron order + + $acts = array_reverse($stream->activities); + + $this->log(LOG_INFO, + "Got ".count($acts)." activities ". + "for {$user->nickname}."); + + $qm = QueueManager::get(); + + foreach ($acts as $act) { + $qm->enqueue(array($act, $sink, $user->uri, $remote), 'actmove'); + } + + $this->log(LOG_INFO, + "Finished moving user {$user->nickname} ". + "to {$remote}."); } static function getServiceDocument($remote) @@ -108,104 +129,6 @@ class AccountMover return array($svcDocUrl, $username); } - function move() - { - $this->log(LOG_INFO, - "Moving user {$this->_user->nickname} to {$this->_remote->nickname}"); - - $stream = new UserActivityStream($this->_user); - - $acts = array_reverse($stream->activities); - - $this->log(LOG_INFO, - "Got {count($acts)} activities ". - "for {$this->_user->nickname}"); - - // Reverse activities to run in correct chron order - - foreach ($acts as $act) { - try { - $this->_moveActivity($act); - } catch (Exception $e) { - $this->log(LOG_ERR, - "Error moving activity {$act->id} {$act->verb}: " . - $e->getMessage()); - continue; - } - } - - $this->log(LOG_INFO, - "Finished moving user {$this->_user->nickname} ". - "to {$this->_remote->nickname}"); - } - - private function _moveActivity($act) - { - switch ($act->verb) { - case ActivityVerb::FAVORITE: - $this->log(LOG_INFO, - "Moving favorite of {$act->objects[0]->id} by ". - "{$act->actor->id} to {$this->_remote->nickname}."); - // push it, then delete local - $this->_sink->postActivity($act); - $notice = Notice::staticGet('uri', $act->objects[0]->id); - if (!empty($notice)) { - $fave = Fave::pkeyGet(array('user_id' => $this->_user->id, - 'notice_id' => $notice->id)); - $fave->delete(); - } - break; - case ActivityVerb::POST: - $this->log(LOG_INFO, - "Moving notice {$act->objects[0]->id} by ". - "{$act->actor->id} to {$this->_remote->nickname}."); - // XXX: send a reshare, not a post - $this->_sink->postActivity($act); - $notice = Notice::staticGet('uri', $act->objects[0]->id); - if (!empty($notice)) { - $notice->delete(); - } - break; - case ActivityVerb::JOIN: - $this->log(LOG_INFO, - "Moving group join of {$act->objects[0]->id} by ". - "{$act->actor->id} to {$this->_remote->nickname}."); - $this->_sink->postActivity($act); - $group = User_group::staticGet('uri', $act->objects[0]->id); - if (!empty($group)) { - Group_member::leave($group->id, $this->_user->id); - } - break; - case ActivityVerb::FOLLOW: - if ($act->actor->id == $this->_user->uri) { - $this->log(LOG_INFO, - "Moving subscription to {$act->objects[0]->id} by ". - "{$act->actor->id} to {$this->_remote->nickname}."); - $this->_sink->postActivity($act); - $other = Profile::fromURI($act->objects[0]->id); - if (!empty($other)) { - Subscription::cancel($this->_profile, $other); - } - } else { - $otherUser = User::staticGet('uri', $act->actor->id); - if (!empty($otherUser)) { - $this->log(LOG_INFO, - "Changing sub to {$act->objects[0]->id}". - "by {$act->actor->id} to {$this->_remote->nickname}."); - $otherProfile = $otherUser->getProfile(); - Subscription::start($otherProfile, $this->_remote); - Subscription::cancel($otherProfile, $this->_user->getProfile()); - } else { - $this->log(LOG_NOTICE, - "Not changing sub to {$act->objects[0]->id}". - "by remote {$act->actor->id} ". - "to {$this->_remote->nickname}."); - } - } - break; - } - } - /** * Log some data * diff --git a/lib/activitymover.php b/lib/activitymover.php new file mode 100644 index 0000000000..7032331104 --- /dev/null +++ b/lib/activitymover.php @@ -0,0 +1,168 @@ +. + * + * @category Cache + * @package StatusNet + * @author Evan Prodromou + * @copyright 2010 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0 + * @link http://status.net/ + */ + +if (!defined('STATUSNET')) { + // This check helps protect against security problems; + // your code file can't be executed directly from the web. + exit(1); +} + +/** + * Class comment + * + * @category General + * @package StatusNet + * @author Evan Prodromou + * @copyright 2010 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0 + * @link http://status.net/ + */ + +class ActivityMover extends QueueHandler +{ + function transport() + { + return 'actmove'; + } + + function handle($data) + { + list ($act, $sink, $userURI, $remoteURI) = $data; + + $user = User::staticGet('uri', $userURI); + $remote = Profile::fromURI($remoteURI); + + try { + $this->moveActivity($act, $sink, $user, $remote); + } catch (ClientException $cex) { + $this->log(LOG_WARNING, + $cex->getMessage()); + // "don't retry me" + return true; + } catch (ServerException $sex) { + $this->log(LOG_WARNING, + $sex->getMessage()); + // "retry me" (because we think the server might handle it next time) + return false; + } catch (Exception $ex) { + $this->log(LOG_WARNING, + $ex->getMessage()); + // "don't retry me" + return true; + } + } + + function moveActivity($act, $sink, $user, $remote) + { + if (empty($user)) { + throw new Exception("No such user {$act->actor->id}"); + } + + switch ($act->verb) { + case ActivityVerb::FAVORITE: + $this->log(LOG_INFO, + "Moving favorite of {$act->objects[0]->id} by ". + "{$act->actor->id} to {$remote->nickname}."); + // push it, then delete local + $sink->postActivity($act); + $notice = Notice::staticGet('uri', $act->objects[0]->id); + if (!empty($notice)) { + $fave = Fave::pkeyGet(array('user_id' => $user->id, + 'notice_id' => $notice->id)); + $fave->delete(); + } + break; + case ActivityVerb::POST: + $this->log(LOG_INFO, + "Moving notice {$act->objects[0]->id} by ". + "{$act->actor->id} to {$remote->nickname}."); + // XXX: send a reshare, not a post + $sink->postActivity($act); + $notice = Notice::staticGet('uri', $act->objects[0]->id); + if (!empty($notice)) { + $notice->delete(); + } + break; + case ActivityVerb::JOIN: + $this->log(LOG_INFO, + "Moving group join of {$act->objects[0]->id} by ". + "{$act->actor->id} to {$remote->nickname}."); + $sink->postActivity($act); + $group = User_group::staticGet('uri', $act->objects[0]->id); + if (!empty($group)) { + Group_member::leave($group->id, $user->id); + } + break; + case ActivityVerb::FOLLOW: + if ($act->actor->id == $user->uri) { + $this->log(LOG_INFO, + "Moving subscription to {$act->objects[0]->id} by ". + "{$act->actor->id} to {$remote->nickname}."); + $sink->postActivity($act); + $other = Profile::fromURI($act->objects[0]->id); + if (!empty($other)) { + Subscription::cancel($user->getProfile(), $other); + } + } else { + $otherUser = User::staticGet('uri', $act->actor->id); + if (!empty($otherUser)) { + $this->log(LOG_INFO, + "Changing sub to {$act->objects[0]->id}". + "by {$act->actor->id} to {$remote->nickname}."); + $otherProfile = $otherUser->getProfile(); + Subscription::start($otherProfile, $remote); + Subscription::cancel($otherProfile, $user->getProfile()); + } else { + $this->log(LOG_NOTICE, + "Not changing sub to {$act->objects[0]->id}". + "by remote {$act->actor->id} ". + "to {$remote->nickname}."); + } + } + break; + } + } + + /** + * Log some data + * + * Add a header for our class so we know who did it. + * + * @param int $level Log level, like LOG_ERR or LOG_INFO + * @param string $message Message to log + * + * @return void + */ + + protected function log($level, $message) + { + common_log($level, "ActivityMover: " . $message); + } +} diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 65a972e234..5b59444bc2 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -268,6 +268,8 @@ abstract class QueueManager extends IoManager $this->connect('deluser', 'DelUserQueueHandler'); $this->connect('feedimp', 'FeedImporter'); $this->connect('actimp', 'ActivityImporter'); + $this->connect('acctmove', 'AccountMover'); + $this->connect('actmove', 'ActivityMover'); // Broadcasting profile updates to OMB remote subscribers $this->connect('profile', 'ProfileQueueHandler'); diff --git a/scripts/moveuser.php b/scripts/moveuser.php index b02b10b1e5..cf8fd2e0be 100644 --- a/scripts/moveuser.php +++ b/scripts/moveuser.php @@ -51,9 +51,9 @@ try { $password = get_option_value('w', 'password'); - $mover = new AccountMover($user, $remote, $password); + $qm = QueueManager::get(); - $mover->move(); + $qm->enqueue(array($user, $remote, $password), 'acctmove'); } catch (Exception $e) { print $e->getMessage()."\n";