diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index f057bd9e41..8f0091a138 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -41,6 +41,10 @@ class StompQueueManager extends QueueManager protected $sites = array(); + protected $useTransactions = true; + protected $transaction = null; + protected $transactionCount = 0; + function __construct() { parent::__construct(); @@ -201,6 +205,7 @@ class StompQueueManager extends QueueManager } else { $this->doSubscribe(); } + $this->begin(); return true; } @@ -213,6 +218,9 @@ class StompQueueManager extends QueueManager */ public function finish() { + // If there are any outstanding delivered messages we haven't processed, + // free them for another thread to take. + $this->rollback(); if ($this->sites) { foreach ($this->sites as $server) { StatusNet::init($server); @@ -293,7 +301,9 @@ class StompQueueManager extends QueueManager $notice = Notice::staticGet('id', $id); if (empty($notice)) { $this->_log(LOG_WARNING, "Skipping missing $info"); - $this->con->ack($frame); + $this->ack($frame); + $this->commit(); + $this->begin(); $this->stats('badnotice', $queue); return false; } @@ -308,7 +318,9 @@ class StompQueueManager extends QueueManager $handler = $this->getHandler($queue); if (!$handler) { $this->_log(LOG_ERROR, "Missing handler class; skipping $info"); - $this->con->ack($frame); + $this->ack($frame); + $this->commit(); + $this->begin(); $this->stats('badhandler', $queue); return false; } @@ -320,14 +332,18 @@ class StompQueueManager extends QueueManager // FIXME we probably shouldn't have to do // this kind of queue management ourselves; // if we don't ack, it should resend... - $this->con->ack($frame); + $this->ack($frame); $this->enqueue($item, $queue); + $this->commit(); + $this->begin(); $this->stats('requeued', $queue); return false; } $this->_log(LOG_INFO, "Successfully handled $info"); - $this->con->ack($frame); + $this->ack($frame); + $this->commit(); + $this->begin(); $this->stats('handled', $queue); return true; } @@ -369,5 +385,49 @@ class StompQueueManager extends QueueManager { common_log($level, 'StompQueueManager: '.$msg); } + + protected function begin() + { + if ($this->useTransactions) { + if ($this->transaction) { + throw new Exception("Tried to start transaction in the middle of a transaction"); + } + $this->transactionCount++; + $this->transaction = $this->master->id . '-' . $this->transactionCount . '-' . time(); + $this->con->begin($this->transaction); + } + } + + protected function ack($frame) + { + if ($this->useTransactions) { + if (!$this->transaction) { + throw new Exception("Tried to ack but not in a transaction"); + } + } + $this->con->ack($frame, $this->transaction); + } + + protected function commit() + { + if ($this->useTransactions) { + if (!$this->transaction) { + throw new Exception("Tried to commit but not in a transaction"); + } + $this->con->commit($this->transaction); + $this->transaction = null; + } + } + + protected function rollback() + { + if ($this->useTransactions) { + if (!$this->transaction) { + throw new Exception("Tried to rollback but not in a transaction"); + } + $this->con->commit($this->transaction); + $this->transaction = null; + } + } }