Merge branch '0.8.x' of git@gitorious.org:laconica/dev into 0.8.x

This commit is contained in:
Sarven Capadisli 2009-04-08 22:55:34 +00:00
commit 03a4a4bebf
14 changed files with 1148 additions and 69 deletions

View File

@ -31,6 +31,8 @@ if (!defined('LACONICA')) {
exit(1);
}
require_once(INSTALLDIR.'/lib/noticelist.php');
/**
* Conversation tree in the browser
*
@ -43,7 +45,6 @@ if (!defined('LACONICA')) {
class ConversationAction extends Action
{
var $id = null;
var $notices = null;
var $page = null;
/**
@ -58,10 +59,9 @@ class ConversationAction extends Action
{
parent::prepare($args);
$this->id = $this->trimmed('id');
if (!$this->id) {
if (empty($this->id)) {
return false;
}
$this->notices = $this->getNotices();
$this->page = $this->trimmed('page');
if (empty($this->page)) {
$this->page = 1;
@ -69,36 +69,9 @@ class ConversationAction extends Action
return true;
}
/**
* Get notices
*
* @param integer $limit max number of notices to return
*
* @return array notices
*/
function getNotices($limit=0)
{
$qry = 'SELECT notice.*, '.
'FROM notice WHERE conversation = %d '.
'ORDER BY created ';
$offset = 0;
$limit = NOTICES_PER_PAGE + 1;
if (common_config('db', 'type') == 'pgsql') {
$qry .= ' LIMIT ' . $limit . ' OFFSET ' . $offset;
} else {
$qry .= ' LIMIT ' . $offset . ', ' . $limit;
}
return Notice::getStream(sprintf($qry, $this->id),
'notice:conversation:'.$this->id,
$offset, $limit);
}
function handle($args)
{
parent::handle($args);
$this->showPage();
}
@ -111,7 +84,18 @@ class ConversationAction extends Action
{
// FIXME this needs to be a tree, not a list
$nl = new NoticeList($this->notices, $this);
$qry = 'SELECT * FROM notice WHERE conversation = %s ';
$offset = ($this->page-1)*NOTICES_PER_PAGE;
$limit = NOTICES_PER_PAGE + 1;
$txt = sprintf($qry, $this->id);
$notices = Notice::getStream($txt,
'notice:conversation:'.$this->id,
$offset, $limit);
$nl = new NoticeList($notices, $this);
$cnt = $nl->show();

61
classes/Status_network.php Executable file
View File

@ -0,0 +1,61 @@
<?php
/**
* Table Definition for status_network
*/
class Status_network extends DB_DataObject
{
###START_AUTOCODE
/* the code below is auto generated do not remove the above tag */
public $__table = 'status_network'; // table name
public $nickname; // varchar(64) primary_key not_null
public $hostname; // varchar(255) unique_key
public $pathname; // varchar(255) unique_key
public $sitename; // varchar(255)
public $dbhost; // varchar(255)
public $dbuser; // varchar(255)
public $dbpass; // varchar(255)
public $dbname; // varchar(255)
public $created; // datetime() not_null
public $modified; // timestamp() not_null default_CURRENT_TIMESTAMP
/* Static get */
function staticGet($k,$v=NULL) { return DB_DataObject::staticGet('Status_network',$k,$v); }
/* the code above is auto generated do not remove the tag below */
###END_AUTOCODE
static function setupDB($dbhost, $dbuser, $dbpass, $dbname)
{
global $config;
$config['db']['database_'.$dbname] = "mysqli://$dbuser:$dbpass@$dbhost/$dbname";
$config['db']['ini_'.$dbname] = INSTALLDIR.'/classes/statusnet.ini';
$config['db']['table_status_network'] = $dbname;
return true;
}
static function setupSite($servername, $pathname)
{
global $config;
$parts = explode('.', $servername);
$sn = Status_network::staticGet('nickname', $parts[0]);
if (!empty($sn)) {
$dbhost = (empty($sn->dbhost)) ? 'localhost' : $sn->dbhost;
$dbuser = (empty($sn->dbuser)) ? $sn->nickname : $sn->dbuser;
$dbpass = $sn->dbpass;
$dbname = (empty($sn->dbname)) ? $sn->nickname : $sn->dbname;
$config['db']['database'] = "mysqli://$dbuser:$dbpass@$dbhost/$dbname";
$config['site']['name'] = $sn->sitename;
return true;
} else {
return false;
}
}
}

17
classes/statusnet.ini Executable file
View File

@ -0,0 +1,17 @@
[status_network]
nickname = 130
hostname = 2
pathname = 2
sitename = 2
dbhost = 2
dbuser = 2
dbpass = 2
dbname = 2
created = 142
modified = 384
[status_network__keys]
nickname = K
hostname = U
pathname = U

View File

@ -128,6 +128,14 @@ $config['sphinx']['port'] = 3312;
#background. See the README for details.
#$config['queue']['enabled'] = true;
#Queue subsystem
#subsystems: internal (default) or stomp
#using stomp requires an external message queue server
#$config['queue']['subsystem'] = 'stomp';
#$config['queue']['stomp_server'] = 'tcp://localhost:61613';
#use different queue_basename for each laconica instance managed by the server
#$config['queue']['queue_basename'] = 'laconica';
#The following customise the behaviour of the various daemons:
#$config['daemon']['piddir'] = '/var/run';
#$config['daemon']['user'] = false;
@ -188,3 +196,11 @@ $config['sphinx']['port'] = 3312;
#Use a different hostname for SSL-encrypted pages
#$config['site']['sslserver'] = 'secure.example.org';
#If you have a lot of status networks on the same server, you can
#store the site data in a database and switch as follows
#Status_network::setupDB('localhost', 'statusnet', 'statuspass', 'statusnet');
#if (!Status_network::setupSite($_server, $_path)) {
# print "Error\n";
# exit(1);
#}

17
db/site.sql Normal file
View File

@ -0,0 +1,17 @@
/* For managing multiple sites */
create table status_network (
nickname varchar(64) primary key comment 'nickname',
hostname varchar(255) unique key comment 'alternate hostname if any',
pathname varchar(255) unique key comment 'alternate pathname if any',
sitename varchar(255) comment 'display name',
dbhost varchar(255) comment 'database host',
dbuser varchar(255) comment 'database username',
dbpass varchar(255) comment 'database password',
dbname varchar(255) comment 'database name',
created datetime not null comment 'date this record was created',
modified timestamp comment 'date this record was modified'
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_general_ci;

594
extlib/Stomp.php Normal file
View File

@ -0,0 +1,594 @@
<?php
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* vim: set expandtab tabstop=3 shiftwidth=3: */
require_once 'Stomp/Frame.php';
/**
* A Stomp Connection
*
*
* @package Stomp
* @author Hiram Chirino <hiram@hiramchirino.com>
* @author Dejan Bosanac <dejan@nighttale.net>
* @author Michael Caplan <mcaplan@labnet.net>
* @version $Revision: 43 $
*/
class Stomp
{
/**
* Perform request synchronously
*
* @var boolean
*/
public $sync = false;
/**
* Default prefetch size
*
* @var int
*/
public $prefetchSize = 1;
/**
* Client id used for durable subscriptions
*
* @var string
*/
public $clientId = null;
protected $_brokerUri = null;
protected $_socket = null;
protected $_hosts = array();
protected $_params = array();
protected $_subscriptions = array();
protected $_defaultPort = 61613;
protected $_currentHost = - 1;
protected $_attempts = 10;
protected $_username = '';
protected $_password = '';
protected $_sessionId;
protected $_read_timeout_seconds = 60;
protected $_read_timeout_milliseconds = 0;
/**
* Constructor
*
* @param string $brokerUri Broker URL
* @throws Stomp_Exception
*/
public function __construct ($brokerUri)
{
$this->_brokerUri = $brokerUri;
$this->_init();
}
/**
* Initialize connection
*
* @throws Stomp_Exception
*/
protected function _init ()
{
$pattern = "|^(([a-zA-Z]+)://)+\(*([a-zA-Z0-9\.:/i,-]+)\)*\??([a-zA-Z0-9=]*)$|i";
if (preg_match($pattern, $this->_brokerUri, $regs)) {
$scheme = $regs[2];
$hosts = $regs[3];
$params = $regs[4];
if ($scheme != "failover") {
$this->_processUrl($this->_brokerUri);
} else {
$urls = explode(",", $hosts);
foreach ($urls as $url) {
$this->_processUrl($url);
}
}
if ($params != null) {
parse_str($params, $this->_params);
}
} else {
require_once 'Stomp/Exception.php';
throw new Stomp_Exception("Bad Broker URL {$this->_brokerUri}");
}
}
/**
* Process broker URL
*
* @param string $url Broker URL
* @throws Stomp_Exception
* @return boolean
*/
protected function _processUrl ($url)
{
$parsed = parse_url($url);
if ($parsed) {
array_push($this->_hosts, array($parsed['host'] , $parsed['port'] , $parsed['scheme']));
} else {
require_once 'Stomp/Exception.php';
throw new Stomp_Exception("Bad Broker URL $url");
}
}
/**
* Make socket connection to the server
*
* @throws Stomp_Exception
*/
protected function _makeConnection ()
{
if (count($this->_hosts) == 0) {
require_once 'Stomp/Exception.php';
throw new Stomp_Exception("No broker defined");
}
// force disconnect, if previous established connection exists
$this->disconnect();
$i = $this->_currentHost;
$att = 0;
$connected = false;
while (! $connected && $att ++ < $this->_attempts) {
if (isset($this->_params['randomize']) && $this->_params['randomize'] == 'true') {
$i = rand(0, count($this->_hosts) - 1);
} else {
$i = ($i + 1) % count($this->_hosts);
}
$broker = $this->_hosts[$i];
$host = $broker[0];
$port = $broker[1];
$scheme = $broker[2];
if ($port == null) {
$port = $this->_defaultPort;
}
if ($this->_socket != null) {
fclose($this->_socket);
$this->_socket = null;
}
$this->_socket = @fsockopen($scheme . '://' . $host, $port);
if (!is_resource($this->_socket) && $att >= $this->_attempts && !array_key_exists($i + 1, $this->_hosts)) {
require_once 'Stomp/Exception.php';
throw new Stomp_Exception("Could not connect to $host:$port ($att/{$this->_attempts})");
} else if (is_resource($this->_socket)) {
$connected = true;
$this->_currentHost = $i;
break;
}
}
if (! $connected) {
require_once 'Stomp/Exception.php';
throw new Stomp_Exception("Could not connect to a broker");
}
}
/**
* Connect to server
*
* @param string $username
* @param string $password
* @return boolean
* @throws Stomp_Exception
*/
public function connect ($username = '', $password = '')
{
$this->_makeConnection();
if ($username != '') {
$this->_username = $username;
}
if ($password != '') {
$this->_password = $password;
}
$headers = array('login' => $this->_username , 'passcode' => $this->_password);
if ($this->clientId != null) {
$headers["client-id"] = $this->clientId;
}
$frame = new Stomp_Frame("CONNECT", $headers);
$this->_writeFrame($frame);
$frame = $this->readFrame();
if ($frame instanceof Stomp_Frame && $frame->command == 'CONNECTED') {
$this->_sessionId = $frame->headers["session"];
return true;
} else {
require_once 'Stomp/Exception.php';
if ($frame instanceof Stomp_Frame) {
throw new Stomp_Exception("Unexpected command: {$frame->command}", 0, $frame->body);
} else {
throw new Stomp_Exception("Connection not acknowledged");
}
}
}
/**
* Check if client session has ben established
*
* @return boolean
*/
public function isConnected ()
{
return !empty($this->_sessionId) && is_resource($this->_socket);
}
/**
* Current stomp session ID
*
* @return string
*/
public function getSessionId()
{
return $this->_sessionId;
}
/**
* Send a message to a destination in the messaging system
*
* @param string $destination Destination queue
* @param string|Stomp_Frame $msg Message
* @param array $properties
* @param boolean $sync Perform request synchronously
* @return boolean
*/
public function send ($destination, $msg, $properties = null, $sync = null)
{
if ($msg instanceof Stomp_Frame) {
$msg->headers['destination'] = $destination;
$msg->headers = array_merge($msg->headers, $properties);
$frame = $msg;
} else {
$headers = $properties;
$headers['destination'] = $destination;
$frame = new Stomp_Frame('SEND', $headers, $msg);
}
$this->_prepareReceipt($frame, $sync);
$this->_writeFrame($frame);
return $this->_waitForReceipt($frame, $sync);
}
/**
* Prepair frame receipt
*
* @param Stomp_Frame $frame
* @param boolean $sync
*/
protected function _prepareReceipt (Stomp_Frame $frame, $sync)
{
$receive = $this->sync;
if ($sync !== null) {
$receive = $sync;
}
if ($receive == true) {
$frame->headers['receipt'] = md5(microtime());
}
}
/**
* Wait for receipt
*
* @param Stomp_Frame $frame
* @param boolean $sync
* @return boolean
* @throws Stomp_Exception
*/
protected function _waitForReceipt (Stomp_Frame $frame, $sync)
{
$receive = $this->sync;
if ($sync !== null) {
$receive = $sync;
}
if ($receive == true) {
$id = (isset($frame->headers['receipt'])) ? $frame->headers['receipt'] : null;
if ($id == null) {
return true;
}
$frame = $this->readFrame();
if ($frame instanceof Stomp_Frame && $frame->command == 'RECEIPT') {
if ($frame->headers['receipt-id'] == $id) {
return true;
} else {
require_once 'Stomp/Exception.php';
throw new Stomp_Exception("Unexpected receipt id {$frame->headers['receipt-id']}", 0, $frame->body);
}
} else {
require_once 'Stomp/Exception.php';
if ($frame instanceof Stomp_Frame) {
throw new Stomp_Exception("Unexpected command {$frame->command}", 0, $frame->body);
} else {
throw new Stomp_Exception("Receipt not received");
}
}
}
return true;
}
/**
* Register to listen to a given destination
*
* @param string $destination Destination queue
* @param array $properties
* @param boolean $sync Perform request synchronously
* @return boolean
* @throws Stomp_Exception
*/
public function subscribe ($destination, $properties = null, $sync = null)
{
$headers = array('ack' => 'client');
$headers['activemq.prefetchSize'] = $this->prefetchSize;
if ($this->clientId != null) {
$headers["activemq.subcriptionName"] = $this->clientId;
}
if (isset($properties)) {
foreach ($properties as $name => $value) {
$headers[$name] = $value;
}
}
$headers['destination'] = $destination;
$frame = new Stomp_Frame('SUBSCRIBE', $headers);
$this->_prepareReceipt($frame, $sync);
$this->_writeFrame($frame);
if ($this->_waitForReceipt($frame, $sync) == true) {
$this->_subscriptions[$destination] = $properties;
return true;
} else {
return false;
}
}
/**
* Remove an existing subscription
*
* @param string $destination
* @param array $properties
* @param boolean $sync Perform request synchronously
* @return boolean
* @throws Stomp_Exception
*/
public function unsubscribe ($destination, $properties = null, $sync = null)
{
$headers = array();
if (isset($properties)) {
foreach ($properties as $name => $value) {
$headers[$name] = $value;
}
}
$headers['destination'] = $destination;
$frame = new Stomp_Frame('UNSUBSCRIBE', $headers);
$this->_prepareReceipt($frame, $sync);
$this->_writeFrame($frame);
if ($this->_waitForReceipt($frame, $sync) == true) {
unset($this->_subscriptions[$destination]);
return true;
} else {
return false;
}
}
/**
* Start a transaction
*
* @param string $transactionId
* @param boolean $sync Perform request synchronously
* @return boolean
* @throws Stomp_Exception
*/
public function begin ($transactionId = null, $sync = null)
{
$headers = array();
if (isset($transactionId)) {
$headers['transaction'] = $transactionId;
}
$frame = new Stomp_Frame('BEGIN', $headers);
$this->_prepareReceipt($frame, $sync);
$this->_writeFrame($frame);
return $this->_waitForReceipt($frame, $sync);
}
/**
* Commit a transaction in progress
*
* @param string $transactionId
* @param boolean $sync Perform request synchronously
* @return boolean
* @throws Stomp_Exception
*/
public function commit ($transactionId = null, $sync = null)
{
$headers = array();
if (isset($transactionId)) {
$headers['transaction'] = $transactionId;
}
$frame = new Stomp_Frame('COMMIT', $headers);
$this->_prepareReceipt($frame, $sync);
$this->_writeFrame($frame);
return $this->_waitForReceipt($frame, $sync);
}
/**
* Roll back a transaction in progress
*
* @param string $transactionId
* @param boolean $sync Perform request synchronously
*/
public function abort ($transactionId = null, $sync = null)
{
$headers = array();
if (isset($transactionId)) {
$headers['transaction'] = $transactionId;
}
$frame = new Stomp_Frame('ABORT', $headers);
$this->_prepareReceipt($frame, $sync);
$this->_writeFrame($frame);
return $this->_waitForReceipt($frame, $sync);
}
/**
* Acknowledge consumption of a message from a subscription
* Note: This operation is always asynchronous
*
* @param string|Stomp_Frame $messageMessage ID
* @param string $transactionId
* @return boolean
* @throws Stomp_Exception
*/
public function ack ($message, $transactionId = null)
{
if ($message instanceof Stomp_Frame) {
$frame = new Stomp_Frame('ACK', $message->headers);
$this->_writeFrame($frame);
return true;
} else {
$headers = array();
if (isset($transactionId)) {
$headers['transaction'] = $transactionId;
}
$headers['message-id'] = $message;
$frame = new Stomp_Frame('ACK', $headers);
$this->_writeFrame($frame);
return true;
}
}
/**
* Graceful disconnect from the server
*
*/
public function disconnect ()
{
$header = array();
if ($this->clientId != null) {
$headers["client-id"] = $this->clientId;
}
if (is_resource($this->_socket)) {
$this->_writeFrame(new Stomp_Frame('DISCONNECT', $headers));
fclose($this->_socket);
}
$this->_socket = null;
$this->_sessionId = null;
$this->_currentHost = -1;
$this->_subscriptions = array();
$this->_username = '';
$this->_password = '';
}
/**
* Write frame to server
*
* @param Stomp_Frame $stompFrame
*/
protected function _writeFrame (Stomp_Frame $stompFrame)
{
if (!is_resource($this->_socket)) {
require_once 'Stomp/Exception.php';
throw new Stomp_Exception('Socket connection hasn\'t been established');
}
$data = $stompFrame->__toString();
$r = fwrite($this->_socket, $data, strlen($data));
if ($r === false || $r == 0) {
$this->_reconnect();
$this->_writeFrame($stompFrame);
}
}
/**
* Set timeout to wait for content to read
*
* @param int $seconds_to_wait Seconds to wait for a frame
* @param int $milliseconds Milliseconds to wait for a frame
*/
public function setReadTimeout($seconds, $milliseconds = 0)
{
$this->_read_timeout_seconds = $seconds;
$this->_read_timeout_milliseconds = $milliseconds;
}
/**
* Read responce frame from server
*
* @return Stomp_Frame|Stomp_Message_Map|boolean False when no frame to read
*/
public function readFrame ()
{
if (!$this->hasFrameToRead()) {
return false;
}
$rb = 1024;
$data = '';
do {
$read = fgets($this->_socket, $rb);
if ($read === false) {
$this->_reconnect();
return $this->readFrame();
}
$data .= $read;
$len = strlen($data);
} while (($len < 2 || ! ($data[$len - 2] == "\x00" && $data[$len - 1] == "\n")));
list ($header, $body) = explode("\n\n", $data, 2);
$header = explode("\n", $header);
$headers = array();
$command = null;
foreach ($header as $v) {
if (isset($command)) {
list ($name, $value) = explode(':', $v, 2);
$headers[$name] = $value;
} else {
$command = $v;
}
}
$frame = new Stomp_Frame($command, $headers, trim($body));
if (isset($frame->headers['amq-msg-type']) && $frame->headers['amq-msg-type'] == 'MapMessage') {
require_once 'Stomp/Message/Map.php';
return new Stomp_Message_Map($frame);
} else {
return $frame;
}
}
/**
* Check if there is a frame to read
*
* @return boolean
*/
public function hasFrameToRead()
{
$read = array($this->_socket);
$write = null;
$except = null;
$has_frame_to_read = stream_select($read, $write, $except, $this->_read_timeout_seconds, $this->_read_timeout_milliseconds);
if ($has_frame_to_read === false) {
throw new Stomp_Exception('Check failed to determin if the socket is readable');
} else if ($has_frame_to_read > 0) {
return true;
} else {
return false;
}
}
/**
* Reconnects and renews subscriptions (if there were any)
* Call this method when you detect connection problems
*/
protected function _reconnect ()
{
$subscriptions = $this->_subscriptions;
$this->connect($this->_username, $this->_password);
foreach ($subscriptions as $dest => $properties) {
$this->subscribe($dest, $properties);
}
}
/**
* Graceful object desruction
*
*/
public function __destruct()
{
$this->disconnect();
}
}
?>

View File

@ -0,0 +1,57 @@
<?php
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* vim: set expandtab tabstop=3 shiftwidth=3: */
/**
* A Stomp Connection
*
*
* @package Stomp
* @author Michael Caplan <mcaplan@labnet.net>
* @version $Revision: 23 $
*/
class Stomp_Exception extends Exception
{
protected $_details;
/**
* Constructor
*
* @param string $message Error message
* @param int $code Error code
* @param string $details Stomp server error details
*/
public function __construct($message = null, $code = 0, $details = '')
{
$this->_details = $details;
parent::__construct($message, $code);
}
/**
* Stomp server error details
*
* @return string
*/
public function getDetails()
{
return $this->_details;
}
}
?>

80
extlib/Stomp/Frame.php Normal file
View File

@ -0,0 +1,80 @@
<?php
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* vim: set expandtab tabstop=3 shiftwidth=3: */
/**
* Stomp Frames are messages that are sent and received on a StompConnection.
*
* @package Stomp
* @author Hiram Chirino <hiram@hiramchirino.com>
* @author Dejan Bosanac <dejan@nighttale.net>
* @author Michael Caplan <mcaplan@labnet.net>
* @version $Revision: 36 $
*/
class Stomp_Frame
{
public $command;
public $headers = array();
public $body;
/**
* Constructor
*
* @param string $command
* @param array $headers
* @param string $body
*/
public function __construct ($command = null, $headers = null, $body = null)
{
$this->_init($command, $headers, $body);
}
protected function _init ($command = null, $headers = null, $body = null)
{
$this->command = $command;
if ($headers != null) {
$this->headers = $headers;
}
$this->body = $body;
if ($this->command == 'ERROR') {
require_once 'Stomp/Exception.php';
throw new Stomp_Exception($this->headers['message'], 0, $this->body);
}
}
/**
* Convert frame to transportable string
*
* @return string
*/
public function __toString()
{
$data = $this->command . "\n";
foreach ($this->headers as $name => $value) {
$data .= $name . ": " . $value . "\n";
}
$data .= "\n";
$data .= $this->body;
return $data .= "\x00\n";
}
}
?>

37
extlib/Stomp/Message.php Normal file
View File

@ -0,0 +1,37 @@
<?php
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* vim: set expandtab tabstop=3 shiftwidth=3: */
require_once 'Stomp/Frame.php';
/**
* Basic text stomp message
*
* @package Stomp
* @author Dejan Bosanac <dejan@nighttale.net>
* @version $Revision: 23 $
*/
class Stomp_Message extends Stomp_Frame
{
public function __construct ($body, $headers = null)
{
$this->_init("SEND", $headers, $body);
}
}
?>

View File

@ -0,0 +1,47 @@
<?php
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* vim: set expandtab tabstop=3 shiftwidth=3: */
require_once 'Stomp/Message.php';
/**
* Message that contains a stream of uninterpreted bytes
*
* @package Stomp
* @author Dejan Bosanac <dejan@nighttale.net>
* @version $Revision: 23 $
*/
class Stomp_Message_Bytes extends Stomp_Message
{
/**
* Constructor
*
* @param string $body
* @param array $headers
*/
function __construct ($body, $headers = null)
{
$this->_init("SEND", $headers, $body);
if ($this->headers == null) {
$this->headers = array();
}
$this->headers['content-length'] = count($body);
}
}
?>

View File

@ -0,0 +1,55 @@
<?php
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* vim: set expandtab tabstop=3 shiftwidth=3: */
require_once 'Stomp/Message.php';
/**
* Message that contains a set of name-value pairs
*
* @package Stomp
* @author Dejan Bosanac <dejan@nighttale.net>
* @version $Revision: 23 $
*/
class Stomp_Message_Map extends Stomp_Message
{
public $map;
/**
* Constructor
*
* @param Stomp_Frame|string $msg
* @param array $headers
*/
function __construct ($msg, $headers = null)
{
if ($msg instanceof Stomp_Frame) {
$this->_init($msg->command, $msg->headers, $msg->body);
$this->map = json_decode($msg->body);
} else {
$this->_init("SEND", $headers, $msg);
if ($this->headers == null) {
$this->headers = array();
}
$this->headers['amq-msg-type'] = 'MapMessage';
$this->body = json_encode($msg);
}
}
}
?>

View File

@ -197,7 +197,7 @@ class NoticeListItem extends Widget
$this->out->elementStart('div', 'entry-content');
$this->showNoticeLink();
$this->showNoticeSource();
$this->showReplyTo();
$this->showContext();
$this->out->elementEnd('div');
}
@ -421,17 +421,18 @@ class NoticeListItem extends Widget
* @return void
*/
function showReplyTo()
function showContext()
{
if ($this->notice->reply_to) {
$replyurl = common_local_url('shownotice',
array('notice' => $this->notice->reply_to));
// XXX: also show context if there are replies to this notice
if (!empty($this->notice->conversation)
&& $this->notice->conversation != $this->notice->id) {
$convurl = common_local_url('conversation',
array('id' => $this->notice->conversation));
$this->out->elementStart('dl', 'response');
$this->out->element('dt', null, _('To'));
$this->out->elementStart('dd');
$this->out->element('a', array('href' => $replyurl,
'rel' => 'in-reply-to'),
_('in reply to'));
$this->out->element('a', array('href' => $convurl),
_('in context'));
$this->out->elementEnd('dd');
$this->out->elementEnd('dl');
}

View File

@ -36,7 +36,7 @@ class QueueHandler extends Daemon
$this->set_id($id);
}
}
function class_name()
{
return ucfirst($this->transport()) . 'Handler';
@ -46,7 +46,7 @@ class QueueHandler extends Daemon
{
return strtolower($this->class_name().'.'.$this->get_id());
}
function get_id()
{
return $this->_id;
@ -56,16 +56,16 @@ class QueueHandler extends Daemon
{
$this->_id = $id;
}
function transport()
{
return null;
}
function start()
{
}
function finish()
{
}
@ -74,16 +74,10 @@ class QueueHandler extends Daemon
{
return true;
}
function run()
{
if (!$this->start()) {
return false;
}
$this->log(LOG_INFO, 'checking for queued notices');
$transport = $this->transport();
function db_dispatch() {
do {
$qi = Queue_item::top($transport);
$qi = Queue_item::top($this->transport());
if ($qi) {
$this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created));
$notice = Notice::staticGet($qi->notice_id);
@ -113,8 +107,69 @@ class QueueHandler extends Daemon
} else {
$this->clear_old_claims();
$this->idle(5);
}
}
} while (true);
}
function stomp_dispatch() {
require("Stomp.php");
$con = new Stomp(common_config('queue','stomp_server'));
if (!$con->connect()) {
$this->log(LOG_ERR, 'Failed to connect to queue server');
return false;
}
$queue_basename = common_config('queue','queue_basename');
// subscribe to the relevant queue (format: basename-transport)
$con->subscribe('/queue/'.$queue_basename.'-'.$this->transport());
do {
$frame = $con->readFrame();
if ($frame) {
$this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created']));
// XXX: Now the queue handler receives only the ID of the
// notice, and it has to get it from the DB
// A massive improvement would be avoid DB query by transmitting
// all the notice details via queue server...
$notice = Notice::staticGet($frame->body);
if ($notice) {
$this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
$result = $this->handle_notice($notice);
if ($result) {
// if the msg has been handled positively, ack it
// and the queue server will remove it from the queue
$con->ack($frame);
$this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
}
else {
// no ack
$this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
}
$notice->free();
unset($notice);
$notice = null;
} else {
$this->log(LOG_WARNING, 'queue item for notice that does not exist');
}
}
} while (true);
$con->disconnect();
}
function run()
{
if (!$this->start()) {
return false;
}
$this->log(LOG_INFO, 'checking for queued notices');
if (common_config('queue','subsystem') == 'stomp') {
$this->stomp_dispatch();
}
else {
$this->db_dispatch();
}
if (!$this->finish()) {
return false;
}
@ -127,7 +182,7 @@ class QueueHandler extends Daemon
sleep($timeout);
}
}
function clear_old_claims()
{
$qi = new Queue_item();
@ -137,10 +192,10 @@ class QueueHandler extends Daemon
$qi->free();
unset($qi);
}
function log($level, $msg)
{
common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
}
}

