From 854c82cfd53cb071afa39259fb467b4730bd6494 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sun, 28 Jun 2009 14:38:34 -0400 Subject: [PATCH] start of queuemanager code --- classes/Queue_item.php | 11 +++++-- lib/util.php | 75 ------------------------------------------ 2 files changed, 8 insertions(+), 78 deletions(-) diff --git a/classes/Queue_item.php b/classes/Queue_item.php index 9b909ec22b..295c321b57 100644 --- a/classes/Queue_item.php +++ b/classes/Queue_item.php @@ -4,7 +4,7 @@ */ require_once INSTALLDIR.'/classes/Memcached_DataObject.php'; -class Queue_item extends Memcached_DataObject +class Queue_item extends Memcached_DataObject { ###START_AUTOCODE /* the code below is auto generated do not remove the above tag */ @@ -13,7 +13,7 @@ class Queue_item extends Memcached_DataObject public $notice_id; // int(4) primary_key not_null public $transport; // varchar(8) primary_key not_null public $created; // datetime() not_null - public $claimed; // datetime() + public $claimed; // datetime() /* Static get */ function staticGet($k,$v=null) @@ -24,7 +24,7 @@ class Queue_item extends Memcached_DataObject function sequenceKey() { return array(false, false); } - + static function top($transport) { $qi = new Queue_item(); @@ -54,4 +54,9 @@ class Queue_item extends Memcached_DataObject $qi = null; return null; } + + function &pkeyGet($kv) + { + return Memcached_DataObject::pkeyGet('Queue_item', $kv); + } } diff --git a/lib/util.php b/lib/util.php index 9c1af7a0dc..3f924c8dec 100644 --- a/lib/util.php +++ b/lib/util.php @@ -889,69 +889,6 @@ function common_enqueue_notice($notice) return $result; } -function common_enqueue_notice_stomp($notice, $transports) -{ - // use an external message queue system via STOMP - require_once("Stomp.php"); - - $server = common_config('queue','stomp_server'); - $username = common_config('queue', 'stomp_username'); - $password = common_config('queue', 'stomp_password'); - - $con = new Stomp($server); - - if (!$con->connect($username, $password)) { - common_log(LOG_ERR, 'Failed to connect to queue server'); - return false; - } - - $queue_basename = common_config('queue','queue_basename'); - - foreach ($transports as $transport) { - $result = $con->send('/queue/'.$queue_basename.'-'.$transport, // QUEUE - $notice->id, // BODY of the message - array ('created' => $notice->created)); - if (!$result) { - common_log(LOG_ERR, 'Error sending to '.$transport.' queue'); - return false; - } - common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport); - } - - //send tags as headers, so they can be used as JMS selectors - common_log(LOG_DEBUG, 'searching for tags ' . $notice->id); - $tags = array(); - $tag = new Notice_tag(); - $tag->notice_id = $notice->id; - if ($tag->find()) { - while ($tag->fetch()) { - common_log(LOG_DEBUG, 'tag found = ' . $tag->tag); - array_push($tags,$tag->tag); - } - } - $tag->free(); - - $con->send('/topic/laconica.'.$notice->profile_id, - $notice->content, - array( - 'profile_id' => $notice->profile_id, - 'created' => $notice->created, - 'tags' => implode($tags,' - ') - ) - ); - common_log(LOG_DEBUG, 'sent to personal topic ' . $notice->id); - $con->send('/topic/laconica.allusers', - $notice->content, - array( - 'profile_id' => $notice->profile_id, - 'created' => $notice->created, - 'tags' => implode($tags,' - ') - ) - ); - common_log(LOG_DEBUG, 'sent to catch-all topic ' . $notice->id); - $result = true; -} - function common_enqueue_notice_db($notice, $transports) { // in any other case, 'internal' @@ -962,18 +899,6 @@ function common_enqueue_notice_db($notice, $transports) function common_enqueue_notice_transport($notice, $transport) { - $qi = new Queue_item(); - $qi->notice_id = $notice->id; - $qi->transport = $transport; - $qi->created = $notice->created; - $result = $qi->insert(); - if (!$result) { - $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError'); - common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message); - throw new ServerException('DB error inserting queue item: ' . $last_error->message); - } - common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport); - return true; } function common_real_broadcast($notice, $remote=false)