From eccab870444463734b8fcd7ec0f1cf149a0f6a57 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Thu, 9 Jul 2009 12:33:38 -0400 Subject: [PATCH 1/2] slightly more robust select() logic --- lib/stompqueuemanager.php | 72 +++++++++++++++++++-------------------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 7da7c00110..ac55f9733e 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -108,57 +108,57 @@ class StompQueueManager $handsocks = $handler->getSockets(); - $this->_log(LOG_DEBUG, "Got ".count($handsocks)." sockets from handler."); - $this->_log(LOG_DEBUG, print_r($handsocks, true)); - $socks = array_merge(array($stompsock), $handsocks); $read = $socks; $write = array(); $except = array(); - $this->_log(LOG_DEBUG, "Starting select"); $ready = stream_select($read, $write, $except, $handler->timeout(), 0); - $this->_log(LOG_DEBUG, "Finished select with value '$ready'"); - if (!$ready || $read[0] !== $stompsock) { - $handler->idle(QUEUE_HANDLER_MISS_IDLE); - } else { - $frame = $this->con->readFrame(); - - if (!empty($frame)) { - $notice = Notice::staticGet('id', $frame->body); - - if (empty($notice)) { - $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue); - $this->con->ack($frame); - } else { - if ($handler->handle_notice($notice)) { - $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); - $this->con->ack($frame); - } else { - $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); - // FIXME we probably shouldn't have to do - // this kind of queue management ourselves - $this->con->ack($frame); - $this->enqueue($notice, $queue); - } - unset($notice); - } - - unset($frame); - - $handler->idle(QUEUE_HANDLER_HIT_IDLE); - - } else { - $handler->idle(QUEUE_HANDLER_MISS_IDLE); + if ($ready === false) { + $this->_log(LOG_ERR, "Error selecting on sockets"); + } else if ($ready > 0) { + if (in_array($stompsock, $read)) { + $this->_handleNotice($queue, $handler); } + $handler->idle(QUEUE_HANDLER_HIT_IDLE); + } else { // timeout + $handler->idle(QUEUE_HANDLER_MISS_IDLE); } } $this->con->unsubscribe($this->_queueName($queue)); } + function _handleNotice($queue, $handler) + { + $frame = $this->con->readFrame(); + + if (!empty($frame)) { + $notice = Notice::staticGet('id', $frame->body); + + if (empty($notice)) { + $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue); + $this->con->ack($frame); + } else { + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); + $this->con->ack($frame); + } else { + $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); + // FIXME we probably shouldn't have to do + // this kind of queue management ourselves + $this->con->ack($frame); + $this->enqueue($notice, $queue); + } + unset($notice); + } + + unset($frame); + } + } + function _queueName($queue) { return common_config('queue', 'queue_basename') . $queue; From 031146f4c757ca00fb9f528abf69ef4b229bdc18 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Thu, 9 Jul 2009 12:49:37 -0400 Subject: [PATCH 2/2] yet another select() refinement --- lib/stompqueuemanager.php | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index ac55f9733e..d13af3fa52 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -122,9 +122,12 @@ class StompQueueManager if (in_array($stompsock, $read)) { $this->_handleNotice($queue, $handler); } - $handler->idle(QUEUE_HANDLER_HIT_IDLE); - } else { // timeout - $handler->idle(QUEUE_HANDLER_MISS_IDLE); + foreach ($handsocks as $sock) { + if (in_array($sock, $read)) { + $handler->idle(QUEUE_HANDLER_HIT_IDLE); + break; + } + } } }