Merge branch 'testing' of gitorious.org:statusnet/mainline into 0.9.x

This commit is contained in:
Brion Vibber 2010-01-22 12:59:52 -08:00
commit aa038fa9cb

View File

@ -41,6 +41,10 @@ class StompQueueManager extends QueueManager
protected $sites = array(); protected $sites = array();
protected $useTransactions = true;
protected $transaction = null;
protected $transactionCount = 0;
function __construct() function __construct()
{ {
parent::__construct(); parent::__construct();
@ -201,6 +205,7 @@ class StompQueueManager extends QueueManager
} else { } else {
$this->doSubscribe(); $this->doSubscribe();
} }
$this->begin();
return true; return true;
} }
@ -213,6 +218,9 @@ class StompQueueManager extends QueueManager
*/ */
public function finish() public function finish()
{ {
// If there are any outstanding delivered messages we haven't processed,
// free them for another thread to take.
$this->rollback();
if ($this->sites) { if ($this->sites) {
foreach ($this->sites as $server) { foreach ($this->sites as $server) {
StatusNet::init($server); StatusNet::init($server);
@ -293,7 +301,9 @@ class StompQueueManager extends QueueManager
$notice = Notice::staticGet('id', $id); $notice = Notice::staticGet('id', $id);
if (empty($notice)) { if (empty($notice)) {
$this->_log(LOG_WARNING, "Skipping missing $info"); $this->_log(LOG_WARNING, "Skipping missing $info");
$this->con->ack($frame); $this->ack($frame);
$this->commit();
$this->begin();
$this->stats('badnotice', $queue); $this->stats('badnotice', $queue);
return false; return false;
} }
@ -308,7 +318,9 @@ class StompQueueManager extends QueueManager
$handler = $this->getHandler($queue); $handler = $this->getHandler($queue);
if (!$handler) { if (!$handler) {
$this->_log(LOG_ERROR, "Missing handler class; skipping $info"); $this->_log(LOG_ERROR, "Missing handler class; skipping $info");
$this->con->ack($frame); $this->ack($frame);
$this->commit();
$this->begin();
$this->stats('badhandler', $queue); $this->stats('badhandler', $queue);
return false; return false;
} }
@ -320,14 +332,18 @@ class StompQueueManager extends QueueManager
// FIXME we probably shouldn't have to do // FIXME we probably shouldn't have to do
// this kind of queue management ourselves; // this kind of queue management ourselves;
// if we don't ack, it should resend... // if we don't ack, it should resend...
$this->con->ack($frame); $this->ack($frame);
$this->enqueue($item, $queue); $this->enqueue($item, $queue);
$this->commit();
$this->begin();
$this->stats('requeued', $queue); $this->stats('requeued', $queue);
return false; return false;
} }
$this->_log(LOG_INFO, "Successfully handled $info"); $this->_log(LOG_INFO, "Successfully handled $info");
$this->con->ack($frame); $this->ack($frame);
$this->commit();
$this->begin();
$this->stats('handled', $queue); $this->stats('handled', $queue);
return true; return true;
} }
@ -369,5 +385,49 @@ class StompQueueManager extends QueueManager
{ {
common_log($level, 'StompQueueManager: '.$msg); common_log($level, 'StompQueueManager: '.$msg);
} }
protected function begin()
{
if ($this->useTransactions) {
if ($this->transaction) {
throw new Exception("Tried to start transaction in the middle of a transaction");
}
$this->transactionCount++;
$this->transaction = $this->master->id . '-' . $this->transactionCount . '-' . time();
$this->con->begin($this->transaction);
}
}
protected function ack($frame)
{
if ($this->useTransactions) {
if (!$this->transaction) {
throw new Exception("Tried to ack but not in a transaction");
}
}
$this->con->ack($frame, $this->transaction);
}
protected function commit()
{
if ($this->useTransactions) {
if (!$this->transaction) {
throw new Exception("Tried to commit but not in a transaction");
}
$this->con->commit($this->transaction);
$this->transaction = null;
}
}
protected function rollback()
{
if ($this->useTransactions) {
if (!$this->transaction) {
throw new Exception("Tried to rollback but not in a transaction");
}
$this->con->commit($this->transaction);
$this->transaction = null;
}
}
} }