View File

@ -874,18 +874,76 @@ function common_broadcast_notice($notice, $remote=false)
function common_enqueue_notice($notice)
{
foreach (array('jabber', 'omb', 'sms', 'public', 'twitter', 'facebook', 'ping') as $transport) {
$qi = new Queue_item();
$qi->notice_id = $notice->id;
$qi->transport = $transport;
$qi->created = $notice->created;
$result = $qi->insert();
if (!$result) {
$last_error = &PEAR::getStaticProperty('DB_DataObject','lastError');
common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message);
return false;
if (common_config('queue','subsystem') == 'stomp') {
// use an external message queue system via STOMP
require_once("Stomp.php");
$con = new Stomp(common_config('queue','stomp_server'));
if (!$con->connect()) {
common_log(LOG_ERR, 'Failed to connect to queue server');
return false;
}
$queue_basename = common_config('queue','queue_basename');
foreach (array('jabber', 'omb', 'sms', 'public', 'twitter', 'facebook', 'ping') as $transport) {
if (!$con->send(
'/queue/'.$queue_basename.'-'.$transport, // QUEUE
$notice->id, // BODY of the message
array ( // HEADERS of the msg
'created' => $notice->created
))) {
common_log(LOG_ERR, 'Error sending to '.$transport.' queue');
return false;
}
common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport);
}
//send tags as headers, so they can be used as JMS selectors
common_log(LOG_DEBUG, 'searching for tags ' . $notice->id);
$tags = array();
$tag = new Notice_tag();
$tag->notice_id = $notice->id;
if ($tag->find()) {
while ($tag->fetch()) {
common_log(LOG_DEBUG, 'tag found = ' . $tag->tag);
array_push($tags,$tag->tag);
}
}
$tag->free();
$con->send('/topic/laconica.'.$notice->profile_id,
$notice->content,
array(
'profile_id' => $notice->profile_id,
'created' => $notice->created,
'tags' => implode($tags,' - ')
)
);
common_log(LOG_DEBUG, 'sent to personal topic ' . $notice->id);
$con->send('/topic/laconica.allusers',
$notice->content,
array(
'profile_id' => $notice->profile_id,
'created' => $notice->created,
'tags' => implode($tags,' - ')
)
);
common_log(LOG_DEBUG, 'sent to catch-all topic ' . $notice->id);
$result = true;
}
else {
// in any other case, 'internal'
foreach (array('jabber', 'omb', 'sms', 'public', 'twitter', 'facebook', 'ping') as $transport) {
$qi = new Queue_item();
$qi->notice_id = $notice->id;
$qi->transport = $transport;
$qi->created = $notice->created;
$result = $qi->insert();
if (!$result) {
$last_error = &PEAR::getStaticProperty('DB_DataObject','lastError');
common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message);
return false;
}
common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport);
}
common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport);
}
return $result;
}