Make stomp queue manager work with basic servers

Signed-off-by: Eugene Crosser <crosser@average.org>
This commit is contained in:
Eugene Crosser 2010-03-29 15:25:57 +04:00 committed by Brion Vibber
parent 873b832827
commit bd1363c17a
2 changed files with 18 additions and 7 deletions

6
README
View File

@ -970,6 +970,12 @@ max_retries: for stomp, drop messages after N failed attempts to process.
dead_letter_dir: for stomp, optional directory to dump data on failed dead_letter_dir: for stomp, optional directory to dump data on failed
queue processing events after discarding them. queue processing events after discarding them.
stomp_no_transactions: for stomp, the server does not support transactions,
so do not try to user them. This is needed for http://www.morbidq.com/.
stomp_no_acks: for stomp, the server does not support acknowledgements.
so do not try to user them. This is needed for http://www.morbidq.com/.
license license
------- -------

View File

@ -39,7 +39,8 @@ class StompQueueManager extends QueueManager
protected $base; protected $base;
protected $control; protected $control;
protected $useTransactions = true; protected $useTransactions;
protected $useAcks;
protected $sites = array(); protected $sites = array();
protected $subscriptions = array(); protected $subscriptions = array();
@ -64,6 +65,8 @@ class StompQueueManager extends QueueManager
$this->base = common_config('queue', 'queue_basename'); $this->base = common_config('queue', 'queue_basename');
$this->control = common_config('queue', 'control_channel'); $this->control = common_config('queue', 'control_channel');
$this->breakout = common_config('queue', 'breakout'); $this->breakout = common_config('queue', 'breakout');
$this->useTransactions = !common_config('queue', 'stomp_no_transactions');
$this->useAcks = !common_config('queue', 'stomp_no_acks');
} }
/** /**
@ -703,13 +706,15 @@ class StompQueueManager extends QueueManager
protected function ack($idx, $frame) protected function ack($idx, $frame)
{ {
if ($this->useTransactions) { if ($this->useAcks) {
if (empty($this->transaction[$idx])) { if ($this->useTransactions) {
throw new Exception("Tried to ack but not in a transaction"); if (empty($this->transaction[$idx])) {
throw new Exception("Tried to ack but not in a transaction");
}
$this->cons[$idx]->ack($frame, $this->transaction[$idx]);
} else {
$this->cons[$idx]->ack($frame);
} }
$this->cons[$idx]->ack($frame, $this->transaction[$idx]);
} else {
$this->cons[$idx]->ack($frame);
} }
} }