diff --git a/plugins/TwitterBridge/daemons/twitterdaemon.php b/plugins/TwitterBridge/daemons/twitterdaemon.php index f97f3179b8..851d191dd2 100644 --- a/plugins/TwitterBridge/daemons/twitterdaemon.php +++ b/plugins/TwitterBridge/daemons/twitterdaemon.php @@ -114,7 +114,7 @@ class TwitterManager extends IoManager * @fixme check their last-id and check whether we'll need to do a manual pull. * @fixme abstract out the fetching so we can work over multiple sites. */ - function initStreams() + protected function initStreams() { // Pull Twitter user IDs for all users we want to pull data for $flink = new Foreign_link(); @@ -146,7 +146,7 @@ class TwitterManager extends IoManager * * @param $users array of Twitter-side user IDs */ - function spawnStream($users) + protected function spawnStream($users) { $stream = $this->initSiteStream(); $stream->followUsers($userIds); @@ -168,7 +168,7 @@ class TwitterManager extends IoManager * * @return TwitterStreamReader */ - function initSiteStream() + protected function initSiteStream() { $auth = $this->siteStreamAuth(); $stream = new TwitterSiteStream($auth); @@ -190,7 +190,7 @@ class TwitterManager extends IoManager * * @return TwitterOAuthClient */ - function siteStreamAuth() + protected function siteStreamAuth() { $token = common_config('twitter', 'stream_token'); $secret = common_config('twitter', 'stream_secret'); @@ -205,7 +205,7 @@ class TwitterManager extends IoManager * * @return array of resources */ - function getSockets() + public function getSockets() { $sockets = array(); foreach ($this->streams as $stream) { @@ -223,7 +223,7 @@ class TwitterManager extends IoManager * @param resource $socket * @return boolean success */ - function handleInput($socket) + public function handleInput($socket) { foreach ($this->streams as $stream) { foreach ($stream->getSockets() as $aSocket) { @@ -236,11 +236,12 @@ class TwitterManager extends IoManager } /** - * Start the system up! + * Start the i/o system up! Prepare our connections and start opening them. + * * @fixme do some rate-limiting on the stream setup * @fixme do some sensible backoff on failure etc */ - function start() + public function start() { $this->initStreams(); foreach ($this->streams as $stream) { @@ -249,7 +250,10 @@ class TwitterManager extends IoManager return true; } - function finish() + /** + * Close down our connections when the daemon wraps up for business. + */ + public function finish() { foreach ($this->streams as $index => $stream) { $stream->close(); @@ -280,33 +284,21 @@ class TwitterManager extends IoManager /** * Event callback notifying that a user has a new message in their home timeline. + * We store the incoming message into the queues for processing, keeping our own + * daemon running as shiny-fast as possible. * - * @param object $data JSON data: Twitter status update + * @param object $status JSON data: Twitter status update + * @fixme in all-sites mode we may need to route queue items into another site's + * destination queues, or multiple sites. */ - protected function onTwitterStatus($data, $context) + protected function onTwitterStatus($status, $context) { - $importer = new TwitterImport(); - $notice = $importer->importStatus($data); - if ($notice) { - $user = $this->getTwitterUser($context); - Inbox::insertNotice($user->id, $notice->id); - } - } - - /** - * @fixme what about handling multiple sites? - */ - function getTwitterUser($context) - { - if ($context->source != 'sitestream') { - throw new ServerException("Unexpected stream source"); - } - $flink = Foreign_link::getByForeignID(TWITTER_SERVICE, $context->for_user); - if ($flink) { - return $flink->getUser(); - } else { - throw new ServerException("No local user for this Twitter ID"); - } + $data = array( + 'status' => $status, + 'for_user' => $context->for_user, + ); + $qm = QueueManager::get(); + $qm->enqueue($data, 'tweetin'); } }