From 681065d7342f209800014a54e400a687f615640c Mon Sep 17 00:00:00 2001 From: Craig Andrews Date: Wed, 13 Jan 2010 15:35:53 -0500 Subject: [PATCH 1/2] Made the IMAP plugin work in the style of the new queue/iomanagers --- plugins/Imap/ImapPlugin.php | 38 ++++---- plugins/Imap/imapdaemon.php | 147 ------------------------------- plugins/Imap/imapmailhandler.php | 32 +++++++ plugins/Imap/imapmanager.php | 129 +++++++++++++++++++++++++++ 4 files changed, 184 insertions(+), 162 deletions(-) delete mode 100755 plugins/Imap/imapdaemon.php create mode 100644 plugins/Imap/imapmailhandler.php create mode 100644 plugins/Imap/imapmanager.php diff --git a/plugins/Imap/ImapPlugin.php b/plugins/Imap/ImapPlugin.php index d9768b6802..89a775a16a 100644 --- a/plugins/Imap/ImapPlugin.php +++ b/plugins/Imap/ImapPlugin.php @@ -46,8 +46,6 @@ class ImapPlugin extends Plugin public $user; public $password; public $poll_frequency = 60; - public static $instances = array(); - public static $daemon_added = array(); function initialize(){ if(!isset($this->mailbox)){ @@ -63,24 +61,34 @@ class ImapPlugin extends Plugin throw new Exception("must specify a poll_frequency"); } - self::$instances[] = $this; return true; } - function cleanup(){ - $index = array_search($this, self::$instances); - unset(self::$instances[$index]); - return true; - } - - function onGetValidDaemons($daemons) + /** + * Load related modules when needed + * + * @param string $cls Name of the class to be loaded + * + * @return boolean hook value; true means continue processing, false means stop. + */ + function onAutoload($cls) { - if(! self::$daemon_added){ - array_push($daemons, INSTALLDIR . - '/plugins/Imap/imapdaemon.php'); - self::$daemon_added = true; + $dir = dirname(__FILE__); + + switch ($cls) + { + case 'ImapManager': + case 'IMAPMailHandler': + include_once $dir . '/'.strtolower($cls).'.php'; + return false; + default: + return true; } - return true; + } + + function onStartIoManagerClasses(&$classes) + { + $classes[] = new ImapManager($this); } function onPluginVersion(&$versions) diff --git a/plugins/Imap/imapdaemon.php b/plugins/Imap/imapdaemon.php deleted file mode 100755 index 7e60e13763..0000000000 --- a/plugins/Imap/imapdaemon.php +++ /dev/null @@ -1,147 +0,0 @@ -#!/usr/bin/env php -. - */ - -define('INSTALLDIR', realpath(dirname(__FILE__) . '/../..')); - -$shortoptions = 'fi::'; -$longoptions = array('id::', 'foreground'); - -$helptext = <<$value) - { - $this->$attr = $value; - } - - $this->log(LOG_INFO, "INITIALIZE IMAPDaemon {" . $this->name() . "}"); - } - - function name() - { - return strtolower('imapdaemon.'.$this->user.'.'.crc32($this->mailbox)); - } - - function run() - { - $this->connect(); - while(true) - { - if(imap_ping($this->conn) || $this->connect()) - { - $this->check_mailbox(); - } - sleep($this->poll_frequency); - } - } - - function check_mailbox() - { - $count = imap_num_msg($this->conn); - $this->log(LOG_INFO, "Found $count messages"); - if($count > 0){ - $handler = new IMAPMailHandler(); - for($i=1; $i <= $count; $i++) - { - $rawmessage = imap_fetchheader($this->conn, $count, FT_PREFETCHTEXT) . imap_body($this->conn, $i); - $handler->handle_message($rawmessage); - imap_delete($this->conn, $i); - } - imap_expunge($this->conn); - $this->log(LOG_INFO, "Finished processing messages"); - } - } - - function log($level, $msg) - { - $text = $this->name() . ': '.$msg; - common_log($level, $text); - if (!$this->daemonize) - { - $line = common_log_line($level, $text); - echo $line; - echo "\n"; - } - } - - function connect() - { - $this->conn = imap_open($this->mailbox, $this->user, $this->password); - if($this->conn){ - $this->log(LOG_INFO, "Connected"); - return true; - }else{ - $this->log(LOG_INFO, "Failed to connect: " . imap_last_error()); - return false; - } - } -} - -class IMAPMailHandler extends MailHandler -{ - function error($from, $msg) - { - $this->log(LOG_INFO, "Error: $from $msg"); - $headers['To'] = $from; - $headers['Subject'] = _m('Error'); - - return mail_send(array($from), $headers, $msg); - } -} - -if (have_option('i', 'id')) { - $id = get_option_value('i', 'id'); -} else if (count($args) > 0) { - $id = $args[0]; -} else { - $id = null; -} - -$foreground = have_option('f', 'foreground'); - -foreach(ImapPlugin::$instances as $pluginInstance){ - - $daemon = new IMAPDaemon($id, !$foreground, array( - 'mailbox' => $pluginInstance->mailbox, - 'user' => $pluginInstance->user, - 'password' => $pluginInstance->password, - 'poll_frequency' => $pluginInstance->poll_frequency - )); - - $daemon->runOnce(); - -} diff --git a/plugins/Imap/imapmailhandler.php b/plugins/Imap/imapmailhandler.php new file mode 100644 index 0000000000..3d4b6113a9 --- /dev/null +++ b/plugins/Imap/imapmailhandler.php @@ -0,0 +1,32 @@ +. + */ + +if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); } + +class IMAPMailHandler extends MailHandler +{ + function error($from, $msg) + { + $this->log(LOG_INFO, "Error: $from $msg"); + $headers['To'] = $from; + $headers['Subject'] = _m('Error'); + + return mail_send(array($from), $headers, $msg); + } +} diff --git a/plugins/Imap/imapmanager.php b/plugins/Imap/imapmanager.php new file mode 100644 index 0000000000..e4fda58099 --- /dev/null +++ b/plugins/Imap/imapmanager.php @@ -0,0 +1,129 @@ +. + * + * @category Plugin + * @package StatusNet + * @author Craig Andrews + * @copyright 2009-2010 StatusNet, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://status.net/ + */ + +class ImapManager extends IoManager +{ + protected $conn = null; + + function __construct($plugin) + { + $this->plugin = $plugin; + } + + /** + * Fetch the singleton manager for the current site. + * @return mixed ImapManager, or false if unneeded + */ + public static function get() + { + throw new Exception('ImapManager should be created using it\'s constructor, not the static get method'); + } + + /** + * Lists the IM connection socket to allow i/o master to wake + * when input comes in here as well as from the queue source. + * + * @return array of resources + */ + public function getSockets() + { + return array(); + } + + /** + * Tell the i/o master we need one instance for each supporting site + * being handled in this process. + */ + public static function multiSite() + { + return IoManager::INSTANCE_PER_SITE; + } + + /** + * Initialize connection to server. + * @return boolean true on success + */ + public function start($master) + { + if(parent::start($master)) + { + $this->conn = $this->connect(); + return true; + }else{ + return false; + } + } + + public function handleInput($socket) + { + $this->check_mailbox(); + return true; + } + + public function poll() + { + return $this->check_mailbox() > 0; + } + + function pollInterval() + { + return $this->plugin->poll_frequency; + } + + protected function connect() + { + $this->conn = imap_open($this->plugin->mailbox, $this->plugin->user, $this->plugin->password); + if($this->conn){ + common_log(LOG_INFO, "Connected"); + return $this->conn; + }else{ + common_log(LOG_INFO, "Failed to connect: " . imap_last_error()); + return $this->conn; + } + } + + protected function check_mailbox() + { + imap_ping($this->conn); + $count = imap_num_msg($this->conn); + common_log(LOG_INFO, "Found $count messages"); + if($count > 0){ + $handler = new IMAPMailHandler(); + for($i=1; $i <= $count; $i++) + { + $rawmessage = imap_fetchheader($this->conn, $count, FT_PREFETCHTEXT) . imap_body($this->conn, $i); + $handler->handle_message($rawmessage); + imap_delete($this->conn, $i); + } + imap_expunge($this->conn); + common_log(LOG_INFO, "Finished processing messages"); + } + return $count; + } +} From d032fc038a32c142a6156b1b33ecd6e09ab4764c Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 13 Jan 2010 12:37:01 -0800 Subject: [PATCH 2/2] make sure whoGets() doesn't write anything --- classes/Notice.php | 69 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 62 insertions(+), 7 deletions(-) diff --git a/classes/Notice.php b/classes/Notice.php index 02cd20391a..a43ce867b5 100644 --- a/classes/Notice.php +++ b/classes/Notice.php @@ -335,7 +335,11 @@ class Notice extends Memcached_DataObject $notice->saveTags(); - $notice->addToInboxes(); + $groups = $notice->saveGroups(); + + $recipients = $notice->saveReplies(); + + $notice->addToInboxes($groups, $recipients); $notice->saveUrls(); @@ -822,7 +826,7 @@ class Notice extends Memcached_DataObject return $ids; } - function whoGets() + function whoGets($groups=null, $recipients=null) { $c = self::memcache(); @@ -833,6 +837,14 @@ class Notice extends Memcached_DataObject } } + if (is_null($groups)) { + $groups = $this->getGroups(); + } + + if (is_null($recipients)) { + $recipients = $this->getReplies(); + } + $users = $this->getSubscribedUsers(); // FIXME: kind of ignoring 'transitional'... @@ -845,7 +857,6 @@ class Notice extends Memcached_DataObject $ni[$id] = NOTICE_INBOX_SOURCE_SUB; } - $groups = $this->saveGroups(); $profile = $this->getProfile(); foreach ($groups as $group) { @@ -860,8 +871,6 @@ class Notice extends Memcached_DataObject } } - $recipients = $this->saveReplies(); - foreach ($recipients as $recipient) { if (!array_key_exists($recipient, $ni)) { @@ -880,9 +889,9 @@ class Notice extends Memcached_DataObject return $ni; } - function addToInboxes() + function addToInboxes($groups, $recipients) { - $ni = $this->whoGets(); + $ni = $this->whoGets($groups, $recipients); Inbox::bulkInsert($this->id, array_keys($ni)); @@ -1086,6 +1095,52 @@ class Notice extends Memcached_DataObject return $recipientIds; } + function getReplies() + { + // XXX: cache me + + $ids = array(); + + $reply = new Reply(); + $reply->selectAdd(); + $reply->selectAdd('profile_id'); + $reply->notice_id = $this->id; + + if ($reply->find()) { + while($reply->fetch()) { + $ids[] = $reply->profile_id; + } + } + + $reply->free(); + + return $ids; + } + + function getGroups() + { + // XXX: cache me + + $ids = array(); + + $gi = new Group_inbox(); + + $gi->selectAdd(); + $gi->selectAdd('group_id'); + + $gi->notice_id = $this->id; + + if ($gi->find()) { + while ($gi->fetch()) { + $ids[] = $gi->group_id; + } + } + + $gi->free(); + + return $ids; + } + function asAtomEntry($namespace=false, $source=false) { $profile = $this->getProfile();