From e8ad436a99401c36cc7fcf7e8d14d3bc00cdd3d3 Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Thu, 23 Sep 2010 15:23:56 -0700 Subject: [PATCH] Work in progress: YammerRunner state machine wrapper for running the Yammer import in chunks. --- plugins/YammerImport/YammerImportPlugin.php | 1 + .../YammerImport/actions/yammeradminpanel.php | 2 +- plugins/YammerImport/classes/Yammer_state.php | 3 + plugins/YammerImport/lib/yammerrunner.php | 198 ++++++++++++++++++ 4 files changed, 203 insertions(+), 1 deletion(-) create mode 100644 plugins/YammerImport/lib/yammerrunner.php diff --git a/plugins/YammerImport/YammerImportPlugin.php b/plugins/YammerImport/YammerImportPlugin.php index 58fc8b772a..85eab74c04 100644 --- a/plugins/YammerImport/YammerImportPlugin.php +++ b/plugins/YammerImport/YammerImportPlugin.php @@ -116,6 +116,7 @@ class YammerImportPlugin extends Plugin switch ($lower) { case 'sn_yammerclient': case 'yammerimporter': + case 'yammerrunner': require_once "$base/lib/$lower.php"; return false; case 'yammeradminpanelaction': diff --git a/plugins/YammerImport/actions/yammeradminpanel.php b/plugins/YammerImport/actions/yammeradminpanel.php index 875debac92..9f935bbefb 100644 --- a/plugins/YammerImport/actions/yammeradminpanel.php +++ b/plugins/YammerImport/actions/yammeradminpanel.php @@ -133,7 +133,7 @@ class YammerAdminPanelForm extends AdminForm break; case 'import-users': case 'import-groups': - case 'import-messages': + case 'fetch-messages': case 'save-messages': $this->showImportState(); break; diff --git a/plugins/YammerImport/classes/Yammer_state.php b/plugins/YammerImport/classes/Yammer_state.php index 98a656bfc5..0174ead15d 100644 --- a/plugins/YammerImport/classes/Yammer_state.php +++ b/plugins/YammerImport/classes/Yammer_state.php @@ -38,6 +38,7 @@ class Yammer_state extends Memcached_DataObject public $state; // import state key public $request_token; // oauth request token; clear when auth is complete. public $oauth_token; // actual oauth token! clear when import is done? + public $oauth_secret; // actual oauth secret! clear when import is done? public $users_page; // last page of users we've fetched public $groups_page; // last page of groups we've fetched public $messages_oldest; // oldest message ID we've fetched @@ -55,6 +56,7 @@ class Yammer_state extends Memcached_DataObject new ColumnDef('state', 'text'), new ColumnDef('request_token', 'text'), new ColumnDef('oauth_token', 'text'), + new ColumnDef('oauth_secret', 'text'), new ColumnDef('users_page', 'int'), new ColumnDef('groups_page', 'int'), new ColumnDef('messages_oldest', 'bigint'), @@ -78,6 +80,7 @@ class Yammer_state extends Memcached_DataObject 'state' => DB_DATAOBJECT_STR, 'request_token' => DB_DATAOBJECT_STR, 'oauth_token' => DB_DATAOBJECT_STR, + 'oauth_secret' => DB_DATAOBJECT_STR, 'users_page' => DB_DATAOBJECT_INT, 'groups_page' => DB_DATAOBJECT_INT, 'messages_oldest' => DB_DATAOBJECT_INT, diff --git a/plugins/YammerImport/lib/yammerrunner.php b/plugins/YammerImport/lib/yammerrunner.php new file mode 100644 index 0000000000..e229b2acbe --- /dev/null +++ b/plugins/YammerImport/lib/yammerrunner.php @@ -0,0 +1,198 @@ +. + */ + +if (!defined('STATUSNET')) { + exit(1); +} + +/** + * State machine for running through Yammer import. + * + * @package YammerImportPlugin + * @author Brion Vibber + */ +class YammerRunner +{ + private $state; + private $client; + private $importer; + + function __construct() + { + $state = Yammer_state::staticGet('id', 1); + if (!$state) { + common_log(LOG_ERR, "No YammerImport state during import run. Should not happen!"); + throw new ServerException('No YammerImport state during import run.'); + } + + $this->state = $state; + $this->client = new SN_YammerClient( + common_config('yammer', 'consumer_key'), + common_config('yammer', 'consumer_secret'), + $this->state->oauth_token, + $this->state->oauth_secret); + $this->importer = new YammerImporter($client); + } + + public function iterate() + { + + switch($state->state) + { + case null: + case 'requesting-auth': + // Neither of these should reach our background state! + common_log(LOG_ERR, "Non-background YammerImport state '$state->state' during import run!"); + return false; + case 'import-users': + return $this->iterateUsers(); + case 'import-groups': + return $this->iterateGroups(); + case 'fetch-messages': + return $this->iterateFetchMessages(); + case 'save-messages': + return $this->iterateSaveMessages(); + default: + common_log(LOG_ERR, "Invalid YammerImport state '$state->state' during import run!"); + return false; + } + } + + /** + * Trundle through one 'page' return of up to 50 user accounts retrieved + * from the Yammer API, importing them as we go. + * + * When we run out of users, move on to groups. + * + * @return boolean success + */ + private function iterateUsers() + { + $old = clone($this->state); + + $page = intval($this->state->users_page) + 1; + $data = $this->client->users(array('page' => $page)); + + if (count($data) == 0) { + common_log(LOG_INFO, "Finished importing Yammer users; moving on to groups."); + $this->state->state = 'import-groups'; + } else { + foreach ($data as $item) { + $user = $imp->importUser($item); + common_log(LOG_INFO, "Imported Yammer user " . $item['id'] . " as $user->nickname ($user->id)"); + } + $this->state->users_page = $page; + } + $this->state->update($old); + return true; + } + + /** + * Trundle through one 'page' return of up to 20 user groups retrieved + * from the Yammer API, importing them as we go. + * + * When we run out of groups, move on to messages. + * + * @return boolean success + */ + private function iterateGroups() + { + $old = clone($this->state); + + $page = intval($this->state->groups_page) + 1; + $data = $this->client->groups(array('page' => $page)); + + if (count($data) == 0) { + common_log(LOG_INFO, "Finished importing Yammer groups; moving on to messages."); + $this->state->state = 'import-messages'; + } else { + foreach ($data as $item) { + $group = $imp->importGroup($item); + common_log(LOG_INFO, "Imported Yammer group " . $item['id'] . " as $group->nickname ($group->id)"); + } + $this->state->groups_page = $page; + } + $this->state->update($old); + return true; + } + + /** + * Trundle through one 'page' return of up to 20 public messages retrieved + * from the Yammer API, saving them to our stub table for future import in + * correct chronological order. + * + * When we run out of messages to fetch, move on to saving the messages. + * + * @return boolean success + */ + private function iterateFetchMessages() + { + $old = clone($this->state); + + $oldest = intval($this->state->messages_oldest); + if ($oldest) { + $params = array('older_than' => $oldest); + } else { + $params = array(); + } + $data = $this->client->messages($params); + $messages = $data['messages']; + + if (count($data) == 0) { + common_log(LOG_INFO, "Finished fetching Yammer messages; moving on to save messages."); + $this->state->state = 'save-messages'; + } else { + foreach ($data as $item) { + Yammer_notice_stub::record($item['id'], $item); + $oldest = $item['id']; + } + $this->state->messages_oldest = $oldest; + } + $this->state->update($old); + return true; + } + + private function iterateSaveMessages() + { + $old = clone($this->state); + + $newest = intval($this->state->messages_newest); + if ($newest) { + $stub->addWhere('id > ' . $newest); + } + $stub->limit(20); + $stub->find(); + + if ($stub->N == 0) { + common_log(LOG_INFO, "Finished saving Yammer messages; import complete!"); + $this->state->state = 'done'; + } else { + while ($stub->fetch()) { + $item = json_decode($stub->json_data); + $notice = $this->importer->importNotice($item); + common_log(LOG_INFO, "Imported Yammer notice " . $item['id'] . " as $notice->id"); + $newest = $item['id']; + } + $this->state->messages_newest = $newest; + } + $this->state->update($old); + return true; + } + +}